"""Script to convert a single Behavior-1K task to the LeRobot v3.0 format."""

import argparse
import json
import logging
import shutil
from pathlib import Path
from typing import Any

import jsonlines
import pandas as pd
import pyarrow as pa
import tqdm
from datasets import Dataset, Features, Image
from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError

from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
from lerobot.datasets.utils import (
    DEFAULT_CHUNK_SIZE,
    DEFAULT_DATA_FILE_SIZE_IN_MB,
    DEFAULT_DATA_PATH,
    DEFAULT_VIDEO_FILE_SIZE_IN_MB,
    DEFAULT_VIDEO_PATH,
    LEGACY_EPISODES_PATH,
    LEGACY_EPISODES_STATS_PATH,
    LEGACY_TASKS_PATH,
    cast_stats_to_numpy,
    flatten_dict,
    get_file_size_in_mb,
    get_parquet_file_size_in_mb,
    get_parquet_num_frames,
    load_info,
    update_chunk_file_indices,
    write_episodes,
    write_info,
    write_stats,
    write_tasks,
)
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.utils.utils import init_logging

# Example local paths used during development (kept for reference; the CLI
# arguments below take precedence).
DATA_PATH = Path("/fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos")
NEW_PATH = Path("/fsx/jade_choghari/.cache/behavior-1k-task0/")
NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb")
# TASK = 1


def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, task_index=None):
    info = load_info(root)
    info["codebase_version"] = "v3.0"
    del info["total_videos"]
    info["data_files_size_in_mb"] = data_file_size_in_mb
    info["video_files_size_in_mb"] = video_file_size_in_mb
    info["data_path"] = DEFAULT_DATA_PATH
    info["video_path"] = DEFAULT_VIDEO_PATH if info["video_path"] is not None else None
    info["fps"] = int(info["fps"])
    for key in info["features"]:
        if info["features"][key]["dtype"] == "video":
            # video features already carry fps in their video_info
            continue
        info["features"][key]["fps"] = info["fps"]
    if task_index is not None:
        # Single-task export: recompute the totals for this task only
        # (see the helper sketches below).
        info["total_episodes"] = get_total_episodes_task(root, task_index)
        info["total_frames"] = get_total_frames_task(root, task_index)
        info["total_tasks"] = 1
        info["total_videos"] = get_total_videos_task(root, task_index)
    info["chunks_size"] = DEFAULT_CHUNK_SIZE
    write_info(info, new_root)
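# Hedged sketches: the original calls get_total_*_task() without defining them
# anywhere. The stand-ins below are assumptions that derive single-task totals
# from the legacy per-task layout used elsewhere in this script
# (data/task-XXXX/*.parquet and videos/task-XXXX/<video_key>/*.mp4).
def get_total_episodes_task(root: Path, task_index: int) -> int:
    task_dir = root / "data" / f"task-{task_index:04d}"
    return len(list(task_dir.glob("*.parquet")))


def get_total_frames_task(root: Path, task_index: int) -> int:
    task_dir = root / "data" / f"task-{task_index:04d}"
    return sum(get_parquet_num_frames(p) for p in sorted(task_dir.glob("*.parquet")))


def get_total_videos_task(root: Path, task_index: int) -> int:
    videos_dir = root / "videos" / f"task-{task_index:04d}"
    return len(list(videos_dir.glob("*/*.mp4")))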
def load_jsonlines(fpath: Path) -> list[Any]:
    with jsonlines.open(fpath, "r") as reader:
        return list(reader)


def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
    items = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
    tasks = {item["task_index"]: item["task"] for item in sorted(items, key=lambda x: x["task_index"])}
    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
    return tasks, task_to_task_index


def convert_tasks(root, new_root, task_id: int):
    tasks, _ = legacy_load_tasks(root)
    if task_id not in tasks:
        raise ValueError(f"Task ID {task_id} not found in tasks (available: {list(tasks.keys())})")
    tasks = {task_id: tasks[task_id]}
    task_indices = tasks.keys()
    task_strings = tasks.values()
    df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings)
    write_tasks(df_tasks, new_root)


def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
    # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
    dataframes = [pd.read_parquet(file) for file in paths_to_cat]
    # Concatenate all DataFrames along rows
    concatenated_df = pd.concat(dataframes, ignore_index=True)

    path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
    path.parent.mkdir(parents=True, exist_ok=True)

    if len(image_keys) > 0:
        # Cast raw image columns to the HF Image feature so they round-trip through parquet
        schema = pa.Schema.from_pandas(concatenated_df)
        features = Features.from_arrow_schema(schema)
        for key in image_keys:
            features[key] = Image()
        schema = features.arrow_schema
    else:
        schema = None

    concatenated_df.to_parquet(path, index=False, schema=schema)


def get_image_keys(root):
    info = load_info(root)
    features = info["features"]
    image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
    return image_keys


def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int):
    task_dir_name = f"task-{task_index:04d}"  # e.g. task-0000; zero-padded to 4 digits
    data_dir = root / "data" / task_dir_name
    ep_paths = sorted(data_dir.glob("*.parquet"))
    image_keys = get_image_keys(root)

    ep_idx = 0
    chunk_idx = 0
    file_idx = 0
    size_in_mb = 0
    num_frames = 0
    paths_to_cat = []
    episodes_metadata = []
    logging.info(f"Converting data files from {len(ep_paths)} episodes")
    for ep_path in tqdm.tqdm(ep_paths, desc="convert data files"):
        ep_size_in_mb = get_parquet_file_size_in_mb(ep_path)
        ep_num_frames = get_parquet_num_frames(ep_path)

        # If adding this episode would exceed the size limit, flush the current
        # accumulation first so this episode's metadata points at the file it is
        # actually written to (mirrors the video logic below; the previous
        # ordering recorded stale chunk/file indices for the episode that
        # triggered the rollover).
        if size_in_mb + ep_size_in_mb >= data_file_size_in_mb and paths_to_cat:
            concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)
            chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
            size_in_mb = 0
            paths_to_cat = []

        episodes_metadata.append(
            {
                "episode_index": ep_idx,
                "data/chunk_index": chunk_idx,
                "data/file_index": file_idx,
                "dataset_from_index": num_frames,
                "dataset_to_index": num_frames + ep_num_frames,
            }
        )
        paths_to_cat.append(ep_path)
        size_in_mb += ep_size_in_mb
        num_frames += ep_num_frames
        ep_idx += 1

    # Write remaining data if any
    if paths_to_cat:
        concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)

    return episodes_metadata
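# Illustration (hedged) of the rollover bookkeeping above. As understood from
# lerobot's update_chunk_file_indices, file_index increments until it reaches
# the chunks_size (DEFAULT_CHUNK_SIZE), then chunk_index increments and
# file_index resets to 0. One entry of the returned episodes_metadata looks
# like this (values illustrative):
#   {"episode_index": 0, "data/chunk_index": 0, "data/file_index": 0,
#    "dataset_from_index": 0, "dataset_to_index": 250}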
def convert_videos_of_camera(
    root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int
):
    # Access old paths to mp4
    task_dir_name = f"task-{task_index:04d}"
    videos_dir = root / "videos" / task_dir_name / video_key
    ep_paths = sorted(videos_dir.glob("*.mp4"))
    logging.info(f"Found {len(ep_paths)} video files for {video_key}")

    ep_idx = 0
    chunk_idx = 0
    file_idx = 0
    size_in_mb = 0
    duration_in_s = 0.0
    paths_to_cat = []
    episodes_metadata = []
    for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"):
        ep_size_in_mb = get_file_size_in_mb(ep_path)
        ep_duration_in_s = get_video_duration_in_s(ep_path)

        # Check if adding this episode would exceed the limit
        if size_in_mb + ep_size_in_mb >= video_file_size_in_mb and len(paths_to_cat) > 0:
            # Size limit would be exceeded, save current accumulation WITHOUT this episode
            concatenate_video_files(
                paths_to_cat,
                new_root
                / DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
            )

            # Update episodes metadata for the file we just saved
            for i, _ in enumerate(paths_to_cat):
                past_ep_idx = ep_idx - len(paths_to_cat) + i
                episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
                episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx

            # Move to next file and start fresh with current episode
            chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
            size_in_mb = 0
            duration_in_s = 0.0
            paths_to_cat = []

        # Add current episode metadata
        ep_metadata = {
            "episode_index": ep_idx,
            f"videos/{video_key}/chunk_index": chunk_idx,  # Will be updated when file is saved
            f"videos/{video_key}/file_index": file_idx,  # Will be updated when file is saved
            f"videos/{video_key}/from_timestamp": duration_in_s,
            f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s,
        }
        episodes_metadata.append(ep_metadata)

        # Add current episode to accumulation
        paths_to_cat.append(ep_path)
        size_in_mb += ep_size_in_mb
        duration_in_s += ep_duration_in_s
        ep_idx += 1

    # Write remaining videos if any
    if paths_to_cat:
        concatenate_video_files(
            paths_to_cat,
            new_root
            / DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
        )
        # Update episodes metadata for the final file
        for i, _ in enumerate(paths_to_cat):
            past_ep_idx = ep_idx - len(paths_to_cat) + i
            episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
            episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx

    return episodes_metadata


def get_video_keys(root):
    info = load_info(root)
    features = info["features"]
    video_keys = [key for key, ft in features.items() if ft["dtype"] == "video"]
    return video_keys


def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_id: int):
    logging.info(f"Converting videos from {root} to {new_root}")
    video_keys = get_video_keys(root)
    if len(video_keys) == 0:
        return None

    video_keys = sorted(video_keys)
    eps_metadata_per_cam = []
    for camera in video_keys:
        eps_metadata = convert_videos_of_camera(root, new_root, camera, video_file_size_in_mb, task_id)
        eps_metadata_per_cam.append(eps_metadata)

    num_eps_per_cam = [len(eps_cam_map) for eps_cam_map in eps_metadata_per_cam]
    if len(set(num_eps_per_cam)) != 1:
        raise ValueError(f"All cameras must have the same number of episodes ({num_eps_per_cam}).")

    episodes_metadata = []
    num_cameras = len(video_keys)
    num_episodes = num_eps_per_cam[0]
    for ep_idx in tqdm.tqdm(range(num_episodes), desc="convert videos"):
        # Sanity check: every camera must report the same episode index at this position
        ep_ids = [eps_metadata_per_cam[cam_idx][ep_idx]["episode_index"] for cam_idx in range(num_cameras)]
        ep_ids += [ep_idx]
        if len(set(ep_ids)) != 1:
            raise ValueError(f"All episode indices need to match ({ep_ids}).")

        ep_dict = {}
        for cam_idx in range(num_cameras):
            ep_dict.update(eps_metadata_per_cam[cam_idx][ep_idx])
        episodes_metadata.append(ep_dict)

    return episodes_metadata


def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
    """Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode
    ranges per unique task.

    Returns a dict:
        {task_id: {"task_string": ..., "ep_start": ..., "ep_end": ...}}
    """
    task_ranges = {}
    task_id = 0
    current_task_str = None
    ep_start = None
    ep_end = None

    with open(episodes_jsonl_path, "r") as f:
        for line in f:
            if not line.strip():
                continue
            ep = json.loads(line)
            ep_idx = ep["episode_index"]
            task_str = ep["tasks"][0] if ep["tasks"] else "UNKNOWN"

            if current_task_str is None:
                current_task_str = task_str
                ep_start = ep_idx
                ep_end = ep_idx
            elif task_str == current_task_str:
                ep_end = ep_idx
            else:
                # close previous task group
                task_ranges[task_id] = {
                    "task_string": current_task_str,
                    "ep_start": ep_start,
                    "ep_end": ep_end,
                }
                task_id += 1
                # start new one
                current_task_str = task_str
                ep_start = ep_idx
                ep_end = ep_idx

    # store last task
    if current_task_str is not None:
        task_ranges[task_id] = {
            "task_string": current_task_str,
            "ep_start": ep_start,
            "ep_end": ep_end,
        }

    return task_ranges
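# Worked example (hypothetical episodes.jsonl content): contiguous runs of the
# same task string collapse into one range, keyed by a 0-based task id.
#
#   {"episode_index": 0,  "tasks": ["pick up the mug"], "length": 250}
#   {"episode_index": 10, "tasks": ["pick up the mug"], "length": 310}
#   {"episode_index": 20, "tasks": ["open the drawer"], "length": 180}
#
# infer_task_episode_ranges() would return:
#   {0: {"task_string": "pick up the mug", "ep_start": 0, "ep_end": 10},
#    1: {"task_string": "open the drawer", "ep_start": 20, "ep_end": 20}}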
def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
    """Load only the episodes belonging to a specific task, inferred automatically
    from episode ranges.

    Args:
        local_dir (Path): Root path containing the legacy meta/episodes.jsonl
        task_id (int): Which task to load (key from the inferred task_ranges dict)
        task_ranges (dict): Mapping from infer_task_episode_ranges()
        step (int): Episode index step (Behavior-1K = 10)
    """
    all_episodes = legacy_load_episodes(local_dir)

    # get the range for this task
    if task_id not in task_ranges:
        raise ValueError(f"Task id {task_id} not found in task_ranges")

    ep_start = task_ranges[task_id]["ep_start"]
    ep_end = task_ranges[task_id]["ep_end"]
    task_episode_indices = range(ep_start, ep_end + step, step)

    return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes}


def legacy_load_episodes(local_dir: Path) -> dict:
    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}


def legacy_load_episodes_stats(local_dir: Path) -> dict:
    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
    return {
        item["episode_index"]: cast_stats_to_numpy(item["stats"])
        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
    }


def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
    all_stats = legacy_load_episodes_stats(local_dir)

    if task_id not in task_ranges:
        raise ValueError(f"Task id {task_id} not found in task_ranges")

    ep_start = task_ranges[task_id]["ep_start"]
    ep_end = task_ranges[task_id]["ep_end"]
    task_episode_indices = range(ep_start, ep_end + step, step)

    return {i: all_stats[i] for i in task_episode_indices if i in all_stats}


def generate_episode_metadata_dict(
    episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None
):
    num_episodes = len(episodes_metadata)
    episodes_legacy_metadata_vals = list(episodes_legacy_metadata.values())
    episodes_stats_vals = list(episodes_stats.values())
    episodes_stats_keys = list(episodes_stats.keys())
    for i in range(num_episodes):
        ep_legacy_metadata = episodes_legacy_metadata_vals[i]
        ep_metadata = episodes_metadata[i]
        ep_stats = episodes_stats_vals[i]
        ep_ids_set = {
            ep_legacy_metadata["episode_index"],
            ep_metadata["episode_index"],
            episodes_stats_keys[i],
        }
        if episodes_videos is None:
            ep_video = {}
        else:
            ep_video = episodes_videos[i]
            ep_ids_set.add(ep_video["episode_index"])

        # The strict identity check stays disabled: legacy episode indices are
        # global (e.g. 0, 10, 20, ...) while the re-exported task episodes are
        # renumbered from 0, so this set is expected to hold several values.
        # if len(ep_ids_set) != 1:
        #     raise ValueError(f"Number of episodes is not the same ({ep_ids_set}).")

        # Merge legacy metadata first so the task-local indices computed during
        # conversion (episode_index, chunk/file indices, ...) take precedence.
        ep_dict = {**ep_legacy_metadata, **ep_metadata, **ep_video, **flatten_dict({"stats": ep_stats})}
        ep_dict["meta/episodes/chunk_index"] = 0
        ep_dict["meta/episodes/file_index"] = 0
        yield ep_dict
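# Illustrative shape of one yielded ep_dict (key names assume lerobot's
# flatten_dict uses "/" as separator; <cam> and <feature> are placeholders):
#   episode_index, tasks, length,
#   data/chunk_index, data/file_index, dataset_from_index, dataset_to_index,
#   videos/<cam>/chunk_index, videos/<cam>/file_index,
#   videos/<cam>/from_timestamp, videos/<cam>/to_timestamp,
#   stats/<feature>/min, stats/<feature>/max, stats/<feature>/mean, ...,
#   meta/episodes/chunk_index, meta/episodes/file_index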
def convert_episodes_metadata(
    root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None
):
    logging.info(f"Converting episodes metadata from {root} to {new_root}")

    # filter by task
    episodes_legacy_metadata = legacy_load_episodes_task(root, task_id=task_id, task_ranges=task_ranges)
    episodes_stats = legacy_load_episodes_stats_task(root, task_id=task_id, task_ranges=task_ranges)

    num_eps_set = {len(episodes_legacy_metadata), len(episodes_metadata)}
    if episodes_video_metadata is not None:
        num_eps_set.add(len(episodes_video_metadata))
    if len(num_eps_set) != 1:
        raise ValueError(f"Number of episodes is not the same ({num_eps_set}).")

    ds_episodes = Dataset.from_generator(
        lambda: generate_episode_metadata_dict(
            episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_video_metadata
        )
    )
    write_episodes(ds_episodes, new_root)

    stats = aggregate_stats(list(episodes_stats.values()))
    write_stats(stats, new_root)


def convert_dataset_local(
    data_path: Path,
    new_repo: Path,
    task_id: int,
    data_file_size_in_mb: int = DEFAULT_DATA_FILE_SIZE_IN_MB,
    video_file_size_in_mb: int = DEFAULT_VIDEO_FILE_SIZE_IN_MB,
    force_conversion: bool = False,
):
    """Convert a local dataset to the v3.0 format, task by task, without using
    the Hugging Face Hub.

    Args:
        data_path (Path): path to the local dataset root (e.g. /fsx/.../2025-challenge-demos)
        new_repo (Path): path where the converted dataset will be written (e.g. /fsx/.../behavior1k_v3)
        task_id (int): which task to convert (index)
        data_file_size_in_mb (int): max size per data chunk
        video_file_size_in_mb (int): max size per video chunk
        force_conversion (bool): overwrite an existing conversion if True
    """
    root = Path(data_path)
    new_root = Path(new_repo)

    # Clean up if needed
    if new_root.exists() and force_conversion:
        shutil.rmtree(new_root)
    new_root.mkdir(parents=True, exist_ok=True)

    print(f"🔹 Starting conversion for task {task_id}")
    print(f"Input root: {root}")
    print(f"Output root: {new_root}")

    # Infer task episode ranges from the input root (not the hardcoded DATA_PATH)
    episodes_meta_path = root / "meta" / "episodes.jsonl"
    task_ranges = infer_task_episode_ranges(episodes_meta_path)

    convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, task_index=task_id)
    convert_tasks(root, new_root, task_id)
    episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id)
    episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id)
    convert_episodes_metadata(
        root,
        new_root,
        episodes_metadata,
        task_id=task_id,
        task_ranges=task_ranges,
        episodes_video_metadata=episodes_videos_metadata,
    )

    print(f"✅ Conversion complete for task {task_id}")
    print(f"Converted dataset written to: {new_root}")
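# The hub-based convert_dataset() below is retained from the upstream
# v2.1 -> v3.0 conversion script. It references revision tags and a version
# check that are not defined in this file; the following are hedged stand-ins.
V21 = "v2.1"  # assumed hub revision tag of the legacy format
V30 = "v3.0"  # assumed hub revision tag of the converted format


def validate_local_dataset_version(root: Path) -> None:
    # Minimal sketch (assumption): accept the local copy only if it reports a
    # legacy v2.x codebase version in meta/info.json.
    version = str(load_info(root).get("codebase_version", ""))
    if not version.startswith("v2."):
        raise ValueError(f"Expected a v2.x dataset at {root}, found codebase_version={version!r}")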
def convert_dataset(
    repo_id: str,
    branch: str | None = None,
    data_file_size_in_mb: int | None = None,
    video_file_size_in_mb: int | None = None,
    root: str | Path | None = None,
    push_to_hub: bool = True,
    force_conversion: bool = False,
):
    if data_file_size_in_mb is None:
        data_file_size_in_mb = DEFAULT_DATA_FILE_SIZE_IN_MB
    if video_file_size_in_mb is None:
        video_file_size_in_mb = DEFAULT_VIDEO_FILE_SIZE_IN_MB

    # First check if the dataset already has a v3.0 version
    if root is None and not force_conversion:
        try:
            print("Trying to download the v3.0 version of the dataset from the hub...")
            snapshot_download(repo_id, repo_type="dataset", revision=V30, local_dir=HF_LEROBOT_HOME / repo_id)
            return
        except Exception:
            print("Dataset does not have an uploaded v3.0 version. Continuing with conversion.")

    # Set root based on whether a local dataset path is provided
    use_local_dataset = False
    root = HF_LEROBOT_HOME / repo_id if root is None else Path(root) / repo_id
    if root.exists():
        validate_local_dataset_version(root)
        use_local_dataset = True
        print(f"Using local dataset at {root}")

    old_root = root.parent / f"{root.name}_old"
    new_root = root.parent / f"{root.name}_v30"

    # Handle old_root cleanup if both old_root and root exist
    if old_root.is_dir() and root.is_dir():
        shutil.rmtree(str(root))
        shutil.move(str(old_root), str(root))

    if new_root.is_dir():
        shutil.rmtree(new_root)

    if not use_local_dataset:
        snapshot_download(
            repo_id,
            repo_type="dataset",
            revision=V21,
            local_dir=root,
        )

    episodes_meta_path = root / "meta" / "episodes.jsonl"
    task_ranges = infer_task_episode_ranges(episodes_meta_path)

    convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb)
    # convert_tasks(root, new_root, TASK)
    # episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=TASK)
    # episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=TASK)
    # convert_episodes_metadata(root, new_root, episodes_metadata, task_id=TASK, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata)

    shutil.move(str(root), str(old_root))
    shutil.move(str(new_root), str(root))

    if push_to_hub:
        hub_api = HfApi()
        try:
            hub_api.delete_tag(repo_id, tag=CODEBASE_VERSION, repo_type="dataset")
        except HTTPError as e:
            print(f"tag={CODEBASE_VERSION} probably doesn't exist. Skipping exception ({e})")
        hub_api.delete_files(
            delete_patterns=["data/chunk*/episode_*", "meta/*.jsonl", "videos/chunk*"],
            repo_id=repo_id,
            revision=branch,
            repo_type="dataset",
        )
        hub_api.create_tag(repo_id, tag=CODEBASE_VERSION, revision=branch, repo_type="dataset")
        LeRobotDataset(repo_id).push_to_hub()
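# Hypothetical usage of the hub-based path (repo id illustrative). The
# per-task steps inside convert_dataset() are still commented out, so it
# currently only rewrites meta/info.json before swapping directories:
#   convert_dataset("your-org/behavior-1k-demos", push_to_hub=False, force_conversion=True)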
if __name__ == "__main__":
    init_logging()

    parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)")
    parser.add_argument(
        "--data-path",
        type=str,
        required=True,
        help="Path to the local Behavior-1K dataset (e.g. /fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos)",
    )
    parser.add_argument(
        "--new-repo",
        type=str,
        required=True,
        help="Path to the output directory for the converted dataset",
    )
    parser.add_argument(
        "--task-id",
        type=int,
        required=True,
        help="Task index to convert (e.g. 0, 1, 2, ...)",
    )
    parser.add_argument(
        "--data-file-size-in-mb",
        type=int,
        default=DEFAULT_DATA_FILE_SIZE_IN_MB,
        help=f"Maximum size per data chunk (default: {DEFAULT_DATA_FILE_SIZE_IN_MB})",
    )
    parser.add_argument(
        "--video-file-size-in-mb",
        type=int,
        default=DEFAULT_VIDEO_FILE_SIZE_IN_MB,
        help=f"Maximum size per video chunk (default: {DEFAULT_VIDEO_FILE_SIZE_IN_MB})",
    )
    parser.add_argument(
        "--force-conversion",
        action="store_true",
        help="Force overwrite of existing conversion output if present.",
    )
    args = parser.parse_args()

    convert_dataset_local(
        data_path=Path(args.data_path),
        new_repo=Path(args.new_repo),
        task_id=args.task_id,
        data_file_size_in_mb=args.data_file_size_in_mb,
        video_file_size_in_mb=args.video_file_size_in_mb,
        force_conversion=args.force_conversion,
    )
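# Example invocation (script name and paths are illustrative):
#   python convert_behavior1k_task_to_v30.py \
#       --data-path /fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos \
#       --new-repo /fsx/jade_choghari/.cache/behavior-1k-task0 \
#       --task-id 0 \
#       --force-conversion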