diff --git a/examples/so100_to_so100_EE_deltas/teleoperate.py b/examples/so100_to_so100_EE_deltas/teleoperate.py new file mode 100644 index 000000000..243c58191 --- /dev/null +++ b/examples/so100_to_so100_EE_deltas/teleoperate.py @@ -0,0 +1,365 @@ +# !/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from dataclasses import dataclass + +import numpy as np +import torch + +from lerobot.configs.types import PipelineFeatureType, PolicyFeature +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import ( + ProcessorStepRegistry, + RobotAction, + RobotActionProcessorStep, + RobotObservation, + RobotProcessorPipeline, + TransitionKey, +) +from lerobot.processor.converters import ( + create_transition, + identity_transition, +) +from lerobot.robots.robot import Robot +from lerobot.robots.so_follower.config_so_follower import SO101FollowerConfig +from lerobot.robots.so_follower.robot_kinematic_processor import ( + EEBoundsAndSafety, + EEReferenceAndDelta, + GripperVelocityToJoint, + InverseKinematicsRLStep, +) +from lerobot.robots.so_follower.so_follower import SO101Follower +from lerobot.teleoperators.so_leader.config_so_leader import SO101LeaderConfig +from lerobot.teleoperators.so_leader.so_leader import SO101Leader +from lerobot.utils.robot_utils import precise_sleep +from lerobot.utils.rotation import Rotation + + +def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None: + """Reset robot arm to target position using smooth trajectory.""" + current_position_dict = robot_arm.bus.sync_read("Present_Position") + current_position = np.array( + [current_position_dict[name] for name in current_position_dict], + dtype=np.float32, + ) + trajectory = torch.from_numpy( + np.linspace(current_position, target_position, 50) + ) # NOTE: 30 is just an arbitrary number + for pose in trajectory: + action_dict = dict(zip(current_position_dict, pose, strict=False)) + robot_arm.bus.sync_write("Goal_Position", action_dict) + precise_sleep(0.015) + + +@dataclass +class LogRobotAction(RobotActionProcessorStep): + def action(self, action: RobotAction) -> RobotAction: + print(f"Robot action: {action}") + return action + + def transform_features(self, features): + # features[PipelineFeatureType.ACTION][ACTION] = PolicyFeature( + # type=FeatureType.ACTION, shape=(len(self.motor_names),) + # ) + return features + + +@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_target_action") +@dataclass +class ForwardKinematicsJointsToEETargetAction(RobotActionProcessorStep): + """ + Computes the end-effector pose from joint positions using forward kinematics (FK). + + This step is typically used to add the robot's Cartesian pose to the observation space, + which can be useful for visualization or as an input to a policy. + + Attributes: + kinematics: The robot's kinematic model. + """ + + kinematics: RobotKinematics + motor_names: list[str] + end_effector_step_sizes: dict + max_gripper_pos: float + use_ik_solution: bool = False + + def action(self, action: RobotAction) -> RobotAction: + # return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names) + teleop_action = action + raw_joint_pos = self.transition.get(TransitionKey.OBSERVATION) + + leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names]) + + leader_ee = self.kinematics.forward_kinematics(leader_pos) + + if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA): + follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + else: + follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + follower_ee = self.kinematics.forward_kinematics(follower_pos) + + follower_ee_pos = follower_ee[:3, 3] + follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + # follower_gripper_pos = raw_joint_pos["gripper.pos"] + follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor + + leader_ee_pos = leader_ee[:3, 3] + leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec() + leader_gripper_pos = np.clip( + teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos + ) + + print("f pos:", follower_ee_pos) + print("l pos:", leader_ee_pos) + + print("f rvec:", follower_ee_rvec) + print("l rvec:", leader_ee_rvec) + + # follower_ee_pos = follower_ee[:3, 3] + # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + + delta_pos = leader_ee_pos - follower_ee_pos + + # For rotation: compute relative rotation from follower to leader + # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader + r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] + delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() + delta_gripper = leader_gripper_pos - follower_gripper_pos + + desired = np.eye(4, dtype=float) + desired[:3, :3] = follower_ee[:3, :3] @ r_delta + desired[:3, 3] = follower_ee[:3, 3] + delta_pos + + pos = desired[:3, 3] + tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() + + assert np.allclose(pos, leader_ee_pos), "Position delta computation error" + assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" + assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( + "Gripper delta computation error" + ) + + # Normalize the action to the range [-1, 1] + delta_pos = delta_pos / np.array( + [ + self.end_effector_step_sizes["x"], + self.end_effector_step_sizes["y"], + self.end_effector_step_sizes["z"], + ] + ) + delta_rvec = delta_rvec / np.array( + [ + self.end_effector_step_sizes["wx"], + self.end_effector_step_sizes["wy"], + self.end_effector_step_sizes["wz"], + ] + ) + + # Check if any of the normalized deltas exceed 1.0 + + max_normalized_pos = max( + abs(delta_pos[0]), + abs(delta_pos[1]), + abs(delta_pos[2]), + ) + + max_normalized_rot = max( + abs(delta_rvec[0]), + abs(delta_rvec[1]), + abs(delta_rvec[2]), + ) + + # Use the same scaling factor for both position and rotation + max_normalized = max(max_normalized_pos, max_normalized_rot) + if max_normalized > 1.0: + print(f"Warning: EE delta too large, scaling. Max normalized delta: {max_normalized_pos}") + print(f"Original delta_pos: {delta_pos}, delta_rvec: {delta_rvec}") + # Scale proportionally + delta_pos = delta_pos / max_normalized + delta_rvec = delta_rvec / max_normalized + + new_action = {} + new_action["enabled"] = True + new_action["target_x"] = float(delta_pos[0]) + new_action["target_y"] = float(delta_pos[1]) + new_action["target_z"] = float(delta_pos[2]) + new_action["target_wx"] = float(delta_rvec[0]) + new_action["target_wy"] = float(delta_rvec[1]) + new_action["target_wz"] = float(delta_rvec[2]) + new_action["gripper_vel"] = float( + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos + ) + return new_action + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + # TODO: implement feature transformation + return features + + +FPS = 20 + +# Initialize the robot and teleoperator config +follower_config = SO101FollowerConfig(port="/dev/usb_follower_arm_a", id="follower_arm_a", use_degrees=True) +leader_config = SO101LeaderConfig(port="/dev/usb_leader_arm_a", id="leader_arm_a", use_degrees=True) + +# Initialize the robot and teleoperator +follower = SO101Follower(follower_config) +leader = SO101Leader(leader_config) + +# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +follower_kinematics_solver = RobotKinematics( + urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(follower.bus.motors.keys()), +) + +# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +leader_kinematics_solver = RobotKinematics( + urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(leader.bus.motors.keys()), +) + +end_effector_step_sizes = { + "x": 0.004, + "y": 0.004, + "z": 0.004, + "wx": 5 * np.pi / 180, + "wy": 5 * np.pi / 180, + "wz": 5 * np.pi / 180, +} + + +# Build pipeline to convert teleop joints to EE action +leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[ + LogRobotAction(), + ForwardKinematicsJointsToEETargetAction( + kinematics=leader_kinematics_solver, + motor_names=list(leader.bus.motors.keys()), + end_effector_step_sizes=end_effector_step_sizes, + max_gripper_pos=30.0, + use_ik_solution=True, + ), + LogRobotAction(), + ], + to_transition=identity_transition, + to_output=identity_transition, +) + +# build pipeline to convert EE action to robot joints +ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( + [ + LogRobotAction(), + EEReferenceAndDelta( + kinematics=follower_kinematics_solver, + # end_effector_step_sizes={"x": 0.006, "y": 0.01, "z": 0.005}, + end_effector_step_sizes=end_effector_step_sizes, + motor_names=list(follower.bus.motors.keys()), + use_latched_reference=False, + use_ik_solution=True, + ), + LogRobotAction(), + EEBoundsAndSafety( + end_effector_bounds={ + "min": [-0.05, -0.55, -0.0075], + "max": [0.55, 0.55, 0.55], + }, + # end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, + max_ee_step_m=0.05, + ), + LogRobotAction(), + GripperVelocityToJoint( + clip_max=30.0, + speed_factor=0.2, + discrete_gripper=False, + scale_velocity=True, + use_ik_solution=True, + ), + LogRobotAction(), + InverseKinematicsRLStep( + kinematics=follower_kinematics_solver, + motor_names=list(follower.bus.motors.keys()), + initial_guess_current_joints=False, + ), + LogRobotAction(), + ], + to_transition=identity_transition, + to_output=identity_transition, +) + +# Connect to the robot and teleoperator +follower.connect() +leader.connect() + +reset_pose = [0.0, 10, 20, 60.00, 90.00, 10.00] + +start_time = time.perf_counter() +reset_follower_position(follower, np.array(reset_pose)) +reset_follower_position(leader, np.array(reset_pose)) +precise_sleep(5.0 - (time.perf_counter() - start_time)) +# time.sleep(10) +leader.bus.sync_write("Torque_Enable", 0) + +# Init rerun viewer +# init_rerun(session_name="so100_so100_EE_teleop") + +transition = None + +print("Starting teleop loop...") +while True: + print("New loop iteration") + t0 = time.perf_counter() + + # Get robot observation + robot_obs = follower.get_observation() + + # Get teleop observation + leader_joints_obs = leader.get_action() + + # teleop joints -> teleop EE action + if transition is None: + transition = create_transition(action=leader_joints_obs, observation=robot_obs) + else: + transition = create_transition( + action=leader_joints_obs, + observation=robot_obs, + complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), + ) + + transition = leader_to_ee(transition) + leader_ee_act = transition[TransitionKey.ACTION] + + # teleop EE -> robot joints + transition = create_transition( + action=leader_ee_act, + observation=robot_obs, + complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), + ) + transition = ee_to_follower_joints(transition) + follower_joints_act = transition[TransitionKey.ACTION] + + # Send action to robot + _ = follower.send_action(follower_joints_act) + + # Visualize + # log_rerun_data(observation=leader_ee_act, action=follower_joints_act) + + precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 3688a4b8c..673587360 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -61,6 +61,7 @@ from .hil_processor import ( RewardClassifierProcessorStep, TimeLimitProcessorStep, ) +from .leader_follower_processor import LeaderFollowerProcessor from .newline_task_processor import NewLineTaskProcessorStep from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats from .observation_processor import VanillaObservationProcessorStep @@ -122,6 +123,7 @@ __all__ = [ "ImageCropResizeProcessorStep", "InfoProcessorStep", "InterventionActionProcessorStep", + "LeaderFollowerProcessor", "make_default_processors", "make_default_teleop_action_processor", "make_default_robot_action_processor", diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py index c17441c46..7a8b2c867 100644 --- a/src/lerobot/processor/hil_processor.py +++ b/src/lerobot/processor/hil_processor.py @@ -460,6 +460,8 @@ class InterventionActionProcessorStep(ProcessorStep): """ use_gripper: bool = False + use_rotation: bool = False + gripper_neutral_action: float = 1.0 terminate_on_success: bool = True def __call__(self, transition: EnvTransition) -> EnvTransition: @@ -497,8 +499,16 @@ class InterventionActionProcessorStep(ProcessorStep): teleop_action.get("delta_y", 0.0), teleop_action.get("delta_z", 0.0), ] + if self.use_rotation: + action_list.extend( + [ + teleop_action.get("delta_wx", 0.0), + teleop_action.get("delta_wy", 0.0), + teleop_action.get("delta_wz", 0.0), + ] + ) if self.use_gripper: - action_list.append(teleop_action.get(GRIPPER_KEY, 1.0)) + action_list.append(teleop_action.get(GRIPPER_KEY, self.gripper_neutral_action)) elif isinstance(teleop_action, np.ndarray): action_list = teleop_action.tolist() else: @@ -536,6 +546,8 @@ class InterventionActionProcessorStep(ProcessorStep): """ return { "use_gripper": self.use_gripper, + "use_rotation": self.use_rotation, + "gripper_neutral_action": self.gripper_neutral_action, "terminate_on_success": self.terminate_on_success, } diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py new file mode 100644 index 000000000..df2f66235 --- /dev/null +++ b/src/lerobot/processor/leader_follower_processor.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +import numpy as np +import torch + +from lerobot.configs.types import PipelineFeatureType, PolicyFeature +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey +from lerobot.robots import Robot +from lerobot.teleoperators import Teleoperator +from lerobot.teleoperators.utils import TeleopEvents +from lerobot.utils.rotation import Rotation + +from .pipeline import ProcessorStep + + +@ProcessorStepRegistry.register("leader_follower_processor") +@dataclass +class LeaderFollowerProcessor(ProcessorStep): + """ + Processor for leader-follower teleoperation mode. + + This processor: + 1. Sends follower positions to leader arm when not intervening + 2. Computes EE delta actions from leader when intervening + 3. Handles teleop events from the leader device + """ + + leader_device: Teleoperator + motor_names: list[str] + robot: Robot + kinematics: RobotKinematics + end_effector_step_sizes: np.ndarray | None = None + use_gripper: bool = True + # prev_leader_gripper: float | None = None + max_gripper_pos: float = 100.0 + use_ik_solution: bool = False + + def __call__(self, transition: EnvTransition) -> EnvTransition: + """Process transition with leader-follower logic.""" + # Get current follower position from complementary data + # raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions") + raw_joint_pos = transition.get(TransitionKey.OBSERVATION) + if raw_joint_pos is not None: + # Send follower position to leader (for follow mode) + # follower_action = { + # f"{motor}.pos": float(raw_joint_pos[motor]) + # for motor in self.motor_names + # } + self.leader_device.send_action(raw_joint_pos) + + # Only compute EE action if intervention is active + # (AddTeleopEventsAsInfo already added IS_INTERVENTION to info) + info = transition.get(TransitionKey.INFO, {}) + if info.get(TeleopEvents.IS_INTERVENTION, False): + # Get leader joint positions from teleop_action + # (AddTeleopActionAsComplimentaryData already got the action) + complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + teleop_action = complementary.get("teleop_action", {}) + + if isinstance(teleop_action, dict) and raw_joint_pos is not None: + leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names]) + + leader_ee = self.kinematics.forward_kinematics(leader_pos) + + if self.use_ik_solution and "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA): + follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + else: + follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + follower_ee = self.kinematics.forward_kinematics(follower_pos) + + # follower_gripper_pos = raw_joint_pos["gripper.pos"] + follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor + + leader_ee_pos = leader_ee[:3, 3] + leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec() + leader_gripper_pos = np.clip( + teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos + ) + + follower_ee_pos = follower_ee[:3, 3] + # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() + + delta_pos = leader_ee_pos - follower_ee_pos + + # For rotation: compute relative rotation from follower to leader + # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader + r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] + delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() + + delta_gripper = leader_gripper_pos - follower_gripper_pos + + desired = np.eye(4, dtype=float) + desired[:3, :3] = follower_ee[:3, :3] @ r_delta + desired[:3, 3] = follower_ee[:3, 3] + delta_pos + + pos = desired[:3, 3] + tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() + + assert np.allclose(pos, leader_ee_pos), "Position delta computation error" + assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" + assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( + "Gripper delta computation error" + ) + + # Normalize the action to the range [-1, 1] + delta_pos = delta_pos / np.array( + [ + self.end_effector_step_sizes["x"], + self.end_effector_step_sizes["y"], + self.end_effector_step_sizes["z"], + ] + ) + delta_rvec = delta_rvec / np.array( + [ + self.end_effector_step_sizes["wx"], + self.end_effector_step_sizes["wy"], + self.end_effector_step_sizes["wz"], + ] + ) + max_normalized_pos = max( + abs(delta_pos[0]), + abs(delta_pos[1]), + abs(delta_pos[2]), + ) + + normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2])) + + max_normalized = max(max_normalized_pos, normalized_rot) + + if max_normalized > 1.0: + # Scale proportionally + delta_pos = delta_pos / max_normalized + delta_rvec = delta_rvec / max_normalized + + intervention_action = np.array( + [ + delta_pos[0], + delta_pos[1], + delta_pos[2], + delta_rvec[0], + delta_rvec[1], + delta_rvec[2], + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) + / self.max_gripper_pos, + ], + dtype=float, + ) + + # # Extract leader positions from teleop action dict + # # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names]) + # # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + + # teleop_action = self.leader_device.bus.sync_read("Present_Position") + # raw_joint_pos = self.robot.bus.sync_read("Present_Position") + # leader_pos = np.array([teleop_action.get(f"{motor}", 0) for motor in self.motor_names]) + # follower_pos = np.array([raw_joint_pos[f"{motor}"] for motor in self.motor_names]) + + # # Compute EE positions + # leader_ee_fi = self.kinematics.forward_kinematics(leader_pos) + # leader_ee_pos = leader_ee_fi[:3, 3] + # # leader_ee_rot = Rotation.from_matrix(leader_ee_fi[:3, :3]).as_rotvec() + # leader_ee = np.concat([leader_ee_pos, [0,0,0]]) + + # if "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA): + # follower_ee = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] + # else: + # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) + # follower_ee_fi = self.kinematics.forward_kinematics(follower_pos) + # follower_ee_pos = follower_ee_fi[:3, 3] + # # follower_ee_rot = Rotation.from_matrix(follower_ee_fi[:3, :3]).as_rotvec() + # follower_ee = np.concat([follower_ee_pos, [0,0,0]]) + + # # Compute normalized EE delta + # if self.end_effector_step_sizes is not None: + # ee_delta = np.clip( + # leader_ee - follower_ee, + # -self.end_effector_step_sizes, + # self.end_effector_step_sizes + # ) + # ee_delta_normalized = ee_delta / self.end_effector_step_sizes + # else: + # ee_delta_normalized = leader_ee - follower_ee + + # # Handle gripper + # if self.use_gripper and len(leader_pos) > 3: + # if self.prev_leader_gripper is None: + # self.prev_leader_gripper = np.clip( + # leader_pos[-1], 0, self.max_gripper_pos + # ) + + # leader_gripper = leader_pos[-1] + # gripper_delta = leader_gripper - self.prev_leader_gripper + # normalized_delta = gripper_delta / self.max_gripper_pos + + # # Quantize gripper action + # if normalized_delta >= 0.3: + # gripper_action = 2 + # elif normalized_delta <= -0.1: + # gripper_action = 0 + # else: + # gripper_action = 1 + + # self.prev_leader_gripper = leader_gripper + + # # Create intervention action + # intervention_action = np.append(ee_delta_normalized, gripper_action) + # else: + # intervention_action = ee_delta_normalized + + # # Override teleop_action with computed EE action + complementary["teleop_action"] = torch.from_numpy(intervention_action).float() + transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc] + + return transition + + def reset(self) -> None: + """Reset leader-follower state.""" + # self.prev_leader_gripper = None + if hasattr(self.leader_device, "reset"): + self.leader_device.reset() + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + return features diff --git a/src/lerobot/rl/gym_manipulator.py b/src/lerobot/rl/gym_manipulator.py index bcf937e46..c7c4eb6e4 100644 --- a/src/lerobot/rl/gym_manipulator.py +++ b/src/lerobot/rl/gym_manipulator.py @@ -39,6 +39,7 @@ from lerobot.processor import ( GymHILAdapterProcessorStep, ImageCropResizeProcessorStep, InterventionActionProcessorStep, + LeaderFollowerProcessor, MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep, Numpy2TorchActionProcessorStep, @@ -71,6 +72,7 @@ from lerobot.teleoperators import ( make_teleoperator_from_config, so_leader, # noqa: F401 ) +from lerobot.teleoperators.so_leader import SO101LeaderFollower from lerobot.teleoperators.teleoperator import Teleoperator from lerobot.teleoperators.utils import TeleopEvents from lerobot.utils.constants import ACTION, DONE, OBS_IMAGES, OBS_STATE, REWARD @@ -481,14 +483,50 @@ def make_processors( env_pipeline_steps.append(AddBatchDimensionProcessorStep()) env_pipeline_steps.append(DeviceProcessorStep(device=device)) - action_pipeline_steps = [ + # Get control mode (gamepad / keyboard / leader -- see PR #2596) + control_mode = cfg.processor.control_mode if cfg.processor is not None else "gamepad" + + action_pipeline_steps: list = [ AddTeleopActionAsComplimentaryDataStep(teleop_device=teleop_device), AddTeleopEventsAsInfoStep(teleop_device=teleop_device), + ] + + # Leader-follower control mode: leader haptically tracks follower until the + # human toggles intervention with SPACE, at which point leader joints are + # converted into 7-D EE deltas (dx, dy, dz, dwx, dwy, dwz, gripper) by + # ``LeaderFollowerProcessor`` before being consumed by + # ``InterventionActionProcessorStep``. + if control_mode == "leader": + if not isinstance(teleop_device, SO101LeaderFollower): + raise ValueError( + "Leader control mode requires SO101LeaderFollower teleop device. " + "Set `--teleop.type=so101_leader --teleop.leader_follower_mode=true`." + ) + if cfg.processor.inverse_kinematics is None or kinematics_solver is None: + raise ValueError( + "Leader control mode requires `cfg.processor.inverse_kinematics` and a kinematics solver." + ) + action_pipeline_steps.append( + LeaderFollowerProcessor( + leader_device=teleop_device, + motor_names=motor_names, + robot=env.robot, + kinematics=kinematics_solver, + end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes, + use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, + max_gripper_pos=cfg.processor.max_gripper_pos + if cfg.processor.max_gripper_pos is not None + else 100.0, + ) + ) + + action_pipeline_steps.append( InterventionActionProcessorStep( use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, + use_rotation=(control_mode == "leader"), terminate_on_success=terminate_on_success, - ), - ] + ) + ) # Replace InverseKinematicsProcessor with new kinematic processors if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None: diff --git a/src/lerobot/teleoperators/so_leader/__init__.py b/src/lerobot/teleoperators/so_leader/__init__.py index 26ef66677..c1fd050d7 100644 --- a/src/lerobot/teleoperators/so_leader/__init__.py +++ b/src/lerobot/teleoperators/so_leader/__init__.py @@ -20,6 +20,7 @@ from .config_so_leader import ( SOLeaderConfig, SOLeaderTeleopConfig, ) +from .so101_leader_follower import SO101LeaderFollower from .so_leader import SO100Leader, SO101Leader, SOLeader __all__ = [ @@ -27,6 +28,7 @@ __all__ = [ "SO100LeaderConfig", "SO101Leader", "SO101LeaderConfig", + "SO101LeaderFollower", "SOLeader", "SOLeaderConfig", "SOLeaderTeleopConfig", diff --git a/src/lerobot/teleoperators/so_leader/config_so_leader.py b/src/lerobot/teleoperators/so_leader/config_so_leader.py index 189303088..10ba89f52 100644 --- a/src/lerobot/teleoperators/so_leader/config_so_leader.py +++ b/src/lerobot/teleoperators/so_leader/config_so_leader.py @@ -29,6 +29,14 @@ class SOLeaderConfig: # Whether to use degrees for angles use_degrees: bool = True + # Enable leader-follower mode where leader can both lead and follow. + # When True, ``make_teleoperator_from_config`` returns ``SO101LeaderFollower`` + # instead of the bare ``SOLeader`` -- see PR #2596. + leader_follower_mode: bool = False + + # Whether to include the gripper in the leader-follower action vector. + use_gripper: bool = True + @TeleoperatorConfig.register_subclass("so101_leader") @TeleoperatorConfig.register_subclass("so100_leader") diff --git a/src/lerobot/teleoperators/so_leader/so101_leader_follower.py b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py new file mode 100644 index 000000000..d96f80062 --- /dev/null +++ b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import sys +import time +from collections import deque +from threading import Event, Thread + +import numpy as np + +from lerobot.teleoperators.so_leader.so_leader import SO101Leader +from lerobot.teleoperators.utils import TeleopEvents + +PYNPUT_AVAILABLE = True +try: + if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): + logging.info("No DISPLAY set. Skipping pynput import.") + raise ImportError("pynput blocked intentionally due to no display.") + + from pynput import keyboard +except ImportError: + keyboard = None + PYNPUT_AVAILABLE = False +except Exception as e: + keyboard = None + PYNPUT_AVAILABLE = False + logging.info(f"Could not import pynput: {e}") + +logger = logging.getLogger(__name__) + + +class SO101LeaderFollower(SO101Leader): + """ + Extended SO101 Leader that can both lead (human control) and follow (mimic follower). + + This class adds leader-follower functionality where: + - In follow mode: The leader arm mimics the follower's position (torque enabled) + - In lead mode: Human controls the leader (torque disabled) and provides actions + """ + + def __init__(self, config): + super().__init__(config) + + # Leader-follower state + self.is_intervening = False + self.leader_torque_enabled = True + + # Tracking error for automatic intervention detection + self.leader_tracking_error_queue = deque(maxlen=4) + + # Keyboard event handling + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False, + } + self.keyboard_thread = None + self.stop_event = Event() + + # Store last follower position for action computation + self.last_follower_pos = None + + @property + def action_features(self) -> dict: + if self.config.use_gripper: + return { + "dtype": "float32", + "shape": (7,), + "names": { + "delta_x": 0, + "delta_y": 1, + "delta_z": 2, + "delta_wx": 3, + "delta_wy": 4, + "delta_wz": 5, + "gripper": 6, + }, + } + else: + return { + "dtype": "float32", + "shape": (6,), + "names": { + "delta_x": 0, + "delta_y": 1, + "delta_z": 2, + "delta_wx": 3, + "delta_wy": 4, + "delta_wz": 5, + }, + } + + def connect(self, calibrate: bool = True) -> None: + """Connect and configure for leader-follower mode.""" + super().connect(calibrate) + + # Configure for leader-follower mode with lower gains + # Lower gains allow manual intervention without injury risk + # self.bus.sync_write("Torque_Enable", 1) + for motor in self.bus.motors: + self.bus.write("P_Coefficient", motor, 16) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 16) + + # Start keyboard listener + self._start_keyboard_listener() + + print("- Leader-Follower Mode:") + print(" - Press SPACE to toggle intervention (leader control)") + print(" - When not intervening, leader follows follower position") + print(" - When intervening, follower follows leader in end-effector space") + print(" - Press 's' to mark episode as success") + print(" - Press ESC to end episode as failure") + print(" - Press 'r' to re-record episode") + + def _start_keyboard_listener(self): + """Start keyboard listener thread for intervention control.""" + + def on_press(key): + try: + if key == keyboard.Key.space: + self.keyboard_events["intervention"] = not self.keyboard_events["intervention"] + self.is_intervening = self.keyboard_events["intervention"] + state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE" + logger.info(f"Toggled to {state}") + elif key == keyboard.Key.esc: + self.keyboard_events["failure"] = True + elif hasattr(key, "char"): + if key.char == "s": + self.keyboard_events["success"] = True + elif key.char == "r": + self.keyboard_events["rerecord"] = True + except Exception as e: + logger.error(f"Error handling key press: {e}") + + def listen(): + with keyboard.Listener(on_press=on_press) as listener: + while not self.stop_event.is_set(): + time.sleep(0.1) + listener.stop() + + self.keyboard_thread = Thread(target=listen, daemon=True) + self.keyboard_thread.start() + + def send_action(self, action: dict[str, float]) -> None: + """ + Send position commands to leader arm (follow mode). + + Args: + action: Dictionary of motor positions to command + """ + # Store follower position for later use + self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors]) + + if not self.is_intervening: + # Follow mode: enable torque and track follower + if not self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 1) + self.leader_torque_enabled = True + + # Send follower positions to leader + goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors} + self.bus.sync_write("Goal_Position", goal_pos) + + # Track error for automatic intervention detection + current_pos = self.bus.sync_read("Present_Position") + current_array = np.array([current_pos[motor] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + + def get_action(self) -> dict[str, float]: + """ + Get action from leader arm. + + In follow mode: Returns neutral/current positions + In lead mode: Returns actual leader positions for follower to track + """ + start = time.perf_counter() + + if self.is_intervening: + # Lead mode: disable torque if needed and return leader positions + if self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 0) + self.leader_torque_enabled = False + + # Get current leader position + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + # Track error + if self.last_follower_pos is not None: + current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + else: + # Follow mode: return current/neutral positions + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def get_teleop_events(self) -> dict[TeleopEvents, bool]: + """Get current keyboard events.""" + events = {} + + # Map keyboard events to TeleopEvents + if self.keyboard_events["success"]: + events[TeleopEvents.SUCCESS] = True + self.keyboard_events["success"] = False + if self.keyboard_events["failure"]: + events[TeleopEvents.FAILURE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["failure"] = False + if self.keyboard_events["rerecord"]: + events[TeleopEvents.RERECORD_EPISODE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["rerecord"] = False + + # Always report intervention state + events[TeleopEvents.IS_INTERVENTION] = self.is_intervening + + return events + + def disconnect(self) -> None: + """Disconnect and cleanup.""" + self.stop_event.set() + if self.keyboard_thread: + self.keyboard_thread.join(timeout=1.0) + super().disconnect() + + def reset(self) -> None: + """Reset leader-follower state.""" + self.is_intervening = False + self.leader_torque_enabled = True + self.leader_tracking_error_queue.clear() + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False, + } diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index db685f396..6a4421f16 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -52,7 +52,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator": return SO100Leader(config) elif config.type == "so101_leader": - from .so_leader import SO101Leader + from .so_leader import SO101Leader, SO101LeaderFollower + + if getattr(config, "leader_follower_mode", False): + return SO101LeaderFollower(config) return SO101Leader(config) elif config.type == "mock_teleop":