diff --git a/agibot2lerobot/agibot_h5.py b/agibot2lerobot/agibot_h5.py
index 53a1687..f067e44 100644
--- a/agibot2lerobot/agibot_h5.py
+++ b/agibot2lerobot/agibot_h5.py
@@ -12,19 +12,21 @@ import torch
 from agibot_utils.agibot_utils import get_task_info, load_local_dataset
 from agibot_utils.config import AgiBotWorld_TASK_TYPE
 from agibot_utils.lerobot_utils import compute_episode_stats, generate_features_from_config
-from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
-from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, validate_episode_buffer, validate_frame
+from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
+from lerobot.datasets.dataset_writer import DatasetWriter
+from lerobot.datasets.feature_utils import get_hf_features_from_features, validate_episode_buffer, validate_frame
+from lerobot.datasets.utils import DEFAULT_EPISODES_PATH
 from ray.runtime_env import RuntimeEnv
 
 
 class AgiBotDatasetMetadata(LeRobotDatasetMetadata):
     def _flush_metadata_buffer(self) -> None:
         """Write all buffered episode metadata to parquet file."""
-        if not hasattr(self, "metadata_buffer") or len(self.metadata_buffer) == 0:
+        if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
             return
 
         combined_dict = {}
-        for episode_dict in self.metadata_buffer:
+        for episode_dict in self._metadata_buffer:
             for key, value in episode_dict.items():
                 if key not in combined_dict:
                     combined_dict[key] = []
@@ -33,22 +35,138 @@ class AgiBotDatasetMetadata(LeRobotDatasetMetadata):
                 val = value[0] if isinstance(value, list) else value
                 combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
 
-        first_ep = self.metadata_buffer[0]
+        first_ep = self._metadata_buffer[0]
         chunk_idx = first_ep["meta/episodes/chunk_index"][0]
         file_idx = first_ep["meta/episodes/file_index"][0]
 
-        schema = None if not self.writer else self.writer.schema
+        schema = None if not self._pq_writer else self._pq_writer.schema
         table = pa.Table.from_pydict(combined_dict, schema=schema)
 
-        if not self.writer:
+        if not self._pq_writer:
             path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
             path.parent.mkdir(parents=True, exist_ok=True)
-            self.writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
+            self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
 
-        self.writer.write_table(table)
+        self._pq_writer.write_table(table)
 
