fix(convert_v1): use correct legacy path, remove comments from scripts, revert lekiwi/record.py to main

This commit is contained in:
Michel Aractingi
2025-07-21 11:49:15 +02:00
parent ac0fd71f0a
commit dcb02a951d
6 changed files with 87 additions and 70 deletions
+67 -32
View File
@@ -1,66 +1,101 @@
import time
from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.record import record_loop
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig from lerobot.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import _init_rerun
NB_CYCLES_CLIENT_CONNECTION = 250 NUM_EPISODES = 3
FPS = 30
leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760431551") EPISODE_TIME_SEC = 30
leader_arm = SO100Leader(leader_arm_config) RESET_TIME_SEC = 10
TASK_DESCRIPTION = "My task description"
# Create the robot and teleoperator configurations
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm")
keyboard_config = KeyboardTeleopConfig() keyboard_config = KeyboardTeleopConfig()
robot = LeKiwiClient(robot_config)
leader_arm = SO100Leader(leader_arm_config)
keyboard = KeyboardTeleop(keyboard_config) keyboard = KeyboardTeleop(keyboard_config)
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") # Configure the dataset features
robot = LeKiwiClient(robot_config)
action_features = hw_to_dataset_features(robot.action_features, "action") action_features = hw_to_dataset_features(robot.action_features, "action")
obs_features = hw_to_dataset_features(robot.observation_features, "observation") obs_features = hw_to_dataset_features(robot.observation_features, "observation")
dataset_features = {**action_features, **obs_features} dataset_features = {**action_features, **obs_features}
# Create the dataset
dataset = LeRobotDataset.create( dataset = LeRobotDataset.create(
repo_id="pepijn223/lekiwi" + str(int(time.time())), repo_id="<hf_username>/<dataset_repo_id>",
fps=10, fps=FPS,
features=dataset_features, features=dataset_features,
robot_type=robot.name, robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
) )
# To connect, you should already have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
leader_arm.connect() leader_arm.connect()
keyboard.connect() keyboard.connect()
robot.connect()
_init_rerun(session_name="lekiwi_record")
listener, events = init_keyboard_listener()
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
exit() raise ValueError("Robot, leader arm of keyboard is not connected!")
print("Starting LeKiwi recording") recorded_episodes = 0
i = 0 while recorded_episodes < NUM_EPISODES and not events["stop_recording"]:
while i < NB_CYCLES_CLIENT_CONNECTION: log_say(f"Recording episode {recorded_episodes}")
arm_action = leader_arm.get_action()
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_keys = keyboard.get_action() # Run the record loop
record_loop(
robot=robot,
events=events,
fps=FPS,
dataset=dataset,
teleop=[leader_arm, keyboard],
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
base_action = robot._from_keyboard_to_base_action(keyboard_keys) # Logic for reset env
if not events["stop_recording"] and (
(recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
events=events,
fps=FPS,
teleop=[leader_arm, keyboard],
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action if events["rerecord_episode"]:
log_say("Re-record episode")
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
action_sent = robot.send_action(action) dataset.save_episode()
observation = robot.get_observation() recorded_episodes += 1
frame = {**action_sent, **observation, "task": "Dummy Example Task Dataset"} # Upload to hub and clean up
dataset.push_to_hub()
dataset.add_frame(frame)
i += 1
print("Disconnecting Teleop Devices and LeKiwi Client")
robot.disconnect() robot.disconnect()
leader_arm.disconnect() leader_arm.disconnect()
keyboard.disconnect() keyboard.disconnect()
listener.stop()
print("Uploading dataset to the hub")
dataset.save_episode()
dataset.push_to_hub()
-20
View File
@@ -58,7 +58,6 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
ValueError: If any metadata has different fps, robot_type, or features ValueError: If any metadata has different fps, robot_type, or features
than the first metadata in the list. than the first metadata in the list.
""" """
# validate same fps, robot_type, features
fps = all_metadata[0].fps fps = all_metadata[0].fps
robot_type = all_metadata[0].robot_type robot_type = all_metadata[0].robot_type
@@ -178,7 +177,6 @@ def aggregate_datasets(
""" """
logging.info("Start aggregate_datasets") logging.info("Start aggregate_datasets")
# Use default constants if parameters not provided
if data_files_size_in_mb is None: if data_files_size_in_mb is None:
data_files_size_in_mb = DEFAULT_DATA_FILE_SIZE_IN_MB data_files_size_in_mb = DEFAULT_DATA_FILE_SIZE_IN_MB
if video_files_size_in_mb is None: if video_files_size_in_mb is None:
@@ -186,7 +184,6 @@ def aggregate_datasets(
if chunk_size is None: if chunk_size is None:
chunk_size = DEFAULT_CHUNK_SIZE chunk_size = DEFAULT_CHUNK_SIZE
# Load metadata
all_metadata = ( all_metadata = (
[LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids] [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
if roots is None if roots is None
@@ -197,7 +194,6 @@ def aggregate_datasets(
fps, robot_type, features = validate_all_metadata(all_metadata) fps, robot_type, features = validate_all_metadata(all_metadata)
video_keys = [key for key in features if features[key]["dtype"] == "video"] video_keys = [key for key in features if features[key]["dtype"] == "video"]
# Initialize output dataset metadata
dst_meta = LeRobotDatasetMetadata.create( dst_meta = LeRobotDatasetMetadata.create(
repo_id=aggr_repo_id, repo_id=aggr_repo_id,
fps=fps, fps=fps,
@@ -206,12 +202,10 @@ def aggregate_datasets(
root=aggr_root, root=aggr_root,
) )
# Aggregate task info
logging.info("Find all tasks") logging.info("Find all tasks")
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique() unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
# Track counters and indices
meta_idx = {"chunk": 0, "file": 0} meta_idx = {"chunk": 0, "file": 0}
data_idx = {"chunk": 0, "file": 0} data_idx = {"chunk": 0, "file": 0}
videos_idx = { videos_idx = {
@@ -220,7 +214,6 @@ def aggregate_datasets(
dst_meta.episodes = {} dst_meta.episodes = {}
# Process each dataset
for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"): for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"):
videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size) videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size)
data_idx = aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_size) data_idx = aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_size)
@@ -234,11 +227,6 @@ def aggregate_datasets(
logging.info("Aggregation complete.") logging.info("Aggregation complete.")
# -------------------------------
# Helper Functions
# -------------------------------
def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size): def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size):
"""Aggregates video chunks from a source dataset into the destination dataset. """Aggregates video chunks from a source dataset into the destination dataset.
@@ -256,7 +244,6 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
dict: Updated videos_idx with current chunk and file indices. dict: Updated videos_idx with current chunk and file indices.
""" """
for key, video_idx in videos_idx.items(): for key, video_idx in videos_idx.items():
# Get unique (chunk, file) combinations
unique_chunk_file_pairs = { unique_chunk_file_pairs = {
(chunk, file) (chunk, file)
for chunk, file in zip( for chunk, file in zip(
@@ -265,10 +252,8 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
strict=False, strict=False,
) )
} }
# Multiple files should be looped increasing the iteration index
unique_chunk_file_pairs = sorted(unique_chunk_file_pairs) unique_chunk_file_pairs = sorted(unique_chunk_file_pairs)
# Current target chunk/file index
chunk_idx = video_idx["chunk"] chunk_idx = video_idx["chunk"]
file_idx = video_idx["file"] file_idx = video_idx["file"]
@@ -459,10 +444,8 @@ def append_or_create_parquet_file(
Returns: Returns:
dict: Updated index dictionary with current chunk and file indices. dict: Updated index dictionary with current chunk and file indices.
""" """
# Initial destination path - use the correct default_path parameter
dst_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"]) dst_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"])
# If destination file doesn't exist, just write the new one
if not dst_path.exists(): if not dst_path.exists():
dst_path.parent.mkdir(parents=True, exist_ok=True) dst_path.parent.mkdir(parents=True, exist_ok=True)
if contains_images: if contains_images:
@@ -471,19 +454,16 @@ def append_or_create_parquet_file(
df.to_parquet(dst_path) df.to_parquet(dst_path)
return idx return idx
# Otherwise, check if we exceed the size limit
src_size = get_parquet_file_size_in_mb(src_path) src_size = get_parquet_file_size_in_mb(src_path)
dst_size = get_parquet_file_size_in_mb(dst_path) dst_size = get_parquet_file_size_in_mb(dst_path)
if dst_size + src_size >= max_mb: if dst_size + src_size >= max_mb:
# File is too large, move to a new one
idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size) idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size)
new_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"]) new_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"])
new_path.parent.mkdir(parents=True, exist_ok=True) new_path.parent.mkdir(parents=True, exist_ok=True)
final_df = df final_df = df
target_path = new_path target_path = new_path
else: else:
# Append to existing file
existing_df = pd.read_parquet(dst_path) existing_df = pd.read_parquet(dst_path)
final_df = pd.concat([existing_df, df], ignore_index=True) final_df = pd.concat([existing_df, df], ignore_index=True)
target_path = dst_path target_path = dst_path
+6
View File
@@ -67,6 +67,12 @@ DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4" DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png" DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
DATASET_CARD_TEMPLATE = """ DATASET_CARD_TEMPLATE = """
--- ---
# Metadata will go there # Metadata will go there
@@ -121,9 +121,9 @@ from safetensors.torch import load_file
from lerobot.datasets.utils import ( from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_PATH,
DEFAULT_VIDEO_PATH,
INFO_PATH, INFO_PATH,
LEGACY_DEFAULT_PARQUET_PATH,
LEGACY_DEFAULT_VIDEO_PATH,
LEGACY_EPISODES_PATH, LEGACY_EPISODES_PATH,
LEGACY_TASKS_PATH, LEGACY_TASKS_PATH,
STATS_PATH, STATS_PATH,
@@ -290,12 +290,14 @@ def split_parquet_by_episodes(
for ep_chunk in range(total_chunks): for ep_chunk in range(total_chunks):
ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk
ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes) ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes)
chunk_dir = "/".join(DEFAULT_DATA_PATH.split("/")[:-1]).format(episode_chunk=ep_chunk) chunk_dir = "/".join(LEGACY_DEFAULT_PARQUET_PATH.split("/")[:-1]).format(episode_chunk=ep_chunk)
(output_dir / chunk_dir).mkdir(parents=True, exist_ok=True) (output_dir / chunk_dir).mkdir(parents=True, exist_ok=True)
for ep_idx in range(ep_chunk_start, ep_chunk_end): for ep_idx in range(ep_chunk_start, ep_chunk_end):
ep_table = table.filter(pc.equal(table["episode_index"], ep_idx)) ep_table = table.filter(pc.equal(table["episode_index"], ep_idx))
episode_lengths.insert(ep_idx, len(ep_table)) episode_lengths.insert(ep_idx, len(ep_table))
output_file = output_dir / DEFAULT_DATA_PATH.format(episode_chunk=ep_chunk, episode_index=ep_idx) output_file = output_dir / LEGACY_DEFAULT_PARQUET_PATH.format(
episode_chunk=ep_chunk, episode_index=ep_idx
)
pq.write_table(ep_table, output_file) pq.write_table(ep_table, output_file)
return episode_lengths return episode_lengths
@@ -342,13 +344,13 @@ def move_videos(
ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk
ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes) ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes)
for vid_key in video_keys: for vid_key in video_keys:
chunk_dir = "/".join(DEFAULT_VIDEO_PATH.split("/")[:-1]).format( chunk_dir = "/".join(LEGACY_DEFAULT_VIDEO_PATH.split("/")[:-1]).format(
episode_chunk=ep_chunk, video_key=vid_key episode_chunk=ep_chunk, video_key=vid_key
) )
(work_dir / chunk_dir).mkdir(parents=True, exist_ok=True) (work_dir / chunk_dir).mkdir(parents=True, exist_ok=True)
for ep_idx in range(ep_chunk_start, ep_chunk_end): for ep_idx in range(ep_chunk_start, ep_chunk_end):
target_path = DEFAULT_VIDEO_PATH.format( target_path = LEGACY_DEFAULT_VIDEO_PATH.format(
episode_chunk=ep_chunk, video_key=vid_key, episode_index=ep_idx episode_chunk=ep_chunk, video_key=vid_key, episode_index=ep_idx
) )
video_file = V1_VIDEO_FILE.format(video_key=vid_key, episode_index=ep_idx) video_file = V1_VIDEO_FILE.format(video_key=vid_key, episode_index=ep_idx)
@@ -416,7 +418,7 @@ def _get_lfs_untracked_videos(work_dir: Path, video_files: list[str]) -> list[st
def get_videos_info(repo_id: str, local_dir: Path, video_keys: list[str], branch: str) -> dict: def get_videos_info(repo_id: str, local_dir: Path, video_keys: list[str], branch: str) -> dict:
# Assumes first episode # Assumes first episode
video_files = [ video_files = [
DEFAULT_VIDEO_PATH.format(episode_chunk=0, video_key=vid_key, episode_index=0) LEGACY_DEFAULT_VIDEO_PATH.format(episode_chunk=0, video_key=vid_key, episode_index=0)
for vid_key in video_keys for vid_key in video_keys
] ]
hub_api = HfApi() hub_api = HfApi()
@@ -557,8 +559,8 @@ def convert_dataset(
"chunks_size": DEFAULT_CHUNK_SIZE, "chunks_size": DEFAULT_CHUNK_SIZE,
"fps": metadata_v1["fps"], "fps": metadata_v1["fps"],
"splits": {"train": f"0:{total_episodes}"}, "splits": {"train": f"0:{total_episodes}"},
"data_path": DEFAULT_DATA_PATH, "data_path": LEGACY_DEFAULT_PARQUET_PATH,
"video_path": DEFAULT_VIDEO_PATH if video_keys else None, "video_path": LEGACY_DEFAULT_VIDEO_PATH if video_keys else None,
"features": features, "features": features,
} }
write_json(metadata_v2_0, v20_dir / INFO_PATH) write_json(metadata_v2_0, v20_dir / INFO_PATH)
@@ -55,6 +55,9 @@ from lerobot.datasets.utils import (
DEFAULT_DATA_PATH, DEFAULT_DATA_PATH,
DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH, DEFAULT_VIDEO_PATH,
LEGACY_EPISODES_PATH,
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy, cast_stats_to_numpy,
concat_video_files, concat_video_files,
flatten_dict, flatten_dict,
@@ -70,12 +73,6 @@ from lerobot.datasets.utils import (
write_tasks, write_tasks,
) )
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
V21 = "v2.1" V21 = "v2.1"
-3
View File
@@ -597,9 +597,6 @@ class ReplayBuffer:
# Add to the dataset's buffer # Add to the dataset's buffer
lerobot_dataset.add_frame(frame_dict) lerobot_dataset.add_frame(frame_dict)
# Move to next frame
# frame_idx_in_episode += 1
# If we reached an episode boundary, call save_episode, reset counters # If we reached an episode boundary, call save_episode, reset counters
if self.dones[actual_idx] or self.truncateds[actual_idx]: if self.dones[actual_idx] or self.truncateds[actual_idx]:
lerobot_dataset.save_episode() lerobot_dataset.save_episode()