From 14743b896ea5de8c842658920141df369deb53f4 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 3 Nov 2025 12:23:12 +0000 Subject: [PATCH] * refactor behaviour1k_lerobot_dataset.py * add example scripts to load behaviour 1k data in `load_behaviour1k_dataset.py` --- .../behavior_lerobot_dataset_v3.py | 606 ++++++++++++------ .../behavior_1k/behaviour_1k_constants.py | 15 + examples/behavior_1k/convert_to_lerobot_v3.py | 18 +- .../behavior_1k/load_behavior_1k_dataset.py | 130 ++++ 4 files changed, 570 insertions(+), 199 deletions(-) create mode 100644 examples/behavior_1k/load_behavior_1k_dataset.py diff --git a/examples/behavior_1k/behavior_lerobot_dataset_v3.py b/examples/behavior_1k/behavior_lerobot_dataset_v3.py index c867f9cfa..8b448f89f 100644 --- a/examples/behavior_1k/behavior_lerobot_dataset_v3.py +++ b/examples/behavior_1k/behavior_lerobot_dataset_v3.py @@ -1,230 +1,464 @@ #!/usr/bin/env python -import json +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +BehaviorLeRobotDatasetV3: A wrapper around LeRobotDataset v3.0 for loading BEHAVIOR-1K data. + +This wrapper extends LeRobotDataset to support BEHAVIOR-1K specific features: +- Modality and camera selection (rgb, depth, seg_instance_id) +- Efficient chunk streaming mode with keyframe access +- Additional BEHAVIOR-1K metadata (cam_rel_poses, task_info, etc.) 
+""" + import logging +from collections.abc import Callable from pathlib import Path -from typing import Any +import datasets import numpy as np -import torch as th +from behaviour_1k_constants import ROBOT_CAMERA_NAMES, ROBOT_TYPE +from torch.utils.data import Dataset, get_worker_info -from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.utils.utils import init_logging - -from .behaviour_1k_constants import ( - PROPRIOCEPTION_INDICES, - ROBOT_CAMERA_NAMES, - TASK_INDICES_TO_NAMES, +from lerobot.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset, LeRobotDatasetMetadata +from lerobot.datasets.utils import ( + check_delta_timestamps, + get_delta_indices, + get_safe_version, + hf_transform_to_torch, ) +from lerobot.datasets.video_utils import decode_video_frames, get_safe_default_codec +from lerobot.utils.constants import HF_LEROBOT_HOME -init_logging() +logger = logging.getLogger(__name__) + + +class BehaviorLeRobotDatasetMetadata(LeRobotDatasetMetadata): + """ + Extended metadata class for BEHAVIOR-1K datasets. 
+ + Adds support for: + - Modality and camera filtering + - Custom metainfo and annotation paths + """ + + def __init__( + self, + repo_id: str, + root: str | Path | None = None, + revision: str | None = None, + force_cache_sync: bool = False, + metadata_buffer_size: int = 10, + modalities: set[str] | None = None, + cameras: set[str] | None = None, + ): + self.modalities = set(modalities) if modalities else {"rgb", "depth", "seg_instance_id"} + self.camera_names = set(cameras) if cameras else {"head", "left_wrist", "right_wrist"} + + assert self.modalities.issubset({"rgb", "depth", "seg_instance_id"}), ( + f"Modalities must be subset of ['rgb', 'depth', 'seg_instance_id'], got {self.modalities}" + ) + + assert self.camera_names.issubset(set(ROBOT_CAMERA_NAMES[ROBOT_TYPE])), ( + f"Camera names must be subset of {list(ROBOT_CAMERA_NAMES[ROBOT_TYPE])}, got {self.camera_names}" + ) + + super().__init__(repo_id, root, revision, force_cache_sync, metadata_buffer_size) + + @property + def filtered_features(self) -> dict[str, dict]: + """Return only features matching selected modalities and cameras.""" + features = {} + for name, feature_info in self.features.items(): + if not name.startswith("observation.images."): + features[name] = feature_info + continue + + parts = name.split(".") + if len(parts) >= 4: + modality = parts[2] + camera = parts[3] + if modality in self.modalities and camera in self.camera_names: + features[name] = feature_info + + return features + + @property + def video_keys(self) -> list[str]: + """Return only video keys for selected modalities and cameras.""" + all_video_keys = super().video_keys + + filtered_keys = [] + for key in all_video_keys: + parts = key.split(".") + if len(parts) >= 4: + modality = parts[2] + camera = parts[3] + if modality in self.modalities and camera in self.camera_names: + filtered_keys.append(key) + + return filtered_keys + + def get_metainfo_path(self, ep_index: int) -> Path: + """Get path to episode metainfo file.""" + 
if "metainfo_path" in self.info: + fpath = self.info["metainfo_path"].format(episode_index=ep_index) + return Path(fpath) + return None + + def get_annotation_path(self, ep_index: int) -> Path: + """Get path to episode annotation file.""" + if "annotation_path" in self.info: + fpath = self.info["annotation_path"].format(episode_index=ep_index) + return Path(fpath) + return None class BehaviorLeRobotDatasetV3(LeRobotDataset): """ - Extends LeRobotDataset v3.0 for BEHAVIOR-1K specific requirements. - Handles task-based episode organization and BEHAVIOR-1K metadata. + BEHAVIOR-1K wrapper for LeRobotDataset v3.0. + + Each BEHAVIOR-1K dataset contains a single task (e.g., behavior1k-task0000). + See https://huggingface.co/collections/lerobot/behavior-1k for all available tasks. + + Key features: + - Modality and camera selection + - Efficient chunk streaming with keyframe access (recommended for B1K with GOP=250) + - Support for BEHAVIOR-1K specific observations (cam_rel_poses, task_info, task_index) """ - @classmethod - def create( - cls, + def __init__( + self, repo_id: str, - fps: int, - features: dict, root: str | Path | None = None, - robot_type: str | None = None, - use_videos: bool = True, + episodes: list[int] | None = None, + image_transforms: Callable | None = None, + delta_timestamps: dict[list[float]] | None = None, tolerance_s: float = 1e-4, - image_writer_processes: int = 0, - image_writer_threads: int = 0, + revision: str | None = None, + force_cache_sync: bool = False, + download_videos: bool = True, video_backend: str | None = None, batch_encoding_size: int = 1, - ) -> "BehaviorLeRobotDatasetV3": + # BEHAVIOR-1K specific arguments + modalities: list[str] | None = None, + cameras: list[str] | None = None, + check_timestamp_sync: bool = True, + chunk_streaming_using_keyframe: bool = True, + shuffle: bool = True, + seed: int = 42, + ): """ - Create a new BEHAVIOR-1K dataset in v3.0 format. + Initialize BEHAVIOR-1K dataset. 
Args: - repo_id: HuggingFace repository ID - fps: Frames per second (default: 30) - root: Local directory for the dataset - robot_type: Robot type (default: "R1Pro") - use_videos: Whether to encode videos (default: True) - video_backend: Video backend to use (default: "pyav") - batch_encoding_size: Number of episodes to batch before encoding videos - image_writer_processes: Number of processes for async image writing - image_writer_threads: Number of threads per process for image writing + repo_id: HuggingFace repository ID (e.g., "lerobot/behavior1k-task0000") + root: Local directory for dataset storage + episodes: List of episode indices to load (for train/val split) + image_transforms: Torchvision v2 transforms for images + delta_timestamps: Temporal offsets for history/future frames + tolerance_s: Tolerance for timestamp synchronization + revision: Git revision/branch to load + force_cache_sync: Force re-download from hub + download_videos: Whether to download video files + video_backend: Video decoder ('pyav' or 'torchcodec') + batch_encoding_size: Batch size for video encoding + modalities: List of modalities to load (None = all: rgb, depth, seg_instance_id) + cameras: List of cameras to load (None = all: head, left_wrist, right_wrist) + check_timestamp_sync: Verify timestamp synchronization (can be slow) + chunk_streaming_using_keyframe: Use keyframe-based streaming (STRONGLY RECOMMENDED for B1K) + shuffle: Shuffle chunks in streaming mode + seed: Random seed for shuffling + """ + Dataset.__init__(self) + + self.repo_id = repo_id + if root: + self.root = Path(root) + else: + dataset_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id + self.root = HF_LEROBOT_HOME / dataset_name + + self.image_transforms = image_transforms + self.delta_timestamps = delta_timestamps + self.tolerance_s = tolerance_s + self.revision = revision if revision else CODEBASE_VERSION + self.video_backend = video_backend if video_backend else get_safe_default_codec() + 
self.delta_indices = None + self.batch_encoding_size = batch_encoding_size + self.episodes_since_last_encoding = 0 + self.seed = seed + + self.image_writer = None + self.episode_buffer = None + self.writer = None + self.latest_episode = None + self._current_file_start_frame = None + + self.root.mkdir(exist_ok=True, parents=True) + + if modalities is None: + modalities = ["rgb", "depth", "seg_instance_id"] + if "seg_instance_id" in modalities: + assert chunk_streaming_using_keyframe, ( + "For performance, seg_instance_id requires chunk_streaming_using_keyframe=True" + ) + if "depth" in modalities: + assert self.video_backend == "pyav", "Depth videos require video_backend='pyav'" + if cameras is None: + cameras = ["head", "left_wrist", "right_wrist"] + + self.meta = BehaviorLeRobotDatasetMetadata( + repo_id=self.repo_id, + root=self.root, + revision=self.revision, + force_cache_sync=force_cache_sync, + modalities=modalities, + cameras=cameras, + ) + + if episodes is not None: + self.episodes = sorted([i for i in episodes if i < len(self.meta.episodes)]) + else: + self.episodes = list(range(len(self.meta.episodes))) + + logger.info(f"Total episodes: {len(self.episodes)}") + + self._chunk_streaming_using_keyframe = chunk_streaming_using_keyframe + if self._chunk_streaming_using_keyframe: + if not shuffle: + logger.warning("Chunk streaming enabled but shuffle=False. 
This may reduce randomness.") + self.chunks = self._get_keyframe_chunk_indices() + self.current_streaming_chunk_idx = None if shuffle else 0 + self.current_streaming_frame_idx = None if shuffle else self.chunks[0][0] if self.chunks else 0 + self.obs_loaders = {} + self._should_obs_loaders_reload = True + + self._lazy_loading = False + self._recorded_frames = self.meta.total_frames + self._writer_closed_for_reading = False + + try: + if force_cache_sync: + raise FileNotFoundError + self.hf_dataset = self.load_hf_dataset() + except (AssertionError, FileNotFoundError, NotADirectoryError): + self.revision = get_safe_version(self.repo_id, self.revision) + self.download_episodes(download_videos) + self.hf_dataset = self.load_hf_dataset() + + if self.delta_timestamps is not None: + check_delta_timestamps(self.delta_timestamps, self.meta.fps, self.tolerance_s) + self.delta_indices = get_delta_indices(self.delta_timestamps, self.meta.fps) + + @property + def fps(self) -> int: + """Frames per second.""" + return self.meta.fps + + @property + def features(self) -> dict: + """Dataset features (filtered by modalities/cameras).""" + return self.meta.filtered_features + + @property + def num_episodes(self) -> int: + """Number of episodes.""" + return len(self.episodes) + + @property + def num_frames(self) -> int: + """Total number of frames.""" + return len(self.hf_dataset) + + def get_episodes_file_paths(self) -> list[str]: + """ + Get download patterns for requested episodes. + + Returns glob patterns for download rather than specific file paths. + + Note: Unlike the base LeRobotDataset, this method cannot filter downloads to only + requested episodes because: + 1. BEHAVIOR-1K episode indices are encoded (e.g., 10010 for task 1, episode 10) + 2. Episodes are chunked across multiple parquet/video files + 3. 
The parquet files are organized by chunk, not by episode + + Therefore, we download full data/meta/video directories and rely on + `self.load_hf_dataset()` to filter to requested episodes from the loaded data. + """ + allow_patterns = ["data/**", "meta/**"] + + # Filter by modalities and cameras for video patterns + if len(self.meta.video_keys) > 0: + if len(self.meta.modalities) != 3 or len(self.meta.camera_names) != 3: + # Only download specific modality/camera combinations + for modality in self.meta.modalities: + for camera in self.meta.camera_names: + allow_patterns.append(f"**/observation.images.{modality}.{camera}/**") + else: + # Download all videos (no filtering needed) + allow_patterns.append("videos/**") + + return allow_patterns + + def download_episodes(self, download_videos: bool = True) -> None: + """ + Download episodes with modality/camera filtering. + + Follows the same pattern as base LeRobotDataset.download() but uses + get_episodes_file_paths() which returns patterns for modality/camera filtering. 
+ """ + ignore_patterns = None if download_videos else "videos/" + files = self.get_episodes_file_paths() + self.pull_from_repo(allow_patterns=files, ignore_patterns=ignore_patterns) + + def pull_from_repo( + self, + allow_patterns: list[str] | str | None = None, + ignore_patterns: list[str] | str | None = None, + ) -> None: + """Pull dataset from HuggingFace Hub.""" + + from huggingface_hub import snapshot_download + + logger.info(f"Pulling dataset {self.repo_id} from HuggingFace Hub...") + snapshot_download( + self.repo_id, + repo_type="dataset", + revision=self.revision, + local_dir=self.root, + allow_patterns=allow_patterns, + ignore_patterns=ignore_patterns, + ) + + def load_hf_dataset(self) -> datasets.Dataset: + """Load dataset from parquet files.""" + from datasets import load_dataset + + path = str(self.root / "data") + hf_dataset = load_dataset("parquet", data_dir=path, split="train") + + hf_dataset.set_transform(hf_transform_to_torch) + return hf_dataset + + def _get_keyframe_chunk_indices(self, chunk_size: int = 250) -> list[tuple[int, int, int]]: + """ + Divide episodes into chunks based on GOP size (keyframe interval). + + For BEHAVIOR-1K, GOP size is 250 frames for efficient storage. 
Returns: - BehaviorLeRobotDatasetV3 instance + List of (start_index, end_index, local_start_index) tuples """ - # Create the dataset using parent class method with BEHAVIOR-1K features - obj = super().create( - repo_id=repo_id, - fps=fps, - features=features, - root=root, - robot_type=robot_type, - use_videos=use_videos, - tolerance_s=1e-4, - image_writer_processes=image_writer_processes, - image_writer_threads=image_writer_threads, - video_backend=video_backend, - batch_encoding_size=batch_encoding_size, - ) + chunks = [] + offset = 0 - # Convert to BehaviorLeRobotDatasetV3 instance - obj.__class__ = cls + for ep_array_idx in self.episodes: + # self.episodes contains array indices, so access directly + ep = self.meta.episodes[ep_array_idx] + length = ep["length"] + local_starts = list(range(0, length, chunk_size)) + local_ends = local_starts[1:] + [length] - # Initialize BEHAVIOR-1K specific attributes - obj.task_episode_mapping = {} # Maps task_id to list of episode indices - obj.episode_task_mapping = {} # Maps episode_index to task info + for local_start, local_end in zip(local_starts, local_ends, strict=True): + chunks.append((offset + local_start, offset + local_end, local_start)) + offset += length - # Additional metadata for BEHAVIOR-1K - obj.behavior_metadata = { - "robot_type": robot_type, - "task_names": TASK_INDICES_TO_NAMES, - "proprioception_indices": PROPRIOCEPTION_INDICES[robot_type], - "camera_names": ROBOT_CAMERA_NAMES[robot_type], - } + return chunks - logging.info(f"Created BehaviorLeRobotDatasetV3 with repo_id: {repo_id}") - return obj + def __getitem__(self, idx: int) -> dict: + """Get item by index, with optional chunk streaming.""" + if not self._chunk_streaming_using_keyframe: + item = self.hf_dataset[idx] - def __init__(self, *args, **kwargs): - """ - Initialize from existing dataset. - Use the create() classmethod to create a new dataset. 
- """ - super().__init__(*args, **kwargs) + for key in self.meta.video_keys: + if key in self.features: + ep_idx = item["episode_index"].item() + timestamp = item["timestamp"].item() + video_path = self.root / self.meta.get_video_file_path(ep_idx, key) + frames = decode_video_frames( + video_path, [timestamp], self.tolerance_s, self.video_backend + ) + item[key] = frames.squeeze(0) - # Initialize BEHAVIOR-1K specific attributes for loading existing datasets - self.task_episode_mapping = {} - self.episode_task_mapping = {} - self.behavior_metadata = {} + if self.image_transforms is not None: + for key in self.features: + if key.startswith("observation.images."): + item[key] = self.image_transforms(item[key]) - # Try to load BEHAVIOR-1K metadata if it exists - metadata_path = self.root / "meta" / "behavior_metadata.json" - if metadata_path.exists(): - with open(metadata_path) as f: - stored_metadata = json.load(f) - self.behavior_metadata = stored_metadata - self.task_episode_mapping = stored_metadata.get("task_episode_mapping", {}) - self.episode_task_mapping = stored_metadata.get("episode_task_mapping", {}) + if "task_index" in item: + task_idx = item["task_index"].item() + try: + item["task"] = self.meta.tasks.iloc[task_idx].name + except (IndexError, AttributeError): + item["task"] = f"task_{task_idx}" - def add_episode_from_hdf5( - self, - hdf5_data: dict[str, Any], - task_id: int, - episode_id: int, - include_videos: bool = True, - ) -> None: - """ - Add an episode from HDF5 data to the dataset. 
+ return item - Args: - hdf5_data: Dictionary containing the HDF5 episode data - task_id: Task ID for this episode - episode_id: Episode ID (should be task_id * 10000 + local_episode_id) - include_videos: Whether to include video data - """ - task_name = TASK_INDICES_TO_NAMES[task_id] - num_frames = len(hdf5_data["action"]) + return self._get_item_streaming(idx) - logging.info(f"Adding episode {episode_id} (task: {task_name}) with {num_frames} frames") + def _get_item_streaming(self, idx: int) -> dict: + """Get item in chunk streaming mode.""" + if self.current_streaming_chunk_idx is None: + worker_info = get_worker_info() + worker_id = 0 if worker_info is None else worker_info.id + rng = np.random.default_rng(self.seed + worker_id) + rng.shuffle(self.chunks) + self.current_streaming_chunk_idx = rng.integers(0, len(self.chunks)).item() + self.current_streaming_frame_idx = self.chunks[self.current_streaming_chunk_idx][0] - # Process each frame - for frame_idx in range(num_frames): - frame_data = { - "action": hdf5_data["action"][frame_idx], - "observation.state": hdf5_data["obs"]["robot_r1::proprio"][frame_idx], - "observation.cam_rel_poses": hdf5_data["obs"]["robot_r1::cam_rel_poses"][frame_idx], - "observation.task_info": hdf5_data["obs"]["task::low_dim"][frame_idx], - "task": task_name, - "timestamp": frame_idx / self.fps, - } + if self.current_streaming_frame_idx >= self.chunks[self.current_streaming_chunk_idx][1]: + self.current_streaming_chunk_idx += 1 + if self.current_streaming_chunk_idx >= len(self.chunks): + self.current_streaming_chunk_idx = 0 + self.current_streaming_frame_idx = self.chunks[self.current_streaming_chunk_idx][0] + self._should_obs_loaders_reload = True - # Add video frames if requested - if include_videos: - for modality in ["rgb", "depth_linear", "seg_instance_id"]: - # Map depth_linear to depth for consistency - output_modality = "depth" if modality == "depth_linear" else modality + item = 
self.hf_dataset[self.current_streaming_frame_idx] + ep_idx = item["episode_index"].item() - for camera_name, robot_camera_name in ROBOT_CAMERA_NAMES[self.robot_type].items(): - key = f"observation.images.{output_modality}.{camera_name}" - hdf5_key = f"{robot_camera_name}::{modality}" + if self._should_obs_loaders_reload: + for loader in self.obs_loaders.values(): + if hasattr(loader, "close"): + loader.close() + self.obs_loaders = {} + self.current_streaming_episode_idx = ep_idx + self._should_obs_loaders_reload = False - if hdf5_key in hdf5_data["obs"]: - # Get the frame data - frame = hdf5_data["obs"][hdf5_key][frame_idx] + for key in self.meta.video_keys: + if key in self.features: + timestamp = item["timestamp"].item() + video_path = self.root / self.meta.get_video_file_path(ep_idx, key) + frames = decode_video_frames(video_path, [timestamp], self.tolerance_s, self.video_backend) + item[key] = frames.squeeze(0) - # Handle different data types - if isinstance(frame, th.Tensor): - frame = frame.numpy() + if self.image_transforms is not None: + for key in self.features: + if key.startswith("observation.images."): + item[key] = self.image_transforms(item[key]) - # Ensure correct shape - if modality == "seg_instance_id" and len(frame.shape) == 2: - # Add channel dimension for grayscale - frame = np.expand_dims(frame, axis=-1) - elif modality == "depth_linear" and len(frame.shape) == 2: - frame = np.expand_dims(frame, axis=-1) + if "task_index" in item: + task_idx = item["task_index"].item() + try: + item["task"] = self.meta.tasks.iloc[task_idx].name + except (IndexError, AttributeError): + item["task"] = f"task_{task_idx}" - frame_data[key] = frame + self.current_streaming_frame_idx += 1 + return item - # Add frame to dataset - self.add_frame(frame_data) - - # Save episode with metadata - episode_metadata = { - "task_id": task_id, - "task_name": task_name, - "original_episode_id": episode_id, - } - - # Add any additional HDF5 attributes as metadata - if "attrs" in 
hdf5_data: - for attr_name, attr_value in hdf5_data["attrs"].items(): - if isinstance(attr_value, (list, np.ndarray)): - episode_metadata[attr_name] = list(attr_value) - else: - episode_metadata[attr_name] = attr_value - - # Save the episode - self.save_episode(episode_data=None) - - # Track task-episode mapping - if task_id not in self.task_episode_mapping: - self.task_episode_mapping[task_id] = [] - self.task_episode_mapping[task_id].append(self.num_episodes - 1) - self.episode_task_mapping[self.num_episodes - 1] = { - "task_id": task_id, - "task_name": task_name, - "original_episode_id": episode_id, - } - - def finalize(self) -> None: - """Finalize the dataset and save additional BEHAVIOR-1K metadata.""" - # Save BEHAVIOR-1K specific metadata - metadata_path = self.root / "meta" / "behavior_metadata.json" - metadata_path.parent.mkdir(parents=True, exist_ok=True) - - self.behavior_metadata.update( - { - "task_episode_mapping": self.task_episode_mapping, - "episode_task_mapping": self.episode_task_mapping, - "total_tasks": len(self.task_episode_mapping), - "total_episodes": self.num_episodes, - "total_frames": self.num_frames, - } - ) - - with open(metadata_path, "w") as f: - json.dump(self.behavior_metadata, f, indent=2) - - # Finalize the parent dataset - super().finalize() - - logging.info( - f"Finalized dataset with {self.num_episodes} episodes " - f"and {self.num_frames} frames across {len(self.task_episode_mapping)} tasks" - ) + def __len__(self) -> int: + """Total number of frames.""" + return len(self.hf_dataset) diff --git a/examples/behavior_1k/behaviour_1k_constants.py b/examples/behavior_1k/behaviour_1k_constants.py index a0601c55b..fba8df3fd 100644 --- a/examples/behavior_1k/behaviour_1k_constants.py +++ b/examples/behavior_1k/behaviour_1k_constants.py @@ -1,3 +1,18 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from collections import OrderedDict import numpy as np diff --git a/examples/behavior_1k/convert_to_lerobot_v3.py b/examples/behavior_1k/convert_to_lerobot_v3.py index abc00fe39..898c13762 100755 --- a/examples/behavior_1k/convert_to_lerobot_v3.py +++ b/examples/behavior_1k/convert_to_lerobot_v3.py @@ -16,6 +16,7 @@ """Convert Behavior Dataset to LeRobotDataset v3.0 format""" import argparse +import json import logging import shutil from pathlib import Path @@ -327,10 +328,6 @@ def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_ return episods_metadata -import json -from pathlib import Path - - def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: """ Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. 
@@ -491,9 +488,6 @@ def convert_episodes_metadata( write_stats(stats, new_root) -from pathlib import Path - - def convert_dataset_local( data_path: Path, new_repo: Path, @@ -525,20 +519,18 @@ def convert_dataset_local( print(f"🔹 Starting conversion for task {task_id}") print(f"Input root: {root}") print(f"Output root: {new_root}") - STEP = 10 # Infer task episode ranges - EPISODES_META_PATH = root / "meta" / "episodes.jsonl" - task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) - # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): + episodes_meta_path = root / "meta" / "episodes.jsonl" + task_ranges = infer_task_episode_ranges(episodes_meta_path) convert_info( root, new_root, data_file_size_in_mb, video_file_size_in_mb, - EPISODES_META_PATH, + episodes_meta_path, task_id, task_ranges, - STEP, + step=10, ) convert_tasks(root, new_root, task_id) episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) diff --git a/examples/behavior_1k/load_behavior_1k_dataset.py b/examples/behavior_1k/load_behavior_1k_dataset.py new file mode 100644 index 000000000..9103a108f --- /dev/null +++ b/examples/behavior_1k/load_behavior_1k_dataset.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test script to verify BEHAVIOR-1K dataset loading with v3.0 wrapper. 
+"""
+
+import argparse
+import logging
+
+from behavior_lerobot_dataset_v3 import BehaviorLeRobotDatasetV3
+
+from lerobot.utils.utils import init_logging
+
+init_logging()
+
+
+def load_behavior1k_dataset(repo_id, root):
+    """Load only the head-camera RGB stream with chunk streaming disabled and log a dataset summary."""
+    logging.info("=" * 80)
+    logging.info("Testing BEHAVIOR-1K dataset loading")
+    logging.info("=" * 80)
+
+    logging.info(f"\n1. Loading dataset with repo_id: {repo_id}")
+    dataset = BehaviorLeRobotDatasetV3(
+        repo_id=repo_id,
+        root=root,
+        modalities=["rgb"],
+        cameras=["head"],
+        chunk_streaming_using_keyframe=False,
+        check_timestamp_sync=False,
+    )
+
+    logging.info("\n2. Dataset loaded successfully!")
+    logging.info(f" - Number of episodes: {dataset.num_episodes}")
+    logging.info(f" - Number of frames: {dataset.num_frames}")
+    logging.info(f" - FPS: {dataset.fps}")
+    logging.info(f" - Features: {list(dataset.features)}")
+
+    return dataset
+
+
+def load_behavior1k_dataset_with_multiple_modalities(repo_id, root):
+    """Load RGB and depth for the head and both wrist cameras (pyav backend) and log the feature keys."""
+    logging.info("\n" + "=" * 80)
+    logging.info(f"Testing multi-modality loading with repo_id: {repo_id}")
+    logging.info("=" * 80)
+
+    logging.info(f"\n1. Loading dataset with RGB + Depth with repo_id: {repo_id}")
+    dataset = BehaviorLeRobotDatasetV3(
+        repo_id=repo_id,
+        root=root,
+        modalities=["rgb", "depth"],
+        cameras=["head", "left_wrist", "right_wrist"],
+        chunk_streaming_using_keyframe=False,
+        check_timestamp_sync=False,
+        video_backend="pyav",
+    )
+
+    logging.info(f"\n2. Dataset loaded with modalities: {list(dataset.features)}")
+    logging.info(f" - Total features: {len(dataset.features)}")
+
+    rgb_keys = [k for k in dataset.features if "rgb" in k]
+    depth_keys = [k for k in dataset.features if "depth" in k]
+    logging.info(f" - RGB features: {rgb_keys}")
+    logging.info(f" - Depth features: {depth_keys}")
+
+    logging.info("\n3. SUCCESS! Multi-modality loading works.")
+
+    return dataset
+
+
+def stream_behavior1k_dataset(repo_id, root):
+    """Load head-camera RGB in keyframe chunk-streaming mode and log the first frames (up to three)."""
+    logging.info("\n" + "=" * 80)
+    logging.info("Testing chunk streaming mode")
+    logging.info("=" * 80)
+
+    logging.info("\n1. Loading dataset with chunk streaming...")
+    dataset = BehaviorLeRobotDatasetV3(
+        repo_id=repo_id,
+        root=root,
+        modalities=["rgb"],
+        cameras=["head"],
+        chunk_streaming_using_keyframe=True,
+        shuffle=True,
+        seed=42,
+        check_timestamp_sync=False,
+    )
+
+    logging.info("\n2. Dataset loaded in streaming mode")
+    logging.info(f" - Number of chunks: {len(dataset.chunks)}")
+    logging.info(f" - First chunk range: {dataset.chunks[0]}")
+
+    logging.info("\n3. Testing frame access in streaming mode...")
+    for i in range(min(3, len(dataset))):
+        frame = dataset[i]
+        logging.info(
+            f" - Frame {i}: episode_index={frame['episode_index'].item()}, "
+            f"task_index={frame['task_index'].item()}"
+        )
+
+    logging.info("\n4. SUCCESS! Chunk streaming works.")
+
+    return dataset
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--repo-id", type=str, default=None)
+    parser.add_argument("--root", type=str, default=None)
+
+    args = parser.parse_args()
+
+    load_behavior1k_dataset(args.repo_id, args.root)
+    load_behavior1k_dataset_with_multiple_modalities(args.repo_id, args.root)
+    stream_behavior1k_dataset(args.repo_id, args.root)