-        self.latest_episode = self.metadata_buffer[-1]
-        self.metadata_buffer.clear()
+        self.latest_episode = self._metadata_buffer[-1]
+        self._metadata_buffer.clear()
+
+
+class AgiBotDatasetWriter(DatasetWriter):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.hf_features = get_hf_features_from_features(self._meta.features)
+
+    def add_frame(self, frame: dict) -> None:
+        """
+        Add a single frame to the current episode buffer.
+
+        Apart from images written to a temporary directory, nothing is written to disk
+        until ``save_episode()`` is called.
+
+        The caller must provide all user-defined features plus ``"task"``, and must
+        not provide ``"timestamp"`` or ``"frame_index"``; those are computed
+        automatically.
+ """ + # Convert torch to numpy if needed + for name in frame: + if isinstance(frame[name], torch.Tensor): + frame[name] = frame[name].numpy() + + features = { + key: value for key, value in self._meta.features.items() if key in self.hf_features + } # remove video keys + validate_frame(frame, features) + + if self.episode_buffer is None: + self.episode_buffer = self._create_episode_buffer() + + # Automatically add frame_index and timestamp to episode buffer + frame_index = self.episode_buffer["size"] + timestamp = frame_index / self._meta.fps + self.episode_buffer["frame_index"].append(frame_index) + self.episode_buffer["timestamp"].append(timestamp) + self.episode_buffer["task"].append(frame.pop("task")) + + # Add frame features to episode_buffer + for key, value in frame.items(): + if key not in self._meta.features: + raise ValueError( + f"An element of the frame is not in the features. '{key}' not in '{self._meta.features.keys()}'." + ) + + self.episode_buffer[key].append(value) + + self.episode_buffer["size"] += 1 + + def save_episode( + self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True + ) -> None: + """Save the current episode in self.episode_buffer to disk.""" + episode_buffer = episode_data if episode_data is not None else self.episode_buffer + + validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features) + + # size and task are special cases that won't be added to hf_dataset + episode_length = episode_buffer.pop("size") + tasks = episode_buffer.pop("task") + episode_tasks = list(set(tasks)) + episode_index = episode_buffer["episode_index"] + + episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length) + episode_buffer["episode_index"] = np.full((episode_length,), episode_index) + + # Update tasks and task indices with new tasks if any + self._meta.save_episode_tasks(episode_tasks) + + # Given tasks in natural language, find their corresponding task indices + episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks]) + + for key, ft in self._meta.features.items(): + # index, episode_index, task_index are already processed above, and image and video + # are processed separately by storing image path and frame info as meta data + if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]: + continue + episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() + + for key in self._meta.video_keys: + episode_buffer[key] = str(videos[key]) + + ep_stats = compute_episode_stats(episode_buffer, self._meta.features) + + ep_metadata = self._save_episode_data(episode_buffer) + has_video_keys = len(self._meta.video_keys) > 0 + use_batched_encoding = self._batch_encoding_size > 1 + + self.current_videos = videos + if has_video_keys and not use_batched_encoding: + for video_key in self._meta.video_keys: + ep_metadata.update(self._save_episode_video(video_key, episode_index)) + + ep_metadata.update({"action_config": action_config}) + self._meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) + + if has_video_keys and use_batched_encoding: + self._episodes_since_last_encoding += 1 + if self._episodes_since_last_encoding == self._batch_encoding_size: + start_ep = self._meta.total_episodes - self._batch_encoding_size + end_ep = self._meta.total_episodes + self._batch_save_episode_video(start_ep, end_ep) + self._episodes_since_last_encoding = 0 + + if not episode_data: + 
+            self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0)
+
+    def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path:
+        """
+        Use ffmpeg to convert frames stored as png into mp4 videos.
+        Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speed up encoding,
+        since video encoding with ffmpeg is already using multithreading.
+        """
+        temp_path = Path(tempfile.mkdtemp(dir=self._root)) / f"{video_key}_{episode_index:03d}.mp4"
+        shutil.copy(self.current_videos[video_key], temp_path)
+        return temp_path
 
 
 class AgiBotDataset(LeRobotDataset):
@@ -65,125 +183,26 @@ class AgiBotDataset(LeRobotDataset):
         obj.meta: AgiBotDatasetMetadata = AgiBotDatasetMetadata.create(
             repo_id=params["repo_id"],
             fps=params["fps"],
-            robot_type=params.get("robot_type"),
+            robot_type=params["robot_type"],
             features=params["features"],
-            root=params.get("root"),
-            use_videos=params.get("use_videos", True),
-            metadata_buffer_size=params.get("metadata_buffer_size", 10),
+            root=params["root"],
+            use_videos=params["use_videos"],
+            metadata_buffer_size=params["metadata_buffer_size"],
+        )
+        obj.writer: AgiBotDatasetWriter = AgiBotDatasetWriter(
+            meta=obj.meta,
+            root=obj.root,
+            vcodec=obj._vcodec,
+            encoder_threads=obj._encoder_threads,
+            batch_encoding_size=obj._batch_encoding_size,
         )
         return obj
 
