Compare commits

...

3 Commits

9 changed files with 1121 additions and 5 deletions
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SO100 leader / follower teleop with HIL-SERL-style intervention toggle.
This is a position-only standalone demo of the leader-arm intervention pattern
used by the HIL-SERL training stack (see ``lerobot.processor.LeaderArmInterventionStep``
and ``lerobot.teleoperators.so_leader.SOLeaderFollower``).
Behaviour:
* **Following mode** (default): The follower is idle, the leader is
torque-enabled and haptically tracks the follower's pose. The user can
grab the leader at any time without fighting the position loop.
* **Intervention mode** (toggled by pressing SPACE): The leader's torque is
released, the user moves the leader freely and the follower mirrors the
leader's end-effector position via ``[delta_x, delta_y, delta_z]`` deltas,
identical to how the real HIL-SERL action pipeline records interventions.
Keyboard:
* ``SPACE`` -- toggle intervention on/off.
* ``q`` -- exit the loop cleanly.
"""
from __future__ import annotations
import time
import numpy as np
from lerobot.model.kinematics import RobotKinematics
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.teleoperators.so_leader import SOLeaderFollower, SOLeaderTeleopConfig
from lerobot.teleoperators.utils import TeleopEvents
from lerobot.utils.robot_utils import precise_sleep
FPS = 30  # Control-loop rate (Hz) for both haptic follow and intervention teleop.
# Per-axis EE-delta normalization (metres). Same convention as
# `LeaderArmInterventionStep`: the normalised delta is `(p_leader - p_follower) / step`,
# clipped to [-1, 1]. Keep these small so a single tick is a safe motion.
EE_STEP_SIZES = {"x": 0.010, "y": 0.010, "z": 0.010}
# Workspace bounds (metres) -- a tight box around the resting pose to keep the
# follower from running into its joint limits during the demo.
EE_BOUNDS = {"min": np.array([-0.20, -0.30, 0.02]), "max": np.array([0.30, 0.30, 0.40])}
# URDF used for FK/IK on both arms (see the NOTE in `main`: the SO-ARM100
# repo's `so101_new_calib.urdf` is the recommended model).
URDF_PATH = "./SO101/so101_new_calib.urdf"
# URDF frame whose pose is treated as the end effector by the kinematics solver.
TARGET_FRAME = "gripper_frame_link"
def _joints_dict_to_array(joints: dict[str, float], motor_names: list[str]) -> np.ndarray:
return np.array([joints[f"{m}.pos"] for m in motor_names], dtype=float)
def _array_to_joints_dict(arr: np.ndarray, motor_names: list[str]) -> dict[str, float]:
return {f"{m}.pos": float(v) for m, v in zip(motor_names, arr, strict=True)}
def main() -> None:
    """Run the SO100 leader/follower intervention demo until 'q' is pressed.

    Connects both arms, then loops at ``FPS``: reads both arms, drives haptic
    follow on the leader, and -- while intervention is toggled on -- mirrors
    the leader's end-effector position onto the follower via normalised
    position deltas and IK. Disconnects both arms on exit.
    """
    follower_cfg = SO100FollowerConfig(
        port="/dev/tty.usbmodem5A460814411", id="my_follower_arm", use_degrees=True
    )
    leader_cfg = SOLeaderTeleopConfig(
        port="/dev/tty.usbmodem5A460819811",
        id="my_leader_arm",
        use_degrees=True,
        leader_follower_mode=True,
        use_gripper=True,
    )
    follower = SO100Follower(follower_cfg)
    leader = SOLeaderFollower(leader_cfg)
    follower_motors = list(follower.bus.motors.keys())
    leader_motors = list(leader.bus.motors.keys())
    # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
    # https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
    follower_fk = RobotKinematics(
        urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=follower_motors
    )
    leader_fk = RobotKinematics(
        urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=leader_motors
    )
    follower.connect()
    leader.connect()
    print("Starting leader-follower intervention demo...")
    print(" - Press SPACE to toggle intervention.")
    print(" - Press 'q' to exit.")
    try:
        while True:
            tick_start = time.perf_counter()
            # Read both arms.
            observation = follower.get_observation()
            follower_joints = {f"{m}.pos": float(observation[f"{m}.pos"]) for m in follower_motors}
            leader_joints = leader.get_action()
            # Haptic follow: push follower joints back to the leader. The
            # leader's `send_action` gates motor writes on its intervention
            # state internally (torque on while following, off while intervening).
            leader.send_action(follower_joints)
            # Pull teleop events (SPACE toggle, 'q' terminate).
            events = leader.get_teleop_events()
            if events.get(TeleopEvents.TERMINATE_EPISODE):
                print("Termination requested -- exiting.")
                break
            if events.get(TeleopEvents.IS_INTERVENTION, False):
                # Intervention: FK both arms, take the *normalised position-only
                # delta*, and integrate it onto the follower's current EE pose.
                # This mirrors the action space recorded by
                # `LeaderArmInterventionStep` during HIL-SERL.
                q_leader = _joints_dict_to_array(leader_joints, leader_motors)
                q_follower = _joints_dict_to_array(follower_joints, follower_motors)
                leader_pos = leader_fk.forward_kinematics(q_leader)[:3, 3]
                follower_pose = follower_fk.forward_kinematics(q_follower)
                follower_pos = follower_pose[:3, 3]
                steps = np.array([EE_STEP_SIZES[axis] for axis in ("x", "y", "z")], dtype=float)
                clipped = np.clip((leader_pos - follower_pos) / steps, -1.0, 1.0)
                # Keep the follower's current orientation; only translate, and
                # clamp the target into the safe workspace box.
                target_pose = follower_pose.copy()
                target_pose[:3, 3] = np.clip(
                    follower_pos + clipped * steps, EE_BOUNDS["min"], EE_BOUNDS["max"]
                )
                # IK -> joint-space goal for the follower's arm chain. The
                # gripper joint is kept separate and driven from the leader's
                # gripper position directly (no IK).
                goal_joints = follower_fk.inverse_kinematics(
                    current_joint_pos=q_follower,
                    desired_ee_pose=target_pose,
                    orientation_weight=0.0,
                )
                action = _array_to_joints_dict(goal_joints, follower_motors)
                action["gripper.pos"] = float(leader_joints.get("gripper.pos", 50.0))
                follower.send_action(action)
            # Following mode: leave the follower alone -- the leader just
            # tracks it haptically. In real HIL-SERL training this is where the
            # policy would step the follower forward.
            precise_sleep(max(1.0 / FPS - (time.perf_counter() - tick_start), 0.0))
    finally:
        leader.disconnect()
        follower.disconnect()
# Standard script entry guard: run the demo only when executed directly.
if __name__ == "__main__":
    main()
+2
View File
@@ -61,6 +61,7 @@ from .hil_processor import (
RewardClassifierProcessorStep,
TimeLimitProcessorStep,
)
from .leader_follower_processor import LeaderArmInterventionStep
from .newline_task_processor import NewLineTaskProcessorStep
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats
from .observation_processor import VanillaObservationProcessorStep
@@ -122,6 +123,7 @@ __all__ = [
"ImageCropResizeProcessorStep",
"InfoProcessorStep",
"InterventionActionProcessorStep",
"LeaderArmInterventionStep",
"make_default_processors",
"make_default_teleop_action_processor",
"make_default_robot_action_processor",
@@ -0,0 +1,270 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor step for using a leader arm as the HIL-SERL intervention device.
Position-only port of the leader/follower control mode (no rotation): the leader
arm acts as a 4-D end-effector delta source ``[dx, dy, dz, gripper]`` for the
existing ``InterventionActionProcessorStep`` overriding pipeline.
The teleop_action returned by the leader is a flat dictionary of joint angles
(degrees) like ``{"shoulder_pan.pos": ..., ..., "gripper.pos": ...}``. This step
converts that into a normalised EE-delta dictionary by:
1. Running forward kinematics on the leader joints -> ``p_leader`` (xyz, m).
2. Running forward kinematics on the follower joints (read from the env
transition's observation / complementary data) -> ``p_follower`` (xyz, m).
3. Normalising ``p_leader - p_follower`` by ``end_effector_step_sizes`` and
clipping to ``[-1, 1]`` (matches the gamepad / keyboard EE convention).
4. Mapping the leader gripper position ``[0, 100]`` to the discrete
``{0=close, 1=stay, 2=open}`` action used by the SO follower.
The output is written back to ``complementary_data["teleop_action"]`` so the
rest of the action pipeline (``InterventionActionProcessorStep`` ->
``MapTensorToDeltaActionDictStep`` -> IK) is unchanged.
Additionally, when an optional ``teleop_device`` reference is provided, this
step also pushes the follower's raw joint positions back to the leader via
``teleop_device.send_action(follower_joints)`` every tick. Combined with
:class:`SOLeaderFollower.send_action`, this implements the **haptic follow**
behaviour from https://github.com/huggingface/lerobot/pull/2596: the leader
mimics the follower while the human is hands-off, then drops torque the
moment intervention is toggled so the user can grab and steer it.
"""
import logging
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from lerobot.configs import PipelineFeatureType, PolicyFeature
from lerobot.model import RobotKinematics
from lerobot.types import EnvTransition, TransitionKey
from .pipeline import ProcessorStep, ProcessorStepRegistry
logger = logging.getLogger(__name__)
# Key under which the raw teleop action is staged in `complementary_data`.
TELEOP_ACTION_KEY = "teleop_action"
# Key under which the follower's raw joint positions may appear in
# `complementary_data` (see `_read_follower_joints_dict`).
RAW_JOINT_POSITIONS_KEY = "raw_joint_positions"
# Name of the gripper entry in action dicts (`"gripper.pos"` on the wire).
GRIPPER_KEY = "gripper"
# Leader gripper is in [0, 100] when calibrated.
LEADER_GRIPPER_OPEN_DEFAULT = 60.0
LEADER_GRIPPER_CLOSE_DEFAULT = 30.0
# Discrete gripper command convention (matches GripperVelocityToJoint).
GRIPPER_CLOSE = 0.0
GRIPPER_STAY = 1.0
GRIPPER_OPEN = 2.0
def _joint_dict_to_array(joint_dict: dict[str, float], motor_names: list[str]) -> np.ndarray | None:
"""Pull joint positions in ``motor_names`` order from a ``"<motor>.pos"`` dict.
Returns ``None`` if any motor is missing.
"""
out = np.zeros(len(motor_names), dtype=float)
for i, name in enumerate(motor_names):
v = joint_dict.get(f"{name}.pos")
if v is None:
return None
out[i] = float(v)
return out
@ProcessorStepRegistry.register("leader_arm_intervention")
@dataclass
class LeaderArmInterventionStep(ProcessorStep):
    """Convert leader joint positions in ``teleop_action`` into a 4-D EE-delta dict.

    This step is intended to run **between** ``AddTeleopActionAsComplimentaryDataStep``
    (which populates ``complementary_data["teleop_action"]`` with raw leader joint
    angles) and ``InterventionActionProcessorStep`` (which expects a delta dict).

    Attributes:
        kinematics: Robot kinematic model shared with the follower; used for FK
            on both the leader arm and the follower arm. Both arms must use the
            same URDF joint order.
        motor_names: Ordered joint names matching ``kinematics.joint_names``,
            used to slice joint dicts.
        end_effector_step_sizes: Per-axis normalisation in metres, e.g.
            ``{"x": 0.025, "y": 0.025, "z": 0.025}``. The clamped delta is
            ``(p_leader - p_follower) / step_size``.
        use_gripper: When ``True``, append a discrete gripper command derived from
            the leader gripper joint to the output dict.
        leader_gripper_open: Inclusive threshold at or above which the leader
            gripper is considered ``open`` -> command ``2``.
        leader_gripper_close: Inclusive threshold at or below which the leader
            gripper is considered ``closed`` -> command ``0``.
        teleop_device: Optional reference to the leader teleoperator. When set
            and the device implements ``send_action(action_dict)``, this step
            pushes the follower's raw joints to it every tick to drive haptic
            follow. The teleop is responsible for gating actual motor writes on
            its own intervention state (see :class:`SOLeaderFollower`).
    """

    kinematics: RobotKinematics
    motor_names: list[str]
    end_effector_step_sizes: dict[str, float]
    use_gripper: bool = True
    leader_gripper_open: float = LEADER_GRIPPER_OPEN_DEFAULT
    leader_gripper_close: float = LEADER_GRIPPER_CLOSE_DEFAULT
    teleop_device: Any = None
    # NOTE(review): never read anywhere in this step -- only cleared by
    # reset(). Looks like a leftover; candidate for removal (confirm no
    # subclass relies on it).
    _initial_follower_joints: np.ndarray | None = field(default=None, init=False, repr=False)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Rewrite ``complementary_data["teleop_action"]`` from raw joints to EE deltas.

        Also pushes the follower joints to ``teleop_device`` every tick (haptic
        follow), even when no usable leader action is available.
        """
        # Shallow copies so the caller's transition and its complementary-data
        # dict are never mutated in place.
        new_transition = transition.copy()
        complementary_data = dict(new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) or {})
        # Haptic follow: push follower joints to the leader every step (whether
        # or not we have a usable leader action this tick). The leader's own
        # send_action gates writes on its intervention state.
        follower_joints_dict = self._read_follower_joints_dict(transition, complementary_data)
        if follower_joints_dict is not None:
            self._push_haptic_follow(follower_joints_dict)
        leader_joints_dict = complementary_data.get(TELEOP_ACTION_KEY)
        if not isinstance(leader_joints_dict, dict):
            # Nothing to convert (e.g. teleop disconnected). Leave transition untouched.
            return new_transition
        if not any(k.endswith(".pos") for k in leader_joints_dict):
            # Already in EE-delta form (or unrecognised); skip.
            return new_transition
        follower_joints = (
            _joint_dict_to_array(follower_joints_dict, self.motor_names)
            if follower_joints_dict is not None
            else None
        )
        leader_joints = _joint_dict_to_array(leader_joints_dict, self.motor_names)
        if follower_joints is None or leader_joints is None:
            # Cannot compute delta this step; expose a zero-action so downstream
            # InterventionActionProcessorStep does not propagate stale joints.
            complementary_data[TELEOP_ACTION_KEY] = self._zero_action()
            new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
            return new_transition
        # FK on both arms -> xyz translation components (same URDF/joint order).
        p_leader = self.kinematics.forward_kinematics(leader_joints)[:3, 3]
        p_follower = self.kinematics.forward_kinematics(follower_joints)[:3, 3]
        delta = p_leader - p_follower
        # Per-axis normalisation; max(..., 1e-6) guards against a zero step size.
        delta_norm = np.array(
            [
                delta[0] / max(self.end_effector_step_sizes.get("x", 1.0), 1e-6),
                delta[1] / max(self.end_effector_step_sizes.get("y", 1.0), 1e-6),
                delta[2] / max(self.end_effector_step_sizes.get("z", 1.0), 1e-6),
            ],
            dtype=float,
        )
        delta_norm = np.clip(delta_norm, -1.0, 1.0)
        teleop_action: dict[str, float] = {
            "delta_x": float(delta_norm[0]),
            "delta_y": float(delta_norm[1]),
            "delta_z": float(delta_norm[2]),
        }
        if self.use_gripper:
            # A missing gripper reading defaults to mid-range (50) -> "stay".
            leader_gripper = float(leader_joints_dict.get(f"{GRIPPER_KEY}.pos", 50.0))
            teleop_action[GRIPPER_KEY] = self._discretise_gripper(leader_gripper)
        complementary_data[TELEOP_ACTION_KEY] = teleop_action
        new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
        return new_transition

    def _read_follower_joints_dict(
        self, transition: EnvTransition, complementary_data: dict[str, Any]
    ) -> dict[str, float] | None:
        """Best-effort read of the follower joints from the transition.

        Tries (in order):
            1. ``complementary_data["raw_joint_positions"]`` (set after env.step).
            2. ``transition[OBSERVATION]`` if it is a flat ``"<motor>.pos"`` dict
               (this is the convention used by ``step_env_and_process_transition``
               when staging an action transition).

        Returns the source dict if all expected motors are present, else
        ``None``. We return the *dict* (not the array) because we want to feed
        it back to ``teleop_device.send_action`` for haptic follow.
        """
        raw = complementary_data.get(RAW_JOINT_POSITIONS_KEY)
        if isinstance(raw, dict) and all(f"{m}.pos" in raw for m in self.motor_names):
            return raw  # type: ignore[return-value]
        observation = transition.get(TransitionKey.OBSERVATION)
        if isinstance(observation, dict) and all(f"{m}.pos" in observation for m in self.motor_names):
            return observation  # type: ignore[return-value]
        return None

    def _push_haptic_follow(self, follower_joints_dict: dict[str, float]) -> None:
        """Send the follower's joints back to the leader for haptic follow.

        Errors are logged once and swallowed -- a failed haptic update must
        never break the policy / learner loop.
        """
        if self.teleop_device is None:
            return
        # Duck-typed: any teleop exposing send_action(dict) can receive the push.
        send_action = getattr(self.teleop_device, "send_action", None)
        if send_action is None:
            return
        try:
            send_action(follower_joints_dict)
        except NotImplementedError:
            # Plain SOLeader / unsupported teleop -- silently disable haptic follow.
            self.teleop_device = None
        except Exception as e:  # pragma: no cover - hardware path
            logger.warning(f"[LeaderArmInterventionStep] haptic follow failed: {e}")

    def _discretise_gripper(self, leader_gripper_pos: float) -> float:
        """Map a leader gripper position in ``[0, 100]`` to ``{0, 1, 2}``."""
        if leader_gripper_pos >= self.leader_gripper_open:
            return GRIPPER_OPEN
        if leader_gripper_pos <= self.leader_gripper_close:
            return GRIPPER_CLOSE
        return GRIPPER_STAY

    def _zero_action(self) -> dict[str, float]:
        """Return a no-op EE-delta action (zero translation, gripper "stay")."""
        out: dict[str, float] = {"delta_x": 0.0, "delta_y": 0.0, "delta_z": 0.0}
        if self.use_gripper:
            out[GRIPPER_KEY] = GRIPPER_STAY
        return out

    def get_config(self) -> dict[str, Any]:
        """Return the JSON-serializable part of this step's configuration."""
        # `kinematics` and `teleop_device` are runtime objects (not JSON-serializable)
        # and are re-injected by `gym_manipulator.make_processors`, so they are
        # intentionally omitted from the saved config.
        return {
            "motor_names": list(self.motor_names),
            "end_effector_step_sizes": dict(self.end_effector_step_sizes),
            "use_gripper": self.use_gripper,
            "leader_gripper_open": self.leader_gripper_open,
            "leader_gripper_close": self.leader_gripper_close,
        }

    def reset(self) -> None:
        """Clear per-episode state."""
        self._initial_follower_joints = None

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """No feature changes: this step only rewrites complementary data."""
        return features
