import argparse import concurrent.futures import inspect import json import logging import shutil from pathlib import Path import numpy as np import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import ray from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata from lerobot.datasets.compute_stats import aggregate_stats from lerobot.datasets.dataset_writer import DatasetWriter, _encode_video_worker from lerobot.datasets.feature_utils import validate_episode_buffer from lerobot.datasets.io_utils import write_info, write_stats from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict from ray.runtime_env import RuntimeEnv from robomind_uitls.configs import ROBOMIND_CONFIG from robomind_uitls.lerobot_uitls import compute_episode_stats, generate_features_from_config from robomind_uitls.robomind_uitls import load_local_dataset logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") class RoboMINDDatasetMetadata(LeRobotDatasetMetadata): def _flush_metadata_buffer(self) -> None: """Write all buffered episode metadata to parquet file.""" if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0: return combined_dict = {} for episode_dict in self._metadata_buffer: for key, value in episode_dict.items(): if key not in combined_dict: combined_dict[key] = [] # Extract value and serialize numpy arrays # because PyArrow's from_pydict function doesn't support numpy arrays val = value[0] if isinstance(value, list) else value combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val) first_ep = self._metadata_buffer[0] chunk_idx = first_ep["meta/episodes/chunk_index"][0] file_idx = first_ep["meta/episodes/file_index"][0] schema = None if not self._pq_writer else self._pq_writer.schema table = pa.Table.from_pydict(combined_dict, schema=schema) if not self._pq_writer: path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)) path.parent.mkdir(parents=True, exist_ok=True) self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True) self._pq_writer.write_table(table) self.latest_episode = self._metadata_buffer[-1] self._metadata_buffer.clear() def save_episode( self, split, episode_index: int, episode_length: int, episode_tasks: list[str], episode_stats: dict[str, dict], episode_metadata: dict, ) -> None: episode_dict = { "episode_index": episode_index, "tasks": episode_tasks, "length": episode_length, } episode_dict.update(episode_metadata) episode_dict.update(flatten_dict({"stats": episode_stats})) self._save_episode_metadata(episode_dict) # Update info self.info["total_episodes"] += 1 self.info["total_frames"] += episode_length self.info["total_tasks"] = len(self.tasks) if split == "train": self.info["splits"]["train"] = f"0:{self.info['total_episodes']}" self.train_count = self.info["total_episodes"] elif "val" in split: self.info["splits"]["validation"] = f"{self.train_count}:{self.info['total_episodes']}" write_info(self.info, self.root) self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats is not None else episode_stats write_stats(self.stats, self.root) class RoboMINDDatasetWriter(DatasetWriter): def save_episode( self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True, ) -> None: """Save the current episode in self.episode_buffer to disk.""" episode_buffer = episode_data if episode_data is not None else self.episode_buffer validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features) # size and task are special cases that won't be added to hf_dataset episode_length = episode_buffer.pop("size") tasks = episode_buffer.pop("task") episode_tasks = list(set(tasks)) episode_index = episode_buffer["episode_index"] episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length) episode_buffer["episode_index"] = np.full((episode_length,), episode_index) # Update tasks and task indices with new tasks if any self._meta.save_episode_tasks(episode_tasks) # Given tasks in natural language, find their corresponding task indices episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks]) for key, ft in self._meta.features.items(): if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]: continue episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() # Wait for image writer to end, so that episode stats over images can be computed self._wait_image_writer() has_video_keys = len(self._meta.video_keys) > 0 use_streaming = self._streaming_encoder is not None and has_video_keys use_batched_encoding = self._batch_encoding_size > 1 if use_streaming: non_video_buffer = { k: v for k, v in episode_buffer.items() if self._meta.features.get(k, {}).get("dtype") not in ("video",) } non_video_features = {k: v for k, v in self._meta.features.items() if v["dtype"] != "video"} ep_stats = compute_episode_stats(non_video_buffer, non_video_features) else: ep_stats = compute_episode_stats(episode_buffer, self._meta.features) ep_metadata = self._save_episode_data(episode_buffer) if use_streaming: streaming_results = self._streaming_encoder.finish_episode() for video_key in self._meta.video_keys: temp_path, video_stats = streaming_results[video_key] if video_stats is not None: ep_stats[video_key] = { k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0) for k, v in video_stats.items() } ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path)) elif has_video_keys and not use_batched_encoding: num_cameras = len(self._meta.video_keys) if parallel_encoding and num_cameras > 1: with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor: future_to_key = { executor.submit( _encode_video_worker, video_key, episode_index, self._root, self._meta.fps, self._vcodec, self._encoder_threads, ): video_key for video_key in self._meta.video_keys } results = {} for future in concurrent.futures.as_completed(future_to_key): video_key = future_to_key[future] try: temp_path = future.result() results[video_key] = temp_path except Exception as exc: logging.error(f"Video encoding failed for {video_key}: {exc}") raise exc for video_key in self._meta.video_keys: temp_path = results[video_key] ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path)) else: for video_key in self._meta.video_keys: ep_metadata.update(self._save_episode_video(video_key, episode_index)) # `meta.save_episode` be executed after encoding the videos ep_metadata.update({"action_config": action_config}) self._meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) if has_video_keys and use_batched_encoding: self._episodes_since_last_encoding += 1 if self._episodes_since_last_encoding == self._batch_encoding_size: start_ep = self._meta.total_episodes - self._batch_encoding_size end_ep = self._meta.total_episodes self._batch_save_episode_video(start_ep, end_ep) self._episodes_since_last_encoding = 0 if not episode_data: self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0) class RoboMINDDataset(LeRobotDataset): @classmethod def create(cls, *args, **kwargs) -> "RoboMINDDataset": sig = inspect.signature(super().create) bound = sig.bind_partial(*args, **kwargs) bound.apply_defaults() params = bound.arguments obj = super().create(*args, **kwargs) shutil.rmtree(params["root"], ignore_errors=True) obj.meta: RoboMINDDatasetMetadata = RoboMINDDatasetMetadata.create( repo_id=params["repo_id"], fps=params["fps"], robot_type=params["robot_type"], features=params["features"], root=params["root"], use_videos=params["use_videos"], metadata_buffer_size=params["metadata_buffer_size"], ) obj.writer: RoboMINDDatasetWriter = RoboMINDDatasetWriter( meta=obj.meta, root=obj.root, vcodec=obj._vcodec, encoder_threads=obj._encoder_threads, batch_encoding_size=obj._batch_encoding_size, ) return obj def save_episode( self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True ) -> None: self._require_writer("save_episode") self.writer.save_episode(split, action_config, episode_data, parallel_encoding) def get_all_tasks(src_path: Path, output_path: Path, embodiment: str): output_path = output_path / src_path.name / embodiment src_path = src_path / f"h5_{embodiment}" if src_path.exists(): df = pd.read_csv(src_path.parent.parent / "RoboMIND_v1_2_instr.csv", index_col=0).drop_duplicates() instruction_dict = df.set_index("task")["instruction"].to_dict() for task_type in src_path.iterdir(): yield ( task_type.name, {"train": task_type / "success_episodes" / "train", "val": task_type / "success_episodes" / "val"}, (output_path / task_type.name).resolve(), instruction_dict[task_type.name], ) def save_as_lerobot_dataset(task: tuple[dict, Path, str], src_path, benchmark, embodiment, save_depth): task_type, splits, local_dir, task_instruction = task config = ROBOMIND_CONFIG[embodiment] features = generate_features_from_config(config) # [HACK]: franka and ur image is bgr... bgr2rgb = False if embodiment in ["franka_1rgb", "franka_3rgb", "franka_fr3_dual", "ur_1rgb"]: bgr2rgb = True if local_dir.exists(): shutil.rmtree(local_dir) if not save_depth: features = dict(filter(lambda item: "depth" not in item[0], features.items())) dataset: RoboMINDDataset = RoboMINDDataset.create( repo_id=f"{embodiment}/{local_dir.name}", root=local_dir, fps=30, robot_type=embodiment, features=features, ) logging.info(f"start processing for {benchmark}, {embodiment}, {task_type}, saving to {local_dir}") for split, path in splits.items(): action_config_path = src_path / "language_description_annotation_json" / f"h5_{embodiment}.json" if action_config_path.exists(): action_config = json.load(open(action_config_path)) action_config = { Path(config["id"]).parent.name: config["response"] for config in action_config if local_dir.name in config["id"] and split in config["id"] } else: action_config = {} for episode_path in path.glob("**/trajectory.hdf5"): status, raw_dataset, err = load_local_dataset(episode_path, config, save_depth, bgr2rgb) if status and len(raw_dataset) >= 50: try: for frame_data in raw_dataset: frame_data["task"] = task_instruction dataset.add_frame(frame_data) dataset.save_episode( split, action_config.get(episode_path.parent.parent.name, {"task_summary": None, "steps": None}) ) logging.info(f"process done for {path}, len {len(raw_dataset)}") except Exception: # [HACK]: not consistent image shape... if config["images"]["camera_top"]["shape"] == (720, 1280, 3): config["images"]["camera_top"]["shape"] = (480, 640, 3) config["images"]["camera_top_depth"]["shape"] = (480, 640, 1) else: config["images"]["camera_top"]["shape"] = (720, 1280, 3) config["images"]["camera_top_depth"]["shape"] = (720, 1280, 1) save_as_lerobot_dataset(task, src_path, benchmark, embodiment, save_depth) return else: logging.warning(f"Skipped {episode_path}: len of dataset:{len(raw_dataset)} or {str(err)}") dataset.finalize() if dataset.meta.total_episodes == 0: shutil.rmtree(local_dir) def main( src_path: Path, output_path: Path, benchmark: str, embodiments: list[str], cpus_per_task: int, save_depth: bool, debug: bool = False, ): if debug: tasks = get_all_tasks(src_path / benchmark, output_path, embodiments[0]) save_as_lerobot_dataset(next(tasks), src_path, benchmark, embodiments[0], save_depth) else: runtime_env = RuntimeEnv( env_vars={"HDF5_USE_FILE_LOCKING": "FALSE", "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE"} ) ray.init(runtime_env=runtime_env) resources = ray.available_resources() cpus = int(resources["CPU"]) logging.info(f"Available CPUs: {cpus}, num_cpus_per_task: {cpus_per_task}") remote_task = ray.remote(save_as_lerobot_dataset).options(num_cpus=cpus_per_task) futures = [] for embodiment in embodiments: tasks = get_all_tasks(src_path / benchmark, output_path, embodiment) for task in tasks: futures.append((task[1], remote_task.remote(task, src_path, benchmark, embodiment, save_depth))) for task_path, future in futures: try: ray.get(future) except Exception as e: logging.error(f"Exception occurred for {task_path['train']}") with open("output.txt", "a") as f: f.write(f"{task_path['train']}, exception details: {str(e)}\n") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--src-path", type=Path, required=True) parser.add_argument( "--benchmark", type=str, choices=["benchmark1_0_release", "benchmark1_1_release", "benchmark1_2_release"], default="benchmark1_1_release", ) parser.add_argument("--output-path", type=Path, required=True) parser.add_argument( "--embodiments", type=str, nargs="+", help=str( [ "agilex_3rgb", "franka_1rgb", "franka_3rgb", "franka_fr3_dual", "tienkung_gello_1rgb", "tienkung_prod1_gello_1rgb", "tienkung_xsens_1rgb", "ur_1rgb", ] ), default=["agilex_3rgb"], ) parser.add_argument("--cpus-per-task", type=int, default=2) parser.add_argument("--save-depth", action="store_true") parser.add_argument("--debug", action="store_true") args = parser.parse_args() main(**vars(args))