From ce24063efdb31de148963bb82a4a516ec46a66b0 Mon Sep 17 00:00:00 2001 From: Maxime Ellerbach Date: Tue, 5 May 2026 14:44:32 +0200 Subject: [PATCH] feat(dagger): adding smooth handover (#3506) * feat(dagger): adding smooth handover * update docstring * small phase fix and documenting potential issues * cleaning up --- src/lerobot/rollout/strategies/dagger.py | 181 ++++++++++++------ .../openarm_mini/openarm_mini.py | 5 +- .../teleoperators/so_leader/so_leader.py | 14 +- 3 files changed, 137 insertions(+), 63 deletions(-) diff --git a/src/lerobot/rollout/strategies/dagger.py b/src/lerobot/rollout/strategies/dagger.py index da4b463fc..1bd6d9bb0 100644 --- a/src/lerobot/rollout/strategies/dagger.py +++ b/src/lerobot/rollout/strategies/dagger.py @@ -33,12 +33,13 @@ Recording modes: ``record_autonomous=False``: Only correction windows are recorded. Each correction (start to stop) becomes one episode. -Teleoperator expectations: - The user is responsible for keeping the leader arm aligned with the - follower arm at the moment a correction begins. Programmatic motor - handover (``enable_torque`` / ``disable_torque`` / ``write_goal_positions``) - is intentionally not invoked here — see the TODO in - :func:`DAggerStrategy._apply_transition` for the open design decision. +Teleoperator handover: + On AUTONOMOUS → PAUSED, actuated teleops (those with non-empty + ``feedback_features``, e.g. SO-101, OpenArmMini) are smoothly driven to + the follower's last position via ``send_feedback`` so the operator takes + over without a jerk. Non-actuated teleops cannot be driven, + so on PAUSED → CORRECTING the follower is instead slid to the teleop's + current pose before the correction begins. """ from __future__ import annotations @@ -175,17 +176,27 @@ class DAggerEvents: # --------------------------------------------------------------------------- -# TODO(Steven): re-enable programmatic teleop alignment once we decide whether -# to enforce motor-control methods on every Teleoperator. Until then the user -# is responsible for moving the leader arm to the follower's pose at the moment -# a correction begins. -def _teleop_smooth_move_to( - teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50 -) -> None: - """Smoothly move teleop to target position via linear interpolation. +def _teleop_supports_feedback(teleop: Teleoperator) -> bool: + """Return True when the teleop can receive position feedback (is actuated). + TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. + """ + return ( + bool(teleop.feedback_features) + and hasattr(teleop, "disable_torque") + and hasattr(teleop, "enable_torque") + ) - Requires the teleoperator to support motor control methods - (``enable_torque``, ``write_goal_positions``, ``get_action``). + +def _teleop_smooth_move_to( + teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30 +) -> None: + """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. + + Requires the teleoperator to support feedback + (i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``). + + TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time + the follower robot doesn't receive new actions, this could be an issue on LeKiwi. """ teleop.enable_torque() current = teleop.get_action() @@ -193,13 +204,28 @@ def _teleop_smooth_move_to( for step in range(steps + 1): t = step / steps - interp = {} - for k in current: - if k in target_pos: - interp[k] = current[k] * (1 - t) + target_pos[k] * t - else: - interp[k] = current[k] - teleop.write_goal_positions(interp) + interp = { + k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current + } + teleop.send_feedback(interp) + time.sleep(1 / fps) + + +def _follower_smooth_move_to( + robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30 +) -> None: + """Smoothly move the follower robot from ``current`` to ``target`` action. + + Used when the teleop is non-actuated: instead of driving the leader arm + to the follower, we bring the follower to the teleop's current pose. + Both ``current`` and ``target`` must be in robot-action key space. + """ + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} + robot.send_action(interp) time.sleep(1 / fps) @@ -415,9 +441,6 @@ class DAggerStrategy(RolloutStrategy): engine.reset() interpolator.reset() events.reset() - # TODO(Steven): re-enable once Teleoperator motor-control methods are - # standardised; until then the user pre-aligns the leader by hand. - # teleop.disable_torque() engine.resume() last_action: dict[str, Any] | None = None @@ -441,8 +464,16 @@ class DAggerStrategy(RolloutStrategy): transition = events.consume_transition() if transition is not None: old_phase, new_phase = transition - self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop) - last_action = None + self._apply_transition( + old_phase, + new_phase, + engine, + interpolator, + ctx, + last_action, + ) + if new_phase == DAggerPhase.AUTONOMOUS: + last_action = None phase = events.phase obs = robot.get_observation() @@ -532,9 +563,6 @@ class DAggerStrategy(RolloutStrategy): finally: logger.info("DAgger continuous control loop ended — pausing engine") engine.pause() - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() with contextlib.suppress(Exception): with self._episode_lock: dataset.save_episode() @@ -570,9 +598,6 @@ class DAggerStrategy(RolloutStrategy): engine.reset() interpolator.reset() events.reset() - # TODO(Steven): re-enable once Teleoperator motor-control methods are - # standardised; until then the user pre-aligns the leader by hand. - # teleop.disable_torque() engine.resume() last_action: dict[str, Any] | None = None @@ -600,8 +625,16 @@ class DAggerStrategy(RolloutStrategy): transition = events.consume_transition() if transition is not None: old_phase, new_phase = transition - self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop) - last_action = None + self._apply_transition( + old_phase, + new_phase, + engine, + interpolator, + ctx, + last_action, + ) + if new_phase == DAggerPhase.AUTONOMOUS: + last_action = None # Correction ended -> save episode (blocking if not streaming) if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED: @@ -679,9 +712,6 @@ class DAggerStrategy(RolloutStrategy): finally: logger.info("DAgger corrections-only loop ended — pausing engine") engine.pause() - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() with contextlib.suppress(Exception): with self._episode_lock: dataset.save_episode() @@ -698,36 +728,71 @@ class DAggerStrategy(RolloutStrategy): new_phase: DAggerPhase, engine, interpolator, - robot: ThreadSafeRobot, - teleop: Teleoperator, + ctx: RolloutContext, + prev_action: dict | None, ) -> None: - """Execute side-effects for a validated phase transition.""" + """Execute side-effects for a validated phase transition, including smooth handovers. + + AUTONOMOUS -> PAUSED (actuated teleop): + Pause the engine, then drive the leader arm to the follower's last + commanded position so the operator takes over without a jerk. + + PAUSED -> CORRECTING (non-actuated teleop): + Slide the follower to the teleop's current pose so the robot meets + the operator's hand rather than jumping to it on the first frame. + + CORRECTING -> PAUSED (actuated teleop): + Re-enable torque to hold position after correction. + This will be potentially useful if cancelling the correction recording + + PAUSED -> AUTONOMOUS: + Reset and resume the inference engine. + """ + teleop = ctx.hardware.teleop + robot = ctx.hardware.robot_wrapper + logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value) if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED: - logger.info("Pausing engine — robot holds position") + logger.info("Pausing engine - robot holds position") engine.pause() - obs = robot.get_observation() - _robot_pos = { - k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features - } - # TODO(Steven): once Teleoperator motor-control methods are - # standardised, drive the leader to the follower's pose here so the - # operator does not need to pre-align the arm by hand. Until then - # the user is responsible for the alignment. - # _teleop_smooth_move_to(teleop, _robot_pos, duration_s=2.0, fps=50) - elif new_phase == DAggerPhase.CORRECTING: - logger.info("Entering correction mode — human teleop control") - # TODO(Steven): re-enable once Teleoperator motor-control methods - # are standardised across all teleop implementations. - # teleop.disable_torque() + if _teleop_supports_feedback(teleop) and prev_action is not None: + # TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor). + # send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101 + # leader + SO-101 follower) the keys are identical so this works. If the processor pipeline + # does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in + # _teleop_smooth_move_to silently no-ops and the arm doesn't move. + logger.info("Smooth handover: moving leader arm to follower position") + _teleop_smooth_move_to(teleop, prev_action) + + elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING: + logger.info("Entering correction mode - human teleop control") + if not _teleop_supports_feedback(teleop) and prev_action is not None: + logger.info("Smooth handover: sliding follower to teleop position") + obs = robot.get_observation() + teleop_action = teleop.get_action() + processed = ctx.processors.teleop_action_processor((teleop_action, obs)) + target = ctx.processors.robot_action_processor((processed, obs)) + _follower_smooth_move_to(robot, prev_action, target) + + # unlock the teleop for human control + if _teleop_supports_feedback(teleop): + teleop.disable_torque() + + elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED: + if _teleop_supports_feedback(teleop): + teleop.enable_torque() elif new_phase == DAggerPhase.AUTONOMOUS: - logger.info("Resuming autonomous mode — resetting engine and interpolator") + logger.info("Resuming autonomous mode - resetting engine and interpolator") interpolator.reset() engine.reset() engine.resume() + # release teleop before resuming the policy + if _teleop_supports_feedback(teleop): + teleop.disable_torque() + # ------------------------------------------------------------------ # Background push (shared by both modes) # ------------------------------------------------------------------ diff --git a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py index edc28e7a6..2df1dfcfe 100644 --- a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py @@ -112,7 +112,7 @@ class OpenArmMini(Teleoperator): @property def feedback_features(self) -> dict[str, type]: - return {} + return self.action_features @property def is_connected(self) -> bool: @@ -348,8 +348,9 @@ class OpenArmMini(Teleoperator): if left_goals: self.bus_left.sync_write("Goal_Position", left_goals) + @check_if_not_connected def send_feedback(self, feedback: dict[str, float]) -> None: - raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.") + self.write_goal_positions(feedback) @check_if_not_connected def disconnect(self) -> None: diff --git a/src/lerobot/teleoperators/so_leader/so_leader.py b/src/lerobot/teleoperators/so_leader/so_leader.py index 04ce0f21f..7e731d5ed 100644 --- a/src/lerobot/teleoperators/so_leader/so_leader.py +++ b/src/lerobot/teleoperators/so_leader/so_leader.py @@ -59,7 +59,7 @@ class SOLeader(Teleoperator): @property def feedback_features(self) -> dict[str, type]: - return {} + return self.action_features @property def is_connected(self) -> bool: @@ -130,6 +130,12 @@ class SOLeader(Teleoperator): for motor in self.bus.motors: self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + def enable_torque(self) -> None: + self.bus.enable_torque() + + def disable_torque(self) -> None: + self.bus.disable_torque() + def setup_motors(self) -> None: for motor in reversed(self.bus.motors): input(f"Connect the controller board to the '{motor}' motor only and press enter.") @@ -145,9 +151,11 @@ class SOLeader(Teleoperator): logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action + @check_if_not_connected def send_feedback(self, feedback: dict[str, float]) -> None: - # TODO: Implement force feedback - raise NotImplementedError + goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")} + if goals: + self.bus.sync_write("Goal_Position", goals) @check_if_not_connected def disconnect(self) -> None: