diff --git a/examples/so100_to_so100_EE/record.py b/examples/so100_to_so100_EE/record.py index d85a1c5cc..9d40dda7d 100644 --- a/examples/so100_to_so100_EE/record.py +++ b/examples/so100_to_so100_EE/record.py @@ -17,25 +17,20 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features -from lerobot.datasets.utils import combine_feature_dicts -from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline -from lerobot.processor.converters import ( - observation_to_transition, - robot_action_observation_to_transition, - transition_to_observation, - transition_to_robot_action, -) from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig -from lerobot.robots.so_follower.robot_kinematic_processor import ( - EEBoundsAndSafety, - ForwardKinematicsJointsToEE, - InverseKinematicsEEToJoints, +from lerobot.robots.so_follower.pipelines import ( + make_so10x_fk_observation_pipeline, + make_so10x_ik_action_pipeline, ) from lerobot.scripts.lerobot_record import record_loop from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig +from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline from lerobot.utils.control_utils import init_keyboard_listener +from lerobot.utils.pipeline_utils import ( + build_dataset_features, + check_action_space_compatibility, + check_observation_space_compatibility, +) from lerobot.utils.utils import log_say from lerobot.utils.visualization_utils import init_rerun @@ -46,6 +41,10 @@ RESET_TIME_SEC = 30 TASK_DESCRIPTION = "My task description" HF_REPO_ID = "/" +# NOTE: Use the URDF from the SO-ARM100 repo: +# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +URDF_PATH = "./SO101/so101_new_calib.urdf" + def main(): # Create the robot and teleoperator configurations @@ -62,77 +61,17 @@ def main(): follower = SO100Follower(follower_config) leader = SO100Leader(leader_config) - # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf - follower_kinematics_solver = RobotKinematics( - urdf_path="./SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(follower.bus.motors.keys()), - ) + # Attach EE-space pipelines to the objects + motor_names = list(follower.bus.motors.keys()) + follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names)) + follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names)) + leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys()))) - # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf - leader_kinematics_solver = RobotKinematics( - urdf_path="./SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(leader.bus.motors.keys()), - ) - - # Build pipeline to convert follower joints to EE observation - follower_joints_to_ee = RobotProcessorPipeline[RobotObservation, RobotObservation]( - steps=[ - ForwardKinematicsJointsToEE( - kinematics=follower_kinematics_solver, motor_names=list(follower.bus.motors.keys()) - ), - ], - to_transition=observation_to_transition, - to_output=transition_to_observation, - ) - - # Build pipeline to convert leader joints to EE action - leader_joints_to_ee = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( - steps=[ - ForwardKinematicsJointsToEE( - kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys()) - ), - ], - to_transition=robot_action_observation_to_transition, - to_output=transition_to_robot_action, - ) - - # Build pipeline to convert EE action to follower joints - ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( - [ - EEBoundsAndSafety( - end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, - max_ee_step_m=0.10, - ), - InverseKinematicsEEToJoints( - kinematics=follower_kinematics_solver, - motor_names=list(follower.bus.motors.keys()), - initial_guess_current_joints=True, - ), - ], - to_transition=robot_action_observation_to_transition, - to_output=transition_to_robot_action, - ) - - # Create the dataset + # Dataset features are derived automatically from robot/teleop pipelines dataset = LeRobotDataset.create( repo_id=HF_REPO_ID, fps=FPS, - features=combine_feature_dicts( - # Run the feature contract of the pipelines - # This tells you how the features would look like after the pipeline steps - aggregate_pipeline_dataset_features( - pipeline=leader_joints_to_ee, - initial_features=create_initial_features(action=leader.action_features), - use_videos=True, - ), - aggregate_pipeline_dataset_features( - pipeline=follower_joints_to_ee, - initial_features=create_initial_features(observation=follower.observation_features), - use_videos=True, - ), - ), + features=build_dataset_features(follower, leader, use_videos=True), robot_type=follower.name, use_videos=True, image_writer_threads=4, @@ -142,9 +81,13 @@ def main(): leader.connect() follower.connect() + # Verify action/observation space alignment (warns on mismatch) + check_action_space_compatibility(leader, follower) + check_observation_space_compatibility(follower, leader) + # Initialize the keyboard listener and rerun visualization listener, events = init_keyboard_listener() - init_rerun(session_name="recording_phone") + init_rerun(session_name="recording_ee") try: if not leader.is_connected or not follower.is_connected: @@ -155,7 +98,8 @@ def main(): while episode_idx < NUM_EPISODES and not events["stop_recording"]: log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") - # Main record loop + # Pipelines applied automatically inside robot.get_observation(), + # teleop.get_action(), and robot.send_action() record_loop( robot=follower, events=events, @@ -165,9 +109,6 @@ def main(): control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, - teleop_action_processor=leader_joints_to_ee, - robot_action_processor=ee_to_follower_joints, - robot_observation_processor=follower_joints_to_ee, ) # Reset the environment if not stopping or re-recording @@ -183,9 +124,6 @@ def main(): control_time_s=RESET_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, - teleop_action_processor=leader_joints_to_ee, - robot_action_processor=ee_to_follower_joints, - robot_observation_processor=follower_joints_to_ee, ) if events["rerecord_episode"]: diff --git a/examples/so100_to_so100_EE/teleoperate.py b/examples/so100_to_so100_EE/teleoperate.py index 71d2899de..d60240ca3 100644 --- a/examples/so100_to_so100_EE/teleoperate.py +++ b/examples/so100_to_so100_EE/teleoperate.py @@ -14,27 +14,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time - -from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline -from lerobot.processor.converters import ( - robot_action_observation_to_transition, - robot_action_to_transition, - transition_to_robot_action, -) from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig -from lerobot.robots.so_follower.robot_kinematic_processor import ( - EEBoundsAndSafety, - ForwardKinematicsJointsToEE, - InverseKinematicsEEToJoints, +from lerobot.robots.so_follower.pipelines import ( + make_so10x_fk_observation_pipeline, + make_so10x_ik_action_pipeline, ) +from lerobot.scripts.lerobot_teleoperate import teleop_loop from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig -from lerobot.utils.robot_utils import precise_sleep -from lerobot.utils.visualization_utils import init_rerun, log_rerun_data +from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline +from lerobot.utils.pipeline_utils import check_action_space_compatibility +from lerobot.utils.visualization_utils import init_rerun FPS = 30 +# NOTE: Use the URDF from the SO-ARM100 repo: +# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +URDF_PATH = "./SO101/so101_new_calib.urdf" + def main(): # Initialize the robot and teleoperator config @@ -47,47 +43,14 @@ def main(): follower = SO100Follower(follower_config) leader = SO100Leader(leader_config) - # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf - follower_kinematics_solver = RobotKinematics( - urdf_path="./SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(follower.bus.motors.keys()), - ) + # Attach EE-space pipelines to the objects + motor_names = list(follower.bus.motors.keys()) + follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names)) + follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names)) + leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys()))) - # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf - leader_kinematics_solver = RobotKinematics( - urdf_path="./SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(leader.bus.motors.keys()), - ) - - # Build pipeline to convert teleop joints to EE action - leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( - steps=[ - ForwardKinematicsJointsToEE( - kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys()) - ), - ], - to_transition=robot_action_to_transition, - to_output=transition_to_robot_action, - ) - - # build pipeline to convert EE action to robot joints - ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( - [ - EEBoundsAndSafety( - end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, - max_ee_step_m=0.10, - ), - InverseKinematicsEEToJoints( - kinematics=follower_kinematics_solver, - motor_names=list(follower.bus.motors.keys()), - initial_guess_current_joints=False, - ), - ], - to_transition=robot_action_observation_to_transition, - to_output=transition_to_robot_action, - ) + # Verify action space alignment (warns if leader EE ≠ follower action_features) + check_action_space_compatibility(leader, follower) # Connect to the robot and teleoperator follower.connect() @@ -97,28 +60,12 @@ def main(): init_rerun(session_name="so100_so100_EE_teleop") print("Starting teleop loop...") - while True: - t0 = time.perf_counter() - - # Get robot observation - robot_obs = follower.get_observation() - - # Get teleop observation - leader_joints_obs = leader.get_action() - - # teleop joints -> teleop EE action - leader_ee_act = leader_to_ee(leader_joints_obs) - - # teleop EE -> robot joints - follower_joints_act = ee_to_follower_joints((leader_ee_act, robot_obs)) - - # Send action to robot - _ = follower.send_action(follower_joints_act) - - # Visualize - log_rerun_data(observation=leader_ee_act, action=follower_joints_act) - - precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) + try: + # Pipelines applied automatically inside teleop.get_action() and robot.send_action() + teleop_loop(teleop=leader, robot=follower, fps=FPS, display_data=True) + finally: + follower.disconnect() + leader.disconnect() if __name__ == "__main__": diff --git a/src/lerobot/datasets/pipeline_features.py b/src/lerobot/datasets/pipeline_features.py index 161633f26..9aa815732 100644 --- a/src/lerobot/datasets/pipeline_features.py +++ b/src/lerobot/datasets/pipeline_features.py @@ -12,15 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import re from collections.abc import Sequence -from typing import Any +from typing import TYPE_CHECKING, Any from lerobot.configs.types import PipelineFeatureType from lerobot.datasets.utils import hw_to_dataset_features -from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR +if TYPE_CHECKING: + from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation + def create_initial_features( action: RobotAction | None = None, observation: RobotObservation | None = None diff --git a/src/lerobot/processor/factory.py b/src/lerobot/processor/factory.py index 5a0c41072..ffac4cb35 100644 --- a/src/lerobot/processor/factory.py +++ b/src/lerobot/processor/factory.py @@ -17,6 +17,7 @@ from .converters import ( observation_to_transition, robot_action_observation_to_transition, + robot_action_to_transition, transition_to_observation, transition_to_robot_action, ) @@ -24,6 +25,50 @@ from .core import RobotAction, RobotObservation from .pipeline import IdentityProcessorStep, RobotProcessorPipeline +# ── Internal identity pipeline helpers (used by Robot/Teleoperator base classes) ────────────────── + + +def _make_identity_observation_pipeline() -> RobotProcessorPipeline[RobotObservation, RobotObservation]: + """Identity pipeline for robot observations (get_observation output pipeline).""" + return RobotProcessorPipeline[RobotObservation, RobotObservation]( + steps=[IdentityProcessorStep()], + to_transition=observation_to_transition, + to_output=transition_to_observation, + ) + + +def _make_identity_robot_action_pipeline() -> RobotProcessorPipeline[ + tuple[RobotAction, RobotObservation], RobotAction +]: + """Identity pipeline for robot action input (send_action input pipeline, takes (action, obs) tuple).""" + return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( + steps=[IdentityProcessorStep()], + to_transition=robot_action_observation_to_transition, + to_output=transition_to_robot_action, + ) + + +def _make_identity_teleop_action_pipeline() -> RobotProcessorPipeline[RobotAction, RobotAction]: + """Identity pipeline for teleop action output (get_action output pipeline, takes just action).""" + return RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[IdentityProcessorStep()], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, + ) + + +def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]: + """Identity pipeline for teleop feedback input (send_feedback input pipeline).""" + return RobotProcessorPipeline[dict, dict]( + steps=[IdentityProcessorStep()], + to_transition=observation_to_transition, + to_output=transition_to_observation, + ) + + +# ── Public factory functions (kept for backward compatibility and gym/HIL paths) ────────────────── + + def make_default_teleop_action_processor() -> RobotProcessorPipeline[ tuple[RobotAction, RobotObservation], RobotAction ]: diff --git a/src/lerobot/processor/normalize_processor.py b/src/lerobot/processor/normalize_processor.py index 4769b91ac..df88a79df 100644 --- a/src/lerobot/processor/normalize_processor.py +++ b/src/lerobot/processor/normalize_processor.py @@ -19,15 +19,17 @@ from __future__ import annotations from copy import deepcopy from dataclasses import dataclass, field -from typing import Any +from typing import TYPE_CHECKING, Any import torch from torch import Tensor from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature -from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.utils.constants import ACTION +if TYPE_CHECKING: + from lerobot.datasets.lerobot_dataset import LeRobotDataset + from .converters import from_tensor_to_numpy, to_tensor from .core import EnvTransition, PolicyAction, TransitionKey from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, RobotObservation diff --git a/src/lerobot/processor/tokenizer_processor.py b/src/lerobot/processor/tokenizer_processor.py index df559555a..8e51290ec 100644 --- a/src/lerobot/processor/tokenizer_processor.py +++ b/src/lerobot/processor/tokenizer_processor.py @@ -43,12 +43,9 @@ from lerobot.utils.import_utils import _transformers_available from .core import EnvTransition, RobotObservation, TransitionKey from .pipeline import ActionProcessorStep, ObservationProcessorStep, ProcessorStepRegistry -# Conditional import for type checking and lazy loading -if TYPE_CHECKING or _transformers_available: +# Type-checking only import — do NOT import transformers at module level (it loads TF which blocks) +if TYPE_CHECKING: from transformers import AutoProcessor, AutoTokenizer -else: - AutoProcessor = None - AutoTokenizer = None @dataclass @@ -106,8 +103,7 @@ class TokenizerProcessorStep(ObservationProcessorStep): # Use provided tokenizer object directly self.input_tokenizer = self.tokenizer elif self.tokenizer_name is not None: - if AutoTokenizer is None: - raise ImportError("AutoTokenizer is not available") + from transformers import AutoTokenizer # lazy import to avoid TF deadlock at module load self.input_tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name) else: raise ValueError( @@ -370,12 +366,12 @@ class ActionTokenizerProcessorStep(ActionProcessorStep): "Please install it with `pip install 'lerobot[transformers-dep]'` to use ActionTokenizerProcessorStep." ) + from transformers import AutoProcessor, AutoTokenizer # lazy import to avoid TF deadlock at module load + if self.action_tokenizer_input_object is not None: self.action_tokenizer = self.action_tokenizer_input_object elif self.action_tokenizer_name is not None: - if AutoProcessor is None: - raise ImportError("AutoProcessor is not available") self.action_tokenizer = AutoProcessor.from_pretrained( self.action_tokenizer_name, trust_remote_code=self.trust_remote_code ) diff --git a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py index 2e3885e67..ba0ae4663 100644 --- a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py +++ b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py @@ -102,7 +102,7 @@ class BiOpenArmFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -136,7 +136,7 @@ class BiOpenArmFollower(Robot): ) @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: obs_dict = {} # Add "left_" prefix @@ -150,7 +150,7 @@ class BiOpenArmFollower(Robot): return obs_dict @check_if_not_connected - def send_action( + def _send_action( self, action: RobotAction, custom_kp: dict[str, float] | None = None, diff --git a/src/lerobot/robots/bi_so_follower/bi_so_follower.py b/src/lerobot/robots/bi_so_follower/bi_so_follower.py index 28c58b898..56cc2f831 100644 --- a/src/lerobot/robots/bi_so_follower/bi_so_follower.py +++ b/src/lerobot/robots/bi_so_follower/bi_so_follower.py @@ -86,7 +86,7 @@ class BiSOFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -119,7 +119,7 @@ class BiSOFollower(Robot): self.right_arm.setup_motors() @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: obs_dict = {} # Add "left_" prefix @@ -133,7 +133,7 @@ class BiSOFollower(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: # Remove "left_" prefix left_action = { key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_") diff --git a/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py b/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py index cdf6efde1..fd0bdf143 100644 --- a/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py +++ b/src/lerobot/robots/earthrover_mini_plus/robot_earthrover_mini_plus.py @@ -147,7 +147,7 @@ class EarthRoverMiniPlus(Robot): pass @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: """Define the observation space for dataset recording. Returns: @@ -198,7 +198,7 @@ class EarthRoverMiniPlus(Robot): } @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: """Get current robot observation from SDK. Returns: @@ -255,7 +255,7 @@ class EarthRoverMiniPlus(Robot): return observation @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Send action to robot via SDK. Args: diff --git a/src/lerobot/robots/hope_jr/hope_jr_arm.py b/src/lerobot/robots/hope_jr/hope_jr_arm.py index e8269ae46..d56c8c652 100644 --- a/src/lerobot/robots/hope_jr/hope_jr_arm.py +++ b/src/lerobot/robots/hope_jr/hope_jr_arm.py @@ -71,7 +71,7 @@ class HopeJrArm(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -128,7 +128,7 @@ class HopeJrArm(Robot): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: # Read arm position start = time.perf_counter() obs_dict = self.bus.sync_read("Present_Position", self.other_motors) @@ -147,7 +147,7 @@ class HopeJrArm(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} # Cap goal position when too far away from present position. diff --git a/src/lerobot/robots/hope_jr/hope_jr_hand.py b/src/lerobot/robots/hope_jr/hope_jr_hand.py index a05c4bbcb..2a1485754 100644 --- a/src/lerobot/robots/hope_jr/hope_jr_hand.py +++ b/src/lerobot/robots/hope_jr/hope_jr_hand.py @@ -107,7 +107,7 @@ class HopeJrHand(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -158,7 +158,7 @@ class HopeJrHand(Robot): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: obs_dict = {} # Read hand position @@ -178,7 +178,7 @@ class HopeJrHand(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} self.bus.sync_write("Goal_Position", goal_pos) return action diff --git a/src/lerobot/robots/koch_follower/koch_follower.py b/src/lerobot/robots/koch_follower/koch_follower.py index 53a32beed..453016e45 100644 --- a/src/lerobot/robots/koch_follower/koch_follower.py +++ b/src/lerobot/robots/koch_follower/koch_follower.py @@ -73,7 +73,7 @@ class KochFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -182,7 +182,7 @@ class KochFollower(Robot): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: # Read arm position start = time.perf_counter() obs_dict = self.bus.sync_read("Present_Position") @@ -200,7 +200,7 @@ class KochFollower(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter diff --git a/src/lerobot/robots/lekiwi/lekiwi.py b/src/lerobot/robots/lekiwi/lekiwi.py index 9d11a000f..39444a05b 100644 --- a/src/lerobot/robots/lekiwi/lekiwi.py +++ b/src/lerobot/robots/lekiwi/lekiwi.py @@ -98,7 +98,7 @@ class LeKiwi(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._state_ft, **self._cameras_ft} @cached_property @@ -338,7 +338,7 @@ class LeKiwi(Robot): } # m/s and deg/s @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: # Read actuators position for arm and vel for base start = time.perf_counter() arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) @@ -367,7 +367,7 @@ class LeKiwi(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Command lekiwi to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter diff --git a/src/lerobot/robots/lekiwi/lekiwi_client.py b/src/lerobot/robots/lekiwi/lekiwi_client.py index 1d5ea64a6..c2055963c 100644 --- a/src/lerobot/robots/lekiwi/lekiwi_client.py +++ b/src/lerobot/robots/lekiwi/lekiwi_client.py @@ -98,7 +98,7 @@ class LeKiwiClient(Robot): return {name: (cfg.height, cfg.width, 3) for name, cfg in self.config.cameras.items()} @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._state_ft, **self._cameras_ft} @cached_property @@ -250,7 +250,7 @@ class LeKiwiClient(Robot): return new_frames, new_state @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: """ Capture observations from the remote robot: current follower arm positions, present wheel speeds (converted to body-frame velocities: x, y, theta), @@ -304,7 +304,7 @@ class LeKiwiClient(Robot): pass @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ Args: diff --git a/src/lerobot/robots/omx_follower/omx_follower.py b/src/lerobot/robots/omx_follower/omx_follower.py index e0b612c60..f8030ef46 100644 --- a/src/lerobot/robots/omx_follower/omx_follower.py +++ b/src/lerobot/robots/omx_follower/omx_follower.py @@ -73,7 +73,7 @@ class OmxFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -165,7 +165,7 @@ class OmxFollower(Robot): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: # Read arm position start = time.perf_counter() obs_dict = self.bus.sync_read("Present_Position") @@ -183,7 +183,7 @@ class OmxFollower(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter diff --git a/src/lerobot/robots/openarm_follower/openarm_follower.py b/src/lerobot/robots/openarm_follower/openarm_follower.py index c865f1ec1..405dd92da 100644 --- a/src/lerobot/robots/openarm_follower/openarm_follower.py +++ b/src/lerobot/robots/openarm_follower/openarm_follower.py @@ -105,7 +105,7 @@ class OpenArmFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: """Combined observation features from motors and cameras.""" return {**self._motors_ft, **self._cameras_ft} @@ -219,7 +219,7 @@ class OpenArmFollower(Robot): ) @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: """ Get current observation from robot including position, velocity, and torque. @@ -251,7 +251,7 @@ class OpenArmFollower(Robot): return obs_dict @check_if_not_connected - def send_action( + def _send_action( self, action: RobotAction, custom_kp: dict[str, float] | None = None, diff --git a/src/lerobot/robots/reachy2/robot_reachy2.py b/src/lerobot/robots/reachy2/robot_reachy2.py index fb466f85b..0886de18b 100644 --- a/src/lerobot/robots/reachy2/robot_reachy2.py +++ b/src/lerobot/robots/reachy2/robot_reachy2.py @@ -95,7 +95,7 @@ class Reachy2Robot(Robot): self.joints_dict: dict[str, str] = self._generate_joints_dict() @property - def observation_features(self) -> dict[str, Any]: + def raw_observation_features(self) -> dict[str, Any]: return {**self.motors_features, **self.camera_features} @property @@ -170,7 +170,7 @@ class Reachy2Robot(Robot): else: return {} - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: obs_dict: RobotObservation = {} # Read Reachy 2 state @@ -184,7 +184,7 @@ class Reachy2Robot(Robot): return obs_dict - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: if self.reachy is not None: if not self.is_connected: raise ConnectionError() diff --git a/src/lerobot/robots/robot.py b/src/lerobot/robots/robot.py index d165886b9..725a44e43 100644 --- a/src/lerobot/robots/robot.py +++ b/src/lerobot/robots/robot.py @@ -18,8 +18,11 @@ from pathlib import Path import draccus +from lerobot.configs.types import PipelineFeatureType from lerobot.motors import MotorCalibration -from lerobot.processor import RobotAction, RobotObservation +from lerobot.processor.core import RobotAction, RobotObservation +from lerobot.processor.factory import _make_identity_observation_pipeline, _make_identity_robot_action_pipeline +from lerobot.processor.pipeline import RobotProcessorPipeline from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, ROBOTS from .config import RobotConfig @@ -34,6 +37,10 @@ class Robot(abc.ABC): This class provides a standardized interface for interacting with physical robots. Subclasses must implement all abstract methods and properties to be usable. + Pipelines are first-class citizens: every robot carries an optional output pipeline + (applied in get_observation()) and an optional input pipeline (applied in send_action()). + Both default to identity (no-op), so existing robots work without any changes. + Attributes: config_class (RobotConfig): The expected configuration class for this robot. name (str): The unique robot name used to identify this robot type. @@ -55,6 +62,12 @@ class Robot(abc.ABC): if self.calibration_fpath.is_file(): self._load_calibration() + # Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline() + self._output_pipeline: RobotProcessorPipeline = _make_identity_observation_pipeline() + self._input_pipeline: RobotProcessorPipeline = _make_identity_robot_action_pipeline() + # Cache of most recent raw observation; used by input_pipeline for IK initial guess + self._last_raw_obs: RobotObservation = {} + def __str__(self) -> str: return f"{self.id} {self.__class__.__name__}" @@ -84,18 +97,72 @@ class Robot(abc.ABC): except Exception: # nosec B110 pass - # TODO(aliberts): create a proper Feature class for this that links with datasets + # ── Pipeline interface ──────────────────────────────────────────────────── + + def output_pipeline(self) -> RobotProcessorPipeline: + """ + Pipeline applied inside get_observation() to transform raw hardware observations. + Default: identity (no-op). Override via set_output_pipeline() or subclassing. + + Example: set a forward-kinematics pipeline to convert joint positions to EE pose. + """ + return self._output_pipeline + + def input_pipeline(self) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]: + """ + Pipeline applied inside send_action() to transform incoming actions before hardware write. + Default: identity (no-op). Override via set_input_pipeline() or subclassing. + + The pipeline receives a (action, last_raw_obs) tuple so IK solvers can use the + current joint configuration as an initial guess. + + Example: set an inverse-kinematics pipeline to convert EE commands to joint positions. + """ + return self._input_pipeline + + def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None: + """Set the observation output pipeline (applied in get_observation()).""" + self._output_pipeline = pipeline + + def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None: + """Set the action input pipeline (applied in send_action()).""" + self._input_pipeline = pipeline + + # ── Feature properties ──────────────────────────────────────────────────── + @property - @abc.abstractmethod def observation_features(self) -> dict: """ - A dictionary describing the structure and types of the observations produced by the robot. - Its structure (keys) should match the structure of what is returned by :pymeth:`get_observation`. - Values for the dict should either be: - - The type of the value if it's a simple value, e.g. `float` for single proprioceptive value (a joint's position/velocity) - - A tuple representing the shape if it's an array-type value, e.g. `(height, width, channel)` for images + Pipeline-transformed observation features. - Note: this property should be able to be called regardless of whether the robot is connected or not. + Applies output_pipeline().transform_features() to raw_observation_features so the + returned dict matches what get_observation() actually returns to callers. + + Use raw_observation_features to inspect hardware-level feature shapes. + + Note: this property should be able to be called regardless of whether the robot + is connected or not. + """ + from lerobot.datasets.pipeline_features import create_initial_features # lazy import + + initial = create_initial_features(observation=self.raw_observation_features) + transformed = self.output_pipeline().transform_features(initial) + return transformed.get(PipelineFeatureType.OBSERVATION, {}) + + @property + @abc.abstractmethod + def raw_observation_features(self) -> dict: + """ + Hardware-level observation features (before any pipeline transformation). + + A dictionary describing the structure and types of the observations produced + directly by the robot hardware. Its structure (keys) should match the structure + of what is returned by :pymeth:`_get_observation`. Values should be: + - The type if it's a simple value, e.g. ``float`` for joint position + - A tuple representing the shape for array values, e.g. ``(H, W, C)`` for images + + Note: this property should be able to be called regardless of whether the robot + is connected or not. """ pass @@ -103,12 +170,18 @@ class Robot(abc.ABC): @abc.abstractmethod def action_features(self) -> dict: """ - A dictionary describing the structure and types of the actions expected by the robot. Its structure - (keys) should match the structure of what is passed to :pymeth:`send_action`. Values for the dict - should be the type of the value if it's a simple value, e.g. `float` for single proprioceptive value - (a joint's goal position/velocity) + A dictionary describing the structure and types of the actions expected by the robot. + Its structure (keys) should match the structure of what is passed to + :pymeth:`send_action`. Values for the dict should be the type of the value if it's + a simple value, e.g. ``float`` for single proprioceptive value + (a joint's goal position/velocity). - Note: this property should be able to be called regardless of whether the robot is connected or not. + For simple robots (no input pipeline), this returns motor-level features. + For robots with an IK pipeline, override this to return the pipeline's input spec + (e.g., EE features) so that compatibility checks work correctly. + + Note: this property should be able to be called regardless of whether the robot + is connected or not. """ pass @@ -116,8 +189,8 @@ class Robot(abc.ABC): @abc.abstractmethod def is_connected(self) -> bool: """ - Whether the robot is currently connected or not. If `False`, calling :pymeth:`get_observation` or - :pymeth:`send_action` should raise an error. + Whether the robot is currently connected or not. If ``False``, calling + :pymeth:`get_observation` or :pymeth:`send_action` should raise an error. """ pass @@ -135,7 +208,7 @@ class Robot(abc.ABC): @property @abc.abstractmethod def is_calibrated(self) -> bool: - """Whether the robot is currently calibrated or not. Should be always `True` if not applicable""" + """Whether the robot is currently calibrated or not. Should be always ``True`` if not applicable""" pass @abc.abstractmethod @@ -153,7 +226,7 @@ class Robot(abc.ABC): Helper to load calibration data from the specified file. Args: - fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`. + fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``. """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath) as f, draccus.config_type("json"): @@ -164,7 +237,7 @@ class Robot(abc.ABC): Helper to save calibration data to the specified file. Args: - fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`. + fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``. """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath, "w") as f, draccus.config_type("json"): @@ -178,30 +251,64 @@ class Robot(abc.ABC): """ pass - @abc.abstractmethod + # ── Template methods (concrete, call pipeline internally) ───────────────── + def get_observation(self) -> RobotObservation: """ - Retrieve the current observation from the robot. + Retrieve the current observation from the robot and apply the output pipeline. + + Calls :pymeth:`_get_observation` to get raw hardware data, caches it for use as + IK initial guess in :pymeth:`send_action`, then applies :pymeth:`output_pipeline`. Returns: - RobotObservation: A flat dictionary representing the robot's current sensory state. Its structure - should match :pymeth:`observation_features`. + RobotObservation: Pipeline-transformed observation. With the default identity + pipeline this equals the raw observation from :pymeth:`_get_observation`. """ - - pass + raw = self._get_observation() + self._last_raw_obs = raw + return self.output_pipeline()(raw) @abc.abstractmethod - def send_action(self, action: RobotAction) -> RobotAction: + def _get_observation(self) -> RobotObservation: """ - Send an action command to the robot. - - Args: - action (RobotAction): Dictionary representing the desired action. Its structure should match - :pymeth:`action_features`. + Retrieve the raw observation directly from robot hardware. Returns: - RobotAction: The action actually sent to the motors potentially clipped or modified, e.g. by - safety limits on velocity. + RobotObservation: A flat dictionary representing the robot's current sensory + state. Its structure should match :pymeth:`raw_observation_features`. + """ + pass + + def send_action(self, action: RobotAction) -> RobotAction: + """ + Apply the input pipeline and send the resulting action to robot hardware. + + The input pipeline receives ``(action, last_raw_obs)`` so IK solvers can use the + cached joint configuration as an initial guess. With the default identity pipeline, + the action is forwarded unchanged. + + Args: + action (RobotAction): Dictionary representing the desired action. Its structure + should match :pymeth:`action_features`. + + Returns: + RobotAction: The action actually sent to the motors, potentially clipped or + modified by the pipeline or hardware safety limits. + """ + transformed = self.input_pipeline()((action, self._last_raw_obs)) + return self._send_action(transformed) + + @abc.abstractmethod + def _send_action(self, action: RobotAction) -> RobotAction: + """ + Send an action command directly to robot hardware. + + Args: + action (RobotAction): Dictionary of motor-level commands. Its structure should + match what the hardware expects (typically motor positions/velocities). + + Returns: + RobotAction: The action actually sent, potentially clipped by safety limits. """ pass diff --git a/src/lerobot/robots/so_follower/pipelines/__init__.py b/src/lerobot/robots/so_follower/pipelines/__init__.py new file mode 100644 index 000000000..94a3bb755 --- /dev/null +++ b/src/lerobot/robots/so_follower/pipelines/__init__.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .ee_space import make_so10x_fk_observation_pipeline, make_so10x_ik_action_pipeline + +__all__ = ["make_so10x_fk_observation_pipeline", "make_so10x_ik_action_pipeline"] diff --git a/src/lerobot/robots/so_follower/pipelines/ee_space.py b/src/lerobot/robots/so_follower/pipelines/ee_space.py new file mode 100644 index 000000000..d9dcb169e --- /dev/null +++ b/src/lerobot/robots/so_follower/pipelines/ee_space.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +End-effector space pipelines for SO-100/101 follower robots. + +These factory functions return ready-to-use pipelines that convert between joint space +and Cartesian end-effector space. Attach them to a robot with ``set_output_pipeline`` / +``set_input_pipeline`` to enable EE-space recording and teleoperation. + +Example:: + + from lerobot.robots.so_follower.pipelines import ( + make_so10x_fk_observation_pipeline, + make_so10x_ik_action_pipeline, + ) + + motor_names = list(follower.bus.motors.keys()) + follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names)) + follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names)) +""" + +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline +from lerobot.processor.converters import ( + observation_to_transition, + robot_action_observation_to_transition, + transition_to_observation, + transition_to_robot_action, +) +from lerobot.robots.so_follower.robot_kinematic_processor import ( + EEBoundsAndSafety, + ForwardKinematicsJointsToEE, + InverseKinematicsEEToJoints, +) + +_DEFAULT_EE_BOUNDS = {"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]} +_DEFAULT_GRIPPER_FRAME = "gripper_frame_link" + + +def make_so10x_fk_observation_pipeline( + urdf_path: str, + motor_names: list[str], + *, + target_frame_name: str = _DEFAULT_GRIPPER_FRAME, +) -> RobotProcessorPipeline[RobotObservation, RobotObservation]: + """ + Create a forward-kinematics observation pipeline for SO-100/101 follower robots. + + Converts raw joint positions (observation) into end-effector pose (position + orientation). + Attach this to a follower robot via ``set_output_pipeline`` so that ``get_observation()`` + returns EE coordinates instead of raw joint angles. + + Args: + urdf_path: Path to the SO-100/101 URDF file used for kinematics. + motor_names: Ordered list of motor names matching the URDF joint names. + target_frame_name: Name of the end-effector frame in the URDF. + + Returns: + A RobotProcessorPipeline that maps joint observations to EE observations. + + Example:: + + follower.set_output_pipeline( + make_so10x_fk_observation_pipeline("./so101.urdf", motor_names) + ) + obs = follower.get_observation() # now contains ee.x, ee.y, ee.z, ... + """ + kinematics = RobotKinematics( + urdf_path=urdf_path, + target_frame_name=target_frame_name, + joint_names=motor_names, + ) + return RobotProcessorPipeline[RobotObservation, RobotObservation]( + steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)], + to_transition=observation_to_transition, + to_output=transition_to_observation, + ) + + +def make_so10x_ik_action_pipeline( + urdf_path: str, + motor_names: list[str], + *, + target_frame_name: str = _DEFAULT_GRIPPER_FRAME, + end_effector_bounds: dict | None = None, + max_ee_step_m: float = 0.10, +) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]: + """ + Create an inverse-kinematics action pipeline for SO-100/101 follower robots. + + Converts incoming end-effector pose commands into joint positions, applying safety + bounds and step-size limits before solving IK. The current joint positions are used + as the IK initial guess (taken from the cached ``_last_raw_obs``). + + Attach this to a follower robot via ``set_input_pipeline`` so that ``send_action()`` + receives EE commands and translates them to motor positions before the hardware write. + + Args: + urdf_path: Path to the SO-100/101 URDF file used for kinematics. + motor_names: Ordered list of motor names matching the URDF joint names. + target_frame_name: Name of the end-effector frame in the URDF. + end_effector_bounds: Dict with ``"min"`` and ``"max"`` lists (3D position bounds in metres). + Defaults to ``{"min": [-1, -1, -1], "max": [1, 1, 1]}``. + max_ee_step_m: Maximum allowed EE position change per step in metres. + + Returns: + A RobotProcessorPipeline that maps (EE action, raw obs) to joint action. + + Example:: + + follower.set_input_pipeline( + make_so10x_ik_action_pipeline("./so101.urdf", motor_names) + ) + # send_action() now accepts ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel + """ + kinematics = RobotKinematics( + urdf_path=urdf_path, + target_frame_name=target_frame_name, + joint_names=motor_names, + ) + bounds = end_effector_bounds or _DEFAULT_EE_BOUNDS + return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( + steps=[ + EEBoundsAndSafety(end_effector_bounds=bounds, max_ee_step_m=max_ee_step_m), + InverseKinematicsEEToJoints( + kinematics=kinematics, + motor_names=motor_names, + initial_guess_current_joints=True, + ), + ], + to_transition=robot_action_observation_to_transition, + to_output=transition_to_robot_action, + ) diff --git a/src/lerobot/robots/so_follower/so_follower.py b/src/lerobot/robots/so_follower/so_follower.py index bc72a2b6a..e0ad623ab 100644 --- a/src/lerobot/robots/so_follower/so_follower.py +++ b/src/lerobot/robots/so_follower/so_follower.py @@ -74,7 +74,7 @@ class SOFollower(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -176,7 +176,7 @@ class SOFollower(Robot): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: # Read arm position start = time.perf_counter() obs_dict = self.bus.sync_read("Present_Position") @@ -194,7 +194,7 @@ class SOFollower(Robot): return obs_dict @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter diff --git a/src/lerobot/robots/unitree_g1/unitree_g1.py b/src/lerobot/robots/unitree_g1/unitree_g1.py index df0de8f19..887c444e8 100644 --- a/src/lerobot/robots/unitree_g1/unitree_g1.py +++ b/src/lerobot/robots/unitree_g1/unitree_g1.py @@ -273,7 +273,7 @@ class UnitreeG1(Robot): for cam in self._cameras.values(): cam.disconnect() - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: lowstate = self._lowstate if lowstate is None: return {} @@ -351,10 +351,10 @@ class UnitreeG1(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: for motor in G1_29_JointIndex: key = f"{motor.name}.q" if key in action: @@ -421,7 +421,7 @@ class UnitreeG1(Robot): num_steps = int(total_time / control_dt) # get current state - obs = self.get_observation() + obs = self._get_observation() # record current positions init_dof_pos = np.zeros(29, dtype=np.float32) @@ -439,7 +439,7 @@ class UnitreeG1(Robot): interp_pos = init_dof_pos[motor.value] * (1 - alpha) + target_pos * alpha action_dict[f"{motor.name}.q"] = float(interp_pos) - self.send_action(action_dict) + self._send_action(action_dict) # Maintain constant control rate elapsed = time.time() - start_time diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index 72708ba23..d6dda81cc 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -74,6 +74,8 @@ from pathlib import Path from pprint import pformat from typing import Any +import torch + from lerobot.cameras import ( # noqa: F401 CameraConfig, # noqa: F401 ) @@ -85,19 +87,16 @@ from lerobot.configs import parser from lerobot.configs.policies import PreTrainedConfig from lerobot.datasets.image_writer import safe_stop_image_writer from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features -from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts +from lerobot.datasets.utils import build_dataset_frame from lerobot.datasets.video_utils import VideoEncodingManager from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.pretrained import PreTrainedPolicy +from lerobot.policies.rtc import ActionInterpolator from lerobot.policies.utils import make_robot_action from lerobot.processor import ( PolicyAction, PolicyProcessorPipeline, RobotAction, - RobotObservation, - RobotProcessorPipeline, - make_default_processors, ) from lerobot.processor.rename_processor import rename_stats from lerobot.robots import ( # noqa: F401 @@ -140,6 +139,11 @@ from lerobot.utils.control_utils import ( sanity_check_dataset_robot_compatibility, ) from lerobot.utils.import_utils import register_third_party_plugins +from lerobot.utils.pipeline_utils import ( + build_dataset_features, + check_action_space_compatibility, + check_observation_space_compatibility, +) from lerobot.utils.robot_utils import precise_sleep from lerobot.utils.utils import ( get_safe_torch_device, @@ -155,7 +159,7 @@ class DatasetRecordConfig: repo_id: str # A short but accurate description of the task performed during the recording (e.g. "Pick the Lego block and drop it in the box on the right.") single_task: str - # Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id. + # Root directory where the dataset will be stored (e.g. 'dataset/path'). root: str | Path | None = None # Limit the frames per second. fps: int = 30 @@ -226,6 +230,9 @@ class RecordConfig: play_sounds: bool = True # Resume recording on an existing dataset. resume: bool = False + # Action interpolation multiplier for smoother policy control (1=off, 2=2x, 3=3x) + # Only applies when using a policy (not teleop) + interpolation_multiplier: int = 1 def __post_init__(self): # HACK: We parse again the cli args here to get the pretrained path if there was one. @@ -249,28 +256,23 @@ class RecordConfig: """ --------------- record_loop() data flow -------------------------- [ Robot ] V - [ robot.get_observation() ] ---> raw_obs - V - [ robot_observation_processor ] ---> processed_obs + [ robot.get_observation() ] → applies output_pipeline internally → obs V .-----( ACTION LOGIC )------------------. V V [ From Teleoperator ] [ From Policy ] | | - | [teleop.get_action] -> raw_action | [predict_action] - | | | | - | V | V - | [teleop_action_processor] | | - | | | | - '---> processed_teleop_action '---> processed_policy_action + | teleop.get_action() | predict_action(obs) + | (output_pipeline applied internally) | | + | | | V + '----> action '---> policy_action_dict | | '-------------------------.-------------' V - [ robot_action_processor ] --> robot_action_to_send + [ robot.send_action(action) ] + (input_pipeline applied internally) V - [ robot.send_action() ] -- (Robot Executes) - V - ( Save to Dataset ) + ( Save action + obs to Dataset ) V ( Rerun Log / Loop Wait ) """ @@ -281,15 +283,6 @@ def record_loop( robot: Robot, events: dict, fps: int, - teleop_action_processor: RobotProcessorPipeline[ - tuple[RobotAction, RobotObservation], RobotAction - ], # runs after teleop - robot_action_processor: RobotProcessorPipeline[ - tuple[RobotAction, RobotObservation], RobotAction - ], # runs before robot - robot_observation_processor: RobotProcessorPipeline[ - RobotObservation, RobotObservation - ], # runs after robot dataset: LeRobotDataset | None = None, teleop: Teleoperator | list[Teleoperator] | None = None, policy: PreTrainedPolicy | None = None, @@ -298,8 +291,30 @@ def record_loop( control_time_s: int | None = None, single_task: str | None = None, display_data: bool = False, + interpolator: ActionInterpolator | None = None, display_compressed_images: bool = False, ): + """ + Core recording loop. Robot and teleoperator pipelines are applied internally — + no explicit processor arguments are needed. + + Args: + robot: The robot instance. Its output_pipeline() transforms observations and + its input_pipeline() transforms actions before hardware write. + events: Control events dict (exit_early, stop_recording, rerecord_episode). + fps: Target control loop frequency. + dataset: If provided, frames are written here each step. + teleop: Teleoperator or list of teleoperators. Its output_pipeline() transforms + actions (e.g., joint → EE) before they are sent to the robot. + policy: Optional pre-trained policy for closed-loop control. + preprocessor: Policy input pre-processor. + postprocessor: Policy output post-processor. + control_time_s: Episode duration in seconds. + single_task: Task description string saved with each frame. + display_data: If True, log observations and actions to Rerun. + interpolator: Optional action interpolator for smoother policy control. + display_compressed_images: If True, compress images before Rerun display. + """ if dataset is not None and dataset.fps != fps: raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") @@ -334,6 +349,16 @@ def record_loop( preprocessor.reset() postprocessor.reset() + # Reset interpolator if provided + if interpolator is not None: + interpolator.reset() + + # Calculate control interval based on interpolation + use_interpolation = interpolator is not None and interpolator.enabled and policy is not None + control_interval = interpolator.get_control_interval(fps) if interpolator else 1 / fps + # Pre-compute once — action features don't change during a recording episode + action_keys = sorted(robot.action_features) if use_interpolation else [] + no_action_count = 0 timestamp = 0 start_episode_t = time.perf_counter() @@ -344,43 +369,75 @@ def record_loop( events["exit_early"] = False break - # Get robot observation + # Get robot observation (output_pipeline applied internally) obs = robot.get_observation() - # Applies a pipeline to the raw robot observation, default is IdentityProcessor - obs_processed = robot_observation_processor(obs) - if policy is not None or dataset is not None: - observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR) + observation_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR) # Get action from either policy or teleop if policy is not None and preprocessor is not None and postprocessor is not None: - action_values = predict_action( - observation=observation_frame, - policy=policy, - device=get_safe_torch_device(policy.config.device), - preprocessor=preprocessor, - postprocessor=postprocessor, - use_amp=policy.config.use_amp, - task=single_task, - robot_type=robot.robot_type, - ) + # With interpolation: only call policy when interpolator needs new action + if use_interpolation: + if interpolator.needs_new_action(): + action_values = predict_action( + observation=observation_frame, + policy=policy, + device=get_safe_torch_device(policy.config.device), + preprocessor=preprocessor, + postprocessor=postprocessor, + use_amp=policy.config.use_amp, + task=single_task, + robot_type=robot.robot_type, + ) + act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features) + # send_action applies input_pipeline (e.g. IK) internally; + # capture the actually-sent joint action for interpolation + sent_joint_action = robot.send_action(act_processed_policy) - act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features) + # Build interpolation tensor from the motor-level joint action + action_tensor = torch.tensor([sent_joint_action[k] for k in action_keys]) + interpolator.add(action_tensor) + + # Get interpolated action (in joint/motor space) + interp_action = interpolator.get() + if interp_action is not None: + action_values = {k: interp_action[i].item() for i, k in enumerate(action_keys)} + # Interpolated values are already in joint space; bypass IK pipeline + robot._send_action(action_values) + else: + # No action available yet, skip this iteration + continue + else: + action_values = predict_action( + observation=observation_frame, + policy=policy, + device=get_safe_torch_device(policy.config.device), + preprocessor=preprocessor, + postprocessor=postprocessor, + use_amp=policy.config.use_amp, + task=single_task, + robot_type=robot.robot_type, + ) + act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features) + # send_action applies input_pipeline (e.g. IK) internally + robot.send_action(act_processed_policy) + action_values = act_processed_policy elif policy is None and isinstance(teleop, Teleoperator): - act = teleop.get_action() - - # Applies a pipeline to the raw teleop action, default is IdentityProcessor - act_processed_teleop = teleop_action_processor((act, obs)) + # get_action applies output_pipeline (e.g. FK) internally + action_values = teleop.get_action() + # send_action applies input_pipeline (e.g. IK) internally + robot.send_action(action_values) elif policy is None and isinstance(teleop, list): - arm_action = teleop_arm.get_action() + # LeKiwi multi-teleop path + arm_action = teleop_arm.get_action() # output_pipeline applied internally arm_action = {f"arm_{k}": v for k, v in arm_action.items()} keyboard_action = teleop_keyboard.get_action() base_action = robot._from_keyboard_to_base_action(keyboard_action) - act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action - act_processed_teleop = teleop_action_processor((act, obs)) + action_values = {**arm_action, **base_action} if len(base_action) > 0 else arm_action + robot.send_action(action_values) # input_pipeline applied internally else: no_action_count += 1 if no_action_count == 1 or no_action_count % 10 == 0: @@ -391,20 +448,6 @@ def record_loop( ) continue - # Applies a pipeline to the action, default is IdentityProcessor - if policy is not None and act_processed_policy is not None: - action_values = act_processed_policy - robot_action_to_send = robot_action_processor((act_processed_policy, obs)) - else: - action_values = act_processed_teleop - robot_action_to_send = robot_action_processor((act_processed_teleop, obs)) - - # Send action to robot - # Action can eventually be clipped using `max_relative_target`, - # so action actually sent is saved in the dataset. action = postprocessor.process(action) - # TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot. - _sent_action = robot.send_action(robot_action_to_send) - # Write to dataset if dataset is not None: action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION) @@ -413,12 +456,12 @@ def record_loop( if display_data: log_rerun_data( - observation=obs_processed, action=action_values, compress_images=display_compressed_images + observation=obs, action=action_values, compress_images=display_compressed_images ) dt_s = time.perf_counter() - start_loop_t - sleep_time_s: float = 1 / fps - dt_s + sleep_time_s: float = control_interval - dt_s if sleep_time_s < 0: logging.warning( f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation" @@ -444,22 +487,18 @@ def record(cfg: RecordConfig) -> LeRobotDataset: robot = make_robot_from_config(cfg.robot) teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None - teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors() + # Dataset features derived automatically from robot/teleop pipelines + if teleop is not None: + dataset_features = build_dataset_features(robot, teleop, use_videos=cfg.dataset.video) + else: + # Policy-only recording: use robot observation features only + from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features - dataset_features = combine_feature_dicts( - aggregate_pipeline_dataset_features( - pipeline=teleop_action_processor, - initial_features=create_initial_features( - action=robot.action_features - ), # TODO(steven, pepijn): in future this should be come from teleop or policy + dataset_features = aggregate_pipeline_dataset_features( + pipeline=robot.output_pipeline(), + initial_features=create_initial_features(observation=robot.raw_observation_features), use_videos=cfg.dataset.video, - ), - aggregate_pipeline_dataset_features( - pipeline=robot_observation_processor, - initial_features=create_initial_features(observation=robot.observation_features), - use_videos=cfg.dataset.video, - ), - ) + ) dataset = None listener = None @@ -505,6 +544,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset: policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta) preprocessor = None postprocessor = None + interpolator = None if cfg.policy is not None: preprocessor, postprocessor = make_pre_post_processors( policy_cfg=cfg.policy, @@ -515,11 +555,19 @@ def record(cfg: RecordConfig) -> LeRobotDataset: "rename_observations_processor": {"rename_map": cfg.dataset.rename_map}, }, ) + # Create interpolator for smoother policy control + if cfg.interpolation_multiplier > 1: + interpolator = ActionInterpolator(multiplier=cfg.interpolation_multiplier) + logging.info(f"Action interpolation enabled: {cfg.interpolation_multiplier}x control rate") robot.connect() if teleop is not None: teleop.connect() + if teleop is not None: + check_action_space_compatibility(teleop, robot) + check_observation_space_compatibility(robot, teleop) + listener, events = init_keyboard_listener() if not cfg.dataset.streaming_encoding: @@ -535,9 +583,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset: robot=robot, events=events, fps=cfg.dataset.fps, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, teleop=teleop, policy=policy, preprocessor=preprocessor, @@ -546,6 +591,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset: control_time_s=cfg.dataset.episode_time_s, single_task=cfg.dataset.single_task, display_data=cfg.display_data, + interpolator=interpolator, display_compressed_images=display_compressed_images, ) @@ -564,9 +610,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset: robot=robot, events=events, fps=cfg.dataset.fps, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, teleop=teleop, control_time_s=cfg.dataset.reset_time_s, single_task=cfg.dataset.single_task, diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py index dad479b2e..6147a8852 100644 --- a/src/lerobot/scripts/lerobot_teleoperate.py +++ b/src/lerobot/scripts/lerobot_teleoperate.py @@ -61,12 +61,6 @@ import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 from lerobot.configs import parser -from lerobot.processor import ( - RobotAction, - RobotObservation, - RobotProcessorPipeline, - make_default_processors, -) from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -100,6 +94,7 @@ from lerobot.teleoperators import ( # noqa: F401 unitree_g1, ) from lerobot.utils.import_utils import register_third_party_plugins +from lerobot.utils.pipeline_utils import check_action_space_compatibility, check_observation_space_compatibility from lerobot.utils.robot_utils import precise_sleep from lerobot.utils.utils import init_logging, move_cursor_up from lerobot.utils.visualization_utils import init_rerun, log_rerun_data @@ -127,28 +122,28 @@ def teleop_loop( teleop: Teleoperator, robot: Robot, fps: int, - teleop_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction], - robot_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction], - robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation], display_data: bool = False, duration: float | None = None, display_compressed_images: bool = False, ): """ - This function continuously reads actions from a teleoperation device, processes them through optional - pipelines, sends them to a robot, and optionally displays the robot's state. The loop runs at a - specified frequency until a set duration is reached or it is manually interrupted. + Continuously reads actions from a teleoperation device, sends them to a robot, + and optionally displays the robot's state. Pipelines are applied internally by + the robot and teleoperator objects. + + The loop runs at the specified frequency until a set duration is reached or it + is manually interrupted. Args: teleop: The teleoperator device instance providing control actions. robot: The robot instance being controlled. fps: The target frequency for the control loop in frames per second. - display_data: If True, fetches robot observations and displays them in the console and Rerun. - display_compressed_images: If True, compresses images before sending them to Rerun for display. - duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely. - teleop_action_processor: An optional pipeline to process raw actions from the teleoperator. - robot_action_processor: An optional pipeline to process actions before they are sent to the robot. - robot_observation_processor: An optional pipeline to process raw observations from the robot. + display_data: If True, fetches robot observations and displays them in the + console and Rerun. + display_compressed_images: If True, compresses images before sending them + to Rerun for display. + duration: The maximum duration of the teleoperation loop in seconds. + If None, the loop runs indefinitely. """ display_len = max(len(key) for key in robot.action_features) @@ -157,40 +152,29 @@ def teleop_loop( while True: loop_start = time.perf_counter() - # Get robot observation - # Not really needed for now other than for visualization - # teleop_action_processor can take None as an observation - # given that it is the identity processor as default - obs = robot.get_observation() + # Get teleop action (output_pipeline applied internally) + action = teleop.get_action() - # Get teleop action - raw_action = teleop.get_action() - - # Process teleop action through pipeline - teleop_action = teleop_action_processor((raw_action, obs)) - - # Process action for robot through pipeline - robot_action_to_send = robot_action_processor((teleop_action, obs)) - - # Send processed action to robot (robot_action_processor.to_output should return RobotAction) - _ = robot.send_action(robot_action_to_send) + # Send action to robot (input_pipeline applied internally) + robot_action_sent = robot.send_action(action) if display_data: - # Process robot observation through pipeline - obs_transition = robot_observation_processor(obs) + # Get robot observation (output_pipeline applied internally) + obs = robot.get_observation() + teleop.send_feedback(obs) log_rerun_data( - observation=obs_transition, - action=teleop_action, + observation=obs, + action=action, compress_images=display_compressed_images, ) print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") - # Display the final robot action that was sent - for motor, value in robot_action_to_send.items(): - print(f"{motor:<{display_len}} | {value:>7.2f}") - move_cursor_up(len(robot_action_to_send) + 3) + for motor, value in robot_action_sent.items(): + if isinstance(value, float | int): + print(f"{motor:<{display_len}} | {value:>7.2f}") + move_cursor_up(len(robot_action_sent) + 3) dt_s = time.perf_counter() - loop_start precise_sleep(max(1 / fps - dt_s, 0.0)) @@ -216,11 +200,13 @@ def teleoperate(cfg: TeleoperateConfig): teleop = make_teleoperator_from_config(cfg.teleop) robot = make_robot_from_config(cfg.robot) - teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors() teleop.connect() robot.connect() + check_action_space_compatibility(teleop, robot) + check_observation_space_compatibility(robot, teleop) + try: teleop_loop( teleop=teleop, @@ -228,9 +214,6 @@ def teleoperate(cfg: TeleoperateConfig): fps=cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, display_compressed_images=display_compressed_images, ) except KeyboardInterrupt: diff --git a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py index 74b0c9b83..539a433e4 100644 --- a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py +++ b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py @@ -72,9 +72,9 @@ class BiOpenArmLeader(Teleoperator): self.right_arm = OpenArmLeader(right_arm_config) @cached_property - def action_features(self) -> dict[str, type]: - left_arm_features = self.left_arm.action_features - right_arm_features = self.right_arm.action_features + def raw_action_features(self) -> dict[str, type]: + left_arm_features = self.left_arm.raw_action_features + right_arm_features = self.right_arm.raw_action_features return { **{f"left_{k}": v for k, v in left_arm_features.items()}, @@ -112,7 +112,7 @@ class BiOpenArmLeader(Teleoperator): ) @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: action_dict = {} # Add "left_" prefix @@ -125,7 +125,7 @@ class BiOpenArmLeader(Teleoperator): return action_dict - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError diff --git a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py index e84ac6f50..9c72e2674 100644 --- a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py +++ b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py @@ -55,9 +55,9 @@ class BiSOLeader(Teleoperator): self.right_arm = SOLeader(right_arm_config) @cached_property - def action_features(self) -> dict[str, type]: - left_arm_features = self.left_arm.action_features - right_arm_features = self.right_arm.action_features + def raw_action_features(self) -> dict[str, type]: + left_arm_features = self.left_arm.raw_action_features + right_arm_features = self.right_arm.raw_action_features return { **{f"left_{k}": v for k, v in left_arm_features.items()}, @@ -94,7 +94,7 @@ class BiSOLeader(Teleoperator): self.right_arm.setup_motors() @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: action_dict = {} # Add "left_" prefix @@ -107,7 +107,7 @@ class BiSOLeader(Teleoperator): return action_dict - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError diff --git a/src/lerobot/teleoperators/gamepad/teleop_gamepad.py b/src/lerobot/teleoperators/gamepad/teleop_gamepad.py index 69cb0f971..3b57ed54b 100644 --- a/src/lerobot/teleoperators/gamepad/teleop_gamepad.py +++ b/src/lerobot/teleoperators/gamepad/teleop_gamepad.py @@ -57,7 +57,7 @@ class GamepadTeleop(Teleoperator): self.gamepad = None @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: if self.config.use_gripper: return { "dtype": "float32", @@ -87,7 +87,7 @@ class GamepadTeleop(Teleoperator): self.gamepad.start() @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: # Update the controller to get fresh inputs self.gamepad.update() @@ -180,7 +180,7 @@ class GamepadTeleop(Teleoperator): # No additional configuration needed pass - def send_feedback(self, feedback: dict) -> None: + def _send_feedback(self, feedback: dict) -> None: """Send feedback to the gamepad.""" # Gamepad doesn't support feedback pass diff --git a/src/lerobot/teleoperators/homunculus/homunculus_arm.py b/src/lerobot/teleoperators/homunculus/homunculus_arm.py index 178eed544..82a9aa6e7 100644 --- a/src/lerobot/teleoperators/homunculus/homunculus_arm.py +++ b/src/lerobot/teleoperators/homunculus/homunculus_arm.py @@ -81,7 +81,7 @@ class HomunculusArm(Teleoperator): self.state_lock = threading.Lock() @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: return {f"{joint}.pos": float for joint in self.joints} @property @@ -298,11 +298,11 @@ class HomunculusArm(Teleoperator): logger.debug(f"Error reading frame in background thread for {self}: {e}") @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: joint_positions = self._read() return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError @check_if_not_connected diff --git a/src/lerobot/teleoperators/homunculus/homunculus_glove.py b/src/lerobot/teleoperators/homunculus/homunculus_glove.py index c4393d660..c42acb3e4 100644 --- a/src/lerobot/teleoperators/homunculus/homunculus_glove.py +++ b/src/lerobot/teleoperators/homunculus/homunculus_glove.py @@ -107,7 +107,7 @@ class HomunculusGlove(Teleoperator): self.state_lock = threading.Lock() @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: return {f"{joint}.pos": float for joint in self.joints} @property @@ -324,13 +324,13 @@ class HomunculusGlove(Teleoperator): logger.debug(f"Error reading frame in background thread for {self}: {e}") @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: joint_positions = self._read() return homunculus_glove_to_hope_jr_hand( {f"{joint}.pos": pos for joint, pos in joint_positions.items()} ) - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError @check_if_not_connected diff --git a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py index 919f463d3..820d94e94 100644 --- a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py +++ b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py @@ -67,7 +67,7 @@ class KeyboardTeleop(Teleoperator): self.logs = {} @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: return { "dtype": "float32", "shape": (len(self.arm),), @@ -122,7 +122,7 @@ class KeyboardTeleop(Teleoperator): pass @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: before_read_t = time.perf_counter() self._drain_pressed_keys() @@ -133,7 +133,7 @@ class KeyboardTeleop(Teleoperator): return dict.fromkeys(action, None) - def send_feedback(self, feedback: dict[str, Any]) -> None: + def _send_feedback(self, feedback: dict[str, Any]) -> None: pass @check_if_not_connected @@ -157,7 +157,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop): self.misc_keys_queue = Queue() @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: if self.config.use_gripper: return { "dtype": "float32", @@ -172,7 +172,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop): } @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: self._drain_pressed_keys() delta_x = 0.0 delta_y = 0.0 @@ -338,7 +338,7 @@ class KeyboardRoverTeleop(KeyboardTeleop): self.current_angular_speed = config.angular_speed @property - def action_features(self) -> dict: + def raw_action_features(self) -> dict: """Return action format for rover (linear and angular velocities).""" return { "linear.vel": float, @@ -361,7 +361,7 @@ class KeyboardRoverTeleop(KeyboardTeleop): self.current_pressed.pop(key_char, None) @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: """ Get the current action based on pressed keys. diff --git a/src/lerobot/teleoperators/koch_leader/koch_leader.py b/src/lerobot/teleoperators/koch_leader/koch_leader.py index 87084b6b9..49351cbd3 100644 --- a/src/lerobot/teleoperators/koch_leader/koch_leader.py +++ b/src/lerobot/teleoperators/koch_leader/koch_leader.py @@ -58,7 +58,7 @@ class KochLeader(Teleoperator): ) @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property @@ -160,7 +160,7 @@ class KochLeader(Teleoperator): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: start = time.perf_counter() action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} @@ -168,7 +168,7 @@ class KochLeader(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # TODO(rcadene, aliberts): Implement force feedback raise NotImplementedError diff --git a/src/lerobot/teleoperators/omx_leader/omx_leader.py b/src/lerobot/teleoperators/omx_leader/omx_leader.py index 4264b0485..a25427597 100644 --- a/src/lerobot/teleoperators/omx_leader/omx_leader.py +++ b/src/lerobot/teleoperators/omx_leader/omx_leader.py @@ -57,7 +57,7 @@ class OmxLeader(Teleoperator): ) @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property @@ -149,7 +149,7 @@ class OmxLeader(Teleoperator): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: start = time.perf_counter() action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} @@ -157,7 +157,7 @@ class OmxLeader(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # TODO(rcadene, aliberts): Implement force feedback raise NotImplementedError diff --git a/src/lerobot/teleoperators/openarm_leader/openarm_leader.py b/src/lerobot/teleoperators/openarm_leader/openarm_leader.py index d9eaabe0f..0e2879a25 100644 --- a/src/lerobot/teleoperators/openarm_leader/openarm_leader.py +++ b/src/lerobot/teleoperators/openarm_leader/openarm_leader.py @@ -65,7 +65,7 @@ class OpenArmLeader(Teleoperator): ) @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: """Features produced by this teleoperator.""" features: dict[str, type] = {} for motor in self.bus.motors: @@ -183,7 +183,7 @@ class OpenArmLeader(Teleoperator): ) @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: """ Get current action from the leader arm. @@ -209,7 +209,7 @@ class OpenArmLeader(Teleoperator): return action_dict - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.") @check_if_not_connected diff --git a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py index 3fbcecf24..28c2cd1fb 100644 --- a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py @@ -31,9 +31,17 @@ from .config_openarm_mini import OpenArmMiniConfig logger = logging.getLogger(__name__) -# Motors whose direction is inverted during readout -RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] -LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] +# Motors whose direction is inverted on the leader side. +LEFT_MOTORS_TO_FLIP = {"joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"} +RIGHT_MOTORS_TO_FLIP = {"joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"} +# Leader(OpenArmMini) -> Follower(OpenArms) joint remap +JOINT_REMAP_TO_OPENARMS = {"joint_6": "joint_7", "joint_7": "joint_6"} +# Follower(OpenArms) -> Leader(OpenArmMini) joint remap +JOINT_REMAP_TO_MINI = {"joint_7": "joint_6", "joint_6": "joint_7"} +OPENARMS_GRIPPER_MIN = -65.0 +OPENARMS_GRIPPER_MAX = 0.0 +MINI_GRIPPER_MIN = 0.0 +MINI_GRIPPER_MAX = 100.0 class OpenArmMini(Teleoperator): @@ -93,8 +101,28 @@ class OpenArmMini(Teleoperator): calibration=cal_left, ) + @staticmethod + def _mini_gripper_to_openarms(value: float) -> float: + """Convert OpenArmMini gripper range [0, 100] to OpenArms gripper range [-65, 0].""" + mapped = OPENARMS_GRIPPER_MAX + ( + (value - MINI_GRIPPER_MIN) + * (OPENARMS_GRIPPER_MIN - OPENARMS_GRIPPER_MAX) + / (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN) + ) + return max(min(mapped, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN) + + @staticmethod + def _openarms_gripper_to_mini(value: float) -> float: + """Convert OpenArms gripper range [-65, 0] to OpenArmMini gripper range [0, 100].""" + clipped = max(min(value, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN) + return MINI_GRIPPER_MIN + ( + (OPENARMS_GRIPPER_MAX - clipped) + * (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN) + / (OPENARMS_GRIPPER_MAX - OPENARMS_GRIPPER_MIN) + ) + @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: features: dict[str, type] = {} for motor in self.bus_right.motors: features[f"right_{motor}.pos"] = float @@ -269,7 +297,7 @@ class OpenArmMini(Teleoperator): print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}") @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: """Get current action from both arms (read positions from all motors).""" start = time.perf_counter() @@ -278,16 +306,64 @@ class OpenArmMini(Teleoperator): action: dict[str, Any] = {} for motor, val in right_positions.items(): - action[f"right_{motor}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val + target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor) + mapped_val = -val if motor in RIGHT_MOTORS_TO_FLIP else val + if target_motor == "gripper": + mapped_val = self._mini_gripper_to_openarms(mapped_val) + action[f"right_{target_motor}.pos"] = mapped_val for motor, val in left_positions.items(): - action[f"left_{motor}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val + target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor) + mapped_val = -val if motor in LEFT_MOTORS_TO_FLIP else val + if target_motor == "gripper": + mapped_val = self._mini_gripper_to_openarms(mapped_val) + action[f"left_{target_motor}.pos"] = mapped_val dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action - def send_feedback(self, feedback: dict[str, float]) -> None: - raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.") + @check_if_not_connected + def enable_torque(self) -> None: + """Enable torque on both arms for active motion commands.""" + self.bus_right.enable_torque() + self.bus_left.enable_torque() + + @check_if_not_connected + def disable_torque(self) -> None: + """Disable torque on both arms for manual teleoperation.""" + self.bus_right.disable_torque() + self.bus_left.disable_torque() + + @check_if_not_connected + def write_goal_positions(self, action: dict[str, float]) -> None: + """Send normalized bilateral goal positions to the underlying Feetech buses.""" + right_goals: dict[str, float] = {} + left_goals: dict[str, float] = {} + + for key, value in action.items(): + if not key.endswith(".pos"): + continue + + if key.startswith("right_"): + openarms_motor = key.removeprefix("right_").removesuffix(".pos") + mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor) + mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value + right_goals[mini_motor] = -mapped_val if mini_motor in RIGHT_MOTORS_TO_FLIP else mapped_val + elif key.startswith("left_"): + openarms_motor = key.removeprefix("left_").removesuffix(".pos") + mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor) + mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value + left_goals[mini_motor] = -mapped_val if mini_motor in LEFT_MOTORS_TO_FLIP else mapped_val + + if right_goals: + self.bus_right.sync_write("Goal_Position", right_goals) + if left_goals: + self.bus_left.sync_write("Goal_Position", left_goals) + + @check_if_not_connected + def _send_feedback(self, feedback: dict[str, float]) -> None: + """Route feedback position commands through the same OpenArms/OpenArmMini mapping.""" + self.write_goal_positions(feedback) @check_if_not_connected def disconnect(self) -> None: diff --git a/src/lerobot/teleoperators/phone/teleop_phone.py b/src/lerobot/teleoperators/phone/teleop_phone.py index 221ee8083..c9cbf2888 100644 --- a/src/lerobot/teleoperators/phone/teleop_phone.py +++ b/src/lerobot/teleoperators/phone/teleop_phone.py @@ -47,7 +47,7 @@ class BasePhone: return (self._calib_pos is not None) and (self._calib_rot_inv is not None) @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return { "phone.pos": np.ndarray, # shape (3,) "phone.rot": Rotation, # scipy.spatial.transform.Rotation @@ -64,7 +64,7 @@ class BasePhone: # No additional configuration required for phone teleop pass - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # We could add haptic feedback (vibrations) here, but it's not implemented yet raise NotImplementedError @@ -163,7 +163,7 @@ class IOSPhone(BasePhone, Teleoperator): return True, pos, rot, pose @check_if_not_connected - def get_action(self) -> dict: + def _get_action(self) -> dict: has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose() if not has_pose or not self.is_calibrated: return {} @@ -314,7 +314,7 @@ class AndroidPhone(BasePhone, Teleoperator): self._latest_message = message @check_if_not_connected - def get_action(self) -> dict: + def _get_action(self) -> dict: ok, raw_pos, raw_rot, pose = self._read_current_pose() if not ok or not self.is_calibrated: return {} @@ -395,8 +395,8 @@ class Phone(Teleoperator): return self._phone_impl.is_calibrated @property - def action_features(self) -> dict[str, type]: - return self._phone_impl.action_features + def raw_action_features(self) -> dict[str, type]: + return self._phone_impl.raw_action_features @property def feedback_features(self) -> dict[str, type]: @@ -405,11 +405,11 @@ class Phone(Teleoperator): def configure(self) -> None: return self._phone_impl.configure() - def get_action(self) -> dict: - return self._phone_impl.get_action() + def _get_action(self) -> dict: + return self._phone_impl._get_action() - def send_feedback(self, feedback: dict[str, float]) -> None: - return self._phone_impl.send_feedback(feedback) + def _send_feedback(self, feedback: dict[str, float]) -> None: + return self._phone_impl._send_feedback(feedback) def disconnect(self) -> None: return self._phone_impl.disconnect() diff --git a/src/lerobot/teleoperators/reachy2_teleoperator/reachy2_teleoperator.py b/src/lerobot/teleoperators/reachy2_teleoperator/reachy2_teleoperator.py index db076b20f..c2a94a216 100644 --- a/src/lerobot/teleoperators/reachy2_teleoperator/reachy2_teleoperator.py +++ b/src/lerobot/teleoperators/reachy2_teleoperator/reachy2_teleoperator.py @@ -104,7 +104,7 @@ class Reachy2Teleoperator(Teleoperator): return joints @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: if self.config.with_mobile_base: return { **dict.fromkeys( @@ -146,7 +146,7 @@ class Reachy2Teleoperator(Teleoperator): pass @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: start = time.perf_counter() joint_action: dict[str, float] = {} @@ -168,7 +168,7 @@ class Reachy2Teleoperator(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return {**joint_action, **vel_action} - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError def disconnect(self) -> None: diff --git a/src/lerobot/teleoperators/so_leader/pipelines/__init__.py b/src/lerobot/teleoperators/so_leader/pipelines/__init__.py new file mode 100644 index 000000000..c291b047f --- /dev/null +++ b/src/lerobot/teleoperators/so_leader/pipelines/__init__.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .ee_space import make_so10x_leader_fk_pipeline + +__all__ = ["make_so10x_leader_fk_pipeline"] diff --git a/src/lerobot/teleoperators/so_leader/pipelines/ee_space.py b/src/lerobot/teleoperators/so_leader/pipelines/ee_space.py new file mode 100644 index 000000000..87f53a184 --- /dev/null +++ b/src/lerobot/teleoperators/so_leader/pipelines/ee_space.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Forward-kinematics pipeline for SO-100/101 leader (teleoperator) arm. + +Converts raw leader joint positions into end-effector pose. Attach this to a leader +via ``set_output_pipeline`` so that ``get_action()`` returns EE coordinates instead of +raw joint angles. + +Example:: + + from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline + + motor_names = list(leader.bus.motors.keys()) + leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, motor_names)) + action = leader.get_action() # now contains ee.x, ee.y, ee.z, ... +""" + +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import RobotAction, RobotProcessorPipeline +from lerobot.processor.converters import ( + robot_action_to_transition, + transition_to_robot_action, +) +from lerobot.robots.so_follower.robot_kinematic_processor import ForwardKinematicsJointsToEE + +_DEFAULT_GRIPPER_FRAME = "gripper_frame_link" + + +def make_so10x_leader_fk_pipeline( + urdf_path: str, + motor_names: list[str], + *, + target_frame_name: str = _DEFAULT_GRIPPER_FRAME, +) -> RobotProcessorPipeline[RobotAction, RobotAction]: + """ + Create a forward-kinematics action pipeline for SO-100/101 leader teleoperators. + + Converts raw leader joint positions (action) into end-effector pose (position + + orientation + gripper). Attach this to a leader via ``set_output_pipeline`` so that + ``get_action()`` returns EE coordinates instead of raw joint angles. + + Args: + urdf_path: Path to the SO-100/101 URDF file used for kinematics. + motor_names: Ordered list of motor names matching the URDF joint names. + target_frame_name: Name of the end-effector frame in the URDF. + + Returns: + A RobotProcessorPipeline that maps joint actions to EE actions. + + Example:: + + motor_names = list(leader.bus.motors.keys()) + leader.set_output_pipeline( + make_so10x_leader_fk_pipeline("./so101.urdf", motor_names) + ) + action = leader.get_action() # returns ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel + """ + kinematics = RobotKinematics( + urdf_path=urdf_path, + target_frame_name=target_frame_name, + joint_names=motor_names, + ) + return RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, + ) diff --git a/src/lerobot/teleoperators/so_leader/so_leader.py b/src/lerobot/teleoperators/so_leader/so_leader.py index a10e3a61f..f5c861878 100644 --- a/src/lerobot/teleoperators/so_leader/so_leader.py +++ b/src/lerobot/teleoperators/so_leader/so_leader.py @@ -55,7 +55,7 @@ class SOLeader(Teleoperator): ) @property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property @@ -138,7 +138,7 @@ class SOLeader(Teleoperator): print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: start = time.perf_counter() action = self.bus.sync_read("Present_Position") action = {f"{motor}.pos": val for motor, val in action.items()} @@ -146,7 +146,7 @@ class SOLeader(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError diff --git a/src/lerobot/teleoperators/teleoperator.py b/src/lerobot/teleoperators/teleoperator.py index 847b88b7f..3569acc2f 100644 --- a/src/lerobot/teleoperators/teleoperator.py +++ b/src/lerobot/teleoperators/teleoperator.py @@ -12,17 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import abc import builtins from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import draccus +from lerobot.configs.types import PipelineFeatureType from lerobot.motors.motors_bus import MotorCalibration -from lerobot.processor import RobotAction from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS +if TYPE_CHECKING: + from lerobot.processor.core import RobotAction + from lerobot.processor.pipeline import RobotProcessorPipeline + from .config import TeleoperatorConfig @@ -33,6 +39,10 @@ class Teleoperator(abc.ABC): This class provides a standardized interface for interacting with physical teleoperators. Subclasses must implement all abstract methods and properties to be usable. + Pipelines are first-class citizens: every teleoperator carries an optional output pipeline + (applied in get_action()) and an optional input pipeline (applied in send_feedback()). + Both default to identity (no-op), so existing teleoperators work without any changes. + Attributes: config_class (RobotConfig): The expected configuration class for this teleoperator. name (str): The unique name used to identify this teleoperator type. @@ -55,6 +65,14 @@ class Teleoperator(abc.ABC): if self.calibration_fpath.is_file(): self._load_calibration() + # Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline() + # Lazy import: factory is in lerobot.processor which loads after teleoperators at module init time, + # but __init__ runs at instance-creation time when lerobot.processor is fully loaded. + from lerobot.processor.factory import _make_identity_feedback_pipeline, _make_identity_teleop_action_pipeline + + self._output_pipeline: RobotProcessorPipeline = _make_identity_teleop_action_pipeline() + self._input_pipeline: RobotProcessorPipeline = _make_identity_feedback_pipeline() + def __str__(self) -> str: return f"{self.id} {self.__class__.__name__}" @@ -84,16 +102,67 @@ class Teleoperator(abc.ABC): except Exception: # nosec B110 pass + # ── Pipeline interface ──────────────────────────────────────────────────── + + def output_pipeline(self) -> RobotProcessorPipeline: + """ + Pipeline applied inside get_action() to transform raw hardware actions. + Default: identity (no-op). Override via set_output_pipeline() or subclassing. + + Example: set a forward-kinematics pipeline to convert leader joint positions to EE pose. + """ + return self._output_pipeline + + def input_pipeline(self) -> RobotProcessorPipeline: + """ + Pipeline applied inside send_feedback() to transform incoming feedback. + Default: identity (no-op). Override via set_input_pipeline() or subclassing. + """ + return self._input_pipeline + + def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None: + """Set the action output pipeline (applied in get_action()).""" + self._output_pipeline = pipeline + + def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None: + """Set the feedback input pipeline (applied in send_feedback()).""" + self._input_pipeline = pipeline + + # ── Feature properties ──────────────────────────────────────────────────── + @property - @abc.abstractmethod def action_features(self) -> dict: """ - A dictionary describing the structure and types of the actions produced by the teleoperator. Its - structure (keys) should match the structure of what is returned by :pymeth:`get_action`. Values for - the dict should be the type of the value if it's a simple value, e.g. `float` for single - proprioceptive value (a joint's goal position/velocity) + Pipeline-transformed action features. - Note: this property should be able to be called regardless of whether the robot is connected or not. + Applies output_pipeline().transform_features() to raw_action_features so the + returned dict matches what get_action() actually produces for callers. + + Use raw_action_features to inspect hardware-level feature shapes. + + Note: this property should be able to be called regardless of whether the + teleoperator is connected or not. + """ + from lerobot.datasets.pipeline_features import create_initial_features # lazy import + + initial = create_initial_features(action=self.raw_action_features) + transformed = self.output_pipeline().transform_features(initial) + return transformed.get(PipelineFeatureType.ACTION, {}) + + @property + @abc.abstractmethod + def raw_action_features(self) -> dict: + """ + Hardware-level action features (before any pipeline transformation). + + A dictionary describing the structure and types of the actions produced + directly by the teleoperator hardware. Its structure (keys) should match + the structure of what is returned by :pymeth:`_get_action`. Values should be + the type of the value if it's a simple value, e.g. ``float`` for single + proprioceptive value (a joint's goal position/velocity). + + Note: this property should be able to be called regardless of whether the + teleoperator is connected or not. """ pass @@ -101,12 +170,13 @@ class Teleoperator(abc.ABC): @abc.abstractmethod def feedback_features(self) -> dict: """ - A dictionary describing the structure and types of the feedback actions expected by the robot. Its - structure (keys) should match the structure of what is passed to :pymeth:`send_feedback`. Values for - the dict should be the type of the value if it's a simple value, e.g. `float` for single - proprioceptive value (a joint's goal position/velocity) + A dictionary describing the structure and types of the feedback actions expected + by the teleoperator. Its structure (keys) should match the structure of what is + passed to :pymeth:`send_feedback`. Values should be the type of the value if it's + a simple value, e.g. ``float`` for single proprioceptive value. - Note: this property should be able to be called regardless of whether the robot is connected or not. + Note: this property should be able to be called regardless of whether the + teleoperator is connected or not. """ pass @@ -114,8 +184,8 @@ class Teleoperator(abc.ABC): @abc.abstractmethod def is_connected(self) -> bool: """ - Whether the teleoperator is currently connected or not. If `False`, calling :pymeth:`get_action` - or :pymeth:`send_feedback` should raise an error. + Whether the teleoperator is currently connected or not. If ``False``, calling + :pymeth:`get_action` or :pymeth:`send_feedback` should raise an error. """ pass @@ -133,7 +203,7 @@ class Teleoperator(abc.ABC): @property @abc.abstractmethod def is_calibrated(self) -> bool: - """Whether the teleoperator is currently calibrated or not. Should be always `True` if not applicable""" + """Whether the teleoperator is currently calibrated or not. Should be always ``True`` if not applicable""" pass @abc.abstractmethod @@ -151,7 +221,7 @@ class Teleoperator(abc.ABC): Helper to load calibration data from the specified file. Args: - fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`. + fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``. """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath) as f, draccus.config_type("json"): @@ -162,7 +232,7 @@ class Teleoperator(abc.ABC): Helper to save calibration data to the specified file. Args: - fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`. + fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``. """ fpath = self.calibration_fpath if fpath is None else fpath with open(fpath, "w") as f, draccus.config_type("json"): @@ -176,29 +246,51 @@ class Teleoperator(abc.ABC): """ pass - @abc.abstractmethod + # ── Template methods (concrete, call pipeline internally) ───────────────── + def get_action(self) -> RobotAction: """ - Retrieve the current action from the teleoperator. + Retrieve the current action from the teleoperator and apply the output pipeline. + + Calls :pymeth:`_get_action` to get raw hardware data, then applies + :pymeth:`output_pipeline`. Returns: - RobotAction: A flat dictionary representing the teleoperator's current actions. Its - structure should match :pymeth:`observation_features`. + RobotAction: Pipeline-transformed action. With the default identity pipeline + this equals the raw action from :pymeth:`_get_action`. + """ + raw = self._get_action() + return self.output_pipeline()(raw) + + @abc.abstractmethod + def _get_action(self) -> RobotAction: + """ + Retrieve the raw action directly from teleoperator hardware. + + Returns: + RobotAction: A flat dictionary representing the teleoperator's current actions. + Its structure should match :pymeth:`raw_action_features`. """ pass - @abc.abstractmethod def send_feedback(self, feedback: dict[str, Any]) -> None: """ - Send a feedback action command to the teleoperator. + Apply the input pipeline and send the resulting feedback to teleoperator hardware. Args: - feedback (dict[str, Any]): Dictionary representing the desired feedback. Its structure should match - :pymeth:`feedback_features`. + feedback (dict[str, Any]): Dictionary representing the desired feedback. + Its structure should match :pymeth:`feedback_features`. + """ + transformed = self.input_pipeline()(feedback) + self._send_feedback(transformed) - Returns: - dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by - safety limits on velocity. + @abc.abstractmethod + def _send_feedback(self, feedback: dict[str, Any]) -> None: + """ + Send feedback directly to teleoperator hardware. + + Args: + feedback (dict[str, Any]): Dictionary of hardware-level feedback commands. """ pass diff --git a/src/lerobot/teleoperators/unitree_g1/unitree_g1.py b/src/lerobot/teleoperators/unitree_g1/unitree_g1.py index 3779d83ec..bf2941b6f 100644 --- a/src/lerobot/teleoperators/unitree_g1/unitree_g1.py +++ b/src/lerobot/teleoperators/unitree_g1/unitree_g1.py @@ -72,7 +72,7 @@ class UnitreeG1Teleoperator(Teleoperator): self.ik_helper: ExoskeletonIKHelper | None = None @cached_property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return {f"{name}.q": float for name in self._g1_joint_names} @cached_property @@ -114,12 +114,12 @@ class UnitreeG1Teleoperator(Teleoperator): def configure(self) -> None: pass - def get_action(self) -> dict[str, float]: + def _get_action(self) -> dict[str, float]: left_angles = self.left_arm.get_angles() right_angles = self.right_arm.get_angles() return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles) - def send_feedback(self, feedback: dict[str, float]) -> None: + def _send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError("Exoskeleton arms do not support feedback") def disconnect(self) -> None: diff --git a/src/lerobot/utils/pipeline_utils.py b/src/lerobot/utils/pipeline_utils.py new file mode 100644 index 000000000..4ed728e86 --- /dev/null +++ b/src/lerobot/utils/pipeline_utils.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utilities for building dataset features from robot/teleoperator pipelines and for +checking action/observation space compatibility between teleops and robots. +""" + +import logging + +from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features +from lerobot.datasets.utils import combine_feature_dicts + + +def build_dataset_features( + robot, + teleop, + *, + use_videos: bool = True, +) -> dict: + """ + Derive dataset feature specifications from robot and teleoperator pipelines. + + Uses the robot's ``output_pipeline`` and ``raw_observation_features`` to determine + what the dataset will store as observations, and the teleoperator's ``output_pipeline`` + and ``raw_action_features`` to determine what will be stored as actions. + + This replaces the old pattern of manually calling ``aggregate_pipeline_dataset_features`` + with explicit processor objects. + + Args: + robot: The robot instance (must have ``output_pipeline()`` and ``raw_observation_features``). + teleop: The teleoperator instance (must have ``output_pipeline()`` and ``raw_action_features``). + use_videos: If True, image observations are included as video features. + + Returns: + A combined feature dict suitable for passing to ``LeRobotDataset.create(..., features=...)``. + + Example:: + + dataset = LeRobotDataset.create( + repo_id="...", + fps=30, + features=build_dataset_features(follower, leader, use_videos=True), + ) + """ + obs_features = aggregate_pipeline_dataset_features( + pipeline=robot.output_pipeline(), + initial_features=create_initial_features(observation=robot.raw_observation_features), + use_videos=use_videos, + ) + action_features = aggregate_pipeline_dataset_features( + pipeline=teleop.output_pipeline(), + initial_features=create_initial_features(action=teleop.raw_action_features), + use_videos=False, + ) + return combine_feature_dicts(action_features, obs_features) + + +def check_action_space_compatibility(teleop, robot) -> None: + """ + Warn if the teleoperator's pipeline-transformed action features don't match the robot's + declared ``action_features``. + + This is a soft check — a mismatch produces a warning but does not raise. It is intended + to catch obvious misconfigurations (e.g., sending EE actions to a robot expecting joints) + before the control loop starts. + + Args: + teleop: The teleoperator whose ``action_features`` describe what it sends. + robot: The robot whose ``action_features`` describe what it expects. + """ + teleop_out = set(teleop.action_features.keys()) + robot_in = set(robot.action_features.keys()) + if teleop_out != robot_in: + import warnings + + warnings.warn( + f"Action space mismatch between teleop and robot.\n" + f" Teleop sends: {sorted(teleop_out)}\n" + f" Robot expects: {sorted(robot_in)}\n" + "Ensure pipelines map between these spaces correctly.", + UserWarning, + stacklevel=2, + ) + else: + logging.debug("Action space compatibility check passed.") + + +def check_observation_space_compatibility(robot, teleop) -> None: + """ + Warn if the robot's observation features don't cover what the teleoperator's + ``feedback_features`` expects. + + A non-empty ``feedback_features`` that is not a subset of the robot's observation keys + will produce a warning. + + Args: + robot: The robot whose ``observation_features`` describe what it produces. + teleop: The teleoperator whose ``feedback_features`` describe what it expects as feedback. + """ + robot_obs = set(robot.observation_features.keys()) + teleop_feedback = set(teleop.feedback_features.keys()) + if teleop_feedback and not teleop_feedback.issubset(robot_obs): + import warnings + + warnings.warn( + f"Observation/feedback space mismatch.\n" + f" Robot obs: {sorted(robot_obs)}\n" + f" Teleop feedback expects: {sorted(teleop_feedback)}\n" + "Ensure the robot observation pipeline covers all feedback keys.", + UserWarning, + stacklevel=2, + ) + else: + logging.debug("Observation/feedback space compatibility check passed.") diff --git a/tests/mocks/mock_robot.py b/tests/mocks/mock_robot.py index f69a2c02a..1472d38b5 100644 --- a/tests/mocks/mock_robot.py +++ b/tests/mocks/mock_robot.py @@ -87,7 +87,7 @@ class MockRobot(Robot): } @cached_property - def observation_features(self) -> dict[str, type | tuple]: + def raw_observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property @@ -116,7 +116,7 @@ class MockRobot(Robot): pass @check_if_not_connected - def get_observation(self) -> RobotObservation: + def _get_observation(self) -> RobotObservation: if self.config.random_values: return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors} else: @@ -125,7 +125,7 @@ class MockRobot(Robot): } @check_if_not_connected - def send_action(self, action: RobotAction) -> RobotAction: + def _send_action(self, action: RobotAction) -> RobotAction: return action @check_if_not_connected diff --git a/tests/mocks/mock_teleop.py b/tests/mocks/mock_teleop.py index 89174dadf..edbc7d96c 100644 --- a/tests/mocks/mock_teleop.py +++ b/tests/mocks/mock_teleop.py @@ -57,7 +57,7 @@ class MockTeleop(Teleoperator): self.motors = [f"motor_{i + 1}" for i in range(config.n_motors)] @cached_property - def action_features(self) -> dict[str, type]: + def raw_action_features(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.motors} @cached_property @@ -86,7 +86,7 @@ class MockTeleop(Teleoperator): pass @check_if_not_connected - def get_action(self) -> RobotAction: + def _get_action(self) -> RobotAction: if self.config.random_values: return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors} else: @@ -95,7 +95,7 @@ class MockTeleop(Teleoperator): } @check_if_not_connected - def send_feedback(self, feedback: dict[str, Any]) -> None: ... + def _send_feedback(self, feedback: dict[str, Any]) -> None: ... @check_if_not_connected def disconnect(self) -> None: diff --git a/tests/test_pipeline_hub.py b/tests/test_pipeline_hub.py new file mode 100644 index 000000000..4a1ec6f50 --- /dev/null +++ b/tests/test_pipeline_hub.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Integration tests for loading robot/teleop pipelines from the Hugging Face Hub. + +These tests require network access and are marked with ``@pytest.mark.integration``. +Run with:: + + pytest tests/test_pipeline_hub.py -m integration -v + +The tests verify the full end-to-end flow of: +1. Loading a pipeline from the Hub via ``RobotProcessorPipeline.from_pretrained(...)`` +2. Attaching it to a robot or teleoperator via ``set_output_pipeline`` / ``set_input_pipeline`` +3. Verifying that ``observation_features`` / ``action_features`` differ from the raw versions + +Note: The Hub repos referenced below are placeholders. Update them once actual pipelines +are published to the Hub. +""" + +import pytest + + +# ─── Shared mock infrastructure (mirrors test_robot_pipeline.py) ────────────── + +try: + from tests.test_robot_pipeline import MockRobot, MockTeleop # type: ignore[import] +except ImportError: + # Fallback if tests are run from a different working directory + import sys + from pathlib import Path + + sys.path.insert(0, str(Path(__file__).parent)) + from test_robot_pipeline import MockRobot, MockTeleop + + +# ─── Integration tests ──────────────────────────────────────────────────────── + + +@pytest.mark.integration +def test_load_robot_pipeline_from_hub(tmp_path): + """ + Full end-to-end: load a FK observation pipeline for SO-101 from the Hub, + attach it to a robot, and verify that observation_features are transformed. + + Prerequisites: + - A pipeline must be published at ``lerobot/so101-fk-observation-pipeline`` on the Hub. + - A URDF file must be available locally (update ``local_urdf_path`` to point to it). + """ + pytest.importorskip("huggingface_hub") + from lerobot.processor.pipeline import RobotProcessorPipeline + + local_urdf_path = tmp_path / "so101.urdf" + # NOTE: In a real test environment, provide an actual URDF or mock the kinematics. + # For now, this test validates the Hub loading mechanism only if a URDF is provided. + if not local_urdf_path.exists(): + pytest.skip("URDF not available; skipping Hub loading test") + + pipeline = RobotProcessorPipeline.from_pretrained( + "lerobot/so101-fk-observation-pipeline", + overrides={"step_0": {"urdf_path": str(local_urdf_path)}}, + ) + robot = MockRobot() + robot.set_output_pipeline(pipeline) + + # Pipeline-transformed features should differ from raw features (EE vs joints) + assert robot.observation_features != robot.raw_observation_features + + +@pytest.mark.integration +def test_load_teleop_pipeline_from_hub(tmp_path): + """ + Full end-to-end: load a FK action pipeline for SO-101 leader from the Hub, + attach it to a teleoperator, and verify that action_features are transformed. + + Prerequisites: + - A pipeline must be published at ``lerobot/so101-leader-fk-action-pipeline`` on the Hub. + - A URDF file must be available locally (update ``local_urdf_path`` to point to it). + """ + pytest.importorskip("huggingface_hub") + from lerobot.processor.pipeline import RobotProcessorPipeline + + local_urdf_path = tmp_path / "so101.urdf" + if not local_urdf_path.exists(): + pytest.skip("URDF not available; skipping Hub loading test") + + pipeline = RobotProcessorPipeline.from_pretrained( + "lerobot/so101-leader-fk-action-pipeline", + overrides={"step_0": {"urdf_path": str(local_urdf_path)}}, + ) + teleop = MockTeleop() + teleop.set_output_pipeline(pipeline) + + # Pipeline-transformed features should differ from raw features (EE vs joints) + assert teleop.action_features != teleop.raw_action_features diff --git a/tests/test_robot_pipeline.py b/tests/test_robot_pipeline.py new file mode 100644 index 000000000..2f50b937f --- /dev/null +++ b/tests/test_robot_pipeline.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for the robot/teleoperator pipeline interface. + +Tests cover: +- Default (identity) pipeline behaviour +- Custom pipeline attachment via set_output_pipeline / set_input_pipeline +- Auto-derived observation_features / action_features via pipelines +- Compatibility checks +- build_dataset_features utility +""" + +import warnings +from dataclasses import dataclass +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from lerobot.configs.types import PipelineFeatureType +from lerobot.processor import RobotAction, RobotObservation +from lerobot.processor.converters import ( + observation_to_transition, + robot_action_observation_to_transition, + robot_action_to_transition, + transition_to_observation, + transition_to_robot_action, +) +from lerobot.processor.factory import ( + _make_identity_feedback_pipeline, + _make_identity_observation_pipeline, + _make_identity_robot_action_pipeline, + _make_identity_teleop_action_pipeline, +) +from lerobot.processor.pipeline import ( + IdentityProcessorStep, + ObservationProcessorStep, + RobotActionProcessorStep, + RobotProcessorPipeline, +) +from lerobot.robots.robot import Robot +from lerobot.teleoperators.teleoperator import Teleoperator +from lerobot.utils.pipeline_utils import ( + build_dataset_features, + check_action_space_compatibility, + check_observation_space_compatibility, +) + + +# ─── Mock hardware classes ──────────────────────────────────────────────────── + + +@dataclass +class MockRobotConfig: + id: str = "mock_robot" + calibration_dir: Path | None = None + + +@dataclass +class MockTeleopConfig: + id: str = "mock_teleop" + calibration_dir: Path | None = None + + +_JOINT_NAMES = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"] +_JOINT_FEATURES = {f"{j}.pos": float for j in _JOINT_NAMES} +_EE_FEATURES = {"ee.x": float, "ee.y": float, "ee.z": float, "ee.wx": float, "ee.wy": float, "ee.wz": float, "ee.gripper_vel": float} + + +class MockRobot(Robot): + """Minimal Robot that stores last action for assertion.""" + + config_class = MockRobotConfig + name = "mock_robot" + + def __init__(self): + # bypass filesystem calibration setup; initialize with identity pipelines directly + self._output_pipeline = _make_identity_observation_pipeline() + self._input_pipeline = _make_identity_robot_action_pipeline() + self._last_raw_obs: RobotObservation = {} + self._last_sent: RobotAction = {} + + @property + def raw_observation_features(self) -> dict: + return {**_JOINT_FEATURES, "camera": (480, 640, 3)} + + @property + def action_features(self) -> dict: + return _JOINT_FEATURES + + @property + def is_connected(self) -> bool: + return True + + def connect(self, calibrate=True): + pass + + @property + def is_calibrated(self) -> bool: + return True + + def calibrate(self): + pass + + def configure(self): + pass + + def _get_observation(self) -> RobotObservation: + return {f"{j}.pos": float(i) for i, j in enumerate(_JOINT_NAMES)} | {"camera": None} + + def _send_action(self, action: RobotAction) -> RobotAction: + self._last_sent = action + return action + + def disconnect(self): + pass + + +class MockTeleop(Teleoperator): + """Minimal Teleoperator.""" + + config_class = MockTeleopConfig + name = "mock_teleop" + + def __init__(self): + # bypass filesystem calibration setup; initialize with identity pipelines directly + self._output_pipeline = _make_identity_teleop_action_pipeline() + self._input_pipeline = _make_identity_feedback_pipeline() + + @property + def raw_action_features(self) -> dict: + return _JOINT_FEATURES + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + return True + + def connect(self, calibrate=True): + pass + + @property + def is_calibrated(self) -> bool: + return True + + def calibrate(self): + pass + + def configure(self): + pass + + def _get_action(self) -> RobotAction: + return {f"{j}.pos": float(i) for i, j in enumerate(_JOINT_NAMES)} + + def _send_feedback(self, feedback): + pass + + def disconnect(self): + pass + + +# ─── Simple transform step (doubles all float values) ──────────────────────── + + +class DoubleActionStep(RobotActionProcessorStep): + """Doubles all float action values.""" + + def action(self, action: RobotAction) -> RobotAction: + return {k: v * 2 for k, v in action.items()} + + def transform_features(self, features): + return features + + +class RenameToEEObsStep(ObservationProcessorStep): + """Renames joint obs keys to EE-like keys for testing transform_features.""" + + def observation(self, obs: RobotObservation) -> RobotObservation: + return {f"ee.{i}": v for i, v in enumerate(obs.values()) if isinstance(v, float)} + + def transform_features(self, features): + obs = features.get(PipelineFeatureType.OBSERVATION, {}) + new_obs = {f"ee.{i}": float for i in range(len([v for v in obs.values() if v == float]))} + return {**features, PipelineFeatureType.OBSERVATION: new_obs} + + +# ─── Tests: Robot pipeline interface ───────────────────────────────────────── + + +def test_robot_default_pipeline_is_identity(): + """With no custom pipeline, get_observation returns the same as _get_observation.""" + robot = MockRobot() + raw = robot._get_observation() + obs = robot.get_observation() + assert obs == raw + + +def test_robot_observation_caches_last_raw(): + """get_observation caches raw result for IK use in send_action.""" + robot = MockRobot() + robot.get_observation() + assert robot._last_raw_obs is not None + assert "shoulder_pan.pos" in robot._last_raw_obs + + +def test_robot_default_send_action_is_identity(): + """With no custom pipeline, send_action passes action unchanged to _send_action.""" + robot = MockRobot() + robot.get_observation() # populate _last_raw_obs + action = {f"{j}.pos": 1.0 for j in _JOINT_NAMES} + sent = robot.send_action(action) + assert sent == action + assert robot._last_sent == action + + +def test_robot_custom_output_pipeline_applied(): + """A custom action pipeline is applied to the action before _send_action.""" + robot = MockRobot() + double_pipeline = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( + steps=[DoubleActionStep()], + to_transition=robot_action_observation_to_transition, + to_output=transition_to_robot_action, + ) + robot.set_input_pipeline(double_pipeline) + robot.get_observation() # populate _last_raw_obs + action = {f"{j}.pos": 1.0 for j in _JOINT_NAMES} + robot.send_action(action) + assert all(v == 2.0 for v in robot._last_sent.values()) + + +def test_robot_observation_features_identity_matches_raw(): + """observation_features equals raw_observation_features with identity pipeline.""" + robot = MockRobot() + assert robot.observation_features == robot.raw_observation_features + + +def test_robot_raw_observation_features_unchanged_after_pipeline(): + """raw_observation_features is unaffected by the output pipeline.""" + robot = MockRobot() + # Even with an FK-like renaming pipeline, raw_observation_features stays the same + transform_pipeline = RobotProcessorPipeline[RobotObservation, RobotObservation]( + steps=[RenameToEEObsStep()], + to_transition=observation_to_transition, + to_output=transition_to_observation, + ) + robot.set_output_pipeline(transform_pipeline) + # raw should still be joints + camera + raw = robot.raw_observation_features + assert "shoulder_pan.pos" in raw + assert "camera" in raw + + +def test_robot_set_output_pipeline_replaces_identity(): + """set_output_pipeline replaces the default identity.""" + robot = MockRobot() + p = _make_identity_observation_pipeline() + robot.set_output_pipeline(p) + assert robot._output_pipeline is p + + +def test_robot_set_input_pipeline_replaces_identity(): + robot = MockRobot() + p = _make_identity_robot_action_pipeline() + robot.set_input_pipeline(p) + assert robot._input_pipeline is p + + +# ─── Tests: Teleoperator pipeline interface ─────────────────────────────────── + + +def test_teleop_default_get_action_is_identity(): + """With no custom pipeline, get_action returns the same as _get_action.""" + teleop = MockTeleop() + raw = teleop._get_action() + action = teleop.get_action() + assert action == raw + + +def test_teleop_action_features_identity_matches_raw(): + """action_features equals raw_action_features with identity pipeline.""" + teleop = MockTeleop() + assert teleop.action_features == teleop.raw_action_features + + +def test_teleop_set_output_pipeline(): + teleop = MockTeleop() + p = _make_identity_teleop_action_pipeline() + teleop.set_output_pipeline(p) + assert teleop._output_pipeline is p + + +def test_teleop_send_feedback_calls_send_feedback_impl(): + """send_feedback applies identity pipeline and delegates to _send_feedback.""" + teleop = MockTeleop() + received = {} + + def capture(fb): + received.update(fb) + + teleop._send_feedback = capture + teleop.send_feedback({"key": 1.0}) + assert received == {"key": 1.0} + + +# ─── Tests: Compatibility checks ───────────────────────────────────────────── + + +def test_check_action_space_compatibility_matching(): + """No warning when teleop output and robot action features match.""" + teleop = MockTeleop() + robot = MockRobot() + with warnings.catch_warnings(): + warnings.simplefilter("error") + check_action_space_compatibility(teleop, robot) # should not warn + + +def test_check_action_space_compatibility_mismatch_warns(): + """Warning issued when teleop and robot action features differ.""" + + class EETeleop(MockTeleop): + @property + def raw_action_features(self): + return _EE_FEATURES + + teleop = EETeleop() + robot = MockRobot() # still returns joint features + with pytest.warns(UserWarning, match="Action space mismatch"): + check_action_space_compatibility(teleop, robot) + + +def test_check_observation_space_compatibility_no_feedback(): + """No warning when teleop has empty feedback_features.""" + robot = MockRobot() + teleop = MockTeleop() + with warnings.catch_warnings(): + warnings.simplefilter("error") + check_observation_space_compatibility(robot, teleop) # empty feedback → no warning + + +# ─── Tests: build_dataset_features ─────────────────────────────────────────── + + +def test_build_dataset_features_identity(): + """With identity pipelines, dataset features contain joint keys.""" + robot = MockRobot() + teleop = MockTeleop() + features = build_dataset_features(robot, teleop, use_videos=False) + # Should contain action features (joint names) + action_keys = {k for k in features if "action" in k or any(j in k for j in _JOINT_NAMES)} + assert len(action_keys) > 0 + + +def test_build_dataset_features_includes_images_when_use_videos_true(): + """Image features are included when use_videos=True.""" + robot = MockRobot() + teleop = MockTeleop() + feats_with = build_dataset_features(robot, teleop, use_videos=True) + feats_without = build_dataset_features(robot, teleop, use_videos=False) + # With videos should have more features (camera) + assert len(feats_with) >= len(feats_without) + + +# ─── Tests: Factory identity pipeline helpers ───────────────────────────────── + + +def test_make_identity_observation_pipeline_is_noop(): + pipeline = _make_identity_observation_pipeline() + obs = {"shoulder_pan.pos": 1.0, "camera": None} + result = pipeline(obs) + assert result == obs + + +def test_make_identity_robot_action_pipeline_is_noop(): + pipeline = _make_identity_robot_action_pipeline() + action = {"shoulder_pan.pos": 1.0} + obs = {"shoulder_pan.pos": 0.0} + result = pipeline((action, obs)) + assert result == action + + +def test_make_identity_teleop_action_pipeline_is_noop(): + pipeline = _make_identity_teleop_action_pipeline() + action = {"shoulder_pan.pos": 1.0} + result = pipeline(action) + assert result == action