Add leader-follower processor and SO101 leader-follower teleoperator

- Introduced `LeaderFollowerProcessor` for managing leader-follower teleoperation logic, including position tracking and end-effector action computation.
- Added `SO101LeaderFollower` class to extend the SO101 leader functionality, enabling both leading and following modes with keyboard event handling for intervention control.
- updated docs
This commit is contained in:
Michel Aractingi
2025-08-12 17:56:53 +02:00
parent fe7c368630
commit f76a108b08
8 changed files with 386 additions and 11 deletions
+9 -2
View File
@@ -320,6 +320,8 @@ python -m lerobot.scripts.find_joint_limits \
--teleop.id=blue
```
Note: You can also use `so101_leader` as the teleop type if you have the SO101 leader arm with reduced gears for smoother teleoperation.
**Workflow**
1. Run the script and move the robot through the space that solves the task
@@ -476,7 +478,7 @@ To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and defi
The SO101 leader arm has reduced gears that allows it to move and track the follower arm during exploration. Therefore, taking over is much smoother than the gearless SO100.
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file.
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file with `leader_follower_mode` enabled:
```json
{
@@ -484,7 +486,8 @@ To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and
"teleop": {
"type": "so101_leader",
"port": "/dev/tty.usbmodem585A0077921",
"use_degrees": true
"use_degrees": true,
"leader_follower_mode": true
},
"processor": {
"control_mode": "leader",
@@ -496,6 +499,10 @@ To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and
}
```
The `leader_follower_mode` enables the leader arm to automatically track the follower's position when you're not intervening. This creates a seamless teleoperation experience where:
- When not intervening: the leader arm follows the follower arm's position
- When intervening (press `space`): you control the leader arm, and the follower tracks it in end-effector space
In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure.
During the online training, press `space` to take over the policy and `space` again to give the control back to the policy.
+2
View File
@@ -28,6 +28,7 @@ from .hil_processor import (
TimeLimitProcessor,
)
from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor
from .leader_follower_processor import LeaderFollowerProcessor
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats
from .observation_processor import VanillaObservationProcessor
from .pipeline import (
@@ -62,6 +63,7 @@ __all__ = [
"InfoProcessor",
"InterventionActionProcessor",
"JointVelocityProcessor",
"LeaderFollowerProcessor",
"MapDeltaActionToRobotAction",
"MotorCurrentProcessor",
"NormalizerProcessor",
@@ -0,0 +1,127 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import torch
from dataclasses import dataclass
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
from lerobot.teleoperators.utils import TeleopEvents
from lerobot.teleoperators import Teleoperator
from lerobot.robots import Robot
@ProcessorStepRegistry.register("leader_follower_processor")
@dataclass
class LeaderFollowerProcessor:
"""
Processor for leader-follower teleoperation mode.
This processor:
1. Sends follower positions to leader arm when not intervening
2. Computes EE delta actions from leader when intervening
3. Handles teleop events from the leader device
"""
leader_device: Teleoperator
motor_names: list[str]
robot: Robot
kinematics: RobotKinematics
end_effector_step_sizes: np.ndarray | None = None
use_gripper: bool = True
prev_leader_gripper: float | None = None
max_gripper_pos: float = 100.0
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Process transition with leader-follower logic."""
# Get current follower position from complementary data
raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions")
if raw_joint_pos is not None:
# Send follower position to leader (for follow mode)
follower_action = {
f"{motor}.pos": float(raw_joint_pos[motor])
for motor in self.motor_names
}
self.leader_device.send_action(follower_action)
# Only compute EE action if intervention is active
# (AddTeleopEventsAsInfo already added IS_INTERVENTION to info)
info = transition.get(TransitionKey.INFO, {})
if info.get(TeleopEvents.IS_INTERVENTION, False):
# Get leader joint positions from teleop_action
# (AddTeleopActionAsComplimentaryData already got the action)
complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
teleop_action = complementary.get("teleop_action", {})
if isinstance(teleop_action, dict) and raw_joint_pos is not None:
# Extract leader positions from teleop action dict
leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names])
follower_pos = np.array([raw_joint_pos[motor] for motor in self.motor_names])
# Compute EE positions
leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3]
follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3]
# Compute normalized EE delta
if self.end_effector_step_sizes is not None:
ee_delta = np.clip(
leader_ee - follower_ee,
-self.end_effector_step_sizes,
self.end_effector_step_sizes
)
ee_delta_normalized = ee_delta / self.end_effector_step_sizes
else:
ee_delta_normalized = leader_ee - follower_ee
# Handle gripper
if self.use_gripper and len(leader_pos) > 3:
if self.prev_leader_gripper is None:
self.prev_leader_gripper = np.clip(
leader_pos[-1], 0, self.max_gripper_pos
)
leader_gripper = leader_pos[-1]
gripper_delta = leader_gripper - self.prev_leader_gripper
normalized_delta = gripper_delta / self.max_gripper_pos
# Quantize gripper action
if normalized_delta >= 0.3:
gripper_action = 2
elif normalized_delta <= -0.1:
gripper_action = 0
else:
gripper_action = 1
self.prev_leader_gripper = leader_gripper
# Create intervention action
intervention_action = np.append(ee_delta_normalized, gripper_action)
else:
intervention_action = ee_delta_normalized
# Override teleop_action with computed EE action
complementary["teleop_action"] = torch.from_numpy(intervention_action).float()
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc]
return transition
def reset(self) -> None:
"""Reset leader-follower state."""
self.prev_leader_gripper = None
if hasattr(self.leader_device, "reset"):
self.leader_device.reset()
+28 -8
View File
@@ -36,6 +36,7 @@ from lerobot.processor import (
ImageCropResizeProcessor,
InterventionActionProcessor,
JointVelocityProcessor,
LeaderFollowerProcessor,
MapDeltaActionToRobotAction,
MapTensorToDeltaActionDict,
MotorCurrentProcessor,
@@ -52,6 +53,7 @@ from lerobot.robots import ( # noqa: F401
RobotConfig,
make_robot_from_config,
so100_follower,
so101_follower,
)
from lerobot.robots.robot import Robot
from lerobot.robots.so100_follower.robot_kinematic_processor import (
@@ -68,6 +70,7 @@ from lerobot.teleoperators import (
make_teleoperator_from_config,
so101_leader, # noqa: F401
)
from lerobot.teleoperators.so101_leader.so101_leader_follower import SO101LeaderFollower
from lerobot.teleoperators.teleoperator import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents
from lerobot.utils.robot_utils import busy_wait
@@ -113,7 +116,7 @@ def create_transition(
}
def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None:
def reset_follower_position(robot_arm: Robot | SO101LeaderFollower, target_position: np.ndarray) -> None:
"""Reset robot arm to target position using smooth trajectory."""
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array(
@@ -458,15 +461,36 @@ def make_processors(
env_pipeline_steps.append(ToBatchProcessor())
env_pipeline_steps.append(DeviceProcessor(device=device))
# Get control mode
control_mode = cfg.processor.control_mode if cfg.processor is not None else "gamepad"
action_pipeline_steps = [
AddTeleopActionAsComplimentaryData(teleop_device=teleop_device),
AddTeleopEventsAsInfo(teleop_device=teleop_device),
AddRobotObservationAsComplimentaryData(robot=env.robot),
AddRobotObservationAsComplimentaryData(robot=env.robot)
]
# Check for leader control mode
if control_mode == "leader":
assert isinstance(teleop_device, SO101LeaderFollower), "Leader control mode requires SO101LeaderFollower teleop device"
action_pipeline_steps.append(
LeaderFollowerProcessor(
leader_device=teleop_device,
motor_names=motor_names,
robot=env.robot,
kinematics=kinematics_solver,
end_effector_step_sizes=np.array(list(cfg.processor.inverse_kinematics.end_effector_step_sizes.values())),
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
max_gripper_pos=cfg.processor.max_gripper_pos if cfg.processor.max_gripper_pos is not None else 100.0,
)
)
# Standard teleop mode (gamepad, keyboard, etc.)
action_pipeline_steps.append(
InterventionActionProcessor(
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
terminate_on_success=terminate_on_success,
),
]
)
# Replace InverseKinematicsProcessor with new kinematic processors
if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None:
@@ -572,11 +596,7 @@ def control_loop(
dt = 1.0 / cfg.env.fps
print(f"Starting control loop at {cfg.env.fps} FPS")
print("Controls:")
print("- Use gamepad/teleop device for intervention")
print("- When not intervening, robot will stay still")
print("- Press Ctrl+C to exit")
# Reset environment and processors
obs, info = env.reset()
complementary_data = (
@@ -16,3 +16,4 @@
from .config_so101_leader import SO101LeaderConfig
from .so101_leader import SO101Leader
from .so101_leader_follower import SO101LeaderFollower
@@ -26,3 +26,6 @@ class SO101LeaderConfig(TeleoperatorConfig):
port: str
use_degrees: bool = False
# Enable leader-follower mode where leader can both lead and follow
leader_follower_mode: bool = False
@@ -0,0 +1,212 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from collections import deque
from threading import Event, Thread
import numpy as np
from pynput import keyboard
from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader
from lerobot.teleoperators.utils import TeleopEvents
logger = logging.getLogger(__name__)
class SO101LeaderFollower(SO101Leader):
"""
Extended SO101 Leader that can both lead (human control) and follow (mimic follower).
This class adds leader-follower functionality where:
- In follow mode: The leader arm mimics the follower's position (torque enabled)
- In lead mode: Human controls the leader (torque disabled) and provides actions
"""
def __init__(self, config):
super().__init__(config)
# Leader-follower state
self.is_intervening = False
self.leader_torque_enabled = True
# Tracking error for automatic intervention detection
self.leader_tracking_error_queue = deque(maxlen=4)
# Keyboard event handling
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False,
}
self.keyboard_thread = None
self.stop_event = Event()
# Store last follower position for action computation
self.last_follower_pos = None
def connect(self, calibrate: bool = True) -> None:
"""Connect and configure for leader-follower mode."""
super().connect(calibrate)
# Configure for leader-follower mode with lower gains
# Lower gains allow manual intervention without injury risk
self.bus.sync_write("Torque_Enable", 1)
for motor in self.bus.motors:
self.bus.write("P_Coefficient", motor, 16)
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 16)
# Start keyboard listener
self._start_keyboard_listener()
print("- Leader-Follower Mode:")
print(" - Press SPACE to toggle intervention (leader control)")
print(" - When not intervening, leader follows follower position")
print(" - When intervening, follower follows leader in end-effector space")
print(" - Press 's' to mark episode as success")
print(" - Press ESC to end episode as failure")
print(" - Press 'r' to re-record episode")
def _start_keyboard_listener(self):
"""Start keyboard listener thread for intervention control."""
def on_press(key):
try:
if key == keyboard.Key.space:
self.keyboard_events["intervention"] = not self.keyboard_events["intervention"]
self.is_intervening = self.keyboard_events["intervention"]
state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE"
logger.info(f"Toggled to {state}")
elif key == keyboard.Key.esc:
self.keyboard_events["failure"] = True
elif hasattr(key, 'char'):
if key.char == 's':
self.keyboard_events["success"] = True
elif key.char == 'r':
self.keyboard_events["rerecord"] = True
except Exception as e:
logger.error(f"Error handling key press: {e}")
def listen():
with keyboard.Listener(on_press=on_press) as listener:
while not self.stop_event.is_set():
time.sleep(0.1)
listener.stop()
self.keyboard_thread = Thread(target=listen, daemon=True)
self.keyboard_thread.start()
def send_action(self, action: dict[str, float]) -> None:
"""
Send position commands to leader arm (follow mode).
Args:
action: Dictionary of motor positions to command
"""
# Store follower position for later use
self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors])
if not self.is_intervening:
# Follow mode: enable torque and track follower
if not self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 1)
self.leader_torque_enabled = True
# Send follower positions to leader
goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors}
self.bus.sync_write("Goal_Position", goal_pos)
# Track error for automatic intervention detection
current_pos = self.bus.sync_read("Present_Position")
current_array = np.array([current_pos[motor] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
def get_action(self) -> dict[str, float]:
"""
Get action from leader arm.
In follow mode: Returns neutral/current positions
In lead mode: Returns actual leader positions for follower to track
"""
start = time.perf_counter()
if self.is_intervening:
# Lead mode: disable torque if needed and return leader positions
if self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 0)
self.leader_torque_enabled = False
# Get current leader position
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
# Track error
if self.last_follower_pos is not None:
current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
else:
# Follow mode: return current/neutral positions
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
"""Get current keyboard events."""
events = {}
# Map keyboard events to TeleopEvents
if self.keyboard_events["success"]:
events[TeleopEvents.SUCCESS] = True
self.keyboard_events["success"] = False
if self.keyboard_events["failure"]:
events[TeleopEvents.FAILURE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["failure"] = False
if self.keyboard_events["rerecord"]:
events[TeleopEvents.RERECORD_EPISODE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["rerecord"] = False
# Always report intervention state
events[TeleopEvents.IS_INTERVENTION] = self.is_intervening
return events
def disconnect(self) -> None:
"""Disconnect and cleanup."""
self.stop_event.set()
if self.keyboard_thread:
self.keyboard_thread.join(timeout=1.0)
super().disconnect()
def reset(self) -> None:
"""Reset leader-follower state."""
self.is_intervening = False
self.leader_torque_enabled = True
self.leader_tracking_error_queue.clear()
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False
}
+4 -1
View File
@@ -42,7 +42,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
return SO100Leader(config)
elif config.type == "so101_leader":
from .so101_leader import SO101Leader
from .so101_leader import SO101Leader, SO101LeaderFollower
if getattr(config, "leader_follower_mode", False):
return SO101LeaderFollower(config)
return SO101Leader(config)
elif config.type == "stretch3":