add so leader arm

This commit is contained in:
Khalil Meftah
2026-04-28 16:53:36 +02:00
parent 6ed80f5a59
commit fe2c32d9e7
11 changed files with 999 additions and 17 deletions
@@ -0,0 +1,365 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from dataclasses import dataclass
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import (
ProcessorStepRegistry,
RobotAction,
RobotActionProcessorStep,
RobotObservation,
RobotProcessorPipeline,
TransitionKey,
)
from lerobot.processor.converters import (
create_transition,
identity_transition,
)
from lerobot.robots.robot import Robot
from lerobot.robots.so100_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
EEReferenceAndDelta,
GripperVelocityToJoint,
InverseKinematicsRLStep,
)
from lerobot.robots.so101_follower.config_so101_follower import SO101FollowerConfig
from lerobot.robots.so101_follower.so101_follower import SO101Follower
from lerobot.teleoperators.so101_leader.config_so101_leader import SO101LeaderConfig
from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.rotation import Rotation
def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None:
"""Reset robot arm to target position using smooth trajectory."""
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array(
[current_position_dict[name] for name in current_position_dict],
dtype=np.float32,
)
trajectory = torch.from_numpy(
np.linspace(current_position, target_position, 50)
) # NOTE: 30 is just an arbitrary number
for pose in trajectory:
action_dict = dict(zip(current_position_dict, pose, strict=False))
robot_arm.bus.sync_write("Goal_Position", action_dict)
precise_sleep(0.015)
@dataclass
class LogRobotAction(RobotActionProcessorStep):
def action(self, action: RobotAction) -> RobotAction:
print(f"Robot action: {action}")
return action
def transform_features(self, features):
# features[PipelineFeatureType.ACTION][ACTION] = PolicyFeature(
# type=FeatureType.ACTION, shape=(len(self.motor_names),)
# )
return features
@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_target_action")
@dataclass
class ForwardKinematicsJointsToEETargetAction(RobotActionProcessorStep):
"""
Computes the end-effector pose from joint positions using forward kinematics (FK).
This step is typically used to add the robot's Cartesian pose to the observation space,
which can be useful for visualization or as an input to a policy.
Attributes:
kinematics: The robot's kinematic model.
"""
kinematics: RobotKinematics
motor_names: list[str]
end_effector_step_sizes: dict
max_gripper_pos: float
use_ik_solution: bool = False
def action(self, action: RobotAction) -> RobotAction:
# return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names)
teleop_action = action
raw_joint_pos = self.transition.get(TransitionKey.OBSERVATION)
leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names])
leader_ee = self.kinematics.forward_kinematics(leader_pos)
if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA):
follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
else:
follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
follower_ee = self.kinematics.forward_kinematics(follower_pos)
follower_ee_pos = follower_ee[:3, 3]
follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
# follower_gripper_pos = raw_joint_pos["gripper.pos"]
follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor
leader_ee_pos = leader_ee[:3, 3]
leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec()
leader_gripper_pos = np.clip(
teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos
)
print("f pos:", follower_ee_pos)
print("l pos:", leader_ee_pos)
print("f rvec:", follower_ee_rvec)
print("l rvec:", leader_ee_rvec)
# follower_ee_pos = follower_ee[:3, 3]
# follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
delta_pos = leader_ee_pos - follower_ee_pos
# For rotation: compute relative rotation from follower to leader
# R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader
r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3]
delta_rvec = Rotation.from_matrix(r_delta).as_rotvec()
delta_gripper = leader_gripper_pos - follower_gripper_pos
desired = np.eye(4, dtype=float)
desired[:3, :3] = follower_ee[:3, :3] @ r_delta
desired[:3, 3] = follower_ee[:3, 3] + delta_pos
pos = desired[:3, 3]
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
assert np.allclose(pos, leader_ee_pos), "Position delta computation error"
assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error"
assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), (
"Gripper delta computation error"
)
# Normalize the action to the range [-1, 1]
delta_pos = delta_pos / np.array(
[
self.end_effector_step_sizes["x"],
self.end_effector_step_sizes["y"],
self.end_effector_step_sizes["z"],
]
)
delta_rvec = delta_rvec / np.array(
[
self.end_effector_step_sizes["wx"],
self.end_effector_step_sizes["wy"],
self.end_effector_step_sizes["wz"],
]
)
# Check if any of the normalized deltas exceed 1.0
max_normalized_pos = max(
abs(delta_pos[0]),
abs(delta_pos[1]),
abs(delta_pos[2]),
)
max_normalized_rot = max(
abs(delta_rvec[0]),
abs(delta_rvec[1]),
abs(delta_rvec[2]),
)
# Use the same scaling factor for both position and rotation
max_normalized = max(max_normalized_pos, max_normalized_rot)
if max_normalized > 1.0:
print(f"Warning: EE delta too large, scaling. Max normalized delta: {max_normalized_pos}")
print(f"Original delta_pos: {delta_pos}, delta_rvec: {delta_rvec}")
# Scale proportionally
delta_pos = delta_pos / max_normalized
delta_rvec = delta_rvec / max_normalized
new_action = {}
new_action["enabled"] = True
new_action["target_x"] = float(delta_pos[0])
new_action["target_y"] = float(delta_pos[1])
new_action["target_z"] = float(delta_pos[2])
new_action["target_wx"] = float(delta_rvec[0])
new_action["target_wy"] = float(delta_rvec[1])
new_action["target_wz"] = float(delta_rvec[2])
new_action["gripper_vel"] = float(
np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos
)
return new_action
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# TODO: implement feature transformation
return features
FPS = 20
# Initialize the robot and teleoperator config
follower_config = SO101FollowerConfig(port="/dev/usb_follower_arm_a", id="follower_arm_a", use_degrees=True)
leader_config = SO101LeaderConfig(port="/dev/usb_leader_arm_a", id="leader_arm_a", use_degrees=True)
# Initialize the robot and teleoperator
follower = SO101Follower(follower_config)
leader = SO101Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
end_effector_step_sizes = {
"x": 0.004,
"y": 0.004,
"z": 0.004,
"wx": 5 * np.pi / 180,
"wy": 5 * np.pi / 180,
"wz": 5 * np.pi / 180,
}
# Build pipeline to convert teleop joints to EE action
leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
LogRobotAction(),
ForwardKinematicsJointsToEETargetAction(
kinematics=leader_kinematics_solver,
motor_names=list(leader.bus.motors.keys()),
end_effector_step_sizes=end_effector_step_sizes,
max_gripper_pos=30.0,
use_ik_solution=True,
),
LogRobotAction(),
],
to_transition=identity_transition,
to_output=identity_transition,
)
# build pipeline to convert EE action to robot joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
LogRobotAction(),
EEReferenceAndDelta(
kinematics=follower_kinematics_solver,
# end_effector_step_sizes={"x": 0.006, "y": 0.01, "z": 0.005},
end_effector_step_sizes=end_effector_step_sizes,
motor_names=list(follower.bus.motors.keys()),
use_latched_reference=False,
use_ik_solution=True,
),
LogRobotAction(),
EEBoundsAndSafety(
end_effector_bounds={
"min": [-0.05, -0.55, -0.0075],
"max": [0.55, 0.55, 0.55],
},
# end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.05,
),
LogRobotAction(),
GripperVelocityToJoint(
clip_max=30.0,
speed_factor=0.2,
discrete_gripper=False,
scale_velocity=True,
use_ik_solution=True,
),
LogRobotAction(),
InverseKinematicsRLStep(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=False,
),
LogRobotAction(),
],
to_transition=identity_transition,
to_output=identity_transition,
)
# Connect to the robot and teleoperator
follower.connect()
leader.connect()
reset_pose = [0.0, 10, 20, 60.00, 90.00, 10.00]
start_time = time.perf_counter()
reset_follower_position(follower, np.array(reset_pose))
reset_follower_position(leader, np.array(reset_pose))
precise_sleep(5.0 - (time.perf_counter() - start_time))
# time.sleep(10)
leader.bus.sync_write("Torque_Enable", 0)
# Init rerun viewer
# init_rerun(session_name="so100_so100_EE_teleop")
transition = None
print("Starting teleop loop...")
while True:
print("New loop iteration")
t0 = time.perf_counter()
# Get robot observation
robot_obs = follower.get_observation()
# Get teleop observation
leader_joints_obs = leader.get_action()
# teleop joints -> teleop EE action
if transition is None:
transition = create_transition(action=leader_joints_obs, observation=robot_obs)
else:
transition = create_transition(
action=leader_joints_obs,
observation=robot_obs,
complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA),
)
transition = leader_to_ee(transition)
leader_ee_act = transition[TransitionKey.ACTION]
# teleop EE -> robot joints
transition = create_transition(
action=leader_ee_act,
observation=robot_obs,
complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA),
)
transition = ee_to_follower_joints(transition)
follower_joints_act = transition[TransitionKey.ACTION]
# Send action to robot
_ = follower.send_action(follower_joints_act)
# Visualize
# log_rerun_data(observation=leader_ee_act, action=follower_joints_act)
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
+1
View File
@@ -299,6 +299,7 @@ class HILSerlProcessorConfig:
inverse_kinematics: InverseKinematicsConfig | None = None inverse_kinematics: InverseKinematicsConfig | None = None
reward_classifier: RewardClassifierConfig | None = None reward_classifier: RewardClassifierConfig | None = None
max_gripper_pos: float | None = 100.0 max_gripper_pos: float | None = 100.0
gripper_speed_factor: float | None = None
@EnvConfig.register_subclass(name="gym_manipulator") @EnvConfig.register_subclass(name="gym_manipulator")
+2
View File
@@ -61,6 +61,7 @@ from .hil_processor import (
RewardClassifierProcessorStep, RewardClassifierProcessorStep,
TimeLimitProcessorStep, TimeLimitProcessorStep,
) )
from .leader_follower_processor import LeaderFollowerProcessor
from .newline_task_processor import NewLineTaskProcessorStep from .newline_task_processor import NewLineTaskProcessorStep
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats
from .observation_processor import VanillaObservationProcessorStep from .observation_processor import VanillaObservationProcessorStep
@@ -122,6 +123,7 @@ __all__ = [
"ImageCropResizeProcessorStep", "ImageCropResizeProcessorStep",
"InfoProcessorStep", "InfoProcessorStep",
"InterventionActionProcessorStep", "InterventionActionProcessorStep",
"LeaderFollowerProcessor",
"make_default_processors", "make_default_processors",
"make_default_teleop_action_processor", "make_default_teleop_action_processor",
"make_default_robot_action_processor", "make_default_robot_action_processor",
@@ -38,6 +38,7 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
""" """
use_gripper: bool = True use_gripper: bool = True
use_rotation: bool = False
def action(self, action: PolicyAction) -> RobotAction: def action(self, action: PolicyAction) -> RobotAction:
if not isinstance(action, PolicyAction): if not isinstance(action, PolicyAction):
@@ -52,7 +53,13 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
"delta_y": action[1].item(), "delta_y": action[1].item(),
"delta_z": action[2].item(), "delta_z": action[2].item(),
} }
if self.use_gripper: if self.use_rotation:
delta_action["delta_wx"] = action[3].item()
delta_action["delta_wy"] = action[4].item()
delta_action["delta_wz"] = action[5].item()
if self.use_gripper:
delta_action["gripper"] = action[6].item()
elif self.use_gripper:
delta_action["gripper"] = action[3].item() delta_action["gripper"] = action[3].item()
return delta_action return delta_action
@@ -64,6 +71,12 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
type=FeatureType.ACTION, shape=(1,) type=FeatureType.ACTION, shape=(1,)
) )
if self.use_rotation:
for axis in ["wx", "wy", "wz"]:
features[PipelineFeatureType.ACTION][f"delta_{axis}"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,)
)
if self.use_gripper: if self.use_gripper:
features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature( features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,) type=FeatureType.ACTION, shape=(1,)
@@ -90,6 +103,8 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
# Scale factors for delta movements # Scale factors for delta movements
position_scale: float = 1.0 position_scale: float = 1.0
noise_threshold: float = 1e-3 # 1 mm threshold to filter out noise noise_threshold: float = 1e-3 # 1 mm threshold to filter out noise
use_rotation: bool = False
rotation_scale: float = 1.0
def action(self, action: RobotAction) -> RobotAction: def action(self, action: RobotAction) -> RobotAction:
# NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy # NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy
@@ -97,23 +112,34 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
delta_x = action.pop("delta_x") delta_x = action.pop("delta_x")
delta_y = action.pop("delta_y") delta_y = action.pop("delta_y")
delta_z = action.pop("delta_z") delta_z = action.pop("delta_z")
if self.use_rotation:
delta_wx = action.pop("delta_wx")
delta_wy = action.pop("delta_wy")
delta_wz = action.pop("delta_wz")
else:
delta_wx = 0.0
delta_wy = 0.0
delta_wz = 0.0
gripper = action.pop("gripper") gripper = action.pop("gripper")
# Determine if the teleoperator is actively providing input # Determine if the teleoperator is actively providing input
# Consider enabled if any significant movement delta is detected # Consider enabled if any significant movement delta is detected
position_magnitude = (delta_x**2 + delta_y**2 + delta_z**2) ** 0.5 # Use Euclidean norm for position position_magnitude = (delta_x**2 + delta_y**2 + delta_z**2) ** 0.5 # Use Euclidean norm for position
enabled = position_magnitude > self.noise_threshold # Small threshold to avoid noise rotation_magnitude = (
delta_wx**2 + delta_wy**2 + delta_wz**2
) ** 0.5 # TODO use proper magnitud for rotation
enabled = (
position_magnitude > self.noise_threshold or rotation_magnitude > self.noise_threshold
) # Small threshold to avoid noise
# Scale the deltas appropriately # Scale the deltas appropriately
scaled_delta_x = delta_x * self.position_scale scaled_delta_x = delta_x * self.position_scale
scaled_delta_y = delta_y * self.position_scale scaled_delta_y = delta_y * self.position_scale
scaled_delta_z = delta_z * self.position_scale scaled_delta_z = delta_z * self.position_scale
# For gamepad/keyboard, we don't have rotation input, so set to 0 target_wx = delta_wx * self.rotation_scale
# These could be extended in the future for more sophisticated teleoperators target_wy = delta_wy * self.rotation_scale
target_wx = 0.0 target_wz = delta_wz * self.rotation_scale
target_wy = 0.0
target_wz = 0.0
# Update action with robot target format # Update action with robot target format
action = { action = {
@@ -132,9 +158,15 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
def transform_features( def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
for axis in ["x", "y", "z", "gripper"]: for axis in ["x", "y", "z"]:
features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None) features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None)
if self.use_rotation:
for axis in ["wx", "wy", "wz"]:
features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None)
features[PipelineFeatureType.ACTION].pop("delta_gripper", None)
for feat in ["enabled", "target_x", "target_y", "target_z", "target_wx", "target_wy", "target_wz"]: for feat in ["enabled", "target_x", "target_y", "target_z", "target_wx", "target_wy", "target_wz"]:
features[PipelineFeatureType.ACTION][f"{feat}"] = PolicyFeature( features[PipelineFeatureType.ACTION][f"{feat}"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,) type=FeatureType.ACTION, shape=(1,)
+9
View File
@@ -461,6 +461,7 @@ class InterventionActionProcessorStep(ProcessorStep):
use_gripper: bool = False use_gripper: bool = False
terminate_on_success: bool = True terminate_on_success: bool = True
use_rotation: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition: def __call__(self, transition: EnvTransition) -> EnvTransition:
""" """
@@ -497,6 +498,14 @@ class InterventionActionProcessorStep(ProcessorStep):
teleop_action.get("delta_y", 0.0), teleop_action.get("delta_y", 0.0),
teleop_action.get("delta_z", 0.0), teleop_action.get("delta_z", 0.0),
] ]
if self.use_rotation:
action_list.extend(
[
teleop_action.get("delta_wx", 0.0),
teleop_action.get("delta_wy", 0.0),
teleop_action.get("delta_wz", 0.0),
]
)
if self.use_gripper: if self.use_gripper:
action_list.append(teleop_action.get(GRIPPER_KEY, 1.0)) action_list.append(teleop_action.get(GRIPPER_KEY, 1.0))
elif isinstance(teleop_action, np.ndarray): elif isinstance(teleop_action, np.ndarray):
@@ -0,0 +1,243 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
from lerobot.robots import Robot
from lerobot.teleoperators import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents
from lerobot.utils.rotation import Rotation
from .pipeline import ProcessorStep
@ProcessorStepRegistry.register("leader_follower_processor")
@dataclass
class LeaderFollowerProcessor(ProcessorStep):
"""
Processor for leader-follower teleoperation mode.
This processor:
1. Sends follower positions to leader arm when not intervening
2. Computes EE delta actions from leader when intervening
3. Handles teleop events from the leader device
"""
leader_device: Teleoperator
motor_names: list[str]
robot: Robot
kinematics: RobotKinematics
end_effector_step_sizes: np.ndarray | None = None
use_gripper: bool = True
# prev_leader_gripper: float | None = None
max_gripper_pos: float = 100.0
use_ik_solution: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Process transition with leader-follower logic."""
# Get current follower position from complementary data
# raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions")
raw_joint_pos = transition.get(TransitionKey.OBSERVATION)
if raw_joint_pos is not None:
# Send follower position to leader (for follow mode)
# follower_action = {
# f"{motor}.pos": float(raw_joint_pos[motor])
# for motor in self.motor_names
# }
self.leader_device.send_action(raw_joint_pos)
# Only compute EE action if intervention is active
# (AddTeleopEventsAsInfo already added IS_INTERVENTION to info)
info = transition.get(TransitionKey.INFO, {})
if info.get(TeleopEvents.IS_INTERVENTION, False):
# Get leader joint positions from teleop_action
# (AddTeleopActionAsComplimentaryData already got the action)
complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
teleop_action = complementary.get("teleop_action", {})
if isinstance(teleop_action, dict) and raw_joint_pos is not None:
leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names])
leader_ee = self.kinematics.forward_kinematics(leader_pos)
if self.use_ik_solution and "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
else:
follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
follower_ee = self.kinematics.forward_kinematics(follower_pos)
# follower_gripper_pos = raw_joint_pos["gripper.pos"]
follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor
leader_ee_pos = leader_ee[:3, 3]
leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec()
leader_gripper_pos = np.clip(
teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos
)
follower_ee_pos = follower_ee[:3, 3]
# follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
delta_pos = leader_ee_pos - follower_ee_pos
# For rotation: compute relative rotation from follower to leader
# R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader
r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3]
delta_rvec = Rotation.from_matrix(r_delta).as_rotvec()
delta_gripper = leader_gripper_pos - follower_gripper_pos
desired = np.eye(4, dtype=float)
desired[:3, :3] = follower_ee[:3, :3] @ r_delta
desired[:3, 3] = follower_ee[:3, 3] + delta_pos
pos = desired[:3, 3]
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
assert np.allclose(pos, leader_ee_pos), "Position delta computation error"
assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error"
assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), (
"Gripper delta computation error"
)
# Normalize the action to the range [-1, 1]
delta_pos = delta_pos / np.array(
[
self.end_effector_step_sizes["x"],
self.end_effector_step_sizes["y"],
self.end_effector_step_sizes["z"],
]
)
delta_rvec = delta_rvec / np.array(
[
self.end_effector_step_sizes["wx"],
self.end_effector_step_sizes["wy"],
self.end_effector_step_sizes["wz"],
]
)
max_normalized_pos = max(
abs(delta_pos[0]),
abs(delta_pos[1]),
abs(delta_pos[2]),
)
normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2]))
max_normalized = max(max_normalized_pos, normalized_rot)
if max_normalized > 1.0:
# Scale proportionally
delta_pos = delta_pos / max_normalized
delta_rvec = delta_rvec / max_normalized
intervention_action = np.array(
[
delta_pos[0],
delta_pos[1],
delta_pos[2],
delta_rvec[0],
delta_rvec[1],
delta_rvec[2],
np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos)
/ self.max_gripper_pos,
],
dtype=float,
)
# # Extract leader positions from teleop action dict
# # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names])
# # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
# teleop_action = self.leader_device.bus.sync_read("Present_Position")
# raw_joint_pos = self.robot.bus.sync_read("Present_Position")
# leader_pos = np.array([teleop_action.get(f"{motor}", 0) for motor in self.motor_names])
# follower_pos = np.array([raw_joint_pos[f"{motor}"] for motor in self.motor_names])
# # Compute EE positions
# leader_ee_fi = self.kinematics.forward_kinematics(leader_pos)
# leader_ee_pos = leader_ee_fi[:3, 3]
# # leader_ee_rot = Rotation.from_matrix(leader_ee_fi[:3, :3]).as_rotvec()
# leader_ee = np.concat([leader_ee_pos, [0,0,0]])
# if "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
# follower_ee = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
# else:
# follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
# follower_ee_fi = self.kinematics.forward_kinematics(follower_pos)
# follower_ee_pos = follower_ee_fi[:3, 3]
# # follower_ee_rot = Rotation.from_matrix(follower_ee_fi[:3, :3]).as_rotvec()
# follower_ee = np.concat([follower_ee_pos, [0,0,0]])
# # Compute normalized EE delta
# if self.end_effector_step_sizes is not None:
# ee_delta = np.clip(
# leader_ee - follower_ee,
# -self.end_effector_step_sizes,
# self.end_effector_step_sizes
# )
# ee_delta_normalized = ee_delta / self.end_effector_step_sizes
# else:
# ee_delta_normalized = leader_ee - follower_ee
# # Handle gripper
# if self.use_gripper and len(leader_pos) > 3:
# if self.prev_leader_gripper is None:
# self.prev_leader_gripper = np.clip(
# leader_pos[-1], 0, self.max_gripper_pos
# )
# leader_gripper = leader_pos[-1]
# gripper_delta = leader_gripper - self.prev_leader_gripper
# normalized_delta = gripper_delta / self.max_gripper_pos
# # Quantize gripper action
# if normalized_delta >= 0.3:
# gripper_action = 2
# elif normalized_delta <= -0.1:
# gripper_action = 0
# else:
# gripper_action = 1
# self.prev_leader_gripper = leader_gripper
# # Create intervention action
# intervention_action = np.append(ee_delta_normalized, gripper_action)
# else:
# intervention_action = ee_delta_normalized
# # Override teleop_action with computed EE action
complementary["teleop_action"] = torch.from_numpy(intervention_action).float()
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc]
return transition
def reset(self) -> None:
"""Reset leader-follower state."""
# self.prev_leader_gripper = None
if hasattr(self.leader_device, "reset"):
self.leader_device.reset()
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
return features
@@ -18,6 +18,7 @@ from dataclasses import dataclass, field
from typing import Any from typing import Any
import numpy as np import numpy as np
import torch
from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.model import RobotKinematics from lerobot.model import RobotKinematics
@@ -31,6 +32,7 @@ from lerobot.processor import (
RobotObservation, RobotObservation,
TransitionKey, TransitionKey,
) )
from lerobot.utils.constants import OBS_STATE
from lerobot.utils.rotation import Rotation from lerobot.utils.rotation import Rotation
@@ -126,9 +128,18 @@ class EEReferenceAndDelta(RobotActionProcessorStep):
], ],
dtype=float, dtype=float,
) )
r_abs = Rotation.from_rotvec([wx, wy, wz]).as_matrix() delta_r = np.array(
[
wx * self.end_effector_step_sizes.get("wx", 1),
wy * self.end_effector_step_sizes.get("wy", 1),
wz * self.end_effector_step_sizes.get("wz", 1),
],
dtype=float,
)
r_mat = Rotation.from_rotvec(delta_r).as_matrix()
desired = np.eye(4, dtype=float) desired = np.eye(4, dtype=float)
desired[:3, :3] = ref[:3, :3] @ r_abs desired[:3, :3] = ref[:3, :3] @ r_mat
desired[:3, 3] = ref[:3, 3] + delta_p desired[:3, 3] = ref[:3, 3] + delta_p
self._command_when_disabled = desired.copy() self._command_when_disabled = desired.copy()
@@ -361,6 +372,8 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
clip_min: float = 0.0 clip_min: float = 0.0
clip_max: float = 100.0 clip_max: float = 100.0
discrete_gripper: bool = False discrete_gripper: bool = False
scale_velocity: bool = False
use_ik_solution: bool = False
def action(self, action: RobotAction) -> RobotAction: def action(self, action: RobotAction) -> RobotAction:
observation = self.transition.get(TransitionKey.OBSERVATION).copy() observation = self.transition.get(TransitionKey.OBSERVATION).copy()
@@ -370,14 +383,17 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
if observation is None: if observation is None:
raise ValueError("Joints observation is require for computing robot kinematics") raise ValueError("Joints observation is require for computing robot kinematics")
q_raw = np.array( if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA):
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")], q_raw = self.transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
dtype=float, else:
) q_raw = np.array(
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")],
dtype=float,
)
if q_raw is None: if q_raw is None:
raise ValueError("Joints observation is require for computing robot kinematics") raise ValueError("Joints observation is require for computing robot kinematics")
if self.discrete_gripper: if self.discrete_gripper or self.scale_velocity:
# Map discrete command {0=close, 1=stay, 2=open} -> signed velocity. # Map discrete command {0=close, 1=stay, 2=open} -> signed velocity.
# Negation accounts for SO100 sign (joint position increases on close). # Negation accounts for SO100 sign (joint position increases on close).
# 0 -> +clip_max (close), 1 -> 0 (stay), 2 -> -clip_max (open) # 0 -> +clip_max (close), 1 -> 0 (stay), 2 -> -clip_max (open)
@@ -579,6 +595,7 @@ class InverseKinematicsRLStep(ProcessorStep):
# Compute inverse kinematics # Compute inverse kinematics
q_target = self.kinematics.inverse_kinematics(self.q_curr, t_des) q_target = self.kinematics.inverse_kinematics(self.q_curr, t_des)
q_target[-1] = gripper_pos # Set gripper position
self.q_curr = q_target self.q_curr = q_target
# TODO: This is sentitive to order of motor_names = q_target mapping # TODO: This is sentitive to order of motor_names = q_target mapping
@@ -610,3 +627,50 @@ class InverseKinematicsRLStep(ProcessorStep):
def reset(self): def reset(self):
"""Resets the initial guess for the IK solver.""" """Resets the initial guess for the IK solver."""
self.q_curr = None self.q_curr = None
@dataclass
@ProcessorStepRegistry.register("ee_observation")
class EEObservationStep(ObservationProcessorStep):
use_rotation: bool = False
def observation(self, observation: dict) -> dict:
ee_pose_list = [
observation["ee.x"],
observation["ee.y"],
observation["ee.z"],
]
if self.use_rotation:
ee_pose_list.extend(
[
observation["ee.wx"],
observation["ee.wy"],
observation["ee.wz"],
]
)
# gripper_pos = action.pop("ee.gripper_pos")
ee_pose = torch.tensor(ee_pose_list, dtype=torch.float32).unsqueeze(0)
current_state = observation.get(OBS_STATE)
if current_state is None:
return observation
extended_state = torch.cat([current_state, ee_pose], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation[OBS_STATE] = extended_state
return new_observation
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
if OBS_STATE in features[PipelineFeatureType.OBSERVATION]:
original_feature = features[PipelineFeatureType.OBSERVATION][OBS_STATE]
new_shape = (original_feature.shape[0] + 3,) + original_feature.shape[1:]
features[PipelineFeatureType.OBSERVATION][OBS_STATE] = PolicyFeature(
type=original_feature.type, shape=new_shape
)
return features
@@ -20,6 +20,7 @@ from .config_so_leader import (
SOLeaderConfig, SOLeaderConfig,
SOLeaderTeleopConfig, SOLeaderTeleopConfig,
) )
from .so101_leader_follower import SO101LeaderFollower
from .so_leader import SO100Leader, SO101Leader, SOLeader from .so_leader import SO100Leader, SO101Leader, SOLeader
__all__ = [ __all__ = [
@@ -29,6 +29,11 @@ class SOLeaderConfig:
# Whether to use degrees for angles # Whether to use degrees for angles
use_degrees: bool = True use_degrees: bool = True
# Enable leader-follower mode where leader can both lead and follow
leader_follower_mode: bool = False
use_gripper: bool = True
@TeleoperatorConfig.register_subclass("so101_leader") @TeleoperatorConfig.register_subclass("so101_leader")
@TeleoperatorConfig.register_subclass("so100_leader") @TeleoperatorConfig.register_subclass("so100_leader")
@@ -0,0 +1,259 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import time
from collections import deque
from threading import Event, Thread
import numpy as np
from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader
from lerobot.teleoperators.utils import TeleopEvents
PYNPUT_AVAILABLE = True
try:
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
logging.info(f"Could not import pynput: {e}")
logger = logging.getLogger(__name__)
class SO101LeaderFollower(SO101Leader):
"""
Extended SO101 Leader that can both lead (human control) and follow (mimic follower).
This class adds leader-follower functionality where:
- In follow mode: The leader arm mimics the follower's position (torque enabled)
- In lead mode: Human controls the leader (torque disabled) and provides actions
"""
def __init__(self, config):
super().__init__(config)
# Leader-follower state
self.is_intervening = False
self.leader_torque_enabled = True
# Tracking error for automatic intervention detection
self.leader_tracking_error_queue = deque(maxlen=4)
# Keyboard event handling
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False,
}
self.keyboard_thread = None
self.stop_event = Event()
# Store last follower position for action computation
self.last_follower_pos = None
@property
def action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
"shape": (7,),
"names": {
"delta_x": 0,
"delta_y": 1,
"delta_z": 2,
"delta_wx": 3,
"delta_wy": 4,
"delta_wz": 5,
"gripper": 6,
},
}
else:
return {
"dtype": "float32",
"shape": (6,),
"names": {
"delta_x": 0,
"delta_y": 1,
"delta_z": 2,
"delta_wx": 3,
"delta_wy": 4,
"delta_wz": 5,
},
}
def connect(self, calibrate: bool = True) -> None:
"""Connect and configure for leader-follower mode."""
super().connect(calibrate)
# Configure for leader-follower mode with lower gains
# Lower gains allow manual intervention without injury risk
# self.bus.sync_write("Torque_Enable", 1)
for motor in self.bus.motors:
self.bus.write("P_Coefficient", motor, 16)
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 16)
# Start keyboard listener
self._start_keyboard_listener()
print("- Leader-Follower Mode:")
print(" - Press SPACE to toggle intervention (leader control)")
print(" - When not intervening, leader follows follower position")
print(" - When intervening, follower follows leader in end-effector space")
print(" - Press 's' to mark episode as success")
print(" - Press ESC to end episode as failure")
print(" - Press 'r' to re-record episode")
def _start_keyboard_listener(self):
"""Start keyboard listener thread for intervention control."""
def on_press(key):
try:
if key == keyboard.Key.space:
self.keyboard_events["intervention"] = not self.keyboard_events["intervention"]
self.is_intervening = self.keyboard_events["intervention"]
state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE"
logger.info(f"Toggled to {state}")
elif key == keyboard.Key.esc:
self.keyboard_events["failure"] = True
elif hasattr(key, "char"):
if key.char == "s":
self.keyboard_events["success"] = True
elif key.char == "r":
self.keyboard_events["rerecord"] = True
except Exception as e:
logger.error(f"Error handling key press: {e}")
def listen():
with keyboard.Listener(on_press=on_press) as listener:
while not self.stop_event.is_set():
time.sleep(0.1)
listener.stop()
self.keyboard_thread = Thread(target=listen, daemon=True)
self.keyboard_thread.start()
def send_action(self, action: dict[str, float]) -> None:
"""
Send position commands to leader arm (follow mode).
Args:
action: Dictionary of motor positions to command
"""
# Store follower position for later use
self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors])
if not self.is_intervening:
# Follow mode: enable torque and track follower
if not self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 1)
self.leader_torque_enabled = True
# Send follower positions to leader
goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors}
self.bus.sync_write("Goal_Position", goal_pos)
# Track error for automatic intervention detection
current_pos = self.bus.sync_read("Present_Position")
current_array = np.array([current_pos[motor] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
def get_action(self) -> dict[str, float]:
"""
Get action from leader arm.
In follow mode: Returns neutral/current positions
In lead mode: Returns actual leader positions for follower to track
"""
start = time.perf_counter()
if self.is_intervening:
# Lead mode: disable torque if needed and return leader positions
if self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 0)
self.leader_torque_enabled = False
# Get current leader position
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
# Track error
if self.last_follower_pos is not None:
current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
else:
# Follow mode: return current/neutral positions
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
"""Get current keyboard events."""
events = {}
# Map keyboard events to TeleopEvents
if self.keyboard_events["success"]:
events[TeleopEvents.SUCCESS] = True
self.keyboard_events["success"] = False
if self.keyboard_events["failure"]:
events[TeleopEvents.FAILURE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["failure"] = False
if self.keyboard_events["rerecord"]:
events[TeleopEvents.RERECORD_EPISODE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["rerecord"] = False
# Always report intervention state
events[TeleopEvents.IS_INTERVENTION] = self.is_intervening
return events
def disconnect(self) -> None:
"""Disconnect and cleanup."""
self.stop_event.set()
if self.keyboard_thread:
self.keyboard_thread.join(timeout=1.0)
super().disconnect()
def reset(self) -> None:
"""Reset leader-follower state."""
self.is_intervening = False
self.leader_torque_enabled = True
self.leader_tracking_error_queue.clear()
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False,
}
+3 -2
View File
@@ -52,9 +52,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
return SO100Leader(config) return SO100Leader(config)
elif config.type == "so101_leader": elif config.type == "so101_leader":
from .so_leader import SO101Leader from .so_leader import SO101LeaderFollower
return SO101Leader(config) if getattr(config, "leader_follower_mode", False):
return SO101LeaderFollower(config)
elif config.type == "mock_teleop": elif config.type == "mock_teleop":
from tests.mocks.mock_teleop import MockTeleop from tests.mocks.mock_teleop import MockTeleop