-    def add_frame(self, frame: dict) -> None:
-        """
-        This function only adds the frame to the episode_buffer. Apart from images — which are written in a
-        temporary directory — nothing is written to disk. To save those frames, the 'save_episode()' method
-        then needs to be called.
-        """
-        # Convert torch to numpy if needed
-        for name in frame:
-            if isinstance(frame[name], torch.Tensor):
-                frame[name] = frame[name].numpy()
-
-        features = {key: value for key, value in self.features.items() if key in self.hf_features}  # remove video keys
-        validate_frame(frame, features)
-
-        if self.episode_buffer is None:
-            self.episode_buffer = self.create_episode_buffer()
-
-        # Automatically add frame_index and timestamp to episode buffer
-        frame_index = self.episode_buffer["size"]
-        timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps
-        self.episode_buffer["frame_index"].append(frame_index)
-        self.episode_buffer["timestamp"].append(timestamp)
-        self.episode_buffer["task"].append(frame.pop("task"))  # Remove task from frame after processing
-
-        # Add frame features to episode_buffer
-        for key, value in frame.items():
-            if key not in self.features:
-                raise ValueError(
-                    f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'."
-                )
-
-            self.episode_buffer[key].append(value)
-
-        self.episode_buffer["size"] += 1
-
-    def save_episode(self, videos: dict, action_config: list, episode_data: dict | None = None) -> None:
-        """
-        This will save to disk the current episode in self.episode_buffer.
-
-        Args:
-            episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will
-                save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
-                None.
- """ - episode_buffer = episode_data if episode_data is not None else self.episode_buffer - - validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) - - # size and task are special cases that won't be added to hf_dataset - episode_length = episode_buffer.pop("size") - tasks = episode_buffer.pop("task") - episode_tasks = list(set(tasks)) - episode_index = episode_buffer["episode_index"] - - episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) - episode_buffer["episode_index"] = np.full((episode_length,), episode_index) - - # Update tasks and task indices with new tasks if any - self.meta.save_episode_tasks(episode_tasks) - - # Given tasks in natural language, find their corresponding task indices - episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) - - for key, ft in self.features.items(): - # index, episode_index, task_index are already processed above, and image and video - # are processed separately by storing image path and frame info as meta data - if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]: - continue - episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() - - for key in self.meta.video_keys: - episode_buffer[key] = str(videos[key]) # PosixPath -> str - - ep_stats = compute_episode_stats(episode_buffer, self.features) - - ep_metadata = self._save_episode_data(episode_buffer) - has_video_keys = len(self.meta.video_keys) > 0 - use_batched_encoding = self.batch_encoding_size > 1 - - self.current_videos = videos - if has_video_keys and not use_batched_encoding: - for video_key in self.meta.video_keys: - ep_metadata.update(self._save_episode_video(video_key, episode_index)) - - # `meta.save_episode` be executed after encoding the videos - # add action_config to current episode - ep_metadata.update({"action_config": action_config}) - self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) - - if has_video_keys and use_batched_encoding: - # Check if we should trigger batch encoding - self.episodes_since_last_encoding += 1 - if self.episodes_since_last_encoding == self.batch_encoding_size: - start_ep = self.num_episodes - self.batch_encoding_size - end_ep = self.num_episodes - self._batch_save_episode_video(start_ep, end_ep) - self.episodes_since_last_encoding = 0 - - if not episode_data: - # Reset episode buffer and clean up temporary images (if not already deleted during video encoding) - self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0) - - def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path: - """ - Use ffmpeg to convert frames stored as png into mp4 videos. - Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding, - since video encoding with ffmpeg is already using multithreading. 
- """ - temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4" - shutil.copy(self.current_videos[video_key], temp_path) - return temp_path + def save_episode( + self, videos: dict, action_config: list, episode_data: dict | None = None, parallel_encoding: bool = True + ) -> None: + self._require_writer("save_episode") + self.writer.save_episode(videos, action_config, episode_data, parallel_encoding) def get_all_tasks(src_path: Path, output_path: Path): diff --git a/agibot2lerobot/agibot_utils/lerobot_utils.py b/agibot2lerobot/agibot_utils/lerobot_utils.py index c420535..93f468a 100644 --- a/agibot2lerobot/agibot_utils/lerobot_utils.py +++ b/agibot2lerobot/agibot_utils/lerobot_utils.py @@ -1,9 +1,11 @@ import numpy as np -import torch -import torchvision -from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices - -torchvision.set_video_backend("pyav") +from lerobot.datasets.compute_stats import ( + DEFAULT_QUANTILES, + auto_downsample_height_width, + get_feature_stats, + sample_indices, +) +from torchcodec.decoders import VideoDecoder def generate_features_from_config(AgiBotWorld_CONFIG): @@ -20,9 +22,8 @@ def generate_features_from_config(AgiBotWorld_CONFIG): def sample_images(input): if type(input) is str: video_path = input - reader = torchvision.io.VideoReader(video_path, stream="video") - frames = [frame["data"] for frame in reader] - frames_array = torch.stack(frames).numpy() # Shape: [T, C, H, W] + decoder = VideoDecoder(video_path) + frames_array = decoder[0:-1].numpy() # Shape: [T, C, H, W] sampled_indices = sample_indices(len(frames_array)) images = None @@ -50,21 +51,31 @@ def sample_images(input): return images -def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict: +def compute_episode_stats( + episode_data: dict[str, list[str] | np.ndarray], + features: dict, + quantile_list: list[float] | None = None, +) -> dict: + if quantile_list is None: + quantile_list = DEFAULT_QUANTILES + ep_stats = {} for key, data in episode_data.items(): if features[key]["dtype"] == "string": - continue # HACK: we should receive np.arrays of strings + continue + elif features[key]["dtype"] in ["image", "video"]: ep_ft_array = sample_images(data) - axes_to_reduce = (0, 2, 3) # keep channel dim + axes_to_reduce = (0, 2, 3) keepdims = True else: - ep_ft_array = data # data is already a np.ndarray - axes_to_reduce = 0 # compute stats over the first axis - keepdims = data.ndim == 1 # keep as np.array + ep_ft_array = data + axes_to_reduce = 0 + keepdims = data.ndim == 1 - ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims) + ep_stats[key] = get_feature_stats( + ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list + ) if features[key]["dtype"] in ["image", "video"]: value_norm = 1.0 if "depth" in key else 255.0 diff --git a/libero2lerobot/libero_h5.py b/libero2lerobot/libero_h5.py index 859a4f2..20d475c 100644 --- a/libero2lerobot/libero_h5.py +++ b/libero2lerobot/libero_h5.py @@ -8,6 +8,7 @@ import pandas as pd import ray from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor from datatrove.pipeline.base import PipelineStep +from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata from lerobot.datasets.aggregate import ( aggregate_data, aggregate_metadata, @@ -15,14 +16,11 @@ from lerobot.datasets.aggregate import ( aggregate_videos, validate_all_metadata, ) -from 
diff --git a/libero2lerobot/libero_h5.py b/libero2lerobot/libero_h5.py
index 859a4f2..20d475c 100644
--- a/libero2lerobot/libero_h5.py
+++ b/libero2lerobot/libero_h5.py
@@ -8,6 +8,7 @@ import pandas as pd
 import ray
 from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
 from datatrove.pipeline.base import PipelineStep
+from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
 from lerobot.datasets.aggregate import (
     aggregate_data,
     aggregate_metadata,
@@ -15,14 +16,11 @@ from lerobot.datasets.aggregate import (
     aggregate_videos,
     validate_all_metadata,
 )
-from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
+from lerobot.datasets.io_utils import write_info, write_stats, write_tasks
 from lerobot.datasets.utils import (
     DEFAULT_CHUNK_SIZE,
     DEFAULT_DATA_FILE_SIZE_IN_MB,
     DEFAULT_VIDEO_FILE_SIZE_IN_MB,
-    write_info,
-    write_stats,
-    write_tasks,
 )
 from libero_utils.config import LIBERO_FEATURES
 from libero_utils.libero_utils import load_local_episodes
@@ -171,7 +169,9 @@ def main(
         )
     )
     if len(src_paths) > 1:
-        aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot")
+        aggregate_output_path = output_path / (
+            "_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot"
+        )
     else:
         aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"
     aggregate_output_path = aggregate_output_path.resolve()
@@ -234,7 +234,9 @@ if __name__ == "__main__":
     parser.add_argument("--output-path", type=Path, required=True)
     parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local")
     parser.add_argument("--cpus-per-task", type=int, default=1)
-    parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
+    parser.add_argument(
+        "--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray"
+    )
     parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
     parser.add_argument("--resume-dir", type=Path, help="logs directory to resume")
     parser.add_argument("--debug", action="store_true")
diff --git a/robomind2lerobot/robomind_h5.py b/robomind2lerobot/robomind_h5.py
index 533685a..0d67824 100644
--- a/robomind2lerobot/robomind_h5.py
+++ b/robomind2lerobot/robomind_h5.py
@@ -1,4 +1,5 @@
 import argparse
+import concurrent.futures
 import inspect
 import json
 import logging
@@ -10,9 +11,12 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 import ray
+from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
 from lerobot.datasets.compute_stats import aggregate_stats
-from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
-from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict, validate_episode_buffer, write_info, write_stats
+from lerobot.datasets.dataset_writer import DatasetWriter, _encode_video_worker
+from lerobot.datasets.feature_utils import validate_episode_buffer
+from lerobot.datasets.io_utils import write_info, write_stats
+from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict
 from ray.runtime_env import RuntimeEnv
 from robomind_uitls.configs import ROBOMIND_CONFIG
 from robomind_uitls.lerobot_uitls import compute_episode_stats, generate_features_from_config
@@ -24,11 +28,11 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
 class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
     def _flush_metadata_buffer(self) -> None:
         """Write all buffered episode metadata to parquet file."""
-        if not hasattr(self, "metadata_buffer") or len(self.metadata_buffer) == 0:
+        if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
             return
 
         combined_dict = {}
-        for episode_dict in self.metadata_buffer:
+        for episode_dict in self._metadata_buffer:
             for key, value in episode_dict.items():
                 if key not in combined_dict:
                     combined_dict[key] = []
@@ -37,22 +41,22 @@ class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
                 val = value[0] if isinstance(value, list) else value
                 combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
 
-        first_ep = self.metadata_buffer[0]
+        first_ep = self._metadata_buffer[0]
         chunk_idx = first_ep["meta/episodes/chunk_index"][0]
         file_idx = first_ep["meta/episodes/file_index"][0]
 
-        schema = None if not self.writer else self.writer.schema
+        schema = None if not self._pq_writer else self._pq_writer.schema
         table = pa.Table.from_pydict(combined_dict, schema=schema)
 
-        if not self.writer:
+        if not self._pq_writer:
             path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
             path.parent.mkdir(parents=True, exist_ok=True)
-            self.writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
+            self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
 
-        self.writer.write_table(table)
+        self._pq_writer.write_table(table)
 
-        self.latest_episode = self.metadata_buffer[-1]
-        self.metadata_buffer.clear()
+        self.latest_episode = self._metadata_buffer[-1]
+        self._metadata_buffer.clear()
 
     def save_episode(
         self,
@@ -88,6 +92,116 @@ class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
         write_stats(self.stats, self.root)
 
 
+class RoboMINDDatasetWriter(DatasetWriter):
+    def save_episode(
+        self,
+        split,
+        action_config: dict,
+        episode_data: dict | None = None,
+        parallel_encoding: bool = True,
+    ) -> None:
+        """Save the current episode in self.episode_buffer to disk."""
+        episode_buffer = episode_data if episode_data is not None else self.episode_buffer
+
+        validate_episode_buffer(episode_buffer, self._meta.total_episodes, self._meta.features)
+
+        # size and task are special cases that won't be added to hf_dataset
+        episode_length = episode_buffer.pop("size")
+        tasks = episode_buffer.pop("task")
+        episode_tasks = list(set(tasks))
+        episode_index = episode_buffer["episode_index"]
+
+        episode_buffer["index"] = np.arange(self._meta.total_frames, self._meta.total_frames + episode_length)
+        episode_buffer["episode_index"] = np.full((episode_length,), episode_index)
+
+        # Update tasks and task indices with new tasks if any
+        self._meta.save_episode_tasks(episode_tasks)
+
+        # Given tasks in natural language, find their corresponding task indices
+        episode_buffer["task_index"] = np.array([self._meta.get_task_index(task) for task in tasks])
+
+        for key, ft in self._meta.features.items():
+            if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]:
+                continue
+            episode_buffer[key] = np.stack(episode_buffer[key]).squeeze()
+
+        # Wait for image writer to end, so that episode stats over images can be computed
+        self._wait_image_writer()
+        has_video_keys = len(self._meta.video_keys) > 0
+        use_streaming = self._streaming_encoder is not None and has_video_keys
+        use_batched_encoding = self._batch_encoding_size > 1
+
+        if use_streaming:
+            non_video_buffer = {
+                k: v for k, v in episode_buffer.items() if self._meta.features.get(k, {}).get("dtype") not in ("video",)
+            }
+            non_video_features = {k: v for k, v in self._meta.features.items() if v["dtype"] != "video"}
+            ep_stats = compute_episode_stats(non_video_buffer, non_video_features)
+        else:
+            ep_stats = compute_episode_stats(episode_buffer, self._meta.features)
+
+        ep_metadata = self._save_episode_data(episode_buffer)
+
+        if use_streaming:
+            streaming_results = self._streaming_encoder.finish_episode()
+            for video_key in self._meta.video_keys:
+                temp_path, video_stats = streaming_results[video_key]
+                if video_stats is not None:
+                    ep_stats[video_key] = {
== "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0) + for k, v in video_stats.items() + } + ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path)) + elif has_video_keys and not use_batched_encoding: + num_cameras = len(self._meta.video_keys) + if parallel_encoding and num_cameras > 1: + with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor: + future_to_key = { + executor.submit( + _encode_video_worker, + video_key, + episode_index, + self._root, + self._meta.fps, + self._vcodec, + self._encoder_threads, + ): video_key + for video_key in self._meta.video_keys + } + + results = {} + for future in concurrent.futures.as_completed(future_to_key): + video_key = future_to_key[future] + try: + temp_path = future.result() + results[video_key] = temp_path + except Exception as exc: + logging.error(f"Video encoding failed for {video_key}: {exc}") + raise exc + + for video_key in self._meta.video_keys: + temp_path = results[video_key] + ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path)) + else: + for video_key in self._meta.video_keys: + ep_metadata.update(self._save_episode_video(video_key, episode_index)) + + # `meta.save_episode` be executed after encoding the videos + ep_metadata.update({"action_config": action_config}) + self._meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) + + if has_video_keys and use_batched_encoding: + self._episodes_since_last_encoding += 1 + if self._episodes_since_last_encoding == self._batch_encoding_size: + start_ep = self._meta.total_episodes - self._batch_encoding_size + end_ep = self._meta.total_episodes + self._batch_save_episode_video(start_ep, end_ep) + self._episodes_since_last_encoding = 0 + + if not episode_data: + self.clear_episode_buffer(delete_images=len(self._meta.image_keys) > 0) + + class RoboMINDDataset(LeRobotDataset): @classmethod def create(cls, *args, **kwargs) -> "RoboMINDDataset": @@ -108,70 +222,20 @@ class RoboMINDDataset(LeRobotDataset): use_videos=params["use_videos"], metadata_buffer_size=params["metadata_buffer_size"], ) + obj.writer: RoboMINDDatasetWriter = RoboMINDDatasetWriter( + meta=obj.meta, + root=obj.root, + vcodec=obj._vcodec, + encoder_threads=obj._encoder_threads, + batch_encoding_size=obj._batch_encoding_size, + ) return obj - def save_episode(self, split, action_config: dict, episode_data: dict | None = None) -> None: - """ - This will save to disk the current episode in self.episode_buffer. - - Args: - episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will - save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to - None. 
- """ - episode_buffer = episode_data if episode_data is not None else self.episode_buffer - - validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) - - # size and task are special cases that won't be added to hf_dataset - episode_length = episode_buffer.pop("size") - tasks = episode_buffer.pop("task") - episode_tasks = list(set(tasks)) - episode_index = episode_buffer["episode_index"] - - episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) - episode_buffer["episode_index"] = np.full((episode_length,), episode_index) - - # Update tasks and task indices with new tasks if any - self.meta.save_episode_tasks(episode_tasks) - - # Given tasks in natural language, find their corresponding task indices - episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) - - for key, ft in self.features.items(): - # index, episode_index, task_index are already processed above, and image and video - # are processed separately by storing image path and frame info as meta data - if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["video"]: - continue - episode_buffer[key] = np.stack(episode_buffer[key]).squeeze() - - self._wait_image_writer() - ep_stats = compute_episode_stats(episode_buffer, self.features) - - ep_metadata = self._save_episode_data(episode_buffer) - has_video_keys = len(self.meta.video_keys) > 0 - use_batched_encoding = self.batch_encoding_size > 1 - - if has_video_keys and not use_batched_encoding: - for video_key in self.meta.video_keys: - ep_metadata.update(self._save_episode_video(video_key, episode_index)) - - # `meta.save_episode` be executed after encoding the videos - ep_metadata.update({"action_config": action_config}) - self.meta.save_episode(split, episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) - - if has_video_keys and use_batched_encoding: - # Check if we should trigger batch encoding - self.episodes_since_last_encoding += 1 - if self.episodes_since_last_encoding == self.batch_encoding_size: - start_ep = self.num_episodes - self.batch_encoding_size - end_ep = self.num_episodes - self._batch_save_episode_video(start_ep, end_ep) - self.episodes_since_last_encoding = 0 - - if not episode_data: - # Reset episode buffer and clean up temporary images (if not already deleted during video encoding) - self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0) + def save_episode( + self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True + ) -> None: + self._require_writer("save_episode") + self.writer.save_episode(split, action_config, episode_data, parallel_encoding) def get_all_tasks(src_path: Path, output_path: Path, embodiment: str): diff --git a/robomind2lerobot/robomind_uitls/lerobot_uitls.py b/robomind2lerobot/robomind_uitls/lerobot_uitls.py index 41666ba..efcc652 100644 --- a/robomind2lerobot/robomind_uitls/lerobot_uitls.py +++ b/robomind2lerobot/robomind_uitls/lerobot_uitls.py @@ -1,9 +1,11 @@ import numpy as np -import torchvision -from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices -from lerobot.datasets.utils import load_image_as_numpy - -torchvision.set_video_backend("pyav") +from lerobot.datasets.compute_stats import ( + DEFAULT_QUANTILES, + auto_downsample_height_width, + get_feature_stats, + sample_indices, +) +from lerobot.datasets.io_utils import load_image_as_numpy def 
diff --git a/robomind2lerobot/robomind_uitls/lerobot_uitls.py b/robomind2lerobot/robomind_uitls/lerobot_uitls.py
index 41666ba..efcc652 100644
--- a/robomind2lerobot/robomind_uitls/lerobot_uitls.py
+++ b/robomind2lerobot/robomind_uitls/lerobot_uitls.py
@@ -1,9 +1,11 @@
 import numpy as np
-import torchvision
-from lerobot.datasets.compute_stats import auto_downsample_height_width, get_feature_stats, sample_indices
-from lerobot.datasets.utils import load_image_as_numpy
-
-torchvision.set_video_backend("pyav")
+from lerobot.datasets.compute_stats import (
+    DEFAULT_QUANTILES,
+    auto_downsample_height_width,
+    get_feature_stats,
+    sample_indices,
+)
+from lerobot.datasets.io_utils import load_image_as_numpy
 
 
 def generate_features_from_config(AgiBotWorld_CONFIG):
@@ -49,21 +51,31 @@ def sample_images(input):
     return images
 
 
-def compute_episode_stats(episode_data: dict[str, list[str] | np.ndarray], features: dict) -> dict:
+def compute_episode_stats(
+    episode_data: dict[str, list[str] | np.ndarray],
+    features: dict,
+    quantile_list: list[float] | None = None,
+) -> dict:
+    if quantile_list is None:
+        quantile_list = DEFAULT_QUANTILES
+
     ep_stats = {}
     for key, data in episode_data.items():
         if features[key]["dtype"] == "string":
-            continue  # HACK: we should receive np.arrays of strings
+            continue
+
         elif features[key]["dtype"] in ["image", "video"]:
             ep_ft_array = sample_images(data)
-            axes_to_reduce = (0, 2, 3)  # keep channel dim
+            axes_to_reduce = (0, 2, 3)
             keepdims = True
         else:
-            ep_ft_array = data  # data is already a np.ndarray
-            axes_to_reduce = 0  # compute stats over the first axis
-            keepdims = data.ndim == 1  # keep as np.array
+            ep_ft_array = data
+            axes_to_reduce = 0
+            keepdims = data.ndim == 1
 
-        ep_stats[key] = get_feature_stats(ep_ft_array, axis=axes_to_reduce, keepdims=keepdims)
+        ep_stats[key] = get_feature_stats(
+            ep_ft_array, axis=axes_to_reduce, keepdims=keepdims, quantile_list=quantile_list
+        )
 
         if features[key]["dtype"] in ["image", "video"]:
             value_norm = 1.0 if "depth" in key else 255.0