chore(processor): update the input/output of the 3 main processors for better semantics (#1942)

* chore(processor): update the input/output of the 3 main processors for better semantics

* refactor(processor): replace Any with RobotObservation for improved type safety in processors

* fix(processors): no PolicyObservation

* chore(processor): update with RobotObservation

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: AdilZouitine <adilzouitinegm@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Steven Palma
2025-09-15 20:16:43 +02:00
committed by GitHub
parent 99213daa3e
commit 03891f66da
7 changed files with 69 additions and 56 deletions
+7 -7
View File
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
@@ -25,14 +24,15 @@ from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
EnvTransition,
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
identity_transition,
observation_to_transition,
robot_action_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.record import record_loop
@@ -74,7 +74,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert ee pose action to joint action
robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction](
robot_ee_to_joints_processor = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
AddRobotObservationAsComplimentaryData(robot=robot),
InverseKinematicsEEToJoints(
@@ -83,17 +83,17 @@ robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction
initial_guess_current_joints=True,
),
],
to_transition=identity_transition,
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joint observation to ee pose observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[dict[str, Any], EnvTransition](
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
],
to_transition=observation_to_transition,
to_output=identity_transition,
to_output=transition_to_observation,
)
+8 -9
View File
@@ -14,18 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import EnvTransition, RobotAction, RobotProcessorPipeline
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
identity_transition,
observation_to_transition,
robot_action_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.record import record_loop
@@ -75,7 +74,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert phone action to ee pose action
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, EnvTransition](
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
AddRobotObservationAsComplimentaryData(robot=robot),
@@ -92,11 +91,11 @@ phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, EnvTransi
GripperVelocityToJoint(),
],
to_transition=robot_action_to_transition,
to_output=identity_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert ee pose action to joint action
robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction](
robot_ee_to_joints_processor = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
@@ -104,17 +103,17 @@ robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction
initial_guess_current_joints=True,
),
],
to_transition=identity_transition,
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joint observation to ee pose observation
robot_joints_to_ee_pose = RobotProcessorPipeline[dict[str, Any], EnvTransition](
robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
],
to_transition=observation_to_transition,
to_output=identity_transition,
to_output=transition_to_observation,
)
# Create the dataset