diff --git a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py index cd248262c..f21e59ad8 100644 --- a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py +++ b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py @@ -40,8 +40,9 @@ from typing import Any import jsonlines import pandas as pd +import pyarrow as pa import tqdm -from datasets import Dataset, Image, concatenate_datasets +from datasets import Dataset, Features, Image from huggingface_hub import HfApi, snapshot_download from requests import HTTPError @@ -152,21 +153,24 @@ def convert_tasks(root, new_root): def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): - # Save RAM by using Dataset.from_parquet and concatenate_datasets - datasets = [Dataset.from_parquet(file) for file in paths_to_cat] - concatenated_dataset = concatenate_datasets(datasets) + # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets + dataframes = [pd.read_parquet(file) for file in paths_to_cat] + # Concatenate all DataFrames along rows + concatenated_df = pd.concat(dataframes, ignore_index=True) path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) path.parent.mkdir(parents=True, exist_ok=True) if len(image_keys) > 0: - # Handle image features by casting to the appropriate feature types - features = concatenated_dataset.features.copy() + schema = pa.Schema.from_pandas(concatenated_df) + features = Features.from_arrow_schema(schema) for key in image_keys: features[key] = Image() - concatenated_dataset = concatenated_dataset.cast(features) + schema = features.arrow_schema + else: + schema = None - concatenated_dataset.to_parquet(path) + concatenated_df.to_parquet(path, index=False, schema=schema) def convert_data(root, new_root): @@ -280,35 +284,50 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key): for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"): ep_size_in_mb = get_video_size_in_mb(ep_path) ep_duration_in_s = get_video_duration_in_s(ep_path) + + # Check if adding this episode would exceed the limit + if size_in_mb + ep_size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB and len(paths_to_cat) > 0: + # Size limit would be exceeded, save current accumulation WITHOUT this episode + concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx) + + # Update episodes metadata for the file we just saved + for i, _ in enumerate(paths_to_cat): + past_ep_idx = ep_idx - len(paths_to_cat) + i + episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx + episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx + + # Move to next file and start fresh with current episode + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE) + size_in_mb = 0 + duration_in_s = 0.0 + paths_to_cat = [] + + # Add current episode metadata ep_metadata = { "episode_index": ep_idx, - f"videos/{video_key}/chunk_index": chunk_idx, - f"videos/{video_key}/file_index": file_idx, + f"videos/{video_key}/chunk_index": chunk_idx, # Will be updated when file is saved + f"videos/{video_key}/file_index": file_idx, # Will be updated when file is saved f"videos/{video_key}/from_timestamp": duration_in_s, f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s, } + episodes_metadata.append(ep_metadata) + + # Add current episode to accumulation + paths_to_cat.append(ep_path) size_in_mb += ep_size_in_mb duration_in_s += ep_duration_in_s - episodes_metadata.append(ep_metadata) ep_idx += 1 - if size_in_mb < DEFAULT_VIDEO_FILE_SIZE_IN_MB: - paths_to_cat.append(ep_path) - continue - - concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx) - - # Reset for the next file - size_in_mb = ep_size_in_mb - duration_in_s = ep_duration_in_s - paths_to_cat = [ep_path] - - chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE) - # Write remaining videos if any if paths_to_cat: concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx) + # Update episodes metadata for the final file + for i, _ in enumerate(paths_to_cat): + past_ep_idx = ep_idx - len(paths_to_cat) + i + episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx + episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx + return episodes_metadata