refactor libero2lerobot converter

Co-authored-by: codex <codex@openai.com>
This commit is contained in:
Tavish
2026-06-13 10:12:37 +08:00
parent 8bb1aae91d
commit 219e6ed2a2
4 changed files with 128 additions and 277 deletions
+128 -204
View File
@@ -1,142 +1,127 @@
import argparse
import os
import re
import shutil
import sys
from collections.abc import Iterable, Sequence
from pathlib import Path
import pandas as pd
import ray
from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.aggregate import (
aggregate_data,
aggregate_metadata,
aggregate_stats,
aggregate_videos,
validate_all_metadata,
)
from lerobot.datasets.io_utils import write_info, write_stats, write_tasks
from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
)
from libero_utils.config import LIBERO_FEATURES
from libero_utils.libero_utils import load_local_episodes
from ray.runtime_env import RuntimeEnv
from tqdm import tqdm
import numpy as np
from h5py import File
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from generic_converter import BaseAdapter, ConversionTask, run_converter # noqa: E402
def setup_logger():
import sys
class LiberoAdapter(BaseAdapter):
dataset_type = "libero"
fps = 20
robot_type = "franka"
features = {
"observation.images.image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.images.wrist_image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
},
"observation.states.ee_state": {
"dtype": "float32",
"shape": (6,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
},
"observation.states.joint_state": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
},
"observation.states.gripper_state": {
"dtype": "float32",
"shape": (2,),
"names": {"motors": ["gripper", "gripper"]},
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
},
}
tags = ["libero", "franka"]
from datatrove.utils.logging import logger
def __init__(self, src_paths: list[Path], output_path: Path):
super().__init__(output_path)
self.src_paths = src_paths
logger.remove()
logger.add(sys.stdout, level="INFO", colorize=True)
return logger
def load_tasks(self) -> list[ConversionTask]:
tasks = []
for src_path in self.src_paths:
for input_h5 in src_path.glob("*.hdf5"):
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
match = pattern1.search(input_h5.name)
if match is None:
match = pattern2.search(input_h5.name)
if match is None:
continue
else:
task_instruction = match.group(1).replace("_", " ")
class SaveLerobotDataset(PipelineStep):
name = "Save Temp LerobotDataset"
type = "libero2lerobot"
tasks.append(
ConversionTask(
input_path=input_h5.resolve(),
output_path=(
self.temp_output_path
/ f"{src_path.name}"
/ input_h5.stem
).resolve(),
local_repo_id=f"{input_h5.parent.name}/{input_h5.name}",
metadata={"task": task_instruction},
)
)
return tasks
def __init__(self, tasks: list[tuple[Path, Path, str]]):
super().__init__()
self.tasks = tasks
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
input_h5, output_path, task_instruction = self.tasks[rank]
if output_path.exists():
shutil.rmtree(output_path)
dataset = LeRobotDataset.create(
repo_id=f"{input_h5.parent.name}/{input_h5.name}",
root=output_path,
fps=20,
robot_type="franka",
features=LIBERO_FEATURES,
)
logger.info(f"start processing for {input_h5}, saving to {output_path}")
raw_dataset = load_local_episodes(input_h5)
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
for frame_data in episode_data:
frame_data["task"] = task_instruction
dataset.add_frame(frame_data)
dataset.save_episode()
logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path):
logger = setup_logger()
all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs]
fps, robot_type, features = validate_all_metadata(all_metadata)
if aggregated_dir.exists():
shutil.rmtree(aggregated_dir)
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}",
root=aggregated_dir,
fps=fps,
robot_type=robot_type,
features=features,
)
video_keys = [key for key in features if features[key]["dtype"] == "video"]
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
meta_idx = {"chunk": 0, "file": 0}
data_idx = {"chunk": 0, "file": 0}
videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys}
aggr_meta.episodes = {}
for src_meta in tqdm(all_metadata, desc="Copy data and videos"):
videos_idx = aggregate_videos(
src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE
)
data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE)
meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx)
aggr_meta.info["total_episodes"] += src_meta.total_episodes
aggr_meta.info["total_frames"] += src_meta.total_frames
logger.info("write tasks")
write_tasks(aggr_meta.tasks, aggr_meta.root)
logger.info("write info")
aggr_meta.info.update(
{
"total_tasks": len(aggr_meta.tasks),
"total_episodes": sum(m.total_episodes for m in all_metadata),
"total_frames": sum(m.total_frames for m in all_metadata),
"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
}
)
write_info(aggr_meta.info, aggr_meta.root)
logger.info("write stats")
aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata])
write_stats(aggr_meta.stats, aggr_meta.root)
def delete_temp_data(temp_dirs: list[Path]):
logger = setup_logger()
logger.info("Delete temp data_dir")
for temp_dir in temp_dirs:
shutil.rmtree(temp_dir)
def load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]:
input_h5 = task.input_path
task_instruction = task.metadata.get("task")
with File(input_h5, "r") as f:
for demo in f["data"].values():
demo_len = len(demo["obs/agentview_rgb"])
# (-1: open, 1: close) -> (0: close, 1: open)
action = np.array(demo["actions"])
action = np.concatenate(
[
action[:, :6],
(1 - np.clip(action[:, -1], 0, 1))[:, None],
],
axis=1,
)
state = np.concatenate(
[
np.array(demo["obs/ee_states"]),
np.array(demo["obs/gripper_states"]),
],
axis=1,
)
episode = {
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
"observation.state": np.array(state, dtype=np.float32),
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
"action": np.array(action, dtype=np.float32),
}
yield [{**{k: v[i] for k, v in episode.items()}, "task": task_instruction} for i in range(demo_len)]
def main(
@@ -146,86 +131,25 @@ def main(
cpus_per_task: int,
tasks_per_job: int,
workers: int,
resume_dir: Path = None,
resume_dir: Path | None = None,
debug: bool = False,
repo_id: str = None,
repo_id: str | None = None,
push_to_hub: bool = False,
):
tasks = []
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
for src_path in src_paths:
for input_h5 in src_path.glob("*.hdf5"):
match = pattern1.search(input_h5.name)
if match is None:
match = pattern2.search(input_h5.name)
if match is None:
continue
tasks.append(
(
input_h5,
(output_path / (src_path.name + "_temp") / input_h5.stem).resolve(),
match.group(1).replace("_", " "),
)
)
if len(src_paths) > 1:
aggregate_output_path = output_path / (
"_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot"
)
else:
aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"
aggregate_output_path = aggregate_output_path.resolve()
adapter = LiberoAdapter(src_paths, output_path)
if debug:
executor = "local"
workers = 1
tasks = tasks[:2]
push_to_hub = False
match executor:
case "local":
workers = os.cpu_count() // cpus_per_task if workers == -1 else workers
executor = LocalPipelineExecutor
case "ray":
runtime_env = RuntimeEnv(
env_vars={
"HDF5_USE_FILE_LOCKING": "FALSE",
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
"SVT_LOG": "1",
},
)
ray.init(runtime_env=runtime_env)
executor = RayPipelineExecutor
case _:
raise ValueError(f"Executor {executor} not supported")
executor_config = {
"tasks": len(tasks),
"workers": workers,
**({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if executor is RayPipelineExecutor else {}),
}
executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run()
create_aggr_dataset([task[1] for task in tasks], aggregate_output_path)
delete_temp_data([task[1] for task in tasks])
for task in tasks:
shutil.rmtree(task[1].parent, ignore_errors=True)
if push_to_hub:
assert repo_id is not None
tags = ["LeRobot", "libero", "franka"]
tags.extend([src_path.name for src_path in src_paths])
LeRobotDataset(
repo_id=repo_id,
root=aggregate_output_path,
).push_to_hub(
tags=tags,
private=False,
push_videos=True,
license="apache-2.0",
upload_large_folder=False,
)
run_converter(
adapter=adapter,
executor=executor,
cpus_per_task=cpus_per_task,
tasks_per_job=tasks_per_job,
workers=workers,
resume_dir=resume_dir,
debug=debug,
local_repo_id=repo_id,
hub_repo_id=repo_id,
push_to_hub=push_to_hub,
)
if __name__ == "__main__":
-37
View File
@@ -1,37 +0,0 @@
LIBERO_FEATURES = {
"observation.images.image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.images.wrist_image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
},
"observation.states.ee_state": {
"dtype": "float32",
"shape": (6,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
},
"observation.states.joint_state": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
},
"observation.states.gripper_state": {
"dtype": "float32",
"shape": (2,),
"names": {"motors": ["gripper", "gripper"]},
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
},
}
@@ -1,36 +0,0 @@
from pathlib import Path
import numpy as np
from h5py import File
def load_local_episodes(input_h5: Path):
with File(input_h5, "r") as f:
for demo in f["data"].values():
demo_len = len(demo["obs/agentview_rgb"])
# (-1: open, 1: close) -> (0: close, 1: open)
action = np.array(demo["actions"])
action = np.concatenate(
[
action[:, :6],
(1 - np.clip(action[:, -1], 0, 1))[:, None],
],
axis=1,
)
state = np.concatenate(
[
np.array(demo["obs/ee_states"]),
np.array(demo["obs/gripper_states"]),
],
axis=1,
)
episode = {
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
"observation.state": np.array(state, dtype=np.float32),
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
"action": np.array(action, dtype=np.float32),
}
yield [{**{k: v[i] for k, v in episode.items()}} for i in range(demo_len)]