diff --git a/README.md b/README.md
index 8e0a582..15e3fc6 100644
--- a/README.md
+++ b/README.md
@@ -20,14 +20,15 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
## π£ What's New
+- **\[2026.06.12\]** We have provided an efficient and concise Generic Converter for building new dataset-to-LeRobot converters with minimal adapter code! π₯π₯π₯
- **\[2026.03.20\]** We have supported Data Conversion from RoboCasa to LeRobot! π₯π₯π₯
- **\[2025.10.04\]** We have collected and updated all Dataset Version Conversion Scripts for LeRobot! π₯π₯π₯
- **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! π₯π₯π₯
- **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! π₯π₯π₯
-- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! π₯π₯π₯
More News
+- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! π₯π₯π₯
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! π₯π₯π₯
- **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! π₯π₯π₯
- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! π₯π₯π₯
@@ -47,6 +48,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- β**βData Conversionβ**β:
+ - [x] [Generic Converter](./generic_converter/README.md)
- [x] [Open X-Embodiment to LeRobot](./openx2lerobot/README.md)
- [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md)
- [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md)
diff --git a/generic_converter/README.md b/generic_converter/README.md
new file mode 100644
index 0000000..6614df2
--- /dev/null
+++ b/generic_converter/README.md
@@ -0,0 +1,102 @@
+# Generic Converter
+
+Shared conversion flow for turning task-based source datasets into LeRobot
+datasets.
+
+The generic package owns the execution mechanics:
+
+- create one temporary `LeRobotDataset` per `ConversionTask`
+- run tasks with a local or Ray Datatrove executor
+- aggregate temporary datasets into the adapter output directory
+- remove temporary task outputs by default
+- optionally push the aggregated dataset to the Hub
+
+Dataset-specific converters own the adapter logic:
+
+- where raw inputs come from
+- how tasks are discovered or loaded
+- how one raw input is converted into LeRobot episodes
+- how task metadata, such as language instructions, is represented
+
+## Files
+
+- `adapter.py`: `BaseAdapter`, the class dataset adapters inherit from.
+- `pipeline.py`: the reusable conversion, executor, aggregation, cleanup, and push flow.
+- `utils.py`: shared types and small helpers.
+
+## Adapter Contract
+
+A dataset converter should subclass `BaseAdapter`, pass `output_path` to the
+base constructor, and provide dataset-level metadata as class attributes.
+
+Required attributes:
+
+- `dataset_type`
+- `fps`
+- `robot_type`
+- `features`
+
+Optional attributes:
+
+- `tags`
+
+Required methods:
+
+- `load_tasks(self) -> list[ConversionTask]`
+- `load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]`
+
+`run_converter` reads `adapter.output_path` and calls `adapter.load_tasks()`
+without arguments. Store paths, task manifests, or other adapter options on the
+adapter instance in `__init__`.
+
+Use `adapter.temp_output_path` when building task-level temporary output paths.
+
+`load_subset` receives the full `ConversionTask`, not just an input path. Use
+`task.input_path` for raw data and `task.metadata` for dataset-specific values
+such as language instructions. Each yielded episode must be a sequence of frame
+dictionaries accepted by `LeRobotDataset.add_frame`; each frame should include
+the LeRobot `task` field when language tasks are needed.
+
+## ConversionTask
+
+`ConversionTask` describes one independently convertible raw input:
+
+- `input_path`: source file or directory
+- `output_path`: temporary LeRobot dataset directory for this task
+- `local_repo_id`: repo id used while writing the temporary dataset
+- `metadata`: adapter-owned metadata
+
+Keep dataset-specific values in `metadata`; the generic pipeline does not know
+about task-file schemas or instruction formats.
+
+## Usage Sketch
+
+```python
+from generic_converter import BaseAdapter, ConversionTask, run_converter
+
+
+class MyAdapter(BaseAdapter):
+ dataset_type = "my_dataset"
+ fps = 20
+ robot_type = "my_robot"
+ features = MY_FEATURES
+ tags = ["my_dataset"]
+
+ def __init__(self, output_path):
+ super().__init__(output_path)
+
+ def load_tasks(self) -> list[ConversionTask]:
+ ...
+
+ def load_subset(self, task: ConversionTask):
+ ...
+
+
+run_converter(
+ adapter=adapter,
+ executor="local",
+ cpus_per_task=1,
+ tasks_per_job=1,
+ workers=-1,
+)
+```
diff --git a/generic_converter/__init__.py b/generic_converter/__init__.py
new file mode 100644
index 0000000..b8dede6
--- /dev/null
+++ b/generic_converter/__init__.py
@@ -0,0 +1,11 @@
+from .adapter import BaseAdapter
+from .pipeline import run_converter
+from .utils import ConversionTask, FeatureSpec, TaskMetadata
+
+__all__ = [
+ "BaseAdapter",
+ "ConversionTask",
+ "FeatureSpec",
+ "TaskMetadata",
+ "run_converter",
+]
diff --git a/generic_converter/adapter.py b/generic_converter/adapter.py
new file mode 100644
index 0000000..be58609
--- /dev/null
+++ b/generic_converter/adapter.py
@@ -0,0 +1,32 @@
+from abc import ABC, abstractmethod
+from collections.abc import Iterable, Sequence
+from pathlib import Path
+
+from .utils import ConversionTask, FeatureSpec
+
+
+class BaseAdapter(ABC):
+ """Dataset-specific hooks used by the generic conversion pipeline."""
+
+ dataset_type: str
+ fps: int
+ robot_type: str
+ features: FeatureSpec
+ tags: Sequence[str] = ()
+
+ def __init__(self, output_path: Path):
+ self.output_path = output_path.expanduser().resolve()
+
+ @property
+ def temp_output_path(self) -> Path:
+ return self.output_path.with_name(f"{self.output_path.name}_temp")
+
+ @abstractmethod
+ def load_tasks(self) -> list[ConversionTask]:
+ """Build conversion tasks from dataset-specific inputs."""
+
+ @abstractmethod
+ def load_subset(
+ self, task: ConversionTask
+ ) -> Iterable[Sequence[dict]]:
+ """Yield LeRobot episodes for one raw input path."""
diff --git a/generic_converter/pipeline.py b/generic_converter/pipeline.py
new file mode 100644
index 0000000..acad3dd
--- /dev/null
+++ b/generic_converter/pipeline.py
@@ -0,0 +1,221 @@
+import os
+import shutil
+import sys
+from collections.abc import Sequence
+from pathlib import Path
+
+from datatrove.pipeline.base import PipelineStep
+from lerobot.datasets import LeRobotDataset
+from lerobot.datasets.aggregate import aggregate_datasets
+
+from .adapter import BaseAdapter
+from .utils import (
+ ConversionTask,
+ setup_logger,
+ unique_strings,
+)
+
+
+class SaveLeRobotDataset(PipelineStep):
+ name = "Save Temp LeRobotDataset"
+
+ def __init__(self, tasks: list[ConversionTask], adapter: BaseAdapter):
+ super().__init__()
+ self.tasks = tasks
+ self.adapter = adapter
+ self.type = f"{adapter.dataset_type}2lerobot"
+
+ def run(self, data=None, rank: int = 0, world_size: int = 1):
+ logger = setup_logger()
+ task = self.tasks[rank]
+
+ if task.output_path.exists():
+ shutil.rmtree(task.output_path)
+
+ dataset = LeRobotDataset.create(
+ repo_id=task.local_repo_id,
+ root=task.output_path,
+ fps=self.adapter.fps,
+ robot_type=self.adapter.robot_type,
+ features=self.adapter.features,
+ )
+
+ logger.info(
+ f"start processing for {task.input_path}, saving to {task.output_path}"
+ )
+ raw_dataset = self.adapter.load_subset(task)
+ for episode_index, episode_data in enumerate(raw_dataset):
+ with self.track_time("saving episode"):
+ for frame in episode_data:
+ dataset.add_frame(frame)
+ dataset.save_episode()
+ logger.info(
+ f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}"
+ )
+ dataset.finalize()
+
+
+def run_converter(
+ adapter: BaseAdapter,
+ executor: str,
+ cpus_per_task: int,
+ tasks_per_job: int,
+ workers: int,
+ resume_dir: str | None = None,
+ debug: bool = False,
+ local_repo_id: str | None = None,
+ hub_repo_id: str | None = None,
+ push_to_hub: bool = False,
+ cleanup_temp: bool = True,
+ extra_tags: Sequence[str] | None = None,
+) -> Path:
+ tasks = adapter.load_tasks()
+ output_path = adapter.output_path
+
+ if not tasks:
+ raise ValueError(
+ "No conversion tasks found. Provide a non-empty tasks file or matching source files."
+ )
+ if cpus_per_task < 1:
+ raise ValueError("--cpus-per-task must be >= 1")
+
+ output_path.mkdir(parents=True, exist_ok=True)
+
+ if debug:
+ executor = "local"
+ workers = 1
+ tasks = tasks[:2]
+ push_to_hub = False
+
+ match executor:
+ case "local":
+ from datatrove.executor import LocalPipelineExecutor
+
+ resolved_workers = (
+ max(1, (os.cpu_count() or 1) // cpus_per_task)
+ if workers == -1
+ else workers
+ )
+ executor_cls, executor_config = LocalPipelineExecutor, {
+ "tasks": len(tasks),
+ "workers": resolved_workers,
+ }
+ case "ray":
+ import ray
+ from datatrove.executor import RayPipelineExecutor
+ from ray.runtime_env import RuntimeEnv
+
+ runtime_env = RuntimeEnv(env_vars=_build_ray_env_vars())
+ ray.init(runtime_env=runtime_env)
+ executor_cls, executor_config = RayPipelineExecutor, {
+ "tasks": len(tasks),
+ "workers": workers,
+ "cpus_per_task": cpus_per_task,
+ "tasks_per_job": tasks_per_job,
+ }
+ case _:
+ raise ValueError(f"Executor {executor} not supported")
+
+ executor_cls(
+ pipeline=[SaveLeRobotDataset(tasks, adapter)],
+ **executor_config,
+ logging_dir=str(resume_dir) if resume_dir else None,
+ ).run()
+ aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id)
+
+ if cleanup_temp:
+ logger = setup_logger()
+ logger.info("Delete temp data_dir")
+ for temp_dir in [task.output_path for task in tasks]:
+ shutil.rmtree(temp_dir, ignore_errors=True)
+
+ if push_to_hub:
+ if hub_repo_id is None:
+ raise ValueError("--repo-id is required when --push-to-hub is set")
+
+ tags = unique_strings(
+ [
+ "LeRobot",
+ adapter.dataset_type,
+ adapter.robot_type,
+ *adapter.tags,
+ *(extra_tags or []),
+ ]
+ )
+ LeRobotDataset(
+ repo_id=hub_repo_id,
+ root=output_path,
+ ).push_to_hub(
+ tags=tags,
+ private=False,
+ push_videos=True,
+ license="apache-2.0",
+ upload_large_folder=False,
+ )
+
+ return output_path
+
+
+def _build_ray_env_vars() -> dict[str, str]:
+ env_vars = {
+ "HDF5_USE_FILE_LOCKING": "FALSE",
+ "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
+ "SVT_LOG": "1",
+ }
+ pythonpath = _build_ray_pythonpath()
+ if pythonpath:
+ env_vars["PYTHONPATH"] = pythonpath
+ return env_vars
+
+
+def _build_ray_pythonpath() -> str:
+ repo_root = Path(__file__).resolve().parents[1]
+ paths: list[str] = []
+
+ def add_path(path_value: str | Path):
+ path = Path(path_value).expanduser()
+ try:
+ path = path.resolve()
+ except OSError:
+ return
+ if not path.exists():
+ return
+ path_str = str(path)
+ if path_str not in paths:
+ paths.append(path_str)
+
+ add_path(repo_root)
+ add_path(Path.cwd())
+ for path in sys.path:
+ if path:
+ add_path(path)
+ for path in os.environ.get("PYTHONPATH", "").split(os.pathsep):
+ if path:
+ add_path(path)
+
+ return os.pathsep.join(paths)
+
+
+def aggregate_tasks(
+ tasks: list[ConversionTask],
+ output_dir: Path,
+ aggr_repo_id: str | None = None,
+):
+ logger = setup_logger()
+
+ if output_dir.exists():
+ shutil.rmtree(output_dir)
+
+ roots = [task.output_path for task in tasks]
+ resolved_aggr_repo_id = aggr_repo_id or output_dir.name
+
+ logger.info(
+ f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
+ )
+ aggregate_datasets(
+ repo_ids=[None] * len(tasks),
+ roots=roots,
+ aggr_repo_id=resolved_aggr_repo_id,
+ aggr_root=output_dir,
+ )
+ logger.info(f"aggregation complete: {output_dir}")
diff --git a/generic_converter/utils.py b/generic_converter/utils.py
new file mode 100644
index 0000000..a928e1b
--- /dev/null
+++ b/generic_converter/utils.py
@@ -0,0 +1,39 @@
+import shutil
+from collections.abc import Mapping, Sequence
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+TaskMetadata = Mapping[str, Any]
+FeatureSpec = Mapping[str, dict]
+
+
+@dataclass(frozen=True)
+class ConversionTask:
+ """One independently convertible raw input file and adapter metadata."""
+
+ input_path: Path
+ output_path: Path
+ local_repo_id: str | None = None
+ metadata: TaskMetadata = field(default_factory=dict)
+
+
+def setup_logger():
+ import sys
+
+ from datatrove.utils.logging import logger
+
+ logger.remove()
+ logger.add(sys.stdout, level="INFO", colorize=True)
+ return logger
+
+
+def unique_strings(values: Sequence[str]) -> list[str]:
+ result = []
+ seen = set()
+ for value in values:
+ if value in seen:
+ continue
+ result.append(value)
+ seen.add(value)
+ return result