feat(dagger): adding smooth handover (#3506)

* feat(dagger): adding smooth handover


* update docstring


* small phase fix and documenting potential issues


* cleaning up
This commit is contained in:
Maxime Ellerbach
2026-05-05 14:44:32 +02:00
committed by GitHub
parent 82934719db
commit ce24063efd
3 changed files with 137 additions and 63 deletions
+123 -58
View File
@@ -33,12 +33,13 @@ Recording modes:
``record_autonomous=False``: Only correction windows are recorded.
Each correction (start to stop) becomes one episode.
Teleoperator expectations:
The user is responsible for keeping the leader arm aligned with the
follower arm at the moment a correction begins. Programmatic motor
handover (``enable_torque`` / ``disable_torque`` / ``write_goal_positions``)
is intentionally not invoked here — see the TODO in
:func:`DAggerStrategy._apply_transition` for the open design decision.
Teleoperator handover:
On AUTONOMOUS → PAUSED, actuated teleops (those with non-empty
``feedback_features``, e.g. SO-101, OpenArmMini) are smoothly driven to
the follower's last position via ``send_feedback`` so the operator takes
over without a jerk. Non-actuated teleops cannot be driven,
so on PAUSED → CORRECTING the follower is instead slid to the teleop's
current pose before the correction begins.
"""
from __future__ import annotations
@@ -175,17 +176,27 @@ class DAggerEvents:
# ---------------------------------------------------------------------------
# TODO(Steven): re-enable programmatic teleop alignment once we decide whether
# to enforce motor-control methods on every Teleoperator. Until then the user
# is responsible for moving the leader arm to the follower's pose at the moment
# a correction begins.
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50
) -> None:
"""Smoothly move teleop to target position via linear interpolation.
def _teleop_supports_feedback(teleop: Teleoperator) -> bool:
"""Return True when the teleop can receive position feedback (is actuated).
TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing.
"""
return (
bool(teleop.feedback_features)
and hasattr(teleop, "disable_torque")
and hasattr(teleop, "enable_torque")
)
Requires the teleoperator to support motor control methods
(``enable_torque``, ``write_goal_positions``, ``get_action``).
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30
) -> None:
"""Smoothly move an actuated teleop to ``target_pos`` via linear interpolation.
Requires the teleoperator to support feedback
(i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``).
TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time
the follower robot doesn't receive new actions, this could be an issue on LeKiwi.
"""
teleop.enable_torque()
current = teleop.get_action()
@@ -193,13 +204,28 @@ def _teleop_smooth_move_to(
for step in range(steps + 1):
t = step / steps
interp = {}
for k in current:
if k in target_pos:
interp[k] = current[k] * (1 - t) + target_pos[k] * t
else:
interp[k] = current[k]
teleop.write_goal_positions(interp)
interp = {
k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current
}
teleop.send_feedback(interp)
time.sleep(1 / fps)
def _follower_smooth_move_to(
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30
) -> None:
"""Smoothly move the follower robot from ``current`` to ``target`` action.
Used when the teleop is non-actuated: instead of driving the leader arm
to the follower, we bring the follower to the teleop's current pose.
Both ``current`` and ``target`` must be in robot-action key space.
"""
steps = max(int(duration_s * fps), 1)
for step in range(steps + 1):
t = step / steps
interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current}
robot.send_action(interp)
time.sleep(1 / fps)
@@ -415,9 +441,6 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): re-enable once Teleoperator motor-control methods are
# standardised; until then the user pre-aligns the leader by hand.
# teleop.disable_torque()
engine.resume()
last_action: dict[str, Any] | None = None
@@ -441,8 +464,16 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
phase = events.phase
obs = robot.get_observation()
@@ -532,9 +563,6 @@ class DAggerStrategy(RolloutStrategy):
finally:
logger.info("DAgger continuous control loop ended — pausing engine")
engine.pause()
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -570,9 +598,6 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): re-enable once Teleoperator motor-control methods are
# standardised; until then the user pre-aligns the leader by hand.
# teleop.disable_torque()
engine.resume()
last_action: dict[str, Any] | None = None
@@ -600,8 +625,16 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
# Correction ended -> save episode (blocking if not streaming)
if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
@@ -679,9 +712,6 @@ class DAggerStrategy(RolloutStrategy):
finally:
logger.info("DAgger corrections-only loop ended — pausing engine")
engine.pause()
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -698,36 +728,71 @@ class DAggerStrategy(RolloutStrategy):
new_phase: DAggerPhase,
engine,
interpolator,
robot: ThreadSafeRobot,
teleop: Teleoperator,
ctx: RolloutContext,
prev_action: dict | None,
) -> None:
"""Execute side-effects for a validated phase transition."""
"""Execute side-effects for a validated phase transition, including smooth handovers.
AUTONOMOUS -> PAUSED (actuated teleop):
Pause the engine, then drive the leader arm to the follower's last
commanded position so the operator takes over without a jerk.
PAUSED -> CORRECTING (non-actuated teleop):
Slide the follower to the teleop's current pose so the robot meets
the operator's hand rather than jumping to it on the first frame.
CORRECTING -> PAUSED (actuated teleop):
Re-enable torque to hold position after correction.
This will be potentially useful if cancelling the correction recording
PAUSED -> AUTONOMOUS:
Reset and resume the inference engine.
"""
teleop = ctx.hardware.teleop
robot = ctx.hardware.robot_wrapper
logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value)
if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED:
logger.info("Pausing engine robot holds position")
logger.info("Pausing engine - robot holds position")
engine.pause()
obs = robot.get_observation()
_robot_pos = {
k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features
}
# TODO(Steven): once Teleoperator motor-control methods are
# standardised, drive the leader to the follower's pose here so the
# operator does not need to pre-align the arm by hand. Until then
# the user is responsible for the alignment.
# _teleop_smooth_move_to(teleop, _robot_pos, duration_s=2.0, fps=50)
elif new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode — human teleop control")
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
if _teleop_supports_feedback(teleop) and prev_action is not None:
# TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor).
# send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101
# leader + SO-101 follower) the keys are identical so this works. If the processor pipeline
# does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in
# _teleop_smooth_move_to silently no-ops and the arm doesn't move.
logger.info("Smooth handover: moving leader arm to follower position")
_teleop_smooth_move_to(teleop, prev_action)
elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode - human teleop control")
if not _teleop_supports_feedback(teleop) and prev_action is not None:
logger.info("Smooth handover: sliding follower to teleop position")
obs = robot.get_observation()
teleop_action = teleop.get_action()
processed = ctx.processors.teleop_action_processor((teleop_action, obs))
target = ctx.processors.robot_action_processor((processed, obs))
_follower_smooth_move_to(robot, prev_action, target)
# unlock the teleop for human control
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
if _teleop_supports_feedback(teleop):
teleop.enable_torque()
elif new_phase == DAggerPhase.AUTONOMOUS:
logger.info("Resuming autonomous mode resetting engine and interpolator")
logger.info("Resuming autonomous mode - resetting engine and interpolator")
interpolator.reset()
engine.reset()
engine.resume()
# release teleop before resuming the policy
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
# ------------------------------------------------------------------
# Background push (shared by both modes)
# ------------------------------------------------------------------
@@ -112,7 +112,7 @@ class OpenArmMini(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -348,8 +348,9 @@ class OpenArmMini(Teleoperator):
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")
self.write_goal_positions(feedback)
@check_if_not_connected
def disconnect(self) -> None:
@@ -59,7 +59,7 @@ class SOLeader(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -130,6 +130,12 @@ class SOLeader(Teleoperator):
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def enable_torque(self) -> None:
self.bus.enable_torque()
def disable_torque(self) -> None:
self.bus.disable_torque()
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
@@ -145,9 +151,11 @@ class SOLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")}
if goals:
self.bus.sync_write("Goal_Position", goals)
@check_if_not_connected
def disconnect(self) -> None: