2 Commits

Author SHA1 Message Date
Tavish 5068921975 📝 update readme 2026-02-07 15:51:21 +08:00
Tavish 533cfd487e ⚰️ remove dataset merging 2026-02-07 15:20:54 +08:00
23 changed files with 506 additions and 1688 deletions
+8 -11
View File
@@ -20,16 +20,14 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
## 📣 What's New <a><img width="35" height="20" src="https://user-images.githubusercontent.com/12782558/212848161-5e783dd6-11e8-4fe0-bbba-39ffb77730be.png"></a>
- **\[2026.06.12\]** We have provided an efficient and concise Generic Converter for building new dataset-to-LeRobot converters with minimal adapter code! 🔥🔥🔥
- **\[2026.03.20\]** We have supported Data Conversion from RoboCasa to LeRobot! 🔥🔥🔥
- **\[2025.10.04\]** We have collected and updated all Dataset Version Conversion Scripts for LeRobot! 🔥🔥🔥
- **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! 🔥🔥🔥
- **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! 🔥🔥🔥
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
<details>
<summary>More News</summary>
- **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
- **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
- **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! 🔥🔥🔥
- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! 🔥🔥🔥
- **\[2025.04.11\]** We change the repo from `openx2lerobot` to `any4lerobot`, making a universal toolbox for LeRobot! 🔥🔥🔥
@@ -48,13 +46,11 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- **Data Conversion**:
- [x] [Generic Converter](./generic_converter/README.md)
- [x] [Open X-Embodiment to LeRobot](./openx2lerobot/README.md)
- [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md)
- [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md)
- [x] [LeRobot to RLDS](./lerobot2rlds/README.md)
- [x] [LIBERO to LeRobot](./libero2lerobot/README.md)
- [x] [RoboCasa to LeRobot](./robocasa2lerobot/README.md)
- [**Version Conversion**](./ds_version_convert/README.md):
@@ -79,14 +75,14 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- [OneTwoVLA](https://one-two-vla.github.io/): A Unified Vision-Language-Action Model with Adaptive Reasoning [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Fanqi-Lin/OneTwoVLA">](https://github.com/Fanqi-Lin/OneTwoVLA)
- [SmolVLA](https://huggingface.co/blog/smolvla): Efficient Vision-Language-Action Model trained on Lerobot Community Data [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/huggingface/lerobot">](https://github.com/huggingface/lerobot)
- [SpatialVLA](https://spatialvla.github.io/): a spatial-enhanced vision-language-action model that is trained on 1.1 Million real robot episodes [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/SpatialVLA/SpatialVLA">](https://github.com/SpatialVLA/SpatialVLA)
- [openpi](https://www.physicalintelligence.company/blog/pi0): the official implementation of $π_0$: A Vision-Language-Action Flow Model for General Robot Control [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Physical-Intelligence/openpi">](https://github.com/Physical-Intelligence/openpi)
- [openpi](https://www.physicalintelligence.company/blog/pi0): the official implemenation of $π_0$: A Vision-Language-Action Flow Model for General Robot Control [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Physical-Intelligence/openpi">](https://github.com/Physical-Intelligence/openpi)
- [Isaac-GR00T](https://developer.nvidia.com/isaac/gr00t): NVIDIA Isaac GR00T N1 is the world's first open foundation model for generalized humanoid robot reasoning and skills [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/NVIDIA/Isaac-GR00T">](https://github.com/NVIDIA/Isaac-GR00T)
### Dataset
- [Official](https://huggingface.co/lerobot): State-of-the-art Machine Learning for real-world robotics.
- [IPEC-COMMUNITY/OpenX](https://huggingface.co/collections/IPEC-COMMUNITY/openx-lerobot-67c29b2ee5911f17dbea635e): Open X-Embodiment datasets in LeRobot format with standard transformation
- [IPEC-COMMUNITY/LIBERO](https://huggingface.co/collections/IPEC-COMMUNITY/libero-benchmark-dataset-684837af28d465aa8b043950): LIBERO datasets in LeRobot format with standard transformation and filtering
- [IPEC-COMMUNITY/OpenX](https://huggingface.co/collections/IPEC-COMMUNITY/openx-lerobot-67c29b2ee5911f17dbea635e): Open X-Embodiment datasets in LeRobot format with standard transfomation
- [IPEC-COMMUNITY/LIBERO](https://huggingface.co/collections/IPEC-COMMUNITY/libero-benchmark-dataset-684837af28d465aa8b043950): LIBERO datasets in LeRobot format with standard transfomation and filtering
- [weijian-sun/agibotworld-lerobot](https://huggingface.co/datasets/weijian-sun/agibotworld-lerobot): AgibotWorld-LeRobot v2.0
- [GR00T-Dateset](https://huggingface.co/GR00T-Dateset): Isaac-GR00T training dataset
- [nvidia/PhysicalAI-Robotics-GR00T-X-Embodiment-Sim](https://huggingface.co/datasets/nvidia/PhysicalAI-Robotics-GR00T-X-Embodiment-Sim): Isaac-GR00T training dataset
@@ -111,7 +107,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- [LeKiwi](https://github.com/SIGRobotics-UIUC/LeKiwi): Low-Cost Mobile Manipulator [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/SIGRobotics-UIUC/LeKiwi">](https://github.com/SIGRobotics-UIUC/LeKiwi)
- [XLeRobot](https://github.com/Vector-Wangel/XLeRobot): Fully Autonomous Household Dual-Arm Mobile Robot for $998 [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/Vector-Wangel/XLeRobot">](https://github.com/Vector-Wangel/XLeRobot)
- [LeRobot-Kinematics](https://github.com/box2ai-robotics/lerobot-kinematics): Simple and Accurate Forward and Inverse Kinematics Examples for the Lerobot SO100 ARM [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/box2ai-robotics/lerobot-kinematics">](https://github.com/box2ai-robotics/lerobot-kinematics)
- [lerobotdepot](https://github.com/maximilienroberti/lerobotdepot): a repo for hardware, components, and 3D-printable projects compatible with the LeRobot library [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/maximilienroberti/lerobotdepot">](https://github.com/maximilienroberti/lerobotdepot)
- [lerobotdepot](https://github.com/maximilienroberti/lerobotdepot): a reoi for hardware, components, and 3D-printable projects compatible with the LeRobot library [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/maximilienroberti/lerobotdepot">](https://github.com/maximilienroberti/lerobotdepot)
- [PingTi-Arm](https://github.com/nomorewzx/PingTi-Arm): A human-scale robotic arm compatible with Lerobot, based on SO100 [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/nomorewzx/PingTi-Arm">](https://github.com/nomorewzx/PingTi-Arm)
- [LeDog](https://github.com/wuxiaoqiang12/LeDog): A mobile manipulator with a Weilan AlphaDog and a LeRobot-compatible SO-101 arm, with support for ROS 1/2 (Official: [ledog_ros2](https://gitcode.com/openeuler/ledog_ros2)) [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/wuxiaoqiang12/LeDog">](https://github.com/wuxiaoqiang12/LeDog)
@@ -136,6 +132,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
- [LeRobot Dataset Visualizer](https://github.com/huggingface/lerobot-dataset-visualizer): Web application for visualizing robotics datasets in LeRobot format [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/huggingface/lerobot-dataset-visualizer">](https://github.com/huggingface/lerobot-dataset-visualizer)
- [lerobot_so101_teleop](https://github.com/liorbenhorin/lerobot_so101_teleop): Sample Environment for the LeRobot SO-101 Robot in Isaac Lab to collect demonstrations in a simulation [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/liorbenhorin/lerobot_so101_teleop">](https://github.com/liorbenhorin/lerobot_so101_teleop)
- [Robot Learning: A Tutorial](https://github.com/fracapuano/robot-learning-tutorial): All the source code for "Robot Learning: A Tutorial". Get involved to be featured in the next iteration [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/fracapuano/robot-learning-tutorial">](https://github.com/fracapuano/robot-learning-tutorial)
- [LERO](https://github.com/masato-ka/lero): LeRobot dataset Operations toolkit [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/masato-ka/lero">](https://github.com/masato-ka/lero)
- [CRISP](https://utiasdsl.github.io/crisp_controllers/): Record datasets and deploy policies using LeRobot and ROS2-compatible manipulators (Franka Robotics FR3 and more supported) [<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/utiasdsl/crisp_controllers">](https://github.com/utiasdsl/crisp_controllers)
## 👷‍♂️ Contributing
@@ -157,7 +154,7 @@ If you find this repository helpful in your research or projects, please conside
```bibtex
@misc{any4lerobot,
title = {Any4LeRobot: A tool collection for LeRobot},
author = {Qizhi Chen, Dong Wang, Bin Zhao},
author = {Qizhi Chen},
license = {MIT},
url = {https://github.com/Tavish9/any4lerobot},
year = {2025},
+49 -125
View File
@@ -1,112 +1,68 @@
import argparse
import inspect
import gc
import shutil
import tempfile
from pathlib import Path
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import ray
import torch
from agibot_utils.agibot_utils import get_task_info, load_local_dataset
from agibot_utils.config import AgiBotWorld_TASK_TYPE
from agibot_utils.lerobot_utils import compute_episode_stats, generate_features_from_config
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.dataset_writer import DatasetWriter
from lerobot.datasets.feature_utils import get_hf_features_from_features, validate_episode_buffer, validate_frame
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import validate_episode_buffer, validate_frame
from ray.runtime_env import RuntimeEnv
class AgiBotDatasetMetadata(LeRobotDatasetMetadata):
def _flush_metadata_buffer(self) -> None:
"""Write all buffered episode metadata to parquet file."""
if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
return
combined_dict = {}
for episode_dict in self._metadata_buffer:
for key, value in episode_dict.items():
if key not in combined_dict:
combined_dict[key] = []
# Extract value and serialize numpy arrays
# because PyArrow's from_pydict function doesn't support numpy arrays
val = value[0] if isinstance(value, list) else value
combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
first_ep = self._metadata_buffer[0]
chunk_idx = first_ep["meta/episodes/chunk_index"][0]
file_idx = first_ep["meta/episodes/file_index"][0]
schema = None if not self._pq_writer else self._pq_writer.schema
table = pa.Table.from_pydict(combined_dict, schema=schema)
if not self._pq_writer:
path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
path.parent.mkdir(parents=True, exist_ok=True)
self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
self._pq_writer.write_table(table)
self.latest_episode = self._metadata_buffer[-1]
self._metadata_buffer.clear()
class AgiBotDatasetWriter(DatasetWriter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hf_features = get_hf_features_from_features(self._meta.features)
class AgiBotDataset(LeRobotDataset):
def add_frame(self, frame: dict) -> None:
"""
Add a single frame to the current episode buffer.
Apart from images written to a temporary directory, nothing is written to disk
until ``save_episode()`` is called.
The caller must provide all user-defined features plus ``"task"``, and must
not provide ``"timestamp"`` or ``"frame_index"``; those are computed
automatically.
This function only adds the frame to the episode_buffer. Apart from images — which are written in a
temporary directory — nothing is written to disk. To save those frames, the 'save_episode()' method
then needs to be called.
"""
# Convert torch to numpy if needed
for name in frame:
if isinstance(frame[name], torch.Tensor):
frame[name] = frame[name].numpy()
features = {
key: value for key, value in self._meta.features.items() if key in self.hf_features
} # remove video keys
features = {key: value for key, value in self.features.items() if key in self.hf_features} # remove video keys
validate_frame(frame, features)
if self.episode_buffer is None:
self.episode_buffer = self._create_episode_buffer()
self.episode_buffer = self.create_episode_buffer()
# Automatically add frame_index and timestamp to episode buffer
frame_index = self.episode_buffer["size"]
timestamp = frame_index / self._meta.fps
timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps
self.episode_buffer["frame_index"].append(frame_index)
self.episode_buffer["timestamp"].append(timestamp)
self.episode_buffer["task"].append(frame.pop("task"))
self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing
# Add frame features to episode_buffer
for key, value in frame.items():
if key not in self._meta.features:
if key not in self.features:
raise ValueError(
f"An element of the frame is not in the features. '{key}' not in '{self._meta.features.keys()}'."
f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'."
)
self.episode_buffer[key].append(value)
self.episode_buffer["size"] += 1
def save_episode(
self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True
) -> None:
"""Save the current episode in self.episode_buffer to disk."""
def save_episode(self, videos: dict, action_config: list, episode_data: dict | None = None) -> None:
"""
This will save to disk the current episode in self.episode_buffer.
Args:
episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will
save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
None.
"""
episode_buffer = episode_data if episode_data is not None else self.episode_buffer
validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features)
validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features)
# size and task are special cases that won't be added to hf_dataset
episode_length = episode_buffer.pop("size")
@@ -114,49 +70,53 @@ class AgiBotDatasetWriter(DatasetWriter):
episode_tasks = list(set(tasks))
episode_index = episode_buffer["episode_index"]
episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length)
episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length)
episode_buffer["episode_index"] = np.full((episode_length,), episode_index)
# Update tasks and task indices with new tasks if any
self._meta.save_episode_tasks(episode_tasks)
self.meta.save_episode_tasks(episode_tasks)
# Given tasks in natural language, find their corresponding task indices
episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks])
episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks])
for key, ft in self._meta.features.items():
for key, ft in self.features.items():
# index, episode_index, task_index are already processed above, and image and video
# are processed separately by storing image path and frame info as meta data
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]:
continue
episode_buffer[key] = np.stack(episode_buffer[key]).squeeze()
for key in self._meta.video_keys:
episode_buffer[key] = str(videos[key])
for key in self.meta.video_keys:
episode_buffer[key] = str(videos[key]) # PosixPath -> str
ep_stats = compute_episode_stats(episode_buffer, self._meta.features)
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
has_video_keys = len(self._meta.video_keys) > 0
use_batched_encoding = self._batch_encoding_size > 1
has_video_keys = len(self.meta.video_keys) > 0
use_batched_encoding = self.batch_encoding_size > 1
self.current_videos = videos
if has_video_keys and not use_batched_encoding:
for video_key in self._meta.video_keys:
for video_key in self.meta.video_keys:
ep_metadata.update(self._save_episode_video(video_key, episode_index))
# `meta.save_episode` be executed after encoding the videos
# add action_config to current episode
ep_metadata.update({"action_config": action_config})
self._meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
if has_video_keys and use_batched_encoding:
self._episodes_since_last_encoding += 1
if self._episodes_since_last_encoding == self._batch_encoding_size:
start_ep = self._meta.total_episodes - self._batch_encoding_size
end_ep = self._meta.total_episodes
# Check if we should trigger batch encoding
self.episodes_since_last_encoding += 1
if self.episodes_since_last_encoding == self.batch_encoding_size:
start_ep = self.num_episodes - self.batch_encoding_size
end_ep = self.num_episodes
self._batch_save_episode_video(start_ep, end_ep)
self._episodes_since_last_encoding = 0
self.episodes_since_last_encoding = 0
if not episode_data:
self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0)
# Reset episode buffer and clean up temporary images (if not already deleted during video encoding)
self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0)
def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path:
"""
@@ -164,47 +124,11 @@ class AgiBotDatasetWriter(DatasetWriter):
Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding,
since video encoding with ffmpeg is already using multithreading.
"""
temp_path = Path(tempfile.mkdtemp(dir=self._root)) / f"{video_key}_{episode_index:03d}.mp4"
temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4"
shutil.copy(self.current_videos[video_key], temp_path)
return temp_path
class AgiBotDataset(LeRobotDataset):
@classmethod
def create(cls, *args, **kwargs) -> "AgiBotDataset":
sig = inspect.signature(super().create)
bound = sig.bind_partial(*args, **kwargs)
bound.apply_defaults()
params = bound.arguments
obj = super().create(*args, **kwargs)
shutil.rmtree(params["root"], ignore_errors=True)
obj.meta: AgiBotDatasetMetadata = AgiBotDatasetMetadata.create(
repo_id=params["repo_id"],
fps=params["fps"],
robot_type=params["robot_type"],
features=params["features"],
root=params["root"],
use_videos=params["use_videos"],
metadata_buffer_size=params["metadata_buffer_size"],
)
obj.writer: AgiBotDatasetWriter = AgiBotDatasetWriter(
meta=obj.meta,
root=obj.root,
vcodec=obj._vcodec,
encoder_threads=obj._encoder_threads,
batch_encoding_size=obj._batch_encoding_size,
)
return obj
def save_episode(
self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True
) -> None:
self._require_writer("save_episode")
self.writer.save_episode(videos, action_config, episode_data, parallel_encoding)
def get_all_tasks(src_path: Path, output_path: Path):
json_files = src_path.glob("task_info/*.json")
for json_file in json_files:
@@ -267,13 +191,11 @@ def save_as_lerobot_dataset(agibot_world_config, task: tuple[Path, Path], save_d
dataset.save_episode(videos=videos, action_config=action_config)
except Exception as e:
print(f"{json_file.stem}, episode_{eid}: there are some corrupted mp4s\nException details: {str(e)}")
dataset.clear_episode_buffer(delete_images=False)
dataset.episode_buffer = None
continue
gc.collect()
print(f"process done for {json_file.stem}, episode_id {eid}, len {len(frames)}")
dataset.finalize()
def main(
src_path: str,
@@ -325,6 +247,8 @@ def main(
with open("output.txt", "a") as f:
f.write(f"{task}, exception details: {str(e)}\n")
ray.shutdown()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
+15 -26
View File
@@ -1,11 +1,9 @@
import numpy as np
from lerobot.datasets.compute_stats import (
DEFAULT_QUANTILES,
auto_downsample_height_width,
get_feature_stats,
sample_indices,
)
from torchcodec.decoders import VideoDecoder
import torch
import torchvision
from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices
torchvision.set_video_backend("pyav")
def generate_features_from_config(AgiBotWorld_CONFIG):
@@ -22,8 +20,9 @@ def generate_features_from_config(AgiBotWorld_CONFIG):
def sample_images(input):
if type(input) is str:
video_path = input
decoder = VideoDecoder(video_path)
frames_array = decoder[0:-1].numpy() # Shape: [T, C, H, W]
reader = torchvision.io.VideoReader(video_path, stream="video")
frames = [frame["data"] for frame in reader]
frames_array = torch.stack(frames).numpy() # Shape: [T, C, H, W]
sampled_indices = sample_indices(len(frames_array))
images = None
@@ -51,31 +50,21 @@ def sample_images(input):
return images
def compute_episode_stats(
episode_data: dict[str, list[str] | np.ndarray],
features: dict,
quantile_list: list[float] | None = None,
) -> dict:
if quantile_list is None:
quantile_list = DEFAULT_QUANTILES
def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict:
ep_stats = {}
for key, data in episode_data.items():
if features[key]["dtype"] == "string":
continue
continue # HACK: we should receive np.arrays of strings
elif features[key]["dtype"] in ["image", "video"]:
ep_ft_array = sample_images(data)
axes_to_reduce = (0, 2, 3)
axes_to_reduce = (0, 2, 3) # keep channel dim
keepdims = True
else:
ep_ft_array = data
axes_to_reduce = 0
keepdims = data.ndim == 1
ep_ft_array = data # data is already a np.ndarray
axes_to_reduce = 0 # compute stats over the first axis
keepdims = data.ndim == 1 # keep as np.array
ep_stats[key] = get_feature_stats(
ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list
)
ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims)
if features[key]["dtype"] in ["image", "video"]:
value_norm = 1.0 if "depth" in key else 255.0
@@ -32,11 +32,6 @@ import pyarrow.parquet as pq
import tqdm
from datasets import Dataset
from huggingface_hub import snapshot_download
from lerobot.datasets.io_utils import (
load_info,
load_tasks,
write_info,
)
from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_PATH,
@@ -45,8 +40,11 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_PATH,
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
load_info,
load_tasks,
serialize_dict,
unflatten_dict,
write_info,
)
from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.utils.utils import init_logging
@@ -54,12 +52,8 @@ from lerobot.utils.utils import init_logging
V21 = "v2.1"
V30 = "v3.0"
LEGACY_DATA_PATH_TEMPLATE = (
"data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
)
LEGACY_VIDEO_PATH_TEMPLATE = (
"videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
)
LEGACY_DATA_PATH_TEMPLATE = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
LEGACY_VIDEO_PATH_TEMPLATE = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
MIN_VIDEO_DURATION = 1e-6
LEGACY_STATS_KEYS = ("mean", "std", "min", "max", "count")
@@ -143,9 +137,7 @@ def convert_info(
if ft.get("dtype") != "video":
ft.pop("fps", None)
info["total_chunks"] = (
math.ceil(total_episodes / chunks_size) if total_episodes > 0 else 0
)
info["total_chunks"] = math.ceil(total_episodes / chunks_size) if total_episodes > 0 else 0
info["total_videos"] = total_episodes * len(video_keys)
write_info(info, new_root)
@@ -164,22 +156,14 @@ def _group_episodes_by_data_file(
return grouped
def convert_data(
root: Path, new_root: Path, episode_records: list[dict[str, Any]]
) -> None:
def convert_data(root: Path, new_root: Path, episode_records: list[dict[str, Any]]) -> None:
logging.info("Converting consolidated parquet files back to per-episode files")
grouped = _group_episodes_by_data_file(episode_records)
for (chunk_idx, file_idx), records in tqdm.tqdm(
grouped.items(), desc="convert data files"
):
source_path = root / DEFAULT_DATA_PATH.format(
chunk_index=chunk_idx, file_index=file_idx
)
for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc="convert data files"):
source_path = root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
if not source_path.exists():
raise FileNotFoundError(
f"Expected source parquet file not found: {source_path}"
)
raise FileNotFoundError(f"Expected source parquet file not found: {source_path}")
table = pq.read_table(source_path)
records = sorted(records, key=lambda rec: int(rec["dataset_from_index"]))
@@ -197,7 +181,7 @@ def convert_data(
f"episode_index={episode_index}, length={length}"
)
episode_table = table.slice(start, length)
episode_table = table.slice(start, length).to_pandas()
dest_chunk = episode_index // DEFAULT_CHUNK_SIZE
dest_path = new_root / LEGACY_DATA_PATH_TEMPLATE.format(
@@ -205,7 +189,7 @@ def convert_data(
episode_index=episode_index,
)
dest_path.parent.mkdir(parents=True, exist_ok=True)
Dataset(episode_table).to_parquet(dest_path)
Dataset.from_pandas(episode_table).to_parquet(dest_path)
def _group_episodes_by_video_file(
@@ -251,14 +235,10 @@ def _validate_video_paths(src: Path, dst: Path) -> None:
# Validate file extensions for video files
valid_video_extensions = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".m4v"}
if src_resolved.suffix.lower() not in valid_video_extensions:
raise ValueError(
f"Source file does not have a valid video extension: {src_resolved}"
)
raise ValueError(f"Source file does not have a valid video extension: {src_resolved}")
if dst_resolved.suffix.lower() not in valid_video_extensions:
raise ValueError(
f"Destination file does not have a valid video extension: {dst_resolved}"
)
raise ValueError(f"Destination file does not have a valid video extension: {dst_resolved}")
# Check for path traversal attempts in the original paths
src_str = str(src)
@@ -273,16 +253,11 @@ def _validate_video_paths(src: Path, dst: Path) -> None:
# Additional check: ensure resolved paths don't point to system directories
system_dirs = {"/etc", "/sys", "/proc", "/dev", "/boot", "/root"}
for resolved_path, name in [
(src_resolved, "source"),
(dst_resolved, "destination"),
]:
for resolved_path, name in [(src_resolved, "source"), (dst_resolved, "destination")]:
path_str = str(resolved_path)
for sys_dir in system_dirs:
if path_str.startswith(sys_dir + "/") or path_str == sys_dir:
raise ValueError(
f"Path points to system directory: {name} path {resolved_path}"
)
raise ValueError(f"Path points to system directory: {name} path {resolved_path}")
# Ensure the destination directory can be created safely
try:
@@ -349,13 +324,9 @@ def _extract_video_segment(
text=True,
)
except subprocess.TimeoutExpired as exc:
raise RuntimeError(
f"ffmpeg timed out while processing video '{src}' -> '{dst}'"
) from exc
raise RuntimeError(f"ffmpeg timed out while processing video '{src}' -> '{dst}'") from exc
except FileNotFoundError as exc:
raise RuntimeError(
"ffmpeg executable not found; it is required for video conversion"
) from exc
raise RuntimeError("ffmpeg executable not found; it is required for video conversion") from exc
except subprocess.CalledProcessError as exc:
error_msg = f"ffmpeg failed while splitting video '{src}' into '{dst}'"
if exc.stderr:
@@ -363,12 +334,7 @@ def _extract_video_segment(
raise RuntimeError(error_msg) from exc
def convert_videos(
root: Path,
new_root: Path,
episode_records: list[dict[str, Any]],
video_keys: list[str],
) -> None:
def convert_videos(root: Path, new_root: Path, episode_records: list[dict[str, Any]], video_keys: list[str]) -> None:
if len(video_keys) == 0:
logging.info("No video features detected; skipping video conversion")
return
@@ -381,9 +347,7 @@ def convert_videos(
logging.info("No video metadata found for key '%s'; skipping", video_key)
continue
for (chunk_idx, file_idx), records in tqdm.tqdm(
grouped.items(), desc=f"convert videos ({video_key})"
):
for (chunk_idx, file_idx), records in tqdm.tqdm(grouped.items(), desc=f"convert videos ({video_key})"):
src_path = root / DEFAULT_VIDEO_PATH.format(
video_key=video_key,
chunk_index=chunk_idx,
@@ -392,10 +356,7 @@ def convert_videos(
if not src_path.exists():
raise FileNotFoundError(f"Expected MP4 file not found: {src_path}")
records = sorted(
records,
key=lambda rec: float(rec[f"videos/{video_key}/from_timestamp"]),
)
records = sorted(records, key=lambda rec: float(rec[f"videos/{video_key}/from_timestamp"]))
for record in records:
episode_index = int(record["episode_index"])
@@ -412,9 +373,7 @@ def convert_videos(
_extract_video_segment(src_path, dest_path, start=start, end=end)
def convert_episodes_metadata(
new_root: Path, episode_records: list[dict[str, Any]]
) -> None:
def convert_episodes_metadata(new_root: Path, episode_records: list[dict[str, Any]]) -> None:
logging.info("Reconstructing legacy episodes and episodes_stats JSONL files")
episodes_path = new_root / LEGACY_EPISODES_PATH
@@ -437,9 +396,7 @@ def convert_episodes_metadata(
jsonlines.open(episodes_path, mode="w") as episodes_writer,
jsonlines.open(stats_path, mode="w") as stats_writer,
):
for record in sorted(
episode_records, key=lambda rec: int(rec["episode_index"])
):
for record in sorted(episode_records, key=lambda rec: int(rec["episode_index"])):
legacy_episode = {
key: value
for key, value in record.items()
@@ -450,14 +407,10 @@ def convert_episodes_metadata(
and key not in {"dataset_from_index", "dataset_to_index"}
}
serializable_episode = {
key: _to_serializable(value) for key, value in legacy_episode.items()
}
serializable_episode = {key: _to_serializable(value) for key, value in legacy_episode.items()}
episodes_writer.write(serializable_episode)
stats_flat = {
key: record[key] for key in record if key.startswith("stats/")
}
stats_flat = {key: record[key] for key in record if key.startswith("stats/")}
stats_nested = unflatten_dict(stats_flat).get("stats", {})
stats_serialized = serialize_dict(_filter_stats(stats_nested))
stats_writer.write(
@@ -500,11 +453,7 @@ def convert_dataset(
new_root.mkdir(parents=True, exist_ok=True)
episode_records = load_episode_records(root)
video_keys = [
key
for key, ft in load_info(root)["features"].items()
if ft.get("dtype") == "video"
]
video_keys = [key for key, ft in load_info(root)["features"].items() if ft.get("dtype") == "video"]
convert_info(root, new_root, episode_records, video_keys)
convert_tasks(root, new_root)
-102
View File
@@ -1,102 +0,0 @@
# Generic Converter
Shared conversion flow for turning task-based source datasets into LeRobot
datasets.
The generic package owns the execution mechanics:
- create one temporary `LeRobotDataset` per `ConversionTask`
- run tasks with a local or Ray Datatrove executor
- aggregate temporary datasets into the adapter output directory
- remove temporary task outputs by default
- optionally push the aggregated dataset to the Hub
Dataset-specific converters own the adapter logic:
- where raw inputs come from
- how tasks are discovered or loaded
- how one raw input is converted into LeRobot episodes
- how task metadata, such as language instructions, is represented
## Files
- `adapter.py`: `BaseAdapter`, the class dataset adapters inherit from.
- `pipeline.py`: the reusable conversion, executor, aggregation, cleanup, and push flow.
- `utils.py`: shared types and small helpers.
## Adapter Contract
A dataset converter should subclass `BaseAdapter`, pass `output_path` to the
base constructor, and provide dataset-level metadata as class attributes.
Required attributes:
- `dataset_type`
- `fps`
- `robot_type`
- `features`
Optional attributes:
- `tags`
Required methods:
- `load_tasks(self) -> list[ConversionTask]`
- `load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]`
`run_converter` reads `adapter.output_path` and calls `adapter.load_tasks()`
without arguments. Store paths, task manifests, or other adapter options on the
adapter instance in `__init__`.
Use `adapter.temp_output_path` when building task-level temporary output paths.
`load_subset` receives the full `ConversionTask`, not just an input path. Use
`task.input_path` for raw data and `task.metadata` for dataset-specific values
such as language instructions. Each yielded episode must be a sequence of frame
dictionaries accepted by `LeRobotDataset.add_frame`; each frame should include
the LeRobot `task` field when language tasks are needed.
## ConversionTask
`ConversionTask` describes one independently convertible raw input:
- `input_path`: source file or directory
- `output_path`: temporary LeRobot dataset directory for this task
- `local_repo_id`: repo id used while writing the temporary dataset
- `metadata`: adapter-owned metadata
Keep dataset-specific values in `metadata`; the generic pipeline does not know
about task-file schemas or instruction formats.
## Usage Sketch
```python
from generic_converter import BaseAdapter, ConversionTask, run_converter
class MyAdapter(BaseAdapter):
dataset_type = "my_dataset"
fps = 20
robot_type = "my_robot"
features = MY_FEATURES
tags = ["my_dataset"]
def __init__(self, output_path):
super().__init__(output_path)
def load_tasks(self) -> list[ConversionTask]:
...
def load_subset(self, task: ConversionTask):
...
run_converter(
adapter=adapter,
executor="local",
cpus_per_task=1,
tasks_per_job=1,
workers=-1,
)
```
-11
View File
@@ -1,11 +0,0 @@
from .adapter import BaseAdapter
from .pipeline import run_converter
from .utils import ConversionTask, FeatureSpec, TaskMetadata
__all__ = [
"BaseAdapter",
"ConversionTask",
"FeatureSpec",
"TaskMetadata",
"run_converter",
]
-32
View File
@@ -1,32 +0,0 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from pathlib import Path
from .utils import ConversionTask, FeatureSpec
class BaseAdapter(ABC):
"""Dataset-specific hooks used by the generic conversion pipeline."""
dataset_type: str
fps: int
robot_type: str
features: FeatureSpec
tags: Sequence[str] = ()
def __init__(self, output_path: Path):
self.output_path = output_path.expanduser().resolve()
@property
def temp_output_path(self) -> Path:
return self.output_path.with_name(f"{self.output_path.name}_temp")
@abstractmethod
def load_tasks(self) -> list[ConversionTask]:
"""Build conversion tasks from dataset-specific inputs."""
@abstractmethod
def load_subset(
self, task: ConversionTask
) -> Iterable[Sequence[dict]]:
"""Yield LeRobot episodes for one raw input path."""
-226
View File
@@ -1,226 +0,0 @@
import os
import shutil
import sys
from collections.abc import Sequence
from pathlib import Path
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.logging import get_random_str, get_timestamp
from lerobot.datasets import LeRobotDataset
from lerobot.datasets.aggregate import aggregate_datasets
from .adapter import BaseAdapter
from .utils import (
ConversionTask,
setup_logger,
unique_strings,
)
class SaveLeRobotDataset(PipelineStep):
name = "Save Temp LeRobotDataset"
def __init__(self, tasks: list[ConversionTask], adapter: BaseAdapter):
super().__init__()
self.tasks = tasks
self.adapter = adapter
self.type = f"{adapter.dataset_type}2lerobot"
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
task = self.tasks[rank]
if task.output_path.exists():
shutil.rmtree(task.output_path)
dataset = LeRobotDataset.create(
repo_id=task.local_repo_id,
root=task.output_path,
fps=self.adapter.fps,
robot_type=self.adapter.robot_type,
features=self.adapter.features,
)
logger.info(
f"start processing for {task.input_path}, saving to {task.output_path}"
)
raw_dataset = self.adapter.load_subset(task)
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
for frame in episode_data:
dataset.add_frame(frame)
dataset.save_episode()
logger.info(
f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}"
)
dataset.finalize()
def run_converter(
adapter: BaseAdapter,
executor: str,
cpus_per_task: int,
tasks_per_job: int,
workers: int,
resume_dir: str | None = None,
debug: bool = False,
local_repo_id: str | None = None,
hub_repo_id: str | None = None,
push_to_hub: bool = False,
cleanup_temp: bool = True,
extra_tags: Sequence[str] | None = None,
) -> Path:
tasks = adapter.load_tasks()
output_path = adapter.output_path
if not tasks:
raise ValueError(
"No conversion tasks found. Provide a non-empty tasks file or matching source files."
)
if cpus_per_task < 1:
raise ValueError("--cpus-per-task must be >= 1")
output_path.mkdir(parents=True, exist_ok=True)
if debug:
executor = "local"
workers = 1
tasks = tasks[:2]
push_to_hub = False
match executor:
case "local":
from datatrove.executor import LocalPipelineExecutor
resolved_workers = (
max(1, (os.cpu_count() or 1) // cpus_per_task)
if workers == -1
else workers
)
executor_cls, executor_config = LocalPipelineExecutor, {
"tasks": len(tasks),
"workers": resolved_workers,
}
case "ray":
import ray
from datatrove.executor import RayPipelineExecutor
from ray.runtime_env import RuntimeEnv
runtime_env = RuntimeEnv(env_vars=_build_ray_env_vars())
ray.init(runtime_env=runtime_env)
executor_cls, executor_config = RayPipelineExecutor, {
"tasks": len(tasks),
"workers": workers,
"cpus_per_task": cpus_per_task,
"tasks_per_job": tasks_per_job,
}
case _:
raise ValueError(f"Executor {executor} not supported")
if resume_dir:
logging_dir = str(resume_dir)
else:
logging_dir = str(Path.cwd() / "logs" / f"{get_timestamp()}_{get_random_str()}")
executor_cls(
pipeline=[SaveLeRobotDataset(tasks, adapter)],
**executor_config,
logging_dir=logging_dir,
).run()
aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id)
if cleanup_temp:
logger = setup_logger()
logger.info("Delete temp data_dir")
shutil.rmtree(adapter.temp_output_path, ignore_errors=True)
if push_to_hub:
if hub_repo_id is None:
raise ValueError("--repo-id is required when --push-to-hub is set")
tags = unique_strings(
[
"LeRobot",
adapter.dataset_type,
adapter.robot_type,
*adapter.tags,
*(extra_tags or []),
]
)
LeRobotDataset(
repo_id=hub_repo_id,
root=output_path,
).push_to_hub(
tags=tags,
private=False,
push_videos=True,
license="apache-2.0",
upload_large_folder=False,
)
return output_path
def _build_ray_env_vars() -> dict[str, str]:
env_vars = {
"HDF5_USE_FILE_LOCKING": "FALSE",
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
"SVT_LOG": "1",
}
pythonpath = _build_ray_pythonpath()
if pythonpath:
env_vars["PYTHONPATH"] = pythonpath
return env_vars
def _build_ray_pythonpath() -> str:
repo_root = Path(__file__).resolve().parents[1]
paths: list[str] = []
def add_path(path_value: str | Path):
path = Path(path_value).expanduser()
try:
path = path.resolve()
except OSError:
return
if not path.exists():
return
path_str = str(path)
if path_str not in paths:
paths.append(path_str)
add_path(repo_root)
add_path(Path.cwd())
for path in sys.path:
if path:
add_path(path)
for path in os.environ.get("PYTHONPATH", "").split(os.pathsep):
if path:
add_path(path)
return os.pathsep.join(paths)
def aggregate_tasks(
tasks: list[ConversionTask],
output_dir: Path,
aggr_repo_id: str | None = None,
):
logger = setup_logger()
if output_dir.exists():
shutil.rmtree(output_dir)
roots = [task.output_path for task in tasks]
resolved_aggr_repo_id = aggr_repo_id or output_dir.name
logger.info(
f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
)
aggregate_datasets(
repo_ids=[None] * len(tasks),
roots=roots,
aggr_repo_id=resolved_aggr_repo_id,
aggr_root=output_dir,
)
logger.info(f"aggregation complete: {output_dir}")
-39
View File
@@ -1,39 +0,0 @@
import shutil
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
TaskMetadata = Mapping[str, Any]
FeatureSpec = Mapping[str, dict]
@dataclass(frozen=True)
class ConversionTask:
"""One independently convertible raw input file and adapter metadata."""
input_path: Path
output_path: Path
local_repo_id: str | None = None
metadata: TaskMetadata = field(default_factory=dict)
def setup_logger():
import sys
from datatrove.utils.logging import logger
logger.remove()
logger.add(sys.stdout, level="INFO", colorize=True)
return logger
def unique_strings(values: Sequence[str]) -> list[str]:
result = []
seen = set()
for value in values:
if value in seen:
continue
result.append(value)
seen.add(value)
return result
+20 -46
View File
@@ -8,13 +8,13 @@ In this dataset, we have made several key improvements:
- **OpenVLA-based LIBERO Regeneration**: Resolution enhancement, No-op action filtration, 180° RGB frame rotation, Failed trajectory filtering.
- **State Data Preservation**: Maintained native LIBERO state information (accessible via `states.ee_state`, `states.joint_state` and etc.).
- **Robust Conversion Pipeline**: Using the shared `generic_converter` pipeline with local and Ray DataTrove executors for high-speed dataset transformation and resumable conversion.
- **Robust Conversion Pipeline**: Using DataTrove framework for High-speed dataset transformation and automatic failure recovery during conversion
Dataset Structure of `meta/info.json`:
```json
{
"codebase_version": "v3.0", // latest lerobot format
"codebase_version": "v3.0", // lastest lerobot format
"robot_type": "franka", // specific robot type
"fps": 20, // control frequency
"features": {
@@ -41,30 +41,7 @@ Dataset Structure of `meta/info.json`:
"has_audio": false
}
},
"observation.images.wrist_image": {
"dtype": "video",
"shape": [
256,
256,
3
],
"names": [
"height",
"width",
"rgb"
],
"info": {
"video.height": 256,
"video.width": 256,
"video.codec": "av1",
"video.pix_fmt": "yuv420p",
"video.is_depth_map": false,
"video.fps": 20,
"video.channels": 3,
"has_audio": false
}
},
// for more state keys, see LiberoAdapter.features in libero_h5.py
// for more states key, see configs
"observation.state": {
"dtype": "float32",
"shape": [
@@ -75,9 +52,9 @@ Dataset Structure of `meta/info.json`:
"x",
"y",
"z",
"axis_angle1",
"axis_angle2",
"axis_angle3",
"roll",
"pitch",
"yaw",
"gripper",
"gripper"
]
@@ -94,9 +71,9 @@ Dataset Structure of `meta/info.json`:
"x",
"y",
"z",
"axis_angle1",
"axis_angle2",
"axis_angle3",
"roll",
"pitch",
"yaw",
"gripper"
]
}
@@ -112,33 +89,31 @@ Dataset Structure of `meta/info.json`:
Follow instructions in [official repo](https://github.com/huggingface/lerobot?tab=readme-ov-file#installation).
2. Install others:
We use DataTrove for conversion. Install the Ray extra if you want distributed execution across multiple cores or nodes.
We use datatrove[ray] for parallel conversion, significantly speeding up data processing tasks by distributing the workload across multiple cores or nodes (if any).
```bash
pip install h5py
pip install -U datatrove
pip install -U "datatrove[ray]" # optional, for --executor ray
pip install -U "datatrove[ray]" # if you want ray features
```
## Get started
> [!NOTE]
> This script supports converting LIBERO-style HDF5 directories to LeRobot. If you want to convert from RLDS to LeRobot, check [openx2lerobot](../openx2lerobot/README.md).
> This script supports converting from original hdf5 to lerobot. If you want to convert from rlds to lerobot, check [openx2lerobot](../openx2lerobot/README.md).
### Download source code:
```bash
git clone https://github.com/Tavish9/any4lerobot.git
cd any4lerobot/libero2lerobot
```
### Regenerate LIBERO Trajectory:
1. [Install LIBERO dependency](https://github.com/Lifelong-Robot-Learning/LIBERO?tab=readme-ov-file#installtion)
2. Replace `libero_90` with your target libero dataset.
3. The converter feature schema expects `256x256x3` RGB observations. If your source HDF5 files are the original `128x128` LIBERO files, regenerate them first with `--resolution 256`, or update the image feature shapes in `libero_h5.py` to match your data.
```bash
python regenerate_libero_dataset.py \
python libero_utils/regenerate_libero_dataset.py \
--resolution 256 \
--libero_task_suite libero_90 \
--libero_raw_data_dir /path/to/libero/datasets/libero_90 \
@@ -147,17 +122,16 @@ python regenerate_libero_dataset.py \
### Modify in `convert.sh`:
1. `--src-paths` accepts one or more directories containing `*.hdf5` LIBERO task files. To merge many suites into one LeRobot dataset, specify all source directories, for example `--src-paths /path/libero_10 /path/libero_90`.
2. `--output-path` is the final aggregated LeRobot dataset root. Temporary per-task datasets are written next to it under `<output-name>_temp` and removed after aggregation.
3. If you have installed `datatrove[ray]`, use `--executor ray` for faster conversion. Increase `--workers`, `--tasks-per-job`, and `--cpus-per-task` if you have enough CPU and memory.
4. To resume a previous conversion, pass the existing DataTrove log directory with `--resume-dir /path/to/logs/...`.
5. Use `--debug` for a small local smoke test. It converts only the first two tasks, forces local execution, and disables Hub upload.
6. Use `--repo-id <namespace/name>` together with `--push-to-hub` to upload the aggregated dataset. Without `--push-to-hub`, `--repo-id` only controls the local aggregate repo id.
1. If you have installed `datatrove[ray]`, we recommend using `ray` executor for faster conversion.
2. Increase `workers` and `tasks-per-job` if you have sufficient computing resources.
3. To merge many datasets into one, simply specify both paths like: `--src-paths /path/libero_10 /path/libero_90`
4. To resume from a previous conversion, provide the appropriate log directory using `--resume-from-save` and `--resume-from-aggregate`
5. If you want different image resolution, regenerate the trajectory, and change the [config](./libero_utils/config.py). (DO NOT use resize)
```bash
python libero_h5.py \
--src-paths /path/to/libero/datasets/libero_90_no_noops \
--output-path /path/to/local/libero_90_lerobot \
--src-paths /path/to/libero/ \
--output-path /path/to/local \
--executor local \
--tasks-per-job 3 \
--workers 10
+205 -131
View File
@@ -1,127 +1,144 @@
import argparse
import os
import re
import sys
from collections.abc import Iterable, Sequence
import shutil
from pathlib import Path
import numpy as np
from h5py import File
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from generic_converter import BaseAdapter, ConversionTask, run_converter # noqa: E402
import pandas as pd
import ray
from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from lerobot.datasets.aggregate import (
aggregate_data,
aggregate_metadata,
aggregate_stats,
aggregate_videos,
validate_all_metadata,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
write_info,
write_stats,
write_tasks,
)
from libero_utils.config import LIBERO_FEATURES
from libero_utils.libero_utils import load_local_episodes
from ray.runtime_env import RuntimeEnv
from tqdm import tqdm
class LiberoAdapter(BaseAdapter):
dataset_type = "libero"
fps = 20
robot_type = "franka"
features = {
"observation.images.image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.images.wrist_image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
},
"observation.states.ee_state": {
"dtype": "float32",
"shape": (6,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
},
"observation.states.joint_state": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
},
"observation.states.gripper_state": {
"dtype": "float32",
"shape": (2,),
"names": {"motors": ["gripper", "gripper"]},
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
},
}
tags = ["libero", "franka"]
def setup_logger():
import sys
def __init__(self, src_paths: list[Path], output_path: Path):
super().__init__(output_path)
self.src_paths = src_paths
from datatrove.utils.logging import logger
def load_tasks(self) -> list[ConversionTask]:
tasks = []
for src_path in self.src_paths:
for input_h5 in src_path.glob("*.hdf5"):
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
logger.remove()
logger.add(sys.stdout, level="INFO", colorize=True)
return logger
match = pattern1.search(input_h5.name)
if match is None:
match = pattern2.search(input_h5.name)
if match is None:
continue
else:
task_instruction = match.group(1).replace("_", " ")
tasks.append(
ConversionTask(
input_path=input_h5.resolve(),
output_path=(
self.temp_output_path
/ f"{src_path.name}"
/ input_h5.stem
).resolve(),
local_repo_id=f"{input_h5.parent.name}/{input_h5.name}",
metadata={"task": task_instruction},
)
)
return tasks
class SaveLerobotDataset(PipelineStep):
name = "Save Temp LerobotDataset"
type = "libero2lerobot"
def load_subset(self, task: ConversionTask) -> Iterable[Sequence[dict]]:
input_h5 = task.input_path
task_instruction = task.metadata.get("task")
with File(input_h5, "r") as f:
for demo in f["data"].values():
demo_len = len(demo["obs/agentview_rgb"])
# (-1: open, 1: close) -> (0: close, 1: open)
action = np.array(demo["actions"])
action = np.concatenate(
[
action[:, :6],
(1 - np.clip(action[:, -1], 0, 1))[:, None],
],
axis=1,
)
state = np.concatenate(
[
np.array(demo["obs/ee_states"]),
np.array(demo["obs/gripper_states"]),
],
axis=1,
)
episode = {
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
"observation.state": np.array(state, dtype=np.float32),
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
"action": np.array(action, dtype=np.float32),
}
yield [{**{k: v[i] for k, v in episode.items()}, "task": task_instruction} for i in range(demo_len)]
def __init__(self, tasks: list[tuple[Path, Path, str]]):
super().__init__()
self.tasks = tasks
def run(self, data=None, rank: int = 0, world_size: int = 1):
logger = setup_logger()
input_h5, output_path, task_instruction = self.tasks[rank]
if output_path.exists():
shutil.rmtree(output_path)
dataset = LeRobotDataset.create(
repo_id=f"{input_h5.parent.name}/{input_h5.name}",
root=output_path,
fps=20,
robot_type="franka",
features=LIBERO_FEATURES,
)
logger.info(f"start processing for {input_h5}, saving to {output_path}")
raw_dataset = load_local_episodes(input_h5)
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
for frame_data in episode_data:
frame_data["task"] = task_instruction
dataset.add_frame(frame_data)
dataset.save_episode()
logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path):
logger = setup_logger()
all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs]
fps, robot_type, features = validate_all_metadata(all_metadata)
if aggregated_dir.exists():
shutil.rmtree(aggregated_dir)
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}",
root=aggregated_dir,
fps=fps,
robot_type=robot_type,
features=features,
)
video_keys = [key for key in features if features[key]["dtype"] == "video"]
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
meta_idx = {"chunk": 0, "file": 0}
data_idx = {"chunk": 0, "file": 0}
videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys}
aggr_meta.episodes = {}
for src_meta in tqdm(all_metadata, desc="Copy data and videos"):
videos_idx = aggregate_videos(
src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE
)
data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE)
meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx)
aggr_meta.info["total_episodes"] += src_meta.total_episodes
aggr_meta.info["total_frames"] += src_meta.total_frames
logger.info("write tasks")
write_tasks(aggr_meta.tasks, aggr_meta.root)
logger.info("write info")
aggr_meta.info.update(
{
"total_tasks": len(aggr_meta.tasks),
"total_episodes": sum(m.total_episodes for m in all_metadata),
"total_frames": sum(m.total_frames for m in all_metadata),
"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
}
)
write_info(aggr_meta.info, aggr_meta.root)
logger.info("write stats")
aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata])
write_stats(aggr_meta.stats, aggr_meta.root)
def delete_temp_data(temp_dirs: list[Path]):
logger = setup_logger()
logger.info("Delete temp data_dir")
for temp_dir in temp_dirs:
shutil.rmtree(temp_dir)
def main(
@@ -131,25 +148,84 @@ def main(
cpus_per_task: int,
tasks_per_job: int,
workers: int,
resume_dir: Path | None = None,
resume_dir: Path = None,
debug: bool = False,
repo_id: str | None = None,
repo_id: str = None,
push_to_hub: bool = False,
):
adapter = LiberoAdapter(src_paths, output_path)
tasks = []
pattern1 = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
pattern2 = re.compile(r"(.*?)_demo\.hdf5")
for src_path in src_paths:
for input_h5 in src_path.glob("*.hdf5"):
match = pattern1.search(input_h5.name)
if match is None:
match = pattern2.search(input_h5.name)
if match is None:
continue
tasks.append(
(
input_h5,
(output_path / (src_path.name + "_temp") / input_h5.stem).resolve(),
match.group(1).replace("_", " "),
)
)
if len(src_paths) > 1:
aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot")
else:
aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"
aggregate_output_path = aggregate_output_path.resolve()
run_converter(
adapter=adapter,
executor=executor,
cpus_per_task=cpus_per_task,
tasks_per_job=tasks_per_job,
workers=workers,
resume_dir=resume_dir,
debug=debug,
local_repo_id=repo_id,
hub_repo_id=repo_id,
push_to_hub=push_to_hub,
)
if debug:
executor = "local"
workers = 1
tasks = tasks[:2]
push_to_hub = False
match executor:
case "local":
workers = os.cpu_count() // cpus_per_task if workers == -1 else workers
executor = LocalPipelineExecutor
case "ray":
runtime_env = RuntimeEnv(
env_vars={
"HDF5_USE_FILE_LOCKING": "FALSE",
"HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
"SVT_LOG": "1",
},
)
ray.init(runtime_env=runtime_env)
executor = RayPipelineExecutor
case _:
raise ValueError(f"Executor {executor} not supported")
executor_config = {
"tasks": len(tasks),
"workers": workers,
**({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if executor is RayPipelineExecutor else {}),
}
executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run()
create_aggr_dataset([task[1] for task in tasks], aggregate_output_path)
delete_temp_data([task[1] for task in tasks])
for task in tasks:
shutil.rmtree(task[1].parent, ignore_errors=True)
if push_to_hub:
assert repo_id is not None
tags = ["LeRobot", "libero", "franka"]
tags.extend([src_path.name for src_path in src_paths])
LeRobotDataset(
repo_id=repo_id,
root=aggregate_output_path,
).push_to_hub(
tags=tags,
private=False,
push_videos=True,
license="apache-2.0",
upload_large_folder=False,
)
if __name__ == "__main__":
@@ -158,9 +234,7 @@ if __name__ == "__main__":
parser.add_argument("--output-path", type=Path, required=True)
parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local")
parser.add_argument("--cpus-per-task", type=int, default=1)
parser.add_argument(
"--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray"
)
parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
parser.add_argument("--resume-dir", type=Path, help="logs directory to resume")
parser.add_argument("--debug", action="store_true")
+37
View File
@@ -0,0 +1,37 @@
LIBERO_FEATURES = {
"observation.images.image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.images.wrist_image": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "rgb"],
},
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]},
},
"observation.states.ee_state": {
"dtype": "float32",
"shape": (6,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]},
},
"observation.states.joint_state": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
},
"observation.states.gripper_state": {
"dtype": "float32",
"shape": (2,),
"names": {"motors": ["gripper", "gripper"]},
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]},
},
}
@@ -0,0 +1,36 @@
from pathlib import Path
import numpy as np
from h5py import File
def load_local_episodes(input_h5: Path):
with File(input_h5, "r") as f:
for demo in f["data"].values():
demo_len = len(demo["obs/agentview_rgb"])
# (-1: open, 1: close) -> (0: close, 1: open)
action = np.array(demo["actions"])
action = np.concatenate(
[
action[:, :6],
(1 - np.clip(action[:, -1], 0, 1))[:, None],
],
axis=1,
)
state = np.concatenate(
[
np.array(demo["obs/ee_states"]),
np.array(demo["obs/gripper_states"]),
],
axis=1,
)
episode = {
"observation.images.image": np.array(demo["obs/agentview_rgb"]),
"observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
"observation.state": np.array(state, dtype=np.float32),
"observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
"observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
"observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
"action": np.array(action, dtype=np.float32),
}
yield [{**{k: v[i] for k, v in episode.items()}} for i in range(demo_len)]
-144
View File
@@ -1,144 +0,0 @@
# ROBOCASA TO LEROBOT
## ROBOCASA installation
- Clone this repo: https://github.com/robocasa/robocasa
- Follow README.md to install packages and download assets
## Data Preparation
- Check files: `robocasa/scripts/download_datasets.py`, `robocasa/utils/dataset_registry.py`
- Download original datasets by python scripts or wget/curl (recommended)
## Example:
```bash
wget https://utexas.box.com/shared/static/7y9csrcx6uhhq4p3yctmm2df3rjqpw6g.hdf5 -O PnPCounterToCab.hdf5
```
- Extract subset data: Original hdf5 files contain about 3000 episodes. However, it contains a key "masks", which contain list of subset demo_ids. For example: 30_demos : `[demo123, demo234, demo345, etc.]`.Run the code in the notebook to extract only chosen subset demos, which is much smaller and easier for later processes.
## Code Modification
- Add functions in `camera_utils.py` to your `robosuite/robosuite/utils/camera_utils.py` for camera parameters extraction (May be useful for experiments which requires multiview rendering)
- Change args to render depth and segmentation masks for new regenerated dataset. Change in `robocasa/environments/kitchen/kitchen.py`
```python
class Kitchen(ManipulationEnv, metaclass=KitchenEnvMeta):
...
EXCLUDE_LAYOUTS = []
def __init__(
self,
robots,
env_configuration="default",
controller_configs=None,
gripper_types="default",
base_types="default",
initialization_noise="default",
use_camera_obs=True,
use_object_obs=True, # currently unused variable
reward_scale=1.0, # currently unused variable
reward_shaping=False, # currently unused variables
placement_initializer=None,
has_renderer=False,
has_offscreen_renderer=True,
render_camera="robot0_agentview_center",
render_collision_mesh=False,
render_visual_mesh=True,
render_gpu_device_id=-1,
control_freq=20,
horizon=1000,
ignore_done=True,
hard_reset=True,
camera_names="agentview",
camera_heights=256,
camera_widths=256,
camera_depths=False, # -> True
renderer="mjviewer",
renderer_config=None,
init_robot_base_pos=None,
seed=None,
layout_and_style_ids=None,
layout_ids=None,
style_ids=None,
scene_split=None, # unsued, for backwards compatibility
generative_textures=None,
obj_registries=("objaverse",),
obj_instance_split=None,
use_distractors=False,
translucent_robot=False,
randomize_cameras=False,
camera_segmentations="instance", # add camera segmentation here: semantic/instance/element
):
...
super().__init__(
robots=robots,
env_configuration=env_configuration,
controller_configs=controller_configs,
base_types=base_types,
gripper_types=gripper_types,
initialization_noise=initialization_noise,
use_camera_obs=use_camera_obs,
has_renderer=has_renderer,
has_offscreen_renderer=has_offscreen_renderer,
render_camera=render_camera,
render_collision_mesh=render_collision_mesh,
render_visual_mesh=render_visual_mesh,
render_gpu_device_id=render_gpu_device_id,
control_freq=control_freq,
lite_physics=True,
horizon=horizon,
ignore_done=ignore_done,
hard_reset=hard_reset,
camera_names=camera_names,
camera_heights=camera_heights,
camera_widths=camera_widths,
camera_depths=camera_depths,
camera_segmentations=camera_segmentations, # add camera segmentation here
renderer=renderer,
renderer_config=renderer_config,
seed=seed,
)
```
## Regenerate
- Check file: `regenerate.py`
- Original dataset contain image in 128x128 resolution and does not contain segmentation mask, depth, etc. We need to rerender it in 256x256 and save segmentation mask, and depth
- Overall re-render flow:
- (1) load hdf5 file and create env
- (2) reset env to first state in the dataset
- (3) Execute action in action label of original dataset, at each step, we collect observation data, camera parameters, state, etc. from simulation.
- (4) Save only successful episode to new hdf5 file (original data contain unsuccessful episode or wrong action)
- Change `origin_dir` and `regenerate_dir` to your dir in `regenerate.py` then run `python regenerate.py` to regenerate
## Get started
1. Download source code:
```bash
git clone https://github.com/Tavish9/any4lerobot.git
```
2. Modify path in `convert.sh`:
```bash
python robocasa_h5.py \
--raw-dir /path/to/your/hdf5/files \
--repo-id your_hf_id \
--local-dir /path/to/your/output/dataset
```
3. Execute the script:
```bash
bash convert.sh
```
## Example output datasets:
- ROBOCASA 100 demos: https://huggingface.co/datasets/binhng/robocasa_merged_24_tasks_100demos_v1
- ROBOCASA 30 demos: https://huggingface.co/datasets/binhng/robocasa_merged_24_tasks_30demos_v3
-4
View File
@@ -1,4 +0,0 @@
python robocasa_h5.py \
--raw-dir /path/to/your/hdf5/files \
--repo-id your_hf_id \
--local-dir /path/to/your/output/dataset
-107
View File
@@ -1,107 +0,0 @@
import argparse
import json
import shutil
from pathlib import Path
import h5py
import numpy as np
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from tqdm import tqdm
def main(raw_dir: Path, repo_id: str, local_dir: Path):
if local_dir.exists():
shutil.rmtree(local_dir)
dataset = LeRobotDataset.create(
repo_id=repo_id,
robot_type="PandaOmron",
root=local_dir,
fps=20,
features={
"observation.images.robot0_agentview_right": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "channel"],
},
"observation.images.robot0_agentview_left": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "channel"],
},
"observation.images.robot0_eye_in_hand": {
"dtype": "video",
"shape": (256, 256, 3),
"names": ["height", "width", "channel"],
},
"observation.state": {
"dtype": "float32",
"shape": (9,),
"names": ["state"],
},
"action": {
"dtype": "float32",
"shape": (12,),
"names": ["actions"],
},
},
)
for dataset_path in raw_dir.glob("**/*.hdf5"):
with h5py.File(dataset_path, "r") as raw_dataset:
demos = raw_dataset["data"].keys()
for demo in tqdm(demos):
demo_length = len(raw_dataset["data"][demo]["actions"])
demo_data = raw_dataset["data"][demo]
left_images = demo_data["obs"]["robot0_agentview_left_image"][:]
right_images = demo_data["obs"]["robot0_agentview_right_image"][:]
wrist_images = demo_data["obs"]["robot0_eye_in_hand_image"][:]
states = np.concatenate(
(
demo_data["obs"]["robot0_base_to_eef_pos"][:],
demo_data["obs"]["robot0_base_to_eef_quat"][:],
demo_data["obs"]["robot0_gripper_qpos"][:],
),
axis=1,
)
actions = demo_data["actions"][:]
for i in range(demo_length):
ep_meta = demo_data.attrs["ep_meta"]
ep_meta = json.loads(ep_meta)
lang = ep_meta["lang"]
dataset.add_frame(
{
"observation.images.robot0_agentview_right": right_images[i],
"observation.images.robot0_agentview_left": left_images[i],
"observation.images.robot0_eye_in_hand": wrist_images[i],
"observation.state": states[i].astype(np.float32),
"action": actions[i].astype(np.float32),
"task": lang,
},
)
dataset.save_episode()
dataset.finalize()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--local-dir",
type=Path,
required=True,
help="When provided, writes the dataset converted to LeRobotDataset format in this directory (e.g. `data/lerobot/aloha_mobile_chair`).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True",
)
args = parser.parse_args()
main(raw_dir=args.raw_dir, repo_id=args.repo_id, local_dir=args.local_dir)
@@ -1,78 +0,0 @@
import numpy as np
import robosuite.utils.transform_utils as T
def get_camera_intrinsic_matrix(sim, camera_name, camera_height, camera_width):
"""
Obtains camera intrinsic matrix.
Args:
sim (MjSim): simulator instance
camera_name (str): name of camera
camera_height (int): height of camera images in pixels
camera_width (int): width of camera images in pixels
Return:
K (np.array): 3x3 camera matrix
"""
cam_id = sim.model.camera_name2id(camera_name)
fovy = sim.model.cam_fovy[cam_id]
f = 0.5 * camera_height / np.tan(fovy * np.pi / 360)
K = np.array([[f, 0, camera_width / 2], [0, f, camera_height / 2], [0, 0, 1]])
return K
def get_camera_extrinsic_matrix(sim, camera_name):
"""
Returns a 4x4 homogenous matrix corresponding to the camera pose in the
world frame. MuJoCo has a weird convention for how it sets up the
camera body axis, so we also apply a correction so that the x and y
axis are along the camera view and the z axis points along the
viewpoint.
Normal camera convention: https://docs.opencv.org/2.4/modules/calib3d/doc/camera_calibration_and_3d_reconstruction.html
Args:
sim (MjSim): simulator instance
camera_name (str): name of camera
Return:
R (np.array): 4x4 camera extrinsic matrix
"""
cam_id = sim.model.camera_name2id(camera_name)
camera_pos = sim.data.cam_xpos[cam_id]
camera_rot = sim.data.cam_xmat[cam_id].reshape(3, 3)
R = T.make_pose(camera_pos, camera_rot)
# IMPORTANT! This is a correction so that the camera axis is set up along the viewpoint correctly.
camera_axis_correction = np.array(
[[1.0, 0.0, 0.0, 0.0], [0.0, -1.0, 0.0, 0.0], [0.0, 0.0, -1.0, 0.0], [0.0, 0.0, 0.0, 1.0]]
)
R = R @ camera_axis_correction
return R
def get_camera_extrinsic_matrix_rel(sim, camera_name):
"""
Returns a 4x4 homogenous matrix corresponding to the camera pose in the
world frame. MuJoCo has a weird convention for how it sets up the
camera body axis, so we also apply a correction so that the x and y
axis are along the camera view and the z axis points along the
viewpoint.
Normal camera convention: https://docs.opencv.org/2.4/modules/calib3d/doc/camera_calibration_and_3d_reconstruction.html
Args:
sim (MjSim): simulator instance
camera_name (str): name of camera
Return:
R (np.array): 4x4 camera extrinsic matrix
"""
cam_id = sim.model.camera_name2id(camera_name)
camera_pos = sim.model.cam_pos[cam_id]
camera_quat = sim.model.cam_quat[cam_id]
camera_rot = T.quat2mat(camera_quat)
R = T.make_pose(camera_pos, camera_rot)
# IMPORTANT! This is a correction so that the camera axis is set up along the viewpoint correctly.
camera_axis_correction = np.array(
[[1.0, 0.0, 0.0, 0.0], [0.0, -1.0, 0.0, 0.0], [0.0, 0.0, -1.0, 0.0], [0.0, 0.0, 0.0, 1.0]]
)
R = R @ camera_axis_correction
return R
@@ -1,70 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "44b6da09",
"metadata": {},
"source": [
"# Extract subset data \n",
"\n",
"Original hdf5 file contains about 3000 episodes. However, it contains a key \"masks\", which contain list of subset demo_ids. For example: 30_demos : [demo123, demo234, demo 345, etc.]\n",
"\n",
"Run the code bellow to extract only chosen subset demos, which is much smaller and easier for later process."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6ac64550",
"metadata": {},
"outputs": [],
"source": [
"import h5py\n",
"\n",
"DATA_DIR=\"direction/to/your/hdf5/files/\"\n",
"# E.x: DATA_DIR=\"/projects/extern/kisski/kisski-spath/dir.project/VLA_3D/binh/robocasa/test\"\n",
"\n",
"# file_name = \"PnPCabToCounter.hdf5\"\n",
"# file_name = \"PnPCounterToCab.hdf5\"\n",
"# file_name = \"CoffeeSetupMug.hdf5\"\n",
"# file_name = \"TurnOnMicrowave.hdf5\"\n",
"file_name = \"TurnOffStove.hdf5\"\n",
"\n",
"file_path = DATA_DIR + \"/\" + file_name\n",
"\n",
"f = h5py.File(file_path, 'r')\n",
"chosen_demo_list = []\n",
"for i in f['mask']['100_demos'][:]: # or \"30_demos\"\n",
" chosen_demo_list.append(i.decode('utf-8'))\n",
" \n",
"chosen_data = []\n",
"for k in f['data'].keys():\n",
" if k in chosen_demo_list:\n",
" chosen_data.append(f['data'][k])\n",
" \n",
"with h5py.File(f\"direction_to_your_new_extracted_subset/{file_name}\", \"w\") as out:\n",
" out_data = out.create_group(\"data\")\n",
" \n",
" for key, val in f['data'].attrs.items():\n",
" out_data.attrs[key] = val # IMPORTANT: set attributes for new hdf5 files (need for reset env and later re-render)\n",
"\n",
" for grp in chosen_data:\n",
" name = grp.name.split(\"/\")[-1] # demo_xxx\n",
" grp.file.copy(grp, out_data, name=name)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "robocasa",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10.19"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -1,277 +0,0 @@
import json
import os
import h5py
import numpy as np
import robosuite
from robocasa.scripts.playback_dataset import reset_to
from robosuite.utils.camera_utils import (
get_camera_extrinsic_matrix,
get_camera_extrinsic_matrix_rel,
get_camera_intrinsic_matrix,
)
from tqdm import tqdm
ROBOCASA_DUMMY_ACTION = [0.0] * 6 + [-1.0] + [0.0] * 4 + [-1.0]
def get_camera_info(sim, camera_name, camera_height, camera_width):
camera_intrinsics = get_camera_intrinsic_matrix(sim, camera_name, camera_height, camera_width)
camera_extrinsics = get_camera_extrinsic_matrix(sim, camera_name)
return camera_intrinsics, camera_extrinsics
def creat_env_from_hdf5(f):
env_meta = json.loads(f["data"].attrs["env_args"])
env_meta["env_kwargs"]["camera_depths"] = True
env_meta["env_kwargs"]["camera_heights"] = 256
env_meta["env_kwargs"]["camera_widths"] = 256
env_meta["env_kwargs"]["camera_segmentations"] = "element" # element' #'instance'
# f.close()
env_kwargs = env_meta["env_kwargs"]
env_kwargs["env_name"] = env_meta["env_name"]
env_kwargs["has_renderer"] = False
env_kwargs["renderer"] = "mjviewer"
env_kwargs["has_offscreen_renderer"] = True # write_video
env_kwargs["use_camera_obs"] = True
env_kwargs["ignore_done"] = False
env = robosuite.make(**env_kwargs)
return env, env_meta
def reset_each_demo(env, demo):
# demo = f["data"]["demo_<idx>"]
model_xml = demo.attrs["model_file"]
init_state = demo["states"][()][0]
ep_meta = demo.attrs["ep_meta"]
state = {"states": init_state, "model": model_xml, "ep_meta": ep_meta}
reset_to(env, state)
def process_1_demo(env, f, demo_id, grp):
demo = f["data"][demo_id]
reset_each_demo(env, demo)
ep_meta = env.get_ep_meta()
model_file = env.model.get_xml()
for _ in range(10):
obs, reward, done, info = env.step(ROBOCASA_DUMMY_ACTION)
obs_keys = list(obs.keys())
obs_keys += [
"robot0_agentview_left_intrinsics",
"robot0_agentview_right_intrinsics",
"robot0_eye_in_hand_intrinsics",
]
obs_keys += [
"robot0_agentview_left_extrinsics",
"robot0_agentview_right_extrinsics",
"robot0_eye_in_hand_extrinsics",
]
obs_keys += [
"robot0_agentview_left_extrinsicsR",
"robot0_agentview_right_extrinsicsR",
"robot0_eye_in_hand_extrinsicsR",
]
obs_keys += ["robot0_agentview_left_depthW", "robot0_agentview_right_depthW", "robot0_eye_in_hand_depthW"]
obs_dict = {key: [] for key in obs_keys}
# action_dict = {key: [] for key in act_keys}
actions = []
actions_abs = []
rewards = []
dones = []
states = [] # env state, not robot. The state for robot is included in obs
# for key in obs_keys:
# obs_dict[key] = obs[key]
orig_actions = demo["actions"][()]
orig_actions_abs = demo["actions_abs"][()]
# orig_action_dict = demo['action_dict']
for i, action in enumerate(orig_actions):
# for i, action in enumerate(orig_actions_abs):
extent = env.sim.model.stat.extent
far = env.sim.model.vis.map.zfar * extent
near = env.sim.model.vis.map.znear * extent
left_depth = obs["robot0_agentview_left_depth"].copy()
right_depth = obs["robot0_agentview_right_depth"].copy()
wrist_depth = obs["robot0_eye_in_hand_depth"].copy()
left_depth = near / (1.0 - left_depth * (1.0 - near / far))[::-1]
right_depth = near / (1.0 - right_depth * (1.0 - near / far))[::-1]
wrist_depth = near / (1.0 - wrist_depth * (1.0 - near / far))[::-1]
obs["robot0_agentview_left_depthW"] = left_depth
obs["robot0_agentview_right_depthW"] = right_depth
obs["robot0_eye_in_hand_depthW"] = wrist_depth
left_intrinsics, left_extrinsics = get_camera_info(env.sim, "robot0_agentview_left", 256, 256)
right_intrinsics, right_extrinsics = get_camera_info(env.sim, "robot0_agentview_right", 256, 256)
wrist_intrinsics, wrist_extrinsics = get_camera_info(env.sim, "robot0_eye_in_hand", 256, 256)
obs["robot0_agentview_left_intrinsics"] = left_intrinsics
obs["robot0_agentview_right_intrinsics"] = right_intrinsics
obs["robot0_eye_in_hand_intrinsics"] = wrist_intrinsics
obs["robot0_agentview_left_extrinsics"] = left_extrinsics
obs["robot0_agentview_right_extrinsics"] = right_extrinsics
obs["robot0_eye_in_hand_extrinsics"] = wrist_extrinsics
left_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_agentview_left")
right_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_agentview_right")
wrist_intrinsics_rel = get_camera_extrinsic_matrix_rel(env.sim, "robot0_eye_in_hand")
obs["robot0_agentview_left_extrinsicsR"] = left_intrinsics_rel
obs["robot0_agentview_right_extrinsicsR"] = right_intrinsics_rel
obs["robot0_eye_in_hand_extrinsicsR"] = wrist_intrinsics_rel
# append all keys
for key in obs_keys:
if (
("eye_in_hand" in key or "agentview" in key)
and "depthW" not in key
and "intrinsics" not in key
and "extrinsics" not in key
):
obs_dict[key].append(obs[key][::-1, :, :])
else:
obs_dict[key].append(obs[key])
# for key in act_keys:
# action_dict[key].append(orig_action_dict[key][i])
actions.append(action)
actions_abs.append(orig_actions_abs[i])
rewards.append(reward)
dones.append(done)
current_state = env.sim.get_state().flatten()
states.append(current_state)
# step env
obs, reward, done, info = env.step(action.tolist())
done = done or env._check_success()
# if done:
# print(f" Step {i} done: {done}")
# print(f" Step {i} info: {info}")
# print(f" Step {i} is_success: {env._check_success()}" )
# save successful episode only
if done:
print(f"Demo {demo_id} is done after {i} actions! -> SAVE!!!")
# save to new hdf5 file here
ep_data = grp.create_group(demo_id)
# set attribute for ep_data here ...
ep_data.attrs["model_file"] = model_file
ep_data.attrs["ep_meta"] = json.dumps(ep_meta, indent=4)
# obs group
obs_grp = ep_data.create_group("obs")
for key in obs_keys:
obs_grp.create_dataset(key, data=np.stack(obs_dict[key], axis=0))
# actions dataset
ep_data.create_dataset("actions", data=np.stack(actions, axis=0))
ep_data.create_dataset("actions_abs", data=np.stack(actions_abs, axis=0))
ep_data.create_dataset("dones", data=np.stack(dones, axis=0))
ep_data.create_dataset("rewards", data=np.stack(rewards, axis=0))
# state dataset
ep_data.create_dataset("states", data=np.stack(states, axis=0))
elif not done:
print(f"Demo {demo_id} not done after all actions executed! -> does not SAVE!")
def regenerate_hdf5_dataset(input_path, output_path, debug=False):
f = h5py.File(input_path, "r")
env, env_meta = creat_env_from_hdf5(f)
out_f = h5py.File(output_path, "w")
out_f.attrs["env_args"] = json.dumps(env_meta)
grp = out_f.create_group("data")
all_demo_ids = list(f["data"].keys())
if debug:
all_demo_ids = all_demo_ids[: min(2, len(all_demo_ids))]
for demo_id in tqdm(all_demo_ids):
print(f"Processing {demo_id} ...")
process_1_demo(env, f, demo_id, grp)
f.close()
if len(out_f["data"].keys()) == 0:
print("No demos were processed successfully. Deleting output file.")
out_f.close()
os.remove(output_path)
else:
print(f"Processed data saved {len(out_f['data'].keys())} demos to {output_path}")
out_f.close()
def process_task_wrapper(args):
"""Wrapper function for multiprocessing to process a single task."""
task, origin_dir, regenerate_dir, debug = args
input_path = os.path.join(origin_dir, f"{task}.hdf5")
output_path = os.path.join(regenerate_dir, f"{task}.hdf5")
print(f"Regenerating dataset for task {task} ...")
regenerate_hdf5_dataset(input_path, output_path, debug=debug)
print(f"Completed task {task}")
if __name__ == "__main__":
n_demo = 100 # 100
origin_dir = f"<directory/contain/original/hdf5/files/>"
regenerate_dir = f"<directory/contain/regenerated/hdf5/files/>"
os.makedirs(regenerate_dir, exist_ok=True)
task_list = [
"PnPCabToCounter",
"PnPCounterToCab",
"CoffeeSetupMug",
"TurnOffStove",
"TurnOnMicrowave",
# ... add other tasks as needed
# "CoffeePressButton",
# "CoffeeServeMug",
# "TurnOffMicrowave",
# "TurnOffSinkFaucet",
# "TurnOnSinkFaucet",
# "TurnOnStove",
# "TurnSinkSpout"
# "CloseDoubleDoor",
# "CloseDrawer",
# "CloseSingleDoor",
# "OpenDoubleDoor",
# "OpenDrawer",
# "OpenSingleDoor"
# "PnPCounterToMicrowave",
# "PnPCounterToSink",
# "PnPCounterToStove",
# "PnPMicrowaveToCounter",
# "PnPSinkToCounter",
# "PnPStoveToCounter"
] # 24 tasks in robocasa kitchen dataset
debug = False
if debug:
task_list = task_list[:2]
for task in task_list:
input_path = os.path.join(origin_dir, f"{task}.hdf5")
output_path = os.path.join(regenerate_dir, f"{task}.hdf5")
print(f"Regenerating dataset for task {task} ...")
regenerate_hdf5_dataset(input_path, output_path, debug=False)
-2
View File
@@ -182,8 +182,6 @@ Dataset Structure of `meta/info.json`:
> │ └── h5_ur_1rgb.json
> └── RoboMIND_v1_2_instr.csv
> ```
>
> 3. Due the format mismatch in simulation dataset, this script skips `simulation`, `sim_franka_3rgb`, `sim_tienkung_1rgb`.
> [!NOTE]
> The conversion speed of this script is limited by the performance of the physical machine running it, including **CPU cores and memory**. We recommend using **2 CPU cores per task** for optimal performance. However, each task requires approximately 10 GiB of memory. To avoid running out of memory, you may need to increase the number of CPU cores per task depending on your systems available memory.
+98 -156
View File
@@ -1,6 +1,5 @@
import argparse
import concurrent.futures
import inspect
import gc
import json
import logging
import shutil
@@ -8,15 +7,11 @@ from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.dataset_writer import DatasetWriter, _encode_video_worker
from lerobot.datasets.feature_utils import validate_episode_buffer
from lerobot.datasets.io_utils import write_info, write_stats
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict
from lerobot.datasets.lerobot_dataset import VALID_VIDEO_CODECS, LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import flatten_dict, validate_episode_buffer, write_info, write_stats
from lerobot.datasets.video_utils import get_safe_default_codec
from ray.runtime_env import RuntimeEnv
from robomind_uitls.configs import ROBOMIND_CONFIG
from robomind_uitls.lerobot_uitls import compute_episode_stats, generate_features_from_config
@@ -26,38 +21,6 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
def _flush_metadata_buffer(self) -> None:
"""Write all buffered episode metadata to parquet file."""
if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
return
combined_dict = {}
for episode_dict in self._metadata_buffer:
for key, value in episode_dict.items():
if key not in combined_dict:
combined_dict[key] = []
# Extract value and serialize numpy arrays
# because PyArrow's from_pydict function doesn't support numpy arrays
val = value[0] if isinstance(value, list) else value
combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
first_ep = self._metadata_buffer[0]
chunk_idx = first_ep["meta/episodes/chunk_index"][0]
file_idx = first_ep["meta/episodes/file_index"][0]
schema = None if not self._pq_writer else self._pq_writer.schema
table = pa.Table.from_pydict(combined_dict, schema=schema)
if not self._pq_writer:
path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
path.parent.mkdir(parents=True, exist_ok=True)
self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
self._pq_writer.write_table(table)
self.latest_episode = self._metadata_buffer[-1]
self._metadata_buffer.clear()
def save_episode(
self,
split,
@@ -92,18 +55,78 @@ class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
write_stats(self.stats, self.root)
class RoboMINDDatasetWriter(DatasetWriter):
def save_episode(
self,
split,
action_config: dict,
episode_data: dict | None = None,
parallel_encoding: bool = True,
) -> None:
"""Save the current episode in self.episode_buffer to disk."""
class RoboMINDDataset(LeRobotDataset):
@classmethod
def create(
cls,
repo_id: str,
fps: int,
features: dict,
root: str | Path | None = None,
robot_type: str | None = None,
use_videos: bool = True,
tolerance_s: float = 1e-4,
image_writer_processes: int = 0,
image_writer_threads: int = 0,
video_backend: str | None = None,
batch_encoding_size: int = 1,
vcodec: str = "libsvtav1",
) -> "LeRobotDataset":
"""Create a LeRobot Dataset from scratch in order to record data."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
obj = cls.__new__(cls)
obj.meta = RoboMINDDatasetMetadata.create(
repo_id=repo_id,
fps=fps,
robot_type=robot_type,
features=features,
root=root,
use_videos=use_videos,
)
obj.repo_id = obj.meta.repo_id
obj.root = obj.meta.root
obj.revision = None
obj.tolerance_s = tolerance_s
obj.image_writer = None
obj.batch_encoding_size = batch_encoding_size
obj.episodes_since_last_encoding = 0
obj.vcodec = vcodec
if image_writer_processes or image_writer_threads:
obj.start_image_writer(image_writer_processes, image_writer_threads)
# TODO(aliberts, rcadene, alexander-soare): Merge this with OnlineBuffer/DataBuffer
obj.episode_buffer = obj.create_episode_buffer()
obj.episodes = None
obj.hf_dataset = obj.create_hf_dataset()
obj.image_transforms = None
obj.delta_timestamps = None
obj.delta_indices = None
obj._absolute_to_relative_idx = None
obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec()
obj.writer = None
obj.latest_episode = None
obj._current_file_start_frame = None
# Initialize tracking for incremental recording
obj._lazy_loading = False
obj._recorded_frames = 0
obj._writer_closed_for_reading = False
return obj
def save_episode(self, split, action_config: dict, episode_data: dict | None = None) -> None:
"""
This will save to disk the current episode in self.episode_buffer.
Args:
episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will
save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
None.
"""
episode_buffer = episode_data if episode_data is not None else self.episode_buffer
validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features)
validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features)
# size and task are special cases that won't be added to hf_dataset
episode_length = episode_buffer.pop("size")
@@ -111,131 +134,49 @@ class RoboMINDDatasetWriter(DatasetWriter):
episode_tasks = list(set(tasks))
episode_index = episode_buffer["episode_index"]
episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length)
episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length)
episode_buffer["episode_index"] = np.full((episode_length,), episode_index)
# Update tasks and task indices with new tasks if any
self._meta.save_episode_tasks(episode_tasks)
self.meta.save_episode_tasks(episode_tasks)
# Given tasks in natural language, find their corresponding task indices
episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks])
episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks])
for key, ft in self._meta.features.items():
for key, ft in self.features.items():
# index, episode_index, task_index are already processed above, and image and video
# are processed separately by storing image path and frame info as meta data
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]:
continue
episode_buffer[key] = np.stack(episode_buffer[key]).squeeze()
# Wait for image writer to end, so that episode stats over images can be computed
self._wait_image_writer()
has_video_keys = len(self._meta.video_keys) > 0
use_streaming = self._streaming_encoder is not None and has_video_keys
use_batched_encoding = self._batch_encoding_size > 1
if use_streaming:
non_video_buffer = {
k: v for k, v in episode_buffer.items() if self._meta.features.get(k, {}).get("dtype") not in ("video",)
}
non_video_features = {k: v for k, v in self._meta.features.items() if v["dtype"] != "video"}
ep_stats = compute_episode_stats(non_video_buffer, non_video_features)
else:
ep_stats = compute_episode_stats(episode_buffer, self._meta.features)
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
has_video_keys = len(self.meta.video_keys) > 0
use_batched_encoding = self.batch_encoding_size > 1
if use_streaming:
streaming_results = self._streaming_encoder.finish_episode()
for video_key in self._meta.video_keys:
temp_path, video_stats = streaming_results[video_key]
if video_stats is not None:
ep_stats[video_key] = {
k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0)
for k, v in video_stats.items()
}
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
elif has_video_keys and not use_batched_encoding:
num_cameras = len(self._meta.video_keys)
if parallel_encoding and num_cameras > 1:
with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor:
future_to_key = {
executor.submit(
_encode_video_worker,
video_key,
episode_index,
self._root,
self._meta.fps,
self._vcodec,
self._encoder_threads,
): video_key
for video_key in self._meta.video_keys
}
results = {}
for future in concurrent.futures.as_completed(future_to_key):
video_key = future_to_key[future]
try:
temp_path = future.result()
results[video_key] = temp_path
except Exception as exc:
logging.error(f"Video encoding failed for {video_key}: {exc}")
raise exc
for video_key in self._meta.video_keys:
temp_path = results[video_key]
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
else:
for video_key in self._meta.video_keys:
ep_metadata.update(self._save_episode_video(video_key, episode_index))
if has_video_keys and not use_batched_encoding:
for video_key in self.meta.video_keys:
ep_metadata.update(self._save_episode_video(video_key, episode_index))
# `meta.save_episode` be executed after encoding the videos
ep_metadata.update({"action_config": action_config})
self._meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
self.meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
if has_video_keys and use_batched_encoding:
self._episodes_since_last_encoding += 1
if self._episodes_since_last_encoding == self._batch_encoding_size:
start_ep = self._meta.total_episodes - self._batch_encoding_size
end_ep = self._meta.total_episodes
# Check if we should trigger batch encoding
self.episodes_since_last_encoding += 1
if self.episodes_since_last_encoding == self.batch_encoding_size:
start_ep = self.num_episodes - self.batch_encoding_size
end_ep = self.num_episodes
self._batch_save_episode_video(start_ep, end_ep)
self._episodes_since_last_encoding = 0
self.episodes_since_last_encoding = 0
if not episode_data:
self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0)
class RoboMINDDataset(LeRobotDataset):
@classmethod
def create(cls, *args, **kwargs) -> "RoboMINDDataset":
sig = inspect.signature(super().create)
bound = sig.bind_partial(*args, **kwargs)
bound.apply_defaults()
params = bound.arguments
obj = super().create(*args, **kwargs)
shutil.rmtree(params["root"], ignore_errors=True)
obj.meta: RoboMINDDatasetMetadata = RoboMINDDatasetMetadata.create(
repo_id=params["repo_id"],
fps=params["fps"],
robot_type=params["robot_type"],
features=params["features"],
root=params["root"],
use_videos=params["use_videos"],
metadata_buffer_size=params["metadata_buffer_size"],
)
obj.writer: RoboMINDDatasetWriter = RoboMINDDatasetWriter(
meta=obj.meta,
root=obj.root,
vcodec=obj._vcodec,
encoder_threads=obj._encoder_threads,
batch_encoding_size=obj._batch_encoding_size,
)
return obj
def save_episode(
self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True
) -> None:
self._require_writer("save_episode")
self.writer.save_episode(split, action_config, episode_data, parallel_encoding)
# Reset episode buffer and clean up temporary images (if not already deleted during video encoding)
self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0)
def get_all_tasks(src_path: Path, output_path: Path, embodiment: str):
@@ -314,11 +255,11 @@ def save_as_lerobot_dataset(task: tuple[dict, Path, str], src_path, benchmark, e
return
else:
logging.warning(f"Skipped {episode_path}: len of dataset:{len(raw_dataset)} or {str(err)}")
dataset.finalize()
gc.collect()
if dataset.meta.total_episodes == 0:
shutil.rmtree(local_dir)
del dataset
def main(
@@ -357,6 +298,7 @@ def main(
logging.error(f"Exception occurred for {task_path['train']}")
with open("output.txt", "a") as f:
f.write(f"{task_path['train']}, exception details: {str(e)}\n")
ray.shutdown()
if __name__ == "__main__":
@@ -1,11 +1,9 @@
import numpy as np
from lerobot.datasets.compute_stats import (
DEFAULT_QUANTILES,
auto_downsample_height_width,
get_feature_stats,
sample_indices,
)
from lerobot.datasets.io_utils import load_image_as_numpy
import torchvision
from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices
from lerobot.datasets.utils import load_image_as_numpy
torchvision.set_video_backend("pyav")
def generate_features_from_config(AgiBotWorld_CONFIG):
@@ -51,31 +49,21 @@ def sample_images(input):
return images
def compute_episode_stats(
episode_data: dict[str, list[str] | np.ndarray],
features: dict,
quantile_list: list[float] | None = None,
) -> dict:
if quantile_list is None:
quantile_list = DEFAULT_QUANTILES
def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict:
ep_stats = {}
for key, data in episode_data.items():
if features[key]["dtype"] == "string":
continue
continue # HACK: we should receive np.arrays of strings
elif features[key]["dtype"] in ["image", "video"]:
ep_ft_array = sample_images(data)
axes_to_reduce = (0, 2, 3)
axes_to_reduce = (0, 2, 3) # keep channel dim
keepdims = True
else:
ep_ft_array = data
axes_to_reduce = 0
keepdims = data.ndim == 1
ep_ft_array = data # data is already a np.ndarray
axes_to_reduce = 0 # compute stats over the first axis
keepdims = data.ndim == 1 # keep as np.array
ep_stats[key] = get_feature_stats(
ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list
)
ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims)
if features[key]["dtype"] in ["image", "video"]:
value_norm = 1.0 if "depth" in key else 255.0