⬆️ upgrade lerobot dataset to v3.0 (#60)

* update openx

* update libero

* update agibot

* update robomind

* update readme

* update return type
This commit is contained in:
Qizhi Chen
2025-09-28 17:04:32 +08:00
committed by GitHub
parent 0b563da78a
commit 245465975f
8 changed files with 193 additions and 513 deletions
+67 -172
View File
@@ -4,20 +4,27 @@ import re
import shutil
from pathlib import Path
import numpy as np
import pandas as pd
import ray
from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from lerobot.datasets.aggregate import (
aggregate_data,
aggregate_metadata,
aggregate_stats,
aggregate_videos,
validate_all_metadata,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
write_episode,
write_episode_stats,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
write_info,
write_task,
write_stats,
write_tasks,
)
from libero_utils.config import LIBERO_FEATURES
from libero_utils.lerobot_utils import validate_all_metadata
from libero_utils.libero_utils import load_local_episodes
from ray.runtime_env import RuntimeEnv
from tqdm import tqdm
@@ -63,178 +70,75 @@ class SaveLerobotDataset(PipelineStep):
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
for frame_data in episode_data:
dataset.add_frame(
frame_data,
task=task_instruction,
)
frame_data["task"] = task_instruction
dataset.add_frame(frame_data)
dataset.save_episode()
logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
class AggregateDatasets(PipelineStep):
name = "Aggregate Datasets"
type = "libero2lerobot"
def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path):
logger = setup_logger()
def __init__(self, raw_dirs: list[Path], aggregated_dir: Path):
super().__init__()
self.raw_dirs = raw_dirs
self.aggregated_dir = aggregated_dir
all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs]
self.create_aggr_dataset()
fps, robot_type, features = validate_all_metadata(all_metadata)
def create_aggr_dataset(self):
logger = setup_logger()
if aggregated_dir.exists():
shutil.rmtree(aggregated_dir)
all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in self.raw_dirs]
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}",
root=aggregated_dir,
fps=fps,
robot_type=robot_type,
features=features,
)
fps, robot_type, features = validate_all_metadata(all_metadata)
video_keys = [key for key in features if features[key]["dtype"] == "video"]
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
if self.aggregated_dir.exists():
shutil.rmtree(self.aggregated_dir)
meta_idx = {"chunk": 0, "file": 0}
data_idx = {"chunk": 0, "file": 0}
videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys}
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=f"{self.aggregated_dir.parent.name}/{self.aggregated_dir.name}",
root=self.aggregated_dir,
fps=fps,
robot_type=robot_type,
features=features,
aggr_meta.episodes = {}
for src_meta in tqdm(all_metadata, desc="Copy data and videos"):
videos_idx = aggregate_videos(
src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE
)
data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE)
datasets_task_index_to_aggr_task_index = {}
aggr_task_index = 0
for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate tasks index")):
task_index_to_aggr_task_index = {}
meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx)
for task_index, task in meta.tasks.items():
if task not in aggr_meta.task_to_task_index:
# add the task to aggr tasks mappings
aggr_meta.tasks[aggr_task_index] = task
aggr_meta.task_to_task_index[task] = aggr_task_index
aggr_task_index += 1
aggr_meta.info["total_episodes"] += src_meta.total_episodes
aggr_meta.info["total_frames"] += src_meta.total_frames
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
logger.info("write tasks")
write_tasks(aggr_meta.tasks, aggr_meta.root)
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
logger.info("write info")
aggr_meta.info.update(
{
"total_tasks": len(aggr_meta.tasks),
"total_episodes": sum(m.total_episodes for m in all_metadata),
"total_frames": sum(m.total_frames for m in all_metadata),
"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
}
)
write_info(aggr_meta.info, aggr_meta.root)
datasets_aggr_episode_index_shift = {}
datasets_aggr_index_shift = {}
for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate episodes and global index")):
datasets_aggr_episode_index_shift[dataset_index] = aggr_meta.total_episodes
datasets_aggr_index_shift[dataset_index] = aggr_meta.total_frames
# populate episodes
for episode_index, episode_dict in meta.episodes.items():
aggr_episode_index = episode_index + aggr_meta.total_episodes
episode_dict["episode_index"] = aggr_episode_index
aggr_meta.episodes[aggr_episode_index] = episode_dict
# populate episodes_stats
for episode_index, episode_stats in meta.episodes_stats.items():
aggr_episode_index = episode_index + aggr_meta.total_episodes
episode_stats["index"].update(
{
"min": episode_stats["index"]["min"] + aggr_meta.total_frames,
"max": episode_stats["index"]["max"] + aggr_meta.total_frames,
"mean": episode_stats["index"]["mean"] + aggr_meta.total_frames,
}
)
episode_stats["episode_index"].update(
{
"min": np.array([aggr_episode_index]),
"max": np.array([aggr_episode_index]),
"mean": np.array([aggr_episode_index]),
}
)
df = pd.read_parquet(meta.root / meta.get_data_file_path(episode_index))
df["task_index"] = df["task_index"].map(datasets_task_index_to_aggr_task_index[dataset_index])
episode_stats["task_index"].update(
{
"min": np.array([df["task_index"].min()]),
"max": np.array([df["task_index"].max()]),
"mean": np.array([df["task_index"].mean()]),
"std": np.array([df["task_index"].std()]),
}
)
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
# populate info
aggr_meta.info["total_episodes"] += meta.total_episodes
aggr_meta.info["total_frames"] += meta.total_frames
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
logger.info("Write meta data")
aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_meta.total_episodes - 1) + 1
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
# create a new episodes jsonl with updated episode_index using write_episode
for episode_dict in tqdm(aggr_meta.episodes.values(), desc="Write episodes info"):
write_episode(episode_dict, aggr_meta.root)
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
for episode_index, episode_stats in tqdm(aggr_meta.episodes_stats.items(), desc="Write episodes stats info"):
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
# create a new task jsonl with updated episode_index using write_task
for task_index, task in tqdm(aggr_meta.tasks.items(), desc="Write tasks info"):
write_task(task_index, task, aggr_meta.root)
write_info(aggr_meta.info, aggr_meta.root)
self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
self.datasets_aggr_index_shift = datasets_aggr_index_shift
logger.info("Meta data done writing")
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
dataset_index = rank
aggr_meta = LeRobotDatasetMetadata("", root=self.aggregated_dir)
meta = LeRobotDatasetMetadata("", root=self.raw_dirs[dataset_index])
aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
aggr_index_shift = self.datasets_aggr_index_shift[dataset_index]
task_index_to_aggr_task_index = self.datasets_task_index_to_aggr_task_index[dataset_index]
with self.track_time("aggregating dataset"):
logger.info("Copy data")
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
data_path = meta.root / meta.get_data_file_path(episode_index)
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
# update index, episode_index and task_index
df = pd.read_parquet(data_path)
df["index"] += aggr_index_shift
df["episode_index"] += aggr_episode_index_shift
df["task_index"] = df["task_index"].map(task_index_to_aggr_task_index)
df.to_parquet(aggr_data_path)
logger.info("Copy videos")
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(video_path, aggr_video_path)
logger.info("write stats")
aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata])
write_stats(aggr_meta.stats, aggr_meta.root)
class DeleteTempData(PipelineStep):
name = "Delete Temp Data"
type = "libero2lerobot"
def __init__(self, temp_dirs: list[Path]):
super().__init__()
self.temp_dirs = temp_dirs
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
logger.info(f"Delete temp data {self.temp_dirs[rank]}")
shutil.rmtree(self.temp_dirs[rank])
def delete_temp_data(temp_dirs: list[Path]):
logger = setup_logger()
logger.info("Delete temp data_dir")
for temp_dir in temp_dirs:
shutil.rmtree(temp_dir)
def main(
@@ -244,8 +148,7 @@ def main(
cpus_per_task: int,
tasks_per_job: int,
workers: int,
resume_from_save: Path = None,
resume_from_aggregate: Path = None,
resume_dir: Path = None,
debug: bool = False,
repo_id: str = None,
push_to_hub: bool = False,
@@ -302,16 +205,9 @@ def main(
**({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if executor is RayPipelineExecutor else {}),
}
executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_from_save).run()
executor(
pipeline=[DeleteTempData([task[1] for task in tasks])],
**executor_config,
depends=executor(
pipeline=[AggregateDatasets([task[1] for task in tasks], aggregate_output_path)],
**executor_config,
logging_dir=resume_from_aggregate,
),
).run()
executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run()
create_aggr_dataset([task[1] for task in tasks], aggregate_output_path)
delete_temp_data([task[1] for task in tasks])
for task in tasks:
shutil.rmtree(task[1].parent, ignore_errors=True)
@@ -340,8 +236,7 @@ if __name__ == "__main__":
parser.add_argument("--cpus-per-task", type=int, default=1)
parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
parser.add_argument("--resume-from-save", type=Path, help="logs directory to resume from save step")
parser.add_argument("--resume-from-aggregate", type=Path, help="logs directory to resume from aggregate step")
parser.add_argument("--resume-dir", type=Path, help="logs directory to resume")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--repo-id", type=str, help="required when push-to-hub is True")
parser.add_argument("--push-to-hub", action="store_true", help="upload to hub")
@@ -1,23 +0,0 @@
import tqdm
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
"""
implemented by @Cadene
"""
# validate same fps, robot_type, features
fps = all_metadata[0].fps
robot_type = all_metadata[0].robot_type
features = all_metadata[0].features
for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"):
if fps != meta.fps:
raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.")
if robot_type != meta.robot_type:
raise ValueError(f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}.")
if features != meta.features:
raise ValueError(f"Same features is expected, but got features={meta.features} instead of {features}.")
return fps, robot_type, features