mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-14 08:09:45 +00:00
6d269b28c8
* docs(omx): adding some examples and scripts * cleaning up and reviewing the cli args * adding __init__.py to example folder, adjusting the examples * adding reference to pretrained act policy * moving `.send_action` before `dataset.add_frame` for consistency Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net> * adjusting docstring Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net> * adressing hardcoded dataset fps * removed init as it worked without --------- Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>
423 lines
16 KiB
Python
423 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Auto-record grab episodes for the OMX robot arm.
|
|
|
|
Each episode cycle:
|
|
1. grab_and_place — grab cube from workspace center and place at a random (pan, reach) position
|
|
2. HOME — return arm to home with gripper open
|
|
3. record_grab — execute a targeted grab to the stored position while recording
|
|
observations + actions to a LeRobotDataset
|
|
|
|
Usage (run from repo root):
|
|
python -m examples.omx.record_grab \\
|
|
--robot.type=omx_follower \\
|
|
--robot.port=/dev/ttyACM0 \\
|
|
--robot.id=omx_follower \\
|
|
--robot.cameras="{ wrist: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30, fourcc: MJPG}, top: {type: opencv, index_or_path: 4, width: 640, height: 480, fps: 30, fourcc: MJPG} }" \\
|
|
--dataset.repo_id=<hf_username>/<dataset_name> \\
|
|
--dataset.root=data/omx_grab \\
|
|
--dataset.num_episodes=50 \\
|
|
--dataset.single_task="Grab the cube" \\
|
|
--dataset.streaming_encoding=true
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pprint import pformat
|
|
|
|
import numpy as np
|
|
|
|
from lerobot.cameras import CameraConfig # noqa: F401
|
|
from lerobot.cameras.opencv import OpenCVCameraConfig # noqa: F401
|
|
from lerobot.configs import parser
|
|
from lerobot.configs.dataset import DatasetRecordConfig
|
|
from lerobot.datasets import (
|
|
LeRobotDataset,
|
|
VideoEncodingManager,
|
|
aggregate_pipeline_dataset_features,
|
|
create_initial_features,
|
|
)
|
|
from lerobot.processor import make_default_processors
|
|
from lerobot.robots import RobotConfig, make_robot_from_config
|
|
from lerobot.robots.omx_follower import OmxFollower
|
|
from lerobot.utils.constants import ACTION, OBS_STR
|
|
from lerobot.utils.feature_utils import build_dataset_frame, combine_feature_dicts
|
|
from lerobot.utils.robot_utils import precise_sleep
|
|
|
|
from .reset_environment import (
|
|
APPROACH_SPEED,
|
|
GRIPPER_CLOSE_POS,
|
|
HOME_POSE,
|
|
PUSH_END_ELBOW_FLEX,
|
|
PUSH_END_SHOULDER_LIFT,
|
|
PUSH_START_ELBOW_FLEX,
|
|
PUSH_START_SHOULDER_LIFT,
|
|
array_to_pose,
|
|
grab_cube,
|
|
horizontal_wrist_flex,
|
|
move_to_pose,
|
|
place_cube,
|
|
pose_to_array,
|
|
)
|
|
|
|
# ── Grab-episode motion parameters ────────────────────────────────────────────
|
|
|
|
# Shoulder-lift offset for the raised approach phase (subtracted from the target sl, arm is higher).
|
|
GRAB_RAISE_SL_OFFSET = 20.0
|
|
GRAB_LOWER_SPEED = 20.0
|
|
RECORD_SPEED = 30.0
|
|
|
|
# Pose the arm travels to after closing the gripper (cube held).
|
|
GRAB_CARRY_POSE = {
|
|
"shoulder_pan.pos": -23.0,
|
|
"shoulder_lift.pos": 5.0,
|
|
"elbow_flex.pos": 18.0,
|
|
"wrist_flex.pos": -14.0,
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": GRIPPER_CLOSE_POS,
|
|
}
|
|
|
|
# Per-joint jitter limits (degrees) applied to transit waypoints for human-like variation.
|
|
# Cube-approach and carry poses are never jittered to preserve precision.
|
|
_JITTER_LIMITS: dict[str, float] = {
|
|
"shoulder_pan.pos": 5.0,
|
|
"shoulder_lift.pos": 4.0,
|
|
"elbow_flex.pos": 4.0,
|
|
"wrist_flex.pos": 3.0,
|
|
"wrist_roll.pos": 2.0,
|
|
"gripper.pos": 0.0,
|
|
}
|
|
|
|
|
|
def _jitter_pose(pose: dict, rng: np.random.Generator) -> dict:
|
|
"""Return a copy of pose with independent per-joint random perturbations."""
|
|
return {
|
|
k: v + rng.uniform(-_JITTER_LIMITS.get(k, 0.0), _JITTER_LIMITS.get(k, 0.0)) for k, v in pose.items()
|
|
}
|
|
|
|
|
|
def _random_stuck_pose(rng: np.random.Generator) -> dict:
|
|
"""Return a physically plausible stuck pose (failed grasp), gripper closed.
|
|
|
|
ef bounds are piecewise-linear in sl so the arm stays in a reachable,
|
|
table-safe envelope across the full sl range:
|
|
sl=-50 → ef ∈ [ 0, 50] (arm raised, can be bent forward)
|
|
sl= 0 → ef ∈ [-25, 25] (mid reach)
|
|
sl= 30 → ef ∈ [-20, 0] (arm extended, little room to flex)
|
|
wrist_flex is randomly offset from the horizontal value.
|
|
"""
|
|
pan = float(rng.uniform(-5.0, 35.0))
|
|
sl = float(rng.uniform(-50.0, 30.0))
|
|
|
|
if sl <= 0.0:
|
|
alpha = (sl + 50.0) / 50.0 # 0 at sl=-50, 1 at sl=0
|
|
ef_lo = alpha * -25.0 # 0 → -25
|
|
ef_hi = 50.0 + alpha * -25.0 # 50 → 25
|
|
else:
|
|
alpha = sl / 30.0 # 0 at sl=0, 1 at sl=30
|
|
ef_lo = -25.0 + alpha * 5.0 # -25 → -20
|
|
ef_hi = 25.0 + alpha * -25.0 # 25 → 0
|
|
|
|
ef = float(rng.uniform(ef_lo, ef_hi))
|
|
wf = horizontal_wrist_flex(sl, ef) + float(rng.uniform(-15.0, 15.0))
|
|
return {
|
|
"shoulder_pan.pos": pan,
|
|
"shoulder_lift.pos": sl,
|
|
"elbow_flex.pos": ef,
|
|
"wrist_flex.pos": wf,
|
|
"wrist_roll.pos": float(rng.uniform(-15.0, 15.0)),
|
|
"gripper.pos": GRIPPER_CLOSE_POS,
|
|
}
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class OmxRecordGrabConfig:
|
|
robot: RobotConfig
|
|
dataset: DatasetRecordConfig
|
|
# Resume recording on an existing dataset.
|
|
resume: bool = False
|
|
# Fraction of episodes that start from a random stuck pose (gripper closed) to
|
|
# generate recovery data. 0.0 = disabled, 1.0 = all episodes are recovery starts.
|
|
recovery_prob: float = 0.5
|
|
|
|
|
|
def record_episode_spline(
|
|
robot: OmxFollower,
|
|
waypoints: list[dict],
|
|
speeds: list[float],
|
|
dataset: LeRobotDataset,
|
|
task: str,
|
|
) -> None:
|
|
"""Execute a Catmull-Rom-style spline through waypoints, recording each frame.
|
|
|
|
Segment durations are parameterized from the maximum absolute joint delta
|
|
between consecutive waypoints divided by the requested segment speed,
|
|
producing non-uniform timing in joint space. Interior tangents are derived
|
|
from the adjacent per-segment velocities, with clamped (zero-velocity)
|
|
endpoints so the arm starts and stops smoothly. Each segment is cubic
|
|
Hermite, giving C1 continuity at every waypoint.
|
|
"""
|
|
pts = [pose_to_array(w) for w in waypoints]
|
|
n = len(pts)
|
|
|
|
# Steps and duration per segment
|
|
n_steps_list = []
|
|
timestamps = []
|
|
for i in range(n - 1):
|
|
max_dist = float(np.max(np.abs(pts[i + 1] - pts[i])))
|
|
ns = max(1, int(max_dist / speeds[i] * dataset.fps)) if max_dist >= 0.5 else 0
|
|
n_steps_list.append(ns)
|
|
timestamps.append(ns / dataset.fps)
|
|
|
|
# Velocity tangents (deg/sec) — clamped at endpoints, Catmull-Rom for interior
|
|
vels = [np.zeros_like(pts[0])]
|
|
for i in range(1, n - 1):
|
|
v_prev = (pts[i] - pts[i - 1]) / timestamps[i - 1] if timestamps[i - 1] > 0 else np.zeros_like(pts[0])
|
|
v_next = (pts[i + 1] - pts[i]) / timestamps[i] if timestamps[i] > 0 else np.zeros_like(pts[0])
|
|
vels.append(0.5 * (v_prev + v_next))
|
|
vels.append(np.zeros_like(pts[0]))
|
|
|
|
dt = 1.0 / dataset.fps
|
|
for seg in range(n - 1):
|
|
ns = n_steps_list[seg]
|
|
if ns == 0:
|
|
continue
|
|
p0, p1 = pts[seg], pts[seg + 1]
|
|
# Scale velocity (deg/sec) to t-space tangent (deg/t-unit, where t: 0→1 over ns steps)
|
|
m0 = vels[seg] * timestamps[seg]
|
|
m1 = vels[seg + 1] * timestamps[seg]
|
|
|
|
for step in range(1, ns + 1):
|
|
t = step / ns
|
|
h00 = 2 * t**3 - 3 * t**2 + 1
|
|
h10 = t**3 - 2 * t**2 + t
|
|
h01 = -2 * t**3 + 3 * t**2
|
|
h11 = t**3 - t**2
|
|
commanded = h00 * p0 + h10 * m0 + h01 * p1 + h11 * m1
|
|
|
|
action = array_to_pose(commanded)
|
|
robot.send_action(action)
|
|
obs = robot.get_observation()
|
|
obs_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
|
|
action_frame = build_dataset_frame(dataset.features, action, prefix=ACTION)
|
|
dataset.add_frame({**obs_frame, **action_frame, "task": task})
|
|
precise_sleep(dt)
|
|
|
|
|
|
def record_grab_episode(
|
|
robot: OmxFollower,
|
|
dataset: LeRobotDataset,
|
|
pan: float,
|
|
t: float,
|
|
task: str,
|
|
recovery_start: bool = False,
|
|
) -> None:
|
|
"""Execute a targeted grab to the stored (pan, t) position, recording every frame.
|
|
|
|
Normal sequence (initial HOME move is NOT recorded):
|
|
HOME → raised approach above cube → lower → close gripper
|
|
→ raise [jittered] → retract [jittered] → GRAB_CARRY_POSE → drop → HOME
|
|
|
|
Recovery sequence (recovery_start=True): arm is moved to a random stuck pose
|
|
(gripper closed) without recording, then recording begins from there:
|
|
stuck_pose → raised approach above cube → [normal grab sequence from there]
|
|
|
|
All segments are joined by a Catmull-Rom spline (C1-continuous velocities).
|
|
"""
|
|
sl = PUSH_START_SHOULDER_LIFT + t * (PUSH_END_SHOULDER_LIFT - PUSH_START_SHOULDER_LIFT)
|
|
ef = PUSH_START_ELBOW_FLEX + t * (PUSH_END_ELBOW_FLEX - PUSH_START_ELBOW_FLEX)
|
|
sl_raised = sl - GRAB_RAISE_SL_OFFSET
|
|
wf_horizontal = horizontal_wrist_flex(sl, ef)
|
|
|
|
rng = np.random.default_rng()
|
|
|
|
if recovery_start:
|
|
stuck_pose = _random_stuck_pose(rng)
|
|
logger.info(f"Recovery start: {stuck_pose}")
|
|
move_to_pose(robot, stuck_pose, APPROACH_SPEED)
|
|
first_waypoints = [stuck_pose]
|
|
first_speeds = []
|
|
else:
|
|
jittery_start = _jitter_pose(HOME_POSE, rng)
|
|
move_to_pose(robot, jittery_start, APPROACH_SPEED)
|
|
first_waypoints = [jittery_start]
|
|
first_speeds = []
|
|
|
|
waypoints = first_waypoints + [
|
|
{ # raised approach: arm above cube
|
|
"shoulder_pan.pos": pan,
|
|
"shoulder_lift.pos": sl_raised,
|
|
"elbow_flex.pos": ef,
|
|
"wrist_flex.pos": horizontal_wrist_flex(sl_raised, ef),
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": 60.0,
|
|
},
|
|
{ # lower onto cube — no jitter: precision needed
|
|
"shoulder_pan.pos": pan,
|
|
"shoulder_lift.pos": sl,
|
|
"elbow_flex.pos": ef,
|
|
"wrist_flex.pos": wf_horizontal,
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": 60.0,
|
|
},
|
|
{ # close gripper — no jitter: precision needed
|
|
"shoulder_pan.pos": pan,
|
|
"shoulder_lift.pos": sl,
|
|
"elbow_flex.pos": ef,
|
|
"wrist_flex.pos": wf_horizontal,
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": GRIPPER_CLOSE_POS,
|
|
},
|
|
_jitter_pose(
|
|
{ # raise with cube
|
|
"shoulder_pan.pos": pan,
|
|
"shoulder_lift.pos": sl_raised,
|
|
"elbow_flex.pos": ef,
|
|
"wrist_flex.pos": horizontal_wrist_flex(sl_raised, ef),
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": GRIPPER_CLOSE_POS,
|
|
},
|
|
rng,
|
|
),
|
|
_jitter_pose(
|
|
{ # retract: fold arm toward HOME before sweeping to carry zone
|
|
"shoulder_pan.pos": pan * 0.25,
|
|
"shoulder_lift.pos": HOME_POSE["shoulder_lift.pos"] + 5.0,
|
|
"elbow_flex.pos": HOME_POSE["elbow_flex.pos"] - 5.0,
|
|
"wrist_flex.pos": 0.0,
|
|
"wrist_roll.pos": 0.0,
|
|
"gripper.pos": GRIPPER_CLOSE_POS,
|
|
},
|
|
rng,
|
|
),
|
|
GRAB_CARRY_POSE, # no jitter: target drop zone
|
|
{**GRAB_CARRY_POSE, "gripper.pos": 60.0}, # drop cube
|
|
HOME_POSE,
|
|
]
|
|
speeds = first_speeds + [
|
|
RECORD_SPEED, # (HOME →) raised approach
|
|
GRAB_LOWER_SPEED, # raised approach → lower
|
|
GRAB_LOWER_SPEED, # lower → close gripper
|
|
RECORD_SPEED, # close gripper → raise
|
|
RECORD_SPEED, # raise → retract
|
|
RECORD_SPEED, # retract → carry pose
|
|
RECORD_SPEED, # carry pose → drop
|
|
RECORD_SPEED, # drop → HOME
|
|
]
|
|
|
|
record_episode_spline(robot, waypoints, speeds, dataset, task)
|
|
|
|
# Dwell at HOME for ~0.5 s before next episode
|
|
home_action = build_dataset_frame(dataset.features, HOME_POSE, prefix=ACTION)
|
|
dt = 1.0 / dataset.fps
|
|
for _ in range(int(dataset.fps * 0.5)):
|
|
robot.send_action(HOME_POSE)
|
|
obs = robot.get_observation()
|
|
obs_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
|
|
dataset.add_frame({**obs_frame, **home_action, "task": task})
|
|
precise_sleep(dt)
|
|
|
|
|
|
@parser.wrap()
|
|
def record_grab(cfg: OmxRecordGrabConfig) -> LeRobotDataset:
|
|
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
|
|
logger.info(pformat(cfg))
|
|
|
|
robot = make_robot_from_config(cfg.robot)
|
|
use_videos = cfg.dataset.video
|
|
|
|
teleop_action_processor, _, robot_obs_processor = make_default_processors()
|
|
|
|
dataset_features = combine_feature_dicts(
|
|
aggregate_pipeline_dataset_features(
|
|
pipeline=teleop_action_processor,
|
|
initial_features=create_initial_features(action=robot.action_features),
|
|
use_videos=use_videos,
|
|
),
|
|
aggregate_pipeline_dataset_features(
|
|
pipeline=robot_obs_processor,
|
|
initial_features=create_initial_features(observation=robot.observation_features),
|
|
use_videos=use_videos,
|
|
),
|
|
)
|
|
|
|
num_cameras = len(robot.cameras) if hasattr(robot, "cameras") else 0
|
|
dataset = None
|
|
|
|
try:
|
|
if cfg.resume:
|
|
dataset = LeRobotDataset.resume(
|
|
cfg.dataset.repo_id,
|
|
root=cfg.dataset.root,
|
|
streaming_encoding=cfg.dataset.streaming_encoding,
|
|
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
|
|
vcodec=cfg.dataset.vcodec,
|
|
encoder_threads=cfg.dataset.encoder_threads,
|
|
image_writer_processes=cfg.dataset.num_image_writer_processes if num_cameras > 0 else 0,
|
|
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * num_cameras
|
|
if num_cameras > 0
|
|
else 0,
|
|
)
|
|
else:
|
|
cfg.dataset.stamp_repo_id()
|
|
dataset = LeRobotDataset.create(
|
|
cfg.dataset.repo_id,
|
|
cfg.dataset.fps,
|
|
root=cfg.dataset.root,
|
|
robot_type=robot.name,
|
|
features=dataset_features,
|
|
use_videos=use_videos,
|
|
streaming_encoding=cfg.dataset.streaming_encoding,
|
|
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
|
|
vcodec=cfg.dataset.vcodec,
|
|
encoder_threads=cfg.dataset.encoder_threads,
|
|
image_writer_processes=cfg.dataset.num_image_writer_processes if num_cameras > 0 else 0,
|
|
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * num_cameras
|
|
if num_cameras > 0
|
|
else 0,
|
|
)
|
|
|
|
robot.connect(calibrate=True)
|
|
|
|
rng = np.random.default_rng()
|
|
with VideoEncodingManager(dataset):
|
|
for episode_idx in range(cfg.dataset.num_episodes):
|
|
logger.info(f"=== Episode {episode_idx + 1}/{cfg.dataset.num_episodes} ===")
|
|
|
|
logger.info("Step 1: grabbing and placing cube...")
|
|
grab_cube(robot)
|
|
pan, t = place_cube(robot)
|
|
logger.info(f"Cube placed at pan={pan:.1f}, reach={t:.2f}")
|
|
|
|
recovery_start = cfg.recovery_prob > 0 and float(rng.random()) < cfg.recovery_prob
|
|
logger.info(f"Step 2: recording {'recovery ' if recovery_start else ''}grab episode...")
|
|
record_grab_episode(
|
|
robot,
|
|
dataset,
|
|
pan,
|
|
t,
|
|
cfg.dataset.single_task,
|
|
recovery_start=recovery_start,
|
|
)
|
|
|
|
dataset.save_episode()
|
|
logger.info(f"Episode {episode_idx + 1} saved.")
|
|
|
|
finally:
|
|
if dataset:
|
|
dataset.finalize()
|
|
if robot.is_connected:
|
|
robot.disconnect()
|
|
|
|
if cfg.dataset.push_to_hub and dataset and dataset.num_episodes > 0:
|
|
dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private)
|
|
|
|
return dataset
|
|
|
|
|
|
if __name__ == "__main__":
|
|
record_grab()
|