Compare commits

...

4 Commits

55 changed files with 1976 additions and 1047 deletions
-10
View File
@@ -18,7 +18,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.constants import ACTION, OBS_STR
@@ -71,9 +70,6 @@ def main():
# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Initialize the keyboard listener and rerun visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="lekiwi_evaluate")
@@ -99,9 +95,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -116,9 +109,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:
-10
View File
@@ -16,7 +16,6 @@
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.scripts.lerobot_record import record_loop
@@ -46,9 +45,6 @@ def main():
leader_arm = SO100Leader(leader_arm_config)
keyboard = KeyboardTeleop(keyboard_config)
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, ACTION)
obs_features = hw_to_dataset_features(robot.observation_features, OBS_STR)
@@ -93,9 +89,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -111,9 +104,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:
+25 -78
View File
@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:
+50 -73
View File
@@ -16,21 +16,17 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.pipelines import make_so10x_fk_observation_pipeline
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
EEReferenceAndDelta,
ForwardKinematicsJointsToEE,
GripperVelocityToJoint,
InverseKinematicsEEToJoints,
)
@@ -39,6 +35,7 @@ from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
from lerobot.teleoperators.phone.phone_processor import MapPhoneActionToRobotAction
from lerobot.teleoperators.phone.teleop_phone import Phone
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -49,6 +46,10 @@ RESET_TIME_SEC = 30
TASK_DESCRIPTION = "My task description"
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot and teleoperator configurations
@@ -65,77 +66,59 @@ def main():
robot = SO100Follower(robot_config)
phone = Phone(teleop_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
motor_names = list(robot.bus.motors.keys())
from lerobot.model.kinematics import RobotKinematics
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
urdf_path=URDF_PATH,
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
joint_names=motor_names,
)
# Build pipeline to convert phone action to EE action
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
](
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=list(robot.bus.motors.keys()),
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Phone output pipeline: map raw phone gesture to EE delta (no robot obs needed)
phone.set_output_pipeline(
RobotProcessorPipeline[RobotAction, RobotAction](
steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os)],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Robot FK observation pipeline: joints → EE pose
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
# Robot input pipeline: EE delta + current robot obs → joint commands
robot.set_input_pipeline(
RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=motor_names,
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=motor_names,
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert joint observation to EE observation
robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Dataset features auto-derived from robot's FK obs pipeline and phone's mapped action pipeline
dataset = LeRobotDataset.create(
repo_id=HF_REPO_ID,
fps=FPS,
features=combine_feature_dicts(
# Run the feature contract of the pipelines
# This tells you how the features would look like after the pipeline steps
aggregate_pipeline_dataset_features(
pipeline=phone_to_robot_ee_pose_processor,
initial_features=create_initial_features(action=phone.action_features),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
),
features=build_dataset_features(robot, phone, use_videos=True),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
@@ -158,7 +141,7 @@ def main():
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot and phone
record_loop(
robot=robot,
events=events,
@@ -168,9 +151,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
# Reset the environment if not stopping or re-recording
@@ -186,9 +166,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
if events["rerecord_episode"]:
+2 -2
View File
@@ -87,8 +87,8 @@ from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.rtc.latency_tracker import LatencyTracker
from lerobot.processor.factory import (
make_default_robot_action_processor,
make_default_robot_observation_processor,
_make_identity_observation_pipeline as make_default_robot_observation_processor,
_make_identity_robot_action_pipeline as make_default_robot_action_processor,
)
from lerobot.rl.process import ProcessSignalHandler
from lerobot.robots import ( # noqa: F401
+26 -79
View File
@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -135,7 +88,7 @@ def main():
preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
)
# Connect the robot and teleoperator
# Connect the robot
robot.connect()
# Initialize the keyboard listener and rerun visualization
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:
+27 -89
View File
@@ -17,25 +17,20 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import (
build_dataset_features,
check_action_space_compatibility,
check_observation_space_compatibility,
)
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -46,6 +41,10 @@ RESET_TIME_SEC = 30
TASK_DESCRIPTION = "My task description"
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: Use the URDF from the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot and teleoperator configurations
@@ -62,77 +61,17 @@ def main():
follower = SO100Follower(follower_config)
leader = SO100Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# Attach EE-space pipelines to the objects
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
# Build pipeline to convert follower joints to EE observation
follower_joints_to_ee = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=follower_kinematics_solver, motor_names=list(follower.bus.motors.keys())
),
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Build pipeline to convert leader joints to EE action
leader_joints_to_ee = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
ForwardKinematicsJointsToEE(
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert EE action to follower joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.10,
),
InverseKinematicsEEToJoints(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Create the dataset
# Dataset features are derived automatically from robot/teleop pipelines
dataset = LeRobotDataset.create(
repo_id=HF_REPO_ID,
fps=FPS,
features=combine_feature_dicts(
# Run the feature contract of the pipelines
# This tells you how the features would look like after the pipeline steps
aggregate_pipeline_dataset_features(
pipeline=leader_joints_to_ee,
initial_features=create_initial_features(action=leader.action_features),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=follower_joints_to_ee,
initial_features=create_initial_features(observation=follower.observation_features),
use_videos=True,
),
),
features=build_dataset_features(follower, leader, use_videos=True),
robot_type=follower.name,
use_videos=True,
image_writer_threads=4,
@@ -142,9 +81,13 @@ def main():
leader.connect()
follower.connect()
# Verify action/observation space alignment (warns on mismatch)
check_action_space_compatibility(leader, follower)
check_observation_space_compatibility(follower, leader)
# Initialize the keyboard listener and rerun visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="recording_phone")
init_rerun(session_name="recording_ee")
try:
if not leader.is_connected or not follower.is_connected:
@@ -155,7 +98,8 @@ def main():
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Pipelines applied automatically inside robot.get_observation(),
# teleop.get_action(), and robot.send_action()
record_loop(
robot=follower,
events=events,
@@ -165,9 +109,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=leader_joints_to_ee,
robot_action_processor=ee_to_follower_joints,
robot_observation_processor=follower_joints_to_ee,
)
# Reset the environment if not stopping or re-recording
@@ -183,9 +124,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=leader_joints_to_ee,
robot_action_processor=ee_to_follower_joints,
robot_observation_processor=follower_joints_to_ee,
)
if events["rerecord_episode"]:
+24 -77
View File
@@ -14,27 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
robot_action_observation_to_transition,
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_teleoperate import teleop_loop
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
from lerobot.utils.pipeline_utils import check_action_space_compatibility
from lerobot.utils.visualization_utils import init_rerun
FPS = 30
# NOTE: Use the URDF from the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Initialize the robot and teleoperator config
@@ -47,47 +43,14 @@ def main():
follower = SO100Follower(follower_config)
leader = SO100Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# Attach EE-space pipelines to the objects
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
# Build pipeline to convert teleop joints to EE action
leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
ForwardKinematicsJointsToEE(
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
),
],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
# build pipeline to convert EE action to robot joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.10,
),
InverseKinematicsEEToJoints(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=False,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Verify action space alignment (warns if leader EE ≠ follower action_features)
check_action_space_compatibility(leader, follower)
# Connect to the robot and teleoperator
follower.connect()
@@ -97,28 +60,12 @@ def main():
init_rerun(session_name="so100_so100_EE_teleop")
print("Starting teleop loop...")
while True:
t0 = time.perf_counter()
# Get robot observation
robot_obs = follower.get_observation()
# Get teleop observation
leader_joints_obs = leader.get_action()
# teleop joints -> teleop EE action
leader_ee_act = leader_to_ee(leader_joints_obs)
# teleop EE -> robot joints
follower_joints_act = ee_to_follower_joints((leader_ee_act, robot_obs))
# Send action to robot
_ = follower.send_action(follower_joints_act)
# Visualize
log_rerun_data(observation=leader_ee_act, action=follower_joints_act)
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
try:
# Pipelines applied automatically inside teleop.get_action() and robot.send_action()
teleop_loop(teleop=leader, robot=follower, fps=FPS, display_data=True)
finally:
follower.disconnect()
leader.disconnect()
if __name__ == "__main__":
+6 -102
View File
@@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections.abc import Sequence
from typing import Any
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from lerobot.configs.types import PipelineFeatureType
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
if TYPE_CHECKING:
from lerobot.processor import RobotAction, RobotObservation
def create_initial_features(
@@ -41,99 +41,3 @@ def create_initial_features(
if observation:
features[PipelineFeatureType.OBSERVATION] = observation
return features
# Helper to filter state/action keys based on regex patterns.
def should_keep(key: str, patterns: tuple[str]) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str:
    """Strip the first matching prefix from `key`.

    Prefixes are tried in the order given; only the first one that matches is
    removed, and it is removed exactly once. A key with no matching prefix is
    returned unchanged.
    """
    matched = next((p for p in prefixes_to_strip if key.startswith(p)), None)
    return key if matched is None else key.removeprefix(matched)
# Prefixes stripped from feature keys to produce clean dataset feature names.
# For each constant we generate both the fully qualified form (e.g. "observation.state.")
# and its short last segment (e.g. "state."), so keys written either way normalize
# to the same clean name. NOTE(review): exact prefix strings depend on the values of
# ACTION / OBS_STATE / OBS_IMAGES in lerobot.utils.constants — confirm against that module.
PREFIXES_TO_STRIP = tuple(
    f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1])
)
def aggregate_pipeline_dataset_features(
    pipeline: DataProcessorPipeline,
    initial_features: dict[PipelineFeatureType, dict[str, Any]],
    *,
    use_videos: bool = True,
    patterns: Sequence[str] | None = None,
) -> dict[str, dict]:
    """Build a dataset-ready features dict from pipeline-transformed features.

    The initial features are run through ``pipeline.transform_features``, then each
    resulting feature is classified as an action, an image observation, or a state
    observation. Image features are dropped when `use_videos` is False; action and
    state features are filtered by the regex `patterns` (images are never filtered
    by `patterns`). Clean names are produced by stripping known key prefixes, and
    the result is formatted via ``hw_to_dataset_features`` for a LeRobot dataset.

    Args:
        pipeline: The DataProcessorPipeline to apply.
        initial_features: Raw feature specs for actions and observations.
        use_videos: If False, image features are excluded.
        patterns: Regex patterns used to filter action and state features.

    Returns:
        A dictionary of features formatted for a Hugging Face LeRobot Dataset.
    """
    transformed = pipeline.transform_features(initial_features)
    image_token = OBS_IMAGES.split(".")[-1]

    def looks_like_image(key: str, spec: Any) -> bool:
        # A 3-tuple spec is treated as an image shape; otherwise fall back to
        # image-related tokens in the key.
        if isinstance(spec, tuple) and len(spec) == 3:
            return True
        return (
            key.startswith(f"{OBS_IMAGES}.")
            or key.startswith(f"{image_token}.")
            or f".{image_token}." in key
        )

    action_feats: dict[str, Any] = {}
    obs_feats: dict[str, Any] = {}
    # Only ACTION and OBSERVATION feature types participate; anything else is ignored.
    for feature_type in (PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION):
        for key, spec in transformed.get(feature_type, {}).items():
            is_action = feature_type == PipelineFeatureType.ACTION
            # Images are only a concept for observations, never for actions.
            is_image = (not is_action) and looks_like_image(key, spec)
            if is_image:
                if not use_videos:
                    continue
            elif not should_keep(key, patterns):
                continue
            clean_name = strip_prefix(key, PREFIXES_TO_STRIP)
            (action_feats if is_action else obs_feats)[clean_name] = spec

    dataset_features: dict[str, dict] = {}
    if action_feats:
        dataset_features.update(hw_to_dataset_features(action_feats, ACTION, use_videos))
    if obs_feats:
        dataset_features.update(hw_to_dataset_features(obs_feats, OBS_STR, use_videos))
    return dataset_features
+1 -11
View File
@@ -30,12 +30,6 @@ from .core import (
)
from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
from .device_processor import DeviceProcessorStep
from .factory import (
make_default_processors,
make_default_robot_action_processor,
make_default_robot_observation_processor,
make_default_teleop_action_processor,
)
from .gym_action_processor import (
Numpy2TorchActionProcessorStep,
Torch2NumpyActionProcessorStep,
@@ -95,11 +89,7 @@ __all__ = [
"ImageCropResizeProcessorStep",
"InfoProcessorStep",
"InterventionActionProcessorStep",
"make_default_processors",
"make_default_teleop_action_processor",
"make_default_robot_action_processor",
"make_default_robot_observation_processor",
"MapDeltaActionToRobotActionStep",
"MapDeltaActionToRobotActionStep",
"MapTensorToDeltaActionDictStep",
"NormalizerProcessorStep",
"Numpy2TorchActionProcessorStep",
+34 -28
View File
@@ -17,6 +17,7 @@
from .converters import (
observation_to_transition,
robot_action_observation_to_transition,
robot_action_to_transition,
transition_to_observation,
transition_to_robot_action,
)
@@ -24,39 +25,44 @@ from .core import RobotAction, RobotObservation
from .pipeline import IdentityProcessorStep, RobotProcessorPipeline
def make_default_teleop_action_processor() -> RobotProcessorPipeline[
    tuple[RobotAction, RobotObservation], RobotAction
]:
    """Build the default teleop action processor.

    The returned pipeline consumes an ``(action, observation)`` pair and yields
    the action unchanged (single identity step); it exists so callers always
    have a pipeline object to invoke.
    """
    return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[IdentityProcessorStep()],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
# ── Internal identity pipeline helpers (used by Robot/Teleoperator base classes) ──────────────────
def make_default_robot_action_processor() -> RobotProcessorPipeline[
    tuple[RobotAction, RobotObservation], RobotAction
]:
    """Build the default robot action processor.

    The returned pipeline takes an ``(action, observation)`` tuple and passes
    the action through untouched (single identity step).
    """
    return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[IdentityProcessorStep()],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
def _make_identity_observation_pipeline() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
    """No-op observation pipeline (default output pipeline for get_observation())."""
    identity_pipeline = RobotProcessorPipeline[RobotObservation, RobotObservation](
        steps=[IdentityProcessorStep()],
        to_transition=observation_to_transition,
        to_output=transition_to_observation,
    )
    return identity_pipeline
def _make_identity_robot_action_pipeline() -> RobotProcessorPipeline[
    tuple[RobotAction, RobotObservation], RobotAction
]:
    """No-op action pipeline for send_action(); consumes an (action, obs) tuple."""
    identity_pipeline = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[IdentityProcessorStep()],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
    return identity_pipeline
def _make_identity_teleop_action_pipeline() -> RobotProcessorPipeline[RobotAction, RobotAction]:
    """No-op teleop action pipeline (default output pipeline for get_action())."""
    identity_pipeline = RobotProcessorPipeline[RobotAction, RobotAction](
        steps=[IdentityProcessorStep()],
        to_transition=robot_action_to_transition,
        to_output=transition_to_robot_action,
    )
    return identity_pipeline
def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]:
    """No-op feedback pipeline (default input pipeline for send_feedback())."""
    identity_pipeline = RobotProcessorPipeline[dict, dict](
        steps=[IdentityProcessorStep()],
        to_transition=observation_to_transition,
        to_output=transition_to_observation,
    )
    return identity_pipeline
return robot_observation_processor
def make_default_processors():
    """Return the default processor triple.

    Returns:
        A ``(teleop_action, robot_action, robot_observation)`` tuple of identity
        pipelines built by the corresponding ``make_default_*`` factories.
    """
    return (
        make_default_teleop_action_processor(),
        make_default_robot_action_processor(),
        make_default_robot_observation_processor(),
    )
+4 -2
View File
@@ -19,15 +19,17 @@ from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any
from typing import TYPE_CHECKING, Any
import torch
from torch import Tensor
from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import ACTION
if TYPE_CHECKING:
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from .converters import from_tensor_to_numpy, to_tensor
from .core import EnvTransition, PolicyAction, TransitionKey
from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, RobotObservation
+5 -9
View File
@@ -43,12 +43,9 @@ from lerobot.utils.import_utils import _transformers_available
from .core import EnvTransition, RobotObservation, TransitionKey
from .pipeline import ActionProcessorStep, ObservationProcessorStep, ProcessorStepRegistry
# Conditional import for type checking and lazy loading
if TYPE_CHECKING or _transformers_available:
# Type-checking only import — do NOT import transformers at module level (it loads TF which blocks)
if TYPE_CHECKING:
from transformers import AutoProcessor, AutoTokenizer
else:
AutoProcessor = None
AutoTokenizer = None
@dataclass
@@ -106,8 +103,7 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Use provided tokenizer object directly
self.input_tokenizer = self.tokenizer
elif self.tokenizer_name is not None:
if AutoTokenizer is None:
raise ImportError("AutoTokenizer is not available")
from transformers import AutoTokenizer # lazy import to avoid TF deadlock at module load
self.input_tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name)
else:
raise ValueError(
@@ -370,12 +366,12 @@ class ActionTokenizerProcessorStep(ActionProcessorStep):
"Please install it with `pip install 'lerobot[transformers-dep]'` to use ActionTokenizerProcessorStep."
)
from transformers import AutoProcessor, AutoTokenizer # lazy import to avoid TF deadlock at module load
if self.action_tokenizer_input_object is not None:
self.action_tokenizer = self.action_tokenizer_input_object
elif self.action_tokenizer_name is not None:
if AutoProcessor is None:
raise ImportError("AutoProcessor is not available")
self.action_tokenizer = AutoProcessor.from_pretrained(
self.action_tokenizer_name, trust_remote_code=self.trust_remote_code
)
@@ -102,11 +102,11 @@ class BiOpenArmFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -136,7 +136,7 @@ class BiOpenArmFollower(Robot):
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
@@ -150,7 +150,7 @@ class BiOpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
def _send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
@@ -86,11 +86,11 @@ class BiSOFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -119,7 +119,7 @@ class BiSOFollower(Robot):
self.right_arm.setup_motors()
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
@@ -133,7 +133,7 @@ class BiSOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
# Remove "left_" prefix
left_action = {
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
@@ -147,7 +147,7 @@ class EarthRoverMiniPlus(Robot):
pass
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
"""Define the observation space for dataset recording.
Returns:
@@ -184,7 +184,7 @@ class EarthRoverMiniPlus(Robot):
}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Define the action space.
Returns:
@@ -198,7 +198,7 @@ class EarthRoverMiniPlus(Robot):
}
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""Get current robot observation from SDK.
Returns:
@@ -255,7 +255,7 @@ class EarthRoverMiniPlus(Robot):
return observation
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Send action to robot via SDK.
Args:
+4 -4
View File
@@ -71,11 +71,11 @@ class HopeJrArm(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -128,7 +128,7 @@ class HopeJrArm(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position", self.other_motors)
@@ -147,7 +147,7 @@ class HopeJrArm(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
+4 -4
View File
@@ -107,11 +107,11 @@ class HopeJrHand(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -158,7 +158,7 @@ class HopeJrHand(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Read hand position
@@ -178,7 +178,7 @@ class HopeJrHand(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
self.bus.sync_write("Goal_Position", goal_pos)
return action
@@ -73,11 +73,11 @@ class KochFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -182,7 +182,7 @@ class KochFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -200,7 +200,7 @@ class KochFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
+4 -4
View File
@@ -98,11 +98,11 @@ class LeKiwi(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._state_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._state_ft
@property
@@ -338,7 +338,7 @@ class LeKiwi(Robot):
} # m/s and deg/s
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read actuators position for arm and vel for base
start = time.perf_counter()
arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
@@ -367,7 +367,7 @@ class LeKiwi(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command lekiwi to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
+4 -4
View File
@@ -98,11 +98,11 @@ class LeKiwiClient(Robot):
return {name: (cfg.height, cfg.width, 3) for name, cfg in self.config.cameras.items()}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._state_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._state_ft
@property
@@ -250,7 +250,7 @@ class LeKiwiClient(Robot):
return new_frames, new_state
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""
Capture observations from the remote robot: current follower arm positions,
present wheel speeds (converted to body-frame velocities: x, y, theta),
@@ -304,7 +304,7 @@ class LeKiwiClient(Robot):
pass
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ
Args:
@@ -73,11 +73,11 @@ class OmxFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -165,7 +165,7 @@ class OmxFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -183,7 +183,7 @@ class OmxFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
@@ -105,12 +105,12 @@ class OpenArmFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
"""Combined observation features from motors and cameras."""
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Action features."""
return self._motors_ft
@@ -219,7 +219,7 @@ class OpenArmFollower(Robot):
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
@@ -251,7 +251,7 @@ class OpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
def _send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
+4 -4
View File
@@ -95,11 +95,11 @@ class Reachy2Robot(Robot):
self.joints_dict: dict[str, str] = self._generate_joints_dict()
@property
def observation_features(self) -> dict[str, Any]:
def raw_observation_features(self) -> dict[str, Any]:
return {**self.motors_features, **self.camera_features}
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self.motors_features
@property
@@ -170,7 +170,7 @@ class Reachy2Robot(Robot):
else:
return {}
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict: RobotObservation = {}
# Read Reachy 2 state
@@ -184,7 +184,7 @@ class Reachy2Robot(Robot):
return obs_dict
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
if self.reachy is not None:
if not self.is_connected:
raise ConnectionError()
+158 -34
View File
@@ -18,8 +18,11 @@ from pathlib import Path
import draccus
from lerobot.configs.types import PipelineFeatureType
from lerobot.motors import MotorCalibration
from lerobot.processor import RobotAction, RobotObservation
from lerobot.processor.core import RobotAction, RobotObservation
from lerobot.processor.factory import _make_identity_observation_pipeline, _make_identity_robot_action_pipeline
from lerobot.processor.pipeline import RobotProcessorPipeline
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, ROBOTS
from .config import RobotConfig
@@ -34,6 +37,10 @@ class Robot(abc.ABC):
This class provides a standardized interface for interacting with physical robots.
Subclasses must implement all abstract methods and properties to be usable.
Pipelines are first-class citizens: every robot carries an optional output pipeline
(applied in get_observation()) and an optional input pipeline (applied in send_action()).
Both default to identity (no-op), so existing robots work without any changes.
Attributes:
config_class (RobotConfig): The expected configuration class for this robot.
name (str): The unique robot name used to identify this robot type.
@@ -55,6 +62,12 @@ class Robot(abc.ABC):
if self.calibration_fpath.is_file():
self._load_calibration()
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
self._output_pipeline: RobotProcessorPipeline = _make_identity_observation_pipeline()
self._input_pipeline: RobotProcessorPipeline = _make_identity_robot_action_pipeline()
# Cache of most recent raw observation; used by input_pipeline for IK initial guess
self._last_raw_obs: RobotObservation = {}
def __str__(self) -> str:
return f"{self.id} {self.__class__.__name__}"
@@ -84,40 +97,117 @@ class Robot(abc.ABC):
except Exception: # nosec B110
pass
# TODO(aliberts): create a proper Feature class for this that links with datasets
# ── Pipeline interface ────────────────────────────────────────────────────
def output_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside get_observation() to transform raw hardware observations.
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
Example: set a forward-kinematics pipeline to convert joint positions to EE pose.
"""
return self._output_pipeline
def input_pipeline(self) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
"""
Pipeline applied inside send_action() to transform incoming actions before hardware write.
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
The pipeline receives a (action, last_raw_obs) tuple so IK solvers can use the
current joint configuration as an initial guess.
Example: set an inverse-kinematics pipeline to convert EE commands to joint positions.
"""
return self._input_pipeline
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the observation output pipeline (applied in get_observation())."""
self._output_pipeline = pipeline
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the action input pipeline (applied in send_action())."""
self._input_pipeline = pipeline
# ── Feature properties ────────────────────────────────────────────────────
@property
@abc.abstractmethod
def observation_features(self) -> dict:
"""
A dictionary describing the structure and types of the observations produced by the robot.
Its structure (keys) should match the structure of what is returned by :pymeth:`get_observation`.
Values for the dict should either be:
- The type of the value if it's a simple value, e.g. `float` for single proprioceptive value (a joint's position/velocity)
- A tuple representing the shape if it's an array-type value, e.g. `(height, width, channel)` for images
Pipeline-transformed observation features.
Note: this property should be able to be called regardless of whether the robot is connected or not.
Applies output_pipeline().transform_features() to raw_observation_features so the
returned dict matches what get_observation() actually returns to callers.
Use raw_observation_features to inspect hardware-level feature shapes.
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(observation=self.raw_observation_features)
transformed = self.output_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.OBSERVATION, {})
@property
@abc.abstractmethod
def raw_observation_features(self) -> dict:
"""
Hardware-level observation features (before any pipeline transformation).
A dictionary describing the structure and types of the observations produced
directly by the robot hardware. Its structure (keys) should match the structure
of what is returned by :pymeth:`_get_observation`. Values should be:
- The type if it's a simple value, e.g. ``float`` for joint position
- A tuple representing the shape for array values, e.g. ``(H, W, C)`` for images
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
pass
@property
@abc.abstractmethod
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
"""
A dictionary describing the structure and types of the actions expected by the robot. Its structure
(keys) should match the structure of what is passed to :pymeth:`send_action`. Values for the dict
should be the type of the value if it's a simple value, e.g. `float` for single proprioceptive value
(a joint's goal position/velocity)
Hardware-level action features (before any pipeline transformation).
Note: this property should be able to be called regardless of whether the robot is connected or not.
A dictionary describing the structure and types of the actions accepted directly
by the robot hardware (i.e. what :pymeth:`_send_action` receives). Its structure
(keys) should match the structure of what is expected by :pymeth:`_send_action`.
Values should be the type of the value if it's a simple value, e.g. ``float`` for
single proprioceptive value (a joint's goal position/velocity).
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
pass
@property
def action_features(self) -> dict:
"""
Pipeline-transformed action features.
Applies input_pipeline().transform_features() to raw_action_features so the
returned dict reflects what the input pipeline outputs to hardware.
Use raw_action_features to inspect hardware-level action feature shapes.
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(action=self.raw_action_features)
transformed = self.input_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.ACTION, {})
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""
Whether the robot is currently connected or not. If `False`, calling :pymeth:`get_observation` or
:pymeth:`send_action` should raise an error.
Whether the robot is currently connected or not. If ``False``, calling
:pymeth:`get_observation` or :pymeth:`send_action` should raise an error.
"""
pass
@@ -135,7 +225,7 @@ class Robot(abc.ABC):
@property
@abc.abstractmethod
def is_calibrated(self) -> bool:
"""Whether the robot is currently calibrated or not. Should be always `True` if not applicable"""
"""Whether the robot is currently calibrated or not. Should be always ``True`` if not applicable"""
pass
@abc.abstractmethod
@@ -153,7 +243,7 @@ class Robot(abc.ABC):
Helper to load calibration data from the specified file.
Args:
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath) as f, draccus.config_type("json"):
@@ -164,7 +254,7 @@ class Robot(abc.ABC):
Helper to save calibration data to the specified file.
Args:
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath, "w") as f, draccus.config_type("json"):
@@ -178,30 +268,64 @@ class Robot(abc.ABC):
"""
pass
@abc.abstractmethod
# ── Template methods (concrete, call pipeline internally) ─────────────────
def get_observation(self) -> RobotObservation:
"""
Retrieve the current observation from the robot.
Retrieve the current observation from the robot and apply the output pipeline.
Calls :pymeth:`_get_observation` to get raw hardware data, caches it for use as
IK initial guess in :pymeth:`send_action`, then applies :pymeth:`output_pipeline`.
Returns:
RobotObservation: A flat dictionary representing the robot's current sensory state. Its structure
should match :pymeth:`observation_features`.
RobotObservation: Pipeline-transformed observation. With the default identity
pipeline this equals the raw observation from :pymeth:`_get_observation`.
"""
pass
raw = self._get_observation()
self._last_raw_obs = raw
return self.output_pipeline()(raw)
@abc.abstractmethod
def _get_observation(self) -> RobotObservation:
    """
    Retrieve the raw observation directly from robot hardware.

    Called by :meth:`get_observation`, which applies the output pipeline on top of
    the value returned here; implementations should not apply pipelines themselves.

    Returns:
        RobotObservation: A flat dictionary representing the robot's current sensory
            state. Its structure should match :meth:`raw_observation_features`.
    """
    pass
def send_action(self, action: RobotAction) -> RobotAction:
    """
    Apply the input pipeline and send the resulting action to robot hardware.

    The input pipeline receives ``(action, last_raw_obs)`` so IK solvers can use the
    cached joint configuration as an initial guess. With the default identity pipeline,
    the action is forwarded unchanged.

    Args:
        action (RobotAction): Dictionary representing the desired action. Its structure
            should match :meth:`action_features`.

    Returns:
        RobotAction: The action actually sent to the motors, potentially clipped or
            modified by the pipeline or hardware safety limits.
    """
    # _last_raw_obs is whatever get_observation() cached most recently.
    # NOTE(review): its initial value before the first observation is not visible
    # here — confirm pipelines tolerate it (e.g. None) or that callers observe first.
    transformed = self.input_pipeline()((action, self._last_raw_obs))
    return self._send_action(transformed)
@abc.abstractmethod
def _send_action(self, action: RobotAction) -> RobotAction:
    """
    Send an action command directly to robot hardware.

    Called by :meth:`send_action` after the input pipeline has already run;
    implementations should not re-apply pipeline transforms here.

    Args:
        action (RobotAction): Dictionary of motor-level commands. Its structure should
            match what the hardware expects (typically motor positions/velocities).

    Returns:
        RobotAction: The action actually sent, potentially clipped by safety limits.
    """
    pass
@@ -0,0 +1,19 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .ee_space import make_so10x_fk_observation_pipeline, make_so10x_ik_action_pipeline
__all__ = ["make_so10x_fk_observation_pipeline", "make_so10x_ik_action_pipeline"]
@@ -0,0 +1,147 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
End-effector space pipelines for SO-100/101 follower robots.
These factory functions return ready-to-use pipelines that convert between joint space
and Cartesian end-effector space. Attach them to a robot with ``set_output_pipeline`` /
``set_input_pipeline`` to enable EE-space recording and teleoperation.
Example::
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
"""
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
)
# Default Cartesian end-effector position bounds in metres (min/max per x, y, z),
# used when the caller does not supply explicit bounds.
_DEFAULT_EE_BOUNDS = {"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}
# Name of the end-effector frame in the SO-100/101 URDF.
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
def make_so10x_fk_observation_pipeline(
    urdf_path: str,
    motor_names: list[str],
    *,
    target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
) -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
    """
    Build a forward-kinematics observation pipeline for SO-100/101 followers.

    The returned pipeline maps raw joint-position observations to an end-effector
    pose (position + orientation). Attach it to a follower robot with
    ``set_output_pipeline`` so that ``get_observation()`` yields EE coordinates
    instead of raw joint angles.

    Args:
        urdf_path: Path to the SO-100/101 URDF file used for kinematics.
        motor_names: Ordered list of motor names matching the URDF joint names.
        target_frame_name: Name of the end-effector frame in the URDF.

    Returns:
        A RobotProcessorPipeline that maps joint observations to EE observations.

    Example::

        follower.set_output_pipeline(
            make_so10x_fk_observation_pipeline("./so101.urdf", motor_names)
        )
        obs = follower.get_observation()  # now contains ee.x, ee.y, ee.z, ...
    """
    kin = RobotKinematics(
        urdf_path=urdf_path,
        target_frame_name=target_frame_name,
        joint_names=motor_names,
    )
    fk_step = ForwardKinematicsJointsToEE(kinematics=kin, motor_names=motor_names)
    return RobotProcessorPipeline[RobotObservation, RobotObservation](
        steps=[fk_step],
        to_transition=observation_to_transition,
        to_output=transition_to_observation,
    )
def make_so10x_ik_action_pipeline(
    urdf_path: str,
    motor_names: list[str],
    *,
    target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
    end_effector_bounds: dict | None = None,
    max_ee_step_m: float = 0.10,
) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
    """
    Create an inverse-kinematics action pipeline for SO-100/101 follower robots.

    Converts incoming end-effector pose commands into joint positions, applying safety
    bounds and step-size limits before solving IK. The current joint positions are used
    as the IK initial guess (taken from the cached ``_last_raw_obs``).

    Attach this to a follower robot via ``set_input_pipeline`` so that ``send_action()``
    receives EE commands and translates them to motor positions before the hardware write.

    Args:
        urdf_path: Path to the SO-100/101 URDF file used for kinematics.
        motor_names: Ordered list of motor names matching the URDF joint names.
        target_frame_name: Name of the end-effector frame in the URDF.
        end_effector_bounds: Dict with ``"min"`` and ``"max"`` lists (3D position bounds in metres).
            Defaults to ``{"min": [-1, -1, -1], "max": [1, 1, 1]}``.
        max_ee_step_m: Maximum allowed EE position change per step in metres.

    Returns:
        A RobotProcessorPipeline that maps (EE action, raw obs) to joint action.

    Example::

        follower.set_input_pipeline(
            make_so10x_ik_action_pipeline("./so101.urdf", motor_names)
        )
        # send_action() now accepts ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
    """
    kinematics = RobotKinematics(
        urdf_path=urdf_path,
        target_frame_name=target_frame_name,
        joint_names=motor_names,
    )
    # Explicit `is None` test (not `or`): a falsy-but-explicit bounds dict must not be
    # silently replaced by the defaults. Copy the module-level default so that any
    # in-place mutation by one pipeline cannot leak into pipelines built later.
    if end_effector_bounds is None:
        bounds = {
            "min": list(_DEFAULT_EE_BOUNDS["min"]),
            "max": list(_DEFAULT_EE_BOUNDS["max"]),
        }
    else:
        bounds = end_effector_bounds
    return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[
            EEBoundsAndSafety(end_effector_bounds=bounds, max_ee_step_m=max_ee_step_m),
            InverseKinematicsEEToJoints(
                kinematics=kinematics,
                motor_names=motor_names,
                initial_guess_current_joints=True,
            ),
        ],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
@@ -74,11 +74,11 @@ class SOFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -176,7 +176,7 @@ class SOFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -194,7 +194,7 @@ class SOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
+6 -6
View File
@@ -170,7 +170,7 @@ class UnitreeG1(Robot):
time.sleep(sleep_time)
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
def calibrate(self) -> None: # robot is already calibrated
@@ -273,7 +273,7 @@ class UnitreeG1(Robot):
for cam in self._cameras.values():
cam.disconnect()
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
lowstate = self._lowstate
if lowstate is None:
return {}
@@ -351,10 +351,10 @@ class UnitreeG1(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
for motor in G1_29_JointIndex:
key = f"{motor.name}.q"
if key in action:
@@ -421,7 +421,7 @@ class UnitreeG1(Robot):
num_steps = int(total_time / control_dt)
# get current state
obs = self.get_observation()
obs = self._get_observation()
# record current positions
init_dof_pos = np.zeros(29, dtype=np.float32)
@@ -439,7 +439,7 @@ class UnitreeG1(Robot):
interp_pos = init_dof_pos[motor.value] * (1 - alpha) + target_pos * alpha
action_dict[f"{motor.name}.q"] = float(interp_pos)
self.send_action(action_dict)
self._send_action(action_dict)
# Maintain constant control rate
elapsed = time.time() - start_time
+123 -89
View File
@@ -74,6 +74,8 @@ from pathlib import Path
from pprint import pformat
from typing import Any
import torch
from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
@@ -85,19 +87,16 @@ from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
from lerobot.datasets.utils import build_dataset_frame
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rtc import ActionInterpolator
from lerobot.policies.utils import make_robot_action
from lerobot.processor import (
PolicyAction,
PolicyProcessorPipeline,
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_processors,
)
from lerobot.processor.rename_processor import rename_stats
from lerobot.robots import ( # noqa: F401
@@ -140,6 +139,11 @@ from lerobot.utils.control_utils import (
sanity_check_dataset_robot_compatibility,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.pipeline_utils import (
build_dataset_features,
check_action_space_compatibility,
check_observation_space_compatibility,
)
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import (
get_safe_torch_device,
@@ -155,7 +159,7 @@ class DatasetRecordConfig:
repo_id: str
# A short but accurate description of the task performed during the recording (e.g. "Pick the Lego block and drop it in the box on the right.")
single_task: str
# Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id.
# Root directory where the dataset will be stored (e.g. 'dataset/path').
root: str | Path | None = None
# Limit the frames per second.
fps: int = 30
@@ -226,6 +230,9 @@ class RecordConfig:
play_sounds: bool = True
# Resume recording on an existing dataset.
resume: bool = False
# Action interpolation multiplier for smoother policy control (1=off, 2=2x, 3=3x)
# Only applies when using a policy (not teleop)
interpolation_multiplier: int = 1
def __post_init__(self):
# HACK: We parse again the cli args here to get the pretrained path if there was one.
@@ -249,28 +256,23 @@ class RecordConfig:
""" --------------- record_loop() data flow --------------------------
[ Robot ]
V
[ robot.get_observation() ] ---> raw_obs
V
[ robot_observation_processor ] ---> processed_obs
[ robot.get_observation() ] → applies output_pipeline internally → obs
V
.-----( ACTION LOGIC )------------------.
V V
[ From Teleoperator ] [ From Policy ]
| |
| [teleop.get_action] -> raw_action | [predict_action]
| | | |
| V | V
| [teleop_action_processor] | |
| | | |
'---> processed_teleop_action '---> processed_policy_action
| teleop.get_action() | predict_action(obs)
| (output_pipeline applied internally) | |
| | | V
'----> action '---> policy_action_dict
| |
'-------------------------.-------------'
V
[ robot_action_processor ] --> robot_action_to_send
[ robot.send_action(action) ]
(input_pipeline applied internally)
V
[ robot.send_action() ] -- (Robot Executes)
V
( Save to Dataset )
( Save action + obs to Dataset )
V
( Rerun Log / Loop Wait )
"""
@@ -281,15 +283,6 @@ def record_loop(
robot: Robot,
events: dict,
fps: int,
teleop_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs after teleop
robot_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs before robot
robot_observation_processor: RobotProcessorPipeline[
RobotObservation, RobotObservation
], # runs after robot
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | list[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
@@ -298,8 +291,30 @@ def record_loop(
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
interpolator: ActionInterpolator | None = None,
display_compressed_images: bool = False,
):
"""
Core recording loop. Robot and teleoperator pipelines are applied internally —
no explicit processor arguments are needed.
Args:
robot: The robot instance. Its output_pipeline() transforms observations and
its input_pipeline() transforms actions before hardware write.
events: Control events dict (exit_early, stop_recording, rerecord_episode).
fps: Target control loop frequency.
dataset: If provided, frames are written here each step.
teleop: Teleoperator or list of teleoperators. Its output_pipeline() transforms
actions (e.g., joint → EE) before they are sent to the robot.
policy: Optional pre-trained policy for closed-loop control.
preprocessor: Policy input pre-processor.
postprocessor: Policy output post-processor.
control_time_s: Episode duration in seconds.
single_task: Task description string saved with each frame.
display_data: If True, log observations and actions to Rerun.
interpolator: Optional action interpolator for smoother policy control.
display_compressed_images: If True, compress images before Rerun display.
"""
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
@@ -334,6 +349,16 @@ def record_loop(
preprocessor.reset()
postprocessor.reset()
# Reset interpolator if provided
if interpolator is not None:
interpolator.reset()
# Calculate control interval based on interpolation
use_interpolation = interpolator is not None and interpolator.enabled and policy is not None
control_interval = interpolator.get_control_interval(fps) if interpolator else 1 / fps
# Pre-compute once — action features don't change during a recording episode
action_keys = sorted(robot.action_features) if use_interpolation else []
no_action_count = 0
timestamp = 0
start_episode_t = time.perf_counter()
@@ -344,43 +369,75 @@ def record_loop(
events["exit_early"] = False
break
# Get robot observation
# Get robot observation (output_pipeline applied internally)
obs = robot.get_observation()
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_processed = robot_observation_processor(obs)
if policy is not None or dataset is not None:
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
observation_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
# With interpolation: only call policy when interpolator needs new action
if use_interpolation:
if interpolator.needs_new_action():
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# send_action applies input_pipeline (e.g. IK) internally;
# capture the actually-sent joint action for interpolation
sent_joint_action = robot.send_action(act_processed_policy)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# Build interpolation tensor from the motor-level joint action
action_tensor = torch.tensor([sent_joint_action[k] for k in action_keys])
interpolator.add(action_tensor)
# Get interpolated action (in joint/motor space)
interp_action = interpolator.get()
if interp_action is not None:
action_values = {k: interp_action[i].item() for i, k in enumerate(action_keys)}
# Interpolated values are already in joint space; bypass IK pipeline
robot._send_action(action_values)
else:
# No action available yet, skip this iteration
continue
else:
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# send_action applies input_pipeline (e.g. IK) internally
robot.send_action(act_processed_policy)
action_values = act_processed_policy
elif policy is None and isinstance(teleop, Teleoperator):
act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
act_processed_teleop = teleop_action_processor((act, obs))
# get_action applies output_pipeline (e.g. FK) internally
action_values = teleop.get_action()
# send_action applies input_pipeline (e.g. IK) internally
robot.send_action(action_values)
elif policy is None and isinstance(teleop, list):
arm_action = teleop_arm.get_action()
# LeKiwi multi-teleop path
arm_action = teleop_arm.get_action() # output_pipeline applied internally
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
act_processed_teleop = teleop_action_processor((act, obs))
action_values = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
robot.send_action(action_values) # input_pipeline applied internally
else:
no_action_count += 1
if no_action_count == 1 or no_action_count % 10 == 0:
@@ -391,20 +448,6 @@ def record_loop(
)
continue
# Applies a pipeline to the action, default is IdentityProcessor
if policy is not None and act_processed_policy is not None:
action_values = act_processed_policy
robot_action_to_send = robot_action_processor((act_processed_policy, obs))
else:
action_values = act_processed_teleop
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
# Send action to robot
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
_sent_action = robot.send_action(robot_action_to_send)
# Write to dataset
if dataset is not None:
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
@@ -413,12 +456,12 @@ def record_loop(
if display_data:
log_rerun_data(
observation=obs_processed, action=action_values, compress_images=display_compressed_images
observation=obs, action=action_values, compress_images=display_compressed_images
)
dt_s = time.perf_counter() - start_loop_t
sleep_time_s: float = 1 / fps - dt_s
sleep_time_s: float = control_interval - dt_s
if sleep_time_s < 0:
logging.warning(
f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
@@ -444,22 +487,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot = make_robot_from_config(cfg.robot)
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(
action=robot.action_features
), # TODO(steven, pepijn): in future this should be come from teleop or policy
use_videos=cfg.dataset.video,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=cfg.dataset.video,
),
)
# Dataset features derived automatically from robot/teleop pipelines.
# When teleop is None (policy-only recording), only observation features are included.
dataset_features = build_dataset_features(robot, teleop, use_videos=cfg.dataset.video)
dataset = None
listener = None
@@ -505,6 +535,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
preprocessor = None
postprocessor = None
interpolator = None
if cfg.policy is not None:
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
@@ -515,11 +546,19 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map},
},
)
# Create interpolator for smoother policy control
if cfg.interpolation_multiplier > 1:
interpolator = ActionInterpolator(multiplier=cfg.interpolation_multiplier)
logging.info(f"Action interpolation enabled: {cfg.interpolation_multiplier}x control rate")
robot.connect()
if teleop is not None:
teleop.connect()
if teleop is not None:
check_action_space_compatibility(teleop, robot)
check_observation_space_compatibility(robot, teleop)
listener, events = init_keyboard_listener()
if not cfg.dataset.streaming_encoding:
@@ -535,9 +574,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
policy=policy,
preprocessor=preprocessor,
@@ -546,6 +582,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
control_time_s=cfg.dataset.episode_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
interpolator=interpolator,
display_compressed_images=display_compressed_images,
)
@@ -564,9 +601,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
control_time_s=cfg.dataset.reset_time_s,
single_task=cfg.dataset.single_task,
+3 -9
View File
@@ -47,9 +47,6 @@ from pprint import pformat
from lerobot.configs import parser
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor import (
make_default_robot_action_processor,
)
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -99,8 +96,6 @@ def replay(cfg: ReplayConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
robot_action_processor = make_default_robot_action_processor()
robot = make_robot_from_config(cfg.robot)
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
@@ -120,11 +115,10 @@ def replay(cfg: ReplayConfig):
for i, name in enumerate(dataset.features[ACTION]["names"]):
action[name] = action_array[i]
robot_obs = robot.get_observation()
# Update cached observation so the robot's input pipeline can use it (e.g. for IK)
robot.get_observation()
processed_action = robot_action_processor((action, robot_obs))
_ = robot.send_action(processed_action)
_ = robot.send_action(action)
dt_s = time.perf_counter() - start_episode_t
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
+29 -46
View File
@@ -61,12 +61,6 @@ import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_processors,
)
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -100,6 +94,7 @@ from lerobot.teleoperators import ( # noqa: F401
unitree_g1,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.pipeline_utils import check_action_space_compatibility, check_observation_space_compatibility
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import init_logging, move_cursor_up
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
@@ -127,28 +122,28 @@ def teleop_loop(
teleop: Teleoperator,
robot: Robot,
fps: int,
teleop_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
robot_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
display_data: bool = False,
duration: float | None = None,
display_compressed_images: bool = False,
):
"""
This function continuously reads actions from a teleoperation device, processes them through optional
pipelines, sends them to a robot, and optionally displays the robot's state. The loop runs at a
specified frequency until a set duration is reached or it is manually interrupted.
Continuously reads actions from a teleoperation device, sends them to a robot,
and optionally displays the robot's state. Pipelines are applied internally by
the robot and teleoperator objects.
The loop runs at the specified frequency until a set duration is reached or it
is manually interrupted.
Args:
teleop: The teleoperator device instance providing control actions.
robot: The robot instance being controlled.
fps: The target frequency for the control loop in frames per second.
display_data: If True, fetches robot observations and displays them in the console and Rerun.
display_compressed_images: If True, compresses images before sending them to Rerun for display.
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
robot_observation_processor: An optional pipeline to process raw observations from the robot.
display_data: If True, fetches robot observations and displays them in the
console and Rerun.
display_compressed_images: If True, compresses images before sending them
to Rerun for display.
duration: The maximum duration of the teleoperation loop in seconds.
If None, the loop runs indefinitely.
"""
display_len = max(len(key) for key in robot.action_features)
@@ -157,40 +152,29 @@ def teleop_loop(
while True:
loop_start = time.perf_counter()
# Get robot observation
# Not really needed for now other than for visualization
# teleop_action_processor can take None as an observation
# given that it is the identity processor as default
obs = robot.get_observation()
# Get teleop action (output_pipeline applied internally)
action = teleop.get_action()
# Get teleop action
raw_action = teleop.get_action()
# Process teleop action through pipeline
teleop_action = teleop_action_processor((raw_action, obs))
# Process action for robot through pipeline
robot_action_to_send = robot_action_processor((teleop_action, obs))
# Send processed action to robot (robot_action_processor.to_output should return RobotAction)
_ = robot.send_action(robot_action_to_send)
# Send action to robot (input_pipeline applied internally)
robot_action_sent = robot.send_action(action)
if display_data:
# Process robot observation through pipeline
obs_transition = robot_observation_processor(obs)
# Get robot observation (output_pipeline applied internally)
obs = robot.get_observation()
teleop.send_feedback(obs)
log_rerun_data(
observation=obs_transition,
action=teleop_action,
observation=obs,
action=action,
compress_images=display_compressed_images,
)
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
# Display the final robot action that was sent
for motor, value in robot_action_to_send.items():
print(f"{motor:<{display_len}} | {value:>7.2f}")
move_cursor_up(len(robot_action_to_send) + 3)
for motor, value in robot_action_sent.items():
if isinstance(value, float | int):
print(f"{motor:<{display_len}} | {value:>7.2f}")
move_cursor_up(len(robot_action_sent) + 3)
dt_s = time.perf_counter() - loop_start
precise_sleep(max(1 / fps - dt_s, 0.0))
@@ -216,11 +200,13 @@ def teleoperate(cfg: TeleoperateConfig):
teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
teleop.connect()
robot.connect()
check_action_space_compatibility(teleop, robot)
check_observation_space_compatibility(robot, teleop)
try:
teleop_loop(
teleop=teleop,
@@ -228,9 +214,6 @@ def teleoperate(cfg: TeleoperateConfig):
fps=cfg.fps,
display_data=cfg.display_data,
duration=cfg.teleop_time_s,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
display_compressed_images=display_compressed_images,
)
except KeyboardInterrupt:
@@ -72,9 +72,9 @@ class BiOpenArmLeader(Teleoperator):
self.right_arm = OpenArmLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
def raw_action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.raw_action_features
right_arm_features = self.right_arm.raw_action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
@@ -82,7 +82,7 @@ class BiOpenArmLeader(Teleoperator):
}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -112,7 +112,7 @@ class BiOpenArmLeader(Teleoperator):
)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
action_dict = {}
# Add "left_" prefix
@@ -125,7 +125,7 @@ class BiOpenArmLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
@@ -55,9 +55,9 @@ class BiSOLeader(Teleoperator):
self.right_arm = SOLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
def raw_action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.raw_action_features
right_arm_features = self.right_arm.raw_action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
@@ -65,7 +65,7 @@ class BiSOLeader(Teleoperator):
}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -94,7 +94,7 @@ class BiSOLeader(Teleoperator):
self.right_arm.setup_motors()
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
action_dict = {}
# Add "left_" prefix
@@ -107,7 +107,7 @@ class BiSOLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
@@ -57,7 +57,7 @@ class GamepadTeleop(Teleoperator):
self.gamepad = None
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
@@ -72,7 +72,7 @@ class GamepadTeleop(Teleoperator):
}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
def connect(self) -> None:
@@ -87,7 +87,7 @@ class GamepadTeleop(Teleoperator):
self.gamepad.start()
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
# Update the controller to get fresh inputs
self.gamepad.update()
@@ -180,7 +180,7 @@ class GamepadTeleop(Teleoperator):
# No additional configuration needed
pass
def send_feedback(self, feedback: dict) -> None:
def _send_feedback(self, feedback: dict) -> None:
"""Send feedback to the gamepad."""
# Gamepad doesn't support feedback
pass
@@ -81,11 +81,11 @@ class HomunculusArm(Teleoperator):
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -298,11 +298,11 @@ class HomunculusArm(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
@check_if_not_connected
@@ -107,11 +107,11 @@ class HomunculusGlove(Teleoperator):
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -324,13 +324,13 @@ class HomunculusGlove(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
joint_positions = self._read()
return homunculus_glove_to_hope_jr_hand(
{f"{joint}.pos": pos for joint, pos in joint_positions.items()}
)
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
@check_if_not_connected
@@ -67,7 +67,7 @@ class KeyboardTeleop(Teleoperator):
self.logs = {}
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
@@ -75,7 +75,7 @@ class KeyboardTeleop(Teleoperator):
}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -122,7 +122,7 @@ class KeyboardTeleop(Teleoperator):
pass
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
before_read_t = time.perf_counter()
self._drain_pressed_keys()
@@ -133,7 +133,7 @@ class KeyboardTeleop(Teleoperator):
return dict.fromkeys(action, None)
def send_feedback(self, feedback: dict[str, Any]) -> None:
def _send_feedback(self, feedback: dict[str, Any]) -> None:
pass
@check_if_not_connected
@@ -157,7 +157,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
self.misc_keys_queue = Queue()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
@@ -172,7 +172,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
}
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
self._drain_pressed_keys()
delta_x = 0.0
delta_y = 0.0
@@ -338,7 +338,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
self.current_angular_speed = config.angular_speed
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
"""Return action format for rover (linear and angular velocities)."""
return {
"linear.vel": float,
@@ -361,7 +361,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
self.current_pressed.pop(key_char, None)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
"""
Get the current action based on pressed keys.
@@ -58,11 +58,11 @@ class KochLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -160,7 +160,7 @@ class KochLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -168,7 +168,7 @@ class KochLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO(rcadene, aliberts): Implement force feedback
raise NotImplementedError
@@ -57,11 +57,11 @@ class OmxLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -149,7 +149,7 @@ class OmxLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -157,7 +157,7 @@ class OmxLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO(rcadene, aliberts): Implement force feedback
raise NotImplementedError
@@ -65,7 +65,7 @@ class OpenArmLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Features produced by this teleoperator."""
features: dict[str, type] = {}
for motor in self.bus.motors:
@@ -75,7 +75,7 @@ class OpenArmLeader(Teleoperator):
return features
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
"""Feedback features (not implemented for OpenArms)."""
return {}
@@ -183,7 +183,7 @@ class OpenArmLeader(Teleoperator):
)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
"""
Get current action from the leader arm.
@@ -209,7 +209,7 @@ class OpenArmLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
@check_if_not_connected
@@ -31,9 +31,17 @@ from .config_openarm_mini import OpenArmMiniConfig
logger = logging.getLogger(__name__)
# Motors whose direction is inverted during readout
RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Motors whose direction is inverted on the leader side.
LEFT_MOTORS_TO_FLIP = {"joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"}
RIGHT_MOTORS_TO_FLIP = {"joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"}
# Leader(OpenArmMini) -> Follower(OpenArms) joint remap
JOINT_REMAP_TO_OPENARMS = {"joint_6": "joint_7", "joint_7": "joint_6"}
# Follower(OpenArms) -> Leader(OpenArmMini) joint remap
JOINT_REMAP_TO_MINI = {"joint_7": "joint_6", "joint_6": "joint_7"}
OPENARMS_GRIPPER_MIN = -65.0
OPENARMS_GRIPPER_MAX = 0.0
MINI_GRIPPER_MIN = 0.0
MINI_GRIPPER_MAX = 100.0
class OpenArmMini(Teleoperator):
@@ -93,8 +101,28 @@ class OpenArmMini(Teleoperator):
calibration=cal_left,
)
@staticmethod
def _mini_gripper_to_openarms(value: float) -> float:
    """Map an OpenArmMini gripper position in [0, 100] onto the OpenArms range [-65, 0].

    The result is clamped so out-of-range leader readings never produce an
    out-of-range follower command.
    """
    span_ratio = (OPENARMS_GRIPPER_MIN - OPENARMS_GRIPPER_MAX) / (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN)
    scaled = OPENARMS_GRIPPER_MAX + (value - MINI_GRIPPER_MIN) * span_ratio
    # Clamp into [OPENARMS_GRIPPER_MIN, OPENARMS_GRIPPER_MAX]
    return min(max(scaled, OPENARMS_GRIPPER_MIN), OPENARMS_GRIPPER_MAX)
@staticmethod
def _openarms_gripper_to_mini(value: float) -> float:
    """Map an OpenArms gripper position in [-65, 0] back onto the OpenArmMini range [0, 100].

    Inverse of :meth:`_mini_gripper_to_openarms`; the input is clamped to the
    OpenArms range before scaling.
    """
    bounded = min(max(value, OPENARMS_GRIPPER_MIN), OPENARMS_GRIPPER_MAX)
    span_ratio = (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN) / (OPENARMS_GRIPPER_MAX - OPENARMS_GRIPPER_MIN)
    return MINI_GRIPPER_MIN + (OPENARMS_GRIPPER_MAX - bounded) * span_ratio
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
features: dict[str, type] = {}
for motor in self.bus_right.motors:
features[f"right_{motor}.pos"] = float
@@ -103,7 +131,7 @@ class OpenArmMini(Teleoperator):
return features
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -269,7 +297,7 @@ class OpenArmMini(Teleoperator):
print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
"""Get current action from both arms (read positions from all motors)."""
start = time.perf_counter()
@@ -278,16 +306,64 @@ class OpenArmMini(Teleoperator):
action: dict[str, Any] = {}
for motor, val in right_positions.items():
action[f"right_{motor}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
mapped_val = -val if motor in RIGHT_MOTORS_TO_FLIP else val
if target_motor == "gripper":
mapped_val = self._mini_gripper_to_openarms(mapped_val)
action[f"right_{target_motor}.pos"] = mapped_val
for motor, val in left_positions.items():
action[f"left_{motor}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
mapped_val = -val if motor in LEFT_MOTORS_TO_FLIP else val
if target_motor == "gripper":
mapped_val = self._mini_gripper_to_openarms(mapped_val)
action[f"left_{target_motor}.pos"] = mapped_val
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")
@check_if_not_connected
def enable_torque(self) -> None:
    """Turn torque on for every motor of both arms so goal-position commands take effect."""
    for bus in (self.bus_right, self.bus_left):
        bus.enable_torque()
@check_if_not_connected
def disable_torque(self) -> None:
    """Turn torque off on both arms so the operator can move them by hand (manual teleop)."""
    for bus in (self.bus_right, self.bus_left):
        bus.disable_torque()
@check_if_not_connected
def write_goal_positions(self, action: dict[str, float]) -> None:
    """Send normalized bilateral goal positions to the underlying Feetech buses.

    Keys are expected in OpenArms naming (``right_<motor>.pos`` / ``left_<motor>.pos``);
    entries without the ``.pos`` suffix are ignored. Each value is remapped to the
    OpenArmMini joint layout, gripper values are converted to the Mini range, and
    flipped-direction motors are negated before writing.
    """
    per_side_goals: dict[str, dict[str, float]] = {"right_": {}, "left_": {}}
    flip_sets = {"right_": RIGHT_MOTORS_TO_FLIP, "left_": LEFT_MOTORS_TO_FLIP}

    for key, value in action.items():
        if not key.endswith(".pos"):
            continue  # skip non-position entries (e.g. velocities, metadata)
        side = next((p for p in ("right_", "left_") if key.startswith(p)), None)
        if side is None:
            continue
        source_motor = key.removeprefix(side).removesuffix(".pos")
        # OpenArms -> OpenArmMini joint name remap (joint_6 <-> joint_7)
        mini_motor = JOINT_REMAP_TO_MINI.get(source_motor, source_motor)
        goal = self._openarms_gripper_to_mini(value) if source_motor == "gripper" else value
        if mini_motor in flip_sets[side]:
            goal = -goal
        per_side_goals[side][mini_motor] = goal

    if per_side_goals["right_"]:
        self.bus_right.sync_write("Goal_Position", per_side_goals["right_"])
    if per_side_goals["left_"]:
        self.bus_left.sync_write("Goal_Position", per_side_goals["left_"])
@check_if_not_connected
def _send_feedback(self, feedback: dict[str, float]) -> None:
    """Forward feedback position commands through :meth:`write_goal_positions`.

    This reuses the same OpenArms -> OpenArmMini remapping/flip logic as direct
    goal-position writes.
    """
    self.write_goal_positions(feedback)
@check_if_not_connected
def disconnect(self) -> None:
+14 -14
View File
@@ -47,7 +47,7 @@ class BasePhone:
return (self._calib_pos is not None) and (self._calib_rot_inv is not None)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {
"phone.pos": np.ndarray, # shape (3,)
"phone.rot": Rotation, # scipy.spatial.transform.Rotation
@@ -56,15 +56,15 @@ class BasePhone:
}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
# No haptic or other feedback implemented yet
pass
return {}
def configure(self) -> None:
# No additional configuration required for phone teleop
pass
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# We could add haptic feedback (vibrations) here, but it's not implemented yet
raise NotImplementedError
@@ -163,7 +163,7 @@ class IOSPhone(BasePhone, Teleoperator):
return True, pos, rot, pose
@check_if_not_connected
def get_action(self) -> dict:
def _get_action(self) -> dict:
has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose()
if not has_pose or not self.is_calibrated:
return {}
@@ -314,7 +314,7 @@ class AndroidPhone(BasePhone, Teleoperator):
self._latest_message = message
@check_if_not_connected
def get_action(self) -> dict:
def _get_action(self) -> dict:
ok, raw_pos, raw_rot, pose = self._read_current_pose()
if not ok or not self.is_calibrated:
return {}
@@ -395,21 +395,21 @@ class Phone(Teleoperator):
return self._phone_impl.is_calibrated
@property
def action_features(self) -> dict[str, type]:
return self._phone_impl.action_features
def raw_action_features(self) -> dict[str, type]:
return self._phone_impl.raw_action_features
@property
def feedback_features(self) -> dict[str, type]:
return self._phone_impl.feedback_features
def raw_feedback_features(self) -> dict[str, type]:
return self._phone_impl.raw_feedback_features
def configure(self) -> None:
return self._phone_impl.configure()
def get_action(self) -> dict:
return self._phone_impl.get_action()
def _get_action(self) -> dict:
return self._phone_impl._get_action()
def send_feedback(self, feedback: dict[str, float]) -> None:
return self._phone_impl.send_feedback(feedback)
def _send_feedback(self, feedback: dict[str, float]) -> None:
return self._phone_impl._send_feedback(feedback)
def disconnect(self) -> None:
return self._phone_impl.disconnect()
@@ -104,7 +104,7 @@ class Reachy2Teleoperator(Teleoperator):
return joints
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {
**dict.fromkeys(
@@ -120,7 +120,7 @@ class Reachy2Teleoperator(Teleoperator):
return dict.fromkeys(self.joints_dict.keys(), float)
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -146,7 +146,7 @@ class Reachy2Teleoperator(Teleoperator):
pass
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
joint_action: dict[str, float] = {}
@@ -168,7 +168,7 @@ class Reachy2Teleoperator(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return {**joint_action, **vel_action}
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
@@ -0,0 +1,19 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .ee_space import make_so10x_leader_fk_pipeline
__all__ = ["make_so10x_leader_fk_pipeline"]
@@ -0,0 +1,82 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Forward-kinematics pipeline for SO-100/101 leader (teleoperator) arm.
Converts raw leader joint positions into end-effector pose. Attach this to a leader
via ``set_output_pipeline`` so that ``get_action()`` returns EE coordinates instead of
raw joint angles.
Example::
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
motor_names = list(leader.bus.motors.keys())
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, motor_names))
action = leader.get_action() # now contains ee.x, ee.y, ee.z, ...
"""
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotProcessorPipeline
from lerobot.processor.converters import (
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower.robot_kinematic_processor import ForwardKinematicsJointsToEE
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
def make_so10x_leader_fk_pipeline(
    urdf_path: str,
    motor_names: list[str],
    *,
    target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
) -> RobotProcessorPipeline[RobotAction, RobotAction]:
    """
    Build a forward-kinematics action pipeline for SO-100/101 leader teleoperators.

    The returned pipeline turns raw leader joint positions into an end-effector
    pose action (position + orientation + gripper). Install it on a leader with
    ``set_output_pipeline`` so that ``get_action()`` yields EE coordinates rather
    than raw joint angles.

    Args:
        urdf_path: Path to the SO-100/101 URDF file used for kinematics.
        motor_names: Ordered list of motor names matching the URDF joint names.
        target_frame_name: Name of the end-effector frame in the URDF.

    Returns:
        A RobotProcessorPipeline that maps joint actions to EE actions.

    Example::

        motor_names = list(leader.bus.motors.keys())
        leader.set_output_pipeline(
            make_so10x_leader_fk_pipeline("./so101.urdf", motor_names)
        )
        action = leader.get_action()  # returns ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
    """
    fk_model = RobotKinematics(
        urdf_path=urdf_path,
        target_frame_name=target_frame_name,
        joint_names=motor_names,
    )
    fk_step = ForwardKinematicsJointsToEE(kinematics=fk_model, motor_names=motor_names)
    return RobotProcessorPipeline[RobotAction, RobotAction](
        steps=[fk_step],
        to_transition=robot_action_to_transition,
        to_output=transition_to_robot_action,
    )
@@ -55,11 +55,11 @@ class SOLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -138,7 +138,7 @@ class SOLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -146,7 +146,7 @@ class SOLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
+146 -30
View File
@@ -12,17 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
import builtins
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
import draccus
from lerobot.configs.types import PipelineFeatureType
from lerobot.motors.motors_bus import MotorCalibration
from lerobot.processor import RobotAction
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
if TYPE_CHECKING:
from lerobot.processor.core import RobotAction
from lerobot.processor.pipeline import RobotProcessorPipeline
from .config import TeleoperatorConfig
@@ -33,6 +39,10 @@ class Teleoperator(abc.ABC):
This class provides a standardized interface for interacting with physical teleoperators.
Subclasses must implement all abstract methods and properties to be usable.
Pipelines are first-class citizens: every teleoperator carries an optional output pipeline
(applied in get_action()) and an optional input pipeline (applied in send_feedback()).
Both default to identity (no-op), so existing teleoperators work without any changes.
Attributes:
config_class (RobotConfig): The expected configuration class for this teleoperator.
name (str): The unique name used to identify this teleoperator type.
@@ -55,6 +65,14 @@ class Teleoperator(abc.ABC):
if self.calibration_fpath.is_file():
self._load_calibration()
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
# Lazy import: factory is in lerobot.processor which loads after teleoperators at module init time,
# but __init__ runs at instance-creation time when lerobot.processor is fully loaded.
from lerobot.processor.factory import _make_identity_feedback_pipeline, _make_identity_teleop_action_pipeline
self._output_pipeline: RobotProcessorPipeline = _make_identity_teleop_action_pipeline()
self._input_pipeline: RobotProcessorPipeline = _make_identity_feedback_pipeline()
def __str__(self) -> str:
return f"{self.id} {self.__class__.__name__}"
@@ -84,38 +102,114 @@ class Teleoperator(abc.ABC):
except Exception: # nosec B110
pass
# ── Pipeline interface ────────────────────────────────────────────────────
def output_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside get_action() to transform raw hardware actions.
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
Example: set a forward-kinematics pipeline to convert leader joint positions to EE pose.
"""
return self._output_pipeline
def input_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside send_feedback() to transform incoming feedback.
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
"""
return self._input_pipeline
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the action output pipeline (applied in get_action())."""
self._output_pipeline = pipeline
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the feedback input pipeline (applied in send_feedback())."""
self._input_pipeline = pipeline
# ── Feature properties ────────────────────────────────────────────────────
@property
@abc.abstractmethod
def action_features(self) -> dict:
"""
A dictionary describing the structure and types of the actions produced by the teleoperator. Its
structure (keys) should match the structure of what is returned by :pymeth:`get_action`. Values for
the dict should be the type of the value if it's a simple value, e.g. `float` for single
proprioceptive value (a joint's goal position/velocity)
Pipeline-transformed action features.
Note: this property should be able to be called regardless of whether the robot is connected or not.
Applies output_pipeline().transform_features() to raw_action_features so the
returned dict matches what get_action() actually produces for callers.
Use raw_action_features to inspect hardware-level feature shapes.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(action=self.raw_action_features)
transformed = self.output_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.ACTION, {})
@property
@abc.abstractmethod
def raw_action_features(self) -> dict:
"""
Hardware-level action features (before any pipeline transformation).
A dictionary describing the structure and types of the actions produced
directly by the teleoperator hardware. Its structure (keys) should match
the structure of what is returned by :pymeth:`_get_action`. Values should be
the type of the value if it's a simple value, e.g. ``float`` for single
proprioceptive value (a joint's goal position/velocity).
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
pass
@property
@abc.abstractmethod
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
"""
A dictionary describing the structure and types of the feedback actions expected by the robot. Its
structure (keys) should match the structure of what is passed to :pymeth:`send_feedback`. Values for
the dict should be the type of the value if it's a simple value, e.g. `float` for single
proprioceptive value (a joint's goal position/velocity)
Hardware-level feedback features (before any pipeline transformation).
Note: this property should be able to be called regardless of whether the robot is connected or not.
A dictionary describing the structure and types of the feedback accepted directly
by the teleoperator hardware (i.e. what :pymeth:`_send_feedback` receives). Its
structure (keys) should match the structure of what is expected by
:pymeth:`_send_feedback`. Values should be the type of the value if it's a simple
value, e.g. ``float`` for single proprioceptive value.
Return an empty dict if this teleoperator does not support feedback.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
pass
@property
def feedback_features(self) -> dict:
"""
Pipeline-transformed feedback features.
Applies input_pipeline().transform_features() to raw_feedback_features so the
returned dict reflects what the input pipeline outputs to the teleoperator hardware.
Use raw_feedback_features to inspect hardware-level feedback feature shapes.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(observation=self.raw_feedback_features)
transformed = self.input_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.OBSERVATION, {})
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""
Whether the teleoperator is currently connected or not. If `False`, calling :pymeth:`get_action`
or :pymeth:`send_feedback` should raise an error.
Whether the teleoperator is currently connected or not. If ``False``, calling
:pymeth:`get_action` or :pymeth:`send_feedback` should raise an error.
"""
pass
@@ -133,7 +227,7 @@ class Teleoperator(abc.ABC):
@property
@abc.abstractmethod
def is_calibrated(self) -> bool:
"""Whether the teleoperator is currently calibrated or not. Should be always `True` if not applicable"""
"""Whether the teleoperator is currently calibrated or not. Should be always ``True`` if not applicable"""
pass
@abc.abstractmethod
@@ -151,7 +245,7 @@ class Teleoperator(abc.ABC):
Helper to load calibration data from the specified file.
Args:
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath) as f, draccus.config_type("json"):
@@ -162,7 +256,7 @@ class Teleoperator(abc.ABC):
Helper to save calibration data to the specified file.
Args:
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath, "w") as f, draccus.config_type("json"):
@@ -176,29 +270,51 @@ class Teleoperator(abc.ABC):
"""
pass
@abc.abstractmethod
# ── Template methods (concrete, call pipeline internally) ─────────────────
def get_action(self) -> RobotAction:
"""
Retrieve the current action from the teleoperator.
Retrieve the current action from the teleoperator and apply the output pipeline.
Calls :pymeth:`_get_action` to get raw hardware data, then applies
:pymeth:`output_pipeline`.
Returns:
RobotAction: A flat dictionary representing the teleoperator's current actions. Its
structure should match :pymeth:`observation_features`.
RobotAction: Pipeline-transformed action. With the default identity pipeline
this equals the raw action from :pymeth:`_get_action`.
"""
raw = self._get_action()
return self.output_pipeline()(raw)
@abc.abstractmethod
def _get_action(self) -> RobotAction:
"""
Retrieve the raw action directly from teleoperator hardware.
Returns:
RobotAction: A flat dictionary representing the teleoperator's current actions.
Its structure should match :pymeth:`raw_action_features`.
"""
pass
@abc.abstractmethod
def send_feedback(self, feedback: dict[str, Any]) -> None:
"""
Send a feedback action command to the teleoperator.
Apply the input pipeline and send the resulting feedback to teleoperator hardware.
Args:
feedback (dict[str, Any]): Dictionary representing the desired feedback. Its structure should match
:pymeth:`feedback_features`.
feedback (dict[str, Any]): Dictionary representing the desired feedback.
Its structure should match :pymeth:`feedback_features`.
"""
transformed = self.input_pipeline()(feedback)
self._send_feedback(transformed)
Returns:
dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by
safety limits on velocity.
@abc.abstractmethod
def _send_feedback(self, feedback: dict[str, Any]) -> None:
"""
Send feedback directly to teleoperator hardware.
Args:
feedback (dict[str, Any]): Dictionary of hardware-level feedback commands.
"""
pass
@@ -72,11 +72,11 @@ class UnitreeG1Teleoperator(Teleoperator):
self.ik_helper: ExoskeletonIKHelper | None = None
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{name}.q": float for name in self._g1_joint_names}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -114,12 +114,12 @@ class UnitreeG1Teleoperator(Teleoperator):
def configure(self) -> None:
pass
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Exoskeleton arms do not support feedback")
def disconnect(self) -> None:
+212
View File
@@ -0,0 +1,212 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utilities for building dataset features from robot/teleoperator pipelines and for
checking action/observation space compatibility between teleops and robots.
"""
import logging
import re
import warnings
from collections.abc import Sequence

from lerobot.datasets.utils import combine_feature_dicts, hw_to_dataset_features
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
# Prefixes stripped from feature keys to produce clean dataset names.
# Handles both fully-qualified (e.g. "observation.state.ee.x") and short (e.g. "state.ee.x") forms:
# for each constant we strip both the full dotted prefix and its last dotted component.
_PREFIXES_TO_STRIP = tuple(
    f"{token}."
    for const in (ACTION, OBS_STATE, OBS_IMAGES)
    for token in (const, const.split(".")[-1])
)
# Bare last component of OBS_IMAGES, used to recognize image features whose keys
# are not fully qualified.
_IMAGES_TOKEN = OBS_IMAGES.split(".")[-1]
def _should_keep(key: str, patterns: Sequence[str] | None) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def _strip_prefix(key: str) -> str:
    """Remove the first known feature-key prefix from ``key``, if any.

    Keys with no recognized prefix are returned unchanged.
    """
    matched = next((p for p in _PREFIXES_TO_STRIP if key.startswith(p)), None)
    return key if matched is None else key[len(matched):]
def _features_to_dataset_spec(
    features: dict,
    *,
    is_action: bool,
    use_videos: bool,
    patterns: Sequence[str] | None = None,
) -> dict:
    """
    Convert a flat feature dict (as returned by ``robot.observation_features`` or
    ``teleop.action_features``) into a LeRobot dataset feature specification.

    Args:
        features: Flat dict mapping a feature key to its type or image shape.
        is_action: True when ``features`` describes actions; False for observations.
        use_videos: When False, image observation features are excluded entirely.
        patterns: Optional regex patterns to filter state/action features.
            Image features are not affected by this filter.

    Returns:
        A dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
    """

    def _looks_like_image(key: str, value) -> bool:
        # Actions never carry images; observations are images when the value is a
        # 3-tuple shape or the key carries the images token.
        if is_action:
            return False
        return (
            (isinstance(value, tuple) and len(value) == 3)
            or key.startswith(f"{OBS_IMAGES}.")
            or key.startswith(f"{_IMAGES_TOKEN}.")
            or f".{_IMAGES_TOKEN}." in key
        )

    selected: dict = {}
    for key, value in features.items():
        if _looks_like_image(key, value):
            if not use_videos:
                continue
        elif not _should_keep(key, patterns):
            continue
        selected[_strip_prefix(key)] = value

    if not selected:
        return {}
    return hw_to_dataset_features(selected, ACTION if is_action else OBS_STR, use_videos)
def build_dataset_features(
    robot,
    teleop=None,
    *,
    use_videos: bool = True,
    action_features: dict | None = None,
) -> dict:
    """
    Derive dataset feature specifications from robot and teleoperator pipelines.

    Reads ``robot.observation_features`` (which already reflects the robot's output
    pipeline transformation) and, when provided, ``teleop.action_features`` or an
    explicit ``action_features`` dict to determine what the dataset will store.
    This replaces the old pattern of manually calling
    ``aggregate_pipeline_dataset_features`` with explicit processor objects.

    Args:
        robot: The robot instance (must have ``observation_features``).
        teleop: The teleoperator instance. When ``None`` and ``action_features`` is
            also ``None`` (policy-only recording), only observation features are
            returned.
        use_videos: If True, image observations are included as video features.
        action_features: Explicit action feature dict, used when no teleop is
            available (e.g. evaluate/inference mode) but the dataset must match a
            specific action space (e.g. EE coordinates from a previously recorded
            dataset). Takes precedence over ``teleop.action_features``.

    Returns:
        A combined feature dict suitable for ``LeRobotDataset.create(..., features=...)``.

    Example::

        features = build_dataset_features(follower, leader, use_videos=True)  # teleop recording
        features = build_dataset_features(robot, use_videos=True)             # policy-only
    """
    observation_spec = _features_to_dataset_spec(
        robot.observation_features, is_action=False, use_videos=use_videos
    )

    # Explicit action_features wins; otherwise fall back to the teleop's features.
    if action_features is not None:
        action_source = action_features
    elif teleop is not None:
        action_source = teleop.action_features
    else:
        # Policy-only recording: there is no action space to describe.
        return observation_spec

    action_spec = _features_to_dataset_spec(action_source, is_action=True, use_videos=False)
    return combine_feature_dicts(action_spec, observation_spec)
def check_action_space_compatibility(teleop, robot) -> None:
    """
    Warn if the teleoperator's pipeline-transformed action features don't match the robot's
    declared ``action_features``.

    This is a soft check: a mismatch produces a warning but does not raise. It is intended
    to catch obvious misconfigurations (e.g., sending EE actions to a robot expecting joints)
    before the control loop starts.

    Args:
        teleop: The teleoperator whose ``action_features`` describe what it sends.
        robot: The robot whose ``action_features`` describe what it expects.
    """
    teleop_out = set(teleop.action_features.keys())
    robot_in = set(robot.action_features.keys())
    if teleop_out == robot_in:
        logging.debug("Action space compatibility check passed.")
        return
    # Uses the module-level `warnings` import; stacklevel=2 attributes the warning
    # to the caller that set up the mismatched spaces.
    warnings.warn(
        f"Action space mismatch between teleop and robot.\n"
        f" Teleop sends: {sorted(teleop_out)}\n"
        f" Robot expects: {sorted(robot_in)}\n"
        "Ensure pipelines map between these spaces correctly.",
        UserWarning,
        stacklevel=2,
    )
def check_observation_space_compatibility(robot, teleop) -> None:
    """
    Warn if the robot's observation features don't cover what the teleoperator's
    ``feedback_features`` expects.

    This is a soft check: a non-empty ``feedback_features`` that is not a subset of the
    robot's observation keys produces a warning but does not raise. An empty
    ``feedback_features`` always passes.

    Args:
        robot: The robot whose ``observation_features`` describe what it produces.
        teleop: The teleoperator whose ``feedback_features`` describe what it expects as feedback.
    """
    robot_obs = set(robot.observation_features.keys())
    teleop_feedback = set(teleop.feedback_features.keys())
    if not teleop_feedback or teleop_feedback.issubset(robot_obs):
        logging.debug("Observation/feedback space compatibility check passed.")
        return
    # Uses the module-level `warnings` import; stacklevel=2 attributes the warning
    # to the caller that wired up the mismatched spaces.
    warnings.warn(
        f"Observation/feedback space mismatch.\n"
        f" Robot obs: {sorted(robot_obs)}\n"
        f" Teleop feedback expects: {sorted(teleop_feedback)}\n"
        "Ensure the robot observation pipeline covers all feedback keys.",
        UserWarning,
        stacklevel=2,
    )
+3 -3
View File
@@ -87,7 +87,7 @@ class MockRobot(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
@@ -116,7 +116,7 @@ class MockRobot(Robot):
pass
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
if self.config.random_values:
return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors}
else:
@@ -125,7 +125,7 @@ class MockRobot(Robot):
}
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
return action
@check_if_not_connected
+3 -3
View File
@@ -57,7 +57,7 @@ class MockTeleop(Teleoperator):
self.motors = [f"motor_{i + 1}" for i in range(config.n_motors)]
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.motors}
@cached_property
@@ -86,7 +86,7 @@ class MockTeleop(Teleoperator):
pass
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
if self.config.random_values:
return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors}
else:
@@ -95,7 +95,7 @@ class MockTeleop(Teleoperator):
}
@check_if_not_connected
def send_feedback(self, feedback: dict[str, Any]) -> None: ...
def _send_feedback(self, feedback: dict[str, Any]) -> None: ...
@check_if_not_connected
def disconnect(self) -> None:
+77 -129
View File
@@ -26,7 +26,7 @@ import torch
import torch.nn as nn
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
from lerobot.utils.pipeline_utils import _features_to_dataset_spec
from lerobot.processor import (
DataProcessorPipeline,
EnvTransition,
@@ -2040,102 +2040,68 @@ def test_features_remove_from_initial(policy_feature_factory):
)
@dataclass
class AddActionEEAndJointFeatures(ProcessorStep):
"""Adds both EE and JOINT action features."""
def __call__(self, tr):
return tr
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# EE features
features[PipelineFeatureType.ACTION]["action.ee.x"] = float
features[PipelineFeatureType.ACTION]["action.ee.y"] = float
# JOINT features
features[PipelineFeatureType.ACTION]["action.j1.pos"] = float
features[PipelineFeatureType.ACTION]["action.j2.pos"] = float
return features
# ── Tests for _features_to_dataset_spec ──────────────────────────────────────────────────────────
# These replace the old aggregate_pipeline_dataset_features tests, covering the same categorisation
# / filtering / prefix-stripping / HF-format logic via the private helper directly.
@dataclass
class AddObservationStateFeatures(ProcessorStep):
"""Adds state features (and optionally an image spec to test precedence)."""
add_front_image: bool = False
front_image_shape: tuple = (240, 320, 3)
def __call__(self, tr):
return tr
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# State features (mix EE and a joint state)
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.ee.x"] = float
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.j1.pos"] = float
if self.add_front_image:
features[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.front"] = self.front_image_shape
return features
def test_aggregate_joint_action_only():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
initial = {PipelineFeatureType.OBSERVATION: {"front": (480, 640, 3)}, PipelineFeatureType.ACTION: {}}
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features=initial,
use_videos=True,
patterns=["action.j1.pos", "action.j2.pos"],
def test_dataset_spec_action_with_patterns():
"""Action features are filtered by pattern; unmatched keys are excluded."""
features = {
"action.ee.x": float,
"action.ee.y": float,
"action.j1.pos": float,
"action.j2.pos": float,
}
out = _features_to_dataset_spec(
features, is_action=True, use_videos=True, patterns=["action.j1.pos", "action.j2.pos"]
)
# Expect only ACTION with joint names
assert ACTION in out and OBS_STATE not in out
assert ACTION in out
assert out[ACTION]["dtype"] == "float32"
assert set(out[ACTION]["names"]) == {"j1.pos", "j2.pos"}
assert out[ACTION]["shape"] == (len(out[ACTION]["names"]),)
assert OBS_STATE not in out
def test_aggregate_ee_action_and_observation_with_videos():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures(), AddObservationStateFeatures()])
initial = {"front": (480, 640, 3), "side": (720, 1280, 3)}
def test_dataset_spec_action_and_observation_with_videos():
"""EE action + state obs + image obs; all appear with correct dtypes."""
action_features = {"action.ee.x": float, "action.ee.y": float}
obs_features = {
f"{OBS_STATE}.ee.x": float,
f"{OBS_STATE}.j1.pos": float,
"front": (480, 640, 3),
"side": (720, 1280, 3),
}
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
use_videos=True,
patterns=["action.ee", OBS_STATE],
)
act_out = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
obs_out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
# Action should pack only EE names
assert ACTION in out
assert set(out[ACTION]["names"]) == {"ee.x", "ee.y"}
assert out[ACTION]["dtype"] == "float32"
assert ACTION in act_out
assert set(act_out[ACTION]["names"]) == {"ee.x", "ee.y"}
assert act_out[ACTION]["dtype"] == "float32"
# Observation state should pack both ee.x and j1.pos as a vector
assert OBS_STATE in out
assert set(out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
assert out[OBS_STATE]["dtype"] == "float32"
assert OBS_STATE in obs_out
assert set(obs_out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
assert obs_out[OBS_STATE]["dtype"] == "float32"
# Cameras from initial_features appear as videos
for cam in ("front", "side"):
for cam, shape in [("front", (480, 640, 3)), ("side", (720, 1280, 3))]:
key = f"{OBS_IMAGES}.{cam}"
assert key in out
assert out[key]["dtype"] == "video"
assert out[key]["shape"] == initial[cam]
assert out[key]["names"] == ["height", "width", "channels"]
assert key in obs_out, f"missing camera key {key}"
assert obs_out[key]["dtype"] == "video"
assert obs_out[key]["shape"] == shape
assert obs_out[key]["names"] == ["height", "width", "channels"]
def test_aggregate_both_action_types():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}},
use_videos=True,
patterns=["action.ee", "action.j1", "action.j2.pos"],
)
def test_dataset_spec_all_action_types():
"""EE and joint action features are both included when no pattern filter."""
features = {
"action.ee.x": float,
"action.ee.y": float,
"action.j1.pos": float,
"action.j2.pos": float,
}
out = _features_to_dataset_spec(features, is_action=True, use_videos=True, patterns=None)
assert ACTION in out
expected = {"ee.x", "ee.y", "j1.pos", "j2.pos"}
@@ -2143,58 +2109,40 @@ def test_aggregate_both_action_types():
assert out[ACTION]["shape"] == (len(expected),)
def test_aggregate_images_when_use_videos_false():
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
initial = {"back": (480, 640, 3)}
def test_dataset_spec_images_excluded_when_no_videos():
"""Image observation features are dropped entirely when use_videos=False."""
obs_features = {
f"{OBS_STATE}.j1.pos": float,
"back": (480, 640, 3),
f"{OBS_IMAGES}.front": (240, 320, 3),
}
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=False)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
use_videos=False, # expect "image" dtype
patterns=None,
)
key = f"{OBS_IMAGES}.back"
key_front = f"{OBS_IMAGES}.front"
assert key not in out
assert key_front not in out
assert f"{OBS_IMAGES}.back" not in out
assert f"{OBS_IMAGES}.front" not in out
# Non-image state feature is still present
assert OBS_STATE in out
assert "j1.pos" in out[OBS_STATE]["names"]
def test_aggregate_images_when_use_videos_true():
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
initial = {"back": (480, 640, 3)}
def test_dataset_spec_images_included_when_use_videos():
"""Image features appear as video entries when use_videos=True."""
obs_features = {
"back": (480, 640, 3),
f"{OBS_IMAGES}.front": (240, 320, 3),
}
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
use_videos=True,
patterns=None,
)
assert f"{OBS_IMAGES}.back" in out
assert out[f"{OBS_IMAGES}.back"]["dtype"] == "video"
assert out[f"{OBS_IMAGES}.back"]["shape"] == (480, 640, 3)
key = f"{OBS_IMAGES}.front"
key_back = f"{OBS_IMAGES}.back"
assert key in out
assert key_back in out
assert out[key]["dtype"] == "video"
assert out[key_back]["dtype"] == "video"
assert out[key_back]["shape"] == initial["back"]
assert f"{OBS_IMAGES}.front" in out
assert out[f"{OBS_IMAGES}.front"]["dtype"] == "video"
assert out[f"{OBS_IMAGES}.front"]["shape"] == (240, 320, 3)
def test_initial_camera_not_overridden_by_step_image():
# Step explicitly sets a different front image shape; initial has another shape.
# aggregate_pipeline_dataset_features should keep the step's value (setdefault behavior on initial cams).
rp = DataProcessorPipeline(
[AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))]
)
initial = {"front": (480, 640, 3)} # should NOT override the step-provided (240, 320, 3)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
use_videos=True,
patterns=[f"{OBS_IMAGES}.front"],
)
key = f"{OBS_IMAGES}.front"
assert key in out
assert out[key]["shape"] == (240, 320, 3) # from the step, not from initial
def test_dataset_spec_empty_features_returns_empty():
"""Empty feature dict returns an empty output dict."""
assert _features_to_dataset_spec({}, is_action=True, use_videos=True) == {}
assert _features_to_dataset_spec({}, is_action=False, use_videos=True) == {}
+108
View File
@@ -0,0 +1,108 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Integration tests for loading robot/teleop pipelines from the Hugging Face Hub.
These tests require network access and are marked with ``@pytest.mark.integration``.
Run with::
pytest tests/test_pipeline_hub.py -m integration -v
The tests verify the full end-to-end flow of:
1. Loading a pipeline from the Hub via ``RobotProcessorPipeline.from_pretrained(...)``
2. Attaching it to a robot or teleoperator via ``set_output_pipeline`` / ``set_input_pipeline``
3. Verifying that ``observation_features`` / ``action_features`` differ from the raw versions
Note: The Hub repos referenced below are placeholders. Update them once actual pipelines
are published to the Hub.
"""
import pytest
# ─── Shared mock infrastructure (mirrors test_robot_pipeline.py) ──────────────
try:
from tests.test_robot_pipeline import MockRobot, MockTeleop # type: ignore[import]
except ImportError:
# Fallback if tests are run from a different working directory
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from test_robot_pipeline import MockRobot, MockTeleop
# ─── Integration tests ────────────────────────────────────────────────────────
@pytest.mark.integration
def test_load_robot_pipeline_from_hub(tmp_path):
    """
    Full end-to-end: load a FK observation pipeline for SO-101 from the Hub,
    attach it to a robot, and check that observation_features are transformed.

    Prerequisites:
        - A pipeline published at ``lerobot/so101-fk-observation-pipeline`` on the Hub.
        - A URDF file available locally (point ``urdf_file`` at it).
    """
    pytest.importorskip("huggingface_hub")
    from lerobot.processor.pipeline import RobotProcessorPipeline

    urdf_file = tmp_path / "so101.urdf"
    # NOTE: a real test environment must supply an actual URDF (or mock the
    # kinematics); until then this only exercises Hub loading when one exists.
    if not urdf_file.exists():
        pytest.skip("URDF not available; skipping Hub loading test")

    hub_pipeline = RobotProcessorPipeline.from_pretrained(
        "lerobot/so101-fk-observation-pipeline",
        overrides={"step_0": {"urdf_path": str(urdf_file)}},
    )
    robot = MockRobot()
    robot.set_output_pipeline(hub_pipeline)

    # EE features from the pipeline should differ from the raw joint features.
    assert robot.observation_features != robot.raw_observation_features
@pytest.mark.integration
def test_load_teleop_pipeline_from_hub(tmp_path):
    """
    Full end-to-end: load a FK action pipeline for the SO-101 leader from the Hub,
    attach it to a teleoperator, and check that action_features are transformed.

    Prerequisites:
        - A pipeline published at ``lerobot/so101-leader-fk-action-pipeline`` on the Hub.
        - A URDF file available locally (point ``urdf_file`` at it).
    """
    pytest.importorskip("huggingface_hub")
    from lerobot.processor.pipeline import RobotProcessorPipeline

    urdf_file = tmp_path / "so101.urdf"
    if not urdf_file.exists():
        pytest.skip("URDF not available; skipping Hub loading test")

    hub_pipeline = RobotProcessorPipeline.from_pretrained(
        "lerobot/so101-leader-fk-action-pipeline",
        overrides={"step_0": {"urdf_path": str(urdf_file)}},
    )
    leader = MockTeleop()
    leader.set_output_pipeline(hub_pipeline)

    # EE features from the pipeline should differ from the raw joint features.
    assert leader.action_features != leader.raw_action_features
+433
View File
@@ -0,0 +1,433 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for the robot/teleoperator pipeline interface.
Tests cover:
- Default (identity) pipeline behaviour
- Custom pipeline attachment via set_output_pipeline / set_input_pipeline
- Auto-derived observation_features / action_features via pipelines
- Compatibility checks
- build_dataset_features utility
"""
import warnings
from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from lerobot.configs.types import PipelineFeatureType
from lerobot.processor import RobotAction, RobotObservation
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
robot_action_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.processor.factory import (
_make_identity_feedback_pipeline,
_make_identity_observation_pipeline,
_make_identity_robot_action_pipeline,
_make_identity_teleop_action_pipeline,
)
from lerobot.processor.pipeline import (
IdentityProcessorStep,
ObservationProcessorStep,
RobotActionProcessorStep,
RobotProcessorPipeline,
)
from lerobot.robots.robot import Robot
from lerobot.teleoperators.teleoperator import Teleoperator
from lerobot.utils.pipeline_utils import (
build_dataset_features,
check_action_space_compatibility,
check_observation_space_compatibility,
)
# ─── Mock hardware classes ────────────────────────────────────────────────────
@dataclass
class MockRobotConfig:
    # Minimal config stand-in: only the identifier and calibration-dir fields.
    id: str = "mock_robot"
    calibration_dir: Path | None = None
@dataclass
class MockTeleopConfig:
    # Minimal config stand-in: only the identifier and calibration-dir fields.
    id: str = "mock_teleop"
    calibration_dir: Path | None = None
# Joint names shared by the mock robot and mock teleop below.
_JOINT_NAMES = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
# Joint-space feature vocabulary: "<joint>.pos" -> float.
_JOINT_FEATURES = {f"{j}.pos": float for j in _JOINT_NAMES}
# EE-space feature vocabulary (pose + gripper velocity).
_EE_FEATURES = {"ee.x": float, "ee.y": float, "ee.z": float, "ee.wx": float, "ee.wy": float, "ee.wz": float, "ee.gripper_vel": float}
class MockRobot(Robot):
    """Minimal Robot that stores the last sent action for assertions."""

    config_class = MockRobotConfig
    name = "mock_robot"

    def __init__(self):
        # Deliberately skip the base-class filesystem calibration setup;
        # wire up identity pipelines directly instead.
        self._output_pipeline = _make_identity_observation_pipeline()
        self._input_pipeline = _make_identity_robot_action_pipeline()
        self._last_raw_obs: RobotObservation = {}
        self._last_sent: RobotAction = {}

    @property
    def raw_observation_features(self) -> dict:
        # Joints plus one camera spec.
        features = dict(_JOINT_FEATURES)
        features["camera"] = (480, 640, 3)
        return features

    @property
    def raw_action_features(self) -> dict:
        return _JOINT_FEATURES

    @property
    def is_connected(self) -> bool:
        return True

    def connect(self, calibrate=True):
        pass

    @property
    def is_calibrated(self) -> bool:
        return True

    def calibrate(self):
        pass

    def configure(self):
        pass

    def _get_observation(self) -> RobotObservation:
        # Deterministic joint readings (position = joint index) plus a dummy camera frame.
        reading = {name + ".pos": float(index) for index, name in enumerate(_JOINT_NAMES)}
        reading["camera"] = None
        return reading

    def _send_action(self, action: RobotAction) -> RobotAction:
        # Remember what was sent so tests can assert on it.
        self._last_sent = action
        return action

    def disconnect(self):
        pass
class MockTeleop(Teleoperator):
    """Minimal Teleoperator emitting deterministic joint actions."""

    config_class = MockTeleopConfig
    name = "mock_teleop"

    def __init__(self):
        # Deliberately skip the base-class filesystem calibration setup;
        # wire up identity pipelines directly instead.
        self._output_pipeline = _make_identity_teleop_action_pipeline()
        self._input_pipeline = _make_identity_feedback_pipeline()

    @property
    def raw_action_features(self) -> dict:
        return _JOINT_FEATURES

    @property
    def raw_feedback_features(self) -> dict:
        # No feedback channel on this mock.
        return {}

    @property
    def is_connected(self) -> bool:
        return True

    def connect(self, calibrate=True):
        pass

    @property
    def is_calibrated(self) -> bool:
        return True

    def calibrate(self):
        pass

    def configure(self):
        pass

    def _get_action(self) -> RobotAction:
        # Deterministic joint readings: position = joint index.
        return {name + ".pos": float(index) for index, name in enumerate(_JOINT_NAMES)}

    def _send_feedback(self, feedback):
        pass

    def disconnect(self):
        pass
# ─── Simple transform step (doubles all float values) ────────────────────────
class DoubleActionStep(RobotActionProcessorStep):
    """Doubles every action value."""

    def action(self, action: RobotAction) -> RobotAction:
        doubled = {}
        for key, value in action.items():
            doubled[key] = value * 2
        return doubled

    def transform_features(self, features):
        # Scaling values does not change the feature schema.
        return features
class RenameToEEObsStep(ObservationProcessorStep):
    """Renames joint obs keys to EE-like keys for testing transform_features."""

    def observation(self, obs: RobotObservation) -> RobotObservation:
        # NOTE(review): indices come from enumerate over ALL values while only float
        # entries are kept, so non-float entries (e.g. a camera frame) leave gaps in
        # the ee.<i> numbering — confirm this asymmetry with transform_features is intended.
        return {f"ee.{i}": v for i, v in enumerate(obs.values()) if isinstance(v, float)}

    def transform_features(self, features):
        obs = features.get(PipelineFeatureType.OBSERVATION, {})
        # Feature-level mirror: one ee.<i> entry per float-typed observation feature,
        # numbered densely from 0 (unlike the runtime renaming above).
        new_obs = {f"ee.{i}": float for i in range(len([v for v in obs.values() if v == float]))}
        return {**features, PipelineFeatureType.OBSERVATION: new_obs}
# ─── Tests: Robot pipeline interface ─────────────────────────────────────────
def test_robot_default_pipeline_is_identity():
    """The default (identity) output pipeline leaves get_observation equal to _get_observation."""
    bot = MockRobot()
    assert bot.get_observation() == bot._get_observation()
def test_robot_observation_caches_last_raw():
    """get_observation stashes the raw reading (later reused, e.g. for IK in send_action)."""
    bot = MockRobot()
    bot.get_observation()
    cached = bot._last_raw_obs
    assert cached is not None
    assert "shoulder_pan.pos" in cached
def test_robot_default_send_action_is_identity():
    """With the identity input pipeline, send_action forwards the action untouched."""
    bot = MockRobot()
    bot.get_observation()  # populate _last_raw_obs
    command = {f"{name}.pos": 1.0 for name in _JOINT_NAMES}
    assert bot.send_action(command) == command
    assert bot._last_sent == command
def test_robot_custom_output_pipeline_applied():
    """A custom action pipeline is applied to the action before _send_action."""
    # NOTE(review): despite the name saying "output", this installs an INPUT (action)
    # pipeline via set_input_pipeline — consider renaming the test for clarity.
    robot = MockRobot()
    double_pipeline = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[DoubleActionStep()],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
    robot.set_input_pipeline(double_pipeline)
    robot.get_observation()  # populate _last_raw_obs
    action = {f"{j}.pos": 1.0 for j in _JOINT_NAMES}
    robot.send_action(action)
    # DoubleActionStep multiplies every value by 2, so 1.0 -> 2.0.
    assert all(v == 2.0 for v in robot._last_sent.values())
def test_robot_observation_features_identity_matches_raw():
    """With the identity output pipeline, observation_features mirrors raw_observation_features."""
    bot = MockRobot()
    assert bot.raw_observation_features == bot.observation_features
def test_robot_raw_observation_features_unchanged_after_pipeline():
    """raw_observation_features ignores whatever output pipeline is attached."""
    bot = MockRobot()
    # Attach an FK-like renaming pipeline; the raw view must not change.
    fk_like = RobotProcessorPipeline[RobotObservation, RobotObservation](
        steps=[RenameToEEObsStep()],
        to_transition=observation_to_transition,
        to_output=transition_to_observation,
    )
    bot.set_output_pipeline(fk_like)
    raw = bot.raw_observation_features
    # Still joints + camera, untouched by the pipeline.
    assert "shoulder_pan.pos" in raw
    assert "camera" in raw
def test_robot_action_features_identity_matches_raw():
    """With the identity input pipeline, action_features mirrors raw_action_features."""
    bot = MockRobot()
    assert bot.raw_action_features == bot.action_features
def test_robot_raw_action_features_unchanged_after_pipeline():
    """raw_action_features ignores any attached input pipeline."""
    bot = MockRobot()
    doubler = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
        steps=[DoubleActionStep()],
        to_transition=robot_action_observation_to_transition,
        to_output=transition_to_robot_action,
    )
    bot.set_input_pipeline(doubler)
    assert bot.raw_action_features == _JOINT_FEATURES
def test_robot_set_output_pipeline_replaces_identity():
    """set_output_pipeline installs the provided pipeline object in place of the default."""
    mock = MockRobot()
    replacement = _make_identity_observation_pipeline()
    mock.set_output_pipeline(replacement)
    assert replacement is mock._output_pipeline
def test_robot_set_input_pipeline_replaces_identity():
    """set_input_pipeline installs the provided pipeline object in place of the default."""
    mock = MockRobot()
    replacement = _make_identity_robot_action_pipeline()
    mock.set_input_pipeline(replacement)
    assert replacement is mock._input_pipeline
# ─── Tests: Teleoperator pipeline interface ───────────────────────────────────
def test_teleop_default_get_action_is_identity():
    """Without a custom pipeline, get_action passes _get_action output through untouched."""
    mock = MockTeleop()
    expected = mock._get_action()
    assert mock.get_action() == expected
def test_teleop_action_features_identity_matches_raw():
    """With the default identity pipeline, action_features mirrors raw_action_features."""
    mock = MockTeleop()
    assert mock.raw_action_features == mock.action_features
def test_teleop_feedback_features_identity_matches_raw():
    """With the default identity input pipeline, feedback_features mirrors raw_feedback_features."""
    mock = MockTeleop()
    assert mock.raw_feedback_features == mock.feedback_features
def test_teleop_feedback_features_empty_when_raw_empty():
    """An empty raw_feedback_features yields an empty feedback_features dict."""
    mock = MockTeleop()
    assert not mock.feedback_features
    assert mock.feedback_features == {}
def test_teleop_set_output_pipeline():
    """set_output_pipeline installs the provided pipeline object on the teleop."""
    mock = MockTeleop()
    replacement = _make_identity_teleop_action_pipeline()
    mock.set_output_pipeline(replacement)
    assert replacement is mock._output_pipeline
def test_teleop_send_feedback_calls_send_feedback_impl():
    """send_feedback routes the payload through the identity pipeline into _send_feedback."""
    mock = MockTeleop()
    captured = {}
    # Replace the low-level hook so we can observe exactly what reaches it.
    mock._send_feedback = lambda fb: captured.update(fb)
    mock.send_feedback({"key": 1.0})
    assert captured == {"key": 1.0}
# ─── Tests: Compatibility checks ─────────────────────────────────────────────
def test_check_action_space_compatibility_matching():
    """Matching teleop/robot action features must not emit any warning."""
    mock_teleop = MockTeleop()
    mock_robot = MockRobot()
    with warnings.catch_warnings():
        # Escalate every warning to an error so an unexpected one fails the test.
        warnings.simplefilter("error")
        check_action_space_compatibility(mock_teleop, mock_robot)
def test_check_action_space_compatibility_mismatch_warns():
    """Diverging teleop/robot action features trigger a UserWarning."""

    class EndEffectorTeleop(MockTeleop):
        # Advertise EE-space features instead of the joint features MockRobot reports.
        @property
        def raw_action_features(self):
            return _EE_FEATURES

    with pytest.warns(UserWarning, match="Action space mismatch"):
        check_action_space_compatibility(EndEffectorTeleop(), MockRobot())
def test_check_observation_space_compatibility_no_feedback():
    """A teleop with empty feedback_features must not produce a warning."""
    mock_robot = MockRobot()
    mock_teleop = MockTeleop()
    with warnings.catch_warnings():
        warnings.simplefilter("error")  # any warning becomes a test failure
        check_observation_space_compatibility(mock_robot, mock_teleop)
# ─── Tests: build_dataset_features ───────────────────────────────────────────
def test_build_dataset_features_identity():
    """Identity pipelines yield dataset features that include action/joint keys."""
    features = build_dataset_features(MockRobot(), MockTeleop(), use_videos=False)
    # At least one feature key must mention "action" or a joint name.
    matching = [
        key
        for key in features
        if "action" in key or any(joint in key for joint in _JOINT_NAMES)
    ]
    assert matching
def test_build_dataset_features_includes_images_when_use_videos_true():
    """Enabling use_videos adds camera/image entries to the dataset features."""
    mock_robot = MockRobot()
    mock_teleop = MockTeleop()
    without_videos = build_dataset_features(mock_robot, mock_teleop, use_videos=False)
    with_videos = build_dataset_features(mock_robot, mock_teleop, use_videos=True)
    # The video-enabled feature set is a superset in size (extra camera entries).
    assert len(without_videos) <= len(with_videos)
# ─── Tests: Factory identity pipeline helpers ─────────────────────────────────
def test_make_identity_observation_pipeline_is_noop():
    """The identity observation pipeline returns its input unchanged."""
    identity = _make_identity_observation_pipeline()
    sample_obs = {"shoulder_pan.pos": 1.0, "camera": None}
    assert identity(sample_obs) == sample_obs
def test_make_identity_robot_action_pipeline_is_noop():
    """The identity robot-action pipeline returns the action part unchanged."""
    identity = _make_identity_robot_action_pipeline()
    sample_action = {"shoulder_pan.pos": 1.0}
    sample_obs = {"shoulder_pan.pos": 0.0}
    assert identity((sample_action, sample_obs)) == sample_action
def test_make_identity_teleop_action_pipeline_is_noop():
    """The identity teleop-action pipeline returns its input unchanged."""
    identity = _make_identity_teleop_action_pipeline()
    sample_action = {"shoulder_pan.pos": 1.0}
    assert identity(sample_action) == sample_action