From a2319300449a58a5056257c9221988176bdcba4b Mon Sep 17 00:00:00 2001 From: Remi Cadene Date: Tue, 6 May 2025 15:13:35 +0000 Subject: [PATCH] Fix aggregate (num_frames, dataset_from_index, index) --- src/lerobot/datasets/aggregate.py | 52 +++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 5b5768fdc..2cf58ff57 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -47,9 +47,10 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]): return fps, robot_type, features -def update_episode_and_task(df, episode_index_to_add, old_tasks, new_tasks): +def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add): def _update(row): row["episode_index"] = row["episode_index"] + episode_index_to_add + row["index"] = row["index"] + frame_index_to_add task = old_tasks.iloc[row["task_index"]].name row["task_index"] = new_tasks.loc[task].task_index.item() return row @@ -150,8 +151,10 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format( chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx ) - - if aggr_path.exists(): + if not aggr_path.exists(): + aggr_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_path) + else: size_in_mb = get_parquet_file_size_in_mb(path) aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path) @@ -160,13 +163,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices( aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE ) + aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format( + chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx + ) + aggr_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_path) else: # Update the existing parquet file with new rows aggr_df = pd.read_parquet(aggr_path) df = pd.concat([aggr_df, df], ignore_index=True) - - aggr_path.parent.mkdir(parents=True, exist_ok=True) - df.to_parquet(aggr_path) + df.to_parquet(aggr_path) # Aggregate videos if any for key in video_keys: @@ -187,7 +193,11 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] chunk_index=aggr_videos_chunk_idx[key], file_index=aggr_videos_file_idx[key], ) - if aggr_path.exists(): + if not aggr_path.exists(): + # First video + aggr_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(path), str(aggr_path)) + else: size_in_mb = get_video_size_in_mb(path) aggr_size_in_mb = get_video_size_in_mb(aggr_path) @@ -196,6 +206,13 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices( aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE ) + aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=aggr_videos_chunk_idx[key], + file_index=aggr_videos_file_idx[key], + ) + aggr_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(path), str(aggr_path)) else: # Update the existing parquet file with new rows concat_video_files( @@ -205,10 +222,6 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], ) - else: - aggr_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(str(path), str(aggr_path)) - # copy_command = f"cp {video_path} {aggr_video_path} &" # subprocess.Popen(copy_command, shell=True) @@ -220,13 +233,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] for chunk_idx, file_idx in data_chunk_file_ids: path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) df = pd.read_parquet(path) - # TODO(rcadene): update frame index - df = update_episode_and_task(df, num_episodes, meta.tasks, aggr_meta.tasks) + df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames) aggr_path = aggr_root / DEFAULT_DATA_PATH.format( chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx ) - if aggr_path.exists(): + if not aggr_path.exists(): + aggr_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_path) + else: size_in_mb = get_parquet_file_size_in_mb(path) aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path) @@ -235,13 +250,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices( aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE ) + aggr_path = aggr_root / DEFAULT_DATA_PATH.format( + chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx + ) + aggr_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_path) else: # Update the existing parquet file with new rows aggr_df = pd.read_parquet(aggr_path) df = pd.concat([aggr_df, df], ignore_index=True) - - aggr_path.parent.mkdir(parents=True, exist_ok=True) - df.to_parquet(aggr_path) + df.to_parquet(aggr_path) num_episodes += meta.total_episodes num_frames += meta.total_frames