💥 Add generic converter pipeline (#104)

* Add generic converter pipeline

Co-authored-by: Codex <codex@openai.com>

* Update generic converter README

Co-authored-by: Codex <codex@openai.com>

* Simplify generic converter README

Co-authored-by: Codex <codex@openai.com>

* Simplify adapter task loading API

Co-authored-by: Codex <codex@openai.com>

* Require adapter output path

Co-authored-by: Codex <codex@openai.com>

* Use adapter temp output path for LIBERO

Co-authored-by: Codex <codex@openai.com>

* Remove LIBERO changes from generic converter PR

Co-authored-by: Codex <codex@openai.com>

* update readme

---------

Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
Qizhi Chen
2026-06-11 22:16:44 -07:00
committed by GitHub
parent 2ef2370d66
commit f40e09f481
6 changed files with 408 additions and 1 deletions
+3 -1
View File
@@ -20,14 +20,15 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
## 📣 What's New <a><img width="35" height="20" src="https://user-images.githubusercontent.com/12782558/212848161-5e783dd6-11e8-4fe0-bbba-39ffb77730be.png"></a>
- **\[2026.06.12\]** We have provided an efficient and concise Generic Converter for building new dataset-to-LeRobot converters with minimal adapter code! 🔥🔥🔥
- **\[2026.03.20\]** We have supported Data Conversion from RoboCasa to LeRobot! 🔥🔥🔥
- **\[2025.10.04\]** We have collected and updated all Dataset Version Conversion Scripts for LeRobot! 🔥🔥🔥
- **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! 🔥🔥🔥
- **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! 🔥🔥🔥
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
<details>
<summary>More News</summary>
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
- **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! 🔥🔥🔥
- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! 🔥🔥🔥
@@ -47,6 +48,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- **Data Conversion**:
- [x] [Generic Converter](./generic_converter/README.md)
- [x] [Open X-Embodiment to LeRobot](./openx2lerobot/README.md)
- [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md)
- [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md)
+102
View File
@@ -0,0 +1,102 @@
# Generic Converter
Shared conversion flow for turning task-based source datasets into LeRobot
datasets.
The generic package owns the execution mechanics:
- create one temporary `LeRobotDataset` per `ConversionTask`
- run tasks with a local or Ray Datatrove executor
- aggregate temporary datasets into the adapter output directory
- remove temporary task outputs by default
- optionally push the aggregated dataset to the Hub
Dataset-specific converters own the adapter logic:
- where raw inputs come from
- how tasks are discovered or loaded
- how one raw input is converted into LeRobot episodes
- how task metadata, such as language instructions, is represented
## Files
- `adapter.py`: `BaseAdapter`, the class dataset adapters inherit from.
- `pipeline.py`: the reusable conversion, executor, aggregation, cleanup, and push flow.
- `utils.py`: shared types and small helpers.
## Adapter Contract
A dataset converter should subclass `BaseAdapter`, pass `output_path` to the
base constructor, and provide dataset-level metadata as class attributes.
Required attributes:
- `dataset_type`
- `fps`
- `robot_type`
- `features`
Optional attributes:
- `tags`
Required methods:
- `load_tasks(self) -> list[ConversionTask]`
- `load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]`
`run_converter` reads `adapter.output_path` and calls `adapter.load_tasks()`
without arguments. Store paths, task manifests, or other adapter options on the
adapter instance in `__init__`.
Use `adapter.temp_output_path` when building task-level temporary output paths.
`load_subset` receives the full `ConversionTask`, not just an input path. Use
`task.input_path` for raw data and `task.metadata` for dataset-specific values
such as language instructions. Each yielded episode must be a sequence of frame
dictionaries accepted by `LeRobotDataset.add_frame`; each frame should include
the LeRobot `task` field when language tasks are needed.
## ConversionTask
`ConversionTask` describes one independently convertible raw input:
- `input_path`: source file or directory
- `output_path`: temporary LeRobot dataset directory for this task
- `local_repo_id`: repo id used while writing the temporary dataset
- `metadata`: adapter-owned metadata
Keep dataset-specific values in `metadata`; the generic pipeline does not know
about task-file schemas or instruction formats.
## Usage Sketch
```python
from generic_converter import BaseAdapter, ConversionTask, run_converter
class MyAdapter(BaseAdapter):
dataset_type = "my_dataset"
fps = 20
robot_type = "my_robot"
features = MY_FEATURES
tags = ["my_dataset"]
def __init__(self, output_path):
super().__init__(output_path)
def load_tasks(self) -> list[ConversionTask]:
...
def load_subset(self, task: ConversionTask):
...
run_converter(
adapter=adapter,
executor="local",
cpus_per_task=1,
tasks_per_job=1,
workers=-1,
)
```
+11
View File
@@ -0,0 +1,11 @@
from .adapter import BaseAdapter
from .pipeline import run_converter
from .utils import ConversionTask, FeatureSpec, TaskMetadata
__all__ = [
"BaseAdapter",
"ConversionTask",
"FeatureSpec",
"TaskMetadata",
"run_converter",
]
+32
View File
@@ -0,0 +1,32 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from pathlib import Path
from .utils import ConversionTask, FeatureSpec
class BaseAdapter(ABC):
"""Dataset-specific hooks used by the generic conversion pipeline."""
dataset_type: str
fps: int
robot_type: str
features: FeatureSpec
tags: Sequence[str] = ()
def __init__(self, output_path: Path):
self.output_path = output_path.expanduser().resolve()
@property
def temp_output_path(self) -> Path:
return self.output_path.with_name(f"{self.output_path.name}_temp")
@abstractmethod
def load_tasks(self) -> list[ConversionTask]:
"""Build conversion tasks from dataset-specific inputs."""
@abstractmethod
def load_subset(
self, task: ConversionTask
) -> Iterable[Sequence[dict]]:
"""Yield LeRobot episodes for one raw input path."""
+221
View File
@@ -0,0 +1,221 @@
import os
import shutil
import sys
from collections.abc import Sequence
from pathlib import Path
from datatrove.pipeline.base import PipelineStep
from lerobot.datasets import LeRobotDataset
from lerobot.datasets.aggregate import aggregate_datasets
from .adapter import BaseAdapter
from .utils import (
ConversionTask,
setup_logger,
unique_strings,
)
class SaveLeRobotDataset(PipelineStep):
name = "Save Temp LeRobotDataset"
def __init__(self, tasks: list[ConversionTask], adapter: BaseAdapter):
super().__init__()
self.tasks = tasks
self.adapter = adapter
self.type = f"{adapter.dataset_type}2lerobot"
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
task = self.tasks[rank]
if task.output_path.exists():
shutil.rmtree(task.output_path)
dataset = LeRobotDataset.create(
repo_id=task.local_repo_id,
root=task.output_path,
fps=self.adapter.fps,
robot_type=self.adapter.robot_type,
features=self.adapter.features,
)
logger.info(
f"start processing for {task.input_path}, saving to {task.output_path}"
)
raw_dataset = self.adapter.load_subset(task)
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
for frame in episode_data:
dataset.add_frame(frame)
dataset.save_episode()
logger.info(
f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}"
)
dataset.finalize()
def run_converter(
adapter: BaseAdapter,
executor: str,
cpus_per_task: int,
tasks_per_job: int,
workers: int,
resume_dir: str | None = None,
debug: bool = False,
local_repo_id: str | None = None,
hub_repo_id: str | None = None,
push_to_hub: bool = False,
cleanup_temp: bool = True,
extra_tags: Sequence[str] | None = None,
) -> Path:
tasks = adapter.load_tasks()
output_path = adapter.output_path
if not tasks:
raise ValueError(
"No conversion tasks found. Provide a non-empty tasks file or matching source files."
)
if cpus_per_task < 1:
raise ValueError("--cpus-per-task must be >= 1")
output_path.mkdir(parents=True, exist_ok=True)
if debug:
executor = "local"
workers = 1
tasks = tasks[:2]
push_to_hub = False
match executor:
case "local":
from datatrove.executor import LocalPipelineExecutor
resolved_workers = (
max(1, (os.cpu_count() or 1) // cpus_per_task)
if workers == -1
else workers
)
executor_cls, executor_config = LocalPipelineExecutor, {
"tasks": len(tasks),
"workers": resolved_workers,
}
case "ray":
import ray
from datatrove.executor import RayPipelineExecutor
from ray.runtime_env import RuntimeEnv
runtime_env = RuntimeEnv(env_vars=_build_ray_env_vars())
ray.init(runtime_env=runtime_env)
executor_cls, executor_config = RayPipelineExecutor, {
"tasks": len(tasks),
"workers": workers,
"cpus_per_task": cpus_per_task,
"tasks_per_job": tasks_per_job,
}
case _:
raise ValueError(f"Executor {executor} not supported")
executor_cls(
pipeline=[SaveLeRobotDataset(tasks, adapter)],
**executor_config,
logging_dir=str(resume_dir) if resume_dir else None,
).run()
aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id)
if cleanup_temp:
logger = setup_logger()
logger.info("Delete temp data_dir")
for temp_dir in [task.output_path for task in tasks]:
shutil.rmtree(temp_dir, ignore_errors=True)
if push_to_hub:
if hub_repo_id is None:
raise ValueError("--repo-id is required when --push-to-hub is set")
tags = unique_strings(
[
"LeRobot",
adapter.dataset_type,
adapter.robot_type,
*adapter.tags,
*(extra_tags or []),
]
)
LeRobotDataset(
repo_id=hub_repo_id,
root=output_path,
).push_to_hub(
tags=tags,
private=False,
push_videos=True,
license="apache-2.0",
upload_large_folder=False,
)
return output_path
def _build_ray_env_vars() -> dict[str, str]:
env_vars = {
"HDF5_USE_FILE_LOCKING": "FALSE",
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
"SVT_LOG": "1",
}
pythonpath = _build_ray_pythonpath()
if pythonpath:
env_vars["PYTHONPATH"] = pythonpath
return env_vars
def _build_ray_pythonpath() -> str:
repo_root = Path(__file__).resolve().parents[1]
paths: list[str] = []
def add_path(path_value: str | Path):
path = Path(path_value).expanduser()
try:
path = path.resolve()
except OSError:
return
if not path.exists():
return
path_str = str(path)
if path_str not in paths:
paths.append(path_str)
add_path(repo_root)
add_path(Path.cwd())
for path in sys.path:
if path:
add_path(path)
for path in os.environ.get("PYTHONPATH", "").split(os.pathsep):
if path:
add_path(path)
return os.pathsep.join(paths)
def aggregate_tasks(
tasks: list[ConversionTask],
output_dir: Path,
aggr_repo_id: str | None = None,
):
logger = setup_logger()
if output_dir.exists():
shutil.rmtree(output_dir)
roots = [task.output_path for task in tasks]
resolved_aggr_repo_id = aggr_repo_id or output_dir.name
logger.info(
f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
)
aggregate_datasets(
repo_ids=[None] * len(tasks),
roots=roots,
aggr_repo_id=resolved_aggr_repo_id,
aggr_root=output_dir,
)
logger.info(f"aggregation complete: {output_dir}")
+39
View File
@@ -0,0 +1,39 @@
import shutil
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
TaskMetadata = Mapping[str, Any]
FeatureSpec = Mapping[str, dict]
@dataclass(frozen=True)
class ConversionTask:
"""One independently convertible raw input file and adapter metadata."""
input_path: Path
output_path: Path
local_repo_id: str | None = None
metadata: TaskMetadata = field(default_factory=dict)
def setup_logger():
import sys
from datatrove.utils.logging import logger
logger.remove()
logger.add(sys.stdout, level="INFO", colorize=True)
return logger
def unique_strings(values: Sequence[str]) -> list[str]:
result = []
seen = set()
for value in values:
if value in seen:
continue
result.append(value)
seen.add(value)
return result