cleaning up

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
Maximellerbach
2026-05-05 13:44:02 +02:00
parent 8d9f137f70
commit 4c666599b4
+8 -17
View File
@@ -188,7 +188,7 @@ def _teleop_supports_feedback(teleop: Teleoperator) -> bool:
def _teleop_smooth_move_to( def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50 teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30
) -> None: ) -> None:
"""Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation.
@@ -212,7 +212,7 @@ def _teleop_smooth_move_to(
def _follower_smooth_move_to( def _follower_smooth_move_to(
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 50 robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30
) -> None: ) -> None:
"""Smoothly move the follower robot from ``current`` to ``target`` action. """Smoothly move the follower robot from ``current`` to ``target`` action.
@@ -464,18 +464,13 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition() transition = events.consume_transition()
if transition is not None: if transition is not None:
old_phase, new_phase = transition old_phase, new_phase = transition
prev_action = last_action
obs = robot.get_observation()
self._apply_transition( self._apply_transition(
old_phase, old_phase,
new_phase, new_phase,
engine, engine,
interpolator, interpolator,
robot,
teleop,
ctx, ctx,
prev_action, last_action,
obs,
) )
if new_phase == DAggerPhase.AUTONOMOUS: if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None last_action = None
@@ -630,18 +625,13 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition() transition = events.consume_transition()
if transition is not None: if transition is not None:
old_phase, new_phase = transition old_phase, new_phase = transition
prev_action = last_action
obs = robot.get_observation()
self._apply_transition( self._apply_transition(
old_phase, old_phase,
new_phase, new_phase,
engine, engine,
interpolator, interpolator,
robot,
teleop,
ctx, ctx,
prev_action, last_action,
obs,
) )
if new_phase == DAggerPhase.AUTONOMOUS: if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None last_action = None
@@ -738,11 +728,8 @@ class DAggerStrategy(RolloutStrategy):
new_phase: DAggerPhase, new_phase: DAggerPhase,
engine, engine,
interpolator, interpolator,
robot: ThreadSafeRobot,
teleop: Teleoperator,
ctx: RolloutContext, ctx: RolloutContext,
prev_action: dict | None, prev_action: dict | None,
obs: dict,
) -> None: ) -> None:
"""Execute side-effects for a validated phase transition, including smooth handovers. """Execute side-effects for a validated phase transition, including smooth handovers.
@@ -761,6 +748,9 @@ class DAggerStrategy(RolloutStrategy):
PAUSED -> AUTONOMOUS: PAUSED -> AUTONOMOUS:
Reset and resume the inference engine. Reset and resume the inference engine.
""" """
teleop = ctx.hardware.teleop
robot = ctx.hardware.robot_wrapper
logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value) logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value)
if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED: if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED:
logger.info("Pausing engine - robot holds position") logger.info("Pausing engine - robot holds position")
@@ -779,6 +769,7 @@ class DAggerStrategy(RolloutStrategy):
logger.info("Entering correction mode - human teleop control") logger.info("Entering correction mode - human teleop control")
if not _teleop_supports_feedback(teleop) and prev_action is not None: if not _teleop_supports_feedback(teleop) and prev_action is not None:
logger.info("Smooth handover: sliding follower to teleop position") logger.info("Smooth handover: sliding follower to teleop position")
obs = robot.get_observation()
teleop_action = teleop.get_action() teleop_action = teleop.get_action()
processed = ctx.processors.teleop_action_processor((teleop_action, obs)) processed = ctx.processors.teleop_action_processor((teleop_action, obs))
target = ctx.processors.robot_action_processor((processed, obs)) target = ctx.processors.robot_action_processor((processed, obs))