From 7a62ea21d8fed6d7fc55d5432e06f90f898e19f9 Mon Sep 17 00:00:00 2001 From: Maximellerbach Date: Mon, 4 May 2026 18:44:30 +0200 Subject: [PATCH] feat(dagger): adding smooth handover Co-authored-by: Copilot --- src/lerobot/rollout/strategies/dagger.py | 165 ++++++++++++------ .../openarm_mini/openarm_mini.py | 4 +- .../teleoperators/so_leader/so_leader.py | 14 +- 3 files changed, 125 insertions(+), 58 deletions(-) diff --git a/src/lerobot/rollout/strategies/dagger.py b/src/lerobot/rollout/strategies/dagger.py index da4b463fc..09992e25a 100644 --- a/src/lerobot/rollout/strategies/dagger.py +++ b/src/lerobot/rollout/strategies/dagger.py @@ -33,12 +33,13 @@ Recording modes: ``record_autonomous=False``: Only correction windows are recorded. Each correction (start to stop) becomes one episode. -Teleoperator expectations: - The user is responsible for keeping the leader arm aligned with the - follower arm at the moment a correction begins. Programmatic motor - handover (``enable_torque`` / ``disable_torque`` / ``write_goal_positions``) - is intentionally not invoked here — see the TODO in - :func:`DAggerStrategy._apply_transition` for the open design decision. +Teleoperator handover: + On AUTONOMOUS → PAUSED, actuated teleops (those with non-empty + ``feedback_features``, e.g. SO-101, OpenArmMini) are smoothly driven to + the follower's last position via ``send_feedback`` so the operator takes + over without a jerk. Non-actuated teleops cannot be driven, + so on PAUSED → CORRECTING the follower is instead slid to the teleop's + current pose before the correction begins. """ from __future__ import annotations @@ -175,17 +176,25 @@ class DAggerEvents: # --------------------------------------------------------------------------- -# TODO(Steven): re-enable programmatic teleop alignment once we decide whether -# to enforce motor-control methods on every Teleoperator. Until then the user -# is responsible for moving the leader arm to the follower's pose at the moment -# a correction begins. +def _teleop_supports_feedback(teleop: Teleoperator) -> bool: + """Return True when the teleop can receive position feedback (is actuated). + TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. + """ + return ( + bool(teleop.feedback_features) + and hasattr(teleop, "disable_torque") + and hasattr(teleop, "enable_torque") + ) + + def _teleop_smooth_move_to( teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50 ) -> None: - """Smoothly move teleop to target position via linear interpolation. + """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. - Requires the teleoperator to support motor control methods - (``enable_torque``, ``write_goal_positions``, ``get_action``). + Enables torque for the duration of the move, then disables it so the + operator can move the arm freely. ``target_pos`` must use the same key + space as ``teleop.feedback_features`` (i.e. ``"{motor}.pos"`` keys). """ teleop.enable_torque() current = teleop.get_action() @@ -193,13 +202,28 @@ def _teleop_smooth_move_to( for step in range(steps + 1): t = step / steps - interp = {} - for k in current: - if k in target_pos: - interp[k] = current[k] * (1 - t) + target_pos[k] * t - else: - interp[k] = current[k] - teleop.write_goal_positions(interp) + interp = { + k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current + } + teleop.send_feedback(interp) + time.sleep(1 / fps) + + +def _follower_smooth_move_to( + robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 50 +) -> None: + """Smoothly move the follower robot from ``current`` to ``target`` action. + + Used when the teleop is non-actuated: instead of driving the leader arm + to the follower, we bring the follower to the teleop's current pose. + Both ``current`` and ``target`` must be in robot-action key space. + """ + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} + robot.send_action(interp) time.sleep(1 / fps) @@ -415,9 +439,6 @@ class DAggerStrategy(RolloutStrategy): engine.reset() interpolator.reset() events.reset() - # TODO(Steven): re-enable once Teleoperator motor-control methods are - # standardised; until then the user pre-aligns the leader by hand. - # teleop.disable_torque() engine.resume() last_action: dict[str, Any] | None = None @@ -441,8 +462,21 @@ class DAggerStrategy(RolloutStrategy): transition = events.consume_transition() if transition is not None: old_phase, new_phase = transition - self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop) - last_action = None + prev_action = last_action + obs = robot.get_observation() + self._apply_transition( + old_phase, + new_phase, + engine, + interpolator, + robot, + teleop, + ctx, + prev_action, + obs, + ) + if new_phase == DAggerPhase.AUTONOMOUS: + last_action = None phase = events.phase obs = robot.get_observation() @@ -532,9 +566,6 @@ class DAggerStrategy(RolloutStrategy): finally: logger.info("DAgger continuous control loop ended — pausing engine") engine.pause() - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() with contextlib.suppress(Exception): with self._episode_lock: dataset.save_episode() @@ -570,9 +601,6 @@ class DAggerStrategy(RolloutStrategy): engine.reset() interpolator.reset() events.reset() - # TODO(Steven): re-enable once Teleoperator motor-control methods are - # standardised; until then the user pre-aligns the leader by hand. - # teleop.disable_torque() engine.resume() last_action: dict[str, Any] | None = None @@ -600,8 +628,21 @@ class DAggerStrategy(RolloutStrategy): transition = events.consume_transition() if transition is not None: old_phase, new_phase = transition - self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop) - last_action = None + prev_action = last_action + obs = robot.get_observation() + self._apply_transition( + old_phase, + new_phase, + engine, + interpolator, + robot, + teleop, + ctx, + prev_action, + obs, + ) + if new_phase == DAggerPhase.AUTONOMOUS: + last_action = None # Correction ended -> save episode (blocking if not streaming) if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED: @@ -679,9 +720,6 @@ class DAggerStrategy(RolloutStrategy): finally: logger.info("DAgger corrections-only loop ended — pausing engine") engine.pause() - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() with contextlib.suppress(Exception): with self._episode_lock: dataset.save_episode() @@ -700,34 +738,55 @@ class DAggerStrategy(RolloutStrategy): interpolator, robot: ThreadSafeRobot, teleop: Teleoperator, + ctx: RolloutContext, + prev_action: dict | None, + obs: dict, ) -> None: - """Execute side-effects for a validated phase transition.""" + """Execute side-effects for a validated phase transition, including smooth handovers. + + AUTONOMOUS -> PAUSED (actuated teleop): + Pause the engine, then drive the leader arm to the follower's last + commanded position so the operator takes over without a jerk. + + PAUSED -> CORRECTING (non-actuated teleop): + Slide the follower to the teleop's current pose so the robot meets + the operator's hand rather than jumping to it on the first frame. + + PAUSED -> AUTONOMOUS: + Reset and resume the inference engine. + """ logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value) if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED: - logger.info("Pausing engine — robot holds position") + logger.info("Pausing engine - robot holds position") engine.pause() - obs = robot.get_observation() - _robot_pos = { - k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features - } - # TODO(Steven): once Teleoperator motor-control methods are - # standardised, drive the leader to the follower's pose here so the - # operator does not need to pre-align the arm by hand. Until then - # the user is responsible for the alignment. - # _teleop_smooth_move_to(teleop, _robot_pos, duration_s=2.0, fps=50) - elif new_phase == DAggerPhase.CORRECTING: - logger.info("Entering correction mode — human teleop control") - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() + if _teleop_supports_feedback(teleop) and prev_action is not None: + logger.info("Smooth handover: moving leader arm to follower position") + _teleop_smooth_move_to(teleop, prev_action) + + elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING: + logger.info("Entering correction mode - human teleop control") + if not _teleop_supports_feedback(teleop) and prev_action is not None: + logger.info("Smooth handover: sliding follower to teleop position") + teleop_action = teleop.get_action() + processed = ctx.processors.teleop_action_processor((teleop_action, obs)) + target = ctx.processors.robot_action_processor((processed, obs)) + _follower_smooth_move_to(robot, prev_action, target) + + # unlock the teleop for human control + if _teleop_supports_feedback(teleop): + teleop.disable_torque() elif new_phase == DAggerPhase.AUTONOMOUS: - logger.info("Resuming autonomous mode — resetting engine and interpolator") + logger.info("Resuming autonomous mode - resetting engine and interpolator") interpolator.reset() engine.reset() engine.resume() + # release teleop before resuming the policy + if _teleop_supports_feedback(teleop): + teleop.disable_torque() + # ------------------------------------------------------------------ # Background push (shared by both modes) # ------------------------------------------------------------------ diff --git a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py index edc28e7a6..d7183085e 100644 --- a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py @@ -112,7 +112,7 @@ class OpenArmMini(Teleoperator): @property def feedback_features(self) -> dict[str, type]: - return {} + return self.action_features @property def is_connected(self) -> bool: @@ -349,7 +349,7 @@ class OpenArmMini(Teleoperator): self.bus_left.sync_write("Goal_Position", left_goals) def send_feedback(self, feedback: dict[str, float]) -> None: - raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.") + self.write_goal_positions(feedback) @check_if_not_connected def disconnect(self) -> None: diff --git a/src/lerobot/teleoperators/so_leader/so_leader.py b/src/lerobot/teleoperators/so_leader/so_leader.py index 04ce0f21f..7e731d5ed 100644 --- a/src/lerobot/teleoperators/so_leader/so_leader.py +++ b/src/lerobot/teleoperators/so_leader/so_leader.py @@ -59,7 +59,7 @@ class SOLeader(Teleoperator): @property def feedback_features(self) -> dict[str, type]: - return {} + return self.action_features @property def is_connected(self) -> bool: @@ -130,6 +130,12 @@ class SOLeader(Teleoperator): for motor in self.bus.motors: self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + def enable_torque(self) -> None: + self.bus.enable_torque() + + def disable_torque(self) -> None: + self.bus.disable_torque() + def setup_motors(self) -> None: for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") @@ -145,9 +151,11 @@ class SOLeader(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action + @check_if_not_connected def send_feedback(self, feedback: dict[str, float]) -> None: - # TODO: Implement force feedback - raise NotImplementedError + goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")} + if goals: + self.bus.sync_write("Goal_Position", goals) @check_if_not_connected def disconnect(self) -> None: