From f40e09f48154be47bfec35d5a152123356eaf44e Mon Sep 17 00:00:00 2001 From: Qizhi Chen Date: Thu, 11 Jun 2026 22:16:44 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=A5=20Add=20generic=20converter=20pipe?= =?UTF-8?q?line=20(#104)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add generic converter pipeline Co-authored-by: Codex * Update generic converter README Co-authored-by: Codex * Simplify generic converter README Co-authored-by: Codex * Simplify adapter task loading API Co-authored-by: Codex * Require adapter output path Co-authored-by: Codex * Use adapter temp output path for LIBERO Co-authored-by: Codex * Remove LIBERO changes from generic converter PR Co-authored-by: Codex * update readme --------- Co-authored-by: Codex --- README.md | 4 +- generic_converter/README.md | 102 ++++++++++++++++ generic_converter/__init__.py | 11 ++ generic_converter/adapter.py | 32 +++++ generic_converter/pipeline.py | 221 ++++++++++++++++++++++++++++++++++ generic_converter/utils.py | 39 ++++++ 6 files changed, 408 insertions(+), 1 deletion(-) create mode 100644 generic_converter/README.md create mode 100644 generic_converter/__init__.py create mode 100644 generic_converter/adapter.py create mode 100644 generic_converter/pipeline.py create mode 100644 generic_converter/utils.py diff --git a/README.md b/README.md index 8e0a582..15e3fc6 100644 --- a/README.md +++ b/README.md @@ -20,14 +20,15 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg ## πŸ“£ What's New +- **\[2026.06.12\]** We have provided an efficient and concise Generic Converter for building new dataset-to-LeRobot converters with minimal adapter code! πŸ”₯πŸ”₯πŸ”₯ - **\[2026.03.20\]** We have supported Data Conversion from RoboCasa to LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.10.04\]** We have collected and updated all Dataset Version Conversion Scripts for LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! πŸ”₯πŸ”₯πŸ”₯ -- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! πŸ”₯πŸ”₯πŸ”₯
More News +- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! πŸ”₯πŸ”₯πŸ”₯ @@ -47,6 +48,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg - ​**​Data Conversion​**​: + - [x] [Generic Converter](./generic_converter/README.md) - [x] [Open X-Embodiment to LeRobot](./openx2lerobot/README.md) - [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md) - [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md) diff --git a/generic_converter/README.md b/generic_converter/README.md new file mode 100644 index 0000000..6614df2 --- /dev/null +++ b/generic_converter/README.md @@ -0,0 +1,102 @@ +# Generic Converter + +Shared conversion flow for turning task-based source datasets into LeRobot +datasets. + +The generic package owns the execution mechanics: + +- create one temporary `LeRobotDataset` per `ConversionTask` +- run tasks with a local or Ray Datatrove executor +- aggregate temporary datasets into the adapter output directory +- remove temporary task outputs by default +- optionally push the aggregated dataset to the Hub + +Dataset-specific converters own the adapter logic: + +- where raw inputs come from +- how tasks are discovered or loaded +- how one raw input is converted into LeRobot episodes +- how task metadata, such as language instructions, is represented + +## Files + +- `adapter.py`: `BaseAdapter`, the class dataset adapters inherit from. +- `pipeline.py`: the reusable conversion, executor, aggregation, cleanup, and push flow. +- `utils.py`: shared types and small helpers. + +## Adapter Contract + +A dataset converter should subclass `BaseAdapter`, pass `output_path` to the +base constructor, and provide dataset-level metadata as class attributes. + +Required attributes: + +- `dataset_type` +- `fps` +- `robot_type` +- `features` + +Optional attributes: + +- `tags` + +Required methods: + +- `load_tasks(self) -> list[ConversionTask]` +- `load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]` + +`run_converter` reads `adapter.output_path` and calls `adapter.load_tasks()` +without arguments. Store paths, task manifests, or other adapter options on the +adapter instance in `__init__`. + +Use `adapter.temp_output_path` when building task-level temporary output paths. + +`load_subset` receives the full `ConversionTask`, not just an input path. Use +`task.input_path` for raw data and `task.metadata` for dataset-specific values +such as language instructions. Each yielded episode must be a sequence of frame +dictionaries accepted by `LeRobotDataset.add_frame`; each frame should include +the LeRobot `task` field when language tasks are needed. + +## ConversionTask + +`ConversionTask` describes one independently convertible raw input: + +- `input_path`: source file or directory +- `output_path`: temporary LeRobot dataset directory for this task +- `local_repo_id`: repo id used while writing the temporary dataset +- `metadata`: adapter-owned metadata + +Keep dataset-specific values in `metadata`; the generic pipeline does not know +about task-file schemas or instruction formats. + +## Usage Sketch + +```python +from generic_converter import BaseAdapter, ConversionTask, run_converter + + +class MyAdapter(BaseAdapter): + dataset_type = "my_dataset" + fps = 20 + robot_type = "my_robot" + features = MY_FEATURES + tags = ["my_dataset"] + + def __init__(self, output_path): + super().__init__(output_path) + + def load_tasks(self) -> list[ConversionTask]: + ... + + def load_subset(self, task: ConversionTask): + ... + + +run_converter( + adapter=adapter, + executor="local", + cpus_per_task=1, + tasks_per_job=1, + workers=-1, +) +``` diff --git a/generic_converter/__init__.py b/generic_converter/__init__.py new file mode 100644 index 0000000..b8dede6 --- /dev/null +++ b/generic_converter/__init__.py @@ -0,0 +1,11 @@ +from .adapter import BaseAdapter +from .pipeline import run_converter +from .utils import ConversionTask, FeatureSpec, TaskMetadata + +__all__ = [ + "BaseAdapter", + "ConversionTask", + "FeatureSpec", + "TaskMetadata", + "run_converter", +] diff --git a/generic_converter/adapter.py b/generic_converter/adapter.py new file mode 100644 index 0000000..be58609 --- /dev/null +++ b/generic_converter/adapter.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod +from collections.abc import Iterable, Sequence +from pathlib import Path + +from .utils import ConversionTask, FeatureSpec + + +class BaseAdapter(ABC): + """Dataset-specific hooks used by the generic conversion pipeline.""" + + dataset_type: str + fps: int + robot_type: str + features: FeatureSpec + tags: Sequence[str] = () + + def __init__(self, output_path: Path): + self.output_path = output_path.expanduser().resolve() + + @property + def temp_output_path(self) -> Path: + return self.output_path.with_name(f"{self.output_path.name}_temp") + + @abstractmethod + def load_tasks(self) -> list[ConversionTask]: + """Build conversion tasks from dataset-specific inputs.""" + + @abstractmethod + def load_subset( + self, task: ConversionTask + ) -> Iterable[Sequence[dict]]: + """Yield LeRobot episodes for one raw input path.""" diff --git a/generic_converter/pipeline.py b/generic_converter/pipeline.py new file mode 100644 index 0000000..acad3dd --- /dev/null +++ b/generic_converter/pipeline.py @@ -0,0 +1,221 @@ +import os +import shutil +import sys +from collections.abc import Sequence +from pathlib import Path + +from datatrove.pipeline.base import PipelineStep +from lerobot.datasets import LeRobotDataset +from lerobot.datasets.aggregate import aggregate_datasets + +from .adapter import BaseAdapter +from .utils import ( + ConversionTask, + setup_logger, + unique_strings, +) + + +class SaveLeRobotDataset(PipelineStep): + name = "Save Temp LeRobotDataset" + + def __init__(self, tasks: list[ConversionTask], adapter: BaseAdapter): + super().__init__() + self.tasks = tasks + self.adapter = adapter + self.type = f"{adapter.dataset_type}2lerobot" + + def run(self, data=None, rank: int = 0, world_size: int = 1): + logger = setup_logger() + task = self.tasks[rank] + + if task.output_path.exists(): + shutil.rmtree(task.output_path) + + dataset = LeRobotDataset.create( + repo_id=task.local_repo_id, + root=task.output_path, + fps=self.adapter.fps, + robot_type=self.adapter.robot_type, + features=self.adapter.features, + ) + + logger.info( + f"start processing for {task.input_path}, saving to {task.output_path}" + ) + raw_dataset = self.adapter.load_subset(task) + for episode_index, episode_data in enumerate(raw_dataset): + with self.track_time("saving episode"): + for frame in episode_data: + dataset.add_frame(frame) + dataset.save_episode() + logger.info( + f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}" + ) + dataset.finalize() + + +def run_converter( + adapter: BaseAdapter, + executor: str, + cpus_per_task: int, + tasks_per_job: int, + workers: int, + resume_dir: str | None = None, + debug: bool = False, + local_repo_id: str | None = None, + hub_repo_id: str | None = None, + push_to_hub: bool = False, + cleanup_temp: bool = True, + extra_tags: Sequence[str] | None = None, +) -> Path: + tasks = adapter.load_tasks() + output_path = adapter.output_path + + if not tasks: + raise ValueError( + "No conversion tasks found. Provide a non-empty tasks file or matching source files." + ) + if cpus_per_task < 1: + raise ValueError("--cpus-per-task must be >= 1") + + output_path.mkdir(parents=True, exist_ok=True) + + if debug: + executor = "local" + workers = 1 + tasks = tasks[:2] + push_to_hub = False + + match executor: + case "local": + from datatrove.executor import LocalPipelineExecutor + + resolved_workers = ( + max(1, (os.cpu_count() or 1) // cpus_per_task) + if workers == -1 + else workers + ) + executor_cls, executor_config = LocalPipelineExecutor, { + "tasks": len(tasks), + "workers": resolved_workers, + } + case "ray": + import ray + from datatrove.executor import RayPipelineExecutor + from ray.runtime_env import RuntimeEnv + + runtime_env = RuntimeEnv(env_vars=_build_ray_env_vars()) + ray.init(runtime_env=runtime_env) + executor_cls, executor_config = RayPipelineExecutor, { + "tasks": len(tasks), + "workers": workers, + "cpus_per_task": cpus_per_task, + "tasks_per_job": tasks_per_job, + } + case _: + raise ValueError(f"Executor {executor} not supported") + + executor_cls( + pipeline=[SaveLeRobotDataset(tasks, adapter)], + **executor_config, + logging_dir=str(resume_dir) if resume_dir else None, + ).run() + aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id) + + if cleanup_temp: + logger = setup_logger() + logger.info("Delete temp data_dir") + for temp_dir in [task.output_path for task in tasks]: + shutil.rmtree(temp_dir, ignore_errors=True) + + if push_to_hub: + if hub_repo_id is None: + raise ValueError("--repo-id is required when --push-to-hub is set") + + tags = unique_strings( + [ + "LeRobot", + adapter.dataset_type, + adapter.robot_type, + *adapter.tags, + *(extra_tags or []), + ] + ) + LeRobotDataset( + repo_id=hub_repo_id, + root=output_path, + ).push_to_hub( + tags=tags, + private=False, + push_videos=True, + license="apache-2.0", + upload_large_folder=False, + ) + + return output_path + + +def _build_ray_env_vars() -> dict[str, str]: + env_vars = { + "HDF5_USE_FILE_LOCKING": "FALSE", + "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE", + "SVT_LOG": "1", + } + pythonpath = _build_ray_pythonpath() + if pythonpath: + env_vars["PYTHONPATH"] = pythonpath + return env_vars + + +def _build_ray_pythonpath() -> str: + repo_root = Path(__file__).resolve().parents[1] + paths: list[str] = [] + + def add_path(path_value: str | Path): + path = Path(path_value).expanduser() + try: + path = path.resolve() + except OSError: + return + if not path.exists(): + return + path_str = str(path) + if path_str not in paths: + paths.append(path_str) + + add_path(repo_root) + add_path(Path.cwd()) + for path in sys.path: + if path: + add_path(path) + for path in os.environ.get("PYTHONPATH", "").split(os.pathsep): + if path: + add_path(path) + + return os.pathsep.join(paths) + + +def aggregate_tasks( + tasks: list[ConversionTask], + output_dir: Path, + aggr_repo_id: str | None = None, +): + logger = setup_logger() + + if output_dir.exists(): + shutil.rmtree(output_dir) + + roots = [task.output_path for task in tasks] + resolved_aggr_repo_id = aggr_repo_id or output_dir.name + + logger.info( + f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}" + ) + aggregate_datasets( + repo_ids=[None] * len(tasks), + roots=roots, + aggr_repo_id=resolved_aggr_repo_id, + aggr_root=output_dir, + ) + logger.info(f"aggregation complete: {output_dir}") diff --git a/generic_converter/utils.py b/generic_converter/utils.py new file mode 100644 index 0000000..a928e1b --- /dev/null +++ b/generic_converter/utils.py @@ -0,0 +1,39 @@ +import shutil +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +TaskMetadata = Mapping[str, Any] +FeatureSpec = Mapping[str, dict] + + +@dataclass(frozen=True) +class ConversionTask: + """One independently convertible raw input file and adapter metadata.""" + + input_path: Path + output_path: Path + local_repo_id: str | None = None + metadata: TaskMetadata = field(default_factory=dict) + + +def setup_logger(): + import sys + + from datatrove.utils.logging import logger + + logger.remove() + logger.add(sys.stdout, level="INFO", colorize=True) + return logger + + +def unique_strings(values: Sequence[str]) -> list[str]: + result = [] + seen = set() + for value in values: + if value in seen: + continue + result.append(value) + seen.add(value) + return result