From a3cb9f5317ec0e61d58777083ce44cb21bd48cbe Mon Sep 17 00:00:00 2001 From: Khalil Meftah Date: Mon, 27 Apr 2026 17:26:29 +0200 Subject: [PATCH] feat(rl): leader arm as HIL-SERL intervention device (position-only) --- src/lerobot/processor/__init__.py | 2 + .../processor/leader_follower_processor.py | 220 ++++++++++++++++++ src/lerobot/rl/gym_manipulator.py | 34 ++- .../teleoperators/so_leader/__init__.py | 2 + .../so_leader/config_so_leader.py | 12 + .../so_leader/so_leader_follower.py | 167 +++++++++++++ src/lerobot/teleoperators/utils.py | 8 + .../test_leader_follower_processor.py | 180 ++++++++++++++ 8 files changed, 620 insertions(+), 5 deletions(-) create mode 100644 src/lerobot/processor/leader_follower_processor.py create mode 100644 src/lerobot/teleoperators/so_leader/so_leader_follower.py create mode 100644 tests/processor/test_leader_follower_processor.py diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 3688a4b8c..ae02a9bf1 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -61,6 +61,7 @@ from .hil_processor import ( RewardClassifierProcessorStep, TimeLimitProcessorStep, ) +from .leader_follower_processor import LeaderArmInterventionStep from .newline_task_processor import NewLineTaskProcessorStep from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats from .observation_processor import VanillaObservationProcessorStep @@ -122,6 +123,7 @@ __all__ = [ "ImageCropResizeProcessorStep", "InfoProcessorStep", "InterventionActionProcessorStep", + "LeaderArmInterventionStep", "make_default_processors", "make_default_teleop_action_processor", "make_default_robot_action_processor", diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py new file mode 100644 index 000000000..4e1b155c7 --- /dev/null +++ b/src/lerobot/processor/leader_follower_processor.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Processor step for using a leader arm as the HIL-SERL intervention device. + +Position-only port of the leader/follower control mode (no rotation): the leader +arm acts as a 4-D end-effector delta source ``[dx, dy, dz, gripper]`` for the +existing ``InterventionActionProcessorStep`` overriding pipeline. + +The teleop_action returned by the leader is a flat dictionary of joint angles +(degrees) like ``{"shoulder_pan.pos": ..., ..., "gripper.pos": ...}``. This step +converts that into a normalised EE-delta dictionary by: + +1. Running forward kinematics on the leader joints -> ``p_leader`` (xyz, m). +2. Running forward kinematics on the follower joints (read from the env + transition's observation / complementary data) -> ``p_follower`` (xyz, m). +3. Normalising ``p_leader - p_follower`` by ``end_effector_step_sizes`` and + clipping to ``[-1, 1]`` (matches the gamepad / keyboard EE convention). +4. Mapping the leader gripper position ``[0, 100]`` to the discrete + ``{0=close, 1=stay, 2=open}`` action used by the SO follower. + +The output is written back to ``complementary_data["teleop_action"]`` so the +rest of the action pipeline (``InterventionActionProcessorStep`` -> +``MapTensorToDeltaActionDictStep`` -> IK) is unchanged. +""" + +from dataclasses import dataclass, field +from typing import Any + +import numpy as np + +from lerobot.configs import PipelineFeatureType, PolicyFeature +from lerobot.model import RobotKinematics +from lerobot.types import EnvTransition, TransitionKey + +from .pipeline import ProcessorStep, ProcessorStepRegistry + +TELEOP_ACTION_KEY = "teleop_action" +RAW_JOINT_POSITIONS_KEY = "raw_joint_positions" +GRIPPER_KEY = "gripper" + +# Leader gripper is in [0, 100] when calibrated. +LEADER_GRIPPER_OPEN_DEFAULT = 60.0 +LEADER_GRIPPER_CLOSE_DEFAULT = 30.0 + +# Discrete gripper command convention (matches GripperVelocityToJoint). +GRIPPER_CLOSE = 0.0 +GRIPPER_STAY = 1.0 +GRIPPER_OPEN = 2.0 + + +def _joint_dict_to_array(joint_dict: dict[str, float], motor_names: list[str]) -> np.ndarray | None: + """Pull joint positions in ``motor_names`` order from a ``".pos"`` dict. + + Returns ``None`` if any motor is missing. + """ + out = np.zeros(len(motor_names), dtype=float) + for i, name in enumerate(motor_names): + v = joint_dict.get(f"{name}.pos") + if v is None: + return None + out[i] = float(v) + return out + + +@ProcessorStepRegistry.register("leader_arm_intervention") +@dataclass +class LeaderArmInterventionStep(ProcessorStep): + """Convert leader joint positions in ``teleop_action`` into a 4-D EE-delta dict. + + This step is intended to run **between** ``AddTeleopActionAsComplimentaryDataStep`` + (which populates ``complementary_data["teleop_action"]`` with raw leader joint + angles) and ``InterventionActionProcessorStep`` (which expects a delta dict). + + Attributes: + kinematics: Robot kinematic model shared with the follower; used for FK + on both the leader arm and the follower arm. Both arms must use the + same URDF joint order. + motor_names: Ordered joint names matching ``kinematics.joint_names``, + used to slice joint dicts. + end_effector_step_sizes: Per-axis normalisation in metres, e.g. + ``{"x": 0.025, "y": 0.025, "z": 0.025}``. The clamped delta is + ``(p_leader - p_follower) / step_size``. + use_gripper: When ``True``, append a discrete gripper command derived from + the leader gripper joint to the output dict. + leader_gripper_open: Threshold (>= ) above which the leader gripper is + considered ``open`` -> command ``2``. + leader_gripper_close: Threshold (<= ) below which the leader gripper is + considered ``closed`` -> command ``0``. + """ + + kinematics: RobotKinematics + motor_names: list[str] + end_effector_step_sizes: dict[str, float] + use_gripper: bool = True + leader_gripper_open: float = LEADER_GRIPPER_OPEN_DEFAULT + leader_gripper_close: float = LEADER_GRIPPER_CLOSE_DEFAULT + + _initial_follower_joints: np.ndarray | None = field(default=None, init=False, repr=False) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + new_transition = transition.copy() + complementary_data = dict(new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) or {}) + + leader_joints_dict = complementary_data.get(TELEOP_ACTION_KEY) + if not isinstance(leader_joints_dict, dict): + # Nothing to convert (e.g. teleop disconnected). Leave transition untouched. + return new_transition + + if not any(k.endswith(".pos") for k in leader_joints_dict): + # Already in EE-delta form (or unrecognised); skip. + return new_transition + + follower_joints = self._read_follower_joints(transition, complementary_data) + leader_joints = _joint_dict_to_array(leader_joints_dict, self.motor_names) + + if follower_joints is None or leader_joints is None: + # Cannot compute delta this step; expose a zero-action so downstream + # InterventionActionProcessorStep does not propagate stale joints. + complementary_data[TELEOP_ACTION_KEY] = self._zero_action() + new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data + return new_transition + + p_leader = self.kinematics.forward_kinematics(leader_joints)[:3, 3] + p_follower = self.kinematics.forward_kinematics(follower_joints)[:3, 3] + + delta = p_leader - p_follower + delta_norm = np.array( + [ + delta[0] / max(self.end_effector_step_sizes.get("x", 1.0), 1e-6), + delta[1] / max(self.end_effector_step_sizes.get("y", 1.0), 1e-6), + delta[2] / max(self.end_effector_step_sizes.get("z", 1.0), 1e-6), + ], + dtype=float, + ) + delta_norm = np.clip(delta_norm, -1.0, 1.0) + + teleop_action: dict[str, float] = { + "delta_x": float(delta_norm[0]), + "delta_y": float(delta_norm[1]), + "delta_z": float(delta_norm[2]), + } + + if self.use_gripper: + leader_gripper = float(leader_joints_dict.get(f"{GRIPPER_KEY}.pos", 50.0)) + teleop_action[GRIPPER_KEY] = self._discretise_gripper(leader_gripper) + + complementary_data[TELEOP_ACTION_KEY] = teleop_action + new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data + return new_transition + + def _read_follower_joints( + self, transition: EnvTransition, complementary_data: dict[str, Any] + ) -> np.ndarray | None: + """Best-effort read of the follower joints from the transition. + + Tries (in order): + 1. ``complementary_data["raw_joint_positions"]`` (set after env.step). + 2. ``transition[OBSERVATION]`` if it is a flat ``".pos"`` dict + (this is the convention used by ``step_env_and_process_transition`` + when staging an action transition). + """ + raw = complementary_data.get(RAW_JOINT_POSITIONS_KEY) + if isinstance(raw, dict): + arr = _joint_dict_to_array(raw, self.motor_names) + if arr is not None: + return arr + + observation = transition.get(TransitionKey.OBSERVATION) + if isinstance(observation, dict): + arr = _joint_dict_to_array(observation, self.motor_names) + if arr is not None: + return arr + + return None + + def _discretise_gripper(self, leader_gripper_pos: float) -> float: + """Map a leader gripper position in ``[0, 100]`` to ``{0, 1, 2}``.""" + if leader_gripper_pos >= self.leader_gripper_open: + return GRIPPER_OPEN + if leader_gripper_pos <= self.leader_gripper_close: + return GRIPPER_CLOSE + return GRIPPER_STAY + + def _zero_action(self) -> dict[str, float]: + out: dict[str, float] = {"delta_x": 0.0, "delta_y": 0.0, "delta_z": 0.0} + if self.use_gripper: + out[GRIPPER_KEY] = GRIPPER_STAY + return out + + def get_config(self) -> dict[str, Any]: + return { + "motor_names": list(self.motor_names), + "end_effector_step_sizes": dict(self.end_effector_step_sizes), + "use_gripper": self.use_gripper, + "leader_gripper_open": self.leader_gripper_open, + "leader_gripper_close": self.leader_gripper_close, + } + + def reset(self) -> None: + self._initial_follower_joints = None + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + return features diff --git a/src/lerobot/rl/gym_manipulator.py b/src/lerobot/rl/gym_manipulator.py index bcf937e46..245992878 100644 --- a/src/lerobot/rl/gym_manipulator.py +++ b/src/lerobot/rl/gym_manipulator.py @@ -39,6 +39,7 @@ from lerobot.processor import ( GymHILAdapterProcessorStep, ImageCropResizeProcessorStep, InterventionActionProcessorStep, + LeaderArmInterventionStep, MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep, Numpy2TorchActionProcessorStep, @@ -481,15 +482,38 @@ def make_processors( env_pipeline_steps.append(AddBatchDimensionProcessorStep()) env_pipeline_steps.append(DeviceProcessorStep(device=device)) - action_pipeline_steps = [ + action_pipeline_steps: list = [ AddTeleopActionAsComplimentaryDataStep(teleop_device=teleop_device), AddTeleopEventsAsInfoStep(teleop_device=teleop_device), - InterventionActionProcessorStep( - use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, - terminate_on_success=terminate_on_success, - ), ] + use_gripper_for_intervention = ( + cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False + ) + + # Leader-arm intervention: convert raw leader joints in `teleop_action` + # into a 4-D EE-delta dict before the override step consumes it. + if ( + getattr(cfg.processor, "control_mode", "gamepad") == "leader" + and cfg.processor.inverse_kinematics is not None + and kinematics_solver is not None + ): + action_pipeline_steps.append( + LeaderArmInterventionStep( + kinematics=kinematics_solver, + motor_names=motor_names, + end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes, + use_gripper=use_gripper_for_intervention, + ) + ) + + action_pipeline_steps.append( + InterventionActionProcessorStep( + use_gripper=use_gripper_for_intervention, + terminate_on_success=terminate_on_success, + ) + ) + # Replace InverseKinematicsProcessor with new kinematic processors if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None: # Add EE bounds and safety processor diff --git a/src/lerobot/teleoperators/so_leader/__init__.py b/src/lerobot/teleoperators/so_leader/__init__.py index 26ef66677..17793267d 100644 --- a/src/lerobot/teleoperators/so_leader/__init__.py +++ b/src/lerobot/teleoperators/so_leader/__init__.py @@ -21,6 +21,7 @@ from .config_so_leader import ( SOLeaderTeleopConfig, ) from .so_leader import SO100Leader, SO101Leader, SOLeader +from .so_leader_follower import SOLeaderFollower __all__ = [ "SO100Leader", @@ -29,5 +30,6 @@ __all__ = [ "SO101LeaderConfig", "SOLeader", "SOLeaderConfig", + "SOLeaderFollower", "SOLeaderTeleopConfig", ] diff --git a/src/lerobot/teleoperators/so_leader/config_so_leader.py b/src/lerobot/teleoperators/so_leader/config_so_leader.py index 189303088..1a9361307 100644 --- a/src/lerobot/teleoperators/so_leader/config_so_leader.py +++ b/src/lerobot/teleoperators/so_leader/config_so_leader.py @@ -29,6 +29,18 @@ class SOLeaderConfig: # Whether to use degrees for angles use_degrees: bool = True + # When True, the SO leader is wrapped in `SOLeaderFollower`, which adds a + # keyboard listener for HIL-SERL intervention events (SPACE toggles + # intervention, S/R/Q signal success/rerecord/fail) and reports a 4-D + # EE-delta action space via `action_features`. The raw leader joints are + # still returned by `get_action()` and converted downstream by + # `LeaderArmInterventionStep`. + leader_follower_mode: bool = False + + # When `leader_follower_mode` is enabled, include the gripper in the + # 4-D delta action space announced by `action_features`. + use_gripper: bool = True + @TeleoperatorConfig.register_subclass("so101_leader") @TeleoperatorConfig.register_subclass("so100_leader") diff --git a/src/lerobot/teleoperators/so_leader/so_leader_follower.py b/src/lerobot/teleoperators/so_leader/so_leader_follower.py new file mode 100644 index 000000000..0f228e3f7 --- /dev/null +++ b/src/lerobot/teleoperators/so_leader/so_leader_follower.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SO Leader teleoperator extended with HIL-SERL intervention events. + +This thin wrapper around :class:`SOLeader` keeps the underlying joint-reading +behaviour intact (so :func:`get_action` returns ``{".pos": float}``) +while adding: + +* A pynput keyboard listener that toggles intervention with SPACE and emits + ``success`` / ``rerecord`` / ``fail`` signals via S / R / Q keys, mirroring + :class:`KeyboardEndEffectorTeleop`. +* A :func:`get_teleop_events` method satisfying the + :class:`HasTeleopEvents` protocol consumed by ``AddTeleopEventsAsInfoStep``. +* An :func:`action_features` override that announces the 4-D + ``[delta_x, delta_y, delta_z, gripper]`` space the leader will project into + via :class:`LeaderArmInterventionStep` -- this is what ends up recorded by + ``LeRobotDataset`` in HIL-SERL ``record`` mode. + +The actual joint-to-EE-delta conversion does **not** happen here; it is +performed by :class:`LeaderArmInterventionStep` in the action processor +pipeline so the leader stays a pure I/O device. +""" + +from __future__ import annotations + +import contextlib +import logging +import os +import sys +from typing import Any + +from lerobot.types import RobotAction +from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.import_utils import _pynput_available + +from ..utils import TeleopEvents +from .config_so_leader import SOLeaderTeleopConfig +from .so_leader import SOLeader + +logger = logging.getLogger(__name__) + +PYNPUT_AVAILABLE = _pynput_available +keyboard: Any = None +if PYNPUT_AVAILABLE: + try: + if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): + logger.info("No DISPLAY set. Skipping pynput import for SOLeaderFollower.") + PYNPUT_AVAILABLE = False + else: + from pynput import keyboard # type: ignore[no-redef] + except Exception as e: # pragma: no cover - hardware path + PYNPUT_AVAILABLE = False + logger.info(f"Could not import pynput: {e}") + + +class SOLeaderFollower(SOLeader): + """SO leader teleop with intervention/success/rerecord keyboard signals.""" + + config_class = SOLeaderTeleopConfig + name = "so_leader_follower" + + def __init__(self, config: SOLeaderTeleopConfig): + super().__init__(config) + + self._is_intervention: bool = False + self._success: bool = False + self._rerecord: bool = False + self._terminate_episode: bool = False + self._listener: Any = None + + @property + def action_features(self) -> dict[str, Any]: + """Announce the 4-D EE-delta action space recorded by the dataset. + + The leader still produces raw joints from :func:`get_action`; this + property describes what downstream processors emit and what the dataset + layer should reserve in the ``action`` column for HIL-SERL recordings. + """ + names: dict[str, int] = {"delta_x": 0, "delta_y": 1, "delta_z": 2} + shape = (3,) + if getattr(self.config, "use_gripper", True): + names["gripper"] = 3 + shape = (4,) + return {"dtype": "float32", "shape": shape, "names": names} + + @check_if_already_connected + def connect(self, calibrate: bool = True) -> None: + super().connect(calibrate=calibrate) + self._start_keyboard_listener() + + def _start_keyboard_listener(self) -> None: + if not PYNPUT_AVAILABLE: + logger.info("pynput unavailable; SOLeaderFollower keyboard events disabled.") + return + + def on_press(key: Any) -> None: + try: + if key == keyboard.Key.space: + self._is_intervention = not self._is_intervention + logger.info(f"[SOLeaderFollower] intervention -> {self._is_intervention}") + return + char = getattr(key, "char", None) + if char is None: + return + if char == "s": + self._success = True + self._terminate_episode = True + elif char == "r": + self._rerecord = True + self._terminate_episode = True + elif char == "q": + self._terminate_episode = True + except Exception: # nosec B110 + # Never let the listener thread crash on weird keys. + pass + + self._listener = keyboard.Listener(on_press=on_press) + self._listener.daemon = True + self._listener.start() + + @check_if_not_connected + def get_action(self) -> RobotAction: + # Reuse the SOLeader joint read so we still expose the leader pose. + return super().get_action() + + def get_teleop_events(self) -> dict[TeleopEvents, bool]: + events = { + TeleopEvents.IS_INTERVENTION: self._is_intervention, + TeleopEvents.TERMINATE_EPISODE: self._terminate_episode, + TeleopEvents.SUCCESS: self._success, + TeleopEvents.RERECORD_EPISODE: self._rerecord, + } + # Edge-trigger the episode-control flags so the next read does not + # re-fire the same event for several frames. + self._success = False + self._rerecord = False + self._terminate_episode = False + return events + + def reset(self) -> None: + self._is_intervention = False + self._success = False + self._rerecord = False + self._terminate_episode = False + + @check_if_not_connected + def disconnect(self) -> None: + if self._listener is not None: + with contextlib.suppress(Exception): + self._listener.stop() + self._listener = None + super().disconnect() diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index db685f396..f8025fa4f 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -48,10 +48,18 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator": return OmxLeader(config) elif config.type == "so100_leader": + if getattr(config, "leader_follower_mode", False): + from .so_leader import SOLeaderFollower + + return SOLeaderFollower(config) from .so_leader import SO100Leader return SO100Leader(config) elif config.type == "so101_leader": + if getattr(config, "leader_follower_mode", False): + from .so_leader import SOLeaderFollower + + return SOLeaderFollower(config) from .so_leader import SO101Leader return SO101Leader(config) diff --git a/tests/processor/test_leader_follower_processor.py b/tests/processor/test_leader_follower_processor.py new file mode 100644 index 000000000..9f9e0e845 --- /dev/null +++ b/tests/processor/test_leader_follower_processor.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for LeaderArmInterventionStep (placo-free, FK is mocked).""" + +from typing import Any + +import numpy as np +import pytest + +from lerobot.processor.converters import create_transition +from lerobot.processor.leader_follower_processor import ( + GRIPPER_CLOSE, + GRIPPER_OPEN, + GRIPPER_STAY, + LeaderArmInterventionStep, +) +from lerobot.types import TransitionKey + +MOTOR_NAMES = [ + "shoulder_pan", + "shoulder_lift", + "elbow_flex", + "wrist_flex", + "wrist_roll", +] +STEP_SIZES = {"x": 0.025, "y": 0.025, "z": 0.025} + + +class _FakeKinematics: + """Minimal stand-in for `RobotKinematics.forward_kinematics`. + + Maps a joint vector deterministically to a 4x4 transform whose translation + is `(j[0] * 0.001, j[1] * 0.001, j[2] * 0.001)`. This lets the test drive + arbitrary EE positions by choosing leader / follower joint values without + depending on placo / a URDF. + """ + + def forward_kinematics(self, joints: np.ndarray) -> np.ndarray: + t = np.eye(4, dtype=float) + t[:3, 3] = np.asarray(joints[:3], dtype=float) * 1e-3 + return t + + +def _joint_dict(values: list[float]) -> dict[str, float]: + return {f"{name}.pos": v for name, v in zip(MOTOR_NAMES, values, strict=False)} + + +def _make_step(use_gripper: bool = True) -> LeaderArmInterventionStep: + return LeaderArmInterventionStep( + kinematics=_FakeKinematics(), # type: ignore[arg-type] + motor_names=MOTOR_NAMES, + end_effector_step_sizes=STEP_SIZES, + use_gripper=use_gripper, + ) + + +def _build_transition( + leader_joints: dict[str, float] | None, + follower_joints: dict[str, float] | None, + extra_complementary: dict[str, Any] | None = None, +) -> Any: + complementary: dict[str, Any] = dict(extra_complementary or {}) + if leader_joints is not None: + complementary["teleop_action"] = leader_joints + if follower_joints is not None: + complementary["raw_joint_positions"] = follower_joints + return create_transition(complementary_data=complementary) + + +def test_replaces_teleop_action_with_normalised_ee_delta(): + leader = _joint_dict([25.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 80.0 + follower = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 30.0 + + transition = _build_transition(leader, follower) + step = _make_step() + out = step(transition) + + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + assert set(teleop_action) == {"delta_x", "delta_y", "delta_z", "gripper"} + # joint 0 differs by +25 -> 0.025 m -> normalised by 0.025 step -> 1.0 + assert teleop_action["delta_x"] == pytest.approx(1.0) + assert teleop_action["delta_y"] == pytest.approx(0.0) + assert teleop_action["delta_z"] == pytest.approx(0.0) + # leader gripper 80 >= open threshold 60 -> open command + assert teleop_action["gripper"] == GRIPPER_OPEN + + +def test_clips_delta_to_unit_box(): + leader = _joint_dict([1000.0, -1000.0, 1000.0, 0.0, 0.0]) + follower = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]) + transition = _build_transition(leader, follower) + + out = _make_step(use_gripper=False)(transition) + + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + assert "gripper" not in teleop_action + assert teleop_action["delta_x"] == pytest.approx(1.0) + assert teleop_action["delta_y"] == pytest.approx(-1.0) + assert teleop_action["delta_z"] == pytest.approx(1.0) + + +@pytest.mark.parametrize( + ("leader_gripper", "expected"), + [ + (10.0, GRIPPER_CLOSE), + (45.0, GRIPPER_STAY), + (90.0, GRIPPER_OPEN), + ], +) +def test_gripper_quantisation(leader_gripper: float, expected: float): + leader = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = leader_gripper + follower = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 50.0 + + out = _make_step(use_gripper=True)(_build_transition(leader, follower)) + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + assert teleop_action["gripper"] == expected + + +def test_zero_action_when_follower_joints_missing(): + leader = _joint_dict([10.0, 10.0, 10.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + transition = _build_transition(leader, follower_joints=None) + + out = _make_step()(transition) + + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + assert teleop_action == { + "delta_x": 0.0, + "delta_y": 0.0, + "delta_z": 0.0, + "gripper": GRIPPER_STAY, + } + + +def test_passthrough_when_teleop_action_missing(): + transition = _build_transition(leader_joints=None, follower_joints=None) + out = _make_step()(transition) + assert "teleop_action" not in out[TransitionKey.COMPLEMENTARY_DATA] + + +def test_passthrough_when_teleop_action_is_already_delta_dict(): + """Idempotent on dicts that don't look like raw joint reads.""" + delta = {"delta_x": 0.5, "delta_y": 0.0, "delta_z": -0.3, "gripper": GRIPPER_OPEN} + follower = _joint_dict([0.0, 0.0, 0.0, 0.0, 0.0]) + transition = _build_transition(delta, follower) + out = _make_step()(transition) + assert out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] == delta + + +def test_reads_follower_from_observation_when_complementary_missing(): + leader = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + follower = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]) + + transition = create_transition( + observation=follower, + complementary_data={"teleop_action": leader}, + ) + out = _make_step()(transition) + + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + # delta = (20 - 10) * 1e-3 = 0.01, normalised by 0.025 -> 0.4 + assert teleop_action["delta_x"] == pytest.approx(0.4)