From fe2c32d9e7fc65983f3ba493ffa856b96572d98e Mon Sep 17 00:00:00 2001 From: Khalil Meftah Date: Tue, 28 Apr 2026 16:53:36 +0200 Subject: [PATCH] add so leader arm --- .../so100_to_so100_EE_deltas/teleoperate.py | 365 ++++++++++++++++++ src/lerobot/envs/configs.py | 1 + src/lerobot/processor/__init__.py | 2 + .../processor/delta_action_processor.py | 48 ++- src/lerobot/processor/hil_processor.py | 9 + .../processor/leader_follower_processor.py | 243 ++++++++++++ .../so_follower/robot_kinematic_processor.py | 78 +++- .../teleoperators/so_leader/__init__.py | 1 + .../so_leader/config_so_leader.py | 5 + .../so_leader/so101_leader_follower.py | 259 +++++++++++++ src/lerobot/teleoperators/utils.py | 5 +- 11 files changed, 999 insertions(+), 17 deletions(-) create mode 100644 examples/so100_to_so100_EE_deltas/teleoperate.py create mode 100644 src/lerobot/processor/leader_follower_processor.py create mode 100644 src/lerobot/teleoperators/so_leader/so101_leader_follower.py diff --git a/examples/so100_to_so100_EE_deltas/teleoperate.py b/examples/so100_to_so100_EE_deltas/teleoperate.py new file mode 100644 index 000000000..dd267deff --- /dev/null +++ b/examples/so100_to_so100_EE_deltas/teleoperate.py @@ -0,0 +1,365 @@ +# !/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from dataclasses import dataclass + +import numpy as np +import torch + +from lerobot.configs.types import PipelineFeatureType, PolicyFeature +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import ( + ProcessorStepRegistry, + RobotAction, + RobotActionProcessorStep, + RobotObservation, + RobotProcessorPipeline, + TransitionKey, +) +from lerobot.processor.converters import ( + create_transition, + identity_transition, +) +from lerobot.robots.robot import Robot +from lerobot.robots.so100_follower.robot_kinematic_processor import ( + EEBoundsAndSafety, + EEReferenceAndDelta, + GripperVelocityToJoint, + InverseKinematicsRLStep, +) +from lerobot.robots.so101_follower.config_so101_follower import SO101FollowerConfig +from lerobot.robots.so101_follower.so101_follower import SO101Follower +from lerobot.teleoperators.so101_leader.config_so101_leader import SO101LeaderConfig +from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader +from lerobot.utils.robot_utils import precise_sleep +from lerobot.utils.rotation import Rotation + + +def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None: + """Reset robot arm to target position using smooth trajectory.""" + current_position_dict = robot_arm.bus.sync_read("Present_Position") + current_position = np.array( + [current_position_dict[name] for name in current_position_dict], + dtype=np.float32, + ) + trajectory = torch.from_numpy( + np.linspace(current_position, target_position, 50) + ) # NOTE: 30 is just an arbitrary number + for pose in trajectory: + action_dict = dict(zip(current_position_dict, pose, strict=False)) + robot_arm.bus.sync_write("Goal_Position", action_dict) + precise_sleep(0.015) + + +@dataclass +class LogRobotAction(RobotActionProcessorStep): + def action(self, action: RobotAction) -> RobotAction: + print(f"Robot action: {action}") + return action + + def transform_features(self, features): + # features[PipelineFeatureType.ACTION][ACTION] = PolicyFeature( + # type=FeatureType.ACTION, shape=(len(self.motor_names),) + # ) + return features + + +@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_target_action") +@dataclass +class ForwardKinematicsJointsToEETargetAction(RobotActionProcessorStep): + """ + Computes the end-effector pose from joint positions using forward kinematics (FK). + + This step is typically used to add the robot's Cartesian pose to the observation space, + which can be useful for visualization or as an input to a policy. + + Attributes: + kinematics: The robot's kinematic model. + """ + + kinematics: RobotKinematics + motor_names: list[str] + end_effector_step_sizes: dict + max_gripper_pos: float + use_ik_solution: bool = False + + def action(self, action: RobotAction) -> RobotAction: + # return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names) + teleop_action = action + raw_joint_pos = self.transition.get(TransitionKey.OBSERVATION) + + leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names]) + + leader_ee = self.kinematics.forward_kinematics(leader_pos) + + if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA): + follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + else: + follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + follower_ee = self.kinematics.forward_kinematics(follower_pos) + + follower_ee_pos = follower_ee[:3, 3] + follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + # follower_gripper_pos = raw_joint_pos["gripper.pos"] + follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor + + leader_ee_pos = leader_ee[:3, 3] + leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec() + leader_gripper_pos = np.clip( + teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos + ) + + print("f pos:", follower_ee_pos) + print("l pos:", leader_ee_pos) + + print("f rvec:", follower_ee_rvec) + print("l rvec:", leader_ee_rvec) + + # follower_ee_pos = follower_ee[:3, 3] + # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + + delta_pos = leader_ee_pos - follower_ee_pos + + # For rotation: compute relative rotation from follower to leader + # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader + r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] + delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() + delta_gripper = leader_gripper_pos - follower_gripper_pos + + desired = np.eye(4, dtype=float) + desired[:3, :3] = follower_ee[:3, :3] @ r_delta + desired[:3, 3] = follower_ee[:3, 3] + delta_pos + + pos = desired[:3, 3] + tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() + + assert np.allclose(pos, leader_ee_pos), "Position delta computation error" + assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" + assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( + "Gripper delta computation error" + ) + + # Normalize the action to the range [-1, 1] + delta_pos = delta_pos / np.array( + [ + self.end_effector_step_sizes["x"], + self.end_effector_step_sizes["y"], + self.end_effector_step_sizes["z"], + ] + ) + delta_rvec = delta_rvec / np.array( + [ + self.end_effector_step_sizes["wx"], + self.end_effector_step_sizes["wy"], + self.end_effector_step_sizes["wz"], + ] + ) + + # Check if any of the normalized deltas exceed 1.0 + + max_normalized_pos = max( + abs(delta_pos[0]), + abs(delta_pos[1]), + abs(delta_pos[2]), + ) + + max_normalized_rot = max( + abs(delta_rvec[0]), + abs(delta_rvec[1]), + abs(delta_rvec[2]), + ) + + # Use the same scaling factor for both position and rotation + max_normalized = max(max_normalized_pos, max_normalized_rot) + if max_normalized > 1.0: + print(f"Warning: EE delta too large, scaling. Max normalized delta: {max_normalized_pos}") + print(f"Original delta_pos: {delta_pos}, delta_rvec: {delta_rvec}") + # Scale proportionally + delta_pos = delta_pos / max_normalized + delta_rvec = delta_rvec / max_normalized + + new_action = {} + new_action["enabled"] = True + new_action["target_x"] = float(delta_pos[0]) + new_action["target_y"] = float(delta_pos[1]) + new_action["target_z"] = float(delta_pos[2]) + new_action["target_wx"] = float(delta_rvec[0]) + new_action["target_wy"] = float(delta_rvec[1]) + new_action["target_wz"] = float(delta_rvec[2]) + new_action["gripper_vel"] = float( + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos + ) + return new_action + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + # TODO: implement feature transformation + return features + + +FPS = 20 + +# Initialize the robot and teleoperator config +follower_config = SO101FollowerConfig(port="/dev/usb_follower_arm_a", id="follower_arm_a", use_degrees=True) +leader_config = SO101LeaderConfig(port="/dev/usb_leader_arm_a", id="leader_arm_a", use_degrees=True) + +# Initialize the robot and teleoperator +follower = SO101Follower(follower_config) +leader = SO101Leader(leader_config) + +# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +follower_kinematics_solver = RobotKinematics( + urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(follower.bus.motors.keys()), +) + +# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +leader_kinematics_solver = RobotKinematics( + urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(leader.bus.motors.keys()), +) + +end_effector_step_sizes = { + "x": 0.004, + "y": 0.004, + "z": 0.004, + "wx": 5 * np.pi / 180, + "wy": 5 * np.pi / 180, + "wz": 5 * np.pi / 180, +} + + +# Build pipeline to convert teleop joints to EE action +leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[ + LogRobotAction(), + ForwardKinematicsJointsToEETargetAction( + kinematics=leader_kinematics_solver, + motor_names=list(leader.bus.motors.keys()), + end_effector_step_sizes=end_effector_step_sizes, + max_gripper_pos=30.0, + use_ik_solution=True, + ), + LogRobotAction(), + ], + to_transition=identity_transition, + to_output=identity_transition, +) + +# build pipeline to convert EE action to robot joints +ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( + [ + LogRobotAction(), + EEReferenceAndDelta( + kinematics=follower_kinematics_solver, + # end_effector_step_sizes={"x": 0.006, "y": 0.01, "z": 0.005}, + end_effector_step_sizes=end_effector_step_sizes, + motor_names=list(follower.bus.motors.keys()), + use_latched_reference=False, + use_ik_solution=True, + ), + LogRobotAction(), + EEBoundsAndSafety( + end_effector_bounds={ + "min": [-0.05, -0.55, -0.0075], + "max": [0.55, 0.55, 0.55], + }, + # end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, + max_ee_step_m=0.05, + ), + LogRobotAction(), + GripperVelocityToJoint( + clip_max=30.0, + speed_factor=0.2, + discrete_gripper=False, + scale_velocity=True, + use_ik_solution=True, + ), + LogRobotAction(), + InverseKinematicsRLStep( + kinematics=follower_kinematics_solver, + motor_names=list(follower.bus.motors.keys()), + initial_guess_current_joints=False, + ), + LogRobotAction(), + ], + to_transition=identity_transition, + to_output=identity_transition, +) + +# Connect to the robot and teleoperator +follower.connect() +leader.connect() + +reset_pose = [0.0, 10, 20, 60.00, 90.00, 10.00] + +start_time = time.perf_counter() +reset_follower_position(follower, np.array(reset_pose)) +reset_follower_position(leader, np.array(reset_pose)) +precise_sleep(5.0 - (time.perf_counter() - start_time)) +# time.sleep(10) +leader.bus.sync_write("Torque_Enable", 0) + +# Init rerun viewer +# init_rerun(session_name="so100_so100_EE_teleop") + +transition = None + +print("Starting teleop loop...") +while True: + print("New loop iteration") + t0 = time.perf_counter() + + # Get robot observation + robot_obs = follower.get_observation() + + # Get teleop observation + leader_joints_obs = leader.get_action() + + # teleop joints -> teleop EE action + if transition is None: + transition = create_transition(action=leader_joints_obs, observation=robot_obs) + else: + transition = create_transition( + action=leader_joints_obs, + observation=robot_obs, + complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), + ) + + transition = leader_to_ee(transition) + leader_ee_act = transition[TransitionKey.ACTION] + + # teleop EE -> robot joints + transition = create_transition( + action=leader_ee_act, + observation=robot_obs, + complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), + ) + transition = ee_to_follower_joints(transition) + follower_joints_act = transition[TransitionKey.ACTION] + + # Send action to robot + _ = follower.send_action(follower_joints_act) + + # Visualize + # log_rerun_data(observation=leader_ee_act, action=follower_joints_act) + + precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index 84c40472f..002277261 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -299,6 +299,7 @@ class HILSerlProcessorConfig: inverse_kinematics: InverseKinematicsConfig | None = None reward_classifier: RewardClassifierConfig | None = None max_gripper_pos: float | None = 100.0 + gripper_speed_factor: float | None = None @EnvConfig.register_subclass(name="gym_manipulator") diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 3688a4b8c..673587360 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -61,6 +61,7 @@ from .hil_processor import ( RewardClassifierProcessorStep, TimeLimitProcessorStep, ) +from .leader_follower_processor import LeaderFollowerProcessor from .newline_task_processor import NewLineTaskProcessorStep from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats from .observation_processor import VanillaObservationProcessorStep @@ -122,6 +123,7 @@ __all__ = [ "ImageCropResizeProcessorStep", "InfoProcessorStep", "InterventionActionProcessorStep", + "LeaderFollowerProcessor", "make_default_processors", "make_default_teleop_action_processor", "make_default_robot_action_processor", diff --git a/src/lerobot/processor/delta_action_processor.py b/src/lerobot/processor/delta_action_processor.py index 86b2feec1..983454435 100644 --- a/src/lerobot/processor/delta_action_processor.py +++ b/src/lerobot/processor/delta_action_processor.py @@ -38,6 +38,7 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep): """ use_gripper: bool = True + use_rotation: bool = False def action(self, action: PolicyAction) -> RobotAction: if not isinstance(action, PolicyAction): @@ -52,7 +53,13 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep): "delta_y": action[1].item(), "delta_z": action[2].item(), } - if self.use_gripper: + if self.use_rotation: + delta_action["delta_wx"] = action[3].item() + delta_action["delta_wy"] = action[4].item() + delta_action["delta_wz"] = action[5].item() + if self.use_gripper: + delta_action["gripper"] = action[6].item() + elif self.use_gripper: delta_action["gripper"] = action[3].item() return delta_action @@ -64,6 +71,12 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep): type=FeatureType.ACTION, shape=(1,) ) + if self.use_rotation: + for axis in ["wx", "wy", "wz"]: + features[PipelineFeatureType.ACTION][f"delta_{axis}"] = PolicyFeature( + type=FeatureType.ACTION, shape=(1,) + ) + if self.use_gripper: features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature( type=FeatureType.ACTION, shape=(1,) @@ -90,6 +103,8 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep): # Scale factors for delta movements position_scale: float = 1.0 noise_threshold: float = 1e-3 # 1 mm threshold to filter out noise + use_rotation: bool = False + rotation_scale: float = 1.0 def action(self, action: RobotAction) -> RobotAction: # NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy @@ -97,23 +112,34 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep): delta_x = action.pop("delta_x") delta_y = action.pop("delta_y") delta_z = action.pop("delta_z") + if self.use_rotation: + delta_wx = action.pop("delta_wx") + delta_wy = action.pop("delta_wy") + delta_wz = action.pop("delta_wz") + else: + delta_wx = 0.0 + delta_wy = 0.0 + delta_wz = 0.0 gripper = action.pop("gripper") # Determine if the teleoperator is actively providing input # Consider enabled if any significant movement delta is detected position_magnitude = (delta_x**2 + delta_y**2 + delta_z**2) ** 0.5 # Use Euclidean norm for position - enabled = position_magnitude > self.noise_threshold # Small threshold to avoid noise + rotation_magnitude = ( + delta_wx**2 + delta_wy**2 + delta_wz**2 + ) ** 0.5 # TODO use proper magnitud for rotation + enabled = ( + position_magnitude > self.noise_threshold or rotation_magnitude > self.noise_threshold + ) # Small threshold to avoid noise # Scale the deltas appropriately scaled_delta_x = delta_x * self.position_scale scaled_delta_y = delta_y * self.position_scale scaled_delta_z = delta_z * self.position_scale - # For gamepad/keyboard, we don't have rotation input, so set to 0 - # These could be extended in the future for more sophisticated teleoperators - target_wx = 0.0 - target_wy = 0.0 - target_wz = 0.0 + target_wx = delta_wx * self.rotation_scale + target_wy = delta_wy * self.rotation_scale + target_wz = delta_wz * self.rotation_scale # Update action with robot target format action = { @@ -132,9 +158,15 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep): def transform_features( self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: - for axis in ["x", "y", "z", "gripper"]: + for axis in ["x", "y", "z"]: features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None) + if self.use_rotation: + for axis in ["wx", "wy", "wz"]: + features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None) + + features[PipelineFeatureType.ACTION].pop("delta_gripper", None) + for feat in ["enabled", "target_x", "target_y", "target_z", "target_wx", "target_wy", "target_wz"]: features[PipelineFeatureType.ACTION][f"{feat}"] = PolicyFeature( type=FeatureType.ACTION, shape=(1,) diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py index c17441c46..db822d5b7 100644 --- a/src/lerobot/processor/hil_processor.py +++ b/src/lerobot/processor/hil_processor.py @@ -461,6 +461,7 @@ class InterventionActionProcessorStep(ProcessorStep): use_gripper: bool = False terminate_on_success: bool = True + use_rotation: bool = False def __call__(self, transition: EnvTransition) -> EnvTransition: """ @@ -497,6 +498,14 @@ class InterventionActionProcessorStep(ProcessorStep): teleop_action.get("delta_y", 0.0), teleop_action.get("delta_z", 0.0), ] + if self.use_rotation: + action_list.extend( + [ + teleop_action.get("delta_wx", 0.0), + teleop_action.get("delta_wy", 0.0), + teleop_action.get("delta_wz", 0.0), + ] + ) if self.use_gripper: action_list.append(teleop_action.get(GRIPPER_KEY, 1.0)) elif isinstance(teleop_action, np.ndarray): diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py new file mode 100644 index 000000000..df2f66235 --- /dev/null +++ b/src/lerobot/processor/leader_follower_processor.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +import numpy as np +import torch + +from lerobot.configs.types import PipelineFeatureType, PolicyFeature +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey +from lerobot.robots import Robot +from lerobot.teleoperators import Teleoperator +from lerobot.teleoperators.utils import TeleopEvents +from lerobot.utils.rotation import Rotation + +from .pipeline import ProcessorStep + + +@ProcessorStepRegistry.register("leader_follower_processor") +@dataclass +class LeaderFollowerProcessor(ProcessorStep): + """ + Processor for leader-follower teleoperation mode. + + This processor: + 1. Sends follower positions to leader arm when not intervening + 2. Computes EE delta actions from leader when intervening + 3. Handles teleop events from the leader device + """ + + leader_device: Teleoperator + motor_names: list[str] + robot: Robot + kinematics: RobotKinematics + end_effector_step_sizes: np.ndarray | None = None + use_gripper: bool = True + # prev_leader_gripper: float | None = None + max_gripper_pos: float = 100.0 + use_ik_solution: bool = False + + def __call__(self, transition: EnvTransition) -> EnvTransition: + """Process transition with leader-follower logic.""" + # Get current follower position from complementary data + # raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions") + raw_joint_pos = transition.get(TransitionKey.OBSERVATION) + if raw_joint_pos is not None: + # Send follower position to leader (for follow mode) + # follower_action = { + # f"{motor}.pos": float(raw_joint_pos[motor]) + # for motor in self.motor_names + # } + self.leader_device.send_action(raw_joint_pos) + + # Only compute EE action if intervention is active + # (AddTeleopEventsAsInfo already added IS_INTERVENTION to info) + info = transition.get(TransitionKey.INFO, {}) + if info.get(TeleopEvents.IS_INTERVENTION, False): + # Get leader joint positions from teleop_action + # (AddTeleopActionAsComplimentaryData already got the action) + complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + teleop_action = complementary.get("teleop_action", {}) + + if isinstance(teleop_action, dict) and raw_joint_pos is not None: + leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names]) + + leader_ee = self.kinematics.forward_kinematics(leader_pos) + + if self.use_ik_solution and "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA): + follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + else: + follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + follower_ee = self.kinematics.forward_kinematics(follower_pos) + + # follower_gripper_pos = raw_joint_pos["gripper.pos"] + follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor + + leader_ee_pos = leader_ee[:3, 3] + leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec() + leader_gripper_pos = np.clip( + teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos + ) + + follower_ee_pos = follower_ee[:3, 3] + # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + + delta_pos = leader_ee_pos - follower_ee_pos + + # For rotation: compute relative rotation from follower to leader + # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader + r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] + delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() + + delta_gripper = leader_gripper_pos - follower_gripper_pos + + desired = np.eye(4, dtype=float) + desired[:3, :3] = follower_ee[:3, :3] @ r_delta + desired[:3, 3] = follower_ee[:3, 3] + delta_pos + + pos = desired[:3, 3] + tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() + + assert np.allclose(pos, leader_ee_pos), "Position delta computation error" + assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" + assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( + "Gripper delta computation error" + ) + + # Normalize the action to the range [-1, 1] + delta_pos = delta_pos / np.array( + [ + self.end_effector_step_sizes["x"], + self.end_effector_step_sizes["y"], + self.end_effector_step_sizes["z"], + ] + ) + delta_rvec = delta_rvec / np.array( + [ + self.end_effector_step_sizes["wx"], + self.end_effector_step_sizes["wy"], + self.end_effector_step_sizes["wz"], + ] + ) + max_normalized_pos = max( + abs(delta_pos[0]), + abs(delta_pos[1]), + abs(delta_pos[2]), + ) + + normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2])) + + max_normalized = max(max_normalized_pos, normalized_rot) + + if max_normalized > 1.0: + # Scale proportionally + delta_pos = delta_pos / max_normalized + delta_rvec = delta_rvec / max_normalized + + intervention_action = np.array( + [ + delta_pos[0], + delta_pos[1], + delta_pos[2], + delta_rvec[0], + delta_rvec[1], + delta_rvec[2], + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) + / self.max_gripper_pos, + ], + dtype=float, + ) + + # # Extract leader positions from teleop action dict + # # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names]) + # # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + # teleop_action = self.leader_device.bus.sync_read("Present_Position") + # raw_joint_pos = self.robot.bus.sync_read("Present_Position") + # leader_pos = np.array([teleop_action.get(f"{motor}", 0) for motor in self.motor_names]) + # follower_pos = np.array([raw_joint_pos[f"{motor}"] for motor in self.motor_names]) + + # # Compute EE positions + # leader_ee_fi = self.kinematics.forward_kinematics(leader_pos) + # leader_ee_pos = leader_ee_fi[:3, 3] + # # leader_ee_rot = Rotation.from_matrix(leader_ee_fi[:3, :3]).as_rotvec() + # leader_ee = np.concat([leader_ee_pos, [0,0,0]]) + + # if "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA): + # follower_ee = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + # else: + # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + # follower_ee_fi = self.kinematics.forward_kinematics(follower_pos) + # follower_ee_pos = follower_ee_fi[:3, 3] + # # follower_ee_rot = Rotation.from_matrix(follower_ee_fi[:3, :3]).as_rotvec() + # follower_ee = np.concat([follower_ee_pos, [0,0,0]]) + + # # Compute normalized EE delta + # if self.end_effector_step_sizes is not None: + # ee_delta = np.clip( + # leader_ee - follower_ee, + # -self.end_effector_step_sizes, + # self.end_effector_step_sizes + # ) + # ee_delta_normalized = ee_delta / self.end_effector_step_sizes + # else: + # ee_delta_normalized = leader_ee - follower_ee + + # # Handle gripper + # if self.use_gripper and len(leader_pos) > 3: + # if self.prev_leader_gripper is None: + # self.prev_leader_gripper = np.clip( + # leader_pos[-1], 0, self.max_gripper_pos + # ) + + # leader_gripper = leader_pos[-1] + # gripper_delta = leader_gripper - self.prev_leader_gripper + # normalized_delta = gripper_delta / self.max_gripper_pos + + # # Quantize gripper action + # if normalized_delta >= 0.3: + # gripper_action = 2 + # elif normalized_delta <= -0.1: + # gripper_action = 0 + # else: + # gripper_action = 1 + + # self.prev_leader_gripper = leader_gripper + + # # Create intervention action + # intervention_action = np.append(ee_delta_normalized, gripper_action) + # else: + # intervention_action = ee_delta_normalized + + # # Override teleop_action with computed EE action + complementary["teleop_action"] = torch.from_numpy(intervention_action).float() + transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc] + + return transition + + def reset(self) -> None: + """Reset leader-follower state.""" + # self.prev_leader_gripper = None + if hasattr(self.leader_device, "reset"): + self.leader_device.reset() + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + return features diff --git a/src/lerobot/robots/so_follower/robot_kinematic_processor.py b/src/lerobot/robots/so_follower/robot_kinematic_processor.py index a95343b2d..661ec4b3c 100644 --- a/src/lerobot/robots/so_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so_follower/robot_kinematic_processor.py @@ -18,6 +18,7 @@ from dataclasses import dataclass, field from typing import Any import numpy as np +import torch from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.model import RobotKinematics @@ -31,6 +32,7 @@ from lerobot.processor import ( RobotObservation, TransitionKey, ) +from lerobot.utils.constants import OBS_STATE from lerobot.utils.rotation import Rotation @@ -126,9 +128,18 @@ class EEReferenceAndDelta(RobotActionProcessorStep): ], dtype=float, ) - r_abs = Rotation.from_rotvec([wx, wy, wz]).as_matrix() + delta_r = np.array( + [ + wx * self.end_effector_step_sizes.get("wx", 1), + wy * self.end_effector_step_sizes.get("wy", 1), + wz * self.end_effector_step_sizes.get("wz", 1), + ], + dtype=float, + ) + + r_mat = Rotation.from_rotvec(delta_r).as_matrix() desired = np.eye(4, dtype=float) - desired[:3, :3] = ref[:3, :3] @ r_abs + desired[:3, :3] = ref[:3, :3] @ r_mat desired[:3, 3] = ref[:3, 3] + delta_p self._command_when_disabled = desired.copy() @@ -361,6 +372,8 @@ class GripperVelocityToJoint(RobotActionProcessorStep): clip_min: float = 0.0 clip_max: float = 100.0 discrete_gripper: bool = False + scale_velocity: bool = False + use_ik_solution: bool = False def action(self, action: RobotAction) -> RobotAction: observation = self.transition.get(TransitionKey.OBSERVATION).copy() @@ -370,14 +383,17 @@ class GripperVelocityToJoint(RobotActionProcessorStep): if observation is None: raise ValueError("Joints observation is require for computing robot kinematics") - q_raw = np.array( - [float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], - dtype=float, - ) + if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA): + q_raw = self.transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + else: + q_raw = np.array( + [float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], + dtype=float, + ) if q_raw is None: raise ValueError("Joints observation is require for computing robot kinematics") - if self.discrete_gripper: + if self.discrete_gripper or self.scale_velocity: # Map discrete command {0=close, 1=stay, 2=open} -> signed velocity. # Negation accounts for SO100 sign (joint position increases on close). # 0 -> +clip_max (close), 1 -> 0 (stay), 2 -> -clip_max (open) @@ -579,6 +595,7 @@ class InverseKinematicsRLStep(ProcessorStep): # Compute inverse kinematics q_target = self.kinematics.inverse_kinematics(self.q_curr, t_des) + q_target[-1] = gripper_pos # Set gripper position self.q_curr = q_target # TODO: This is sentitive to order of motor_names = q_target mapping @@ -610,3 +627,50 @@ class InverseKinematicsRLStep(ProcessorStep): def reset(self): """Resets the initial guess for the IK solver.""" self.q_curr = None + + +@dataclass +@ProcessorStepRegistry.register("ee_observation") +class EEObservationStep(ObservationProcessorStep): + use_rotation: bool = False + + def observation(self, observation: dict) -> dict: + ee_pose_list = [ + observation["ee.x"], + observation["ee.y"], + observation["ee.z"], + ] + if self.use_rotation: + ee_pose_list.extend( + [ + observation["ee.wx"], + observation["ee.wy"], + observation["ee.wz"], + ] + ) + # gripper_pos = action.pop("ee.gripper_pos") + ee_pose = torch.tensor(ee_pose_list, dtype=torch.float32).unsqueeze(0) + + current_state = observation.get(OBS_STATE) + if current_state is None: + return observation + + extended_state = torch.cat([current_state, ee_pose], dim=-1) + + # Create new observation dict + new_observation = dict(observation) + new_observation[OBS_STATE] = extended_state + + return new_observation + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + if OBS_STATE in features[PipelineFeatureType.OBSERVATION]: + original_feature = features[PipelineFeatureType.OBSERVATION][OBS_STATE] + new_shape = (original_feature.shape[0] + 3,) + original_feature.shape[1:] + + features[PipelineFeatureType.OBSERVATION][OBS_STATE] = PolicyFeature( + type=original_feature.type, shape=new_shape + ) + return features diff --git a/src/lerobot/teleoperators/so_leader/__init__.py b/src/lerobot/teleoperators/so_leader/__init__.py index 26ef66677..2e57ee498 100644 --- a/src/lerobot/teleoperators/so_leader/__init__.py +++ b/src/lerobot/teleoperators/so_leader/__init__.py @@ -20,6 +20,7 @@ from .config_so_leader import ( SOLeaderConfig, SOLeaderTeleopConfig, ) +from .so101_leader_follower import SO101LeaderFollower from .so_leader import SO100Leader, SO101Leader, SOLeader __all__ = [ diff --git a/src/lerobot/teleoperators/so_leader/config_so_leader.py b/src/lerobot/teleoperators/so_leader/config_so_leader.py index 189303088..c9982bb21 100644 --- a/src/lerobot/teleoperators/so_leader/config_so_leader.py +++ b/src/lerobot/teleoperators/so_leader/config_so_leader.py @@ -29,6 +29,11 @@ class SOLeaderConfig: # Whether to use degrees for angles use_degrees: bool = True + # Enable leader-follower mode where leader can both lead and follow + leader_follower_mode: bool = False + + use_gripper: bool = True + @TeleoperatorConfig.register_subclass("so101_leader") @TeleoperatorConfig.register_subclass("so100_leader") diff --git a/src/lerobot/teleoperators/so_leader/so101_leader_follower.py b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py new file mode 100644 index 000000000..8ab83ad40 --- /dev/null +++ b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import sys +import time +from collections import deque +from threading import Event, Thread + +import numpy as np + +from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader +from lerobot.teleoperators.utils import TeleopEvents + +PYNPUT_AVAILABLE = True +try: + if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): + logging.info("No DISPLAY set. Skipping pynput import.") + raise ImportError("pynput blocked intentionally due to no display.") + + from pynput import keyboard +except ImportError: + keyboard = None + PYNPUT_AVAILABLE = False +except Exception as e: + keyboard = None + PYNPUT_AVAILABLE = False + logging.info(f"Could not import pynput: {e}") + +logger = logging.getLogger(__name__) + + +class SO101LeaderFollower(SO101Leader): + """ + Extended SO101 Leader that can both lead (human control) and follow (mimic follower). + + This class adds leader-follower functionality where: + - In follow mode: The leader arm mimics the follower's position (torque enabled) + - In lead mode: Human controls the leader (torque disabled) and provides actions + """ + + def __init__(self, config): + super().__init__(config) + + # Leader-follower state + self.is_intervening = False + self.leader_torque_enabled = True + + # Tracking error for automatic intervention detection + self.leader_tracking_error_queue = deque(maxlen=4) + + # Keyboard event handling + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False, + } + self.keyboard_thread = None + self.stop_event = Event() + + # Store last follower position for action computation + self.last_follower_pos = None + + @property + def action_features(self) -> dict: + if self.config.use_gripper: + return { + "dtype": "float32", + "shape": (7,), + "names": { + "delta_x": 0, + "delta_y": 1, + "delta_z": 2, + "delta_wx": 3, + "delta_wy": 4, + "delta_wz": 5, + "gripper": 6, + }, + } + else: + return { + "dtype": "float32", + "shape": (6,), + "names": { + "delta_x": 0, + "delta_y": 1, + "delta_z": 2, + "delta_wx": 3, + "delta_wy": 4, + "delta_wz": 5, + }, + } + + def connect(self, calibrate: bool = True) -> None: + """Connect and configure for leader-follower mode.""" + super().connect(calibrate) + + # Configure for leader-follower mode with lower gains + # Lower gains allow manual intervention without injury risk + # self.bus.sync_write("Torque_Enable", 1) + for motor in self.bus.motors: + self.bus.write("P_Coefficient", motor, 16) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 16) + + # Start keyboard listener + self._start_keyboard_listener() + + print("- Leader-Follower Mode:") + print(" - Press SPACE to toggle intervention (leader control)") + print(" - When not intervening, leader follows follower position") + print(" - When intervening, follower follows leader in end-effector space") + print(" - Press 's' to mark episode as success") + print(" - Press ESC to end episode as failure") + print(" - Press 'r' to re-record episode") + + def _start_keyboard_listener(self): + """Start keyboard listener thread for intervention control.""" + + def on_press(key): + try: + if key == keyboard.Key.space: + self.keyboard_events["intervention"] = not self.keyboard_events["intervention"] + self.is_intervening = self.keyboard_events["intervention"] + state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE" + logger.info(f"Toggled to {state}") + elif key == keyboard.Key.esc: + self.keyboard_events["failure"] = True + elif hasattr(key, "char"): + if key.char == "s": + self.keyboard_events["success"] = True + elif key.char == "r": + self.keyboard_events["rerecord"] = True + except Exception as e: + logger.error(f"Error handling key press: {e}") + + def listen(): + with keyboard.Listener(on_press=on_press) as listener: + while not self.stop_event.is_set(): + time.sleep(0.1) + listener.stop() + + self.keyboard_thread = Thread(target=listen, daemon=True) + self.keyboard_thread.start() + + def send_action(self, action: dict[str, float]) -> None: + """ + Send position commands to leader arm (follow mode). + + Args: + action: Dictionary of motor positions to command + """ + # Store follower position for later use + self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors]) + + if not self.is_intervening: + # Follow mode: enable torque and track follower + if not self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 1) + self.leader_torque_enabled = True + + # Send follower positions to leader + goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors} + self.bus.sync_write("Goal_Position", goal_pos) + + # Track error for automatic intervention detection + current_pos = self.bus.sync_read("Present_Position") + current_array = np.array([current_pos[motor] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + + def get_action(self) -> dict[str, float]: + """ + Get action from leader arm. + + In follow mode: Returns neutral/current positions + In lead mode: Returns actual leader positions for follower to track + """ + start = time.perf_counter() + + if self.is_intervening: + # Lead mode: disable torque if needed and return leader positions + if self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 0) + self.leader_torque_enabled = False + + # Get current leader position + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + # Track error + if self.last_follower_pos is not None: + current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + else: + # Follow mode: return current/neutral positions + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def get_teleop_events(self) -> dict[TeleopEvents, bool]: + """Get current keyboard events.""" + events = {} + + # Map keyboard events to TeleopEvents + if self.keyboard_events["success"]: + events[TeleopEvents.SUCCESS] = True + self.keyboard_events["success"] = False + if self.keyboard_events["failure"]: + events[TeleopEvents.FAILURE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["failure"] = False + if self.keyboard_events["rerecord"]: + events[TeleopEvents.RERECORD_EPISODE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["rerecord"] = False + + # Always report intervention state + events[TeleopEvents.IS_INTERVENTION] = self.is_intervening + + return events + + def disconnect(self) -> None: + """Disconnect and cleanup.""" + self.stop_event.set() + if self.keyboard_thread: + self.keyboard_thread.join(timeout=1.0) + super().disconnect() + + def reset(self) -> None: + """Reset leader-follower state.""" + self.is_intervening = False + self.leader_torque_enabled = True + self.leader_tracking_error_queue.clear() + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False, + } diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index db685f396..186e711ed 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -52,9 +52,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator": return SO100Leader(config) elif config.type == "so101_leader": - from .so_leader import SO101Leader + from .so_leader import SO101LeaderFollower - return SO101Leader(config) + if getattr(config, "leader_follower_mode", False): + return SO101LeaderFollower(config) elif config.type == "mock_teleop": from tests.mocks.mock_teleop import MockTeleop