From 88380fe34eb98dd2adae9c9e9d93f76f1dbf6c66 Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Thu, 30 Oct 2025 18:11:27 +0100 Subject: [PATCH] update changes --- examples/behavior_1k/convert_to_lerobot_v3.py | 734 +++++++++++++----- 1 file changed, 543 insertions(+), 191 deletions(-) diff --git a/examples/behavior_1k/convert_to_lerobot_v3.py b/examples/behavior_1k/convert_to_lerobot_v3.py index 9927af27d..5fddb1a3f 100755 --- a/examples/behavior_1k/convert_to_lerobot_v3.py +++ b/examples/behavior_1k/convert_to_lerobot_v3.py @@ -1,226 +1,578 @@ #!/usr/bin/env python -""" -Convert a single BEHAVIOR-1K task from HDF5 to LeRobotDataset v3.0 format. - -Usage examples: -# Convert a single task -python convert_to_lerobot_v3.py \ - --data-folder /path/to/data \ - --repo-id "username/behavior-1k-assembling-gift-baskets" \ - --task-id 0 \ - --push-to-hub - -""" - +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convert Behavior Dataset to LeRobotDataset v3.0 format""" +from pathlib import Path +import jsonlines import argparse import logging -import os +import shutil from pathlib import Path -import h5py -import numpy as np -from tqdm import tqdm +import jsonlines +import pandas as pd +import pyarrow as pa +import tqdm +from datasets import Dataset, Features, Image +from lerobot.datasets.compute_stats import aggregate_stats +from lerobot.datasets.utils import ( + DEFAULT_CHUNK_SIZE, + DEFAULT_DATA_FILE_SIZE_IN_MB, + DEFAULT_DATA_PATH, + DEFAULT_VIDEO_FILE_SIZE_IN_MB, + DEFAULT_VIDEO_PATH, + LEGACY_EPISODES_PATH, + LEGACY_EPISODES_STATS_PATH, + LEGACY_TASKS_PATH, + cast_stats_to_numpy, + flatten_dict, + get_file_size_in_mb, + get_parquet_file_size_in_mb, + get_parquet_num_frames, + load_info, + update_chunk_file_indices, + write_episodes, + write_info, + write_stats, + write_tasks, +) +from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s +from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.utils import init_logging - -from .behavior_lerobot_dataset_v3 import BehaviorLeRobotDatasetV3 -from .behaviour_1k_constants import BEHAVIOR_DATASET_FEATURES, FPS, ROBOT_TYPE, TASK_INDICES_TO_NAMES - -init_logging() - - -def load_hdf5_episode(hdf5_path: str, episode_id: int = 0) -> dict: +# script to convert one single task to v3.1 +# TASK = 1 +NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb") +from lerobot.datasets.utils import load_info +def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int: """ - Load episode data from HDF5 file. - - Args: - hdf5_path: Path to the HDF5 file - episode_id: Episode ID to load (default: 0) - - Returns: - Dictionary containing episode data + Calculates the total number of episodes for a single, specified task. """ - episode_data = {} - - with h5py.File(hdf5_path, "r") as f: - # Find the episode with most samples if episode_id not specified - if episode_id == -1: - num_samples = [f["data"][key].attrs["num_samples"] for key in f["data"]] - episode_id = num_samples.index(max(num_samples)) - - demo_key = f"demo_{episode_id}" - if demo_key not in f["data"]: - raise ValueError(f"Episode {episode_id} not found in {hdf5_path}") - - demo_data = f["data"][demo_key] - - # Load actions - episode_data["action"] = np.array(demo_data["action"][:]) - - # Load observations - episode_data["obs"] = {} - for key in demo_data["obs"]: - episode_data["obs"][key] = np.array(demo_data["obs"][key][:]) - - # Load attributes - episode_data["attrs"] = {} - for attr_name in demo_data.attrs: - episode_data["attrs"][attr_name] = demo_data.attrs[attr_name] - - # Add global attributes - for attr_name in f["data"].attrs: - episode_data["attrs"][f"global_{attr_name}"] = f["data"].attrs[attr_name] - - return episode_data - - -def convert_episode( - data_folder: str, - task_id: int, - demo_id: int, - dataset: BehaviorLeRobotDatasetV3, - include_videos: bool = True, - include_segmentation: bool = True, -) -> None: - """ - Convert a single episode from HDF5 to LeRobotDataset v3.0 format. - - Args: - data_folder: Base data folder containing HDF5 files - repo_id: Repository ID for the dataset - task_id: Task ID - demo_id: Demo ID (episode ID) - dataset: BehaviorLeRobotDatasetV3 instance to add data to - include_videos: Whether to include video data - include_segmentation: Whether to include segmentation data - """ - # Construct paths - task_name = TASK_INDICES_TO_NAMES[task_id] - hdf5_path = f"{data_folder}/2025-challenge-rawdata/task-{task_id:04d}/episode_{demo_id:08d}.hdf5" - - if not os.path.exists(hdf5_path): - logging.error(f"HDF5 file not found: {hdf5_path}") - return - - logging.info(f"Converting episode {demo_id} from task {task_name}") - - # Load episode data - episode_data = load_hdf5_episode(hdf5_path, episode_id=0) - - # Filter out segmentation if not requested - if not include_segmentation: - keys_to_remove = [k for k in episode_data["obs"] if "seg_instance_id" in k] - for key in keys_to_remove: - del episode_data["obs"][key] - - # Add episode to dataset - dataset.add_episode_from_hdf5( - hdf5_data=episode_data, - task_id=task_id, - episode_id=demo_id, - include_videos=include_videos, + # Simply load the episodes for the task and count them. + episodes = legacy_load_episodes_task( + local_dir=local_dir, + task_id=task_id, + task_ranges=task_ranges, + step=step ) + return len(episodes) - -def convert_task_to_dataset( - data_folder: str, - repo_id: str, - task_id: int, - push_to_hub: bool = False, -) -> None: - """ - Convert a single BEHAVIOR-1K task from HDF5 to LeRobotDataset v3.0 format. - - Args: - data_folder: Base folder containing HDF5 data - repo_id: Repository ID (e.g., "username/behavior-1k-task-name") - task_id: Task ID to convert - push_to_hub: Whether to push to HuggingFace Hub - """ - task_name = TASK_INDICES_TO_NAMES[task_id] - task_folder = f"{data_folder}/2025-challenge-rawdata/task-{task_id:04d}" - - if not os.path.exists(task_folder): - raise ValueError(f"Task folder not found: {task_folder}") - - # Create output directory - output_dir = Path.home() / ".cache/huggingface/lerobot" / repo_id - output_dir.mkdir(parents=True, exist_ok=True) - - logging.info(f"Converting task '{task_name}' (ID: {task_id}) to: {output_dir}") - - # Initialize dataset for this task - dataset = BehaviorLeRobotDatasetV3.create( - repo_id=repo_id, - fps=FPS, - features=BEHAVIOR_DATASET_FEATURES, - robot_type=ROBOT_TYPE, +NUM_CAMERAS = 9 +def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int: + episodes_metadata = legacy_load_episodes_task( + local_dir=local_dir, + task_id=task_id, + task_ranges=task_ranges, + step=step ) + total_frames = 0 + # like 'duration' + for ep in episodes_metadata.values(): + duration_s = ep["length"] + total_frames += int(duration_s) + return total_frames - # Find all episodes in the task folder - task_episode_ids = [] - for filename in os.listdir(task_folder): - if filename.startswith("episode_") and filename.endswith(".hdf5"): - eid = int(filename.split("_")[1].split(".")[0]) - task_episode_ids.append(eid) - task_episode_ids.sort() +def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): + info = load_info(root) + info["codebase_version"] = "v3.0" + del info["total_videos"] + info["data_files_size_in_mb"] = data_file_size_in_mb + info["video_files_size_in_mb"] = video_file_size_in_mb + info["data_path"] = DEFAULT_DATA_PATH + info["video_path"] = DEFAULT_VIDEO_PATH if info["video_path"] is not None else None + info["fps"] = int(info["fps"]) + for key in info["features"]: + if info["features"][key]["dtype"] == "video": + # already has fps in video_info + continue + info["features"][key]["fps"] = info["fps"] + + info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step) + info["total_videos"] = info["total_episodes"] * NUM_CAMERAS + info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step) + info["total_tasks"] = 1 + write_info(info, new_root) - logging.info(f"Processing {len(task_episode_ids)} episodes for task {task_name}") +def load_jsonlines(fpath: Path) -> list[any]: + with jsonlines.open(fpath, "r") as reader: + return list(reader) - # Convert each episode - episodes_converted = 0 - for demo_id in tqdm(task_episode_ids, desc="Converting episodes"): - convert_episode( - data_folder=data_folder, - task_id=task_id, - demo_id=demo_id, - dataset=dataset, - include_videos=True, - include_segmentation=True, +def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: + tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) + # return tasks dict such that + tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} + task_to_task_index = {task: task_index for task_index, task in tasks.items()} + return tasks, task_to_task_index + +def convert_tasks(root, new_root, task_id: int): + tasks, _ = legacy_load_tasks(root) + if task_id not in tasks: + raise ValueError(f"Task ID {task_id} not found in tasks (available: {list(tasks.keys())})") + tasks = {task_id: tasks[task_id]} + task_indices = tasks.keys() + task_strings = tasks.values() + df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings) + write_tasks(df_tasks, new_root) + + +def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): + # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets + dataframes = [pd.read_parquet(file) for file in paths_to_cat] + # Concatenate all DataFrames along rows + concatenated_df = pd.concat(dataframes, ignore_index=True) + + path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) + path.parent.mkdir(parents=True, exist_ok=True) + if len(image_keys) > 0: + schema = pa.Schema.from_pandas(concatenated_df) + features = Features.from_arrow_schema(schema) + for key in image_keys: + features[key] = Image() + schema = features.arrow_schema + else: + schema = None + + concatenated_df.to_parquet(path, index=False, schema=schema) + +def get_image_keys(root): + info = load_info(root) + features = info["features"] + image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"] + return image_keys + +def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int): + task_dir_name = f"task-00{task_index}" + data_dir = root / "data" / task_dir_name + # print("data_dir", data_dir) + ep_paths = sorted(data_dir.glob("*.parquet")) + # ep_paths = sorted(data_dir.glob("*/*.parquet")) + # print("ep_paths", ep_paths) + image_keys = get_image_keys(root) + + ep_idx = 0 + chunk_idx = 0 + file_idx = 0 + size_in_mb = 0 + num_frames = 0 + paths_to_cat = [] + episodes_metadata = [] + + logging.info(f"Converting data files from {len(ep_paths)} episodes") + + for ep_path in tqdm.tqdm(ep_paths, desc="convert data files"): + ep_size_in_mb = get_parquet_file_size_in_mb(ep_path) + ep_num_frames = get_parquet_num_frames(ep_path) + ep_metadata = { + "episode_index": ep_idx, + "data/chunk_index": chunk_idx, + "data/file_index": file_idx, + "dataset_from_index": num_frames, + "dataset_to_index": num_frames + ep_num_frames, + } + size_in_mb += ep_size_in_mb + num_frames += ep_num_frames + episodes_metadata.append(ep_metadata) + ep_idx += 1 + + if size_in_mb < data_file_size_in_mb: + paths_to_cat.append(ep_path) + continue + + if paths_to_cat: + concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys) + + # Reset for the next file + size_in_mb = ep_size_in_mb + paths_to_cat = [ep_path] + + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE) + + # Write remaining data if any + if paths_to_cat: + concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys) + + return episodes_metadata + +def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int): + # Access old paths to mp4 + # videos_dir = root / "videos" + # ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4")) + task_dir_name = f"task-00{task_index}" + videos_dir = root / "videos" / task_dir_name / video_key + ep_paths = sorted(videos_dir.glob("*.mp4")) + print("ep_paths", ep_paths) + ep_idx = 0 + chunk_idx = 0 + file_idx = 0 + size_in_mb = 0 + duration_in_s = 0.0 + paths_to_cat = [] + episodes_metadata = [] + + for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"): + ep_size_in_mb = get_file_size_in_mb(ep_path) + ep_duration_in_s = get_video_duration_in_s(ep_path) + + # Check if adding this episode would exceed the limit + if size_in_mb + ep_size_in_mb >= video_file_size_in_mb and len(paths_to_cat) > 0: + # Size limit would be exceeded, save current accumulation WITHOUT this episode + concatenate_video_files( + paths_to_cat, + new_root + / DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx), + ) + + # Update episodes metadata for the file we just saved + for i, _ in enumerate(paths_to_cat): + past_ep_idx = ep_idx - len(paths_to_cat) + i + episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx + episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx + + # Move to next file and start fresh with current episode + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE) + size_in_mb = 0 + duration_in_s = 0.0 + paths_to_cat = [] + + # Add current episode metadata + ep_metadata = { + "episode_index": ep_idx, + f"videos/{video_key}/chunk_index": chunk_idx, # Will be updated when file is saved + f"videos/{video_key}/file_index": file_idx, # Will be updated when file is saved + f"videos/{video_key}/from_timestamp": duration_in_s, + f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s, + } + episodes_metadata.append(ep_metadata) + + # Add current episode to accumulation + paths_to_cat.append(ep_path) + size_in_mb += ep_size_in_mb + duration_in_s += ep_duration_in_s + ep_idx += 1 + + # Write remaining videos if any + if paths_to_cat: + concatenate_video_files( + paths_to_cat, + new_root + / DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx), ) - episodes_converted += 1 - logging.info(f"Converted {episodes_converted} episodes for task {task_name}") + # Update episodes metadata for the final file + for i, _ in enumerate(paths_to_cat): + past_ep_idx = ep_idx - len(paths_to_cat) + i + episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx + episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx - # Finalize dataset - logging.info(f"Finalizing dataset for task {task_name}...") - dataset.finalize() + return episodes_metadata - # Push to hub if requested - if push_to_hub: - logging.info(f"Pushing task {task_name} dataset to HuggingFace Hub...") - dataset.push_to_hub() - - logging.info("Conversion complete!") +def get_video_keys(root): + info = load_info(root) + features = info["features"] + video_keys = [key for key, ft in features.items() if ft["dtype"] == "video"] + return video_keys -def main(): - parser = argparse.ArgumentParser(description="Convert a single BEHAVIOR-1K task to LeRobotDataset v3.0") - parser.add_argument("--data-folder", type=str, required=True, help="Path to the data folder") +def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_id: int): + logging.info(f"Converting videos from {root} to {new_root}") + + video_keys = get_video_keys(root) + if len(video_keys) == 0: + return None + + video_keys = sorted(video_keys) + + eps_metadata_per_cam = [] + for camera in video_keys: + eps_metadata = convert_videos_of_camera(root, new_root, camera, video_file_size_in_mb, task_id) + eps_metadata_per_cam.append(eps_metadata) + + num_eps_per_cam = [len(eps_cam_map) for eps_cam_map in eps_metadata_per_cam] + if len(set(num_eps_per_cam)) != 1: + raise ValueError(f"All cams dont have same number of episodes ({num_eps_per_cam}).") + + episods_metadata = [] + num_cameras = len(video_keys) + num_episodes = num_eps_per_cam[0] + for ep_idx in tqdm.tqdm(range(num_episodes), desc="convert videos"): + # Sanity check + ep_ids = [eps_metadata_per_cam[cam_idx][ep_idx]["episode_index"] for cam_idx in range(num_cameras)] + ep_ids += [ep_idx] + if len(set(ep_ids)) != 1: + raise ValueError(f"All episode indices need to match ({ep_ids}).") + + ep_dict = {} + for cam_idx in range(num_cameras): + ep_dict.update(eps_metadata_per_cam[cam_idx][ep_idx]) + episods_metadata.append(ep_dict) + + return episods_metadata + + +import json +from pathlib import Path + +def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: + """ + Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. + Returns a dict: + { task_id: { "task_string": ..., "ep_start": ..., "ep_end": ... } } + """ + task_ranges = {} + task_id = 0 + current_task_str = None + ep_start = None + ep_end = None + + with open(episodes_jsonl_path, "r") as f: + for line in f: + if not line.strip(): + continue + ep = json.loads(line) + ep_idx = ep["episode_index"] + task_str = ep["tasks"][0] if ep["tasks"] else "UNKNOWN" + + if current_task_str is None: + current_task_str = task_str + ep_start = ep_idx + ep_end = ep_idx + elif task_str == current_task_str: + ep_end = ep_idx + else: + # close previous task group + task_ranges[task_id] = { + "task_string": current_task_str, + "ep_start": ep_start, + "ep_end": ep_end, + } + task_id += 1 + # start new one + current_task_str = task_str + ep_start = ep_idx + ep_end = ep_idx + + # store last task + if current_task_str is not None: + task_ranges[task_id] = { + "task_string": current_task_str, + "ep_start": ep_start, + "ep_end": ep_end, + } + + return task_ranges + +def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: + """ + Load only the episodes belonging to a specific task, inferred automatically from episode ranges. + + Args: + local_dir (Path): Root path containing legacy meta/episodes.jsonl + task_id (int): Which task to load (key from the inferred task_ranges dict) + task_ranges (dict): Mapping from infer_task_episode_ranges() + step (int): Episode index step (Behavior-1K = 10) + """ + all_episodes = legacy_load_episodes(local_dir) + + # get the range for this task + if task_id not in task_ranges: + raise ValueError(f"Task id {task_id} not found in task_ranges") + + ep_start = task_ranges[task_id]["ep_start"] + ep_end = task_ranges[task_id]["ep_end"] + + task_episode_indices = range(ep_start, ep_end + step, step) + return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes} + +def legacy_load_episodes(local_dir: Path) -> dict: + episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH) + return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])} + +def legacy_load_episodes_stats(local_dir: Path) -> dict: + episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH) + return { + item["episode_index"]: cast_stats_to_numpy(item["stats"]) + for item in sorted(episodes_stats, key=lambda x: x["episode_index"]) + } + +def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: + all_stats = legacy_load_episodes_stats(local_dir) + + if task_id not in task_ranges: + raise ValueError(f"Task id {task_id} not found in task_ranges") + + ep_start = task_ranges[task_id]["ep_start"] + ep_end = task_ranges[task_id]["ep_end"] + + task_episode_indices = range(ep_start, ep_end + step, step) + return {i: all_stats[i] for i in task_episode_indices if i in all_stats} + +def generate_episode_metadata_dict( + episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None +): + num_episodes = len(episodes_metadata) + episodes_legacy_metadata_vals = list(episodes_legacy_metadata.values()) + episodes_stats_vals = list(episodes_stats.values()) + episodes_stats_keys = list(episodes_stats.keys()) + + for i in range(num_episodes): + ep_legacy_metadata = episodes_legacy_metadata_vals[i] + ep_metadata = episodes_metadata[i] + ep_stats = episodes_stats_vals[i] + + ep_ids_set = { + ep_legacy_metadata["episode_index"], + ep_metadata["episode_index"], + episodes_stats_keys[i], + } + + if episodes_videos is None: + ep_video = {} + else: + ep_video = episodes_videos[i] + ep_ids_set.add(ep_video["episode_index"]) + # we skip this check because ep_ids have a step of 10, whereas we convert with a step of 1 + # if len(ep_ids_set) != 1: + # raise ValueError(f"Number of episodes is not the same ({ep_ids_set}).") + + ep_dict = {**ep_metadata, **ep_video, **ep_legacy_metadata, **flatten_dict({"stats": ep_stats})} + ep_dict["meta/episodes/chunk_index"] = 0 + ep_dict["meta/episodes/file_index"] = 0 + yield ep_dict + +def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None): + logging.info(f"Converting episodes metadata from {root} to {new_root}") + + # filter by task + episodes_legacy_metadata = legacy_load_episodes_task(root, task_id=task_id, task_ranges=task_ranges) + episodes_stats = legacy_load_episodes_stats_task(root, task_id=task_id, task_ranges=task_ranges) + + num_eps_set = {len(episodes_legacy_metadata), len(episodes_metadata)} + if episodes_video_metadata is not None: + num_eps_set.add(len(episodes_video_metadata)) + + if len(num_eps_set) != 1: + raise ValueError(f"Number of episodes is not the same ({num_eps_set}).") + + ds_episodes = Dataset.from_generator( + lambda: generate_episode_metadata_dict( + episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_video_metadata + ) + ) + write_episodes(ds_episodes, new_root) + + stats = aggregate_stats(list(episodes_stats.values())) + write_stats(stats, new_root) + +import shutil +from pathlib import Path + +def convert_dataset_local( + data_path: Path, + new_repo: Path, + task_id: int, + data_file_size_in_mb: int = DEFAULT_DATA_FILE_SIZE_IN_MB, + video_file_size_in_mb: int = DEFAULT_VIDEO_FILE_SIZE_IN_MB, + force_conversion: bool = False, +): + """ + Convert a local dataset to v3.x format, task-by-task, without using the Hugging Face Hub. + + Args: + data_path (Path): path to local dataset root (e.g. /fsx/.../2025-challenge-demos) + new_repo (Path): path where converted dataset will be written (e.g. /fsx/.../behavior1k_v3) + task_id (int): which task to convert (index) + data_file_size_in_mb (int): max size per data chunk + video_file_size_in_mb (int): max size per video chunk + force_conversion (bool): overwrite existing conversion if True + """ + + root = Path(data_path) + new_root = Path(new_repo) + + # Clean up if needed + if new_root.exists() and force_conversion: + shutil.rmtree(new_root) + new_root.mkdir(parents=True, exist_ok=True) + + print(f"🔹 Starting conversion for task {task_id}") + print(f"Input root: {root}") + print(f"Output root: {new_root}") + STEP = 10 + # Infer task episode ranges + EPISODES_META_PATH = root / "meta" / "episodes.jsonl" + task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) + # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): + convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, EPISODES_META_PATH, task_id, task_ranges, STEP) + convert_tasks(root, new_root, task_id) + episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) + episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id) + convert_episodes_metadata(root, new_root, episodes_metadata, task_id=task_id, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata) + + print(f"✅ Conversion complete for task {task_id}") + print(f"Converted dataset written to: {new_root}") + +if __name__ == "__main__": + import argparse + from pathlib import Path + + init_logging() + + parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)") parser.add_argument( - "--repo-id", + "--data-path", type=str, required=True, - help="Output repository ID (e.g., 'username/behavior-1k-assembling-gift-baskets')", + help="Path to the local Behavior-1K dataset (e.g. /fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos)", ) parser.add_argument( - "--task-id", type=int, required=True, help="Task ID to convert (e.g., 0 for assembling_gift_baskets)" + "--new-repo", + type=str, + required=True, + help="Path to the output directory for the converted dataset", ) parser.add_argument( - "--push-to-hub", action="store_true", help="Push dataset to HuggingFace Hub after conversion" + "--task-id", + type=int, + required=True, + help="Task index to convert (e.g. 0, 1, 2, ...)", + ) + parser.add_argument( + "--data-file-size-in-mb", + type=int, + default=DEFAULT_DATA_FILE_SIZE_IN_MB, + help=f"Maximum size per data chunk (default: {DEFAULT_DATA_FILE_SIZE_IN_MB})", + ) + parser.add_argument( + "--video-file-size-in-mb", + type=int, + default=DEFAULT_VIDEO_FILE_SIZE_IN_MB, + help=f"Maximum size per video chunk (default: {DEFAULT_VIDEO_FILE_SIZE_IN_MB})", + ) + parser.add_argument( + "--force-conversion", + action="store_true", + help="Force overwrite of existing conversion output if present.", ) args = parser.parse_args() - # Convert single task to dataset - convert_task_to_dataset( - data_folder=args.data_folder, - repo_id=args.repo_id, + convert_dataset_local( + data_path=Path(args.data_path), + new_repo=Path(args.new_repo), task_id=args.task_id, - push_to_hub=args.push_to_hub, + data_file_size_in_mb=args.data_file_size_in_mb, + video_file_size_in_mb=args.video_file_size_in_mb, + force_conversion=args.force_conversion, ) - - -if __name__ == "__main__": - main()