From fe7c368630aa51ee8cecd0189d53d1362a3a9019 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Tue, 12 Aug 2025 12:16:38 +0200 Subject: [PATCH] Adapt teleoperate and replay to pipeline similar to record --- src/lerobot/replay.py | 27 ++++++++++-- src/lerobot/teleoperate.py | 85 +++++++++++++++++++++++++++++++++----- 2 files changed, 97 insertions(+), 15 deletions(-) diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index a9dceb741..dbf908019 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -45,9 +45,11 @@ from dataclasses import asdict, dataclass from pathlib import Path from pprint import pformat -import draccus - +from lerobot.configs import parser from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.processor import RobotProcessor +from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action +from lerobot.processor.pipeline import IdentityProcessor from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -83,13 +85,25 @@ class ReplayConfig: dataset: DatasetReplayConfig # Use vocal synthesis to read events. play_sounds: bool = True + # Optional processor for actions before sending to robot + robot_action_processor: RobotProcessor | None = None -@draccus.wrap() +@parser.wrap() def replay(cfg: ReplayConfig): init_logging() logging.info(pformat(asdict(cfg))) + # Initialize robot action processor with default if not provided + robot_action_processor = cfg.robot_action_processor or RobotProcessor( + steps=[IdentityProcessor()], + to_transition=lambda tr: tr, + to_output=to_output_robot_action, # type: ignore[arg-type] + ) + + # Reset processor + robot_action_processor.reset() + robot = make_robot_from_config(cfg.robot) dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode]) actions = dataset.hf_dataset.select_columns("action") @@ -104,7 +118,12 @@ def replay(cfg: ReplayConfig): for i, name in enumerate(dataset.features["action"]["names"]): action[name] = action_array[i] - robot.send_action(action) + # Process action through robot action processor + # Note: We need to convert the action dict to a transition format first + action_transition = to_transition_teleop_action(action) + processed_action = robot_action_processor(action_transition) + + robot.send_action(processed_action) # type: ignore[arg-type] dt_s = time.perf_counter() - start_episode_t busy_wait(1 / dataset.fps - dt_s) diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index 320140bdb..cbaae79a6 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -56,11 +56,18 @@ import time from dataclasses import asdict, dataclass from pprint import pformat -import draccus import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 +from lerobot.configs import parser +from lerobot.processor import RobotProcessor +from lerobot.processor.converters import ( + to_output_robot_action, + to_transition_robot_observation, + to_transition_teleop_action, +) +from lerobot.processor.pipeline import IdentityProcessor from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -97,21 +104,67 @@ class TeleoperateConfig: teleop_time_s: float | None = None # Display all cameras on screen display_data: bool = False + # Optional processors for data transformation + teleop_action_processor: RobotProcessor | None = None # runs after teleop + robot_action_processor: RobotProcessor | None = None # runs before robot + robot_observation_processor: RobotProcessor | None = None # runs after robot def teleop_loop( - teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None + teleop: Teleoperator, + robot: Robot, + fps: int, + display_data: bool = False, + duration: float | None = None, + teleop_action_processor: RobotProcessor | None = None, + robot_action_processor: RobotProcessor | None = None, + robot_observation_processor: RobotProcessor | None = None, ): + # Initialize processors with defaults if not provided + teleop_action_processor = teleop_action_processor or RobotProcessor( + steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr + ) + robot_action_processor = robot_action_processor or RobotProcessor( + steps=[IdentityProcessor()], + to_transition=lambda tr: tr, + to_output=to_output_robot_action, # type: ignore[arg-type] + ) + robot_observation_processor = robot_observation_processor or RobotProcessor( + steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr + ) + + # Reset processors + teleop_action_processor.reset() + robot_action_processor.reset() + robot_observation_processor.reset() + display_len = max(len(key) for key in robot.action_features) start = time.perf_counter() + while True: loop_start = time.perf_counter() - action = teleop.get_action() - if display_data: - observation = robot.get_observation() - log_rerun_data(observation=observation, action=action) - robot.send_action(action) + # Get robot observation + obs = robot.get_observation() + + # Process robot observation through pipeline + obs_transition = robot_observation_processor(obs) + + # Get teleop action + raw_action = teleop.get_action() + + # Process teleop action through pipeline + teleop_transition = teleop_action_processor(raw_action) + + # Process action for robot through pipeline + robot_action_to_send = robot_action_processor(teleop_transition) + + # Send processed action to robot (robot_action_processor.to_output should return dict[str, Any]) + robot.send_action(robot_action_to_send) # type: ignore[arg-type] + + if display_data: + log_rerun_data([obs_transition, teleop_transition]) + dt_s = time.perf_counter() - loop_start busy_wait(1 / fps - dt_s) @@ -119,17 +172,18 @@ def teleop_loop( print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") - for motor, value in action.items(): + # Display the final robot action that was sent + for motor, value in robot_action_to_send.items(): print(f"{motor:<{display_len}} | {value:>7.2f}") print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") if duration is not None and time.perf_counter() - start >= duration: return - move_cursor_up(len(action) + 5) + move_cursor_up(len(robot_action_to_send) + 5) -@draccus.wrap() +@parser.wrap() def teleoperate(cfg: TeleoperateConfig): init_logging() logging.info(pformat(asdict(cfg))) @@ -143,7 +197,16 @@ def teleoperate(cfg: TeleoperateConfig): robot.connect() try: - teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s) + teleop_loop( + teleop=teleop, + robot=robot, + fps=cfg.fps, + display_data=cfg.display_data, + duration=cfg.teleop_time_s, + teleop_action_processor=cfg.teleop_action_processor, + robot_action_processor=cfg.robot_action_processor, + robot_observation_processor=cfg.robot_observation_processor, + ) except KeyboardInterrupt: pass finally: