diff --git a/README.md b/README.md index b9a0834..1dd8eea 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/Tavish9/any4lerobot) [![Python versions](https://img.shields.io/pypi/pyversions/lerobot)](https://www.python.org/downloads/) [![LeRobot Dataset](https://img.shields.io/badge/dynamic/json?url=https://api.github.com/repos/tavish9/any4lerobot/commits?per_page=1&query=$[0].commit.committer.date&label=LeRobot&color=blue)](https://github.com/huggingface/lerobot) -[![LeRobot Dataset](https://img.shields.io/badge/LeRobot%20Dataset-v2.1-ff69b4.svg)](https://github.com/huggingface/lerobot/pull/711) +[![LeRobot Dataset](https://img.shields.io/badge/LeRobot%20Dataset-v3.0-ff69b4.svg)](https://github.com/huggingface/lerobot/pull/1412) [![License](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) @@ -18,16 +18,17 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/huggingface/lerobot), including data conversion scripts, preprocessing tools, training workflow helpers and etc.. -## πŸš€ What's New +## πŸ“£ What's New +- **\[2025.09.28\]** We have upgraded LeRobotDataset from v2.1 to v3.0! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.04.15\]** We add Dataset Merging Tool for merging multi-source lerobot datasets! πŸ”₯πŸ”₯πŸ”₯ -- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! πŸ”₯πŸ”₯πŸ”₯
More News +- **\[2025.04.14\]** We have supported Data Conversion from AgiBotWorld to LeRobot! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.04.11\]** We change the repo from `openx2lerobot` to `any4lerobot`, making a ​​universal toolbox for LeRobot​​! πŸ”₯πŸ”₯πŸ”₯ - **\[2025.02.19\]** We have supported Data Conversion from Open X-Embodiment to LeRobot! πŸ”₯πŸ”₯πŸ”₯
diff --git a/agibot2lerobot/agibot_h5.py b/agibot2lerobot/agibot_h5.py index 98cdcb9..b096244 100644 --- a/agibot2lerobot/agibot_h5.py +++ b/agibot2lerobot/agibot_h5.py @@ -1,10 +1,7 @@ import argparse import gc import shutil -from concurrent.futures import ( - ThreadPoolExecutor, - as_completed, -) +import tempfile from pathlib import Path import numpy as np @@ -13,105 +10,13 @@ import torch from agibot_utils.agibot_utils import get_task_info, load_local_dataset from agibot_utils.config import AgiBotWorld_TASK_TYPE from agibot_utils.lerobot_utils import compute_episode_stats, generate_features_from_config -from lerobot.datasets.compute_stats import aggregate_stats -from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata -from lerobot.datasets.utils import ( - check_timestamps_sync, - get_episode_data_index, - validate_episode_buffer, - validate_frame, - write_episode, - write_episode_stats, - write_info, -) -from lerobot.datasets.video_utils import get_safe_default_codec +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.utils import validate_episode_buffer, validate_frame from ray.runtime_env import RuntimeEnv -class AgiBotDatasetMetadata(LeRobotDatasetMetadata): - def save_episode( - self, - episode_index: int, - episode_length: int, - episode_tasks: list[str], - episode_stats: dict[str, dict], - action_config: list[dict], - ) -> None: - self.info["total_episodes"] += 1 - self.info["total_frames"] += episode_length - - chunk = self.get_episode_chunk(episode_index) - if chunk >= self.total_chunks: - self.info["total_chunks"] += 1 - - self.info["splits"] = {"train": f"0:{self.info['total_episodes']}"} - self.info["total_videos"] += len(self.video_keys) - if len(self.video_keys) > 0: - self.update_video_info() - - write_info(self.info, self.root) - - episode_dict = { - "episode_index": episode_index, - "tasks": episode_tasks, - "length": episode_length, - "action_config": action_config, - } - self.episodes[episode_index] = episode_dict - write_episode(episode_dict, self.root) - - self.episodes_stats[episode_index] = episode_stats - self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats else episode_stats - write_episode_stats(episode_index, episode_stats, self.root) - - class AgiBotDataset(LeRobotDataset): - @classmethod - def create( - cls, - repo_id: str, - fps: int, - features: dict, - root: str | Path | None = None, - robot_type: str | None = None, - use_videos: bool = True, - tolerance_s: float = 1e-4, - image_writer_processes: int = 0, - image_writer_threads: int = 0, - video_backend: str | None = None, - ) -> "LeRobotDataset": - """Create a LeRobot Dataset from scratch in order to record data.""" - obj = cls.__new__(cls) - obj.meta = AgiBotDatasetMetadata.create( - repo_id=repo_id, - fps=fps, - robot_type=robot_type, - features=features, - root=root, - use_videos=use_videos, - ) - obj.repo_id = obj.meta.repo_id - obj.root = obj.meta.root - obj.revision = None - obj.tolerance_s = tolerance_s - obj.image_writer = None - - if image_writer_processes or image_writer_threads: - obj.start_image_writer(image_writer_processes, image_writer_threads) - - # TODO(aliberts, rcadene, alexander-soare): Merge this with OnlineBuffer/DataBuffer - obj.episode_buffer = obj.create_episode_buffer() - - obj.episodes = None - obj.hf_dataset = obj.create_hf_dataset() - obj.image_transforms = None - obj.delta_timestamps = None - obj.delta_indices = None - obj.episode_data_index = None - obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec() - return obj - - def add_frame(self, frame: dict, task: str, timestamp: float | None = None) -> None: + def add_frame(self, frame: dict) -> None: """ This function only adds the frame to the episode_buffer. Apart from images β€” which are written in a temporary directory β€” nothing is written to disk. To save those frames, the 'save_episode()' method @@ -130,11 +35,10 @@ class AgiBotDataset(LeRobotDataset): # Automatically add frame_index and timestamp to episode buffer frame_index = self.episode_buffer["size"] - if timestamp is None: - timestamp = frame_index / self.fps + timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps self.episode_buffer["frame_index"].append(frame_index) self.episode_buffer["timestamp"].append(timestamp) - self.episode_buffer["task"].append(task) + self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing # Add frame features to episode_buffer for key, value in frame.items(): @@ -156,8 +60,7 @@ class AgiBotDataset(LeRobotDataset): save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to None. """ - if not episode_data: - episode_buffer = self.episode_buffer + episode_buffer = episode_data if episode_data is not None else self.episode_buffer validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) @@ -170,11 +73,8 @@ class AgiBotDataset(LeRobotDataset): episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) episode_buffer["episode_index"] = np.full((episode_length,), episode_index) - # Add new tasks to the tasks dictionary - for task in episode_tasks: - task_index = self.meta.get_task_index(task) - if task_index is None: - self.meta.add_task(task) + # Update tasks and task indices with new tasks if any + self.meta.save_episode_tasks(episode_tasks) # Given tasks in natural language, find their corresponding task indices episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) @@ -187,31 +87,46 @@ class AgiBotDataset(LeRobotDataset): episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() for key in self.meta.video_keys: - video_path = self.root / self.meta.get_video_file_path(episode_index, key) - episode_buffer[key] = str(video_path) # PosixPath -> str - video_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copyfile(videos[key], video_path) + episode_buffer[key] = str(videos[key]) # PosixPath -> str ep_stats = compute_episode_stats(episode_buffer, self.features) - self._save_episode_table(episode_buffer, episode_index) + ep_metadata = self._save_episode_data(episode_buffer) + has_video_keys = len(self.meta.video_keys) > 0 + use_batched_encoding = self.batch_encoding_size > 1 + + self.current_videos = videos + if has_video_keys and not use_batched_encoding: + for video_key in self.meta.video_keys: + ep_metadata.update(self._save_episode_video(video_key, episode_index)) # `meta.save_episode` be executed after encoding the videos # add action_config to current episode - self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, action_config) + ep_metadata.update({"action_config": action_config}) + self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) - ep_data_index = get_episode_data_index(self.meta.episodes, [episode_index]) - ep_data_index_np = {k: t.numpy() for k, t in ep_data_index.items()} - check_timestamps_sync( - episode_buffer["timestamp"], - episode_buffer["episode_index"], - ep_data_index_np, - self.fps, - self.tolerance_s, - ) + if has_video_keys and use_batched_encoding: + # Check if we should trigger batch encoding + self.episodes_since_last_encoding += 1 + if self.episodes_since_last_encoding == self.batch_encoding_size: + start_ep = self.num_episodes - self.batch_encoding_size + end_ep = self.num_episodes + self._batch_save_episode_video(start_ep, end_ep) + self.episodes_since_last_encoding = 0 - if not episode_data: # Reset the buffer - self.episode_buffer = self.create_episode_buffer() + if not episode_data: + # Reset episode buffer and clean up temporary images (if not already deleted during video encoding) + self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0) + + def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path: + """ + Use ffmpeg to convert frames stored as png into mp4 videos. + Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding, + since video encoding with ffmpeg is already using multithreading. + """ + temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4" + shutil.copy(self.current_videos[video_key], temp_path) + return temp_path def get_all_tasks(src_path: Path, output_path: Path): @@ -221,7 +136,7 @@ def get_all_tasks(src_path: Path, output_path: Path): yield (json_file, local_dir.resolve()) -def save_as_lerobot_dataset(agibot_world_config, task: tuple[Path, Path], num_threads, save_depth, debug): +def save_as_lerobot_dataset(agibot_world_config, task: tuple[Path, Path], save_depth): json_file, local_dir = task print(f"processing {json_file.stem}, saving to {local_dir}") src_path = json_file.parent.parent @@ -252,70 +167,34 @@ def save_as_lerobot_dataset(agibot_world_config, task: tuple[Path, Path], num_th all_subdir_eids = sorted([int(Path(path).name) for path in all_subdir]) - if debug or not save_depth: - for eid in all_subdir_eids: - if eid not in task_info: - print(f"{json_file.stem}, episode_{eid} not in task_info.json, skipping...") - continue - action_config = task_info[eid]["label_info"]["action_config"] - raw_dataset = load_local_dataset( - eid, - src_path=src_path, - task_id=task_id, - save_depth=save_depth, - AgiBotWorld_CONFIG=agibot_world_config, - ) - _, frames, videos = raw_dataset - if not all([video_path.exists() for video_path in videos.values()]): - print(f"{json_file.stem}, episode_{eid}: some of the videos does not exist, skipping...") - continue + for eid in all_subdir_eids: + if eid not in task_info: + print(f"{json_file.stem}, episode_{eid} not in task_info.json, skipping...") + continue + action_config = task_info[eid]["label_info"]["action_config"] + raw_dataset = load_local_dataset( + eid, + src_path=src_path, + task_id=task_id, + save_depth=save_depth, + AgiBotWorld_CONFIG=agibot_world_config, + ) + _, frames, videos = raw_dataset + if not all([video_path.exists() for video_path in videos.values()]): + print(f"{json_file.stem}, episode_{eid}: some of the videos does not exist, skipping...") + continue - for frame_data in frames: - dataset.add_frame(frame_data, task_instruction) - try: - dataset.save_episode(videos=videos, action_config=action_config) - except Exception as e: - print(f"{json_file.stem}, episode_{eid}: there are some corrupted mp4s\nException details: {str(e)}") - dataset.episode_buffer = None - continue - gc.collect() - print(f"process done for {json_file.stem}, episode_id {eid}, len {len(frames)}") - else: - with ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [] - for eid in all_subdir_eids: - if eid not in task_info: - print(f"{json_file.stem}, episode_{eid} not in task_info.json, skipping...") - continue - futures.append( - executor.submit( - load_local_dataset, - eid, - src_path=src_path, - task_id=task_id, - save_depth=save_depth, - AgiBotWorld_CONFIG=agibot_world_config, - ) - ) - - for raw_dataset in as_completed(futures): - eid, frames, videos = raw_dataset.result() - if not all([video_path.exists() for video_path in videos.values()]): - print(f"{json_file.stem}, episode_{eid}: some of the videos does not exist, skipping...") - continue - action_config = task_info[eid]["label_info"]["action_config"] - for frame_data in frames: - dataset.add_frame(frame_data, task_instruction) - try: - dataset.save_episode(videos=videos, action_config=action_config) - except Exception as e: - print( - f"{json_file.stem}, episode_{eid}: there are some corrupted mp4s\nException details: {str(e)}" - ) - dataset.episode_buffer = None - continue - gc.collect() - print(f"process done for {json_file.stem}, episode_id {eid}, len {len(frames)}") + for frame_data in frames: + frame_data["task"] = task_instruction + dataset.add_frame(frame_data) + try: + dataset.save_episode(videos=videos, action_config=action_config) + except Exception as e: + print(f"{json_file.stem}, episode_{eid}: there are some corrupted mp4s\nException details: {str(e)}") + dataset.episode_buffer = None + continue + gc.collect() + print(f"process done for {json_file.stem}, episode_id {eid}, len {len(frames)}") def main( @@ -324,7 +203,6 @@ def main( eef_type: str, task_ids: list, cpus_per_task: int, - num_threads_per_task: int, save_depth: bool, debug: bool = False, ): @@ -345,14 +223,10 @@ def main( tasks = filter(lambda task: task[0].stem in task_ids, tasks) if debug: - save_as_lerobot_dataset(agibot_world_config, next(tasks), num_threads_per_task, save_depth, debug) + save_as_lerobot_dataset(agibot_world_config, next(tasks), save_depth) else: runtime_env = RuntimeEnv( - env_vars={ - "HDF5_USE_FILE_LOCKING": "FALSE", - "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE", - "LD_PRELOAD": str(Path(__file__).resolve().parent / "libtcmalloc.so.4.5.3"), - } + env_vars={"HDF5_USE_FILE_LOCKING": "FALSE", "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE"} ) ray.init(runtime_env=runtime_env) resources = ray.available_resources() @@ -363,9 +237,7 @@ def main( remote_task = ray.remote(save_as_lerobot_dataset).options(num_cpus=cpus_per_task) futures = [] for task in tasks: - futures.append( - (task[0].stem, remote_task.remote(agibot_world_config, task, num_threads_per_task, save_depth, debug)) - ) + futures.append((task[0].stem, remote_task.remote(agibot_world_config, task, save_depth))) for task, future in futures: try: @@ -385,7 +257,6 @@ if __name__ == "__main__": parser.add_argument("--eef-type", type=str, choices=["gripper", "dexhand", "tactile"], default="gripper") parser.add_argument("--task-ids", type=str, nargs="+", help="task_327 task_351 ...", default=[]) parser.add_argument("--cpus-per-task", type=int, default=3) - parser.add_argument("--num-threads-per-task", type=int, default=2) parser.add_argument("--save-depth", action="store_true") parser.add_argument("--debug", action="store_true") args = parser.parse_args() diff --git a/agibot2lerobot/libtcmalloc.so.4.5.3 b/agibot2lerobot/libtcmalloc.so.4.5.3 deleted file mode 100755 index 133c817..0000000 Binary files a/agibot2lerobot/libtcmalloc.so.4.5.3 and /dev/null differ diff --git a/libero2lerobot/libero_h5.py b/libero2lerobot/libero_h5.py index c7a8a50..859a4f2 100644 --- a/libero2lerobot/libero_h5.py +++ b/libero2lerobot/libero_h5.py @@ -4,20 +4,27 @@ import re import shutil from pathlib import Path -import numpy as np import pandas as pd import ray from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor from datatrove.pipeline.base import PipelineStep +from lerobot.datasets.aggregate import ( + aggregate_data, + aggregate_metadata, + aggregate_stats, + aggregate_videos, + validate_all_metadata, +) from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata from lerobot.datasets.utils import ( - write_episode, - write_episode_stats, + DEFAULT_CHUNK_SIZE, + DEFAULT_DATA_FILE_SIZE_IN_MB, + DEFAULT_VIDEO_FILE_SIZE_IN_MB, write_info, - write_task, + write_stats, + write_tasks, ) from libero_utils.config import LIBERO_FEATURES -from libero_utils.lerobot_utils import validate_all_metadata from libero_utils.libero_utils import load_local_episodes from ray.runtime_env import RuntimeEnv from tqdm import tqdm @@ -63,178 +70,75 @@ class SaveLerobotDataset(PipelineStep): for episode_index, episode_data in enumerate(raw_dataset): with self.track_time("saving episode"): for frame_data in episode_data: - dataset.add_frame( - frame_data, - task=task_instruction, - ) + frame_data["task"] = task_instruction + dataset.add_frame(frame_data) dataset.save_episode() logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}") -class AggregateDatasets(PipelineStep): - name = "Aggregate Datasets" - type = "libero2lerobot" +def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path): + logger = setup_logger() - def __init__(self, raw_dirs: list[Path], aggregated_dir: Path): - super().__init__() - self.raw_dirs = raw_dirs - self.aggregated_dir = aggregated_dir + all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs] - self.create_aggr_dataset() + fps, robot_type, features = validate_all_metadata(all_metadata) - def create_aggr_dataset(self): - logger = setup_logger() + if aggregated_dir.exists(): + shutil.rmtree(aggregated_dir) - all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in self.raw_dirs] + aggr_meta = LeRobotDatasetMetadata.create( + repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}", + root=aggregated_dir, + fps=fps, + robot_type=robot_type, + features=features, + ) - fps, robot_type, features = validate_all_metadata(all_metadata) + video_keys = [key for key in features if features[key]["dtype"] == "video"] + unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique() + aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) - if self.aggregated_dir.exists(): - shutil.rmtree(self.aggregated_dir) + meta_idx = {"chunk": 0, "file": 0} + data_idx = {"chunk": 0, "file": 0} + videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys} - aggr_meta = LeRobotDatasetMetadata.create( - repo_id=f"{self.aggregated_dir.parent.name}/{self.aggregated_dir.name}", - root=self.aggregated_dir, - fps=fps, - robot_type=robot_type, - features=features, + aggr_meta.episodes = {} + + for src_meta in tqdm(all_metadata, desc="Copy data and videos"): + videos_idx = aggregate_videos( + src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE ) + data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE) - datasets_task_index_to_aggr_task_index = {} - aggr_task_index = 0 - for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate tasks index")): - task_index_to_aggr_task_index = {} + meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx) - for task_index, task in meta.tasks.items(): - if task not in aggr_meta.task_to_task_index: - # add the task to aggr tasks mappings - aggr_meta.tasks[aggr_task_index] = task - aggr_meta.task_to_task_index[task] = aggr_task_index - aggr_task_index += 1 + aggr_meta.info["total_episodes"] += src_meta.total_episodes + aggr_meta.info["total_frames"] += src_meta.total_frames - task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task] + logger.info("write tasks") + write_tasks(aggr_meta.tasks, aggr_meta.root) - datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index + logger.info("write info") + aggr_meta.info.update( + { + "total_tasks": len(aggr_meta.tasks), + "total_episodes": sum(m.total_episodes for m in all_metadata), + "total_frames": sum(m.total_frames for m in all_metadata), + "splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"}, + } + ) + write_info(aggr_meta.info, aggr_meta.root) - datasets_aggr_episode_index_shift = {} - datasets_aggr_index_shift = {} - for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate episodes and global index")): - datasets_aggr_episode_index_shift[dataset_index] = aggr_meta.total_episodes - datasets_aggr_index_shift[dataset_index] = aggr_meta.total_frames - - # populate episodes - for episode_index, episode_dict in meta.episodes.items(): - aggr_episode_index = episode_index + aggr_meta.total_episodes - episode_dict["episode_index"] = aggr_episode_index - aggr_meta.episodes[aggr_episode_index] = episode_dict - - # populate episodes_stats - for episode_index, episode_stats in meta.episodes_stats.items(): - aggr_episode_index = episode_index + aggr_meta.total_episodes - episode_stats["index"].update( - { - "min": episode_stats["index"]["min"] + aggr_meta.total_frames, - "max": episode_stats["index"]["max"] + aggr_meta.total_frames, - "mean": episode_stats["index"]["mean"] + aggr_meta.total_frames, - } - ) - episode_stats["episode_index"].update( - { - "min": np.array([aggr_episode_index]), - "max": np.array([aggr_episode_index]), - "mean": np.array([aggr_episode_index]), - } - ) - df = pd.read_parquet(meta.root / meta.get_data_file_path(episode_index)) - df["task_index"] = df["task_index"].map(datasets_task_index_to_aggr_task_index[dataset_index]) - episode_stats["task_index"].update( - { - "min": np.array([df["task_index"].min()]), - "max": np.array([df["task_index"].max()]), - "mean": np.array([df["task_index"].mean()]), - "std": np.array([df["task_index"].std()]), - } - ) - aggr_meta.episodes_stats[aggr_episode_index] = episode_stats - - # populate info - aggr_meta.info["total_episodes"] += meta.total_episodes - aggr_meta.info["total_frames"] += meta.total_frames - aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes - - logger.info("Write meta data") - aggr_meta.info["total_tasks"] = len(aggr_meta.tasks) - aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_meta.total_episodes - 1) + 1 - aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"} - - # create a new episodes jsonl with updated episode_index using write_episode - for episode_dict in tqdm(aggr_meta.episodes.values(), desc="Write episodes info"): - write_episode(episode_dict, aggr_meta.root) - - # create a new episode_stats jsonl with updated episode_index using write_episode_stats - for episode_index, episode_stats in tqdm(aggr_meta.episodes_stats.items(), desc="Write episodes stats info"): - write_episode_stats(episode_index, episode_stats, aggr_meta.root) - - # create a new task jsonl with updated episode_index using write_task - for task_index, task in tqdm(aggr_meta.tasks.items(), desc="Write tasks info"): - write_task(task_index, task, aggr_meta.root) - - write_info(aggr_meta.info, aggr_meta.root) - - self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index - self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift - self.datasets_aggr_index_shift = datasets_aggr_index_shift - - logger.info("Meta data done writing") - - def run(self, data=None, rank: int = 0, world_size: int = 1): - logger = setup_logger() - - dataset_index = rank - aggr_meta = LeRobotDatasetMetadata("", root=self.aggregated_dir) - meta = LeRobotDatasetMetadata("", root=self.raw_dirs[dataset_index]) - aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index] - aggr_index_shift = self.datasets_aggr_index_shift[dataset_index] - task_index_to_aggr_task_index = self.datasets_task_index_to_aggr_task_index[dataset_index] - - with self.track_time("aggregating dataset"): - logger.info("Copy data") - for episode_index in range(meta.total_episodes): - aggr_episode_index = episode_index + aggr_episode_index_shift - data_path = meta.root / meta.get_data_file_path(episode_index) - aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index) - aggr_data_path.parent.mkdir(parents=True, exist_ok=True) - - # update index, episode_index and task_index - df = pd.read_parquet(data_path) - df["index"] += aggr_index_shift - df["episode_index"] += aggr_episode_index_shift - df["task_index"] = df["task_index"].map(task_index_to_aggr_task_index) - df.to_parquet(aggr_data_path) - - logger.info("Copy videos") - for episode_index in range(meta.total_episodes): - aggr_episode_index = episode_index + aggr_episode_index_shift - for vid_key in meta.video_keys: - video_path = meta.root / meta.get_video_file_path(episode_index, vid_key) - aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key) - aggr_video_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(video_path, aggr_video_path) + logger.info("write stats") + aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata]) + write_stats(aggr_meta.stats, aggr_meta.root) -class DeleteTempData(PipelineStep): - name = "Delete Temp Data" - type = "libero2lerobot" - - def __init__(self, temp_dirs: list[Path]): - super().__init__() - self.temp_dirs = temp_dirs - - def run(self, data=None, rank: int = 0, world_size: int = 1): - logger = setup_logger() - - logger.info(f"Delete temp data {self.temp_dirs[rank]}") - shutil.rmtree(self.temp_dirs[rank]) +def delete_temp_data(temp_dirs: list[Path]): + logger = setup_logger() + logger.info("Delete temp data_dir") + for temp_dir in temp_dirs: + shutil.rmtree(temp_dir) def main( @@ -244,8 +148,7 @@ def main( cpus_per_task: int, tasks_per_job: int, workers: int, - resume_from_save: Path = None, - resume_from_aggregate: Path = None, + resume_dir: Path = None, debug: bool = False, repo_id: str = None, push_to_hub: bool = False, @@ -302,16 +205,9 @@ def main( **({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if executor is RayPipelineExecutor else {}), } - executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_from_save).run() - executor( - pipeline=[DeleteTempData([task[1] for task in tasks])], - **executor_config, - depends=executor( - pipeline=[AggregateDatasets([task[1] for task in tasks], aggregate_output_path)], - **executor_config, - logging_dir=resume_from_aggregate, - ), - ).run() + executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run() + create_aggr_dataset([task[1] for task in tasks], aggregate_output_path) + delete_temp_data([task[1] for task in tasks]) for task in tasks: shutil.rmtree(task[1].parent, ignore_errors=True) @@ -340,8 +236,7 @@ if __name__ == "__main__": parser.add_argument("--cpus-per-task", type=int, default=1) parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray") parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run") - parser.add_argument("--resume-from-save", type=Path, help="logs directory to resume from save step") - parser.add_argument("--resume-from-aggregate", type=Path, help="logs directory to resume from aggregate step") + parser.add_argument("--resume-dir", type=Path, help="logs directory to resume") parser.add_argument("--debug", action="store_true") parser.add_argument("--repo-id", type=str, help="required when push-to-hub is True") parser.add_argument("--push-to-hub", action="store_true", help="upload to hub") diff --git a/libero2lerobot/libero_utils/lerobot_utils.py b/libero2lerobot/libero_utils/lerobot_utils.py deleted file mode 100644 index f2b05c0..0000000 --- a/libero2lerobot/libero_utils/lerobot_utils.py +++ /dev/null @@ -1,23 +0,0 @@ -import tqdm -from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata - - -def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]): - """ - implemented by @Cadene - """ - # validate same fps, robot_type, features - - fps = all_metadata[0].fps - robot_type = all_metadata[0].robot_type - features = all_metadata[0].features - - for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"): - if fps != meta.fps: - raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.") - if robot_type != meta.robot_type: - raise ValueError(f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}.") - if features != meta.features: - raise ValueError(f"Same features is expected, but got features={meta.features} instead of {features}.") - - return fps, robot_type, features diff --git a/openx2lerobot/openx_rlds.py b/openx2lerobot/openx_rlds.py index 1d842f4..b8809a1 100644 --- a/openx2lerobot/openx_rlds.py +++ b/openx2lerobot/openx_rlds.py @@ -37,8 +37,8 @@ from pathlib import Path import numpy as np import tensorflow as tf import tensorflow_datasets as tfds -from lerobot.constants import HF_LEROBOT_HOME from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.utils.constants import HF_LEROBOT_HOME from oxe_utils.configs import OXE_DATASET_CONFIGS, ActionEncoding, StateEncoding from oxe_utils.transforms import OXE_STANDARDIZATION_TRANSFORMS @@ -147,8 +147,8 @@ def save_as_lerobot_dataset(lerobot_dataset: LeRobotDataset, raw_dataset: tf.dat **image_dict, "observation.state": traj["proprio"][i], "action": traj["action"][i], + "task": traj["task"][0].decode(), }, - task=traj["task"][0].decode(), ) lerobot_dataset.save_episode() diff --git a/robomind2lerobot/libtcmalloc.so.4.5.3 b/robomind2lerobot/libtcmalloc.so.4.5.3 deleted file mode 100755 index 133c817..0000000 Binary files a/robomind2lerobot/libtcmalloc.so.4.5.3 and /dev/null differ diff --git a/robomind2lerobot/robomind_h5.py b/robomind2lerobot/robomind_h5.py index 759d5b1..ef68024 100644 --- a/robomind2lerobot/robomind_h5.py +++ b/robomind2lerobot/robomind_h5.py @@ -8,18 +8,9 @@ from pathlib import Path import numpy as np import pandas as pd import ray -import torch from lerobot.datasets.compute_stats import aggregate_stats from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata -from lerobot.datasets.utils import ( - check_timestamps_sync, - get_episode_data_index, - validate_episode_buffer, - validate_frame, - write_episode, - write_episode_stats, - write_info, -) +from lerobot.datasets.utils import flatten_dict, validate_episode_buffer, write_info, write_stats from lerobot.datasets.video_utils import get_safe_default_codec from ray.runtime_env import RuntimeEnv from robomind_uitls.configs import ROBOMIND_CONFIG @@ -37,38 +28,31 @@ class RoboMINDDatasetMetadata(LeRobotDatasetMetadata): episode_length: int, episode_tasks: list[str], episode_stats: dict[str, dict], - action_config: dict[str, str | dict], + episode_metadata: dict, ) -> None: + episode_dict = { + "episode_index": episode_index, + "tasks": episode_tasks, + "length": episode_length, + } + episode_dict.update(episode_metadata) + episode_dict.update(flatten_dict({"stats": episode_stats})) + self._save_episode_metadata(episode_dict) + + # Update info self.info["total_episodes"] += 1 self.info["total_frames"] += episode_length - - chunk = self.get_episode_chunk(episode_index) - if chunk >= self.total_chunks: - self.info["total_chunks"] += 1 - + self.info["total_tasks"] = len(self.tasks) if split == "train": self.info["splits"]["train"] = f"0:{self.info['total_episodes']}" self.train_count = self.info["total_episodes"] elif "val" in split: self.info["splits"]["validation"] = f"{self.train_count}:{self.info['total_episodes']}" - self.info["total_videos"] += len(self.video_keys) - if len(self.video_keys) > 0: - self.update_video_info() write_info(self.info, self.root) - episode_dict = { - "episode_index": episode_index, - "tasks": episode_tasks, - "length": episode_length, - **({"action_config": action_config} if action_config else {}), - } - self.episodes[episode_index] = episode_dict - write_episode(episode_dict, self.root) - - self.episodes_stats[episode_index] = episode_stats - self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats else episode_stats - write_episode_stats(episode_index, episode_stats, self.root) + self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats is not None else episode_stats + write_stats(self.stats, self.root) class RoboMINDDataset(LeRobotDataset): @@ -85,6 +69,7 @@ class RoboMINDDataset(LeRobotDataset): image_writer_processes: int = 0, image_writer_threads: int = 0, video_backend: str | None = None, + batch_encoding_size: int = 1, ) -> "LeRobotDataset": """Create a LeRobot Dataset from scratch in order to record data.""" obj = cls.__new__(cls) @@ -101,6 +86,8 @@ class RoboMINDDataset(LeRobotDataset): obj.revision = None obj.tolerance_s = tolerance_s obj.image_writer = None + obj.batch_encoding_size = batch_encoding_size + obj.episodes_since_last_encoding = 0 if image_writer_processes or image_writer_threads: obj.start_image_writer(image_writer_processes, image_writer_threads) @@ -113,53 +100,10 @@ class RoboMINDDataset(LeRobotDataset): obj.image_transforms = None obj.delta_timestamps = None obj.delta_indices = None - obj.episode_data_index = None obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec() return obj - def add_frame(self, frame: dict, task: str, timestamp: float | None = None) -> None: - """ - This function only adds the frame to the episode_buffer. Apart from images β€” which are written in a - temporary directory β€” nothing is written to disk. To save those frames, the 'save_episode()' method - then needs to be called. - """ - # Convert torch to numpy if needed - for name in frame: - if isinstance(frame[name], torch.Tensor): - frame[name] = frame[name].numpy() - - validate_frame(frame, self.features) - - if self.episode_buffer is None: - self.episode_buffer = self.create_episode_buffer() - - # Automatically add frame_index and timestamp to episode buffer - frame_index = self.episode_buffer["size"] - if timestamp is None: - timestamp = frame_index / self.fps - self.episode_buffer["frame_index"].append(frame_index) - self.episode_buffer["timestamp"].append(timestamp) - self.episode_buffer["task"].append(task) - - # Add frame features to episode_buffer - for key, value in frame.items(): - if key not in self.features: - raise ValueError(f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'.") - - if self.features[key]["dtype"] in ["video"]: - img_path = self._get_image_file_path( - episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index - ) - if frame_index == 0: - img_path.parent.mkdir(parents=True, exist_ok=True) - self._save_image(value, img_path) - self.episode_buffer[key].append(str(img_path)) - else: - self.episode_buffer[key].append(value) - - self.episode_buffer["size"] += 1 - - def save_episode(self, split, action_config: dict, episode_data: dict | None = None, keep_images: bool = False) -> None: + def save_episode(self, split, action_config: dict, episode_data: dict | None = None) -> None: """ This will save to disk the current episode in self.episode_buffer. @@ -168,8 +112,7 @@ class RoboMINDDataset(LeRobotDataset): save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to None. """ - if not episode_data: - episode_buffer = self.episode_buffer + episode_buffer = episode_data if episode_data is not None else self.episode_buffer validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) @@ -182,11 +125,8 @@ class RoboMINDDataset(LeRobotDataset): episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) episode_buffer["episode_index"] = np.full((episode_length,), episode_index) - # Add new tasks to the tasks dictionary - for task in episode_tasks: - task_index = self.meta.get_task_index(task) - if task_index is None: - self.meta.add_task(task) + # Update tasks and task indices with new tasks if any + self.meta.save_episode_tasks(episode_tasks) # Given tasks in natural language, find their corresponding task indices episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) @@ -199,35 +139,32 @@ class RoboMINDDataset(LeRobotDataset): episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() self._wait_image_writer() - self._save_episode_table(episode_buffer, episode_index) ep_stats = compute_episode_stats(episode_buffer, self.features) - if len(self.meta.video_keys) > 0: - video_paths = self.encode_episode_videos(episode_index) - for key in self.meta.video_keys: - episode_buffer[key] = video_paths[key] + ep_metadata = self._save_episode_data(episode_buffer) + has_video_keys = len(self.meta.video_keys) > 0 + use_batched_encoding = self.batch_encoding_size > 1 + + if has_video_keys and not use_batched_encoding: + for video_key in self.meta.video_keys: + ep_metadata.update(self._save_episode_video(video_key, episode_index)) # `meta.save_episode` be executed after encoding the videos - self.meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, action_config) + ep_metadata.update({"action_config": action_config}) + self.meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) - ep_data_index = get_episode_data_index(self.meta.episodes, [episode_index]) - ep_data_index_np = {k: t.numpy() for k, t in ep_data_index.items()} - check_timestamps_sync( - episode_buffer["timestamp"], - episode_buffer["episode_index"], - ep_data_index_np, - self.fps, - self.tolerance_s, - ) + if has_video_keys and use_batched_encoding: + # Check if we should trigger batch encoding + self.episodes_since_last_encoding += 1 + if self.episodes_since_last_encoding == self.batch_encoding_size: + start_ep = self.num_episodes - self.batch_encoding_size + end_ep = self.num_episodes + self._batch_save_episode_video(start_ep, end_ep) + self.episodes_since_last_encoding = 0 - if not keep_images: - # delete images - img_dir = self.root / "images" - if img_dir.is_dir(): - shutil.rmtree(self.root / "images") - - if not episode_data: # Reset the buffer - self.episode_buffer = self.create_episode_buffer() + if not episode_data: + # Reset episode buffer and clean up temporary images (if not already deleted during video encoding) + self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0) def get_all_tasks(src_path: Path, output_path: Path, embodiment: str): @@ -288,8 +225,11 @@ def save_as_lerobot_dataset(task: tuple[dict, Path, str], src_path, benchmark, e if status and len(raw_dataset) >= 50: try: for frame_data in raw_dataset: - dataset.add_frame(frame_data, task_instruction) - dataset.save_episode(split, action_config.get(episode_path.parent.parent.name, {})) + frame_data["task"] = task_instruction + dataset.add_frame(frame_data) + dataset.save_episode( + split, action_config.get(episode_path.parent.parent.name, {"task_summary": None, "steps": None}) + ) logging.info(f"process done for {path}, len {len(raw_dataset)}") except Exception: # [HACK]: not consistent image shape... @@ -324,11 +264,7 @@ def main( save_as_lerobot_dataset(next(tasks), src_path, benchmark, embodiments[0], save_depth) else: runtime_env = RuntimeEnv( - env_vars={ - "HDF5_USE_FILE_LOCKING": "FALSE", - "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE", - "LD_PRELOAD": str(Path(__file__).resolve().parent / "libtcmalloc.so.4.5.3"), - } + env_vars={"HDF5_USE_FILE_LOCKING": "FALSE", "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE"} ) ray.init(runtime_env=runtime_env) resources = ray.available_resources()