From dcb02a951db486f11a41be9dbbe53f2877a0a5b8 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 21 Jul 2025 11:49:15 +0200 Subject: [PATCH] fix(convert_v1) use correct legacy path, remove comments from scripts, revert lekiwi/record.py to main --- examples/lekiwi/record.py | 99 +++++++++++++------ src/lerobot/datasets/aggregate.py | 20 ---- src/lerobot/datasets/utils.py | 6 ++ .../datasets/v2/convert_dataset_v1_to_v2.py | 20 ++-- .../v30/convert_dataset_v21_to_v30.py | 9 +- src/lerobot/utils/buffer.py | 3 - 6 files changed, 87 insertions(+), 70 deletions(-) diff --git a/examples/lekiwi/record.py b/examples/lekiwi/record.py index 68d6d1b01..11a716761 100644 --- a/examples/lekiwi/record.py +++ b/examples/lekiwi/record.py @@ -1,66 +1,101 @@ -import time - from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.utils import hw_to_dataset_features +from lerobot.record import record_loop from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient from lerobot.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig +from lerobot.utils.control_utils import init_keyboard_listener +from lerobot.utils.utils import log_say +from lerobot.utils.visualization_utils import _init_rerun -NB_CYCLES_CLIENT_CONNECTION = 250 - -leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760431551") -leader_arm = SO100Leader(leader_arm_config) +NUM_EPISODES = 3 +FPS = 30 +EPISODE_TIME_SEC = 30 +RESET_TIME_SEC = 10 +TASK_DESCRIPTION = "My task description" +# Create the robot and teleoperator configurations +robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") +leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm") keyboard_config = KeyboardTeleopConfig() + +robot = LeKiwiClient(robot_config) +leader_arm = SO100Leader(leader_arm_config) keyboard = KeyboardTeleop(keyboard_config) -robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi") -robot = LeKiwiClient(robot_config) - +# Configure the dataset features action_features = hw_to_dataset_features(robot.action_features, "action") obs_features = hw_to_dataset_features(robot.observation_features, "observation") dataset_features = {**action_features, **obs_features} +# Create the dataset dataset = LeRobotDataset.create( - repo_id="pepijn223/lekiwi" + str(int(time.time())), - fps=10, + repo_id="/", + fps=FPS, features=dataset_features, robot_type=robot.name, + use_videos=True, + image_writer_threads=4, ) +# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi` +robot.connect() leader_arm.connect() keyboard.connect() -robot.connect() + +_init_rerun(session_name="lekiwi_record") + +listener, events = init_keyboard_listener() if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: - exit() + raise ValueError("Robot, leader arm of keyboard is not connected!") -print("Starting LeKiwi recording") -i = 0 -while i < NB_CYCLES_CLIENT_CONNECTION: - arm_action = leader_arm.get_action() - arm_action = {f"arm_{k}": v for k, v in arm_action.items()} +recorded_episodes = 0 +while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {recorded_episodes}") - keyboard_keys = keyboard.get_action() + # Run the record loop + record_loop( + robot=robot, + events=events, + fps=FPS, + dataset=dataset, + teleop=[leader_arm, keyboard], + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + ) - base_action = robot._from_keyboard_to_base_action(keyboard_keys) + # Logic for reset env + if not events["stop_recording"] and ( + (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + teleop=[leader_arm, keyboard], + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + ) - action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - action_sent = robot.send_action(action) - observation = robot.get_observation() + dataset.save_episode() + recorded_episodes += 1 - frame = {**action_sent, **observation, "task": "Dummy Example Task Dataset"} +# Upload to hub and clean up +dataset.push_to_hub() - dataset.add_frame(frame) - i += 1 - -print("Disconnecting Teleop Devices and LeKiwi Client") robot.disconnect() leader_arm.disconnect() keyboard.disconnect() - -print("Uploading dataset to the hub") -dataset.save_episode() -dataset.push_to_hub() +listener.stop() diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 1af6597b6..2fb98883c 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -58,7 +58,6 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]): ValueError: If any metadata has different fps, robot_type, or features than the first metadata in the list. """ - # validate same fps, robot_type, features fps = all_metadata[0].fps robot_type = all_metadata[0].robot_type @@ -178,7 +177,6 @@ def aggregate_datasets( """ logging.info("Start aggregate_datasets") - # Use default constants if parameters not provided if data_files_size_in_mb is None: data_files_size_in_mb = DEFAULT_DATA_FILE_SIZE_IN_MB if video_files_size_in_mb is None: @@ -186,7 +184,6 @@ def aggregate_datasets( if chunk_size is None: chunk_size = DEFAULT_CHUNK_SIZE - # Load metadata all_metadata = ( [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids] if roots is None @@ -197,7 +194,6 @@ def aggregate_datasets( fps, robot_type, features = validate_all_metadata(all_metadata) video_keys = [key for key in features if features[key]["dtype"] == "video"] - # Initialize output dataset metadata dst_meta = LeRobotDatasetMetadata.create( repo_id=aggr_repo_id, fps=fps, @@ -206,12 +202,10 @@ def aggregate_datasets( root=aggr_root, ) - # Aggregate task info logging.info("Find all tasks") unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique() dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) - # Track counters and indices meta_idx = {"chunk": 0, "file": 0} data_idx = {"chunk": 0, "file": 0} videos_idx = { @@ -220,7 +214,6 @@ def aggregate_datasets( dst_meta.episodes = {} - # Process each dataset for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"): videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size) data_idx = aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_size) @@ -234,11 +227,6 @@ def aggregate_datasets( logging.info("Aggregation complete.") -# ------------------------------- -# Helper Functions -# ------------------------------- - - def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chunk_size): """Aggregates video chunks from a source dataset into the destination dataset. @@ -256,7 +244,6 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu dict: Updated videos_idx with current chunk and file indices. """ for key, video_idx in videos_idx.items(): - # Get unique (chunk, file) combinations unique_chunk_file_pairs = { (chunk, file) for chunk, file in zip( @@ -265,10 +252,8 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu strict=False, ) } - # Multiple files should be looped increasing the iteration index unique_chunk_file_pairs = sorted(unique_chunk_file_pairs) - # Current target chunk/file index chunk_idx = video_idx["chunk"] file_idx = video_idx["file"] @@ -459,10 +444,8 @@ def append_or_create_parquet_file( Returns: dict: Updated index dictionary with current chunk and file indices. """ - # Initial destination path - use the correct default_path parameter dst_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"]) - # If destination file doesn't exist, just write the new one if not dst_path.exists(): dst_path.parent.mkdir(parents=True, exist_ok=True) if contains_images: @@ -471,19 +454,16 @@ def append_or_create_parquet_file( df.to_parquet(dst_path) return idx - # Otherwise, check if we exceed the size limit src_size = get_parquet_file_size_in_mb(src_path) dst_size = get_parquet_file_size_in_mb(dst_path) if dst_size + src_size >= max_mb: - # File is too large, move to a new one idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size) new_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"]) new_path.parent.mkdir(parents=True, exist_ok=True) final_df = df target_path = new_path else: - # Append to existing file existing_df = pd.read_parquet(dst_path) final_df = pd.concat([existing_df, df], ignore_index=True) target_path = dst_path diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index 1151d212e..6d68bdb03 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -67,6 +67,12 @@ DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet" DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4" DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png" +LEGACY_EPISODES_PATH = "meta/episodes.jsonl" +LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl" +LEGACY_TASKS_PATH = "meta/tasks.jsonl" +LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4" +LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet" + DATASET_CARD_TEMPLATE = """ --- # Metadata will go there diff --git a/src/lerobot/datasets/v2/convert_dataset_v1_to_v2.py b/src/lerobot/datasets/v2/convert_dataset_v1_to_v2.py index a1e51a185..1e53fd5d1 100644 --- a/src/lerobot/datasets/v2/convert_dataset_v1_to_v2.py +++ b/src/lerobot/datasets/v2/convert_dataset_v1_to_v2.py @@ -121,9 +121,9 @@ from safetensors.torch import load_file from lerobot.datasets.utils import ( DEFAULT_CHUNK_SIZE, - DEFAULT_DATA_PATH, - DEFAULT_VIDEO_PATH, INFO_PATH, + LEGACY_DEFAULT_PARQUET_PATH, + LEGACY_DEFAULT_VIDEO_PATH, LEGACY_EPISODES_PATH, LEGACY_TASKS_PATH, STATS_PATH, @@ -290,12 +290,14 @@ def split_parquet_by_episodes( for ep_chunk in range(total_chunks): ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes) - chunk_dir = "/".join(DEFAULT_DATA_PATH.split("/")[:-1]).format(episode_chunk=ep_chunk) + chunk_dir = "/".join(LEGACY_DEFAULT_PARQUET_PATH.split("/")[:-1]).format(episode_chunk=ep_chunk) (output_dir / chunk_dir).mkdir(parents=True, exist_ok=True) for ep_idx in range(ep_chunk_start, ep_chunk_end): ep_table = table.filter(pc.equal(table["episode_index"], ep_idx)) episode_lengths.insert(ep_idx, len(ep_table)) - output_file = output_dir / DEFAULT_DATA_PATH.format(episode_chunk=ep_chunk, episode_index=ep_idx) + output_file = output_dir / LEGACY_DEFAULT_PARQUET_PATH.format( + episode_chunk=ep_chunk, episode_index=ep_idx + ) pq.write_table(ep_table, output_file) return episode_lengths @@ -342,13 +344,13 @@ def move_videos( ep_chunk_start = DEFAULT_CHUNK_SIZE * ep_chunk ep_chunk_end = min(DEFAULT_CHUNK_SIZE * (ep_chunk + 1), total_episodes) for vid_key in video_keys: - chunk_dir = "/".join(DEFAULT_VIDEO_PATH.split("/")[:-1]).format( + chunk_dir = "/".join(LEGACY_DEFAULT_VIDEO_PATH.split("/")[:-1]).format( episode_chunk=ep_chunk, video_key=vid_key ) (work_dir / chunk_dir).mkdir(parents=True, exist_ok=True) for ep_idx in range(ep_chunk_start, ep_chunk_end): - target_path = DEFAULT_VIDEO_PATH.format( + target_path = LEGACY_DEFAULT_VIDEO_PATH.format( episode_chunk=ep_chunk, video_key=vid_key, episode_index=ep_idx ) video_file = V1_VIDEO_FILE.format(video_key=vid_key, episode_index=ep_idx) @@ -416,7 +418,7 @@ def _get_lfs_untracked_videos(work_dir: Path, video_files: list[str]) -> list[st def get_videos_info(repo_id: str, local_dir: Path, video_keys: list[str], branch: str) -> dict: # Assumes first episode video_files = [ - DEFAULT_VIDEO_PATH.format(episode_chunk=0, video_key=vid_key, episode_index=0) + LEGACY_DEFAULT_VIDEO_PATH.format(episode_chunk=0, video_key=vid_key, episode_index=0) for vid_key in video_keys ] hub_api = HfApi() @@ -557,8 +559,8 @@ def convert_dataset( "chunks_size": DEFAULT_CHUNK_SIZE, "fps": metadata_v1["fps"], "splits": {"train": f"0:{total_episodes}"}, - "data_path": DEFAULT_DATA_PATH, - "video_path": DEFAULT_VIDEO_PATH if video_keys else None, + "data_path": LEGACY_DEFAULT_PARQUET_PATH, + "video_path": LEGACY_DEFAULT_VIDEO_PATH if video_keys else None, "features": features, } write_json(metadata_v2_0, v20_dir / INFO_PATH) diff --git a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py index c6bbf97e0..72c83c0d6 100644 --- a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py +++ b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py @@ -55,6 +55,9 @@ from lerobot.datasets.utils import ( DEFAULT_DATA_PATH, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_PATH, + LEGACY_EPISODES_PATH, + LEGACY_EPISODES_STATS_PATH, + LEGACY_TASKS_PATH, cast_stats_to_numpy, concat_video_files, flatten_dict, @@ -70,12 +73,6 @@ from lerobot.datasets.utils import ( write_tasks, ) -LEGACY_EPISODES_PATH = "meta/episodes.jsonl" -LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl" -LEGACY_TASKS_PATH = "meta/tasks.jsonl" -LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4" -LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet" - V21 = "v2.1" diff --git a/src/lerobot/utils/buffer.py b/src/lerobot/utils/buffer.py index 1e594c877..c65801896 100644 --- a/src/lerobot/utils/buffer.py +++ b/src/lerobot/utils/buffer.py @@ -597,9 +597,6 @@ class ReplayBuffer: # Add to the dataset's buffer lerobot_dataset.add_frame(frame_dict) - # Move to next frame - # frame_idx_in_episode += 1 - # If we reached an episode boundary, call save_episode, reset counters if self.dones[actual_idx] or self.truncateds[actual_idx]: lerobot_dataset.save_episode()