Modify replay

This commit is contained in:
Pepijn
2025-07-18 16:38:09 +02:00
parent 779d38fff0
commit 005d4bb011
+71 -6
View File
@@ -25,6 +25,17 @@ python -m lerobot.replay \
--dataset.repo_id=aliberts/record-test \
--dataset.episode=2
```
Biteleop example:
```shell
python -m lerobot.replay \
--robot.type=so101_follower_t \
--robot.port=/dev/tty.usbmodem58760432961 \
--robot.id=follower_arm_torque \
--dataset.repo_id=pepijn223/bilateral-wipe-table \
--dataset.episode=0 \
--biteleop=true
```
"""
import logging
@@ -44,7 +55,9 @@ from lerobot.robots import ( # noqa: F401
make_robot_from_config,
so100_follower,
so101_follower,
so101_follower_torque,
)
from lerobot.robots.so101_follower_torque import SO101FollowerT
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import (
init_logging,
@@ -70,6 +83,8 @@ class ReplayConfig:
dataset: DatasetReplayConfig
# Use vocal synthesis to read events.
play_sounds: bool = True
# Use biteleop to replay the dataset
biteleop: bool = False
@draccus.wrap()
@@ -80,22 +95,72 @@ def replay(cfg: ReplayConfig):
robot = make_robot_from_config(cfg.robot)
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
actions = dataset.hf_dataset.select_columns("action")
if cfg.biteleop:
if not isinstance(robot, SO101FollowerT):
raise ValueError(
"Bilateral teleoperation replay requires the robot to be of type SO101FollowerT."
)
log_say("Bilateral teleoperation replay enabled.", cfg.play_sounds)
robot.connect()
log_say("Replaying episode", cfg.play_sounds, blocking=True)
start_time_all = time.perf_counter()
for idx in range(dataset.num_frames):
start_episode_t = time.perf_counter()
start_loop_t = time.perf_counter()
action_array = actions[idx]["action"]
action = {}
action_from_ds_array = actions[idx]["action"]
action_from_ds = {}
for i, name in enumerate(dataset.features["action"]["names"]):
action[name] = action_array[i]
action_from_ds[name] = action_from_ds_array[i]
robot.send_action(action)
# Bilateral teleoperation
if cfg.biteleop:
# Get current follower robot observation
obs_f = robot.get_observation()
pos_f = {j: obs_f[f"{j}.pos"] for j in robot.bus.motors}
vel_f = {j: obs_f[f"{j}.vel"] for j in robot.bus.motors}
tau_reaction_f = {j: obs_f[f"{j}.effort"] for j in robot.bus.motors}
dt_s = time.perf_counter() - start_episode_t
# Get target leader state from the dataset
pos_l = {j: action_from_ds[f"{j}.pos"] for j in robot.bus.motors}
vel_l = {j: action_from_ds[f"{j}.vel"] for j in robot.bus.motors}
# The saved effort in dataset is -tau_reaction_l
neg_tau_reaction_l = {j: action_from_ds[f"{j}.effort"] for j in robot.bus.motors}
# Get control gains from the robot instance
kp_gains = robot.kp_gains
kd_gains = robot.kd_gains
kf_gains = robot.kf_gains
# Compute torque command for the follower robot
tau_cmd_f = [
(
kp_gains[j] * (pos_l[j] - pos_f[j]) # Position tracking
+ kd_gains[j] * (vel_l[j] - vel_f[j]) # Velocity damping
+ kf_gains[j] * (neg_tau_reaction_l[j] - tau_reaction_f[j]) # Force reflection
)
for j in robot.bus.motors
]
# Format action with calculated torques and send to robot
action_to_send = {f"{m}.effort": tau_cmd_f[i] for i, m in enumerate(robot.bus.motors)}
robot.send_action(action_to_send)
else:
# Original logic for standard position-based replay
robot.send_action(action_from_ds)
dt_s = time.perf_counter() - start_loop_t
busy_wait(1 / dataset.fps - dt_s)
total_time = time.perf_counter() - start_time_all
actual_fps = idx / total_time if total_time > 0 else float("inf")
logging.info(f"Average FPS achieved over episode: {actual_fps:.2f}")
log_say(f"Average FPS achieved: {actual_fps:.2f}", cfg.play_sounds)
robot.disconnect()