diff --git a/README.md b/README.md
index c8d387a..9ec98a1 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
   - [x] [LeRobotv2.0 to LeRobotv2.1](./ds_version_convert/v20_to_v21/README.md)
   - [x] [LeRobotv2.1 to LeRobotv2.0](./ds_version_convert/v21_to_v20/README.md)
   - [x] [LeRobotv2.1 to LeRobotv3.0](./ds_version_convert/v21_to_v30/README.md)
-  - [ ] LeRobotv3.0 to LeRobotv2.1
+  - [x] [LeRobotv3.0 to LeRobotv2.1](./ds_version_convert/v30_to_v21/README.md)
 
 - [**Want more features?**](https://github.com/Tavish9/any4lerobot/issues/new?template=feature-request.yml)
 
diff --git a/ds_version_convert/v30_to_v21/README.md b/ds_version_convert/v30_to_v21/README.md
new file mode 100644
index 0000000..2b98f81
--- /dev/null
+++ b/ds_version_convert/v30_to_v21/README.md
@@ -0,0 +1,18 @@
+# LeRobot Dataset v3.0 to v2.1
+
+## Get started
+
+1. Install LeRobot v3.0:
+
+   ```bash
+   git clone https://github.com/huggingface/lerobot.git
+   cd lerobot
+   pip install -e .
+   ```
+
+2. Run the converter:
+   ```bash
+   python convert_dataset_v30_to_v21.py \
+       --repo-id=your_id \
+       --root=your_local_dir
+   ```
diff --git a/ds_version_convert/v30_to_v21/convert_dataset_v30_to_v21.py b/ds_version_convert/v30_to_v21/convert_dataset_v30_to_v21.py
new file mode 100644
index 0000000..6561277
--- /dev/null
+++ b/ds_version_convert/v30_to_v21/convert_dataset_v30_to_v21.py
@@ -0,0 +1,489 @@
+"""Utilities to convert a LeRobot dataset from codebase version v3.0 back to v2.1.
+
+The script mirrors :mod:`lerobot.datasets.v21.convert_dataset_v21_to_v30` but applies the reverse
+transformations so an existing dataset created with the new consolidated file
+layout can be ported back to the legacy per-episode structure.
+
+Usage examples
+--------------
+
+Convert a dataset that already exists locally::
+
+    python convert_dataset_v30_to_v21.py \
+        --repo-id=lerobot/pusht \
+        --root=/path/to/dataset
+
+"""
+
+from __future__ import annotations
+
+import argparse
+import logging
+import math
+import shutil
+import subprocess
+import sys
+from collections import defaultdict
+from pathlib import Path
+from typing import Any, Iterable
+
+import jsonlines
+import numpy as np
+import pyarrow.parquet as pq
+import tqdm
+from huggingface_hub import snapshot_download
+from lerobot.datasets.utils import (
+    DEFAULT_CHUNK_SIZE,
+    DEFAULT_DATA_PATH,
+    DEFAULT_VIDEO_PATH,
+    EPISODES_DIR,
+    LEGACY_EPISODES_PATH,
+    LEGACY_EPISODES_STATS_PATH,
+    LEGACY_TASKS_PATH,
+    load_info,
+    load_tasks,
+    serialize_dict,
+    unflatten_dict,
+    write_info,
+)
+from lerobot.utils.constants import HF_LEROBOT_HOME
+from lerobot.utils.utils import init_logging
+
+V21 = "v2.1"
+V30 = "v3.0"
+
+LEGACY_DATA_PATH_TEMPLATE = "data/chunk-{chunk_index:03d}/episode_{episode_index:06d}.parquet"
+LEGACY_VIDEO_PATH_TEMPLATE = "videos/chunk-{chunk_index:03d}/{video_key}/episode_{episode_index:06d}.mp4"
+MIN_VIDEO_DURATION = 1e-6
+LEGACY_STATS_KEYS = ("mean", "std", "min", "max", "q01", "q99")
+
+
+def _to_serializable(value: Any) -> Any:
+    """Convert numpy/pyarrow values into standard Python types for JSON dumps."""
+
+    if isinstance(value, np.ndarray):
+        return value.tolist()
+    if isinstance(value, np.generic):
+        return value.item()
+    if isinstance(value, (list, tuple)):
+        return [_to_serializable(item) for item in value]
+    if isinstance(value, dict):
+        return {key: _to_serializable(val) for key, val in value.items()}
+    return value
+
+
+def load_episode_records(root: Path) -> list[dict[str, Any]]:
+    """Load the consolidated metadata rows stored in ``meta/episodes``."""
+
+    episodes_dir = root / EPISODES_DIR
+    pq_paths = sorted(episodes_dir.glob("chunk-*/file-*.parquet"))
+    if not pq_paths:
+        raise FileNotFoundError(f"No episode parquet files found in {episodes_dir}.")
+
+    records: list[dict[str, Any]] = []
+    for pq_path in pq_paths:
+        table = pq.read_table(pq_path)
+        records.extend(table.to_pylist())
+
+    records.sort(key=lambda rec: int(rec["episode_index"]))
+    return records
+
+
+def convert_tasks(root: Path, new_root: Path) -> None:
+    logging.info("Converting tasks parquet to legacy JSONL")
+    tasks = load_tasks(root)
+    tasks = tasks.sort_values("task_index")
+
+    out_path = new_root / LEGACY_TASKS_PATH
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+
+    with jsonlines.open(out_path, mode="w") as writer:
+        for task, row in tasks.iterrows():
+            writer.write(
+                {
+                    "task_index": int(row["task_index"]),
+                    "task": _to_serializable(task),
+                }
+            )
+
+
+def convert_info(
+    root: Path,
+    new_root: Path,
+    episode_records: list[dict[str, Any]],
+    video_keys: list[str],
+) -> None:
+    info = load_info(root)
+    logging.info("Converting info.json metadata to v2.1 schema")
+
+    total_episodes = info.get("total_episodes") or len(episode_records)
+    chunks_size = info.get("chunks_size", DEFAULT_CHUNK_SIZE)
+
+    info["codebase_version"] = V21
+
+    # Restore legacy layout templates.
+    info["data_path"] = LEGACY_DATA_PATH_TEMPLATE
+    if info.get("video_path") is not None and len(video_keys) > 0:
+        info["video_path"] = LEGACY_VIDEO_PATH_TEMPLATE
+    else:
+        info["video_path"] = None
+
+    # Remove v3-specific sizing hints which do not exist in v2.1.
+    info.pop("data_files_size_in_mb", None)
+    info.pop("video_files_size_in_mb", None)
+
+    # Drop the per-feature fps hint on non-video features; in v2.1 only camera entries carry their own fps.
+    for key, ft in info["features"].items():
+        if ft.get("dtype") != "video":
+            ft.pop("fps", None)
+
+    info["total_chunks"] = math.ceil(total_episodes / chunks_size) if total_episodes > 0 else 0
+    info["total_videos"] = total_episodes * len(video_keys)
+
+    write_info(info, new_root)
+
+
+def _group_episodes_by_data_file(
+    episode_records: Iterable[dict[str, Any]],
+) -> dict[tuple[int, int], list[dict[str, Any]]]:
+    grouped: dict[tuple[int, int], list[dict[str, Any]]] = defaultdict(list)
+    for record in episode_records:
+        key = (
+            int(record["data/chunk_index"]),
+            int(record["data/file_index"]),
+        )
+        grouped[key].append(record)
+    return grouped
+
+
+def convert_data(root: Path, new_root: Path, episode_records: list[dict[str, Any]]) -> None:
+    logging.info("Converting consolidated parquet files back to per-episode files")
+    grouped = _group_episodes_by_data_file(episode_records)
+
+    for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc="convert data files"):
+        source_path = root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
+        if not source_path.exists():
+            raise FileNotFoundError(f"Expected source parquet file not found: {source_path}")
+
+        table = pq.read_table(source_path)
+        records = sorted(records, key=lambda rec: int(rec["dataset_from_index"]))
+        file_offset = int(records[0]["dataset_from_index"])
+
+        for record in records:
+            episode_index = int(record["episode_index"])
+            start = int(record["dataset_from_index"]) - file_offset
+            stop = int(record["dataset_to_index"]) - file_offset
+            length = stop - start
+
+            if length <= 0:
+                raise ValueError(
+                    "Invalid episode length computed during data conversion: "
+                    f"episode_index={episode_index}, length={length}"
+                )
+
+            episode_table = table.slice(start, length)
+
+            dest_chunk = episode_index // DEFAULT_CHUNK_SIZE
+            dest_path = new_root / LEGACY_DATA_PATH_TEMPLATE.format(
+                chunk_index=dest_chunk,
+                episode_index=episode_index,
+            )
+            dest_path.parent.mkdir(parents=True, exist_ok=True)
+            pq.write_table(episode_table, dest_path)
+
+
+def _group_episodes_by_video_file(
+    episode_records: Iterable[dict[str, Any]],
+    video_key: str,
+) -> dict[tuple[int, int], list[dict[str, Any]]]:
+    grouped: dict[tuple[int, int], list[dict[str, Any]]] = defaultdict(list)
+    chunk_column = f"videos/{video_key}/chunk_index"
+    file_column = f"videos/{video_key}/file_index"
+
+    for record in episode_records:
+        if chunk_column not in record or file_column not in record:
+            continue
+        chunk_idx = record.get(chunk_column)
+        file_idx = record.get(file_column)
+        if chunk_idx is None or file_idx is None:
+            continue
+        grouped[(int(chunk_idx), int(file_idx))].append(record)
+    return grouped
+
+
+def _validate_video_paths(src: Path, dst: Path) -> None:
+    """Validate source and destination paths to prevent security issues."""
+
+    # Convert to Path objects if they aren't already
+    src = Path(src)
+    dst = Path(dst)
+
+    # Resolve paths to handle symlinks and normalize them
+    try:
+        src_resolved = src.resolve()
+        dst_resolved = dst.resolve()
+    except OSError as exc:
+        raise ValueError(f"Invalid path provided: {exc}") from exc
+
+    # Check that source file exists and is a regular file
+    if not src_resolved.exists():
+        raise FileNotFoundError(f"Source video file does not exist: {src_resolved}")
+
+    if not src_resolved.is_file():
+        raise ValueError(f"Source path is not a regular file: {src_resolved}")
+
+    # Validate file extensions for video files
+    valid_video_extensions = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".m4v"}
+    if src_resolved.suffix.lower() not in valid_video_extensions:
+        raise ValueError(f"Source file does not have a valid video extension: {src_resolved}")
+
+    if dst_resolved.suffix.lower() not in valid_video_extensions:
+        raise ValueError(f"Destination file does not have a valid video extension: {dst_resolved}")
+
+    # Check for path traversal attempts in the original paths
+    src_str = str(src)
+    dst_str = str(dst)
+
+    # Ensure paths don't contain null bytes or other control characters
+    for path_str, name in [(src_str, "source"), (dst_str, "destination")]:
+        if "\0" in path_str:
+            raise ValueError(f"Path contains null bytes: {name} path")
+        if any(ord(c) < 32 and c not in ["\t", "\n", "\r"] for c in path_str):
+            raise ValueError(f"Path contains invalid control characters: {name} path")
+
+    # Additional check: ensure resolved paths don't point to system directories
+    system_dirs = {"/etc", "/sys", "/proc", "/dev", "/boot", "/root"}
+    for resolved_path, name in [(src_resolved, "source"), (dst_resolved, "destination")]:
+        path_str = str(resolved_path)
+        for sys_dir in system_dirs:
+            if path_str.startswith(sys_dir + "/") or path_str == sys_dir:
+                raise ValueError(f"Path points to system directory: {name} path {resolved_path}")
+
+    # Ensure the destination directory can be created safely
+    try:
+        dst_parent = dst_resolved.parent
+        if not dst_parent.exists():
+            # Check if we can create the parent directory structure
+            dst_parent.resolve()
+    except OSError as exc:
+        raise ValueError(f"Cannot create destination directory: {exc}") from exc
+
+
+def _extract_video_segment(
+    src: Path,
+    dst: Path,
+    start: float,
+    end: float,
+) -> None:
+    # Validate paths to prevent security issues
+    _validate_video_paths(src, dst)
+
+    # Validate numeric parameters to prevent injection
+    if not (0 <= start <= 86400):  # 24 hours max
+        raise ValueError(f"Invalid start time: {start}")
+    if not (0 <= end <= 86400):  # 24 hours max
+        raise ValueError(f"Invalid end time: {end}")
+    if start >= end:
+        raise ValueError(f"Start time {start} must be less than end time {end}")
+
+    duration = max(end - start, MIN_VIDEO_DURATION)
+
+    # Validate duration is reasonable
+    if duration > 3600:  # 1 hour max
+        raise ValueError(f"Video segment duration too long: {duration} seconds")
+
+    dst.parent.mkdir(parents=True, exist_ok=True)
+
+    # Build command with validated parameters
+    cmd = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "debug",
+        "-ss",
+        f"{start:.6f}",
+        "-i",
+        str(src),
+        "-t",
+        f"{duration:.6f}",
+        "-c",
+        "copy",
+        "-avoid_negative_ts",
+        "1",
+        "-y",
+        str(dst),
+    ]
+
+    try:
+        # Use more secure subprocess call with explicit timeout
+        result = subprocess.run(
+            cmd,
+            check=True,
+            timeout=300,  # 5 minute timeout
+            capture_output=True,
+            text=True,
+        )
+    except subprocess.TimeoutExpired as exc:
+        raise RuntimeError(f"ffmpeg timed out while processing video '{src}' -> '{dst}'") from exc
+    except FileNotFoundError as exc:
+        raise RuntimeError("ffmpeg executable not found; it is required for video conversion") from exc
+    except subprocess.CalledProcessError as exc:
+        error_msg = f"ffmpeg failed while splitting video '{src}' into '{dst}'"
+        if exc.stderr:
+            error_msg += f". Error: {exc.stderr.strip()}"
+        raise RuntimeError(error_msg) from exc
+
+
+def convert_videos(root: Path, new_root: Path, episode_records: list[dict[str, Any]], video_keys: list[str]) -> None:
+    if len(video_keys) == 0:
+        logging.info("No video features detected; skipping video conversion")
+        return
+
+    logging.info("Converting concatenated MP4 files back to per-episode videos")
+
+    for video_key in video_keys:
+        grouped = _group_episodes_by_video_file(episode_records, video_key)
+        if len(grouped) == 0:
+            logging.info("No video metadata found for key '%s'; skipping", video_key)
+            continue
+
+        for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc=f"convert videos ({video_key})"):
+            src_path = root / DEFAULT_VIDEO_PATH.format(
+                video_key=video_key,
+                chunk_index=chunk_idx,
+                file_index=file_idx,
+            )
+            if not src_path.exists():
+                raise FileNotFoundError(f"Expected MP4 file not found: {src_path}")
+
+            records = sorted(records, key=lambda rec: float(rec[f"videos/{video_key}/from_timestamp"]))
+
+            for record in records:
+                episode_index = int(record["episode_index"])
+                start = float(record[f"videos/{video_key}/from_timestamp"])
+                end = float(record[f"videos/{video_key}/to_timestamp"])
+
+                dest_chunk = episode_index // DEFAULT_CHUNK_SIZE
+                dest_path = new_root / LEGACY_VIDEO_PATH_TEMPLATE.format(
+                    chunk_index=dest_chunk,
+                    video_key=video_key,
+                    episode_index=episode_index,
+                )
+
+                _extract_video_segment(src_path, dest_path, start=start, end=end)
+
+
+def convert_episodes_metadata(new_root: Path, episode_records: list[dict[str, Any]]) -> None:
+    logging.info("Reconstructing legacy episodes and episodes_stats JSONL files")
+
+    episodes_path = new_root / LEGACY_EPISODES_PATH
+    stats_path = new_root / LEGACY_EPISODES_STATS_PATH
+    episodes_path.parent.mkdir(parents=True, exist_ok=True)
+
+    def _filter_stats(stats: dict[str, Any]) -> dict[str, Any]:
+        """Remove v3-only statistics keys so output matches the v2.1 schema."""
+
+        filtered: dict[str, Any] = {}
+        for feature, values in stats.items():
+            if not isinstance(values, dict):
+                continue
+            keep = {k: v for k, v in values.items() if k in LEGACY_STATS_KEYS}
+            if keep:
+                filtered[feature] = keep
+        return filtered
+
+    with (
+        jsonlines.open(episodes_path, mode="w") as episodes_writer,
+        jsonlines.open(stats_path, mode="w") as stats_writer,
+    ):
+        for record in sorted(episode_records, key=lambda rec: int(rec["episode_index"])):
+            legacy_episode = {
+                key: value
+                for key, value in record.items()
+                if not key.startswith("data/")
+                and not key.startswith("videos/")
+                and not key.startswith("stats/")
+                and not key.startswith("meta/")
+                and key not in {"dataset_from_index", "dataset_to_index"}
+            }
+
+            serializable_episode = {key: _to_serializable(value) for key, value in legacy_episode.items()}
+            episodes_writer.write(serializable_episode)
+
+            stats_flat = {key: record[key] for key in record if key.startswith("stats/")}
+            stats_nested = unflatten_dict(stats_flat).get("stats", {})
+            stats_serialized = serialize_dict(_filter_stats(stats_nested))
+            stats_writer.write(
+                {
+                    "episode_index": int(record["episode_index"]),
+                    "stats": stats_serialized,
+                }
+            )
+
+
+def copy_ancillary_directories(root: Path, new_root: Path) -> None:
+    for subdir in ["images"]:
+        source = root / subdir
+        if source.exists():
+            shutil.copytree(source, new_root / subdir, dirs_exist_ok=True)
+
+
+def convert_dataset(
+    repo_id: str,
+    root: str | Path | None = None,
+) -> None:
+    root = HF_LEROBOT_HOME / repo_id if root is None else Path(root)
+
+    if not root.exists():
+        snapshot_download(
+            repo_id,
+            repo_type="dataset",
+            revision=V30,
+            local_dir=root,
+        )
+
+    old_root = root.parent / f"{root.name}_{V30}"
+    new_root = root.parent / f"{root.name}_{V21}"
+
+    if old_root.is_dir():
+        shutil.rmtree(old_root)
+    if new_root.is_dir():
+        shutil.rmtree(new_root)
+
+    new_root.mkdir(parents=True, exist_ok=True)
+
+    episode_records = load_episode_records(root)
+    video_keys = [key for key, ft in load_info(root)["features"].items() if ft.get("dtype") == "video"]
+
+    convert_info(root, new_root, episode_records, video_keys)
+    convert_tasks(root, new_root)
+    convert_data(root, new_root, episode_records)
+    convert_videos(root, new_root, episode_records, video_keys)
+    convert_episodes_metadata(new_root, episode_records)
+    copy_ancillary_directories(root, new_root)
+
+    shutil.move(str(root), str(old_root))
+    shutil.move(str(new_root), str(root))
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--repo-id",
+        type=str,
+        required=True,
+        help="Repository identifier on Hugging Face (e.g. `lerobot/pusht`).",
+    )
+    parser.add_argument(
+        "--root",
+        type=str,
+        default=None,
+        help="Path to the local dataset root directory. Defaults to `HF_LEROBOT_HOME/<repo-id>`; the dataset is downloaded from the Hub if it is not found locally.",
+    )
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    init_logging()
+    args = parse_args()
+    convert_dataset(**vars(args))
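
After the script finishes, the converted dataset sits at `root` in the legacy v2.1 layout, while the original v3.0 files are preserved next to it in `<root>_v3.0`. A minimal post-conversion sanity check, using only the metadata filenames and path templates the converter itself writes (the dataset path below is illustrative):

```python
from pathlib import Path

root = Path("/path/to/dataset")  # converted dataset root (illustrative)

# Legacy v2.1 metadata written by convert_info, convert_tasks and convert_episodes_metadata.
for meta_file in ("info.json", "tasks.jsonl", "episodes.jsonl", "episodes_stats.jsonl"):
    assert (root / "meta" / meta_file).is_file(), f"missing meta/{meta_file}"

# Per-episode parquet files restored by convert_data, e.g. data/chunk-000/episode_000000.parquet.
assert any((root / "data").glob("chunk-*/episode_*.parquet")), "no per-episode parquet files found"

# Per-episode MP4s restored by convert_videos (only present for datasets with video features).
if (root / "videos").is_dir():
    print(sorted((root / "videos").glob("chunk-*/*/episode_*.mp4"))[:3])
```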
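To confirm the converted dataset actually loads, point a v2.1 install of LeRobot at it. A hedged sketch, assuming the v2.1-era import path `lerobot.common.datasets.lerobot_dataset.LeRobotDataset` and reusing the repo id from the usage example in the module docstring:

```python
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset  # v2.1 import path (assumed)

# repo_id and root mirror the docstring's usage example; adjust them to your dataset.
dataset = LeRobotDataset("lerobot/pusht", root="/path/to/dataset")

# Basic smoke test; attribute names follow the v2.1 API.
print(dataset.num_episodes, dataset.num_frames)
print(dataset[0].keys())  # feature keys of the first frame
```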