From d92b54e87a165b68a351614d0299dc82bed53de7 Mon Sep 17 00:00:00 2001 From: Maximellerbach Date: Thu, 4 Jun 2026 16:52:17 +0200 Subject: [PATCH] moving smooth teleop handover to control_utils and adding this behavior to legacy strategy --- src/lerobot/common/control_utils.py | 68 +++++++++++++++++++ src/lerobot/rollout/strategies/dagger.py | 83 ++++-------------------- src/lerobot/rollout/strategies/legacy.py | 30 ++++++++- 3 files changed, 109 insertions(+), 72 deletions(-) diff --git a/src/lerobot/common/control_utils.py b/src/lerobot/common/control_utils.py index efbcc082f..210ab4c8a 100644 --- a/src/lerobot/common/control_utils.py +++ b/src/lerobot/common/control_utils.py @@ -18,6 +18,7 @@ from __future__ import annotations # Utilities ######################################################################################## import logging +import time import traceback from contextlib import nullcontext from copy import copy @@ -243,3 +244,70 @@ def sanity_check_dataset_robot_compatibility( raise ValueError( "Dataset metadata compatibility check failed with mismatches:\n" + "\n".join(mismatches) ) + + +######################################################################################## +# Teleoperator smooth handover helpers +######################################################################################## + + +def teleop_supports_feedback(teleop) -> bool: + """Return True when the teleop can receive position feedback (is actuated). + + Actuated teleops (e.g. SO-101, OpenArmMini) have non-empty ``feedback_features`` + and expose ``enable_torque`` / ``disable_torque`` motor-control methods. + + TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. + """ + return ( + bool(teleop.feedback_features) + and hasattr(teleop, "disable_torque") + and hasattr(teleop, "enable_torque") + ) + + +def teleop_smooth_move_to(teleop, target_pos: dict, duration_s: float = 2.0, fps: int = 30) -> None: + """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. + + Requires the teleoperator to support feedback (i.e. have non-empty + ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``). + + ``target_pos`` is expected to be in the teleop's action/feedback key space. + For homogeneous setups (e.g. SO-101 leader + SO-101 follower) this matches + the robot action key space directly. + + TODO(Maxime): This blocks up to ``duration_s`` seconds; during this time the + follower robot does not receive new actions, which could be an issue on LeKiwi. + """ + teleop.enable_torque() + current = teleop.get_action() + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = { + k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current + } + teleop.send_feedback(interp) + time.sleep(1 / fps) + + +def follower_smooth_move_to( + robot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30 +) -> None: + """Smoothly move the follower robot from ``current`` to ``target`` action. + + Used when the teleop is non-actuated: instead of driving the leader arm to + the follower, the follower is brought to the teleop's current pose so the + robot meets the operator's hand rather than jumping to it on the first frame. + + Both ``current`` and ``target`` must be in the robot action key space + (i.e. the output of ``robot_action_processor``). + """ + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} + robot.send_action(interp) + time.sleep(1 / fps) diff --git a/src/lerobot/rollout/strategies/dagger.py b/src/lerobot/rollout/strategies/dagger.py index 1bd6d9bb0..8791a5502 100644 --- a/src/lerobot/rollout/strategies/dagger.py +++ b/src/lerobot/rollout/strategies/dagger.py @@ -56,10 +56,14 @@ from typing import Any import numpy as np -from lerobot.common.control_utils import is_headless +from lerobot.common.control_utils import ( + follower_smooth_move_to, + is_headless, + teleop_smooth_move_to, + teleop_supports_feedback, +) from lerobot.datasets import VideoEncodingManager from lerobot.datasets.utils import DEFAULT_VIDEO_FILE_SIZE_IN_MB -from lerobot.teleoperators import Teleoperator from lerobot.utils.constants import ACTION, OBS_STR from lerobot.utils.feature_utils import build_dataset_frame from lerobot.utils.import_utils import _pynput_available @@ -69,7 +73,6 @@ from lerobot.utils.utils import log_say from ..configs import DAggerKeyboardConfig, DAggerPedalConfig, DAggerStrategyConfig from ..context import RolloutContext -from ..robot_wrapper import ThreadSafeRobot from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action PYNPUT_AVAILABLE = _pynput_available @@ -171,64 +174,6 @@ class DAggerEvents: self.upload_requested.clear() -# --------------------------------------------------------------------------- -# Teleoperator helpers -# --------------------------------------------------------------------------- - - -def _teleop_supports_feedback(teleop: Teleoperator) -> bool: - """Return True when the teleop can receive position feedback (is actuated). - TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. - """ - return ( - bool(teleop.feedback_features) - and hasattr(teleop, "disable_torque") - and hasattr(teleop, "enable_torque") - ) - - -def _teleop_smooth_move_to( - teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30 -) -> None: - """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. - - Requires the teleoperator to support feedback - (i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``). - - TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time - the follower robot doesn't receive new actions, this could be an issue on LeKiwi. - """ - teleop.enable_torque() - current = teleop.get_action() - steps = max(int(duration_s * fps), 1) - - for step in range(steps + 1): - t = step / steps - interp = { - k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current - } - teleop.send_feedback(interp) - time.sleep(1 / fps) - - -def _follower_smooth_move_to( - robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30 -) -> None: - """Smoothly move the follower robot from ``current`` to ``target`` action. - - Used when the teleop is non-actuated: instead of driving the leader arm - to the follower, we bring the follower to the teleop's current pose. - Both ``current`` and ``target`` must be in robot-action key space. - """ - steps = max(int(duration_s * fps), 1) - - for step in range(steps + 1): - t = step / steps - interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} - robot.send_action(interp) - time.sleep(1 / fps) - - # --------------------------------------------------------------------------- # Input device handlers # --------------------------------------------------------------------------- @@ -756,31 +701,31 @@ class DAggerStrategy(RolloutStrategy): logger.info("Pausing engine - robot holds position") engine.pause() - if _teleop_supports_feedback(teleop) and prev_action is not None: + if teleop_supports_feedback(teleop) and prev_action is not None: # TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor). # send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101 # leader + SO-101 follower) the keys are identical so this works. If the processor pipeline # does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in - # _teleop_smooth_move_to silently no-ops and the arm doesn't move. + # teleop_smooth_move_to silently no-ops and the arm doesn't move. logger.info("Smooth handover: moving leader arm to follower position") - _teleop_smooth_move_to(teleop, prev_action) + teleop_smooth_move_to(teleop, prev_action) elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING: logger.info("Entering correction mode - human teleop control") - if not _teleop_supports_feedback(teleop) and prev_action is not None: + if not teleop_supports_feedback(teleop) and prev_action is not None: logger.info("Smooth handover: sliding follower to teleop position") obs = robot.get_observation() teleop_action = teleop.get_action() processed = ctx.processors.teleop_action_processor((teleop_action, obs)) target = ctx.processors.robot_action_processor((processed, obs)) - _follower_smooth_move_to(robot, prev_action, target) + follower_smooth_move_to(robot, prev_action, target) # unlock the teleop for human control - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.disable_torque() elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED: - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.enable_torque() elif new_phase == DAggerPhase.AUTONOMOUS: @@ -790,7 +735,7 @@ class DAggerStrategy(RolloutStrategy): engine.resume() # release teleop before resuming the policy - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.disable_torque() # ------------------------------------------------------------------ diff --git a/src/lerobot/rollout/strategies/legacy.py b/src/lerobot/rollout/strategies/legacy.py index 20be321ed..d80a0a694 100644 --- a/src/lerobot/rollout/strategies/legacy.py +++ b/src/lerobot/rollout/strategies/legacy.py @@ -35,7 +35,13 @@ import contextlib import logging import time -from lerobot.common.control_utils import init_keyboard_listener, is_headless +from lerobot.common.control_utils import ( + follower_smooth_move_to, + init_keyboard_listener, + is_headless, + teleop_smooth_move_to, + teleop_supports_feedback, +) from lerobot.datasets import VideoEncodingManager from lerobot.utils.constants import ACTION, OBS_STR from lerobot.utils.feature_utils import build_dataset_frame @@ -134,8 +140,26 @@ class LegacyStrategy(RolloutStrategy): ): log_say("Reset the environment", play_sounds) - # returns to its initial joint positions captured at startup during the reset phase - if not teleop and self.config.reset_to_initial_position: + if teleop: + # Smooth handover so the transition to teleop control is jerk-free. + # For actuated teleops: drive the leader arm to the follower's current + # position so the operator takes over without fighting the arm. + # For non-actuated teleops: slide the follower to the teleop's current + # pose instead, since the leader cannot be driven. + obs = robot.get_observation() + current_pos = {k: v for k, v in obs.items() if k.endswith(".pos")} + if teleop_supports_feedback(teleop): + logger.info("Smooth handover: moving leader arm to follower position") + teleop_smooth_move_to(teleop, current_pos) + else: + logger.info("Smooth handover: sliding follower to teleop position") + teleop_action = teleop.get_action() + processed = ctx.processors.teleop_action_processor((teleop_action, obs)) + target = ctx.processors.robot_action_processor((processed, obs)) + follower_smooth_move_to(robot, current_pos, target) + + elif self.config.reset_to_initial_position: + # No teleop: return the robot to its startup position. self._return_to_initial_position(hw=ctx.hardware) self._reset_loop(