mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-14 16:19:45 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b82a5c381 | |||
| 13418dcd7b | |||
| a3cb9f5317 |
@@ -0,0 +1,170 @@
|
||||
# !/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""SO100 leader / follower teleop with HIL-SERL-style intervention toggle.
|
||||
|
||||
This is a position-only standalone demo of the leader-arm intervention pattern
|
||||
used by the HIL-SERL training stack (see ``lerobot.processor.LeaderArmInterventionStep``
|
||||
and ``lerobot.teleoperators.so_leader.SOLeaderFollower``).
|
||||
|
||||
Behaviour:
|
||||
* **Following mode** (default): The follower is idle, the leader is
|
||||
torque-enabled and haptically tracks the follower's pose. The user can
|
||||
grab the leader at any time without fighting the position loop.
|
||||
* **Intervention mode** (toggled by pressing SPACE): The leader's torque is
|
||||
released, the user moves the leader freely and the follower mirrors the
|
||||
leader's end-effector position via ``[delta_x, delta_y, delta_z]`` deltas,
|
||||
identical to how the real HIL-SERL action pipeline records interventions.
|
||||
|
||||
Keyboard:
|
||||
* ``SPACE`` -- toggle intervention on/off.
|
||||
* ``q`` -- exit the loop cleanly.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.teleoperators.so_leader import SOLeaderFollower, SOLeaderTeleopConfig
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
|
||||
FPS = 30
|
||||
|
||||
# Per-axis EE-delta normalization (metres). Same convention as
|
||||
# `LeaderArmInterventionStep`: the normalised delta is `(p_leader - p_follower) / step`,
|
||||
# clipped to [-1, 1]. Keep these small so a single tick is a safe motion.
|
||||
EE_STEP_SIZES = {"x": 0.010, "y": 0.010, "z": 0.010}
|
||||
|
||||
# Workspace bounds (metres) -- a tight box around the resting pose to keep the
|
||||
# follower from running into its joint limits during the demo.
|
||||
EE_BOUNDS = {"min": np.array([-0.20, -0.30, 0.02]), "max": np.array([0.30, 0.30, 0.40])}
|
||||
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
TARGET_FRAME = "gripper_frame_link"
|
||||
|
||||
|
||||
def _joints_dict_to_array(joints: dict[str, float], motor_names: list[str]) -> np.ndarray:
|
||||
return np.array([joints[f"{m}.pos"] for m in motor_names], dtype=float)
|
||||
|
||||
|
||||
def _array_to_joints_dict(arr: np.ndarray, motor_names: list[str]) -> dict[str, float]:
|
||||
return {f"{m}.pos": float(v) for m, v in zip(motor_names, arr, strict=True)}
|
||||
|
||||
|
||||
def main() -> None:
    """Run the SO100 leader/follower intervention demo loop until 'q' is pressed.

    Connects both arms, then ticks at ``FPS`` Hz: in following mode the leader
    haptically tracks the follower; in intervention mode (SPACE) the follower
    mirrors the leader via normalised EE-position deltas and IK. Both arms are
    disconnected on exit, even on error.
    """
    # Serial ports and ids are site-specific — edit these before running.
    follower_config = SO100FollowerConfig(
        port="/dev/tty.usbmodem5A460814411", id="my_follower_arm", use_degrees=True
    )
    leader_config = SOLeaderTeleopConfig(
        port="/dev/tty.usbmodem5A460819811",
        id="my_leader_arm",
        use_degrees=True,
        leader_follower_mode=True,
        use_gripper=True,
    )

    follower = SO100Follower(follower_config)
    leader = SOLeaderFollower(leader_config)

    follower_motor_names = list(follower.bus.motors.keys())
    leader_motor_names = list(leader.bus.motors.keys())

    # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
    # https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
    follower_kinematics = RobotKinematics(
        urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=follower_motor_names
    )
    leader_kinematics = RobotKinematics(
        urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=leader_motor_names
    )

    follower.connect()
    leader.connect()

    print("Starting leader-follower intervention demo...")
    print(" - Press SPACE to toggle intervention.")
    print(" - Press 'q' to exit.")

    try:
        while True:
            t0 = time.perf_counter()

            # 1. Read both arms.
            follower_obs = follower.get_observation()
            follower_joints_dict = {f"{m}.pos": float(follower_obs[f"{m}.pos"]) for m in follower_motor_names}
            leader_joints_dict = leader.get_action()

            # 2. Haptic follow: push follower joints back to the leader. The
            # leader's `send_action` gates motor writes on its intervention
            # state internally (torque on while following, off while intervening).
            leader.send_action(follower_joints_dict)

            # 3. Pull teleop events (SPACE toggle, 'q' terminate).
            events = leader.get_teleop_events()
            if events.get(TeleopEvents.TERMINATE_EPISODE):
                print("Termination requested -- exiting.")
                break

            is_intervention = events.get(TeleopEvents.IS_INTERVENTION, False)

            if is_intervention:
                # 4a. Compute leader/follower EE poses, take the *normalised
                # position-only delta*, and integrate it onto the follower's
                # current EE pose to get a target. This mirrors the action
                # space recorded by `LeaderArmInterventionStep` during HIL-SERL.
                leader_arr = _joints_dict_to_array(leader_joints_dict, leader_motor_names)
                follower_arr = _joints_dict_to_array(follower_joints_dict, follower_motor_names)

                # FK poses are 4x4 homogeneous transforms; [:3, 3] is the xyz position.
                p_leader = leader_kinematics.forward_kinematics(leader_arr)[:3, 3]
                p_follower_mat = follower_kinematics.forward_kinematics(follower_arr)
                p_follower = p_follower_mat[:3, 3]

                # Normalise per-axis, clip to [-1, 1], then convert back to metres
                # so one tick can move at most one `EE_STEP_SIZES` step per axis.
                raw_delta = p_leader - p_follower
                step_vec = np.array([EE_STEP_SIZES["x"], EE_STEP_SIZES["y"], EE_STEP_SIZES["z"]], dtype=float)
                delta_norm = np.clip(raw_delta / step_vec, -1.0, 1.0)
                delta_m = delta_norm * step_vec

                # Keep the follower's orientation; only translate, clamped to bounds.
                target_pose = p_follower_mat.copy()
                target_pose[:3, 3] = np.clip(p_follower + delta_m, EE_BOUNDS["min"], EE_BOUNDS["max"])

                # IK -> joint-space goal for the follower's arm chain. The
                # gripper joint is kept separate and driven from the leader's
                # gripper position directly (no IK).
                target_joints = follower_kinematics.inverse_kinematics(
                    current_joint_pos=follower_arr,
                    desired_ee_pose=target_pose,
                    orientation_weight=0.0,
                )
                follower_action = _array_to_joints_dict(target_joints, follower_motor_names)
                follower_action["gripper.pos"] = float(leader_joints_dict.get("gripper.pos", 50.0))
                follower.send_action(follower_action)
            # 4b. Following mode: leave the follower alone -- the leader just
            # tracks it haptically. In real HIL-SERL training this is where the
            # policy would step the follower forward.

            # Pace the loop to FPS; never sleep a negative duration.
            precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
    finally:
        leader.disconnect()
        follower.disconnect()
