mirror of
https://github.com/Tavish9/any4lerobot.git
synced 2026-06-15 02:39:44 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5068921975 | |||
| 533cfd487e |
@@ -20,16 +20,14 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
|
||||
|
||||
## 📣 What's New <a><img width="35" height="20" src="https://user-images.githubusercontent.com/12782558/212848161-5e783dd6-11e8-4fe0-bbba-39ffb77730be.png"></a>
|
||||
|
||||
- **\[2026.06.12\]** We have provided an efficient and concise Generic Converter for building new dataset-to-LeRobot converters with minimal adapter code! 🔥🔥🔥
|
||||
- **\[2026.03.20\]** We have supported Data Conversion from RoboCasa to LeRobot! 🔥🔥🔥
|
||||
- **\[2025.10.04\]** We have collected and updated all Dataset Version Conversion Scripts for LeRobot! 🔥🔥🔥
|
||||
- **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! 🔥🔥🔥
|
||||
- **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! 🔥🔥🔥
|
||||
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
|
||||
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
|
||||
<details>
|
||||
<summary>More News</summary>
|
||||
|
||||
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
|
||||
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
|
||||
- **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! 🔥🔥🔥
|
||||
- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! 🔥🔥🔥
|
||||
- **\[2025.04.11\]** We change the repo from `openx2lerobot` to `any4lerobot`, making a universal toolbox for LeRobot! 🔥🔥🔥
|
||||
@@ -48,13 +46,11 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
|
||||
|
||||
- **Data Conversion**:
|
||||
|
||||
- [x] [Generic Converter](./generic_converter/README.md)
|
||||
- [x] [Open X-Embodiment to LeRobot](./openx2lerobot/README.md)
|
||||
- [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md)
|
||||
- [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md)
|
||||
- [x] [LeRobot to RLDS](./lerobot2rlds/README.md)
|
||||
- [x] [LIBERO to LeRobot](./libero2lerobot/README.md)
|
||||
- [x] [RoboCasa to LeRobot](./robocasa2lerobot/README.md)
|
||||
|
||||
- [**Version Conversion**](./ds_version_convert/README.md):
|
||||
|
||||
@@ -79,14 +75,14 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
|
||||
- [OneTwoVLA](https://one-two-vla.github.io/): A Unified Vision-Language-Action Model with Adaptive Reasoning [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Fanqi-Lin/OneTwoVLA">](https://github.com/Fanqi-Lin/OneTwoVLA)
|
||||
- [SmolVLA](https://huggingface.co/blog/smolvla): Efficient Vision-Language-Action Model trained on Lerobot Community Data [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/huggingface/lerobot">](https://github.com/huggingface/lerobot)
|
||||
- [SpatialVLA](https://spatialvla.github.io/): a spatial-enhanced vision-language-action model that is trained on 1.1 Million real robot episodes [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/SpatialVLA/SpatialVLA">](https://github.com/SpatialVLA/SpatialVLA)
|
||||
- [openpi](https://www.physicalintelligence.company/blog/pi0): the official implementation of $π_0$: A Vision-Language-Action Flow Model for General Robot Control [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Physical-Intelligence/openpi">](https://github.com/Physical-Intelligence/openpi)
|
||||
- [openpi](https://www.physicalintelligence.company/blog/pi0): the official implemenation of $π_0$: A Vision-Language-Action Flow Model for General Robot Control [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Physical-Intelligence/openpi">](https://github.com/Physical-Intelligence/openpi)
|
||||
- [Isaac-GR00T](https://developer.nvidia.com/isaac/gr00t): NVIDIA Isaac GR00T N1 is the world's first open foundation model for generalized humanoid robot reasoning and skills [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/NVIDIA/Isaac-GR00T">](https://github.com/NVIDIA/Isaac-GR00T)
|
||||
|
||||
### Dataset
|
||||
|
||||
- [Official](https://huggingface.co/lerobot): State-of-the-art Machine Learning for real-world robotics.
|
||||
- [IPEC-COMMUNITY/OpenX](https://huggingface.co/collections/IPEC-COMMUNITY/openx-lerobot-67c29b2ee5911f17dbea635e): Open X-Embodiment datasets in LeRobot format with standard transformation
|
||||
- [IPEC-COMMUNITY/LIBERO](https://huggingface.co/collections/IPEC-COMMUNITY/libero-benchmark-dataset-684837af28d465aa8b043950): LIBERO datasets in LeRobot format with standard transformation and filtering
|
||||
- [IPEC-COMMUNITY/OpenX](https://huggingface.co/collections/IPEC-COMMUNITY/openx-lerobot-67c29b2ee5911f17dbea635e): Open X-Embodiment datasets in LeRobot format with standard transfomation
|
||||
- [IPEC-COMMUNITY/LIBERO](https://huggingface.co/collections/IPEC-COMMUNITY/libero-benchmark-dataset-684837af28d465aa8b043950): LIBERO datasets in LeRobot format with standard transfomation and filtering
|
||||
- [weijian-sun/agibotworld-lerobot](https://huggingface.co/datasets/weijian-sun/agibotworld-lerobot): AgibotWorld-LeRobot v2.0
|
||||
- [GR00T-Dateset](https://huggingface.co/GR00T-Dateset): Isaac-GR00T training dataset
|
||||
- [nvidia/PhysicalAI-Robotics-GR00T-X-Embodiment-Sim](https://huggingface.co/datasets/nvidia/PhysicalAI-Robotics-GR00T-X-Embodiment-Sim): Isaac-GR00T training dataset
|
||||
@@ -111,7 +107,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
|
||||
- [LeKiwi](https://github.com/SIGRobotics-UIUC/LeKiwi): Low-Cost Mobile Manipulator [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/SIGRobotics-UIUC/LeKiwi">](https://github.com/SIGRobotics-UIUC/LeKiwi)
|
||||
- [XLeRobot](https://github.com/Vector-Wangel/XLeRobot): Fully Autonomous Household Dual-Arm Mobile Robot for $998 [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Vector-Wangel/XLeRobot">](https://github.com/Vector-Wangel/XLeRobot)
|
||||
- [LeRobot-Kinematics](https://github.com/box2ai-robotics/lerobot-kinematics): Simple and Accurate Forward and Inverse Kinematics Examples for the Lerobot SO100 ARM [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/box2ai-robotics/lerobot-kinematics">](https://github.com/box2ai-robotics/lerobot-kinematics)
|
||||
- [lerobotdepot](https://github.com/maximilienroberti/lerobotdepot): a repo for hardware, components, and 3D-printable projects compatible with the LeRobot library [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/maximilienroberti/lerobotdepot">](https://github.com/maximilienroberti/lerobotdepot)
|
||||
- [lerobotdepot](https://github.com/maximilienroberti/lerobotdepot): a reoi for hardware, components, and 3D-printable projects compatible with the LeRobot library [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/maximilienroberti/lerobotdepot">](https://github.com/maximilienroberti/lerobotdepot)
|
||||
- [PingTi-Arm](https://github.com/nomorewzx/PingTi-Arm): A human-scale robotic arm compatible with Lerobot, based on SO100 [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/nomorewzx/PingTi-Arm">](https://github.com/nomorewzx/PingTi-Arm)
|
||||
- [LeDog](https://github.com/wuxiaoqiang12/LeDog): A mobile manipulator with a Weilan AlphaDog and a LeRobot-compatible SO-101 arm, with support for ROS 1/2 (Official: [ledog_ros2](https://gitcode.com/openeuler/ledog_ros2)) [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/wuxiaoqiang12/LeDog">](https://github.com/wuxiaoqiang12/LeDog)
|
||||
|
||||
@@ -136,6 +132,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
|
||||
- [LeRobot Dataset Visualizer](https://github.com/huggingface/lerobot-dataset-visualizer): Web application for visualizing robotics datasets in LeRobot format [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/huggingface/lerobot-dataset-visualizer">](https://github.com/huggingface/lerobot-dataset-visualizer)
|
||||
- [lerobot_so101_teleop](https://github.com/liorbenhorin/lerobot_so101_teleop): Sample Environment for the LeRobot SO-101 Robot in Isaac Lab to collect demonstrations in a simulation [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/liorbenhorin/lerobot_so101_teleop">](https://github.com/liorbenhorin/lerobot_so101_teleop)
|
||||
- [Robot Learning: A Tutorial](https://github.com/fracapuano/robot-learning-tutorial): All the source code for "Robot Learning: A Tutorial". Get involved to be featured in the next iteration [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/fracapuano/robot-learning-tutorial">](https://github.com/fracapuano/robot-learning-tutorial)
|
||||
- [LERO](https://github.com/masato-ka/lero): LeRobot dataset Operations toolkit [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/masato-ka/lero">](https://github.com/masato-ka/lero)
|
||||
- [CRISP](https://utiasdsl.github.io/crisp_controllers/): Record datasets and deploy policies using LeRobot and ROS2-compatible manipulators (Franka Robotics FR3 and more supported) [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/utiasdsl/crisp_controllers">](https://github.com/utiasdsl/crisp_controllers)
|
||||
|
||||
## 👷♂️ Contributing
|
||||
@@ -157,7 +154,7 @@ If you find this repository helpful in your research or projects, please conside
|
||||
```bibtex
|
||||
@misc{any4lerobot,
|
||||
title = {Any4LeRobot: A tool collection for LeRobot},
|
||||
author = {Qizhi Chen, Dong Wang, Bin Zhao},
|
||||
author = {Qizhi Chen},
|
||||
license = {MIT},
|
||||
url = {https://github.com/Tavish9/any4lerobot},
|
||||
year = {2025},
|
||||
|
||||
+49
-125
@@ -1,112 +1,68 @@
|
||||
import argparse
|
||||
import inspect
|
||||
import gc
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import ray
|
||||
import torch
|
||||
from agibot_utils.agibot_utils import get_task_info, load_local_dataset
|
||||
from agibot_utils.config import AgiBotWorld_TASK_TYPE
|
||||
from agibot_utils.lerobot_utils import compute_episode_stats, generate_features_from_config
|
||||
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.datasets.dataset_writer import DatasetWriter
|
||||
from lerobot.datasets.feature_utils import get_hf_features_from_features, validate_episode_buffer, validate_frame
|
||||
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import validate_episode_buffer, validate_frame
|
||||
from ray.runtime_env import RuntimeEnv
|
||||
|
||||
|
||||
class AgiBotDatasetMetadata(LeRobotDatasetMetadata):
|
||||
def _flush_metadata_buffer(self) -> None:
|
||||
"""Write all buffered episode metadata to parquet file."""
|
||||
if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
|
||||
return
|
||||
|
||||
combined_dict = {}
|
||||
for episode_dict in self._metadata_buffer:
|
||||
for key, value in episode_dict.items():
|
||||
if key not in combined_dict:
|
||||
combined_dict[key] = []
|
||||
# Extract value and serialize numpy arrays
|
||||
# because PyArrow's from_pydict function doesn't support numpy arrays
|
||||
val = value[0] if isinstance(value, list) else value
|
||||
combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
|
||||
|
||||
first_ep = self._metadata_buffer[0]
|
||||
chunk_idx = first_ep["meta/episodes/chunk_index"][0]
|
||||
file_idx = first_ep["meta/episodes/file_index"][0]
|
||||
|
||||
schema = None if not self._pq_writer else self._pq_writer.schema
|
||||
table = pa.Table.from_pydict(combined_dict, schema=schema)
|
||||
|
||||
if not self._pq_writer:
|
||||
path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
|
||||
|
||||
self._pq_writer.write_table(table)
|
||||
|
||||
self.latest_episode = self._metadata_buffer[-1]
|
||||
self._metadata_buffer.clear()
|
||||
|
||||
|
||||
class AgiBotDatasetWriter(DatasetWriter):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.hf_features = get_hf_features_from_features(self._meta.features)
|
||||
|
||||
class AgiBotDataset(LeRobotDataset):
|
||||
def add_frame(self, frame: dict) -> None:
|
||||
"""
|
||||
Add a single frame to the current episode buffer.
|
||||
|
||||
Apart from images written to a temporary directory, nothing is written to disk
|
||||
until ``save_episode()`` is called.
|
||||
|
||||
The caller must provide all user-defined features plus ``"task"``, and must
|
||||
not provide ``"timestamp"`` or ``"frame_index"``; those are computed
|
||||
automatically.
|
||||
This function only adds the frame to the episode_buffer. Apart from images — which are written in a
|
||||
temporary directory — nothing is written to disk. To save those frames, the 'save_episode()' method
|
||||
then needs to be called.
|
||||
"""
|
||||
# Convert torch to numpy if needed
|
||||
for name in frame:
|
||||
if isinstance(frame[name], torch.Tensor):
|
||||
frame[name] = frame[name].numpy()
|
||||
|
||||
features = {
|
||||
key: value for key, value in self._meta.features.items() if key in self.hf_features
|
||||
} # remove video keys
|
||||
features = {key: value for key, value in self.features.items() if key in self.hf_features} # remove video keys
|
||||
validate_frame(frame, features)
|
||||
|
||||
if self.episode_buffer is None:
|
||||
self.episode_buffer = self._create_episode_buffer()
|
||||
self.episode_buffer = self.create_episode_buffer()
|
||||
|
||||
# Automatically add frame_index and timestamp to episode buffer
|
||||
frame_index = self.episode_buffer["size"]
|
||||
timestamp = frame_index / self._meta.fps
|
||||
timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps
|
||||
self.episode_buffer["frame_index"].append(frame_index)
|
||||
self.episode_buffer["timestamp"].append(timestamp)
|
||||
self.episode_buffer["task"].append(frame.pop("task"))
|
||||
self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing
|
||||
|
||||
# Add frame features to episode_buffer
|
||||
for key, value in frame.items():
|
||||
if key not in self._meta.features:
|
||||
if key not in self.features:
|
||||
raise ValueError(
|
||||
f"An element of the frame is not in the features. '{key}' not in '{self._meta.features.keys()}'."
|
||||
f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'."
|
||||
)
|
||||
|
||||
self.episode_buffer[key].append(value)
|
||||
|
||||
self.episode_buffer["size"] += 1
|
||||
|
||||
def save_episode(
|
||||
self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True
|
||||
) -> None:
|
||||
"""Save the current episode in self.episode_buffer to disk."""
|
||||
def save_episode(self, videos: dict, action_config: list, episode_data: dict | None = None) -> None:
|
||||
"""
|
||||
This will save to disk the current episode in self.episode_buffer.
|
||||
|
||||
Args:
|
||||
episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will
|
||||
save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
|
||||
None.
|
||||
"""
|
||||
episode_buffer = episode_data if episode_data is not None else self.episode_buffer
|
||||
|
||||
validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features)
|
||||
validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features)
|
||||
|
||||
# size and task are special cases that won't be added to hf_dataset
|
||||
episode_length = episode_buffer.pop("size")
|
||||
@@ -114,49 +70,53 @@ class AgiBotDatasetWriter(DatasetWriter):
|
||||
episode_tasks = list(set(tasks))
|
||||
episode_index = episode_buffer["episode_index"]
|
||||
|
||||
episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length)
|
||||
episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length)
|
||||
episode_buffer["episode_index"] = np.full((episode_length,), episode_index)
|
||||
|
||||
# Update tasks and task indices with new tasks if any
|
||||
self._meta.save_episode_tasks(episode_tasks)
|
||||
self.meta.save_episode_tasks(episode_tasks)
|
||||
|
||||
# Given tasks in natural language, find their corresponding task indices
|
||||
episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks])
|
||||
episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks])
|
||||
|
||||
for key, ft in self._meta.features.items():
|
||||
for key, ft in self.features.items():
|
||||
# index, episode_index, task_index are already processed above, and image and video
|
||||
# are processed separately by storing image path and frame info as meta data
|
||||
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]:
|
||||
continue
|
||||
episode_buffer[key] = np.stack(episode_buffer[key]).squeeze()
|
||||
|
||||
for key in self._meta.video_keys:
|
||||
episode_buffer[key] = str(videos[key])
|
||||
for key in self.meta.video_keys:
|
||||
episode_buffer[key] = str(videos[key]) # PosixPath -> str
|
||||
|
||||
ep_stats = compute_episode_stats(episode_buffer, self._meta.features)
|
||||
ep_stats = compute_episode_stats(episode_buffer, self.features)
|
||||
|
||||
ep_metadata = self._save_episode_data(episode_buffer)
|
||||
has_video_keys = len(self._meta.video_keys) > 0
|
||||
use_batched_encoding = self._batch_encoding_size > 1
|
||||
has_video_keys = len(self.meta.video_keys) > 0
|
||||
use_batched_encoding = self.batch_encoding_size > 1
|
||||
|
||||
self.current_videos = videos
|
||||
if has_video_keys and not use_batched_encoding:
|
||||
for video_key in self._meta.video_keys:
|
||||
for video_key in self.meta.video_keys:
|
||||
ep_metadata.update(self._save_episode_video(video_key, episode_index))
|
||||
|
||||
# `meta.save_episode` be executed after encoding the videos
|
||||
# add action_config to current episode
|
||||
ep_metadata.update({"action_config": action_config})
|
||||
self._meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
|
||||
self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
|
||||
|
||||
if has_video_keys and use_batched_encoding:
|
||||
self._episodes_since_last_encoding += 1
|
||||
if self._episodes_since_last_encoding == self._batch_encoding_size:
|
||||
start_ep = self._meta.total_episodes - self._batch_encoding_size
|
||||
end_ep = self._meta.total_episodes
|
||||
# Check if we should trigger batch encoding
|
||||
self.episodes_since_last_encoding += 1
|
||||
if self.episodes_since_last_encoding == self.batch_encoding_size:
|
||||
start_ep = self.num_episodes - self.batch_encoding_size
|
||||
end_ep = self.num_episodes
|
||||
self._batch_save_episode_video(start_ep, end_ep)
|
||||
self._episodes_since_last_encoding = 0
|
||||
self.episodes_since_last_encoding = 0
|
||||
|
||||
if not episode_data:
|
||||
self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0)
|
||||
# Reset episode buffer and clean up temporary images (if not already deleted during video encoding)
|
||||
self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0)
|
||||
|
||||
def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path:
|
||||
"""
|
||||
@@ -164,47 +124,11 @@ class AgiBotDatasetWriter(DatasetWriter):
|
||||
Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding,
|
||||
since video encoding with ffmpeg is already using multithreading.
|
||||
"""
|
||||
temp_path = Path(tempfile.mkdtemp(dir=self._root)) / f"{video_key}_{episode_index:03d}.mp4"
|
||||
temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4"
|
||||
shutil.copy(self.current_videos[video_key], temp_path)
|
||||
return temp_path
|
||||
|
||||
|
||||
class AgiBotDataset(LeRobotDataset):
|
||||
@classmethod
|
||||
def create(cls, *args, **kwargs) -> "AgiBotDataset":
|
||||
sig = inspect.signature(super().create)
|
||||
bound = sig.bind_partial(*args, **kwargs)
|
||||
bound.apply_defaults()
|
||||
params = bound.arguments
|
||||
|
||||
obj = super().create(*args, **kwargs)
|
||||
|
||||
shutil.rmtree(params["root"], ignore_errors=True)
|
||||
obj.meta: AgiBotDatasetMetadata = AgiBotDatasetMetadata.create(
|
||||
repo_id=params["repo_id"],
|
||||
fps=params["fps"],
|
||||
robot_type=params["robot_type"],
|
||||
features=params["features"],
|
||||
root=params["root"],
|
||||
use_videos=params["use_videos"],
|
||||
metadata_buffer_size=params["metadata_buffer_size"],
|
||||
)
|
||||
obj.writer: AgiBotDatasetWriter = AgiBotDatasetWriter(
|
||||
meta=obj.meta,
|
||||
root=obj.root,
|
||||
vcodec=obj._vcodec,
|
||||
encoder_threads=obj._encoder_threads,
|
||||
batch_encoding_size=obj._batch_encoding_size,
|
||||
)
|
||||
return obj
|
||||
|
||||
def save_episode(
|
||||
self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True
|
||||
) -> None:
|
||||
self._require_writer("save_episode")
|
||||
self.writer.save_episode(videos, action_config, episode_data, parallel_encoding)
|
||||
|
||||
|
||||
def get_all_tasks(src_path: Path, output_path: Path):
|
||||
json_files = src_path.glob("task_info/*.json")
|
||||
for json_file in json_files:
|
||||
@@ -267,13 +191,11 @@ def save_as_lerobot_dataset(agibot_world_config, task: tuple[Path, Path], save_d
|
||||
dataset.save_episode(videos=videos, action_config=action_config)
|
||||
except Exception as e:
|
||||
print(f"{json_file.stem}, episode_{eid}: there are some corrupted mp4s\nException details: {str(e)}")
|
||||
dataset.clear_episode_buffer(delete_images=False)
|
||||
dataset.episode_buffer = None
|
||||
continue
|
||||
|
||||
gc.collect()
|
||||
print(f"process done for {json_file.stem}, episode_id {eid}, len {len(frames)}")
|
||||
|
||||
dataset.finalize()
|
||||
|
||||
|
||||
def main(
|
||||
src_path: str,
|
||||
@@ -325,6 +247,8 @@ def main(
|
||||
with open("output.txt", "a") as f:
|
||||
f.write(f"{task}, exception details: {str(e)}\n")
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import numpy as np
|
||||
from lerobot.datasets.compute_stats import (
|
||||
DEFAULT_QUANTILES,
|
||||
auto_downsample_height_width,
|
||||
get_feature_stats,
|
||||
sample_indices,
|
||||
)
|
||||
from torchcodec.decoders import VideoDecoder
|
||||
import torch
|
||||
import torchvision
|
||||
from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices
|
||||
|
||||
torchvision.set_video_backend("pyav")
|
||||
|
||||
|
||||
def generate_features_from_config(AgiBotWorld_CONFIG):
|
||||
@@ -22,8 +20,9 @@ def generate_features_from_config(AgiBotWorld_CONFIG):
|
||||
def sample_images(input):
|
||||
if type(input) is str:
|
||||
video_path = input
|
||||
decoder = VideoDecoder(video_path)
|
||||
frames_array = decoder[0:-1].numpy() # Shape: [T, C, H, W]
|
||||
reader = torchvision.io.VideoReader(video_path, stream="video")
|
||||
frames = [frame["data"] for frame in reader]
|
||||
frames_array = torch.stack(frames).numpy() # Shape: [T, C, H, W]
|
||||
|
||||
sampled_indices = sample_indices(len(frames_array))
|
||||
images = None
|
||||
@@ -51,31 +50,21 @@ def sample_images(input):
|
||||
return images
|
||||
|
||||
|
||||
def compute_episode_stats(
|
||||
episode_data: dict[str, list[str] | np.ndarray],
|
||||
features: dict,
|
||||
quantile_list: list[float] | None = None,
|
||||
) -> dict:
|
||||
if quantile_list is None:
|
||||
quantile_list = DEFAULT_QUANTILES
|
||||
|
||||
def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict:
|
||||
ep_stats = {}
|
||||
for key, data in episode_data.items():
|
||||
if features[key]["dtype"] == "string":
|
||||
continue
|
||||
|
||||
continue # HACK: we should receive np.arrays of strings
|
||||
elif features[key]["dtype"] in ["image", "video"]:
|
||||
ep_ft_array = sample_images(data)
|
||||
axes_to_reduce = (0, 2, 3)
|
||||
axes_to_reduce = (0, 2, 3) # keep channel dim
|
||||
keepdims = True
|
||||
else:
|
||||
ep_ft_array = data
|
||||
axes_to_reduce = 0
|
||||
keepdims = data.ndim == 1
|
||||
ep_ft_array = data # data is already a np.ndarray
|
||||
axes_to_reduce = 0 # compute stats over the first axis
|
||||
keepdims = data.ndim == 1 # keep as np.array
|
||||
|
||||
ep_stats[key] = get_feature_stats(
|
||||
ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list
|
||||
)
|
||||
ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims)
|
||||
|
||||
if features[key]["dtype"] in ["image", "video"]:
|
||||
value_norm = 1.0 if "depth" in key else 255.0
|
||||
|
||||
@@ -32,11 +32,6 @@ import pyarrow.parquet as pq
|
||||
import tqdm
|
||||
from datasets import Dataset
|
||||
from huggingface_hub import snapshot_download
|
||||
from lerobot.datasets.io_utils import (
|
||||
load_info,
|
||||
load_tasks,
|
||||
write_info,
|
||||
)
|
||||
from lerobot.datasets.utils import (
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
DEFAULT_DATA_PATH,
|
||||
@@ -45,8 +40,11 @@ from lerobot.datasets.utils import (
|
||||
LEGACY_EPISODES_PATH,
|
||||
LEGACY_EPISODES_STATS_PATH,
|
||||
LEGACY_TASKS_PATH,
|
||||
load_info,
|
||||
load_tasks,
|
||||
serialize_dict,
|
||||
unflatten_dict,
|
||||
write_info,
|
||||
)
|
||||
from lerobot.utils.constants import HF_LEROBOT_HOME
|
||||
from lerobot.utils.utils import init_logging
|
||||
@@ -54,12 +52,8 @@ from lerobot.utils.utils import init_logging
|
||||
V21 = "v2.1"
|
||||
V30 = "v3.0"
|
||||
|
||||
LEGACY_DATA_PATH_TEMPLATE = (
|
||||
"data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
|
||||
)
|
||||
LEGACY_VIDEO_PATH_TEMPLATE = (
|
||||
"videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
|
||||
)
|
||||
LEGACY_DATA_PATH_TEMPLATE = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
|
||||
LEGACY_VIDEO_PATH_TEMPLATE = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
|
||||
MIN_VIDEO_DURATION = 1e-6
|
||||
LEGACY_STATS_KEYS = ("mean", "std", "min", "max", "count")
|
||||
|
||||
@@ -143,9 +137,7 @@ def convert_info(
|
||||
if ft.get("dtype") != "video":
|
||||
ft.pop("fps", None)
|
||||
|
||||
info["total_chunks"] = (
|
||||
math.ceil(total_episodes / chunks_size) if total_episodes > 0 else 0
|
||||
)
|
||||
info["total_chunks"] = math.ceil(total_episodes / chunks_size) if total_episodes > 0 else 0
|
||||
info["total_videos"] = total_episodes * len(video_keys)
|
||||
|
||||
write_info(info, new_root)
|
||||
@@ -164,22 +156,14 @@ def _group_episodes_by_data_file(
|
||||
return grouped
|
||||
|
||||
|
||||
def convert_data(
|
||||
root: Path, new_root: Path, episode_records: list[dict[str, Any]]
|
||||
) -> None:
|
||||
def convert_data(root: Path, new_root: Path, episode_records: list[dict[str, Any]]) -> None:
|
||||
logging.info("Converting consolidated parquet files back to per-episode files")
|
||||
grouped = _group_episodes_by_data_file(episode_records)
|
||||
|
||||
for (chunk_idx, file_idx), records in tqdm.tqdm(
|
||||
grouped.items(), desc="convert data files"
|
||||
):
|
||||
source_path = root / DEFAULT_DATA_PATH.format(
|
||||
chunk_index=chunk_idx, file_index=file_idx
|
||||
)
|
||||
for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc="convert data files"):
|
||||
source_path = root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||
if not source_path.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Expected source parquet file not found: {source_path}"
|
||||
)
|
||||
raise FileNotFoundError(f"Expected source parquet file not found: {source_path}")
|
||||
|
||||
table = pq.read_table(source_path)
|
||||
records = sorted(records, key=lambda rec: int(rec["dataset_from_index"]))
|
||||
@@ -197,7 +181,7 @@ def convert_data(
|
||||
f"episode_index={episode_index}, length={length}"
|
||||
)
|
||||
|
||||
episode_table = table.slice(start, length)
|
||||
episode_table = table.slice(start, length).to_pandas()
|
||||
|
||||
dest_chunk = episode_index // DEFAULT_CHUNK_SIZE
|
||||
dest_path = new_root / LEGACY_DATA_PATH_TEMPLATE.format(
|
||||
@@ -205,7 +189,7 @@ def convert_data(
|
||||
episode_index=episode_index,
|
||||
)
|
||||
dest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
Dataset(episode_table).to_parquet(dest_path)
|
||||
Dataset.from_pandas(episode_table).to_parquet(dest_path)
|
||||
|
||||
|
||||
def _group_episodes_by_video_file(
|
||||
@@ -251,14 +235,10 @@ def _validate_video_paths(src: Path, dst: Path) -> None:
|
||||
# Validate file extensions for video files
|
||||
valid_video_extensions = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".m4v"}
|
||||
if src_resolved.suffix.lower() not in valid_video_extensions:
|
||||
raise ValueError(
|
||||
f"Source file does not have a valid video extension: {src_resolved}"
|
||||
)
|
||||
raise ValueError(f"Source file does not have a valid video extension: {src_resolved}")
|
||||
|
||||
if dst_resolved.suffix.lower() not in valid_video_extensions:
|
||||
raise ValueError(
|
||||
f"Destination file does not have a valid video extension: {dst_resolved}"
|
||||
)
|
||||
raise ValueError(f"Destination file does not have a valid video extension: {dst_resolved}")
|
||||
|
||||
# Check for path traversal attempts in the original paths
|
||||
src_str = str(src)
|
||||
@@ -273,16 +253,11 @@ def _validate_video_paths(src: Path, dst: Path) -> None:
|
||||
|
||||
# Additional check: ensure resolved paths don't point to system directories
|
||||
system_dirs = {"/etc", "/sys", "/proc", "/dev", "/boot", "/root"}
|
||||
for resolved_path, name in [
|
||||
(src_resolved, "source"),
|
||||
(dst_resolved, "destination"),
|
||||
]:
|
||||
for resolved_path, name in [(src_resolved, "source"), (dst_resolved, "destination")]:
|
||||
path_str = str(resolved_path)
|
||||
for sys_dir in system_dirs:
|
||||
if path_str.startswith(sys_dir + "/") or path_str == sys_dir:
|
||||
raise ValueError(
|
||||
f"Path points to system directory: {name} path {resolved_path}"
|
||||
)
|
||||
raise ValueError(f"Path points to system directory: {name} path {resolved_path}")
|
||||
|
||||
# Ensure the destination directory can be created safely
|
||||
try:
|
||||
@@ -349,13 +324,9 @@ def _extract_video_segment(
|
||||
text=True,
|
||||
)
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
raise RuntimeError(
|
||||
f"ffmpeg timed out while processing video '{src}' -> '{dst}'"
|
||||
) from exc
|
||||
raise RuntimeError(f"ffmpeg timed out while processing video '{src}' -> '{dst}'") from exc
|
||||
except FileNotFoundError as exc:
|
||||
raise RuntimeError(
|
||||
"ffmpeg executable not found; it is required for video conversion"
|
||||
) from exc
|
||||
raise RuntimeError("ffmpeg executable not found; it is required for video conversion") from exc
|
||||
except subprocess.CalledProcessError as exc:
|
||||
error_msg = f"ffmpeg failed while splitting video '{src}' into '{dst}'"
|
||||
if exc.stderr:
|
||||
@@ -363,12 +334,7 @@ def _extract_video_segment(
|
||||
raise RuntimeError(error_msg) from exc
|
||||
|
||||
|
||||
def convert_videos(
|
||||
root: Path,
|
||||
new_root: Path,
|
||||
episode_records: list[dict[str, Any]],
|
||||
video_keys: list[str],
|
||||
) -> None:
|
||||
def convert_videos(root: Path, new_root: Path, episode_records: list[dict[str, Any]], video_keys: list[str]) -> None:
|
||||
if len(video_keys) == 0:
|
||||
logging.info("No video features detected; skipping video conversion")
|
||||
return
|
||||
@@ -381,9 +347,7 @@ def convert_videos(
|
||||
logging.info("No video metadata found for key '%s'; skipping", video_key)
|
||||
continue
|
||||
|
||||
for (chunk_idx, file_idx), records in tqdm.tqdm(
|
||||
grouped.items(), desc=f"convert videos ({video_key})"
|
||||
):
|
||||
for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc=f"convert videos ({video_key})"):
|
||||
src_path = root / DEFAULT_VIDEO_PATH.format(
|
||||
video_key=video_key,
|
||||
chunk_index=chunk_idx,
|
||||
@@ -392,10 +356,7 @@ def convert_videos(
|
||||
if not src_path.exists():
|
||||
raise FileNotFoundError(f"Expected MP4 file not found: {src_path}")
|
||||
|
||||
records = sorted(
|
||||
records,
|
||||
key=lambda rec: float(rec[f"videos/{video_key}/from_timestamp"]),
|
||||
)
|
||||
records = sorted(records, key=lambda rec: float(rec[f"videos/{video_key}/from_timestamp"]))
|
||||
|
||||
for record in records:
|
||||
episode_index = int(record["episode_index"])
|
||||
@@ -412,9 +373,7 @@ def convert_videos(
|
||||
_extract_video_segment(src_path, dest_path, start=start, end=end)
|
||||
|
||||
|
||||
def convert_episodes_metadata(
|
||||
new_root: Path, episode_records: list[dict[str, Any]]
|
||||
) -> None:
|
||||
def convert_episodes_metadata(new_root: Path, episode_records: list[dict[str, Any]]) -> None:
|
||||
logging.info("Reconstructing legacy episodes and episodes_stats JSONL files")
|
||||
|
||||
episodes_path = new_root / LEGACY_EPISODES_PATH
|
||||
@@ -437,9 +396,7 @@ def convert_episodes_metadata(
|
||||
jsonlines.open(episodes_path, mode="w") as episodes_writer,
|
||||
jsonlines.open(stats_path, mode="w") as stats_writer,
|
||||
):
|
||||
for record in sorted(
|
||||
episode_records, key=lambda rec: int(rec["episode_index"])
|
||||
):
|
||||
for record in sorted(episode_records, key=lambda rec: int(rec["episode_index"])):
|
||||
legacy_episode = {
|
||||
key: value
|
||||
for key, value in record.items()
|
||||
@@ -450,14 +407,10 @@ def convert_episodes_metadata(
|
||||
and key not in {"dataset_from_index", "dataset_to_index"}
|
||||
}
|
||||
|
||||
serializable_episode = {
|
||||
key: _to_serializable(value) for key, value in legacy_episode.items()
|
||||
}
|
||||
serializable_episode = {key: _to_serializable(value) for key, value in legacy_episode.items()}
|
||||
episodes_writer.write(serializable_episode)
|
||||
|
||||
stats_flat = {
|
||||
key: record[key] for key in record if key.startswith("stats/")
|
||||
}
|
||||
stats_flat = {key: record[key] for key in record if key.startswith("stats/")}
|
||||
stats_nested = unflatten_dict(stats_flat).get("stats", {})
|
||||
stats_serialized = serialize_dict(_filter_stats(stats_nested))
|
||||
stats_writer.write(
|
||||
@@ -500,11 +453,7 @@ def convert_dataset(
|
||||
new_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
episode_records = load_episode_records(root)
|
||||
video_keys = [
|
||||
key
|
||||
for key, ft in load_info(root)["features"].items()
|
||||
if ft.get("dtype") == "video"
|
||||
]
|
||||
video_keys = [key for key, ft in load_info(root)["features"].items() if ft.get("dtype") == "video"]
|
||||
|
||||
convert_info(root, new_root, episode_records, video_keys)
|
||||
convert_tasks(root, new_root)
|
||||
|
||||
@@ -1,102 +0,0 @@
|
||||
# Generic Converter
|
||||
|
||||
Shared conversion flow for turning task-based source datasets into LeRobot
|
||||
datasets.
|
||||
|
||||
The generic package owns the execution mechanics:
|
||||
|
||||
- create one temporary `LeRobotDataset` per `ConversionTask`
|
||||
- run tasks with a local or Ray Datatrove executor
|
||||
- aggregate temporary datasets into the adapter output directory
|
||||
- remove temporary task outputs by default
|
||||
- optionally push the aggregated dataset to the Hub
|
||||
|
||||
Dataset-specific converters own the adapter logic:
|
||||
|
||||
- where raw inputs come from
|
||||
- how tasks are discovered or loaded
|
||||
- how one raw input is converted into LeRobot episodes
|
||||
- how task metadata, such as language instructions, is represented
|
||||
|
||||
## Files
|
||||
|
||||
- `adapter.py`: `BaseAdapter`, the class dataset adapters inherit from.
|
||||
- `pipeline.py`: the reusable conversion, executor, aggregation, cleanup, and push flow.
|
||||
- `utils.py`: shared types and small helpers.
|
||||
|
||||
## Adapter Contract
|
||||
|
||||
A dataset converter should subclass `BaseAdapter`, pass `output_path` to the
|
||||
base constructor, and provide dataset-level metadata as class attributes.
|
||||
|
||||
Required attributes:
|
||||
|
||||
- `dataset_type`
|
||||
- `fps`
|
||||
- `robot_type`
|
||||
- `features`
|
||||
|
||||
Optional attributes:
|
||||
|
||||
- `tags`
|
||||
|
||||
Required methods:
|
||||
|
||||
- `load_tasks(self) -> list[ConversionTask]`
|
||||
- `load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]`
|
||||
|
||||
`run_converter` reads `adapter.output_path` and calls `adapter.load_tasks()`
|
||||
without arguments. Store paths, task manifests, or other adapter options on the
|
||||
adapter instance in `__init__`.
|
||||
|
||||
Use `adapter.temp_output_path` when building task-level temporary output paths.
|
||||
|
||||
`load_subset` receives the full `ConversionTask`, not just an input path. Use
|
||||
`task.input_path` for raw data and `task.metadata` for dataset-specific values
|
||||
such as language instructions. Each yielded episode must be a sequence of frame
|
||||
dictionaries accepted by `LeRobotDataset.add_frame`; each frame should include
|
||||
the LeRobot `task` field when language tasks are needed.
|
||||
|
||||
## ConversionTask
|
||||
|
||||
`ConversionTask` describes one independently convertible raw input:
|
||||
|
||||
- `input_path`: source file or directory
|
||||
- `output_path`: temporary LeRobot dataset directory for this task
|
||||
- `local_repo_id`: repo id used while writing the temporary dataset
|
||||
- `metadata`: adapter-owned metadata
|
||||
|
||||
Keep dataset-specific values in `metadata`; the generic pipeline does not know
|
||||
about task-file schemas or instruction formats.
|
||||
|
||||
## Usage Sketch
|
||||
|
||||
```python
|
||||
from generic_converter import BaseAdapter, ConversionTask, run_converter
|
||||
|
||||
|
||||
class MyAdapter(BaseAdapter):
|
||||
dataset_type = "my_dataset"
|
||||
fps = 20
|
||||
robot_type = "my_robot"
|
||||
features = MY_FEATURES
|
||||
tags = ["my_dataset"]
|
||||
|
||||
def __init__(self, output_path):
|
||||
super().__init__(output_path)
|
||||
|
||||
def load_tasks(self) -> list[ConversionTask]:
|
||||
...
|
||||
|
||||
def load_subset(self, task: ConversionTask):
|
||||
...
|
||||
|
||||
|
||||
run_converter(
|
||||
adapter=adapter,
|
||||
executor="local",
|
||||
cpus_per_task=1,
|
||||
tasks_per_job=1,
|
||||
workers=-1,
|
||||
)
|
||||
```
|
||||
@@ -1,11 +0,0 @@
|
||||
from .adapter import BaseAdapter
|
||||
from .pipeline import run_converter
|
||||
from .utils import ConversionTask, FeatureSpec, TaskMetadata
|
||||
|
||||
__all__ = [
|
||||
"BaseAdapter",
|
||||
"ConversionTask",
|
||||
"FeatureSpec",
|
||||
"TaskMetadata",
|
||||
"run_converter",
|
||||
]
|
||||
@@ -1,32 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterable, Sequence
|
||||
from pathlib import Path
|
||||
|
||||
from .utils import ConversionTask, FeatureSpec
|
||||
|
||||
|
||||
class BaseAdapter(ABC):
|
||||
"""Dataset-specific hooks used by the generic conversion pipeline."""
|
||||
|
||||
dataset_type: str
|
||||
fps: int
|
||||
robot_type: str
|
||||
features: FeatureSpec
|
||||
tags: Sequence[str] = ()
|
||||
|
||||
def __init__(self, output_path: Path):
|
||||
self.output_path = output_path.expanduser().resolve()
|
||||
|
||||
@property
|
||||
def temp_output_path(self) -> Path:
|
||||
return self.output_path.with_name(f"{self.output_path.name}_temp")
|
||||
|
||||
@abstractmethod
|
||||
def load_tasks(self) -> list[ConversionTask]:
|
||||
"""Build conversion tasks from dataset-specific inputs."""
|
||||
|
||||
@abstractmethod
|
||||
def load_subset(
|
||||
self, task: ConversionTask
|
||||
) -> Iterable[Sequence[dict]]:
|
||||
"""Yield LeRobot episodes for one raw input path."""
|
||||
@@ -1,226 +0,0 @@
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
from datatrove.utils.logging import get_random_str, get_timestamp
|
||||
from lerobot.datasets import LeRobotDataset
|
||||
from lerobot.datasets.aggregate import aggregate_datasets
|
||||
|
||||
from .adapter import BaseAdapter
|
||||
from .utils import (
|
||||
ConversionTask,
|
||||
setup_logger,
|
||||
unique_strings,
|
||||
)
|
||||
|
||||
|
||||
class SaveLeRobotDataset(PipelineStep):
|
||||
name = "Save Temp LeRobotDataset"
|
||||
|
||||
def __init__(self, tasks: list[ConversionTask], adapter: BaseAdapter):
|
||||
super().__init__()
|
||||
self.tasks = tasks
|
||||
self.adapter = adapter
|
||||
self.type = f"{adapter.dataset_type}2lerobot"
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
logger = setup_logger()
|
||||
task = self.tasks[rank]
|
||||
|
||||
if task.output_path.exists():
|
||||
shutil.rmtree(task.output_path)
|
||||
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=task.local_repo_id,
|
||||
root=task.output_path,
|
||||
fps=self.adapter.fps,
|
||||
robot_type=self.adapter.robot_type,
|
||||
features=self.adapter.features,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"start processing for {task.input_path}, saving to {task.output_path}"
|
||||
)
|
||||
raw_dataset = self.adapter.load_subset(task)
|
||||
for episode_index, episode_data in enumerate(raw_dataset):
|
||||
with self.track_time("saving episode"):
|
||||
for frame in episode_data:
|
||||
dataset.add_frame(frame)
|
||||
dataset.save_episode()
|
||||
logger.info(
|
||||
f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}"
|
||||
)
|
||||
dataset.finalize()
|
||||
|
||||
|
||||
def run_converter(
|
||||
adapter: BaseAdapter,
|
||||
executor: str,
|
||||
cpus_per_task: int,
|
||||
tasks_per_job: int,
|
||||
workers: int,
|
||||
resume_dir: str | None = None,
|
||||
debug: bool = False,
|
||||
local_repo_id: str | None = None,
|
||||
hub_repo_id: str | None = None,
|
||||
push_to_hub: bool = False,
|
||||
cleanup_temp: bool = True,
|
||||
extra_tags: Sequence[str] | None = None,
|
||||
) -> Path:
|
||||
tasks = adapter.load_tasks()
|
||||
output_path = adapter.output_path
|
||||
|
||||
if not tasks:
|
||||
raise ValueError(
|
||||
"No conversion tasks found. Provide a non-empty tasks file or matching source files."
|
||||
)
|
||||
if cpus_per_task < 1:
|
||||
raise ValueError("--cpus-per-task must be >= 1")
|
||||
|
||||
output_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if debug:
|
||||
executor = "local"
|
||||
workers = 1
|
||||
tasks = tasks[:2]
|
||||
push_to_hub = False
|
||||
|
||||
match executor:
|
||||
case "local":
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
|
||||
resolved_workers = (
|
||||
max(1, (os.cpu_count() or 1) // cpus_per_task)
|
||||
if workers == -1
|
||||
else workers
|
||||
)
|
||||
executor_cls, executor_config = LocalPipelineExecutor, {
|
||||
"tasks": len(tasks),
|
||||
"workers": resolved_workers,
|
||||
}
|
||||
case "ray":
|
||||
import ray
|
||||
from datatrove.executor import RayPipelineExecutor
|
||||
from ray.runtime_env import RuntimeEnv
|
||||
|
||||
runtime_env = RuntimeEnv(env_vars=_build_ray_env_vars())
|
||||
ray.init(runtime_env=runtime_env)
|
||||
executor_cls, executor_config = RayPipelineExecutor, {
|
||||
"tasks": len(tasks),
|
||||
"workers": workers,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"tasks_per_job": tasks_per_job,
|
||||
}
|
||||
case _:
|
||||
raise ValueError(f"Executor {executor} not supported")
|
||||
|
||||
if resume_dir:
|
||||
logging_dir = str(resume_dir)
|
||||
else:
|
||||
logging_dir = str(Path.cwd() / "logs" / f"{get_timestamp()}_{get_random_str()}")
|
||||
|
||||
executor_cls(
|
||||
pipeline=[SaveLeRobotDataset(tasks, adapter)],
|
||||
**executor_config,
|
||||
logging_dir=logging_dir,
|
||||
).run()
|
||||
aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id)
|
||||
|
||||
if cleanup_temp:
|
||||
logger = setup_logger()
|
||||
logger.info("Delete temp data_dir")
|
||||
shutil.rmtree(adapter.temp_output_path, ignore_errors=True)
|
||||
|
||||
if push_to_hub:
|
||||
if hub_repo_id is None:
|
||||
raise ValueError("--repo-id is required when --push-to-hub is set")
|
||||
|
||||
tags = unique_strings(
|
||||
[
|
||||
"LeRobot",
|
||||
adapter.dataset_type,
|
||||
adapter.robot_type,
|
||||
*adapter.tags,
|
||||
*(extra_tags or []),
|
||||
]
|
||||
)
|
||||
LeRobotDataset(
|
||||
repo_id=hub_repo_id,
|
||||
root=output_path,
|
||||
).push_to_hub(
|
||||
tags=tags,
|
||||
private=False,
|
||||
push_videos=True,
|
||||
license="apache-2.0",
|
||||
upload_large_folder=False,
|
||||
)
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
def _build_ray_env_vars() -> dict[str, str]:
|
||||
env_vars = {
|
||||
"HDF5_USE_FILE_LOCKING": "FALSE",
|
||||
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
|
||||
"SVT_LOG": "1",
|
||||
}
|
||||
pythonpath = _build_ray_pythonpath()
|
||||
if pythonpath:
|
||||
env_vars["PYTHONPATH"] = pythonpath
|
||||
return env_vars
|
||||
|
||||
|
||||
def _build_ray_pythonpath() -> str:
|
||||
repo_root = Path(__file__).resolve().parents[1]
|
||||
paths: list[str] = []
|
||||
|
||||
def add_path(path_value: str | Path):
|
||||
path = Path(path_value).expanduser()
|
||||
try:
|
||||
path = path.resolve()
|
||||
except OSError:
|
||||
return
|
||||
if not path.exists():
|
||||
return
|
||||
path_str = str(path)
|
||||
if path_str not in paths:
|
||||
paths.append(path_str)
|
||||
|
||||
add_path(repo_root)
|
||||
add_path(Path.cwd())
|
||||
for path in sys.path:
|
||||
if path:
|
||||
add_path(path)
|
||||
for path in os.environ.get("PYTHONPATH", "").split(os.pathsep):
|
||||
if path:
|
||||
add_path(path)
|
||||
|
||||
return os.pathsep.join(paths)
|
||||
|
||||
|
||||
def aggregate_tasks(
|
||||
tasks: list[ConversionTask],
|
||||
output_dir: Path,
|
||||
aggr_repo_id: str | None = None,
|
||||
):
|
||||
logger = setup_logger()
|
||||
|
||||
if output_dir.exists():
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
roots = [task.output_path for task in tasks]
|
||||
resolved_aggr_repo_id = aggr_repo_id or output_dir.name
|
||||
|
||||
logger.info(
|
||||
f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
|
||||
)
|
||||
aggregate_datasets(
|
||||
repo_ids=[None] * len(tasks),
|
||||
roots=roots,
|
||||
aggr_repo_id=resolved_aggr_repo_id,
|
||||
aggr_root=output_dir,
|
||||
)
|
||||
logger.info(f"aggregation complete: {output_dir}")
|
||||
@@ -1,39 +0,0 @@
|
||||
import shutil
|
||||
from collections.abc import Mapping, Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
TaskMetadata = Mapping[str, Any]
|
||||
FeatureSpec = Mapping[str, dict]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ConversionTask:
|
||||
"""One independently convertible raw input file and adapter metadata."""
|
||||
|
||||
input_path: Path
|
||||
output_path: Path
|
||||
local_repo_id: str | None = None
|
||||
metadata: TaskMetadata = field(default_factory=dict)
|
||||
|
||||
|
||||
def setup_logger():
|
||||
import sys
|
||||
|
||||
from datatrove.utils.logging import logger
|
||||
|
||||
logger.remove()
|
||||
logger.add(sys.stdout, level="INFO", colorize=True)
|
||||
return logger
|
||||
|
||||
|
||||
def unique_strings(values: Sequence[str]) -> list[str]:
|
||||
result = []
|
||||
seen = set()
|
||||
for value in values:
|
||||
if value in seen:
|
||||
continue
|
||||
result.append(value)
|
||||
seen.add(value)
|
||||
return result
|
||||
+20
-46
@@ -8,13 +8,13 @@ In this dataset, we have made several key improvements:
|
||||
|
||||
- **OpenVLA-based LIBERO Regeneration**: Resolution enhancement, No-op action filtration, 180° RGB frame rotation, Failed trajectory filtering.
|
||||
- **State Data Preservation**: Maintained native LIBERO state information (accessible via `states.ee_state`, `states.joint_state` and etc.).
|
||||
- **Robust Conversion Pipeline**: Using the shared `generic_converter` pipeline with local and Ray DataTrove executors for high-speed dataset transformation and resumable conversion.
|
||||
- **Robust Conversion Pipeline**: Using DataTrove framework for High-speed dataset transformation and automatic failure recovery during conversion
|
||||
|
||||
Dataset Structure of `meta/info.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"codebase_version": "v3.0", // latest lerobot format
|
||||
"codebase_version": "v3.0", // lastest lerobot format
|
||||
"robot_type": "franka", // specific robot type
|
||||
"fps": 20, // control frequency
|
||||
"features": {
|
||||
@@ -41,30 +41,7 @@ Dataset Structure of `meta/info.json`:
|
||||
"has_audio": false
|
||||
}
|
||||
},
|
||||
"observation.images.wrist_image": {
|
||||
"dtype": "video",
|
||||
"shape": [
|
||||
256,
|
||||
256,
|
||||
3
|
||||
],
|
||||
"names": [
|
||||
"height",
|
||||
"width",
|
||||
"rgb"
|
||||
],
|
||||
"info": {
|
||||
"video.height": 256,
|
||||
"video.width": 256,
|
||||
"video.codec": "av1",
|
||||
"video.pix_fmt": "yuv420p",
|
||||
"video.is_depth_map": false,
|
||||
"video.fps": 20,
|
||||
"video.channels": 3,
|
||||
"has_audio": false
|
||||
}
|
||||
},
|
||||
// for more state keys, see LiberoAdapter.features in libero_h5.py
|
||||
// for more states key, see configs
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": [
|
||||
@@ -75,9 +52,9 @@ Dataset Structure of `meta/info.json`:
|
||||
"x",
|
||||
"y",
|
||||
"z",
|
||||
"axis_angle1",
|
||||
"axis_angle2",
|
||||
"axis_angle3",
|
||||
"roll",
|
||||
"pitch",
|
||||
"yaw",
|
||||
"gripper",
|
||||
"gripper"
|
||||
]
|
||||
@@ -94,9 +71,9 @@ Dataset Structure of `meta/info.json`:
|
||||
"x",
|
||||
"y",
|
||||
"z",
|
||||
"axis_angle1",
|
||||
"axis_angle2",
|
||||
"axis_angle3",
|
||||
"roll",
|
||||
"pitch",
|
||||
"yaw",
|
||||
"gripper"
|
||||
]
|
||||
}
|
||||
@@ -112,33 +89,31 @@ Dataset Structure of `meta/info.json`:
|
||||
Follow instructions in [official repo](https://github.com/huggingface/lerobot?tab=readme-ov-file#installation).
|
||||
|
||||
2. Install others:
|
||||
We use DataTrove for conversion. Install the Ray extra if you want distributed execution across multiple cores or nodes.
|
||||
We use datatrove[ray] for parallel conversion, significantly speeding up data processing tasks by distributing the workload across multiple cores or nodes (if any).
|
||||
```bash
|
||||
pip install h5py
|
||||
pip install -U datatrove
|
||||
pip install -U "datatrove[ray]" # optional, for --executor ray
|
||||
pip install -U "datatrove[ray]" # if you want ray features
|
||||
```
|
||||
|
||||
## Get started
|
||||
|
||||
> [!NOTE]
|
||||
> This script supports converting LIBERO-style HDF5 directories to LeRobot. If you want to convert from RLDS to LeRobot, check [openx2lerobot](../openx2lerobot/README.md).
|
||||
> This script supports converting from original hdf5 to lerobot. If you want to convert from rlds to lerobot, check [openx2lerobot](../openx2lerobot/README.md).
|
||||
|
||||
### Download source code:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/Tavish9/any4lerobot.git
|
||||
cd any4lerobot/libero2lerobot
|
||||
```
|
||||
|
||||
### Regenerate LIBERO Trajectory:
|
||||
|
||||
1. [Install LIBERO dependency](https://github.com/Lifelong-Robot-Learning/LIBERO?tab=readme-ov-file#installtion)
|
||||
2. Replace `libero_90` with your target libero dataset.
|
||||
3. The converter feature schema expects `256x256x3` RGB observations. If your source HDF5 files are the original `128x128` LIBERO files, regenerate them first with `--resolution 256`, or update the image feature shapes in `libero_h5.py` to match your data.
|
||||
|
||||
```bash
|
||||
python regenerate_libero_dataset.py \
|
||||
python libero_utils/regenerate_libero_dataset.py \
|
||||
--resolution 256 \
|
||||
--libero_task_suite libero_90 \
|
||||
--libero_raw_data_dir /path/to/libero/datasets/libero_90 \
|
||||
@@ -147,17 +122,16 @@ python regenerate_libero_dataset.py \
|
||||
|
||||
### Modify in `convert.sh`:
|
||||
|
||||
1. `--src-paths` accepts one or more directories containing `*.hdf5` LIBERO task files. To merge many suites into one LeRobot dataset, specify all source directories, for example `--src-paths /path/libero_10 /path/libero_90`.
|
||||
2. `--output-path` is the final aggregated LeRobot dataset root. Temporary per-task datasets are written next to it under `<output-name>_temp` and removed after aggregation.
|
||||
3. If you have installed `datatrove[ray]`, use `--executor ray` for faster conversion. Increase `--workers`, `--tasks-per-job`, and `--cpus-per-task` if you have enough CPU and memory.
|
||||
4. To resume a previous conversion, pass the existing DataTrove log directory with `--resume-dir /path/to/logs/...`.
|
||||
5. Use `--debug` for a small local smoke test. It converts only the first two tasks, forces local execution, and disables Hub upload.
|
||||
6. Use `--repo-id <namespace/name>` together with `--push-to-hub` to upload the aggregated dataset. Without `--push-to-hub`, `--repo-id` only controls the local aggregate repo id.
|
||||
1. If you have installed `datatrove[ray]`, we recommend using `ray` executor for faster conversion.
|
||||
2. Increase `workers` and `tasks-per-job` if you have sufficient computing resources.
|
||||
3. To merge many datasets into one, simply specify both paths like: `--src-paths /path/libero_10 /path/libero_90`
|
||||
4. To resume from a previous conversion, provide the appropriate log directory using `--resume-from-save` and `--resume-from-aggregate`
|
||||
5. If you want different image resolution, regenerate the trajectory, and change the [config](./libero_utils/config.py). (DO NOT use resize)
|
||||
|
||||
```bash
|
||||
python libero_h5.py \
|
||||
--src-paths /path/to/libero/datasets/libero_90_no_noops \
|
||||
--output-path /path/to/local/libero_90_lerobot \
|
||||
--src-paths /path/to/libero/ \
|
||||
--output-path /path/to/local \
|
||||
--executor local \
|
||||
--tasks-per-job 3 \
|
||||
--workers 10
|
||||
|
||||
+205
-131
@@ -1,127 +1,144 @@
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections.abc import Iterable, Sequence
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from h5py import File
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
from generic_converter import BaseAdapter, ConversionTask, run_converter # noqa: E402
|
||||
import pandas as pd
|
||||
import ray
|
||||
from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
from lerobot.datasets.aggregate import (
|
||||
aggregate_data,
|
||||
aggregate_metadata,
|
||||
aggregate_stats,
|
||||
aggregate_videos,
|
||||
validate_all_metadata,
|
||||
)
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.datasets.utils import (
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
DEFAULT_DATA_FILE_SIZE_IN_MB,
|
||||
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
|
||||
write_info,
|
||||
write_stats,
|
||||
write_tasks,
|
||||
)
|
||||
from libero_utils.config import LIBERO_FEATURES
|
||||
from libero_utils.libero_utils import load_local_episodes
|
||||
from ray.runtime_env import RuntimeEnv
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
class LiberoAdapter(BaseAdapter):
|
||||
dataset_type = "libero"
|
||||
fps = 20
|
||||
robot_type = "franka"
|
||||
features = {
|
||||
"observation.images.image": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "rgb"],
|
||||
},
|
||||
"observation.images.wrist_image": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "rgb"],
|
||||
},
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": (8,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
|
||||
},
|
||||
"observation.states.ee_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
|
||||
},
|
||||
"observation.states.joint_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
|
||||
},
|
||||
"observation.states.gripper_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (2,),
|
||||
"names": {"motors": ["gripper", "gripper"]},
|
||||
},
|
||||
"action": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
|
||||
},
|
||||
}
|
||||
tags = ["libero", "franka"]
|
||||
def setup_logger():
|
||||
import sys
|
||||
|
||||
def __init__(self, src_paths: list[Path], output_path: Path):
|
||||
super().__init__(output_path)
|
||||
self.src_paths = src_paths
|
||||
from datatrove.utils.logging import logger
|
||||
|
||||
def load_tasks(self) -> list[ConversionTask]:
|
||||
tasks = []
|
||||
for src_path in self.src_paths:
|
||||
for input_h5 in src_path.glob("*.hdf5"):
|
||||
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
|
||||
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
|
||||
logger.remove()
|
||||
logger.add(sys.stdout, level="INFO", colorize=True)
|
||||
return logger
|
||||
|
||||
match = pattern1.search(input_h5.name)
|
||||
if match is None:
|
||||
match = pattern2.search(input_h5.name)
|
||||
if match is None:
|
||||
continue
|
||||
else:
|
||||
task_instruction = match.group(1).replace("_", " ")
|
||||
|
||||
tasks.append(
|
||||
ConversionTask(
|
||||
input_path=input_h5.resolve(),
|
||||
output_path=(
|
||||
self.temp_output_path
|
||||
/ f"{src_path.name}"
|
||||
/ input_h5.stem
|
||||
).resolve(),
|
||||
local_repo_id=f"{input_h5.parent.name}/{input_h5.name}",
|
||||
metadata={"task": task_instruction},
|
||||
)
|
||||
)
|
||||
return tasks
|
||||
class SaveLerobotDataset(PipelineStep):
|
||||
name = "Save Temp LerobotDataset"
|
||||
type = "libero2lerobot"
|
||||
|
||||
def load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]:
|
||||
input_h5 = task.input_path
|
||||
task_instruction = task.metadata.get("task")
|
||||
with File(input_h5, "r") as f:
|
||||
for demo in f["data"].values():
|
||||
demo_len = len(demo["obs/agentview_rgb"])
|
||||
# (-1: open, 1: close) -> (0: close, 1: open)
|
||||
action = np.array(demo["actions"])
|
||||
action = np.concatenate(
|
||||
[
|
||||
action[:, :6],
|
||||
(1 - np.clip(action[:, -1], 0, 1))[:, None],
|
||||
],
|
||||
axis=1,
|
||||
)
|
||||
state = np.concatenate(
|
||||
[
|
||||
np.array(demo["obs/ee_states"]),
|
||||
np.array(demo["obs/gripper_states"]),
|
||||
],
|
||||
axis=1,
|
||||
)
|
||||
episode = {
|
||||
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
|
||||
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
|
||||
"observation.state": np.array(state, dtype=np.float32),
|
||||
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
|
||||
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
|
||||
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
|
||||
"action": np.array(action, dtype=np.float32),
|
||||
}
|
||||
yield [{**{k: v[i] for k, v in episode.items()}, "task": task_instruction} for i in range(demo_len)]
|
||||
def __init__(self, tasks: list[tuple[Path, Path, str]]):
|
||||
super().__init__()
|
||||
self.tasks = tasks
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
logger = setup_logger()
|
||||
|
||||
input_h5, output_path, task_instruction = self.tasks[rank]
|
||||
|
||||
if output_path.exists():
|
||||
shutil.rmtree(output_path)
|
||||
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=f"{input_h5.parent.name}/{input_h5.name}",
|
||||
root=output_path,
|
||||
fps=20,
|
||||
robot_type="franka",
|
||||
features=LIBERO_FEATURES,
|
||||
)
|
||||
|
||||
logger.info(f"start processing for {input_h5}, saving to {output_path}")
|
||||
|
||||
raw_dataset = load_local_episodes(input_h5)
|
||||
for episode_index, episode_data in enumerate(raw_dataset):
|
||||
with self.track_time("saving episode"):
|
||||
for frame_data in episode_data:
|
||||
frame_data["task"] = task_instruction
|
||||
dataset.add_frame(frame_data)
|
||||
dataset.save_episode()
|
||||
logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
|
||||
|
||||
|
||||
def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path):
|
||||
logger = setup_logger()
|
||||
|
||||
all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs]
|
||||
|
||||
fps, robot_type, features = validate_all_metadata(all_metadata)
|
||||
|
||||
if aggregated_dir.exists():
|
||||
shutil.rmtree(aggregated_dir)
|
||||
|
||||
aggr_meta = LeRobotDatasetMetadata.create(
|
||||
repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}",
|
||||
root=aggregated_dir,
|
||||
fps=fps,
|
||||
robot_type=robot_type,
|
||||
features=features,
|
||||
)
|
||||
|
||||
video_keys = [key for key in features if features[key]["dtype"] == "video"]
|
||||
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
|
||||
aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
|
||||
|
||||
meta_idx = {"chunk": 0, "file": 0}
|
||||
data_idx = {"chunk": 0, "file": 0}
|
||||
videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys}
|
||||
|
||||
aggr_meta.episodes = {}
|
||||
|
||||
for src_meta in tqdm(all_metadata, desc="Copy data and videos"):
|
||||
videos_idx = aggregate_videos(
|
||||
src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE
|
||||
)
|
||||
data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE)
|
||||
|
||||
meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx)
|
||||
|
||||
aggr_meta.info["total_episodes"] += src_meta.total_episodes
|
||||
aggr_meta.info["total_frames"] += src_meta.total_frames
|
||||
|
||||
logger.info("write tasks")
|
||||
write_tasks(aggr_meta.tasks, aggr_meta.root)
|
||||
|
||||
logger.info("write info")
|
||||
aggr_meta.info.update(
|
||||
{
|
||||
"total_tasks": len(aggr_meta.tasks),
|
||||
"total_episodes": sum(m.total_episodes for m in all_metadata),
|
||||
"total_frames": sum(m.total_frames for m in all_metadata),
|
||||
"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
|
||||
}
|
||||
)
|
||||
write_info(aggr_meta.info, aggr_meta.root)
|
||||
|
||||
logger.info("write stats")
|
||||
aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata])
|
||||
write_stats(aggr_meta.stats, aggr_meta.root)
|
||||
|
||||
|
||||
def delete_temp_data(temp_dirs: list[Path]):
|
||||
logger = setup_logger()
|
||||
logger.info("Delete temp data_dir")
|
||||
for temp_dir in temp_dirs:
|
||||
shutil.rmtree(temp_dir)
|
||||
|
||||
|
||||
def main(
|
||||
@@ -131,25 +148,84 @@ def main(
|
||||
cpus_per_task: int,
|
||||
tasks_per_job: int,
|
||||
workers: int,
|
||||
resume_dir: Path | None = None,
|
||||
resume_dir: Path = None,
|
||||
debug: bool = False,
|
||||
repo_id: str | None = None,
|
||||
repo_id: str = None,
|
||||
push_to_hub: bool = False,
|
||||
):
|
||||
adapter = LiberoAdapter(src_paths, output_path)
|
||||
tasks = []
|
||||
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
|
||||
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
|
||||
for src_path in src_paths:
|
||||
for input_h5 in src_path.glob("*.hdf5"):
|
||||
match = pattern1.search(input_h5.name)
|
||||
if match is None:
|
||||
match = pattern2.search(input_h5.name)
|
||||
if match is None:
|
||||
continue
|
||||
tasks.append(
|
||||
(
|
||||
input_h5,
|
||||
(output_path / (src_path.name + "_temp") / input_h5.stem).resolve(),
|
||||
match.group(1).replace("_", " "),
|
||||
)
|
||||
)
|
||||
if len(src_paths) > 1:
|
||||
aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot")
|
||||
else:
|
||||
aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"
|
||||
aggregate_output_path = aggregate_output_path.resolve()
|
||||
|
||||
run_converter(
|
||||
adapter=adapter,
|
||||
executor=executor,
|
||||
cpus_per_task=cpus_per_task,
|
||||
tasks_per_job=tasks_per_job,
|
||||
workers=workers,
|
||||
resume_dir=resume_dir,
|
||||
debug=debug,
|
||||
local_repo_id=repo_id,
|
||||
hub_repo_id=repo_id,
|
||||
push_to_hub=push_to_hub,
|
||||
)
|
||||
if debug:
|
||||
executor = "local"
|
||||
workers = 1
|
||||
tasks = tasks[:2]
|
||||
push_to_hub = False
|
||||
|
||||
match executor:
|
||||
case "local":
|
||||
workers = os.cpu_count() // cpus_per_task if workers == -1 else workers
|
||||
executor = LocalPipelineExecutor
|
||||
case "ray":
|
||||
runtime_env = RuntimeEnv(
|
||||
env_vars={
|
||||
"HDF5_USE_FILE_LOCKING": "FALSE",
|
||||
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
|
||||
"SVT_LOG": "1",
|
||||
},
|
||||
)
|
||||
ray.init(runtime_env=runtime_env)
|
||||
executor = RayPipelineExecutor
|
||||
case _:
|
||||
raise ValueError(f"Executor {executor} not supported")
|
||||
|
||||
executor_config = {
|
||||
"tasks": len(tasks),
|
||||
"workers": workers,
|
||||
**({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if executor is RayPipelineExecutor else {}),
|
||||
}
|
||||
|
||||
executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run()
|
||||
create_aggr_dataset([task[1] for task in tasks], aggregate_output_path)
|
||||
delete_temp_data([task[1] for task in tasks])
|
||||
|
||||
for task in tasks:
|
||||
shutil.rmtree(task[1].parent, ignore_errors=True)
|
||||
|
||||
if push_to_hub:
|
||||
assert repo_id is not None
|
||||
tags = ["LeRobot", "libero", "franka"]
|
||||
tags.extend([src_path.name for src_path in src_paths])
|
||||
LeRobotDataset(
|
||||
repo_id=repo_id,
|
||||
root=aggregate_output_path,
|
||||
).push_to_hub(
|
||||
tags=tags,
|
||||
private=False,
|
||||
push_videos=True,
|
||||
license="apache-2.0",
|
||||
upload_large_folder=False,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -158,9 +234,7 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--output-path", type=Path, required=True)
|
||||
parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local")
|
||||
parser.add_argument("--cpus-per-task", type=int, default=1)
|
||||
parser.add_argument(
|
||||
"--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray"
|
||||
)
|
||||
parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
|
||||
parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
|
||||
parser.add_argument("--resume-dir", type=Path, help="logs directory to resume")
|
||||
parser.add_argument("--debug", action="store_true")
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
LIBERO_FEATURES = {
|
||||
"observation.images.image": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "rgb"],
|
||||
},
|
||||
"observation.images.wrist_image": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "rgb"],
|
||||
},
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": (8,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
|
||||
},
|
||||
"observation.states.ee_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
|
||||
},
|
||||
"observation.states.joint_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
|
||||
},
|
||||
"observation.states.gripper_state": {
|
||||
"dtype": "float32",
|
||||
"shape": (2,),
|
||||
"names": {"motors": ["gripper", "gripper"]},
|
||||
},
|
||||
"action": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
|
||||
},
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from h5py import File
|
||||
|
||||
|
||||
def load_local_episodes(input_h5: Path):
|
||||
with File(input_h5, "r") as f:
|
||||
for demo in f["data"].values():
|
||||
demo_len = len(demo["obs/agentview_rgb"])
|
||||
# (-1: open, 1: close) -> (0: close, 1: open)
|
||||
action = np.array(demo["actions"])
|
||||
action = np.concatenate(
|
||||
[
|
||||
action[:, :6],
|
||||
(1 - np.clip(action[:, -1], 0, 1))[:, None],
|
||||
],
|
||||
axis=1,
|
||||
)
|
||||
state = np.concatenate(
|
||||
[
|
||||
np.array(demo["obs/ee_states"]),
|
||||
np.array(demo["obs/gripper_states"]),
|
||||
],
|
||||
axis=1,
|
||||
)
|
||||
episode = {
|
||||
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
|
||||
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
|
||||
"observation.state": np.array(state, dtype=np.float32),
|
||||
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
|
||||
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
|
||||
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
|
||||
"action": np.array(action, dtype=np.float32),
|
||||
}
|
||||
yield [{**{k: v[i] for k, v in episode.items()}} for i in range(demo_len)]
|
||||
@@ -1,144 +0,0 @@
|
||||
# ROBOCASA TO LEROBOT
|
||||
|
||||
## ROBOCASA installation
|
||||
|
||||
- Clone this repo: https://github.com/robocasa/robocasa
|
||||
- Follow README.md to install packages and download assets
|
||||
|
||||
## Data Preparation
|
||||
|
||||
- Check files: `robocasa/scripts/download_datasets.py`, `robocasa/utils/dataset_registry.py`
|
||||
- Download original datasets by python scripts or wget/curl (recommended)
|
||||
|
||||
## Example:
|
||||
|
||||
```bash
|
||||
wget https://utexas.box.com/shared/static/7y9csrcx6uhhq4p3yctmm2df3rjqpw6g.hdf5 -O PnPCounterToCab.hdf5
|
||||
```
|
||||
|
||||
- Extract subset data: Original hdf5 files contain about 3000 episodes. However, it contains a key "masks", which contain list of subset demo_ids. For example: 30_demos : `[demo123, demo234, demo345, etc.]`.Run the code in the notebook to extract only chosen subset demos, which is much smaller and easier for later processes.
|
||||
|
||||
## Code Modification
|
||||
|
||||
- Add functions in `camera_utils.py` to your `robosuite/robosuite/utils/camera_utils.py` for camera parameters extraction (May be useful for experiments which requires multiview rendering)
|
||||
|
||||
- Change args to render depth and segmentation masks for new regenerated dataset. Change in `robocasa/environments/kitchen/kitchen.py`
|
||||
|
||||
```python
|
||||
|
||||
class Kitchen(ManipulationEnv, metaclass=KitchenEnvMeta):
|
||||
...
|
||||
EXCLUDE_LAYOUTS = []
|
||||
def __init__(
|
||||
self,
|
||||
robots,
|
||||
env_configuration="default",
|
||||
controller_configs=None,
|
||||
gripper_types="default",
|
||||
base_types="default",
|
||||
initialization_noise="default",
|
||||
use_camera_obs=True,
|
||||
use_object_obs=True, # currently unused variable
|
||||
reward_scale=1.0, # currently unused variable
|
||||
reward_shaping=False, # currently unused variables
|
||||
placement_initializer=None,
|
||||
has_renderer=False,
|
||||
has_offscreen_renderer=True,
|
||||
render_camera="robot0_agentview_center",
|
||||
render_collision_mesh=False,
|
||||
render_visual_mesh=True,
|
||||
render_gpu_device_id=-1,
|
||||
control_freq=20,
|
||||
horizon=1000,
|
||||
ignore_done=True,
|
||||
hard_reset=True,
|
||||
camera_names="agentview",
|
||||
camera_heights=256,
|
||||
camera_widths=256,
|
||||
camera_depths=False, # -> True
|
||||
renderer="mjviewer",
|
||||
renderer_config=None,
|
||||
init_robot_base_pos=None,
|
||||
seed=None,
|
||||
layout_and_style_ids=None,
|
||||
layout_ids=None,
|
||||
style_ids=None,
|
||||
scene_split=None, # unsued, for backwards compatibility
|
||||
generative_textures=None,
|
||||
obj_registries=("objaverse",),
|
||||
obj_instance_split=None,
|
||||
use_distractors=False,
|
||||
translucent_robot=False,
|
||||
randomize_cameras=False,
|
||||
camera_segmentations="instance", # add camera segmentation here: semantic/instance/element
|
||||
):
|
||||
...
|
||||
|
||||
super().__init__(
|
||||
robots=robots,
|
||||
env_configuration=env_configuration,
|
||||
controller_configs=controller_configs,
|
||||
base_types=base_types,
|
||||
gripper_types=gripper_types,
|
||||
initialization_noise=initialization_noise,
|
||||
use_camera_obs=use_camera_obs,
|
||||
has_renderer=has_renderer,
|
||||
has_offscreen_renderer=has_offscreen_renderer,
|
||||
render_camera=render_camera,
|
||||
render_collision_mesh=render_collision_mesh,
|
||||
render_visual_mesh=render_visual_mesh,
|
||||
render_gpu_device_id=render_gpu_device_id,
|
||||
control_freq=control_freq,
|
||||
lite_physics=True,
|
||||
horizon=horizon,
|
||||
ignore_done=ignore_done,
|
||||
hard_reset=hard_reset,
|
||||
camera_names=camera_names,
|
||||
camera_heights=camera_heights,
|
||||
camera_widths=camera_widths,
|
||||
camera_depths=camera_depths,
|
||||
camera_segmentations=camera_segmentations, # add camera segmentation here
|
||||
renderer=renderer,
|
||||
renderer_config=renderer_config,
|
||||
seed=seed,
|
||||
)
|
||||
```
|
||||
|
||||
## Regenerate
|
||||
|
||||
- Check file: `regenerate.py`
|
||||
- Original dataset contain image in 128x128 resolution and does not contain segmentation mask, depth, etc. We need to rerender it in 256x256 and save segmentation mask, and depth
|
||||
- Overall re-render flow:
|
||||
- (1) load hdf5 file and create env
|
||||
- (2) reset env to first state in the dataset
|
||||
- (3) Execute action in action label of original dataset, at each step, we collect observation data, camera parameters, state, etc. from simulation.
|
||||
- (4) Save only successful episode to new hdf5 file (original data contain unsuccessful episode or wrong action)
|
||||
- Change `origin_dir` and `regenerate_dir` to your dir in `regenerate.py` then run `python regenerate.py` to regenerate
|
||||
|
||||
## Get started
|
||||
|
||||
1. Download source code:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/Tavish9/any4lerobot.git
|
||||
```
|
||||
|
||||
2. Modify path in `convert.sh`:
|
||||
|
||||
```bash
|
||||
python robocasa_h5.py \
|
||||
--raw-dir /path/to/your/hdf5/files \
|
||||
--repo-id your_hf_id \
|
||||
--local-dir /path/to/your/output/dataset
|
||||
```
|
||||
|
||||
3. Execute the script:
|
||||
|
||||
```bash
|
||||
bash convert.sh
|
||||
```
|
||||
|
||||
## Example output datasets:
|
||||
|
||||
- ROBOCASA 100 demos: https://huggingface.co/datasets/binhng/robocasa_merged_24_tasks_100demos_v1
|
||||
- ROBOCASA 30 demos: https://huggingface.co/datasets/binhng/robocasa_merged_24_tasks_30demos_v3
|
||||
@@ -1,4 +0,0 @@
|
||||
python robocasa_h5.py \
|
||||
--raw-dir /path/to/your/hdf5/files \
|
||||
--repo-id your_hf_id \
|
||||
--local-dir /path/to/your/output/dataset
|
||||
@@ -1,107 +0,0 @@
|
||||
import argparse
|
||||
import json
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from tqdm import tqdm
|
||||
|
||||
|
||||
def main(raw_dir: Path, repo_id: str, local_dir: Path):
|
||||
if local_dir.exists():
|
||||
shutil.rmtree(local_dir)
|
||||
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=repo_id,
|
||||
robot_type="PandaOmron",
|
||||
root=local_dir,
|
||||
fps=20,
|
||||
features={
|
||||
"observation.images.robot0_agentview_right": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "channel"],
|
||||
},
|
||||
"observation.images.robot0_agentview_left": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "channel"],
|
||||
},
|
||||
"observation.images.robot0_eye_in_hand": {
|
||||
"dtype": "video",
|
||||
"shape": (256, 256, 3),
|
||||
"names": ["height", "width", "channel"],
|
||||
},
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": (9,),
|
||||
"names": ["state"],
|
||||
},
|
||||
"action": {
|
||||
"dtype": "float32",
|
||||
"shape": (12,),
|
||||
"names": ["actions"],
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
for dataset_path in raw_dir.glob("**/*.hdf5"):
|
||||
with h5py.File(dataset_path, "r") as raw_dataset:
|
||||
demos = raw_dataset["data"].keys()
|
||||
for demo in tqdm(demos):
|
||||
demo_length = len(raw_dataset["data"][demo]["actions"])
|
||||
demo_data = raw_dataset["data"][demo]
|
||||
|
||||
left_images = demo_data["obs"]["robot0_agentview_left_image"][:]
|
||||
right_images = demo_data["obs"]["robot0_agentview_right_image"][:]
|
||||
wrist_images = demo_data["obs"]["robot0_eye_in_hand_image"][:]
|
||||
states = np.concatenate(
|
||||
(
|
||||
demo_data["obs"]["robot0_base_to_eef_pos"][:],
|
||||
demo_data["obs"]["robot0_base_to_eef_quat"][:],
|
||||
demo_data["obs"]["robot0_gripper_qpos"][:],
|
||||
),
|
||||
axis=1,
|
||||
)
|
||||
actions = demo_data["actions"][:]
|
||||
for i in range(demo_length):
|
||||
ep_meta = demo_data.attrs["ep_meta"]
|
||||
ep_meta = json.loads(ep_meta)
|
||||
lang = ep_meta["lang"]
|
||||
dataset.add_frame(
|
||||
{
|
||||
"observation.images.robot0_agentview_right": right_images[i],
|
||||
"observation.images.robot0_agentview_left": left_images[i],
|
||||
"observation.images.robot0_eye_in_hand": wrist_images[i],
|
||||
"observation.state": states[i].astype(np.float32),
|
||||
"action": actions[i].astype(np.float32),
|
||||
"task": lang,
|
||||
},
|
||||
)
|
||||
dataset.save_episode()
|
||||
dataset.finalize()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--raw-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--local-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="When provided, writes the dataset converted to LeRobotDataset format in this directory (e.g. `data/lerobot/aloha_mobile_chair`).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
main(raw_dir=args.raw_dir, repo_id=args.repo_id, local_dir=args.local_dir)
|
||||
@@ -1,78 +0,0 @@
|
||||
import numpy as np
|
||||
import robosuite.utils.transform_utils as T
|
||||
|
||||
|
||||
def get_camera_intrinsic_matrix(sim, camera_name, camera_height, camera_width):
|
||||
"""
|
||||
Obtains camera intrinsic matrix.
|
||||
|
||||
Args:
|
||||
sim (MjSim): simulator instance
|
||||
camera_name (str): name of camera
|
||||
camera_height (int): height of camera images in pixels
|
||||
camera_width (int): width of camera images in pixels
|
||||
Return:
|
||||
K (np.array): 3x3 camera matrix
|
||||
"""
|
||||
cam_id = sim.model.camera_name2id(camera_name)
|
||||
fovy = sim.model.cam_fovy[cam_id]
|
||||
f = 0.5 * camera_height / np.tan(fovy * np.pi / 360)
|
||||
K = np.array([[f, 0, camera_width / 2], [0, f, camera_height / 2], [0, 0, 1]])
|
||||
return K
|
||||
|
||||
|
||||
def get_camera_extrinsic_matrix(sim, camera_name):
|
||||
"""
|
||||
Returns a 4x4 homogenous matrix corresponding to the camera pose in the
|
||||
world frame. MuJoCo has a weird convention for how it sets up the
|
||||
camera body axis, so we also apply a correction so that the x and y
|
||||
axis are along the camera view and the z axis points along the
|
||||
viewpoint.
|
||||
Normal camera convention: https://docs.opencv.org/2.4/modules/calib3d/doc/camera_calibration_and_3d_reconstruction.html
|
||||
|
||||
Args:
|
||||
sim (MjSim): simulator instance
|
||||
camera_name (str): name of camera
|
||||
Return:
|
||||
R (np.array): 4x4 camera extrinsic matrix
|
||||
"""
|
||||
cam_id = sim.model.camera_name2id(camera_name)
|
||||
camera_pos = sim.data.cam_xpos[cam_id]
|
||||
camera_rot = sim.data.cam_xmat[cam_id].reshape(3, 3)
|
||||
R = T.make_pose(camera_pos, camera_rot)
|
||||
|
||||
# IMPORTANT! This is a correction so that the camera axis is set up along the viewpoint correctly.
|
||||
camera_axis_correction = np.array(
|
||||
[[1.0, 0.0, 0.0, 0.0], [0.0, -1.0, 0.0, 0.0], [0.0, 0.0, -1.0, 0.0], [0.0, 0.0, 0.0, 1.0]]
|
||||
)
|
||||
R = R @ camera_axis_correction
|
||||
return R
|
||||
|
||||
def get_camera_extrinsic_matrix_rel(sim, camera_name):
|
||||
"""
|
||||
Returns a 4x4 homogenous matrix corresponding to the camera pose in the
|
||||
world frame. MuJoCo has a weird convention for how it sets up the
|
||||
camera body axis, so we also apply a correction so that the x and y
|
||||
axis are along the camera view and the z axis points along the
|
||||
viewpoint.
|
||||
Normal camera convention: https://docs.opencv.org/2.4/modules/calib3d/doc/camera_calibration_and_3d_reconstruction.html
|
||||
|
||||
Args:
|
||||
sim (MjSim): simulator instance
|
||||
camera_name (str): name of camera
|
||||
Return:
|
||||
R (np.array): 4x4 camera extrinsic matrix
|
||||
"""
|
||||
cam_id = sim.model.camera_name2id(camera_name)
|
||||
camera_pos = sim.model.cam_pos[cam_id]
|
||||
camera_quat = sim.model.cam_quat[cam_id]
|
||||
camera_rot = T.quat2mat(camera_quat)
|
||||
R = T.make_pose(camera_pos, camera_rot)
|
||||
|
||||
# IMPORTANT! This is a correction so that the camera axis is set up along the viewpoint correctly.
|
||||
camera_axis_correction = np.array(
|
||||
[[1.0, 0.0, 0.0, 0.0], [0.0, -1.0, 0.0, 0.0], [0.0, 0.0, -1.0, 0.0], [0.0, 0.0, 0.0, 1.0]]
|
||||
)
|
||||
R = R @ camera_axis_correction
|
||||
return R
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "44b6da09",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Extract subset data \n",
|
||||
"\n",
|
||||
"Original hdf5 file contains about 3000 episodes. However, it contains a key \"masks\", which contain list of subset demo_ids. For example: 30_demos : [demo123, demo234, demo 345, etc.]\n",
|
||||
"\n",
|
||||
"Run the code bellow to extract only chosen subset demos, which is much smaller and easier for later process."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "6ac64550",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import h5py\n",
|
||||
"\n",
|
||||
"DATA_DIR=\"direction/to/your/hdf5/files/\"\n",
|
||||
"# E.x: DATA_DIR=\"/projects/extern/kisski/kisski-spath/dir.project/VLA_3D/binh/robocasa/test\"\n",
|
||||
"\n",
|
||||
"# file_name = \"PnPCabToCounter.hdf5\"\n",
|
||||
"# file_name = \"PnPCounterToCab.hdf5\"\n",
|
||||
"# file_name = \"CoffeeSetupMug.hdf5\"\n",
|
||||
"# file_name = \"TurnOnMicrowave.hdf5\"\n",
|
||||
"file_name = \"TurnOffStove.hdf5\"\n",
|
||||
"\n",
|
||||
"file_path = DATA_DIR + \"/\" + file_name\n",
|
||||
"\n",
|
||||
"f = h5py.File(file_path, 'r')\n",
|
||||
"chosen_demo_list = []\n",
|
||||
"for i in f['mask']['100_demos'][:]: # or \"30_demos\"\n",
|
||||
" chosen_demo_list.append(i.decode('utf-8'))\n",
|
||||
" \n",
|
||||
"chosen_data = []\n",
|
||||
"for k in f['data'].keys():\n",
|
||||
" if k in chosen_demo_list:\n",
|
||||
" chosen_data.append(f['data'][k])\n",
|
||||
" \n",
|
||||
"with h5py.File(f\"direction_to_your_new_extracted_subset/{file_name}\", \"w\") as out:\n",
|
||||
" out_data = out.create_group(\"data\")\n",
|
||||
" \n",
|
||||
" for key, val in f['data'].attrs.items():\n",
|
||||
" out_data.attrs[key] = val # IMPORTANT: set attributes for new hdf5 files (need for reset env and later re-render)\n",
|
||||
"\n",
|
||||
" for grp in chosen_data:\n",
|
||||
" name = grp.name.split(\"/\")[-1] # demo_xxx\n",
|
||||
" grp.file.copy(grp, out_data, name=name)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "robocasa",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"name": "python",
|
||||
"version": "3.10.19"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
||||
@@ -1,277 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
import robosuite
|
||||
from robocasa.scripts.playback_dataset import reset_to
|
||||
from robosuite.utils.camera_utils import (
|
||||
get_camera_extrinsic_matrix,
|
||||
get_camera_extrinsic_matrix_rel,
|
||||
get_camera_intrinsic_matrix,
|
||||
)
|
||||
from tqdm import tqdm
|
||||
|
||||
ROBOCASA_DUMMY_ACTION = [0.0] * 6 + [-1.0] + [0.0] * 4 + [-1.0]
|
||||
|
||||
|
||||
def get_camera_info(sim, camera_name, camera_height, camera_width):
|
||||
camera_intrinsics = get_camera_intrinsic_matrix(sim, camera_name, camera_height, camera_width)
|
||||
camera_extrinsics = get_camera_extrinsic_matrix(sim, camera_name)
|
||||
|
||||
return camera_intrinsics, camera_extrinsics
|
||||
|
||||
|
||||
def creat_env_from_hdf5(f):
|
||||
env_meta = json.loads(f["data"].attrs["env_args"])
|
||||
env_meta["env_kwargs"]["camera_depths"] = True
|
||||
env_meta["env_kwargs"]["camera_heights"] = 256
|
||||
env_meta["env_kwargs"]["camera_widths"] = 256
|
||||
env_meta["env_kwargs"]["camera_segmentations"] = "element" # element' #'instance'
|
||||
# f.close()
|
||||
|
||||
env_kwargs = env_meta["env_kwargs"]
|
||||
env_kwargs["env_name"] = env_meta["env_name"]
|
||||
env_kwargs["has_renderer"] = False
|
||||
env_kwargs["renderer"] = "mjviewer"
|
||||
env_kwargs["has_offscreen_renderer"] = True # write_video
|
||||
env_kwargs["use_camera_obs"] = True
|
||||
env_kwargs["ignore_done"] = False
|
||||
|
||||
env = robosuite.make(**env_kwargs)
|
||||
|
||||
return env, env_meta
|
||||
|
||||
|
||||
def reset_each_demo(env, demo):
|
||||
# demo = f["data"]["demo_<idx>"]
|
||||
model_xml = demo.attrs["model_file"]
|
||||
init_state = demo["states"][()][0]
|
||||
ep_meta = demo.attrs["ep_meta"]
|
||||
|
||||
state = {"states": init_state, "model": model_xml, "ep_meta": ep_meta}
|
||||
reset_to(env, state)
|
||||
|
||||
|
||||
def process_1_demo(env, f, demo_id, grp):
|
||||
demo = f["data"][demo_id]
|
||||
reset_each_demo(env, demo)
|
||||
|
||||
ep_meta = env.get_ep_meta()
|
||||
model_file = env.model.get_xml()
|
||||
|
||||
for _ in range(10):
|
||||
obs, reward, done, info = env.step(ROBOCASA_DUMMY_ACTION)
|
||||
|
||||
obs_keys = list(obs.keys())
|
||||
obs_keys += [
|
||||
"robot0_agentview_left_intrinsics",
|
||||
"robot0_agentview_right_intrinsics",
|
||||
"robot0_eye_in_hand_intrinsics",
|
||||
]
|
||||
obs_keys += [
|
||||
"robot0_agentview_left_extrinsics",
|
||||
"robot0_agentview_right_extrinsics",
|
||||
"robot0_eye_in_hand_extrinsics",
|
||||
]
|
||||
obs_keys += [
|
||||
"robot0_agentview_left_extrinsicsR",
|
||||
"robot0_agentview_right_extrinsicsR",
|
||||
"robot0_eye_in_hand_extrinsicsR",
|
||||
]
|
||||
obs_keys += ["robot0_agentview_left_depthW", "robot0_agentview_right_depthW", "robot0_eye_in_hand_depthW"]
|
||||
|
||||
obs_dict = {key: [] for key in obs_keys}
|
||||
# action_dict = {key: [] for key in act_keys}
|
||||
actions = []
|
||||
actions_abs = []
|
||||
rewards = []
|
||||
dones = []
|
||||
states = [] # env state, not robot. The state for robot is included in obs
|
||||
|
||||
# for key in obs_keys:
|
||||
# obs_dict[key] = obs[key]
|
||||
orig_actions = demo["actions"][()]
|
||||
orig_actions_abs = demo["actions_abs"][()]
|
||||
# orig_action_dict = demo['action_dict']
|
||||
|
||||
for i, action in enumerate(orig_actions):
|
||||
# for i, action in enumerate(orig_actions_abs):
|
||||
extent = env.sim.model.stat.extent
|
||||
far = env.sim.model.vis.map.zfar * extent
|
||||
near = env.sim.model.vis.map.znear * extent
|
||||
left_depth = obs["robot0_agentview_left_depth"].copy()
|
||||
right_depth = obs["robot0_agentview_right_depth"].copy()
|
||||
wrist_depth = obs["robot0_eye_in_hand_depth"].copy()
|
||||
left_depth = near / (1.0 - left_depth * (1.0 - near / far))[::-1]
|
||||
right_depth = near / (1.0 - right_depth * (1.0 - near / far))[::-1]
|
||||
wrist_depth = near / (1.0 - wrist_depth * (1.0 - near / far))[::-1]
|
||||
|
||||
obs["robot0_agentview_left_depthW"] = left_depth
|
||||
obs["robot0_agentview_right_depthW"] = right_depth
|
||||
obs["robot0_eye_in_hand_depthW"] = wrist_depth
|
||||
|
||||
left_intrinsics, left_extrinsics = get_camera_info(env.sim, "robot0_agentview_left", 256, 256)
|
||||
right_intrinsics, right_extrinsics = get_camera_info(env.sim, "robot0_agentview_right", 256, 256)
|
||||
wrist_intrinsics, wrist_extrinsics = get_camera_info(env.sim, "robot0_eye_in_hand", 256, 256)
|
||||
|
||||
obs["robot0_agentview_left_intrinsics"] = left_intrinsics
|
||||
obs["robot0_agentview_right_intrinsics"] = right_intrinsics
|
||||
obs["robot0_eye_in_hand_intrinsics"] = wrist_intrinsics
|
||||
obs["robot0_agentview_left_extrinsics"] = left_extrinsics
|
||||
obs["robot0_agentview_right_extrinsics"] = right_extrinsics
|
||||
obs["robot0_eye_in_hand_extrinsics"] = wrist_extrinsics
|
||||
|
||||
left_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_agentview_left")
|
||||
right_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_agentview_right")
|
||||
wrist_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_eye_in_hand")
|
||||
|
||||
obs["robot0_agentview_left_extrinsicsR"] = left_intrinsics_rel
|
||||
obs["robot0_agentview_right_extrinsicsR"] = right_intrinsics_rel
|
||||
obs["robot0_eye_in_hand_extrinsicsR"] = wrist_intrinsics_rel
|
||||
|
||||
# append all keys
|
||||
for key in obs_keys:
|
||||
if (
|
||||
("eye_in_hand" in key or "agentview" in key)
|
||||
and "depthW" not in key
|
||||
and "intrinsics" not in key
|
||||
and "extrinsics" not in key
|
||||
):
|
||||
obs_dict[key].append(obs[key][::-1, :, :])
|
||||
else:
|
||||
obs_dict[key].append(obs[key])
|
||||
|
||||
# for key in act_keys:
|
||||
# action_dict[key].append(orig_action_dict[key][i])
|
||||
|
||||
actions.append(action)
|
||||
actions_abs.append(orig_actions_abs[i])
|
||||
|
||||
rewards.append(reward)
|
||||
dones.append(done)
|
||||
|
||||
current_state = env.sim.get_state().flatten()
|
||||
states.append(current_state)
|
||||
|
||||
# step env
|
||||
obs, reward, done, info = env.step(action.tolist())
|
||||
|
||||
done = done or env._check_success()
|
||||
# if done:
|
||||
# print(f" Step {i} done: {done}")
|
||||
# print(f" Step {i} info: {info}")
|
||||
# print(f" Step {i} is_success: {env._check_success()}" )
|
||||
|
||||
# save successful episode only
|
||||
if done:
|
||||
print(f"Demo {demo_id} is done after {i} actions! -> SAVE!!!")
|
||||
|
||||
# save to new hdf5 file here
|
||||
ep_data = grp.create_group(demo_id)
|
||||
# set attribute for ep_data here ...
|
||||
ep_data.attrs["model_file"] = model_file
|
||||
ep_data.attrs["ep_meta"] = json.dumps(ep_meta, indent=4)
|
||||
|
||||
# obs group
|
||||
obs_grp = ep_data.create_group("obs")
|
||||
for key in obs_keys:
|
||||
obs_grp.create_dataset(key, data=np.stack(obs_dict[key], axis=0))
|
||||
|
||||
# actions dataset
|
||||
ep_data.create_dataset("actions", data=np.stack(actions, axis=0))
|
||||
|
||||
ep_data.create_dataset("actions_abs", data=np.stack(actions_abs, axis=0))
|
||||
|
||||
ep_data.create_dataset("dones", data=np.stack(dones, axis=0))
|
||||
|
||||
ep_data.create_dataset("rewards", data=np.stack(rewards, axis=0))
|
||||
|
||||
# state dataset
|
||||
ep_data.create_dataset("states", data=np.stack(states, axis=0))
|
||||
|
||||
elif not done:
|
||||
print(f"Demo {demo_id} not done after all actions executed! -> does not SAVE!")
|
||||
|
||||
|
||||
def regenerate_hdf5_dataset(input_path, output_path, debug=False):
|
||||
f = h5py.File(input_path, "r")
|
||||
env, env_meta = creat_env_from_hdf5(f)
|
||||
|
||||
out_f = h5py.File(output_path, "w")
|
||||
out_f.attrs["env_args"] = json.dumps(env_meta)
|
||||
|
||||
grp = out_f.create_group("data")
|
||||
|
||||
all_demo_ids = list(f["data"].keys())
|
||||
if debug:
|
||||
all_demo_ids = all_demo_ids[: min(2, len(all_demo_ids))]
|
||||
for demo_id in tqdm(all_demo_ids):
|
||||
print(f"Processing {demo_id} ...")
|
||||
process_1_demo(env, f, demo_id, grp)
|
||||
|
||||
f.close()
|
||||
if len(out_f["data"].keys()) == 0:
|
||||
print("No demos were processed successfully. Deleting output file.")
|
||||
out_f.close()
|
||||
os.remove(output_path)
|
||||
else:
|
||||
print(f"Processed data saved {len(out_f['data'].keys())} demos to {output_path}")
|
||||
out_f.close()
|
||||
|
||||
|
||||
def process_task_wrapper(args):
|
||||
"""Wrapper function for multiprocessing to process a single task."""
|
||||
task, origin_dir, regenerate_dir, debug = args
|
||||
input_path = os.path.join(origin_dir, f"{task}.hdf5")
|
||||
output_path = os.path.join(regenerate_dir, f"{task}.hdf5")
|
||||
|
||||
print(f"Regenerating dataset for task {task} ...")
|
||||
regenerate_hdf5_dataset(input_path, output_path, debug=debug)
|
||||
print(f"Completed task {task}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
n_demo = 100 # 100
|
||||
origin_dir = f"<directory/contain/original/hdf5/files/>"
|
||||
regenerate_dir = f"<directory/contain/regenerated/hdf5/files/>"
|
||||
os.makedirs(regenerate_dir, exist_ok=True)
|
||||
|
||||
task_list = [
|
||||
"PnPCabToCounter",
|
||||
"PnPCounterToCab",
|
||||
"CoffeeSetupMug",
|
||||
"TurnOffStove",
|
||||
"TurnOnMicrowave",
|
||||
# ... add other tasks as needed
|
||||
# "CoffeePressButton",
|
||||
# "CoffeeServeMug",
|
||||
# "TurnOffMicrowave",
|
||||
# "TurnOffSinkFaucet",
|
||||
# "TurnOnSinkFaucet",
|
||||
# "TurnOnStove",
|
||||
# "TurnSinkSpout"
|
||||
# "CloseDoubleDoor",
|
||||
# "CloseDrawer",
|
||||
# "CloseSingleDoor",
|
||||
# "OpenDoubleDoor",
|
||||
# "OpenDrawer",
|
||||
# "OpenSingleDoor"
|
||||
# "PnPCounterToMicrowave",
|
||||
# "PnPCounterToSink",
|
||||
# "PnPCounterToStove",
|
||||
# "PnPMicrowaveToCounter",
|
||||
# "PnPSinkToCounter",
|
||||
# "PnPStoveToCounter"
|
||||
] # 24 tasks in robocasa kitchen dataset
|
||||
|
||||
debug = False
|
||||
if debug:
|
||||
task_list = task_list[:2]
|
||||
|
||||
for task in task_list:
|
||||
input_path = os.path.join(origin_dir, f"{task}.hdf5")
|
||||
output_path = os.path.join(regenerate_dir, f"{task}.hdf5")
|
||||
|
||||
print(f"Regenerating dataset for task {task} ...")
|
||||
regenerate_hdf5_dataset(input_path, output_path, debug=False)
|
||||
@@ -182,8 +182,6 @@ Dataset Structure of `meta/info.json`:
|
||||
> │ └── h5_ur_1rgb.json
|
||||
> └── RoboMIND_v1_2_instr.csv
|
||||
> ```
|
||||
>
|
||||
> 3. Due the format mismatch in simulation dataset, this script skips `simulation`, `sim_franka_3rgb`, `sim_tienkung_1rgb`.
|
||||
|
||||
> [!NOTE]
|
||||
> The conversion speed of this script is limited by the performance of the physical machine running it, including **CPU cores and memory**. We recommend using **2 CPU cores per task** for optimal performance. However, each task requires approximately 10 GiB of memory. To avoid running out of memory, you may need to increase the number of CPU cores per task depending on your system’s available memory.
|
||||
|
||||
+98
-156
@@ -1,6 +1,5 @@
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
import inspect
|
||||
import gc
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
@@ -8,15 +7,11 @@ from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import ray
|
||||
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.datasets.compute_stats import aggregate_stats
|
||||
from lerobot.datasets.dataset_writer import DatasetWriter, _encode_video_worker
|
||||
from lerobot.datasets.feature_utils import validate_episode_buffer
|
||||
from lerobot.datasets.io_utils import write_info, write_stats
|
||||
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict
|
||||
from lerobot.datasets.lerobot_dataset import VALID_VIDEO_CODECS, LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.datasets.utils import flatten_dict, validate_episode_buffer, write_info, write_stats
|
||||
from lerobot.datasets.video_utils import get_safe_default_codec
|
||||
from ray.runtime_env import RuntimeEnv
|
||||
from robomind_uitls.configs import ROBOMIND_CONFIG
|
||||
from robomind_uitls.lerobot_uitls import compute_episode_stats, generate_features_from_config
|
||||
@@ -26,38 +21,6 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
|
||||
|
||||
|
||||
class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
|
||||
def _flush_metadata_buffer(self) -> None:
|
||||
"""Write all buffered episode metadata to parquet file."""
|
||||
if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
|
||||
return
|
||||
|
||||
combined_dict = {}
|
||||
for episode_dict in self._metadata_buffer:
|
||||
for key, value in episode_dict.items():
|
||||
if key not in combined_dict:
|
||||
combined_dict[key] = []
|
||||
# Extract value and serialize numpy arrays
|
||||
# because PyArrow's from_pydict function doesn't support numpy arrays
|
||||
val = value[0] if isinstance(value, list) else value
|
||||
combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
|
||||
|
||||
first_ep = self._metadata_buffer[0]
|
||||
chunk_idx = first_ep["meta/episodes/chunk_index"][0]
|
||||
file_idx = first_ep["meta/episodes/file_index"][0]
|
||||
|
||||
schema = None if not self._pq_writer else self._pq_writer.schema
|
||||
table = pa.Table.from_pydict(combined_dict, schema=schema)
|
||||
|
||||
if not self._pq_writer:
|
||||
path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
|
||||
|
||||
self._pq_writer.write_table(table)
|
||||
|
||||
self.latest_episode = self._metadata_buffer[-1]
|
||||
self._metadata_buffer.clear()
|
||||
|
||||
def save_episode(
|
||||
self,
|
||||
split,
|
||||
@@ -92,18 +55,78 @@ class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
|
||||
write_stats(self.stats, self.root)
|
||||
|
||||
|
||||
class RoboMINDDatasetWriter(DatasetWriter):
|
||||
def save_episode(
|
||||
self,
|
||||
split,
|
||||
action_config: dict,
|
||||
episode_data: dict | None = None,
|
||||
parallel_encoding: bool = True,
|
||||
) -> None:
|
||||
"""Save the current episode in self.episode_buffer to disk."""
|
||||
class RoboMINDDataset(LeRobotDataset):
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
repo_id: str,
|
||||
fps: int,
|
||||
features: dict,
|
||||
root: str | Path | None = None,
|
||||
robot_type: str | None = None,
|
||||
use_videos: bool = True,
|
||||
tolerance_s: float = 1e-4,
|
||||
image_writer_processes: int = 0,
|
||||
image_writer_threads: int = 0,
|
||||
video_backend: str | None = None,
|
||||
batch_encoding_size: int = 1,
|
||||
vcodec: str = "libsvtav1",
|
||||
) -> "LeRobotDataset":
|
||||
"""Create a LeRobot Dataset from scratch in order to record data."""
|
||||
if vcodec not in VALID_VIDEO_CODECS:
|
||||
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
|
||||
obj = cls.__new__(cls)
|
||||
obj.meta = RoboMINDDatasetMetadata.create(
|
||||
repo_id=repo_id,
|
||||
fps=fps,
|
||||
robot_type=robot_type,
|
||||
features=features,
|
||||
root=root,
|
||||
use_videos=use_videos,
|
||||
)
|
||||
obj.repo_id = obj.meta.repo_id
|
||||
obj.root = obj.meta.root
|
||||
obj.revision = None
|
||||
obj.tolerance_s = tolerance_s
|
||||
obj.image_writer = None
|
||||
obj.batch_encoding_size = batch_encoding_size
|
||||
obj.episodes_since_last_encoding = 0
|
||||
obj.vcodec = vcodec
|
||||
|
||||
if image_writer_processes or image_writer_threads:
|
||||
obj.start_image_writer(image_writer_processes, image_writer_threads)
|
||||
|
||||
# TODO(aliberts, rcadene, alexander-soare): Merge this with OnlineBuffer/DataBuffer
|
||||
obj.episode_buffer = obj.create_episode_buffer()
|
||||
|
||||
obj.episodes = None
|
||||
obj.hf_dataset = obj.create_hf_dataset()
|
||||
obj.image_transforms = None
|
||||
obj.delta_timestamps = None
|
||||
obj.delta_indices = None
|
||||
obj._absolute_to_relative_idx = None
|
||||
obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec()
|
||||
obj.writer = None
|
||||
obj.latest_episode = None
|
||||
obj._current_file_start_frame = None
|
||||
# Initialize tracking for incremental recording
|
||||
obj._lazy_loading = False
|
||||
obj._recorded_frames = 0
|
||||
obj._writer_closed_for_reading = False
|
||||
return obj
|
||||
|
||||
def save_episode(self, split, action_config: dict, episode_data: dict | None = None) -> None:
|
||||
"""
|
||||
This will save to disk the current episode in self.episode_buffer.
|
||||
|
||||
Args:
|
||||
episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will
|
||||
save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
|
||||
None.
|
||||
"""
|
||||
episode_buffer = episode_data if episode_data is not None else self.episode_buffer
|
||||
|
||||
validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features)
|
||||
validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features)
|
||||
|
||||
# size and task are special cases that won't be added to hf_dataset
|
||||
episode_length = episode_buffer.pop("size")
|
||||
@@ -111,131 +134,49 @@ class RoboMINDDatasetWriter(DatasetWriter):
|
||||
episode_tasks = list(set(tasks))
|
||||
episode_index = episode_buffer["episode_index"]
|
||||
|
||||
episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length)
|
||||
episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length)
|
||||
episode_buffer["episode_index"] = np.full((episode_length,), episode_index)
|
||||
|
||||
# Update tasks and task indices with new tasks if any
|
||||
self._meta.save_episode_tasks(episode_tasks)
|
||||
self.meta.save_episode_tasks(episode_tasks)
|
||||
|
||||
# Given tasks in natural language, find their corresponding task indices
|
||||
episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks])
|
||||
episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks])
|
||||
|
||||
for key, ft in self._meta.features.items():
|
||||
for key, ft in self.features.items():
|
||||
# index, episode_index, task_index are already processed above, and image and video
|
||||
# are processed separately by storing image path and frame info as meta data
|
||||
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]:
|
||||
continue
|
||||
episode_buffer[key] = np.stack(episode_buffer[key]).squeeze()
|
||||
|
||||
# Wait for image writer to end, so that episode stats over images can be computed
|
||||
self._wait_image_writer()
|
||||
has_video_keys = len(self._meta.video_keys) > 0
|
||||
use_streaming = self._streaming_encoder is not None and has_video_keys
|
||||
use_batched_encoding = self._batch_encoding_size > 1
|
||||
|
||||
if use_streaming:
|
||||
non_video_buffer = {
|
||||
k: v for k, v in episode_buffer.items() if self._meta.features.get(k, {}).get("dtype") not in ("video",)
|
||||
}
|
||||
non_video_features = {k: v for k, v in self._meta.features.items() if v["dtype"] != "video"}
|
||||
ep_stats = compute_episode_stats(non_video_buffer, non_video_features)
|
||||
else:
|
||||
ep_stats = compute_episode_stats(episode_buffer, self._meta.features)
|
||||
ep_stats = compute_episode_stats(episode_buffer, self.features)
|
||||
|
||||
ep_metadata = self._save_episode_data(episode_buffer)
|
||||
has_video_keys = len(self.meta.video_keys) > 0
|
||||
use_batched_encoding = self.batch_encoding_size > 1
|
||||
|
||||
if use_streaming:
|
||||
streaming_results = self._streaming_encoder.finish_episode()
|
||||
for video_key in self._meta.video_keys:
|
||||
temp_path, video_stats = streaming_results[video_key]
|
||||
if video_stats is not None:
|
||||
ep_stats[video_key] = {
|
||||
k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0)
|
||||
for k, v in video_stats.items()
|
||||
}
|
||||
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
|
||||
elif has_video_keys and not use_batched_encoding:
|
||||
num_cameras = len(self._meta.video_keys)
|
||||
if parallel_encoding and num_cameras > 1:
|
||||
with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor:
|
||||
future_to_key = {
|
||||
executor.submit(
|
||||
_encode_video_worker,
|
||||
video_key,
|
||||
episode_index,
|
||||
self._root,
|
||||
self._meta.fps,
|
||||
self._vcodec,
|
||||
self._encoder_threads,
|
||||
): video_key
|
||||
for video_key in self._meta.video_keys
|
||||
}
|
||||
|
||||
results = {}
|
||||
for future in concurrent.futures.as_completed(future_to_key):
|
||||
video_key = future_to_key[future]
|
||||
try:
|
||||
temp_path = future.result()
|
||||
results[video_key] = temp_path
|
||||
except Exception as exc:
|
||||
logging.error(f"Video encoding failed for {video_key}: {exc}")
|
||||
raise exc
|
||||
|
||||
for video_key in self._meta.video_keys:
|
||||
temp_path = results[video_key]
|
||||
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
|
||||
else:
|
||||
for video_key in self._meta.video_keys:
|
||||
ep_metadata.update(self._save_episode_video(video_key, episode_index))
|
||||
if has_video_keys and not use_batched_encoding:
|
||||
for video_key in self.meta.video_keys:
|
||||
ep_metadata.update(self._save_episode_video(video_key, episode_index))
|
||||
|
||||
# `meta.save_episode` be executed after encoding the videos
|
||||
ep_metadata.update({"action_config": action_config})
|
||||
self._meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
|
||||
self.meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
|
||||
|
||||
if has_video_keys and use_batched_encoding:
|
||||
self._episodes_since_last_encoding += 1
|
||||
if self._episodes_since_last_encoding == self._batch_encoding_size:
|
||||
start_ep = self._meta.total_episodes - self._batch_encoding_size
|
||||
end_ep = self._meta.total_episodes
|
||||
# Check if we should trigger batch encoding
|
||||
self.episodes_since_last_encoding += 1
|
||||
if self.episodes_since_last_encoding == self.batch_encoding_size:
|
||||
start_ep = self.num_episodes - self.batch_encoding_size
|
||||
end_ep = self.num_episodes
|
||||
self._batch_save_episode_video(start_ep, end_ep)
|
||||
self._episodes_since_last_encoding = 0
|
||||
self.episodes_since_last_encoding = 0
|
||||
|
||||
if not episode_data:
|
||||
self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0)
|
||||
|
||||
|
||||
class RoboMINDDataset(LeRobotDataset):
|
||||
@classmethod
|
||||
def create(cls, *args, **kwargs) -> "RoboMINDDataset":
|
||||
sig = inspect.signature(super().create)
|
||||
bound = sig.bind_partial(*args, **kwargs)
|
||||
bound.apply_defaults()
|
||||
params = bound.arguments
|
||||
|
||||
obj = super().create(*args, **kwargs)
|
||||
|
||||
shutil.rmtree(params["root"], ignore_errors=True)
|
||||
obj.meta: RoboMINDDatasetMetadata = RoboMINDDatasetMetadata.create(
|
||||
repo_id=params["repo_id"],
|
||||
fps=params["fps"],
|
||||
robot_type=params["robot_type"],
|
||||
features=params["features"],
|
||||
root=params["root"],
|
||||
use_videos=params["use_videos"],
|
||||
metadata_buffer_size=params["metadata_buffer_size"],
|
||||
)
|
||||
obj.writer: RoboMINDDatasetWriter = RoboMINDDatasetWriter(
|
||||
meta=obj.meta,
|
||||
root=obj.root,
|
||||
vcodec=obj._vcodec,
|
||||
encoder_threads=obj._encoder_threads,
|
||||
batch_encoding_size=obj._batch_encoding_size,
|
||||
)
|
||||
return obj
|
||||
|
||||
def save_episode(
|
||||
self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True
|
||||
) -> None:
|
||||
self._require_writer("save_episode")
|
||||
self.writer.save_episode(split, action_config, episode_data, parallel_encoding)
|
||||
# Reset episode buffer and clean up temporary images (if not already deleted during video encoding)
|
||||
self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0)
|
||||
|
||||
|
||||
def get_all_tasks(src_path: Path, output_path: Path, embodiment: str):
|
||||
@@ -314,11 +255,11 @@ def save_as_lerobot_dataset(task: tuple[dict, Path, str], src_path, benchmark, e
|
||||
return
|
||||
else:
|
||||
logging.warning(f"Skipped {episode_path}: len of dataset:{len(raw_dataset)} or {str(err)}")
|
||||
|
||||
dataset.finalize()
|
||||
gc.collect()
|
||||
|
||||
if dataset.meta.total_episodes == 0:
|
||||
shutil.rmtree(local_dir)
|
||||
del dataset
|
||||
|
||||
|
||||
def main(
|
||||
@@ -357,6 +298,7 @@ def main(
|
||||
logging.error(f"Exception occurred for {task_path['train']}")
|
||||
with open("output.txt", "a") as f:
|
||||
f.write(f"{task_path['train']}, exception details: {str(e)}\n")
|
||||
ray.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import numpy as np
|
||||
from lerobot.datasets.compute_stats import (
|
||||
DEFAULT_QUANTILES,
|
||||
auto_downsample_height_width,
|
||||
get_feature_stats,
|
||||
sample_indices,
|
||||
)
|
||||
from lerobot.datasets.io_utils import load_image_as_numpy
|
||||
import torchvision
|
||||
from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices
|
||||
from lerobot.datasets.utils import load_image_as_numpy
|
||||
|
||||
torchvision.set_video_backend("pyav")
|
||||
|
||||
|
||||
def generate_features_from_config(AgiBotWorld_CONFIG):
|
||||
@@ -51,31 +49,21 @@ def sample_images(input):
|
||||
return images
|
||||
|
||||
|
||||
def compute_episode_stats(
|
||||
episode_data: dict[str, list[str] | np.ndarray],
|
||||
features: dict,
|
||||
quantile_list: list[float] | None = None,
|
||||
) -> dict:
|
||||
if quantile_list is None:
|
||||
quantile_list = DEFAULT_QUANTILES
|
||||
|
||||
def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict:
|
||||
ep_stats = {}
|
||||
for key, data in episode_data.items():
|
||||
if features[key]["dtype"] == "string":
|
||||
continue
|
||||
|
||||
continue # HACK: we should receive np.arrays of strings
|
||||
elif features[key]["dtype"] in ["image", "video"]:
|
||||
ep_ft_array = sample_images(data)
|
||||
axes_to_reduce = (0, 2, 3)
|
||||
axes_to_reduce = (0, 2, 3) # keep channel dim
|
||||
keepdims = True
|
||||
else:
|
||||
ep_ft_array = data
|
||||
axes_to_reduce = 0
|
||||
keepdims = data.ndim == 1
|
||||
ep_ft_array = data # data is already a np.ndarray
|
||||
axes_to_reduce = 0 # compute stats over the first axis
|
||||
keepdims = data.ndim == 1 # keep as np.array
|
||||
|
||||
ep_stats[key] = get_feature_stats(
|
||||
ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list
|
||||
)
|
||||
ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims)
|
||||
|
||||
if features[key]["dtype"] in ["image", "video"]:
|
||||
value_norm = 1.0 if "depth" in key else 255.0
|
||||
|
||||
Reference in New Issue
Block a user