+32 -5
View File
@@ -39,6 +39,7 @@ from lerobot.processor import (
GymHILAdapterProcessorStep,
ImageCropResizeProcessorStep,
InterventionActionProcessorStep,
LeaderArmInterventionStep,
MapDeltaActionToRobotActionStep,
MapTensorToDeltaActionDictStep,
Numpy2TorchActionProcessorStep,
@@ -481,15 +482,41 @@ def make_processors(
env_pipeline_steps.append(AddBatchDimensionProcessorStep())
env_pipeline_steps.append(DeviceProcessorStep(device=device))
action_pipeline_steps = [
action_pipeline_steps: list = [
AddTeleopActionAsComplimentaryDataStep(teleop_device=teleop_device),
AddTeleopEventsAsInfoStep(teleop_device=teleop_device),
InterventionActionProcessorStep(
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
terminate_on_success=terminate_on_success,
),
]
use_gripper_for_intervention = (
cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False
)
# Leader-arm intervention: convert raw leader joints in `teleop_action`
# into a 4-D EE-delta dict before the override step consumes it. The same
# step also drives haptic follow on the leader (when `teleop_device` is a
# `SOLeaderFollower`) by pushing the follower joints back via send_action.
if (
getattr(cfg.processor, "control_mode", "gamepad") == "leader"
and cfg.processor.inverse_kinematics is not None
and kinematics_solver is not None
):
action_pipeline_steps.append(
LeaderArmInterventionStep(
kinematics=kinematics_solver,
motor_names=motor_names,
end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes,
teleop_device=teleop_device,
use_gripper=use_gripper_for_intervention,
)
)
action_pipeline_steps.append(
InterventionActionProcessorStep(
use_gripper=use_gripper_for_intervention,
terminate_on_success=terminate_on_success,
)
)
# Replace InverseKinematicsProcessor with new kinematic processors
if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None:
# Add EE bounds and safety processor
@@ -21,6 +21,7 @@ from .config_so_leader import (
SOLeaderTeleopConfig,
)
from .so_leader import SO100Leader, SO101Leader, SOLeader
from .so_leader_follower import SOLeaderFollower
__all__ = [
"SO100Leader",
@@ -29,5 +30,6 @@ __all__ = [
"SO101LeaderConfig",
"SOLeader",
"SOLeaderConfig",
"SOLeaderFollower",
"SOLeaderTeleopConfig",
]
@@ -29,6 +29,18 @@ class SOLeaderConfig:
# Whether to use degrees for angles
use_degrees: bool = True
# When True, the SO leader is wrapped in `SOLeaderFollower`, which adds a
# keyboard listener for HIL-SERL intervention events (SPACE toggles
# intervention, S/R/Q signal success/rerecord/fail) and reports a 4-D
# EE-delta action space via `action_features`. The raw leader joints are
# still returned by `get_action()` and converted downstream by
# `LeaderArmInterventionStep`.
leader_follower_mode: bool = False
# When `leader_follower_mode` is enabled, include the gripper in the
# 4-D delta action space announced by `action_features`.
use_gripper: bool = True
@TeleoperatorConfig.register_subclass("so101_leader")
@TeleoperatorConfig.register_subclass("so100_leader")
@@ -0,0 +1,341 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SO Leader teleoperator extended with HIL-SERL intervention events and haptic follow.
This wrapper around :class:`SOLeader` keeps the underlying joint-reading behaviour
intact (so :func:`get_action` returns ``{"<motor>.pos": float}``) while adding:
* A pynput keyboard listener that toggles intervention with SPACE and emits
``success`` / ``rerecord`` / ``fail`` signals via S / R / Q keys, mirroring
:class:`KeyboardEndEffectorTeleop`.
* A :func:`get_teleop_events` method satisfying the
:class:`HasTeleopEvents` protocol consumed by ``AddTeleopEventsAsInfoStep``.
* An :func:`action_features` override that announces the 4-D
``[delta_x, delta_y, delta_z, gripper]`` space the leader will project into
via :class:`LeaderArmInterventionStep` -- this is what ends up recorded by
``LeRobotDataset`` in HIL-SERL ``record`` mode.
* :func:`send_action` for **haptic follow**: when the human is not intervening,
the leader is torque-enabled and tracks the follower's joint positions so the
user can grab it at any time and seamlessly take over (mirrors the design from
https://github.com/huggingface/lerobot/pull/2596). When intervention is
toggled on, leader torque is disabled so the user can move it freely.
* Lower position-loop gains on :func:`connect` (``P=16, I=0, D=16``) so the
haptic follow does not jerk the user's hand when grabbing the leader.
* Bus-control primitives (:func:`enable_torque`, :func:`disable_torque`,
:func:`write_goal_positions`) and a :func:`smooth_move_to` helper. These
satisfy the ``teleop_has_motor_control`` capability gate in
``examples/hil/hil_utils.py``, so the BC-style HIL data collector
(``examples/hil/hil_data_collection.py``) can drive an SO leader the same way
it drives the OpenArm Mini -- pause / smooth-mirror to follower / take over.
The joint-to-EE-delta conversion does **not** happen here; it is performed by
:class:`LeaderArmInterventionStep` in the action processor pipeline so the
leader stays a pure I/O device.
"""
from __future__ import annotations
import contextlib
import logging
import os
import sys
import time
from typing import Any
import numpy as np
from lerobot.types import RobotAction
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.import_utils import _pynput_available
from ..utils import TeleopEvents
from .config_so_leader import SOLeaderTeleopConfig
from .so_leader import SOLeader
# Softened position-loop PID gains written on connect so haptic follow does not
# fight the operator's hand (see `_configure_leader_follower_gains`).
LEADER_FOLLOWER_P_GAIN = 16
LEADER_FOLLOWER_I_GAIN = 0
LEADER_FOLLOWER_D_GAIN = 16
logger = logging.getLogger(__name__)
# pynput is optional: without it the teleop still works, but all keyboard
# events (intervention / success / rerecord / terminate) are disabled.
PYNPUT_AVAILABLE = _pynput_available
keyboard: Any = None  # rebound to `pynput.keyboard` below when the import succeeds
if PYNPUT_AVAILABLE:
    try:
        # Headless Linux (no X display) cannot host a pynput listener.
        if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
            logger.info("No DISPLAY set. Skipping pynput import for SOLeaderFollower.")
            PYNPUT_AVAILABLE = False
        else:
            from pynput import keyboard  # type: ignore[no-redef]
    except Exception as e:  # pragma: no cover - hardware path
        PYNPUT_AVAILABLE = False
        logger.info(f"Could not import pynput: {e}")
class SOLeaderFollower(SOLeader):
"""SO leader teleop with intervention/success/rerecord keyboard signals."""
config_class = SOLeaderTeleopConfig
name = "so_leader_follower"
def __init__(self, config: SOLeaderTeleopConfig):
super().__init__(config)
self._is_intervention: bool = False
self._success: bool = False
self._rerecord: bool = False
self._terminate_episode: bool = False
self._listener: Any = None
# Haptic follow state (mirrors `is_intervening` / `leader_torque_enabled`
# in https://github.com/huggingface/lerobot/pull/2596 SO101LeaderFollower).
self._leader_torque_enabled: bool = True
self._last_follower_pos: np.ndarray | None = None
@property
def action_features(self) -> dict[str, Any]:
"""Announce the 4-D EE-delta action space recorded by the dataset.
The leader still produces raw joints from :func:`get_action`; this
property describes what downstream processors emit and what the dataset
layer should reserve in the ``action`` column for HIL-SERL recordings.
"""
names: dict[str, int] = {"delta_x": 0, "delta_y": 1, "delta_z": 2}
shape = (3,)
if getattr(self.config, "use_gripper", True):
names["gripper"] = 3
shape = (4,)
return {"dtype": "float32", "shape": shape, "names": names}
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
super().connect(calibrate=calibrate)
self._configure_leader_follower_gains()
self._start_keyboard_listener()
logger.info(
"[SOLeaderFollower] connected. Press SPACE to toggle intervention, "
"'s' for success, 'r' for re-record, 'q' to terminate."
)
def _configure_leader_follower_gains(self) -> None:
"""Lower position-loop gains so haptic follow does not yank the user.
Mirrors the gains used by the SO101LeaderFollower in PR #2596 — high
default gains make the leader fight the user's hand when they grab it
between interventions.
"""
for motor in self.bus.motors:
try:
self.bus.write("P_Coefficient", motor, LEADER_FOLLOWER_P_GAIN)
self.bus.write("I_Coefficient", motor, LEADER_FOLLOWER_I_GAIN)
self.bus.write("D_Coefficient", motor, LEADER_FOLLOWER_D_GAIN)
except Exception as e: # pragma: no cover - hardware path
logger.warning(f"[SOLeaderFollower] could not set PID gains for '{motor}': {e}")
def _start_keyboard_listener(self) -> None:
if not PYNPUT_AVAILABLE:
logger.info("pynput unavailable; SOLeaderFollower keyboard events disabled.")
return
def on_press(key: Any) -> None:
try:
if key == keyboard.Key.space:
self._is_intervention = not self._is_intervention
logger.info(f"[SOLeaderFollower] intervention -> {self._is_intervention}")
return
char = getattr(key, "char", None)
if char is None:
return
if char == "s":
self._success = True
self._terminate_episode = True
elif char == "r":
self._rerecord = True
self._terminate_episode = True
elif char == "q":
self._terminate_episode = True
except Exception: # nosec B110
# Never let the listener thread crash on weird keys.
pass
self._listener = keyboard.Listener(on_press=on_press)
self._listener.daemon = True
self._listener.start()
def enable_torque(self) -> None:
"""Enable position-loop torque on every motor.
Exposed alongside :func:`disable_torque` and :func:`write_goal_positions`
so this teleop satisfies the ``teleop_has_motor_control`` gate used by
``examples/hil/hil_data_collection.py`` (mirrors the OpenArm Mini API).
Errors are logged and swallowed -- the loop must keep ticking even if a
single haptic update fails.
"""
if not self.is_connected:
return
try:
self.bus.sync_write("Torque_Enable", 1)
self._leader_torque_enabled = True
except Exception as e: # pragma: no cover - hardware path
logger.warning(f"[SOLeaderFollower] could not enable leader torque: {e}")
def disable_torque(self) -> None:
"""Disable position-loop torque so the user can move the leader freely."""
if not self.is_connected:
return
try:
self.bus.sync_write("Torque_Enable", 0)
self._leader_torque_enabled = False
except Exception as e: # pragma: no cover - hardware path
logger.warning(f"[SOLeaderFollower] could not disable leader torque: {e}")
def write_goal_positions(self, positions: dict[str, float]) -> None:
"""Push goal positions to the leader bus (no torque toggling).
Accepts dataset-style keys ``{"<motor>.pos": deg}`` (matches what
:func:`get_action` produces and what :func:`smooth_move_to` and
:func:`send_action` consume) -- bare motor names are also tolerated
for parity with :class:`OpenArmMini.write_goal_positions`.
"""
if not self.is_connected:
return
goal_pos: dict[str, float] = {}
for motor in self.bus.motors:
if f"{motor}.pos" in positions:
goal_pos[motor] = float(positions[f"{motor}.pos"])
elif motor in positions:
goal_pos[motor] = float(positions[motor])
if not goal_pos:
return
try:
self.bus.sync_write("Goal_Position", goal_pos)
except Exception as e: # pragma: no cover - hardware path
logger.warning(f"[SOLeaderFollower] could not push goal position to leader: {e}")
def smooth_move_to(
self,
target_pos: dict[str, float],
duration_s: float = 2.0,
fps: int = 50,
) -> None:
"""Linearly ramp the leader from its current pose to ``target_pos``.
Mirrors the ``teleop_smooth_move_to`` helper from
``examples/hil/hil_utils.py`` so the leader can be safely re-engaged
without yanking the user's hand -- typical use is right after
:func:`connect` (or whenever the leader and follower drift apart, e.g.
on episode reset). Blocks for ``duration_s`` seconds.
"""
if not self.is_connected:
return
steps = max(int(duration_s * fps), 1)
try:
current = self.get_action()
except Exception as e: # pragma: no cover - hardware path
logger.warning(f"[SOLeaderFollower] smooth_move_to could not read current pose: {e}")
return
self.enable_torque()
if not self._leader_torque_enabled:
return
for step in range(steps + 1):
t = step / steps
interp = {}
for key, current_val in current.items():
if key in target_pos:
interp[key] = current_val * (1.0 - t) + float(target_pos[key]) * t
else:
interp[key] = current_val
self.write_goal_positions(interp)
time.sleep(1.0 / fps)
@check_if_not_connected
def get_action(self) -> RobotAction:
# When the user has just toggled into intervention, make sure leader
# torque is OFF so they can move it without fighting the position loop.
if self._is_intervention and self._leader_torque_enabled:
self.disable_torque()
return super().get_action()
def send_action(self, action: dict[str, float]) -> None: # type: ignore[override]
"""Mirror the follower's joint positions on the leader (haptic follow).
This is called every step from the action pipeline (typically by
:class:`LeaderArmInterventionStep`) with the follower's raw joint
positions ``{"<motor>.pos": float}``. While the user is **not**
intervening the leader is torque-enabled and tracks the follower so the
operator can grab it at any time and continue motion smoothly. As soon
as the user toggles intervention on (SPACE), torque is dropped in
:func:`get_action` so the human can move the leader freely.
Args:
action: Dictionary of follower motor positions, ``{motor.pos: deg}``.
"""
if not self.is_connected:
return
try:
self._last_follower_pos = np.array(
[float(action.get(f"{m}.pos", 0.0)) for m in self.bus.motors],
dtype=float,
)
except Exception as e: # pragma: no cover - defensive
logger.warning(f"[SOLeaderFollower] could not extract follower joints: {e}")
return
if self._is_intervention:
return
if not self._leader_torque_enabled:
self.enable_torque()
if not self._leader_torque_enabled:
return
self.write_goal_positions(action)
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
    """Snapshot pending teleop events and clear the one-shot flags.

    The episode-control flags (success / rerecord / terminate) are
    edge-triggered: they are reset immediately after being read so a single
    key press is not re-reported for several consecutive frames. The
    intervention flag is level-triggered and left untouched.
    """
    snapshot = {
        TeleopEvents.IS_INTERVENTION: self._is_intervention,
        TeleopEvents.TERMINATE_EPISODE: self._terminate_episode,
        TeleopEvents.SUCCESS: self._success,
        TeleopEvents.RERECORD_EPISODE: self._rerecord,
    }
    self._success = self._rerecord = self._terminate_episode = False
    return snapshot
def reset(self) -> None:
    """Restore the teleoperator to its idle, post-connect state.

    Clears every intervention / episode flag, marks leader torque as
    enabled, and forgets the last follower pose.
    """
    for flag in ("_is_intervention", "_success", "_rerecord", "_terminate_episode"):
        setattr(self, flag, False)
    self._leader_torque_enabled = True
    self._last_follower_pos = None
@check_if_not_connected
def disconnect(self) -> None:
    """Stop the keyboard listener (best effort) and close the bus link."""
    listener = self._listener
    if listener is not None:
        # The listener may already be dead; a failing stop() must not
        # prevent the underlying disconnect from running.
        with contextlib.suppress(Exception):
            listener.stop()
        self._listener = None
    super().disconnect()
+8
View File
@@ -48,10 +48,18 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
return OmxLeader(config)
elif config.type == "so100_leader":
if getattr(config, "leader_follower_mode", False):
from .so_leader import SOLeaderFollower
return SOLeaderFollower(config)
from .so_leader import SO100Leader
return SO100Leader(config)
elif config.type == "so101_leader":
if getattr(config, "leader_follower_mode", False):
from .so_leader import SOLeaderFollower
return SOLeaderFollower(config)
from .so_leader import SO101Leader
return SO101Leader(config)
@@ -0,0 +1,284 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for LeaderArmInterventionStep (placo-free, FK is mocked)."""
from typing import Any
import numpy as np
import pytest
from lerobot.processor.converters import create_transition
from lerobot.processor.leader_follower_processor import (
GRIPPER_CLOSE,
GRIPPER_OPEN,
GRIPPER_STAY,
LeaderArmInterventionStep,
)
from lerobot.types import TransitionKey
# Joint names of the 5-DoF test arm; the gripper is handled separately via a
# "gripper.pos" key where a test needs it.
MOTOR_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
]
# Per-axis end-effector step sizes used by the step to normalise EE deltas
# (presumably metres — the fake FK below maps joints to metres via 1e-3).
STEP_SIZES = {"x": 0.025, "y": 0.025, "z": 0.025}
class _FakeKinematics:
"""Minimal stand-in for `RobotKinematics.forward_kinematics`.
Maps a joint vector deterministically to a 4x4 transform whose translation
is `(j[0] * 0.001, j[1] * 0.001, j[2] * 0.001)`. This lets the test drive
arbitrary EE positions by choosing leader / follower joint values without
depending on placo / a URDF.
"""
def forward_kinematics(self, joints: np.ndarray) -> np.ndarray:
t = np.eye(4, dtype=float)
t[:3, 3] = np.asarray(joints[:3], dtype=float) * 1e-3
return t
def _joint_dict(values: list[float]) -> dict[str, float]:
    """Pair *values* with ``MOTOR_NAMES`` as a ``{"<motor>.pos": value}`` dict."""
    joints: dict[str, float] = {}
    for name, value in zip(MOTOR_NAMES, values, strict=False):
        joints[f"{name}.pos"] = value
    return joints
def _make_step(use_gripper: bool = True, teleop_device: Any = None) -> LeaderArmInterventionStep:
    """Build a :class:`LeaderArmInterventionStep` wired to the fake FK model."""
    kwargs: dict[str, Any] = {
        "kinematics": _FakeKinematics(),  # type: ignore[arg-type]
        "motor_names": MOTOR_NAMES,
        "end_effector_step_sizes": STEP_SIZES,
        "use_gripper": use_gripper,
        "teleop_device": teleop_device,
    }
    return LeaderArmInterventionStep(**kwargs)
class _RecordingTeleop:
"""Minimal teleop double that records every send_action call."""
def __init__(self) -> None:
self.calls: list[dict[str, float]] = []
def send_action(self, action: dict[str, float]) -> None:
self.calls.append(dict(action))
class _RaisingTeleop:
"""Teleop double whose send_action raises an unexpected error."""
def __init__(self, exc: Exception) -> None:
self.exc = exc
self.calls = 0
def send_action(self, action: dict[str, float]) -> None:
self.calls += 1
raise self.exc
def _build_transition(
    leader_joints: dict[str, float] | None,
    follower_joints: dict[str, float] | None,
    extra_complementary: dict[str, Any] | None = None,
) -> Any:
    """Assemble a transition carrying optional leader / follower joint reads."""
    complementary: dict[str, Any] = dict(extra_complementary) if extra_complementary else {}
    if leader_joints is not None:
        complementary["teleop_action"] = leader_joints
    if follower_joints is not None:
        complementary["raw_joint_positions"] = follower_joints
    return create_transition(complementary_data=complementary)
def test_replaces_teleop_action_with_normalised_ee_delta():
    """A raw leader joint read is replaced by a normalised EE-delta action."""
    leader_joints = _joint_dict([25.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 80.0
    follower_joints = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0])
    follower_joints["gripper.pos"] = 30.0

    out = _make_step()(_build_transition(leader_joints, follower_joints))

    teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
    assert set(teleop_action) == {"delta_x", "delta_y", "delta_z", "gripper"}
    # joint 0 differs by +25 -> 0.025 m -> normalised by 0.025 step -> 1.0
    assert teleop_action["delta_x"] == pytest.approx(1.0)
    assert teleop_action["delta_y"] == pytest.approx(0.0)
    assert teleop_action["delta_z"] == pytest.approx(0.0)
    # leader gripper 80 >= open threshold 60 -> open command
    assert teleop_action["gripper"] == GRIPPER_OPEN
def test_clips_delta_to_unit_box():
    """Huge joint offsets saturate the normalised deltas at +/-1."""
    leader_joints = _joint_dict([1000.0, -1000.0, 1000.0, 0.0, 0.0])
    follower_joints = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0])

    out = _make_step(use_gripper=False)(_build_transition(leader_joints, follower_joints))

    teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
    # use_gripper=False -> no gripper channel at all.
    assert "gripper" not in teleop_action
    assert teleop_action["delta_x"] == pytest.approx(1.0)
    assert teleop_action["delta_y"] == pytest.approx(-1.0)
    assert teleop_action["delta_z"] == pytest.approx(1.0)
@pytest.mark.parametrize(
    ("leader_gripper", "expected"),
    [
        (10.0, GRIPPER_CLOSE),
        (45.0, GRIPPER_STAY),
        (90.0, GRIPPER_OPEN),
    ],
)
def test_gripper_quantisation(leader_gripper: float, expected: float):
    """Leader gripper position is quantised into close / stay / open."""
    leader_joints = _joint_dict([0.0] * 5)
    leader_joints["gripper.pos"] = leader_gripper
    follower_joints = _joint_dict([0.0] * 5)
    follower_joints["gripper.pos"] = 50.0

    transition = _build_transition(leader_joints, follower_joints)
    out = _make_step(use_gripper=True)(transition)

    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]["gripper"] == expected
def test_zero_action_when_follower_joints_missing():
    """Without follower joints the step emits a harmless all-zero action."""
    leader_joints = _joint_dict([10.0, 10.0, 10.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0

    out = _make_step()(_build_transition(leader_joints, follower_joints=None))

    expected = {
        "delta_x": 0.0,
        "delta_y": 0.0,
        "delta_z": 0.0,
        "gripper": GRIPPER_STAY,
    }
    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] == expected
def test_passthrough_when_teleop_action_missing():
    """No teleop_action in the transition -> nothing is injected."""
    out = _make_step()(_build_transition(leader_joints=None, follower_joints=None))
    assert "teleop_action" not in out[TransitionKey.COMPLEMENTARY_DATA]
def test_passthrough_when_teleop_action_is_already_delta_dict():
    """Idempotent on dicts that don't look like raw joint reads."""
    delta_action = {"delta_x": 0.5, "delta_y": 0.0, "delta_z": -0.3, "gripper": GRIPPER_OPEN}
    follower_joints = _joint_dict([0.0] * 5)

    out = _make_step()(_build_transition(delta_action, follower_joints))

    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] == delta_action
def test_reads_follower_from_observation_when_complementary_missing():
    """Follower joints can come from the OBSERVATION dict as a fallback."""
    leader_joints = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0
    follower_joints = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0])

    out = _make_step()(
        create_transition(
            observation=follower_joints,
            complementary_data={"teleop_action": leader_joints},
        )
    )

    # delta = (20 - 10) * 1e-3 = 0.01, normalised by 0.025 -> 0.4
    delta_x = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]["delta_x"]
    assert delta_x == pytest.approx(0.4)
# --- haptic follow ----------------------------------------------------------
def test_haptic_follow_pushes_follower_joints_to_teleop_device():
    """When teleop_device is set, follower joints should be sent to it every tick."""
    leader_joints = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0
    follower_joints = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0])
    follower_joints["gripper.pos"] = 50.0

    device = _RecordingTeleop()
    _make_step(teleop_device=device)(_build_transition(leader_joints, follower_joints))

    # Exactly one haptic write, carrying the follower's joint dict verbatim.
    assert device.calls == [follower_joints]
def test_haptic_follow_uses_observation_when_complementary_missing():
    """Falls back to OBSERVATION dict for haptic follow when complementary is empty."""
    leader_joints = _joint_dict([5.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0
    follower_joints = _joint_dict([3.0, 0.0, 0.0, 0.0, 0.0])
    follower_joints["gripper.pos"] = 50.0

    device = _RecordingTeleop()
    _make_step(teleop_device=device)(
        create_transition(
            observation=follower_joints,
            complementary_data={"teleop_action": leader_joints},
        )
    )

    assert device.calls == [follower_joints]
def test_haptic_follow_skipped_when_no_follower_joints_available():
    """No follower joints -> no haptic write (don't push stale data)."""
    leader_joints = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0

    device = _RecordingTeleop()
    _make_step(teleop_device=device)(_build_transition(leader_joints, follower_joints=None))

    assert device.calls == []
def test_haptic_follow_swallows_send_action_errors():
    """A failing teleop.send_action must not abort the action pipeline."""
    leader_joints = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0
    follower_joints = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0])
    follower_joints["gripper.pos"] = 50.0

    device = _RaisingTeleop(RuntimeError("bus comms fail"))
    out = _make_step(teleop_device=device)(_build_transition(leader_joints, follower_joints))

    assert device.calls == 1
    # The downstream EE-delta payload must still be produced normally.
    delta_x = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]["delta_x"]
    assert delta_x == pytest.approx(0.4)
def test_haptic_follow_disables_when_send_action_not_implemented():
    """Plain leaders (no haptic follow) opt out via NotImplementedError."""
    leader_joints = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0])
    leader_joints["gripper.pos"] = 50.0
    follower_joints = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0])
    follower_joints["gripper.pos"] = 50.0

    device = _RaisingTeleop(NotImplementedError("plain leader, no haptic follow"))
    step = _make_step(teleop_device=device)
    step(_build_transition(leader_joints, follower_joints))
    # Tick again and confirm the step gave up rather than spamming the device.
    step(_build_transition(leader_joints, follower_joints))

    assert device.calls == 1
    assert step.teleop_device is None