Compare commits

...

2 Commits

Author SHA1 Message Date
Maximellerbach 4c666599b4 cleaning up
Co-authored-by: Copilot <copilot@github.com>
2026-05-05 13:44:02 +02:00
Maximellerbach 8d9f137f70 small phase fix and documenting potential issues
Co-authored-by: Copilot <copilot@github.com>
2026-05-05 11:59:16 +02:00
2 changed files with 25 additions and 17 deletions
+24 -17
View File
@@ -188,12 +188,15 @@ def _teleop_supports_feedback(teleop: Teleoperator) -> bool:
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30
) -> None:
"""Smoothly move an actuated teleop to ``target_pos`` via linear interpolation.
Requires the teleoperator to support feedback
(i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``).
TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time
the follower robot doesn't receive new actions, this could be an issue on LeKiwi.
"""
teleop.enable_torque()
current = teleop.get_action()
@@ -209,7 +212,7 @@ def _teleop_smooth_move_to(
def _follower_smooth_move_to(
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 50
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30
) -> None:
"""Smoothly move the follower robot from ``current`` to ``target`` action.
@@ -461,18 +464,13 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
prev_action = last_action
obs = robot.get_observation()
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
robot,
teleop,
ctx,
prev_action,
obs,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
@@ -627,18 +625,13 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
prev_action = last_action
obs = robot.get_observation()
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
robot,
teleop,
ctx,
prev_action,
obs,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
@@ -735,11 +728,8 @@ class DAggerStrategy(RolloutStrategy):
new_phase: DAggerPhase,
engine,
interpolator,
robot: ThreadSafeRobot,
teleop: Teleoperator,
ctx: RolloutContext,
prev_action: dict | None,
obs: dict,
) -> None:
"""Execute side-effects for a validated phase transition, including smooth handovers.
@@ -751,15 +741,27 @@ class DAggerStrategy(RolloutStrategy):
Slide the follower to the teleop's current pose so the robot meets
the operator's hand rather than jumping to it on the first frame.
CORRECTING -> PAUSED (actuated teleop):
Re-enable torque to hold position after correction.
This will be potentially useful if cancelling the correction recording
PAUSED -> AUTONOMOUS:
Reset and resume the inference engine.
"""
teleop = ctx.hardware.teleop
robot = ctx.hardware.robot_wrapper
logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value)
if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED:
logger.info("Pausing engine - robot holds position")
engine.pause()
if _teleop_supports_feedback(teleop) and prev_action is not None:
# TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor).
# send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101
# leader + SO-101 follower) the keys are identical so this works. If the processor pipeline
# does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in
# _teleop_smooth_move_to silently no-ops and the arm doesn't move.
logger.info("Smooth handover: moving leader arm to follower position")
_teleop_smooth_move_to(teleop, prev_action)
@@ -767,6 +769,7 @@ class DAggerStrategy(RolloutStrategy):
logger.info("Entering correction mode - human teleop control")
if not _teleop_supports_feedback(teleop) and prev_action is not None:
logger.info("Smooth handover: sliding follower to teleop position")
obs = robot.get_observation()
teleop_action = teleop.get_action()
processed = ctx.processors.teleop_action_processor((teleop_action, obs))
target = ctx.processors.robot_action_processor((processed, obs))
@@ -776,6 +779,10 @@ class DAggerStrategy(RolloutStrategy):
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
if _teleop_supports_feedback(teleop):
teleop.enable_torque()
elif new_phase == DAggerPhase.AUTONOMOUS:
logger.info("Resuming autonomous mode - resetting engine and interpolator")
interpolator.reset()
@@ -348,6 +348,7 @@ class OpenArmMini(Teleoperator):
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
self.write_goal_positions(feedback)