Adapt teleoperate and replay to pipeline similar to record

This commit is contained in:
Michel Aractingi
2025-08-12 12:16:38 +02:00
parent f65e74af9c
commit fe7c368630
2 changed files with 97 additions and 15 deletions
+23 -4
View File
@@ -45,9 +45,11 @@ from dataclasses import asdict, dataclass
from pathlib import Path
from pprint import pformat
import draccus
from lerobot.configs import parser
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor import RobotProcessor
from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action
from lerobot.processor.pipeline import IdentityProcessor
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -83,13 +85,25 @@ class ReplayConfig:
dataset: DatasetReplayConfig
# Use vocal synthesis to read events.
play_sounds: bool = True
# Optional processor for actions before sending to robot
robot_action_processor: RobotProcessor | None = None
@draccus.wrap()
@parser.wrap()
def replay(cfg: ReplayConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
# Initialize robot action processor with default if not provided
robot_action_processor = cfg.robot_action_processor or RobotProcessor(
steps=[IdentityProcessor()],
to_transition=lambda tr: tr,
to_output=to_output_robot_action, # type: ignore[arg-type]
)
# Reset processor
robot_action_processor.reset()
robot = make_robot_from_config(cfg.robot)
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
actions = dataset.hf_dataset.select_columns("action")
@@ -104,7 +118,12 @@ def replay(cfg: ReplayConfig):
for i, name in enumerate(dataset.features["action"]["names"]):
action[name] = action_array[i]
robot.send_action(action)
# Process action through robot action processor
# Note: We need to convert the action dict to a transition format first
action_transition = to_transition_teleop_action(action)
processed_action = robot_action_processor(action_transition)
robot.send_action(processed_action) # type: ignore[arg-type]
dt_s = time.perf_counter() - start_episode_t
busy_wait(1 / dataset.fps - dt_s)
+74 -11
View File
@@ -56,11 +56,18 @@ import time
from dataclasses import asdict, dataclass
from pprint import pformat
import draccus
import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.processor import RobotProcessor
from lerobot.processor.converters import (
to_output_robot_action,
to_transition_robot_observation,
to_transition_teleop_action,
)
from lerobot.processor.pipeline import IdentityProcessor
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -97,21 +104,67 @@ class TeleoperateConfig:
teleop_time_s: float | None = None
# Display all cameras on screen
display_data: bool = False
# Optional processors for data transformation
teleop_action_processor: RobotProcessor | None = None # runs after teleop
robot_action_processor: RobotProcessor | None = None # runs before robot
robot_observation_processor: RobotProcessor | None = None # runs after robot
def teleop_loop(
teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None
teleop: Teleoperator,
robot: Robot,
fps: int,
display_data: bool = False,
duration: float | None = None,
teleop_action_processor: RobotProcessor | None = None,
robot_action_processor: RobotProcessor | None = None,
robot_observation_processor: RobotProcessor | None = None,
):
# Initialize processors with defaults if not provided
teleop_action_processor = teleop_action_processor or RobotProcessor(
steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr
)
robot_action_processor = robot_action_processor or RobotProcessor(
steps=[IdentityProcessor()],
to_transition=lambda tr: tr,
to_output=to_output_robot_action, # type: ignore[arg-type]
)
robot_observation_processor = robot_observation_processor or RobotProcessor(
steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr
)
# Reset processors
teleop_action_processor.reset()
robot_action_processor.reset()
robot_observation_processor.reset()
display_len = max(len(key) for key in robot.action_features)
start = time.perf_counter()
while True:
loop_start = time.perf_counter()
action = teleop.get_action()
if display_data:
observation = robot.get_observation()
log_rerun_data(observation=observation, action=action)
robot.send_action(action)
# Get robot observation
obs = robot.get_observation()
# Process robot observation through pipeline
obs_transition = robot_observation_processor(obs)
# Get teleop action
raw_action = teleop.get_action()
# Process teleop action through pipeline
teleop_transition = teleop_action_processor(raw_action)
# Process action for robot through pipeline
robot_action_to_send = robot_action_processor(teleop_transition)
# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
robot.send_action(robot_action_to_send) # type: ignore[arg-type]
if display_data:
log_rerun_data([obs_transition, teleop_transition])
dt_s = time.perf_counter() - loop_start
busy_wait(1 / fps - dt_s)
@@ -119,17 +172,18 @@ def teleop_loop(
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
for motor, value in action.items():
# Display the final robot action that was sent
for motor, value in robot_action_to_send.items():
print(f"{motor:<{display_len}} | {value:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
if duration is not None and time.perf_counter() - start >= duration:
return
move_cursor_up(len(action) + 5)
move_cursor_up(len(robot_action_to_send) + 5)
@draccus.wrap()
@parser.wrap()
def teleoperate(cfg: TeleoperateConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
@@ -143,7 +197,16 @@ def teleoperate(cfg: TeleoperateConfig):
robot.connect()
try:
teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s)
teleop_loop(
teleop=teleop,
robot=robot,
fps=cfg.fps,
display_data=cfg.display_data,
duration=cfg.teleop_time_s,
teleop_action_processor=cfg.teleop_action_processor,
robot_action_processor=cfg.robot_action_processor,
robot_observation_processor=cfg.robot_observation_processor,
)
except KeyboardInterrupt:
pass
finally: