From 13418dcd7b810855e4809f30138100fe64f3e56b Mon Sep 17 00:00:00 2001 From: Khalil Meftah Date: Mon, 27 Apr 2026 17:50:29 +0200 Subject: [PATCH] feat(rl): port haptic follow + torque toggle from #2596 to leader intervention --- .../teleoperate_with_intervention.py | 170 ++++++++++++++++++ .../processor/leader_follower_processor.py | 72 ++++++-- src/lerobot/rl/gym_manipulator.py | 5 +- .../so_leader/so_leader_follower.py | 110 +++++++++++- .../test_leader_follower_processor.py | 106 ++++++++++- 5 files changed, 442 insertions(+), 21 deletions(-) create mode 100644 examples/so100_to_so100_EE/teleoperate_with_intervention.py diff --git a/examples/so100_to_so100_EE/teleoperate_with_intervention.py b/examples/so100_to_so100_EE/teleoperate_with_intervention.py new file mode 100644 index 000000000..256c71b32 --- /dev/null +++ b/examples/so100_to_so100_EE/teleoperate_with_intervention.py @@ -0,0 +1,170 @@ +# !/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""SO100 leader / follower teleop with HIL-SERL-style intervention toggle. + +This is a position-only standalone demo of the leader-arm intervention pattern +used by the HIL-SERL training stack (see ``lerobot.processor.LeaderArmInterventionStep`` +and ``lerobot.teleoperators.so_leader.SOLeaderFollower``). + +Behaviour: + * **Following mode** (default): The follower is idle, the leader is + torque-enabled and haptically tracks the follower's pose. The user can + grab the leader at any time without fighting the position loop. + * **Intervention mode** (toggled by pressing SPACE): The leader's torque is + released, the user moves the leader freely and the follower mirrors the + leader's end-effector position via ``[delta_x, delta_y, delta_z]`` deltas, + identical to how the real HIL-SERL action pipeline records interventions. + +Keyboard: + * ``SPACE`` -- toggle intervention on/off. + * ``q`` -- exit the loop cleanly. +""" + +from __future__ import annotations + +import time + +import numpy as np + +from lerobot.model.kinematics import RobotKinematics +from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig +from lerobot.teleoperators.so_leader import SOLeaderFollower, SOLeaderTeleopConfig +from lerobot.teleoperators.utils import TeleopEvents +from lerobot.utils.robot_utils import precise_sleep + +FPS = 30 + +# Per-axis EE-delta normalization (metres). Same convention as +# `LeaderArmInterventionStep`: the normalised delta is `(p_leader - p_follower) / step`, +# clipped to [-1, 1]. Keep these small so a single tick is a safe motion. +EE_STEP_SIZES = {"x": 0.010, "y": 0.010, "z": 0.010} + +# Workspace bounds (metres) -- a tight box around the resting pose to keep the +# follower from running into its joint limits during the demo. +EE_BOUNDS = {"min": np.array([-0.20, -0.30, 0.02]), "max": np.array([0.30, 0.30, 0.40])} + +URDF_PATH = "./SO101/so101_new_calib.urdf" +TARGET_FRAME = "gripper_frame_link" + + +def _joints_dict_to_array(joints: dict[str, float], motor_names: list[str]) -> np.ndarray: + return np.array([joints[f"{m}.pos"] for m in motor_names], dtype=float) + + +def _array_to_joints_dict(arr: np.ndarray, motor_names: list[str]) -> dict[str, float]: + return {f"{m}.pos": float(v) for m, v in zip(motor_names, arr, strict=True)} + + +def main() -> None: + follower_config = SO100FollowerConfig( + port="/dev/tty.usbmodem5A460814411", id="my_follower_arm", use_degrees=True + ) + leader_config = SOLeaderTeleopConfig( + port="/dev/tty.usbmodem5A460819811", + id="my_leader_arm", + use_degrees=True, + leader_follower_mode=True, + use_gripper=True, + ) + + follower = SO100Follower(follower_config) + leader = SOLeaderFollower(leader_config) + + follower_motor_names = list(follower.bus.motors.keys()) + leader_motor_names = list(leader.bus.motors.keys()) + + # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: + # https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf + follower_kinematics = RobotKinematics( + urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=follower_motor_names + ) + leader_kinematics = RobotKinematics( + urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=leader_motor_names + ) + + follower.connect() + leader.connect() + + print("Starting leader-follower intervention demo...") + print(" - Press SPACE to toggle intervention.") + print(" - Press 'q' to exit.") + + try: + while True: + t0 = time.perf_counter() + + # 1. Read both arms. + follower_obs = follower.get_observation() + follower_joints_dict = {f"{m}.pos": float(follower_obs[f"{m}.pos"]) for m in follower_motor_names} + leader_joints_dict = leader.get_action() + + # 2. Haptic follow: push follower joints back to the leader. The + # leader's `send_action` gates motor writes on its intervention + # state internally (torque on while following, off while intervening). + leader.send_action(follower_joints_dict) + + # 3. Pull teleop events (SPACE toggle, 'q' terminate). + events = leader.get_teleop_events() + if events.get(TeleopEvents.TERMINATE_EPISODE): + print("Termination requested -- exiting.") + break + + is_intervention = events.get(TeleopEvents.IS_INTERVENTION, False) + + if is_intervention: + # 4a. Compute leader/follower EE poses, take the *normalised + # position-only delta*, and integrate it onto the follower's + # current EE pose to get a target. This mirrors the action + # space recorded by `LeaderArmInterventionStep` during HIL-SERL. + leader_arr = _joints_dict_to_array(leader_joints_dict, leader_motor_names) + follower_arr = _joints_dict_to_array(follower_joints_dict, follower_motor_names) + + p_leader = leader_kinematics.forward_kinematics(leader_arr)[:3, 3] + p_follower_mat = follower_kinematics.forward_kinematics(follower_arr) + p_follower = p_follower_mat[:3, 3] + + raw_delta = p_leader - p_follower + step_vec = np.array([EE_STEP_SIZES["x"], EE_STEP_SIZES["y"], EE_STEP_SIZES["z"]], dtype=float) + delta_norm = np.clip(raw_delta / step_vec, -1.0, 1.0) + delta_m = delta_norm * step_vec + + target_pose = p_follower_mat.copy() + target_pose[:3, 3] = np.clip(p_follower + delta_m, EE_BOUNDS["min"], EE_BOUNDS["max"]) + + # IK -> joint-space goal for the follower's arm chain. The + # gripper joint is kept separate and driven from the leader's + # gripper position directly (no IK). + target_joints = follower_kinematics.inverse_kinematics( + current_joint_pos=follower_arr, + desired_ee_pose=target_pose, + orientation_weight=0.0, + ) + follower_action = _array_to_joints_dict(target_joints, follower_motor_names) + follower_action["gripper.pos"] = float(leader_joints_dict.get("gripper.pos", 50.0)) + follower.send_action(follower_action) + # 4b. Following mode: leave the follower alone -- the leader just + # tracks it haptically. In real HIL-SERL training this is where the + # policy would step the follower forward. + + precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) + finally: + leader.disconnect() + follower.disconnect() + + +if __name__ == "__main__": + main() diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py index 4e1b155c7..895bc4db0 100644 --- a/src/lerobot/processor/leader_follower_processor.py +++ b/src/lerobot/processor/leader_follower_processor.py @@ -36,8 +36,17 @@ converts that into a normalised EE-delta dictionary by: The output is written back to ``complementary_data["teleop_action"]`` so the rest of the action pipeline (``InterventionActionProcessorStep`` -> ``MapTensorToDeltaActionDictStep`` -> IK) is unchanged. + +Additionally, when an optional ``teleop_device`` reference is provided, this +step also pushes the follower's raw joint positions back to the leader via +``teleop_device.send_action(follower_joints)`` every tick. Combined with +:class:`SOLeaderFollower.send_action`, this implements the **haptic follow** +behaviour from https://github.com/huggingface/lerobot/pull/2596: the leader +mimics the follower while the human is hands-off, then drops torque the +moment intervention is toggled so the user can grab and steer it. """ +import logging from dataclasses import dataclass, field from typing import Any @@ -49,6 +58,8 @@ from lerobot.types import EnvTransition, TransitionKey from .pipeline import ProcessorStep, ProcessorStepRegistry +logger = logging.getLogger(__name__) + TELEOP_ACTION_KEY = "teleop_action" RAW_JOINT_POSITIONS_KEY = "raw_joint_positions" GRIPPER_KEY = "gripper" @@ -101,6 +112,11 @@ class LeaderArmInterventionStep(ProcessorStep): considered ``open`` -> command ``2``. leader_gripper_close: Threshold (<= ) below which the leader gripper is considered ``closed`` -> command ``0``. + teleop_device: Optional reference to the leader teleoperator. When set + and the device implements ``send_action(action_dict)``, this step + pushes the follower's raw joints to it every tick to drive haptic + follow. The teleop is responsible for gating actual motor writes on + its own intervention state (see :class:`SOLeaderFollower`). """ kinematics: RobotKinematics @@ -109,6 +125,7 @@ class LeaderArmInterventionStep(ProcessorStep): use_gripper: bool = True leader_gripper_open: float = LEADER_GRIPPER_OPEN_DEFAULT leader_gripper_close: float = LEADER_GRIPPER_CLOSE_DEFAULT + teleop_device: Any = None _initial_follower_joints: np.ndarray | None = field(default=None, init=False, repr=False) @@ -116,6 +133,13 @@ class LeaderArmInterventionStep(ProcessorStep): new_transition = transition.copy() complementary_data = dict(new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) or {}) + # Haptic follow: push follower joints to the leader every step (whether + # or not we have a usable leader action this tick). The leader's own + # send_action gates writes on its intervention state. + follower_joints_dict = self._read_follower_joints_dict(transition, complementary_data) + if follower_joints_dict is not None: + self._push_haptic_follow(follower_joints_dict) + leader_joints_dict = complementary_data.get(TELEOP_ACTION_KEY) if not isinstance(leader_joints_dict, dict): # Nothing to convert (e.g. teleop disconnected). Leave transition untouched. @@ -125,7 +149,11 @@ class LeaderArmInterventionStep(ProcessorStep): # Already in EE-delta form (or unrecognised); skip. return new_transition - follower_joints = self._read_follower_joints(transition, complementary_data) + follower_joints = ( + _joint_dict_to_array(follower_joints_dict, self.motor_names) + if follower_joints_dict is not None + else None + ) leader_joints = _joint_dict_to_array(leader_joints_dict, self.motor_names) if follower_joints is None or leader_joints is None: @@ -163,9 +191,9 @@ class LeaderArmInterventionStep(ProcessorStep): new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data return new_transition - def _read_follower_joints( + def _read_follower_joints_dict( self, transition: EnvTransition, complementary_data: dict[str, Any] - ) -> np.ndarray | None: + ) -> dict[str, float] | None: """Best-effort read of the follower joints from the transition. Tries (in order): @@ -173,21 +201,40 @@ class LeaderArmInterventionStep(ProcessorStep): 2. ``transition[OBSERVATION]`` if it is a flat ``".pos"`` dict (this is the convention used by ``step_env_and_process_transition`` when staging an action transition). + + Returns the source dict if all expected motors are present, else + ``None``. We return the *dict* (not the array) because we want to feed + it back to ``teleop_device.send_action`` for haptic follow. """ raw = complementary_data.get(RAW_JOINT_POSITIONS_KEY) - if isinstance(raw, dict): - arr = _joint_dict_to_array(raw, self.motor_names) - if arr is not None: - return arr + if isinstance(raw, dict) and all(f"{m}.pos" in raw for m in self.motor_names): + return raw # type: ignore[return-value] observation = transition.get(TransitionKey.OBSERVATION) - if isinstance(observation, dict): - arr = _joint_dict_to_array(observation, self.motor_names) - if arr is not None: - return arr + if isinstance(observation, dict) and all(f"{m}.pos" in observation for m in self.motor_names): + return observation # type: ignore[return-value] return None + def _push_haptic_follow(self, follower_joints_dict: dict[str, float]) -> None: + """Send the follower's joints back to the leader for haptic follow. + + Errors are logged once and swallowed -- a failed haptic update must + never break the policy / learner loop. + """ + if self.teleop_device is None: + return + send_action = getattr(self.teleop_device, "send_action", None) + if send_action is None: + return + try: + send_action(follower_joints_dict) + except NotImplementedError: + # Plain SOLeader / unsupported teleop -- silently disable haptic follow. + self.teleop_device = None + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[LeaderArmInterventionStep] haptic follow failed: {e}") + def _discretise_gripper(self, leader_gripper_pos: float) -> float: """Map a leader gripper position in ``[0, 100]`` to ``{0, 1, 2}``.""" if leader_gripper_pos >= self.leader_gripper_open: @@ -203,6 +250,9 @@ class LeaderArmInterventionStep(ProcessorStep): return out def get_config(self) -> dict[str, Any]: + # `kinematics` and `teleop_device` are runtime objects (not JSON-serializable) + # and are re-injected by `gym_manipulator.make_processors`, so they are + # intentionally omitted from the saved config. return { "motor_names": list(self.motor_names), "end_effector_step_sizes": dict(self.end_effector_step_sizes), diff --git a/src/lerobot/rl/gym_manipulator.py b/src/lerobot/rl/gym_manipulator.py index 245992878..c9d6ca7ed 100644 --- a/src/lerobot/rl/gym_manipulator.py +++ b/src/lerobot/rl/gym_manipulator.py @@ -492,7 +492,9 @@ def make_processors( ) # Leader-arm intervention: convert raw leader joints in `teleop_action` - # into a 4-D EE-delta dict before the override step consumes it. + # into a 4-D EE-delta dict before the override step consumes it. The same + # step also drives haptic follow on the leader (when `teleop_device` is a + # `SOLeaderFollower`) by pushing the follower joints back via send_action. if ( getattr(cfg.processor, "control_mode", "gamepad") == "leader" and cfg.processor.inverse_kinematics is not None @@ -503,6 +505,7 @@ def make_processors( kinematics=kinematics_solver, motor_names=motor_names, end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes, + teleop_device=teleop_device, use_gripper=use_gripper_for_intervention, ) ) diff --git a/src/lerobot/teleoperators/so_leader/so_leader_follower.py b/src/lerobot/teleoperators/so_leader/so_leader_follower.py index 0f228e3f7..c3e8348dd 100644 --- a/src/lerobot/teleoperators/so_leader/so_leader_follower.py +++ b/src/lerobot/teleoperators/so_leader/so_leader_follower.py @@ -15,11 +15,10 @@ # limitations under the License. """ -SO Leader teleoperator extended with HIL-SERL intervention events. +SO Leader teleoperator extended with HIL-SERL intervention events and haptic follow. -This thin wrapper around :class:`SOLeader` keeps the underlying joint-reading -behaviour intact (so :func:`get_action` returns ``{".pos": float}``) -while adding: +This wrapper around :class:`SOLeader` keeps the underlying joint-reading behaviour +intact (so :func:`get_action` returns ``{".pos": float}``) while adding: * A pynput keyboard listener that toggles intervention with SPACE and emits ``success`` / ``rerecord`` / ``fail`` signals via S / R / Q keys, mirroring @@ -30,10 +29,17 @@ while adding: ``[delta_x, delta_y, delta_z, gripper]`` space the leader will project into via :class:`LeaderArmInterventionStep` -- this is what ends up recorded by ``LeRobotDataset`` in HIL-SERL ``record`` mode. +* :func:`send_action` for **haptic follow**: when the human is not intervening, + the leader is torque-enabled and tracks the follower's joint positions so the + user can grab it at any time and seamlessly take over (mirrors the design from + https://github.com/huggingface/lerobot/pull/2596). When intervention is + toggled on, leader torque is disabled so the user can move it freely. +* Lower position-loop gains on :func:`connect` (``P=16, I=0, D=16``) so the + haptic follow does not jerk the user's hand when grabbing the leader. -The actual joint-to-EE-delta conversion does **not** happen here; it is -performed by :class:`LeaderArmInterventionStep` in the action processor -pipeline so the leader stays a pure I/O device. +The joint-to-EE-delta conversion does **not** happen here; it is performed by +:class:`LeaderArmInterventionStep` in the action processor pipeline so the +leader stays a pure I/O device. """ from __future__ import annotations @@ -44,6 +50,8 @@ import os import sys from typing import Any +import numpy as np + from lerobot.types import RobotAction from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected from lerobot.utils.import_utils import _pynput_available @@ -52,6 +60,10 @@ from ..utils import TeleopEvents from .config_so_leader import SOLeaderTeleopConfig from .so_leader import SOLeader +LEADER_FOLLOWER_P_GAIN = 16 +LEADER_FOLLOWER_I_GAIN = 0 +LEADER_FOLLOWER_D_GAIN = 16 + logger = logging.getLogger(__name__) PYNPUT_AVAILABLE = _pynput_available @@ -83,6 +95,11 @@ class SOLeaderFollower(SOLeader): self._terminate_episode: bool = False self._listener: Any = None + # Haptic follow state (mirrors `is_intervening` / `leader_torque_enabled` + # in https://github.com/huggingface/lerobot/pull/2596 SO101LeaderFollower). + self._leader_torque_enabled: bool = True + self._last_follower_pos: np.ndarray | None = None + @property def action_features(self) -> dict[str, Any]: """Announce the 4-D EE-delta action space recorded by the dataset. @@ -101,7 +118,27 @@ class SOLeaderFollower(SOLeader): @check_if_already_connected def connect(self, calibrate: bool = True) -> None: super().connect(calibrate=calibrate) + self._configure_leader_follower_gains() self._start_keyboard_listener() + logger.info( + "[SOLeaderFollower] connected. Press SPACE to toggle intervention, " + "'s' for success, 'r' for re-record, 'q' to terminate." + ) + + def _configure_leader_follower_gains(self) -> None: + """Lower position-loop gains so haptic follow does not yank the user. + + Mirrors the gains used by the SO101LeaderFollower in PR #2596 — high + default gains make the leader fight the user's hand when they grab it + between interventions. + """ + for motor in self.bus.motors: + try: + self.bus.write("P_Coefficient", motor, LEADER_FOLLOWER_P_GAIN) + self.bus.write("I_Coefficient", motor, LEADER_FOLLOWER_I_GAIN) + self.bus.write("D_Coefficient", motor, LEADER_FOLLOWER_D_GAIN) + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not set PID gains for '{motor}': {e}") def _start_keyboard_listener(self) -> None: if not PYNPUT_AVAILABLE: @@ -135,9 +172,64 @@ class SOLeaderFollower(SOLeader): @check_if_not_connected def get_action(self) -> RobotAction: - # Reuse the SOLeader joint read so we still expose the leader pose. + # When the user has just toggled into intervention, make sure leader + # torque is OFF so they can move it without fighting the position loop. + if self._is_intervention and self._leader_torque_enabled: + try: + self.bus.sync_write("Torque_Enable", 0) + self._leader_torque_enabled = False + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not disable leader torque: {e}") return super().get_action() + def send_action(self, action: dict[str, float]) -> None: # type: ignore[override] + """Mirror the follower's joint positions on the leader (haptic follow). + + This is called every step from the action pipeline (typically by + :class:`LeaderArmInterventionStep`) with the follower's raw joint + positions ``{".pos": float}``. While the user is **not** + intervening the leader is torque-enabled and tracks the follower so the + operator can grab it at any time and continue motion smoothly. As soon + as the user toggles intervention on (SPACE), torque is dropped in + :func:`get_action` so the human can move the leader freely. + + Errors talking to the bus are logged and swallowed -- the policy / + learner loop must keep ticking even if a single haptic update fails. + + Args: + action: Dictionary of follower motor positions, ``{motor.pos: deg}``. + """ + if not self.is_connected: + return + + try: + self._last_follower_pos = np.array( + [float(action.get(f"{m}.pos", 0.0)) for m in self.bus.motors], + dtype=float, + ) + except Exception as e: # pragma: no cover - defensive + logger.warning(f"[SOLeaderFollower] could not extract follower joints: {e}") + return + + if self._is_intervention: + return + + if not self._leader_torque_enabled: + try: + self.bus.sync_write("Torque_Enable", 1) + self._leader_torque_enabled = True + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not enable leader torque: {e}") + return + + goal_pos = {m: float(action[f"{m}.pos"]) for m in self.bus.motors if f"{m}.pos" in action} + if not goal_pos: + return + try: + self.bus.sync_write("Goal_Position", goal_pos) + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not push goal position to leader: {e}") + def get_teleop_events(self) -> dict[TeleopEvents, bool]: events = { TeleopEvents.IS_INTERVENTION: self._is_intervention, @@ -157,6 +249,8 @@ class SOLeaderFollower(SOLeader): self._success = False self._rerecord = False self._terminate_episode = False + self._leader_torque_enabled = True + self._last_follower_pos = None @check_if_not_connected def disconnect(self) -> None: diff --git a/tests/processor/test_leader_follower_processor.py b/tests/processor/test_leader_follower_processor.py index 9f9e0e845..92ecbbb6a 100644 --- a/tests/processor/test_leader_follower_processor.py +++ b/tests/processor/test_leader_follower_processor.py @@ -58,15 +58,38 @@ def _joint_dict(values: list[float]) -> dict[str, float]: return {f"{name}.pos": v for name, v in zip(MOTOR_NAMES, values, strict=False)} -def _make_step(use_gripper: bool = True) -> LeaderArmInterventionStep: +def _make_step(use_gripper: bool = True, teleop_device: Any = None) -> LeaderArmInterventionStep: return LeaderArmInterventionStep( kinematics=_FakeKinematics(), # type: ignore[arg-type] motor_names=MOTOR_NAMES, end_effector_step_sizes=STEP_SIZES, use_gripper=use_gripper, + teleop_device=teleop_device, ) +class _RecordingTeleop: + """Minimal teleop double that records every send_action call.""" + + def __init__(self) -> None: + self.calls: list[dict[str, float]] = [] + + def send_action(self, action: dict[str, float]) -> None: + self.calls.append(dict(action)) + + +class _RaisingTeleop: + """Teleop double whose send_action raises an unexpected error.""" + + def __init__(self, exc: Exception) -> None: + self.exc = exc + self.calls = 0 + + def send_action(self, action: dict[str, float]) -> None: + self.calls += 1 + raise self.exc + + def _build_transition( leader_joints: dict[str, float] | None, follower_joints: dict[str, float] | None, @@ -178,3 +201,84 @@ def test_reads_follower_from_observation_when_complementary_missing(): teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] # delta = (20 - 10) * 1e-3 = 0.01, normalised by 0.025 -> 0.4 assert teleop_action["delta_x"] == pytest.approx(0.4) + + +# --- haptic follow ---------------------------------------------------------- + + +def test_haptic_follow_pushes_follower_joints_to_teleop_device(): + """When teleop_device is set, follower joints should be sent to it every tick.""" + leader = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + follower = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 50.0 + teleop = _RecordingTeleop() + + step = _make_step(teleop_device=teleop) + step(_build_transition(leader, follower)) + + assert len(teleop.calls) == 1 + assert teleop.calls[0] == follower + + +def test_haptic_follow_uses_observation_when_complementary_missing(): + """Falls back to OBSERVATION dict for haptic follow when complementary is empty.""" + leader = _joint_dict([5.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + follower = _joint_dict([3.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 50.0 + teleop = _RecordingTeleop() + + transition = create_transition( + observation=follower, + complementary_data={"teleop_action": leader}, + ) + _make_step(teleop_device=teleop)(transition) + + assert teleop.calls == [follower] + + +def test_haptic_follow_skipped_when_no_follower_joints_available(): + """No follower joints -> no haptic write (don't push stale data).""" + leader = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + teleop = _RecordingTeleop() + + transition = _build_transition(leader, follower_joints=None) + _make_step(teleop_device=teleop)(transition) + + assert teleop.calls == [] + + +def test_haptic_follow_swallows_send_action_errors(): + """A failing teleop.send_action must not abort the action pipeline.""" + leader = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + follower = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 50.0 + teleop = _RaisingTeleop(RuntimeError("bus comms fail")) + + step = _make_step(teleop_device=teleop) + out = step(_build_transition(leader, follower)) + + assert teleop.calls == 1 + # The downstream EE-delta payload must still be produced normally. + teleop_action = out[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] + assert teleop_action["delta_x"] == pytest.approx(0.4) + + +def test_haptic_follow_disables_when_send_action_not_implemented(): + """Plain leaders (no haptic follow) opt out via NotImplementedError.""" + leader = _joint_dict([20.0, 0.0, 0.0, 0.0, 0.0]) + leader["gripper.pos"] = 50.0 + follower = _joint_dict([10.0, 0.0, 0.0, 0.0, 0.0]) + follower["gripper.pos"] = 50.0 + teleop = _RaisingTeleop(NotImplementedError("plain leader, no haptic follow")) + + step = _make_step(teleop_device=teleop) + step(_build_transition(leader, follower)) + # Tick again and confirm the step gave up rather than spamming the device. + step(_build_transition(leader, follower)) + + assert teleop.calls == 1 + assert step.teleop_device is None