|
||||
|
||||
|
||||
# Script entry point: run the interactive demo when executed directly.
if __name__ == "__main__":
    main()
|
||||
@@ -61,6 +61,7 @@ from .hil_processor import (
|
||||
RewardClassifierProcessorStep,
|
||||
TimeLimitProcessorStep,
|
||||
)
|
||||
from .leader_follower_processor import LeaderArmInterventionStep
|
||||
from .newline_task_processor import NewLineTaskProcessorStep
|
||||
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats
|
||||
from .observation_processor import VanillaObservationProcessorStep
|
||||
@@ -122,6 +123,7 @@ __all__ = [
|
||||
"ImageCropResizeProcessorStep",
|
||||
"InfoProcessorStep",
|
||||
"InterventionActionProcessorStep",
|
||||
"LeaderArmInterventionStep",
|
||||
"make_default_processors",
|
||||
"make_default_teleop_action_processor",
|
||||
"make_default_robot_action_processor",
|
||||
|
||||
@@ -0,0 +1,270 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Processor step for using a leader arm as the HIL-SERL intervention device.
|
||||
|
||||
Position-only port of the leader/follower control mode (no rotation): the leader
|
||||
arm acts as a 4-D end-effector delta source ``[dx, dy, dz, gripper]`` for the
|
||||
existing ``InterventionActionProcessorStep`` overriding pipeline.
|
||||
|
||||
The teleop_action returned by the leader is a flat dictionary of joint angles
|
||||
(degrees) like ``{"shoulder_pan.pos": ..., ..., "gripper.pos": ...}``. This step
|
||||
converts that into a normalised EE-delta dictionary by:
|
||||
|
||||
1. Running forward kinematics on the leader joints -> ``p_leader`` (xyz, m).
|
||||
2. Running forward kinematics on the follower joints (read from the env
|
||||
transition's observation / complementary data) -> ``p_follower`` (xyz, m).
|
||||
3. Normalising ``p_leader - p_follower`` by ``end_effector_step_sizes`` and
|
||||
clipping to ``[-1, 1]`` (matches the gamepad / keyboard EE convention).
|
||||
4. Mapping the leader gripper position ``[0, 100]`` to the discrete
|
||||
``{0=close, 1=stay, 2=open}`` action used by the SO follower.
|
||||
|
||||
The output is written back to ``complementary_data["teleop_action"]`` so the
|
||||
rest of the action pipeline (``InterventionActionProcessorStep`` ->
|
||||
``MapTensorToDeltaActionDictStep`` -> IK) is unchanged.
|
||||
|
||||
Additionally, when an optional ``teleop_device`` reference is provided, this
|
||||
step also pushes the follower's raw joint positions back to the leader via
|
||||
``teleop_device.send_action(follower_joints)`` every tick. Combined with
|
||||
:class:`SOLeaderFollower.send_action`, this implements the **haptic follow**
|
||||
behaviour from https://github.com/huggingface/lerobot/pull/2596: the leader
|
||||
mimics the follower while the human is hands-off, then drops torque the
|
||||
moment intervention is toggled so the user can grab and steer it.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.configs import PipelineFeatureType, PolicyFeature
|
||||
from lerobot.model import RobotKinematics
|
||||
from lerobot.types import EnvTransition, TransitionKey
|
||||
|
||||
from .pipeline import ProcessorStep, ProcessorStepRegistry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
TELEOP_ACTION_KEY = "teleop_action"
|
||||
RAW_JOINT_POSITIONS_KEY = "raw_joint_positions"
|
||||
GRIPPER_KEY = "gripper"
|
||||
|
||||
# Leader gripper is in [0, 100] when calibrated.
|
||||
LEADER_GRIPPER_OPEN_DEFAULT = 60.0
|
||||
LEADER_GRIPPER_CLOSE_DEFAULT = 30.0
|
||||
|
||||
# Discrete gripper command convention (matches GripperVelocityToJoint).
|
||||
GRIPPER_CLOSE = 0.0
|
||||
GRIPPER_STAY = 1.0
|
||||
GRIPPER_OPEN = 2.0
|
||||
|
||||
|
||||
def _joint_dict_to_array(joint_dict: dict[str, float], motor_names: list[str]) -> np.ndarray | None:
|
||||
"""Pull joint positions in ``motor_names`` order from a ``"<motor>.pos"`` dict.
|
||||
|
||||
Returns ``None`` if any motor is missing.
|
||||
"""
|
||||
out = np.zeros(len(motor_names), dtype=float)
|
||||
for i, name in enumerate(motor_names):
|
||||
v = joint_dict.get(f"{name}.pos")
|
||||
if v is None:
|
||||
return None
|
||||
out[i] = float(v)
|
||||
return out
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("leader_arm_intervention")
@dataclass
class LeaderArmInterventionStep(ProcessorStep):
    """Convert leader joint positions in ``teleop_action`` into a 4-D EE-delta dict.

    This step is intended to run **between** ``AddTeleopActionAsComplimentaryDataStep``
    (which populates ``complementary_data["teleop_action"]`` with raw leader joint
    angles) and ``InterventionActionProcessorStep`` (which expects a delta dict).

    Attributes:
        kinematics: Robot kinematic model shared with the follower; used for FK
            on both the leader arm and the follower arm. Both arms must use the
            same URDF joint order.
        motor_names: Ordered joint names matching ``kinematics.joint_names``,
            used to slice joint dicts.
        end_effector_step_sizes: Per-axis normalisation in metres, e.g.
            ``{"x": 0.025, "y": 0.025, "z": 0.025}``. The clamped delta is
            ``(p_leader - p_follower) / step_size``.
        use_gripper: When ``True``, append a discrete gripper command derived from
            the leader gripper joint to the output dict.
        leader_gripper_open: Threshold (>= ) above which the leader gripper is
            considered ``open`` -> command ``2``.
        leader_gripper_close: Threshold (<= ) below which the leader gripper is
            considered ``closed`` -> command ``0``.
        teleop_device: Optional reference to the leader teleoperator. When set
            and the device implements ``send_action(action_dict)``, this step
            pushes the follower's raw joints to it every tick to drive haptic
            follow. The teleop is responsible for gating actual motor writes on
            its own intervention state (see :class:`SOLeaderFollower`).
    """

    kinematics: RobotKinematics
    motor_names: list[str]
    end_effector_step_sizes: dict[str, float]
    use_gripper: bool = True
    leader_gripper_open: float = LEADER_GRIPPER_OPEN_DEFAULT
    leader_gripper_close: float = LEADER_GRIPPER_CLOSE_DEFAULT
    teleop_device: Any = None

    # NOTE(review): never assigned outside `reset()` in this file — presumably
    # reserved for future relative-delta anchoring; confirm before removing.
    _initial_follower_joints: np.ndarray | None = field(default=None, init=False, repr=False)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Rewrite ``complementary_data["teleop_action"]`` as a normalised EE-delta dict.

        Leaves the transition untouched when there is no joint-style teleop
        action to convert; emits a zero-delta action when the delta cannot be
        computed this tick (missing joints on either side).
        """
        new_transition = transition.copy()
        complementary_data = dict(new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) or {})

        # Haptic follow: push follower joints to the leader every step (whether
        # or not we have a usable leader action this tick). The leader's own
        # send_action gates writes on its intervention state.
        follower_joints_dict = self._read_follower_joints_dict(transition, complementary_data)
        if follower_joints_dict is not None:
            self._push_haptic_follow(follower_joints_dict)

        leader_joints_dict = complementary_data.get(TELEOP_ACTION_KEY)
        if not isinstance(leader_joints_dict, dict):
            # Nothing to convert (e.g. teleop disconnected). Leave transition untouched.
            return new_transition

        if not any(k.endswith(".pos") for k in leader_joints_dict):
            # Already in EE-delta form (or unrecognised); skip.
            return new_transition

        follower_joints = (
            _joint_dict_to_array(follower_joints_dict, self.motor_names)
            if follower_joints_dict is not None
            else None
        )
        leader_joints = _joint_dict_to_array(leader_joints_dict, self.motor_names)

        if follower_joints is None or leader_joints is None:
            # Cannot compute delta this step; expose a zero-action so downstream
            # InterventionActionProcessorStep does not propagate stale joints.
            complementary_data[TELEOP_ACTION_KEY] = self._zero_action()
            new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
            return new_transition

        # FK returns a 4x4 homogeneous transform; [:3, 3] is the xyz position (m).
        p_leader = self.kinematics.forward_kinematics(leader_joints)[:3, 3]
        p_follower = self.kinematics.forward_kinematics(follower_joints)[:3, 3]

        delta = p_leader - p_follower
        # `max(..., 1e-6)` guards against a zero/negative configured step size.
        delta_norm = np.array(
            [
                delta[0] / max(self.end_effector_step_sizes.get("x", 1.0), 1e-6),
                delta[1] / max(self.end_effector_step_sizes.get("y", 1.0), 1e-6),
                delta[2] / max(self.end_effector_step_sizes.get("z", 1.0), 1e-6),
            ],
            dtype=float,
        )
        delta_norm = np.clip(delta_norm, -1.0, 1.0)

        teleop_action: dict[str, float] = {
            "delta_x": float(delta_norm[0]),
            "delta_y": float(delta_norm[1]),
            "delta_z": float(delta_norm[2]),
        }

        if self.use_gripper:
            # 50.0 (mid-range) falls between both thresholds -> "stay" by default.
            leader_gripper = float(leader_joints_dict.get(f"{GRIPPER_KEY}.pos", 50.0))
            teleop_action[GRIPPER_KEY] = self._discretise_gripper(leader_gripper)

        complementary_data[TELEOP_ACTION_KEY] = teleop_action
        new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
        return new_transition

    def _read_follower_joints_dict(
        self, transition: EnvTransition, complementary_data: dict[str, Any]
    ) -> dict[str, float] | None:
        """Best-effort read of the follower joints from the transition.

        Tries (in order):
          1. ``complementary_data["raw_joint_positions"]`` (set after env.step).
          2. ``transition[OBSERVATION]`` if it is a flat ``"<motor>.pos"`` dict
             (this is the convention used by ``step_env_and_process_transition``
             when staging an action transition).

        Returns the source dict if all expected motors are present, else
        ``None``. We return the *dict* (not the array) because we want to feed
        it back to ``teleop_device.send_action`` for haptic follow.
        """
        raw = complementary_data.get(RAW_JOINT_POSITIONS_KEY)
        if isinstance(raw, dict) and all(f"{m}.pos" in raw for m in self.motor_names):
            return raw  # type: ignore[return-value]

        observation = transition.get(TransitionKey.OBSERVATION)
        if isinstance(observation, dict) and all(f"{m}.pos" in observation for m in self.motor_names):
            return observation  # type: ignore[return-value]

        return None

    def _push_haptic_follow(self, follower_joints_dict: dict[str, float]) -> None:
        """Send the follower's joints back to the leader for haptic follow.

        Errors are logged once and swallowed -- a failed haptic update must
        never break the policy / learner loop.
        """
        if self.teleop_device is None:
            return
        send_action = getattr(self.teleop_device, "send_action", None)
        if send_action is None:
            return
        try:
            send_action(follower_joints_dict)
        except NotImplementedError:
            # Plain SOLeader / unsupported teleop -- silently disable haptic follow.
            self.teleop_device = None
        except Exception as e:  # pragma: no cover - hardware path
            logger.warning(f"[LeaderArmInterventionStep] haptic follow failed: {e}")

    def _discretise_gripper(self, leader_gripper_pos: float) -> float:
        """Map a leader gripper position in ``[0, 100]`` to ``{0, 1, 2}``."""
        if leader_gripper_pos >= self.leader_gripper_open:
            return GRIPPER_OPEN
        if leader_gripper_pos <= self.leader_gripper_close:
            return GRIPPER_CLOSE
        return GRIPPER_STAY

    def _zero_action(self) -> dict[str, float]:
        """Return a no-motion EE-delta dict (and a "stay" gripper when enabled)."""
        out: dict[str, float] = {"delta_x": 0.0, "delta_y": 0.0, "delta_z": 0.0}
        if self.use_gripper:
            out[GRIPPER_KEY] = GRIPPER_STAY
        return out

    def get_config(self) -> dict[str, Any]:
        """Return the JSON-serialisable subset of this step's configuration."""
        # `kinematics` and `teleop_device` are runtime objects (not JSON-serializable)
        # and are re-injected by `gym_manipulator.make_processors`, so they are
        # intentionally omitted from the saved config.
        return {
            "motor_names": list(self.motor_names),
            "end_effector_step_sizes": dict(self.end_effector_step_sizes),
            "use_gripper": self.use_gripper,
            "leader_gripper_open": self.leader_gripper_open,
            "leader_gripper_close": self.leader_gripper_close,
        }

    def reset(self) -> None:
        """Clear per-episode state."""
        self._initial_follower_joints = None

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """This step does not add or remove pipeline features; pass through."""
        return features
|
||||
@@ -39,6 +39,7 @@ from lerobot.processor import (
|
||||
GymHILAdapterProcessorStep,
|
||||
ImageCropResizeProcessorStep,
|
||||
InterventionActionProcessorStep,
|
||||
LeaderArmInterventionStep,
|
||||
MapDeltaActionToRobotActionStep,
|
||||
MapTensorToDeltaActionDictStep,
|
||||
Numpy2TorchActionProcessorStep,
|
||||
@@ -481,15 +482,41 @@ def make_processors(
|
||||
env_pipeline_steps.append(AddBatchDimensionProcessorStep())
|
||||
env_pipeline_steps.append(DeviceProcessorStep(device=device))
|
||||
|
||||
action_pipeline_steps = [
|
||||
action_pipeline_steps: list = [
|
||||
AddTeleopActionAsComplimentaryDataStep(teleop_device=teleop_device),
|
||||
AddTeleopEventsAsInfoStep(teleop_device=teleop_device),
|
||||
InterventionActionProcessorStep(
|
||||
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
|
||||
terminate_on_success=terminate_on_success,
|
||||
),
|
||||
]
|
||||
|
||||
use_gripper_for_intervention = (
|
||||
cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False
|
||||
)
|
||||
|
||||
# Leader-arm intervention: convert raw leader joints in `teleop_action`
|
||||
# into a 4-D EE-delta dict before the override step consumes it. The same
|
||||
# step also drives haptic follow on the leader (when `teleop_device` is a
|
||||
# `SOLeaderFollower`) by pushing the follower joints back via send_action.
|
||||
if (
|
||||
getattr(cfg.processor, "control_mode", "gamepad") == "leader"
|
||||
and cfg.processor.inverse_kinematics is not None
|
||||
and kinematics_solver is not None
|
||||
):
|
||||
action_pipeline_steps.append(
|
||||
LeaderArmInterventionStep(
|
||||
kinematics=kinematics_solver,
|
||||
motor_names=motor_names,
|
||||
end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes,
|
||||
teleop_device=teleop_device,
|
||||
use_gripper=use_gripper_for_intervention,
|
||||
)
|
||||
)
|
||||
|
||||
action_pipeline_steps.append(
|
||||
InterventionActionProcessorStep(
|
||||
use_gripper=use_gripper_for_intervention,
|
||||
terminate_on_success=terminate_on_success,
|
||||
)
|
||||
)
|
||||
|
||||
# Replace InverseKinematicsProcessor with new kinematic processors
|
||||
if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None:
|
||||
# Add EE bounds and safety processor
|
||||
|
||||
@@ -21,6 +21,7 @@ from .config_so_leader import (
|
||||
SOLeaderTeleopConfig,
|
||||
)
|
||||
from .so_leader import SO100Leader, SO101Leader, SOLeader
|
||||
from .so_leader_follower import SOLeaderFollower
|
||||
|
||||
__all__ = [
|
||||
"SO100Leader",
|
||||
@@ -29,5 +30,6 @@ __all__ = [
|
||||
"SO101LeaderConfig",
|
||||
"SOLeader",
|
||||
"SOLeaderConfig",
|
||||
"SOLeaderFollower",
|
||||
"SOLeaderTeleopConfig",
|
||||
]
|
||||
|
||||
@@ -29,6 +29,18 @@ class SOLeaderConfig:
|
||||
# Whether to use degrees for angles
|
||||
use_degrees: bool = True
|
||||
|
||||
# When True, the SO leader is wrapped in `SOLeaderFollower`, which adds a
|
||||
# keyboard listener for HIL-SERL intervention events (SPACE toggles
|
||||
# intervention, S/R/Q signal success/rerecord/fail) and reports a 4-D
|
||||
# EE-delta action space via `action_features`. The raw leader joints are
|
||||
# still returned by `get_action()` and converted downstream by
|
||||
# `LeaderArmInterventionStep`.
|
||||
leader_follower_mode: bool = False
|
||||
|
||||
# When `leader_follower_mode` is enabled, include the gripper in the
|
||||
# 4-D delta action space announced by `action_features`.
|
||||
use_gripper: bool = True
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("so101_leader")
|
||||
@TeleoperatorConfig.register_subclass("so100_leader")
|
||||
|
||||
@@ -0,0 +1,341 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
SO Leader teleoperator extended with HIL-SERL intervention events and haptic follow.
|
||||
|
||||
This wrapper around :class:`SOLeader` keeps the underlying joint-reading behaviour
|
||||
intact (so :func:`get_action` returns ``{"<motor>.pos": float}``) while adding:
|
||||
|
||||
* A pynput keyboard listener that toggles intervention with SPACE and emits
|
||||
``success`` / ``rerecord`` / ``fail`` signals via S / R / Q keys, mirroring
|
||||
:class:`KeyboardEndEffectorTeleop`.
|
||||
* A :func:`get_teleop_events` method satisfying the
|
||||
:class:`HasTeleopEvents` protocol consumed by ``AddTeleopEventsAsInfoStep``.
|
||||
* An :func:`action_features` override that announces the 4-D
|
||||
``[delta_x, delta_y, delta_z, gripper]`` space the leader will project into
|
||||
via :class:`LeaderArmInterventionStep` -- this is what ends up recorded by
|
||||
``LeRobotDataset`` in HIL-SERL ``record`` mode.
|
||||
* :func:`send_action` for **haptic follow**: when the human is not intervening,
|
||||
the leader is torque-enabled and tracks the follower's joint positions so the
|
||||
user can grab it at any time and seamlessly take over (mirrors the design from
|
||||
https://github.com/huggingface/lerobot/pull/2596). When intervention is
|
||||
toggled on, leader torque is disabled so the user can move it freely.
|
||||
* Lower position-loop gains on :func:`connect` (``P=16, I=0, D=16``) so the
|
||||
haptic follow does not jerk the user's hand when grabbing the leader.
|
||||
* Bus-control primitives (:func:`enable_torque`, :func:`disable_torque`,
|
||||
:func:`write_goal_positions`) and a :func:`smooth_move_to` helper. These
|
||||
satisfy the ``teleop_has_motor_control`` capability gate in
|
||||
``examples/hil/hil_utils.py``, so the BC-style HIL data collector
|
||||
(``examples/hil/hil_data_collection.py``) can drive an SO leader the same way
|
||||
it drives the OpenArm Mini -- pause / smooth-mirror to follower / take over.
|
||||
|
||||
The joint-to-EE-delta conversion does **not** happen here; it is performed by
|
||||
:class:`LeaderArmInterventionStep` in the action processor pipeline so the
|
||||
leader stays a pure I/O device.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.types import RobotAction
|
||||
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
|
||||
from lerobot.utils.import_utils import _pynput_available
|
||||
|
||||
from ..utils import TeleopEvents
|
||||
from .config_so_leader import SOLeaderTeleopConfig
|
||||
from .so_leader import SOLeader
|
||||
|
||||
LEADER_FOLLOWER_P_GAIN = 16
|
||||
LEADER_FOLLOWER_I_GAIN = 0
|
||||
LEADER_FOLLOWER_D_GAIN = 16
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PYNPUT_AVAILABLE = _pynput_available
|
||||
keyboard: Any = None
|
||||
if PYNPUT_AVAILABLE:
|
||||
try:
|
||||
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
|
||||
logger.info("No DISPLAY set. Skipping pynput import for SOLeaderFollower.")
|
||||
PYNPUT_AVAILABLE = False
|
||||
else:
|
||||
from pynput import keyboard # type: ignore[no-redef]
|
||||
except Exception as e: # pragma: no cover - hardware path
|
||||
PYNPUT_AVAILABLE = False
|
||||
logger.info(f"Could not import pynput: {e}")
|
||||
|
||||
|
||||
class SOLeaderFollower(SOLeader):
|
||||
"""SO leader teleop with intervention/success/rerecord keyboard signals."""
|
||||
|
||||
config_class = SOLeaderTeleopConfig
|
||||
name = "so_leader_follower"
|
||||
|
||||
    def __init__(self, config: SOLeaderTeleopConfig):
        """Initialise the wrapper and its keyboard / haptic-follow bookkeeping."""
        super().__init__(config)

        # HIL-SERL event flags, flipped by the pynput listener thread and
        # presumably consumed via `get_teleop_events` (defined outside this view).
        self._is_intervention: bool = False
        self._success: bool = False
        self._rerecord: bool = False
        self._terminate_episode: bool = False
        # pynput listener handle; stays None until `_start_keyboard_listener` runs.
        self._listener: Any = None

        # Haptic follow state (mirrors `is_intervening` / `leader_torque_enabled`
        # in https://github.com/huggingface/lerobot/pull/2596 SO101LeaderFollower).
        self._leader_torque_enabled: bool = True
        self._last_follower_pos: np.ndarray | None = None
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, Any]:
|
||||
"""Announce the 4-D EE-delta action space recorded by the dataset.
|
||||
|
||||
The leader still produces raw joints from :func:`get_action`; this
|
||||
property describes what downstream processors emit and what the dataset
|
||||
layer should reserve in the ``action`` column for HIL-SERL recordings.
|
||||
"""
|
||||
names: dict[str, int] = {"delta_x": 0, "delta_y": 1, "delta_z": 2}
|
||||
shape = (3,)
|
||||
if getattr(self.config, "use_gripper", True):
|
||||
names["gripper"] = 3
|
||||
shape = (4,)
|
||||
return {"dtype": "float32", "shape": shape, "names": names}
|
||||
|
||||
    @check_if_already_connected
    def connect(self, calibrate: bool = True) -> None:
        """Connect the bus, soften the position-loop gains, and start the key listener.

        Args:
            calibrate: Forwarded to :meth:`SOLeader.connect`.
        """
        super().connect(calibrate=calibrate)
        # Lower PID gains before any haptic-follow writes so the first
        # position command cannot yank the user's hand.
        self._configure_leader_follower_gains()
        self._start_keyboard_listener()
        logger.info(
            "[SOLeaderFollower] connected. Press SPACE to toggle intervention, "
            "'s' for success, 'r' for re-record, 'q' to terminate."
        )
|
||||
|
||||
def _configure_leader_follower_gains(self) -> None:
|
||||
"""Lower position-loop gains so haptic follow does not yank the user.
|
||||
|
||||
Mirrors the gains used by the SO101LeaderFollower in PR #2596 — high
|
||||
default gains make the leader fight the user's hand when they grab it
|
||||
between interventions.
|
||||
"""
|
||||
for motor in self.bus.motors:
|
||||
try:
|
||||
self.bus.write("P_Coefficient", motor, LEADER_FOLLOWER_P_GAIN)
|
||||
self.bus.write("I_Coefficient", motor, LEADER_FOLLOWER_I_GAIN)
|
||||
self.bus.write("D_Coefficient", motor, LEADER_FOLLOWER_D_GAIN)
|
||||
except Exception as e: # pragma: no cover - hardware path
|
||||
logger.warning(f"[SOLeaderFollower] could not set PID gains for '{motor}': {e}")
|
||||
|
||||
    def _start_keyboard_listener(self) -> None:
        """Spawn a daemon pynput listener mapping keys to HIL-SERL events.

        SPACE toggles intervention; 's' flags success, 'r' re-record, 'q'
        terminate — the first two also end the episode. No-op when pynput is
        unavailable (headless Linux or missing dependency).
        """
        if not PYNPUT_AVAILABLE:
            logger.info("pynput unavailable; SOLeaderFollower keyboard events disabled.")
            return

        def on_press(key: Any) -> None:
            # Runs on the pynput listener thread; only flips simple bool flags.
            try:
                if key == keyboard.Key.space:
                    self._is_intervention = not self._is_intervention
                    logger.info(f"[SOLeaderFollower] intervention -> {self._is_intervention}")
                    return
                # Special keys (arrows, modifiers) have no `char`; ignore them.
                char = getattr(key, "char", None)
                if char is None:
                    return
                if char == "s":
                    self._success = True
                    self._terminate_episode = True
                elif char == "r":
                    self._rerecord = True
                    self._terminate_episode = True
                elif char == "q":
                    self._terminate_episode = True
            except Exception:  # nosec B110
                # Never let the listener thread crash on weird keys.
                pass

        self._listener = keyboard.Listener(on_press=on_press)
        # Daemon thread: must not keep the process alive after main exits.
        self._listener.daemon = True
        self._listener.start()
|
||||
|
||||
def enable_torque(self) -> None:
    """Enable position-loop torque on every motor.

    Exposed alongside :func:`disable_torque` and :func:`write_goal_positions`
    so this teleop satisfies the ``teleop_has_motor_control`` gate used by
    ``examples/hil/hil_data_collection.py`` (mirrors the OpenArm Mini API).
    Errors are logged and swallowed -- the loop must keep ticking even if a
    single haptic update fails.
    """
    if not self.is_connected:
        return
    try:
        self.bus.sync_write("Torque_Enable", 1)
    except Exception as e:  # pragma: no cover - hardware path
        logger.warning(f"[SOLeaderFollower] could not enable leader torque: {e}")
    else:
        self._leader_torque_enabled = True
|
||||
|
||||
def disable_torque(self) -> None:
    """Drop position-loop torque so the user can move the leader freely."""
    if not self.is_connected:
        return
    try:
        self.bus.sync_write("Torque_Enable", 0)
    except Exception as e:  # pragma: no cover - hardware path
        logger.warning(f"[SOLeaderFollower] could not disable leader torque: {e}")
    else:
        self._leader_torque_enabled = False
|
||||
|
||||
def write_goal_positions(self, positions: dict[str, float]) -> None:
    """Push goal positions to the leader bus (no torque toggling).

    Accepts dataset-style keys ``{"<motor>.pos": deg}`` (matches what
    :func:`get_action` produces and what :func:`smooth_move_to` and
    :func:`send_action` consume) -- bare motor names are also tolerated
    for parity with :class:`OpenArmMini.write_goal_positions`.
    """
    if not self.is_connected:
        return
    goal_pos: dict[str, float] = {}
    for motor in self.bus.motors:
        # Dataset-style "<motor>.pos" wins over a bare motor name.
        for key in (f"{motor}.pos", motor):
            if key in positions:
                goal_pos[motor] = float(positions[key])
                break
    if not goal_pos:
        return
    try:
        self.bus.sync_write("Goal_Position", goal_pos)
    except Exception as e:  # pragma: no cover - hardware path
        logger.warning(f"[SOLeaderFollower] could not push goal position to leader: {e}")
|
||||
|
||||
def smooth_move_to(
    self,
    target_pos: dict[str, float],
    duration_s: float = 2.0,
    fps: int = 50,
) -> None:
    """Linearly ramp the leader from its current pose to ``target_pos``.

    Mirrors the ``teleop_smooth_move_to`` helper from
    ``examples/hil/hil_utils.py`` so the leader can be safely re-engaged
    without yanking the user's hand -- typical use is right after
    :func:`connect` (or whenever the leader and follower drift apart, e.g.
    on episode reset). Blocks for ``duration_s`` seconds.

    Args:
        target_pos: Target positions, dataset-style ``{"<motor>.pos": deg}``.
        duration_s: Total ramp duration in seconds.
        fps: Interpolation rate (writes per second).
    """
    if not self.is_connected:
        return

    try:
        start_pose = self.get_action()
    except Exception as e:  # pragma: no cover - hardware path
        logger.warning(f"[SOLeaderFollower] smooth_move_to could not read current pose: {e}")
        return

    # Torque must be on for the leader to actually track the ramp.
    self.enable_torque()
    if not self._leader_torque_enabled:
        return

    n_steps = max(int(duration_s * fps), 1)
    for i in range(n_steps + 1):
        alpha = i / n_steps
        frame = {
            key: (start * (1.0 - alpha) + float(target_pos[key]) * alpha)
            if key in target_pos
            else start
            for key, start in start_pose.items()
        }
        self.write_goal_positions(frame)
        time.sleep(1.0 / fps)
|
||||
|
||||
@check_if_not_connected
def get_action(self) -> RobotAction:
    """Read the leader's raw joint positions via the base class.

    When the user has just toggled into intervention, make sure leader
    torque is OFF so they can move it without fighting the position loop.
    """
    if self._is_intervention and self._leader_torque_enabled:
        self.disable_torque()
    return super().get_action()
|
||||
|
||||
def send_action(self, action: dict[str, float]) -> None:  # type: ignore[override]
    """Mirror the follower's joint positions on the leader (haptic follow).

    This is called every step from the action pipeline (typically by
    :class:`LeaderArmInterventionStep`) with the follower's raw joint
    positions ``{"<motor>.pos": float}``. While the user is **not**
    intervening, the leader is torque-enabled and tracks the follower so
    the operator can grab it at any time and continue motion smoothly. As
    soon as the user toggles intervention on (SPACE), torque is dropped in
    :func:`get_action` so the human can move the leader freely.

    Args:
        action: Dictionary of follower motor positions, ``{motor.pos: deg}``.
    """
    if not self.is_connected:
        return

    # Cache the follower pose regardless of mode; missing motors read 0.0.
    try:
        self._last_follower_pos = np.array(
            [float(action.get(f"{m}.pos", 0.0)) for m in self.bus.motors],
            dtype=float,
        )
    except Exception as e:  # pragma: no cover - defensive
        logger.warning(f"[SOLeaderFollower] could not extract follower joints: {e}")
        return

    # During an intervention the human owns the leader: no haptic write.
    if self._is_intervention:
        return

    if not self._leader_torque_enabled:
        self.enable_torque()
        if not self._leader_torque_enabled:
            return

    self.write_goal_positions(action)
|
||||
|
||||
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
    """Snapshot teleop flags, edge-triggering the one-shot episode events.

    ``IS_INTERVENTION`` is level-triggered (it reflects the toggle state),
    while success / re-record / terminate are cleared after being read so
    the next call does not re-fire the same event for several frames.
    """
    events = {
        TeleopEvents.IS_INTERVENTION: self._is_intervention,
        TeleopEvents.TERMINATE_EPISODE: self._terminate_episode,
        TeleopEvents.SUCCESS: self._success,
        TeleopEvents.RERECORD_EPISODE: self._rerecord,
    }
    self._success = self._rerecord = self._terminate_episode = False
    return events
|
||||
|
||||
def reset(self) -> None:
    """Restore the pristine post-connect state between episodes."""
    # All event flags cleared; leader assumed back in following mode.
    self._is_intervention = False
    self._success = self._rerecord = self._terminate_episode = False
    self._leader_torque_enabled = True
    self._last_follower_pos = None
|
||||
|
||||
@check_if_not_connected
def disconnect(self) -> None:
    """Stop the keyboard listener (best effort) and disconnect the bus."""
    listener, self._listener = self._listener, None
    if listener is not None:
        with contextlib.suppress(Exception):
            listener.stop()
    super().disconnect()
|
||||
@@ -48,10 +48,18 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
|
||||
|
||||
return OmxLeader(config)
|
||||
elif config.type == "so100_leader":
|
||||
if getattr(config, "leader_follower_mode", False):
|
||||
from .so_leader import SOLeaderFollower
|
||||
|
||||
return SOLeaderFollower(config)
|
||||
from .so_leader import SO100Leader
|
||||
|
||||
return SO100Leader(config)
|
||||
elif config.type == "so101_leader":
|
||||
if getattr(config, "leader_follower_mode", False):
|
||||
from .so_leader import SOLeaderFollower
|
||||
|
||||
return SOLeaderFollower(config)
|
||||
from .so_leader import SO101Leader
|
||||
|
||||
return SO101Leader(config)
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for LeaderArmInterventionStep (placo-free, FK is mocked)."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from lerobot.processor.converters import create_transition
|
||||
from lerobot.processor.leader_follower_processor import (
|
||||
GRIPPER_CLOSE,
|
||||
GRIPPER_OPEN,
|
||||
GRIPPER_STAY,
|
||||
LeaderArmInterventionStep,
|
||||
)
|
||||
from lerobot.types import TransitionKey
|
||||
|
||||
# Joint names shared by every fake leader / follower reading in these tests;
# the gripper is appended as "gripper.pos" by individual tests when needed.
MOTOR_NAMES = [
    "shoulder_pan",
    "shoulder_lift",
    "elbow_flex",
    "wrist_flex",
    "wrist_roll",
]
# Per-axis EE-delta normalisation step sizes passed to the step under test.
STEP_SIZES = {"x": 0.025, "y": 0.025, "z": 0.025}
|
||||
|
||||
|
||||
class _FakeKinematics:
|
||||
"""Minimal stand-in for `RobotKinematics.forward_kinematics`.
|
||||
|
||||
Maps a joint vector deterministically to a 4x4 transform whose translation
|
||||
is `(j[0] * 0.001, j[1] * 0.001, j[2] * 0.001)`. This lets the test drive
|
||||
arbitrary EE positions by choosing leader / follower joint values without
|
||||
depending on placo / a URDF.
|
||||
"""
|
||||
|
||||
def forward_kinematics(self, joints: np.ndarray) -> np.ndarray:
|
||||
t = np.eye(4, dtype=float)
|
||||
t[:3, 3] = np.asarray(joints[:3], dtype=float) * 1e-3
|
||||
return t
|
||||
|
||||
|
||||
def _joint_dict(values: list[float]) -> dict[str, float]:
    """Pair the test motor names with ``values`` as dataset-style keys."""
    keys = (f"{name}.pos" for name in MOTOR_NAMES)
    return dict(zip(keys, values))
|
||||
|
||||
|
||||
def _make_step(use_gripper: bool = True, teleop_device: Any = None) -> LeaderArmInterventionStep:
    """Build a step under test with the fake FK and standard test config."""
    kwargs: dict[str, Any] = {
        "kinematics": _FakeKinematics(),
        "motor_names": MOTOR_NAMES,
        "end_effector_step_sizes": STEP_SIZES,
        "use_gripper": use_gripper,
        "teleop_device": teleop_device,
    }
    return LeaderArmInterventionStep(**kwargs)
|
||||
|
||||
|
||||
class _RecordingTeleop:
|
||||
"""Minimal teleop double that records every send_action call."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.calls: list[dict[str, float]] = []
|
||||
|
||||
def send_action(self, action: dict[str, float]) -> None:
|
||||
self.calls.append(dict(action))
|
||||
|
||||
|
||||
class _RaisingTeleop:
    """Teleop double whose send_action raises an unexpected error.

    Used to verify that the intervention step swallows (or, for
    ``NotImplementedError``, permanently opts out of) haptic-follow
    failures instead of aborting the action pipeline.
    """

    def __init__(self, exc: Exception) -> None:
        # The exception instance raised on every send_action call.
        self.exc = exc
        # Number of send_action calls observed so far.
        self.calls = 0

    def send_action(self, action: dict[str, float]) -> None:
        self.calls += 1
        raise self.exc
|
||||
|
||||
|
||||
def _build_transition(
    leader_joints: dict[str, float] | None,
    follower_joints: dict[str, float] | None,
    extra_complementary: dict[str, Any] | None = None,
) -> Any:
    """Assemble a transition whose complementary data mimics the teleop loop."""
    complementary: dict[str, Any] = {**(extra_complementary or {})}
    if leader_joints is not None:
        complementary["teleop_action"] = leader_joints
    if follower_joints is not None:
        complementary["raw_joint_positions"] = follower_joints
    return create_transition(complementary_data=complementary)
|
||||
|
||||
|
||||
def test_replaces_teleop_action_with_normalised_ee_delta():
    """A +25 joint-0 offset saturates delta_x; wide gripper reads as open."""
    leader = {**_joint_dict([25.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 80.0}
    follower = {**_joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 30.0}

    out = _make_step()(_build_transition(leader, follower))
    teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]

    assert set(teleop_action) == {"delta_x", "delta_y", "delta_z", "gripper"}
    # joint 0 differs by +25 -> 0.025 m -> normalised by 0.025 step -> 1.0
    assert teleop_action["delta_x"] == pytest.approx(1.0)
    assert teleop_action["delta_y"] == pytest.approx(0.0)
    assert teleop_action["delta_z"] == pytest.approx(0.0)
    # leader gripper 80 >= open threshold 60 -> open command
    assert teleop_action["gripper"] == GRIPPER_OPEN
|
||||
|
||||
|
||||
def test_clips_delta_to_unit_box():
    """Huge joint offsets must be clipped into the [-1, 1] action box."""
    leader = _joint_dict([1000.0, -1000.0, 1000.0, 0.0, 0.0])
    follower = _joint_dict([0.0] * 5)

    out = _make_step(use_gripper=False)(_build_transition(leader, follower))
    teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]

    # use_gripper=False drops the gripper slot entirely.
    assert "gripper" not in teleop_action
    assert teleop_action["delta_x"] == pytest.approx(1.0)
    assert teleop_action["delta_y"] == pytest.approx(-1.0)
    assert teleop_action["delta_z"] == pytest.approx(1.0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    ("leader_gripper", "expected"),
    [
        (10.0, GRIPPER_CLOSE),
        (45.0, GRIPPER_STAY),
        (90.0, GRIPPER_OPEN),
    ],
)
def test_gripper_quantisation(leader_gripper: float, expected: float):
    """Leader gripper readings quantise to close / stay / open commands."""
    leader = {**_joint_dict([0.0] * 5), "gripper.pos": leader_gripper}
    follower = {**_joint_dict([0.0] * 5), "gripper.pos": 50.0}

    out = _make_step(use_gripper=True)(_build_transition(leader, follower))
    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]["gripper"] == expected
|
||||
|
||||
|
||||
def test_zero_action_when_follower_joints_missing():
    """Without follower joints the step must emit a safe all-zero delta."""
    leader = {**_joint_dict([10.0, 10.0, 10.0, 0.0, 0.0]), "gripper.pos": 50.0}

    out = _make_step()(_build_transition(leader, follower_joints=None))

    expected = {
        "delta_x": 0.0,
        "delta_y": 0.0,
        "delta_z": 0.0,
        "gripper": GRIPPER_STAY,
    }
    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] == expected
|
||||
|
||||
|
||||
def test_passthrough_when_teleop_action_missing():
    """No teleop_action in the transition -> none is invented on the way out."""
    out = _make_step()(_build_transition(leader_joints=None, follower_joints=None))
    assert "teleop_action" not in out[TransitionKey.COMPLEMENTARY_DATA]
|
||||
|
||||
|
||||
def test_passthrough_when_teleop_action_is_already_delta_dict():
    """Idempotent on dicts that don't look like raw joint reads."""
    already_delta = {"delta_x": 0.5, "delta_y": 0.0, "delta_z": -0.3, "gripper": GRIPPER_OPEN}
    follower = _joint_dict([0.0] * 5)

    out = _make_step()(_build_transition(already_delta, follower))
    assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] == already_delta
|
||||
|
||||
|
||||
def test_reads_follower_from_observation_when_complementary_missing():
    """Follower joints fall back to the OBSERVATION dict."""
    leader = {**_joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    follower = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0])

    out = _make_step()(
        create_transition(
            observation=follower,
            complementary_data={"teleop_action": leader},
        )
    )

    # delta = (20 - 10) * 1e-3 = 0.01, normalised by 0.025 -> 0.4
    delta = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
    assert delta["delta_x"] == pytest.approx(0.4)
|
||||
|
||||
|
||||
# --- haptic follow ----------------------------------------------------------
|
||||
|
||||
|
||||
def test_haptic_follow_pushes_follower_joints_to_teleop_device():
    """When teleop_device is set, follower joints should be sent to it every tick."""
    leader = {**_joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    follower = {**_joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    device = _RecordingTeleop()

    _make_step(teleop_device=device)(_build_transition(leader, follower))

    assert device.calls == [follower]
|
||||
|
||||
|
||||
def test_haptic_follow_uses_observation_when_complementary_missing():
    """Falls back to OBSERVATION dict for haptic follow when complementary is empty."""
    leader = {**_joint_dict([5.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    follower = {**_joint_dict([3.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    device = _RecordingTeleop()

    _make_step(teleop_device=device)(
        create_transition(
            observation=follower,
            complementary_data={"teleop_action": leader},
        )
    )

    assert device.calls == [follower]
|
||||
|
||||
|
||||
def test_haptic_follow_skipped_when_no_follower_joints_available():
    """No follower joints -> no haptic write (don't push stale data)."""
    leader = {**_joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    device = _RecordingTeleop()

    _make_step(teleop_device=device)(_build_transition(leader, follower_joints=None))

    assert device.calls == []
|
||||
|
||||
|
||||
def test_haptic_follow_swallows_send_action_errors():
    """A failing teleop.send_action must not abort the action pipeline."""
    leader = {**_joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    follower = {**_joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    device = _RaisingTeleop(RuntimeError("bus comms fail"))

    out = _make_step(teleop_device=device)(_build_transition(leader, follower))

    assert device.calls == 1
    # The downstream EE-delta payload must still be produced normally.
    delta = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
    assert delta["delta_x"] == pytest.approx(0.4)
|
||||
|
||||
|
||||
def test_haptic_follow_disables_when_send_action_not_implemented():
    """Plain leaders (no haptic follow) opt out via NotImplementedError."""
    leader = {**_joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    follower = {**_joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]), "gripper.pos": 50.0}
    device = _RaisingTeleop(NotImplementedError("plain leader, no haptic follow"))

    step = _make_step(teleop_device=device)
    step(_build_transition(leader, follower))
    # Tick again and confirm the step gave up rather than spamming the device.
    step(_build_transition(leader, follower))

    assert device.calls == 1
    assert step.teleop_device is None
|
||||
Reference in New Issue
Block a user