diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index abca0d821..c51a48831 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -14,7 +14,7 @@ You can contribute in many ways: - **Documentation:** Improve examples, guides, and docstrings. - **Feedback:** Submit tickets related to bugs or desired new features. -If you are unsure where to start, join our [Discord Channel](https://discord.gg/JkrYNdmw). +If you are unsure where to start, join our [Discord Channel](https://discord.gg/q8Dzzpym3f). ## Development Setup diff --git a/docs/source/async.mdx b/docs/source/async.mdx index 1d3e0edbf..3244fc2a3 100644 --- a/docs/source/async.mdx +++ b/docs/source/async.mdx @@ -195,6 +195,7 @@ client_cfg = RobotClientConfig( robot=robot_cfg, server_address="localhost:8080", policy_device="mps", + client_device="cpu", policy_type="smolvla", pretrained_name_or_path="/smolvla_async", chunk_size_threshold=0.5, diff --git a/docs/source/using_dataset_tools.mdx b/docs/source/using_dataset_tools.mdx index 29e16ea0a..9e662604e 100644 --- a/docs/source/using_dataset_tools.mdx +++ b/docs/source/using_dataset_tools.mdx @@ -95,26 +95,26 @@ Convert an image-based dataset to video format, creating a new LeRobotDataset wh # Local-only: Save to a custom output directory (no hub push) lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir /path/to/output/pusht_video # Save with new repo_id (local storage) lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video + --operation.type convert_image_to_video # Convert and push to Hugging Face Hub lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --push_to_hub true # Convert with custom video codec and quality settings lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.vcodec libsvtav1 \ --operation.pix_fmt yuv420p \ @@ -124,16 +124,23 @@ lerobot-edit-dataset \ # Convert only specific episodes lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.episode_indices "[0, 1, 2, 5, 10]" # Convert with multiple workers for parallel processing lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.num_workers 8 + +# For memory-constrained systems, limit episodes and frames per encoding batch: +lerobot-edit-dataset \ + --repo_id lerobot/pusht_image \ + --operation.type convert_image_to_video \ + --operation.max_episodes_per_batch 50 \ + --operation.max_frames_per_batch 10000 ``` **Parameters:** diff --git a/examples/tutorial/async-inf/robot_client.py b/examples/tutorial/async-inf/robot_client.py index eb3751169..db6ead3fe 100644 --- a/examples/tutorial/async-inf/robot_client.py +++ b/examples/tutorial/async-inf/robot_client.py @@ -30,6 +30,7 @@ def main(): robot=robot_cfg, server_address=server_address, policy_device="mps", + client_device="cpu", policy_type="act", pretrained_name_or_path="/robot_learning_tutorial_act", chunk_size_threshold=0.5, # g diff --git a/src/lerobot/async_inference/configs.py 
b/src/lerobot/async_inference/configs.py index d1768a323..2e3fe576d 100644 --- a/src/lerobot/async_inference/configs.py +++ b/src/lerobot/async_inference/configs.py @@ -126,6 +126,12 @@ class RobotClientConfig: # Device configuration policy_device: str = field(default="cpu", metadata={"help": "Device for policy inference"}) + client_device: str = field( + default="cpu", + metadata={ + "help": "Device to move actions to after receiving from server (e.g., for downstream planners)" + }, + ) # Control behavior configuration chunk_size_threshold: float = field(default=0.5, metadata={"help": "Threshold for chunk size control"}) @@ -161,6 +167,9 @@ class RobotClientConfig: if not self.policy_device: raise ValueError("policy_device cannot be empty") + if not self.client_device: + raise ValueError("client_device cannot be empty") + if self.chunk_size_threshold < 0 or self.chunk_size_threshold > 1: raise ValueError(f"chunk_size_threshold must be between 0 and 1, got {self.chunk_size_threshold}") @@ -184,6 +193,7 @@ class RobotClientConfig: "policy_type": self.policy_type, "pretrained_name_or_path": self.pretrained_name_or_path, "policy_device": self.policy_device, + "client_device": self.client_device, "chunk_size_threshold": self.chunk_size_threshold, "fps": self.fps, "actions_per_chunk": self.actions_per_chunk, diff --git a/src/lerobot/async_inference/constants.py b/src/lerobot/async_inference/constants.py index 081db0504..56910e67f 100644 --- a/src/lerobot/async_inference/constants.py +++ b/src/lerobot/async_inference/constants.py @@ -23,7 +23,7 @@ DEFAULT_INFERENCE_LATENCY = 1 / DEFAULT_FPS DEFAULT_OBS_QUEUE_TIMEOUT = 2 # All action chunking policies -SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05"] +SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05", "groot"] # TODO: Add all other robots SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so_follower", "omx_follower"] diff --git a/src/lerobot/async_inference/helpers.py b/src/lerobot/async_inference/helpers.py index 2158f51ac..8b12920d9 100644 --- a/src/lerobot/async_inference/helpers.py +++ b/src/lerobot/async_inference/helpers.py @@ -18,6 +18,7 @@ import os import time from dataclasses import dataclass, field from pathlib import Path +from typing import Any import torch @@ -39,8 +40,8 @@ from lerobot.utils.utils import init_logging Action = torch.Tensor -# observation as received from the robot -RawObservation = dict[str, torch.Tensor] +# observation as received from the robot (can be numpy arrays, floats, etc.) +RawObservation = dict[str, Any] # observation as those recorded in LeRobot dataset (keys are different) LeRobotObservation = dict[str, torch.Tensor] diff --git a/src/lerobot/async_inference/policy_server.py b/src/lerobot/async_inference/policy_server.py index ab2e6bcd8..aedce2a74 100644 --- a/src/lerobot/async_inference/policy_server.py +++ b/src/lerobot/async_inference/policy_server.py @@ -381,6 +381,8 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer): action_tensor = torch.stack(processed_actions, dim=1).squeeze(0) self.logger.debug(f"Postprocessed action shape: {action_tensor.shape}") + action_tensor = action_tensor.detach().cpu() + """5. 
Convert to TimedAction list""" action_chunk = self._time_action_chunk( observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep() diff --git a/src/lerobot/async_inference/robot_client.py b/src/lerobot/async_inference/robot_client.py index f26639dc1..e4d21652a 100644 --- a/src/lerobot/async_inference/robot_client.py +++ b/src/lerobot/async_inference/robot_client.py @@ -25,6 +25,7 @@ python src/lerobot/async_inference/robot_client.py \ --policy_type=act \ --pretrained_name_or_path=user/model \ --policy_device=mps \ + --client_device=cpu \ --actions_per_chunk=50 \ --chunk_size_threshold=0.5 \ --aggregate_fn_name=weighted_average \ @@ -40,6 +41,7 @@ from collections.abc import Callable from dataclasses import asdict from pprint import pformat from queue import Queue +from typing import Any import draccus import grpc @@ -47,7 +49,6 @@ import torch from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 -from lerobot.processor import RobotAction from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -285,6 +286,21 @@ class RobotClient: timed_actions = pickle.loads(actions_chunk.data) # nosec deserialize_time = time.perf_counter() - deserialize_start + # Log device type of received actions + if len(timed_actions) > 0: + received_device = timed_actions[0].get_action().device.type + self.logger.debug(f"Received actions on device: {received_device}") + + # Move actions to client_device (e.g., for downstream planners that need GPU) + client_device = self.config.client_device + if client_device != "cpu": + for timed_action in timed_actions: + if timed_action.get_action().device.type != client_device: + timed_action.action = timed_action.get_action().to(client_device) + self.logger.debug(f"Converted actions to device: {client_device}") + else: + self.logger.debug(f"Actions kept on device: {client_device}") + self.action_chunk_size = max(self.action_chunk_size, len(timed_actions)) # Calculate network latency if we have matching observations @@ -351,7 +367,7 @@ class RobotClient: action = {key: action_tensor[i].item() for i, key in enumerate(self.robot.action_features)} return action - def control_loop_action(self, verbose: bool = False) -> RobotAction: + def control_loop_action(self, verbose: bool = False) -> dict[str, Any]: """Reading and performing actions in local queue""" # Lock only for queue operations diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 455caf0fe..94ffe602e 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -19,6 +19,7 @@ import logging import shutil from pathlib import Path +import datasets import pandas as pd import tqdm @@ -32,6 +33,7 @@ from lerobot.datasets.utils import ( DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_PATH, get_file_size_in_mb, + get_hf_features_from_features, get_parquet_file_size_in_mb, to_parquet_with_hf_images, update_chunk_file_indices, @@ -402,12 +404,21 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si } unique_chunk_file_ids = sorted(unique_chunk_file_ids) + contains_images = len(dst_meta.image_keys) > 0 + + # retrieve features schema for proper image typing in parquet + hf_features = get_hf_features_from_features(dst_meta.features) if contains_images else None for src_chunk_idx, src_file_idx in unique_chunk_file_ids: src_path = src_meta.root / DEFAULT_DATA_PATH.format( 
chunk_index=src_chunk_idx, file_index=src_file_idx ) - df = pd.read_parquet(src_path) + if contains_images: + # Use HuggingFace datasets to read source data to preserve image format + src_ds = datasets.Dataset.from_parquet(str(src_path)) + df = src_ds.to_pandas() + else: + df = pd.read_parquet(src_path) df = update_data_df(df, src_meta, dst_meta) data_idx = append_or_create_parquet_file( @@ -417,8 +428,9 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si data_files_size_in_mb, chunk_size, DEFAULT_DATA_PATH, - contains_images=len(dst_meta.image_keys) > 0, + contains_images=contains_images, aggr_root=dst_meta.root, + hf_features=hf_features, ) return data_idx @@ -488,6 +500,7 @@ def append_or_create_parquet_file( default_path: str, contains_images: bool = False, aggr_root: Path = None, + hf_features: datasets.Features | None = None, ): """Appends data to an existing parquet file or creates a new one based on size constraints. @@ -503,6 +516,7 @@ def append_or_create_parquet_file( default_path: Format string for generating file paths. contains_images: Whether the data contains images requiring special handling. aggr_root: Root path for the aggregated dataset. + hf_features: Optional HuggingFace Features schema for proper image typing. Returns: dict: Updated index dictionary with current chunk and file indices. @@ -512,7 +526,7 @@ def append_or_create_parquet_file( if not dst_path.exists(): dst_path.parent.mkdir(parents=True, exist_ok=True) if contains_images: - to_parquet_with_hf_images(df, dst_path) + to_parquet_with_hf_images(df, dst_path, features=hf_features) else: df.to_parquet(dst_path) return idx @@ -527,12 +541,17 @@ def append_or_create_parquet_file( final_df = df target_path = new_path else: - existing_df = pd.read_parquet(dst_path) + if contains_images: + # Use HuggingFace datasets to read existing data to preserve image format + existing_ds = datasets.Dataset.from_parquet(str(dst_path)) + existing_df = existing_ds.to_pandas() + else: + existing_df = pd.read_parquet(dst_path) final_df = pd.concat([existing_df, df], ignore_index=True) target_path = dst_path if contains_images: - to_parquet_with_hf_images(final_df, target_path) + to_parquet_with_hf_images(final_df, target_path, features=hf_features) else: final_df.to_parquet(target_path) diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 2fb68dca1..e2928e2a6 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -26,6 +26,7 @@ This module provides utilities for: import logging import shutil from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import datasets @@ -51,7 +52,8 @@ from lerobot.datasets.utils import ( write_stats, write_tasks, ) -from lerobot.utils.constants import HF_LEROBOT_HOME +from lerobot.datasets.video_utils import encode_video_frames, get_video_info +from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE def _load_episode_with_stats(src_dataset: LeRobotDataset, episode_idx: int) -> dict: @@ -1083,3 +1085,561 @@ def _copy_episodes_metadata_and_stats( else: if src_dataset.meta.stats: write_stats(src_dataset.meta.stats, dst_meta.root) + + +def _save_episode_images_for_video( + dataset: LeRobotDataset, + imgs_dir: Path, + img_key: str, + episode_index: int, + num_workers: int = 4, +) -> None: + """Save images from a specific episode and camera to disk for video encoding. 
+ + Args: + dataset: The LeRobot dataset to extract images from + imgs_dir: Directory to save images to + img_key: The image key (camera) to extract + episode_index: Index of the episode to save + num_workers: Number of threads for parallel image saving + """ + # Create directory + imgs_dir.mkdir(parents=True, exist_ok=True) + + # Get dataset without torch format for PIL image access + hf_dataset = dataset.hf_dataset.with_format(None) + + # Select only this camera's images + imgs_dataset = hf_dataset.select_columns(img_key) + + # Get episode start and end indices + from_idx = dataset.meta.episodes["dataset_from_index"][episode_index] + to_idx = dataset.meta.episodes["dataset_to_index"][episode_index] + + # Get all items for this episode + episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) + + # Define function to save a single image + def save_single_image(i_item_tuple): + i, item = i_item_tuple + img = item[img_key] + # Use frame-XXXXXX.png format to match encode_video_frames expectations + img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100) + return i + + # Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png) + items = list(enumerate(episode_dataset)) + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(save_single_image, item) for item in items] + for future in as_completed(futures): + future.result() # This will raise any exceptions that occurred + + +def _save_batch_episodes_images( + dataset: LeRobotDataset, + imgs_dir: Path, + img_key: str, + episode_indices: list[int], + num_workers: int = 4, +) -> list[float]: + """Save images from multiple episodes to disk for batch video encoding. + + Args: + dataset: The LeRobot dataset to extract images from + imgs_dir: Directory to save images to + img_key: The image key (camera) to extract + episode_indices: List of episode indices to save + num_workers: Number of threads for parallel image saving + + Returns: + List of episode durations in seconds + """ + imgs_dir.mkdir(parents=True, exist_ok=True) + hf_dataset = dataset.hf_dataset.with_format(None) + imgs_dataset = hf_dataset.select_columns(img_key) + + # Define function to save a single image with global frame index + # Defined once outside the loop to avoid repeated closure creation + def save_single_image(i_item_tuple, base_frame_idx, img_key_param): + i, item = i_item_tuple + img = item[img_key_param] + # Use global frame index for naming + img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.png"), quality=100) + return i + + episode_durations = [] + frame_idx = 0 + + for ep_idx in episode_indices: + # Get episode range + from_idx = dataset.meta.episodes["dataset_from_index"][ep_idx] + to_idx = dataset.meta.episodes["dataset_to_index"][ep_idx] + episode_length = to_idx - from_idx + episode_durations.append(episode_length / dataset.fps) + + # Get episode images + episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) + + # Save images + items = list(enumerate(episode_dataset)) + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(save_single_image, item, frame_idx, img_key) for item in items] + for future in as_completed(futures): + future.result() + + frame_idx += episode_length + + return episode_durations + + +def _iter_episode_batches( + episode_indices: list[int], + episode_lengths: dict[int, int], + size_per_frame_mb: float, + video_file_size_limit: float, + max_episodes: int | None, + max_frames: int | None, +): + """Generator that 
yields batches of episode indices for video encoding. + + Groups episodes into batches that respect size and memory constraints: + - Stays under video file size limit + - Respects maximum episodes per batch (if specified) + - Respects maximum frames per batch (if specified) + + Args: + episode_indices: List of episode indices to batch + episode_lengths: Dictionary mapping episode index to episode length + size_per_frame_mb: Estimated size per frame in MB + video_file_size_limit: Maximum video file size in MB + max_episodes: Maximum number of episodes per batch (None = no limit) + max_frames: Maximum number of frames per batch (None = no limit) + + Yields: + List of episode indices for each batch + """ + batch_episodes = [] + estimated_size = 0.0 + total_frames = 0 + + for ep_idx in episode_indices: + ep_length = episode_lengths[ep_idx] + ep_estimated_size = ep_length * size_per_frame_mb + + # we check if adding this episode would exceed any constraint + would_exceed_size = estimated_size > 0 and estimated_size + ep_estimated_size >= video_file_size_limit + would_exceed_episodes = max_episodes is not None and len(batch_episodes) >= max_episodes + would_exceed_frames = max_frames is not None and total_frames + ep_length > max_frames + + if batch_episodes and (would_exceed_size or would_exceed_episodes or would_exceed_frames): + # yield current batch before adding this episode + yield batch_episodes + # start a new batch with current episode + batch_episodes = [ep_idx] + estimated_size = ep_estimated_size + total_frames = ep_length + else: + # add to current batch + batch_episodes.append(ep_idx) + estimated_size += ep_estimated_size + total_frames += ep_length + + # yield final batch if not empty + if batch_episodes: + yield batch_episodes + + +def _estimate_frame_size_via_calibration( + dataset: LeRobotDataset, + img_key: str, + episode_indices: list[int], + temp_dir: Path, + fps: int, + vcodec: str, + pix_fmt: str, + g: int, + crf: int, + fast_decode: int, + num_calibration_frames: int = 30, +) -> float: + """Estimate MB per frame by encoding a small calibration sample. + + Encodes a representative sample of frames using the exact codec parameters + to measure actual compression ratio, which is more accurate than heuristics. + + Args: + dataset: Source dataset with images. + img_key: Image key to calibrate (e.g., "observation.images.top"). + episode_indices: List of episode indices being processed. + temp_dir: Temporary directory for calibration files. + fps: Frames per second for video encoding. + vcodec: Video codec (libsvtav1, h264, hevc). + pix_fmt: Pixel format (yuv420p, etc.). + g: GOP size (group of pictures). + crf: Constant Rate Factor (quality). + fast_decode: Fast decode tuning parameter. + num_calibration_frames: Number of frames to use for calibration (default: 30). + + Returns: + Estimated size in MB per frame based on actual encoding. 
+ """ + calibration_dir = temp_dir / "calibration" / img_key + calibration_dir.mkdir(parents=True, exist_ok=True) + + try: + # Select a representative episode (prefer middle episode if available) + calibration_ep_idx = episode_indices[len(episode_indices) // 2] + + # Get episode range + from_idx = dataset.meta.episodes["dataset_from_index"][calibration_ep_idx] + to_idx = dataset.meta.episodes["dataset_to_index"][calibration_ep_idx] + episode_length = to_idx - from_idx + + # Use up to num_calibration_frames from this episode + num_frames = min(num_calibration_frames, episode_length) + + # Get frames from dataset + hf_dataset = dataset.hf_dataset.with_format(None) + sample_indices = range(from_idx, from_idx + num_frames) + + # Save calibration frames + for i, idx in enumerate(sample_indices): + img = hf_dataset[idx][img_key] + img.save(str(calibration_dir / f"frame-{i:06d}.png"), quality=100) + + # Encode calibration video + calibration_video_path = calibration_dir / "calibration.mp4" + encode_video_frames( + imgs_dir=calibration_dir, + video_path=calibration_video_path, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, + fast_decode=fast_decode, + overwrite=True, + ) + + # Measure actual compressed size + video_size_bytes = calibration_video_path.stat().st_size + video_size_mb = video_size_bytes / BYTES_PER_MIB + size_per_frame_mb = video_size_mb / num_frames + + logging.info( + f" Calibration: {num_frames} frames -> {video_size_mb:.2f} MB " + f"= {size_per_frame_mb:.4f} MB/frame for {img_key}" + ) + + return size_per_frame_mb + + finally: + # Clean up calibration files + if calibration_dir.exists(): + shutil.rmtree(calibration_dir) + + +def _copy_data_without_images( + src_dataset: LeRobotDataset, + dst_meta: LeRobotDatasetMetadata, + episode_indices: list[int], + img_keys: list[str], +) -> None: + """Copy data files without image columns. 
+ + Args: + src_dataset: Source dataset + dst_meta: Destination metadata + episode_indices: Episodes to include + img_keys: Image keys to remove + """ + from lerobot.datasets.utils import DATA_DIR + + data_dir = src_dataset.root / DATA_DIR + parquet_files = sorted(data_dir.glob("*/*.parquet")) + + if not parquet_files: + raise ValueError(f"No parquet files found in {data_dir}") + + episode_set = set(episode_indices) + + for src_path in tqdm(parquet_files, desc="Processing data files"): + df = pd.read_parquet(src_path).reset_index(drop=True) + + # Filter to only include selected episodes + df = df[df["episode_index"].isin(episode_set)].copy() + + if len(df) == 0: + continue + + # Remove image columns + columns_to_drop = [col for col in img_keys if col in df.columns] + if columns_to_drop: + df = df.drop(columns=columns_to_drop) + + # Get chunk and file indices from path + relative_path = src_path.relative_to(src_dataset.root) + chunk_dir = relative_path.parts[1] + file_name = relative_path.parts[2] + chunk_idx = int(chunk_dir.split("-")[1]) + file_idx = int(file_name.split("-")[1].split(".")[0]) + + # Write to destination without pandas index + dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet" + dst_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(dst_path, index=False) + + +# Video conversion constants +BYTES_PER_KIB = 1024 +BYTES_PER_MIB = BYTES_PER_KIB * BYTES_PER_KIB + + +def convert_image_to_video_dataset( + dataset: LeRobotDataset, + output_dir: Path, + repo_id: str | None = None, + vcodec: str = "libsvtav1", + pix_fmt: str = "yuv420p", + g: int = 2, + crf: int = 30, + fast_decode: int = 0, + episode_indices: list[int] | None = None, + num_workers: int = 4, + max_episodes_per_batch: int | None = None, + max_frames_per_batch: int | None = None, +) -> LeRobotDataset: + """Convert an image dataset to a video dataset. + + Creates a new LeRobotDataset with images encoded as videos, following the proper + LeRobot dataset structure with videos stored in chunked MP4 files. + + Args: + dataset: The source LeRobot dataset with images + output_dir: Directory to save the new video dataset + repo_id: Repository ID for the new dataset (default: original_id + "_video") + vcodec: Video codec (default: libsvtav1) + pix_fmt: Pixel format (default: yuv420p) + g: Group of pictures size (default: 2) + crf: Constant rate factor (default: 30) + fast_decode: Fast decode tuning (default: 0) + episode_indices: List of episode indices to convert (None = all episodes) + num_workers: Number of threads for parallel processing (default: 4) + max_episodes_per_batch: Maximum episodes per video batch to avoid memory issues (None = no limit) + max_frames_per_batch: Maximum frames per video batch to avoid memory issues (None = no limit) + + Returns: + New LeRobotDataset with images encoded as videos + """ + # Check that it's an image dataset + if len(dataset.meta.video_keys) > 0: + raise ValueError( + f"This operation is for image datasets only. 
Video dataset provided: {dataset.repo_id}" + ) + + # Get all image keys + hf_dataset = dataset.hf_dataset.with_format(None) + img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] + + if len(img_keys) == 0: + raise ValueError(f"No image keys found in dataset {dataset.repo_id}") + + # Determine which episodes to process + if episode_indices is None: + episode_indices = list(range(dataset.meta.total_episodes)) + + if repo_id is None: + repo_id = f"{dataset.repo_id}_video" + + logging.info( + f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}" + ) + logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}") + + # Create new features dict, converting image features to video features + new_features = {} + for key, value in dataset.meta.features.items(): + if key not in img_keys: + new_features[key] = value + else: + # Convert image key to video format + new_features[key] = value.copy() + new_features[key]["dtype"] = "video" # Change dtype from "image" to "video" + # Video info will be updated after episodes are encoded + + # Create new metadata for video dataset + new_meta = LeRobotDatasetMetadata.create( + repo_id=repo_id, + fps=dataset.meta.fps, + features=new_features, + robot_type=dataset.meta.robot_type, + root=output_dir, + use_videos=True, + chunks_size=dataset.meta.chunks_size, + data_files_size_in_mb=dataset.meta.data_files_size_in_mb, + video_files_size_in_mb=dataset.meta.video_files_size_in_mb, + ) + + # Create temporary directory for image extraction + temp_dir = output_dir / "temp_images" + temp_dir.mkdir(parents=True, exist_ok=True) + + # Process all episodes and batch encode videos + # Use dictionary for O(1) episode metadata lookups instead of O(n) linear search + all_episode_metadata = {} + fps = int(dataset.fps) + + try: + # Build episode metadata entries first + logging.info("Building episode metadata...") + cumulative_frame_idx = 0 + for ep_idx in episode_indices: + src_episode = dataset.meta.episodes[ep_idx] + ep_length = src_episode["length"] + ep_meta = { + "episode_index": ep_idx, + "length": ep_length, + "dataset_from_index": cumulative_frame_idx, + "dataset_to_index": cumulative_frame_idx + ep_length, + } + if "data/chunk_index" in src_episode: + ep_meta["data/chunk_index"] = src_episode["data/chunk_index"] + ep_meta["data/file_index"] = src_episode["data/file_index"] + all_episode_metadata[ep_idx] = ep_meta + cumulative_frame_idx += ep_length + + # Process each camera and batch encode multiple episodes together + video_file_size_limit = new_meta.video_files_size_in_mb + + # Pre-compute episode lengths for batching + episode_lengths = {ep_idx: dataset.meta.episodes["length"][ep_idx] for ep_idx in episode_indices} + + for img_key in tqdm(img_keys, desc="Processing cameras"): + # Estimate size per frame by encoding a small calibration sample + # This provides accurate compression ratio for the specific codec parameters + size_per_frame_mb = _estimate_frame_size_via_calibration( + dataset=dataset, + img_key=img_key, + episode_indices=episode_indices, + temp_dir=temp_dir, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, + fast_decode=fast_decode, + ) + + logging.info(f"Processing camera: {img_key}") + chunk_idx, file_idx = 0, 0 + cumulative_timestamp = 0.0 + + # Process episodes in batches to stay under size limit + for batch_episodes in _iter_episode_batches( + episode_indices=episode_indices, + episode_lengths=episode_lengths, + 
size_per_frame_mb=size_per_frame_mb, + video_file_size_limit=video_file_size_limit, + max_episodes=max_episodes_per_batch, + max_frames=max_frames_per_batch, + ): + total_frames_in_batch = sum(episode_lengths[idx] for idx in batch_episodes) + logging.info( + f" Encoding batch of {len(batch_episodes)} episodes " + f"({batch_episodes[0]}-{batch_episodes[-1]}) = {total_frames_in_batch} frames" + ) + + # Save images for all episodes in this batch + imgs_dir = temp_dir / f"batch_{chunk_idx}_{file_idx}" / img_key + episode_durations = _save_batch_episodes_images( + dataset=dataset, + imgs_dir=imgs_dir, + img_key=img_key, + episode_indices=batch_episodes, + num_workers=num_workers, + ) + + # Encode all batched episodes into single video + video_path = new_meta.root / new_meta.video_path.format( + video_key=img_key, chunk_index=chunk_idx, file_index=file_idx + ) + video_path.parent.mkdir(parents=True, exist_ok=True) + + encode_video_frames( + imgs_dir=imgs_dir, + video_path=video_path, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, + fast_decode=fast_decode, + overwrite=True, + ) + + # Clean up temporary images + shutil.rmtree(imgs_dir) + + # Update metadata for each episode in the batch + for ep_idx, duration in zip(batch_episodes, episode_durations, strict=True): + from_timestamp = cumulative_timestamp + to_timestamp = cumulative_timestamp + duration + cumulative_timestamp = to_timestamp + + # Find episode metadata entry and add video metadata (O(1) dictionary lookup) + ep_meta = all_episode_metadata[ep_idx] + ep_meta[f"videos/{img_key}/chunk_index"] = chunk_idx + ep_meta[f"videos/{img_key}/file_index"] = file_idx + ep_meta[f"videos/{img_key}/from_timestamp"] = from_timestamp + ep_meta[f"videos/{img_key}/to_timestamp"] = to_timestamp + + # Move to next video file for next batch + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, new_meta.chunks_size) + cumulative_timestamp = 0.0 + + # Copy and transform data files (removing image columns) + _copy_data_without_images(dataset, new_meta, episode_indices, img_keys) + + # Save episode metadata + episodes_df = pd.DataFrame(list(all_episode_metadata.values())) + episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet" + episodes_path.parent.mkdir(parents=True, exist_ok=True) + episodes_df.to_parquet(episodes_path, index=False) + + # Update metadata info + new_meta.info["total_episodes"] = len(episode_indices) + new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata.values()) + new_meta.info["total_tasks"] = dataset.meta.total_tasks + new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"} + + # Update video info for all image keys (now videos) + # We need to manually set video info since update_video_info() checks video_keys first + for img_key in img_keys: + if not new_meta.features[img_key].get("info", None): + video_path = new_meta.root / new_meta.video_path.format( + video_key=img_key, chunk_index=0, file_index=0 + ) + new_meta.info["features"][img_key]["info"] = get_video_info(video_path) + + write_info(new_meta.info, new_meta.root) + + # Copy stats and tasks + if dataset.meta.stats is not None: + # Remove image stats + new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys} + write_stats(new_stats, new_meta.root) + + if dataset.meta.tasks is not None: + write_tasks(dataset.meta.tasks, new_meta.root) + + finally: + # Clean up temporary directory + if temp_dir.exists(): + shutil.rmtree(temp_dir) + + logging.info(f"Completed 
converting {dataset.repo_id} to video format") + logging.info(f"New dataset saved to: {output_dir}") + + # Return new dataset + return LeRobotDataset(repo_id=repo_id, root=output_dir) diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index 234736a75..ed678af6e 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -1172,12 +1172,21 @@ def validate_episode_buffer(episode_buffer: dict, total_episodes: int, features: ) -def to_parquet_with_hf_images(df: pandas.DataFrame, path: Path) -> None: +def to_parquet_with_hf_images( + df: pandas.DataFrame, path: Path, features: datasets.Features | None = None +) -> None: """This function correctly writes to parquet a panda DataFrame that contains images encoded by HF dataset. This way, it can be loaded by HF dataset and correctly formatted images are returned. + + Args: + df: DataFrame to write to parquet. + path: Path to write the parquet file. + features: Optional HuggingFace Features schema. If provided, ensures image columns + are properly typed as Image() in the parquet schema. """ # TODO(qlhoest): replace this weird synthax by `df.to_parquet(path)` only - datasets.Dataset.from_dict(df.to_dict(orient="list")).to_parquet(path) + ds = datasets.Dataset.from_dict(df.to_dict(orient="list"), features=features) + ds.to_parquet(path) def item_to_torch(item: dict) -> dict: diff --git a/src/lerobot/motors/feetech/tables.py b/src/lerobot/motors/feetech/tables.py index 91e844a72..56500e527 100644 --- a/src/lerobot/motors/feetech/tables.py +++ b/src/lerobot/motors/feetech/tables.py @@ -205,6 +205,7 @@ MODEL_BAUDRATE_TABLE = { # Sign-Magnitude encoding bits STS_SMS_SERIES_ENCODINGS_TABLE = { + "Present_Load": 10, "Homing_Offset": 11, "Goal_Position": 15, "Goal_Velocity": 15, diff --git a/src/lerobot/policies/groot/modeling_groot.py b/src/lerobot/policies/groot/modeling_groot.py index fd9baa9b1..9a479b8f9 100644 --- a/src/lerobot/policies/groot/modeling_groot.py +++ b/src/lerobot/policies/groot/modeling_groot.py @@ -32,16 +32,22 @@ Notes: from LeRobot, see `GrootPolicy.finetune_with_groot_runner` below. """ +import builtins import os from collections import deque +from pathlib import Path +from typing import TypeVar import torch from torch import Tensor +from lerobot.configs.types import FeatureType, PolicyFeature from lerobot.policies.groot.configuration_groot import GrootConfig from lerobot.policies.groot.groot_n1 import GR00TN15 from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.utils.constants import ACTION +from lerobot.utils.constants import ACTION, OBS_IMAGES + +T = TypeVar("T", bound="GrootPolicy") class GrootPolicy(PreTrainedPolicy): @@ -90,6 +96,129 @@ class GrootPolicy(PreTrainedPolicy): """Reset policy state when environment resets.""" self._action_queue = deque([], maxlen=self.config.n_action_steps) + @classmethod + def from_pretrained( + cls: builtins.type[T], + pretrained_name_or_path: str | Path, + *, + config: GrootConfig | None = None, + force_download: bool = False, + resume_download: bool | None = None, + proxies: dict | None = None, + token: str | bool | None = None, + cache_dir: str | Path | None = None, + local_files_only: bool = False, + revision: str | None = None, + strict: bool = True, + **kwargs, + ) -> T: + """Load Groot policy from pretrained model. + + Handles two cases: + 1. Base GR00T models (e.g., 'nvidia/GR00T-N1.5-3B') - loads the raw model + 2. 
Fine-tuned LeRobot checkpoints - loads config and weights from safetensors + + Args: + pretrained_name_or_path: Path to the GR00T model or fine-tuned checkpoint + config: Optional GrootConfig. If None, loads from checkpoint or creates default + force_download: Force download even if cached + resume_download: Resume interrupted download + proxies: Proxy settings + token: HuggingFace authentication token + cache_dir: Cache directory path + local_files_only: Only use local files + revision: Specific model revision + strict: Strict state dict loading + **kwargs: Additional arguments (passed to config) + + Returns: + Initialized GrootPolicy instance with loaded model + """ + from huggingface_hub import hf_hub_download + from huggingface_hub.constants import SAFETENSORS_SINGLE_FILE + from huggingface_hub.errors import HfHubHTTPError + + print( + "The Groot policy is a wrapper around Nvidia's GR00T N1.5 model.\n" + f"Loading pretrained model from: {pretrained_name_or_path}" + ) + + model_id = str(pretrained_name_or_path) + is_finetuned_checkpoint = False + + # Check if this is a fine-tuned LeRobot checkpoint (has model.safetensors) + try: + if os.path.isdir(model_id): + is_finetuned_checkpoint = os.path.exists(os.path.join(model_id, SAFETENSORS_SINGLE_FILE)) + else: + # Try to download the safetensors file to check if it exists + try: + hf_hub_download( + repo_id=model_id, + filename=SAFETENSORS_SINGLE_FILE, + revision=revision, + cache_dir=cache_dir, + force_download=False, # Just check, don't force download + proxies=proxies, + token=token, + local_files_only=local_files_only, + ) + is_finetuned_checkpoint = True + except HfHubHTTPError: + is_finetuned_checkpoint = False + except Exception: + is_finetuned_checkpoint = False + + if is_finetuned_checkpoint: + # This is a fine-tuned LeRobot checkpoint - use parent class loading + print("Detected fine-tuned LeRobot checkpoint, loading with state dict...") + return super().from_pretrained( + pretrained_name_or_path=pretrained_name_or_path, + config=config, + force_download=force_download, + resume_download=resume_download, + proxies=proxies, + token=token, + cache_dir=cache_dir, + local_files_only=local_files_only, + revision=revision, + strict=strict, + **kwargs, + ) + + # This is a base GR00T model - load it fresh + print("Detected base GR00T model, loading from HuggingFace...") + + if config is None: + # Create default config with the pretrained path + config = GrootConfig(base_model_path=str(pretrained_name_or_path)) + + # Add minimal visual feature required for validation + # validate_features() will automatically add state and action features + # These are placeholders - actual robot features come from the preprocessor + if not config.input_features: + config.input_features = { + f"{OBS_IMAGES}.camera": PolicyFeature( + type=FeatureType.VISUAL, + shape=(3, 224, 224), # Default image size from config + ), + } + else: + # Override the base_model_path with the provided path + config.base_model_path = str(pretrained_name_or_path) + + # Pass through any additional config overrides from kwargs + for key, value in kwargs.items(): + if hasattr(config, key): + setattr(config, key, value) + + # Create a fresh policy instance - this will automatically load the GR00T model + # in __init__ via _create_groot_model() + policy = cls(config) + + policy.eval() + return policy + def get_optim_params(self) -> dict: return self.parameters() diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index 
e835b1de6..4ba6ce44f 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -66,23 +66,23 @@ Remove camera feature: --operation.type remove_feature \ --operation.feature_names "['observation.images.top']" -Convert image dataset to video format (saves locally): +Convert image dataset to video format and save locally: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir /path/to/output/pusht_video -Convert image dataset and save with new repo_id: +Convert image dataset to video format and save with new repo_id: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video + --operation.type convert_image_to_video -Convert and push to hub: +Convert image dataset to video format and push to hub: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --push_to_hub true Using JSON config file: @@ -92,24 +92,19 @@ Using JSON config file: import logging import shutil -from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path -import pandas as pd -from tqdm import tqdm - from lerobot.configs import parser from lerobot.datasets.dataset_tools import ( + convert_image_to_video_dataset, delete_episodes, merge_datasets, remove_feature, split_dataset, ) -from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata -from lerobot.datasets.utils import write_stats, write_tasks -from lerobot.datasets.video_utils import encode_video_frames, get_video_info -from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.utils import init_logging @@ -138,8 +133,8 @@ class RemoveFeatureConfig: @dataclass -class ConvertToVideoConfig: - type: str = "convert_to_video" +class ConvertImageToVideoConfig: + type: str = "convert_image_to_video" output_dir: str | None = None vcodec: str = "libsvtav1" pix_fmt: str = "yuv420p" @@ -148,12 +143,16 @@ class ConvertToVideoConfig: fast_decode: int = 0 episode_indices: list[int] | None = None num_workers: int = 4 + max_episodes_per_batch: int | None = None + max_frames_per_batch: int | None = None @dataclass class EditDatasetConfig: repo_id: str - operation: DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertToVideoConfig + operation: ( + DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertImageToVideoConfig + ) root: str | None = None new_repo_id: str | None = None push_to_hub: bool = False @@ -297,362 +296,7 @@ def handle_remove_feature(cfg: EditDatasetConfig) -> None: LeRobotDataset(output_repo_id, root=output_dir).push_to_hub() -def save_episode_images_for_video( - dataset: LeRobotDataset, - imgs_dir: Path, - img_key: str, - episode_index: int, - num_workers: int = 4, -) -> None: - """Save images from a specific episode and camera to disk for video encoding. 
- - Args: - dataset: The LeRobot dataset to extract images from - imgs_dir: Directory to save images to - img_key: The image key (camera) to extract - episode_index: Index of the episode to save - num_workers: Number of threads for parallel image saving - """ - # Create directory - imgs_dir.mkdir(parents=True, exist_ok=True) - - # Get dataset without torch format for PIL image access - hf_dataset = dataset.hf_dataset.with_format(None) - - # Select only this camera's images - imgs_dataset = hf_dataset.select_columns(img_key) - - # Get episode start and end indices - from_idx = dataset.meta.episodes["dataset_from_index"][episode_index] - to_idx = dataset.meta.episodes["dataset_to_index"][episode_index] - - # Get all items for this episode - episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) - - # Define function to save a single image - def save_single_image(i_item_tuple): - i, item = i_item_tuple - img = item[img_key] - # Use frame-XXXXXX.png format to match encode_video_frames expectations - img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100) - return i - - # Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png) - items = list(enumerate(episode_dataset)) - - with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [executor.submit(save_single_image, item) for item in items] - for future in as_completed(futures): - future.result() # This will raise any exceptions that occurred - - -def encode_episode_videos( - dataset: LeRobotDataset, - new_meta: LeRobotDatasetMetadata, - episode_index: int, - vcodec: str, - pix_fmt: str, - g: int, - crf: int, - fast_decode: int, - temp_dir: Path, - num_image_workers: int = 4, -) -> dict[str, dict]: - """Encode videos for a single episode and return video metadata. 
- - Args: - dataset: Source dataset with images - new_meta: Metadata object for the new video dataset - episode_index: Episode index to process - vcodec: Video codec - pix_fmt: Pixel format - g: Group of pictures size - crf: Constant rate factor - fast_decode: Fast decode tuning - temp_dir: Temporary directory for images - num_image_workers: Number of workers for saving images - - Returns: - Dictionary mapping video keys to their metadata (chunk_index, file_index, timestamps) - """ - hf_dataset = dataset.hf_dataset.with_format(None) - img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] - - video_metadata = {} - fps = int(dataset.fps) # Convert to int for PyAV compatibility - episode_length = dataset.meta.episodes["length"][episode_index] - episode_duration = episode_length / dataset.fps # Use original fps for duration calculation - - for img_key in img_keys: - # Save images temporarily - imgs_dir = temp_dir / f"episode_{episode_index:06d}" / img_key - save_episode_images_for_video(dataset, imgs_dir, img_key, episode_index, num_image_workers) - - # Determine chunk and file indices - # For simplicity, we'll put each episode in its own file - chunk_idx = episode_index // new_meta.chunks_size - file_idx = episode_index % new_meta.chunks_size - - # Create video path in the new dataset structure - video_path = new_meta.root / new_meta.video_path.format( - video_key=img_key, chunk_index=chunk_idx, file_index=file_idx - ) - video_path.parent.mkdir(parents=True, exist_ok=True) - - # Encode video - encode_video_frames( - imgs_dir=imgs_dir, - video_path=video_path, - fps=fps, - vcodec=vcodec, - pix_fmt=pix_fmt, - g=g, - crf=crf, - fast_decode=fast_decode, - overwrite=True, - ) - - # Clean up temporary images - shutil.rmtree(imgs_dir) - - # Store video metadata - video_metadata[img_key] = { - f"videos/{img_key}/chunk_index": chunk_idx, - f"videos/{img_key}/file_index": file_idx, - f"videos/{img_key}/from_timestamp": 0.0, - f"videos/{img_key}/to_timestamp": episode_duration, - } - - return video_metadata - - -def convert_dataset_to_videos( - dataset: LeRobotDataset, - output_dir: Path, - repo_id: str | None = None, - vcodec: str = "libsvtav1", - pix_fmt: str = "yuv420p", - g: int = 2, - crf: int = 30, - fast_decode: int = 0, - episode_indices: list[int] | None = None, - num_workers: int = 4, -) -> LeRobotDataset: - """Convert image-based dataset to video-based dataset. - - Creates a new LeRobotDataset with videos instead of images, following the proper - LeRobot dataset structure with videos stored in chunked MP4 files. - - Args: - dataset: The source LeRobot dataset with images - output_dir: Directory to save the new video dataset - repo_id: Repository ID for the new dataset (default: original_id + "_video") - vcodec: Video codec (default: libsvtav1) - pix_fmt: Pixel format (default: yuv420p) - g: Group of pictures size (default: 2) - crf: Constant rate factor (default: 30) - fast_decode: Fast decode tuning (default: 0) - episode_indices: List of episode indices to convert (None = all episodes) - num_workers: Number of threads for parallel processing (default: 4) - - Returns: - New LeRobotDataset with videos - """ - # Check that it's an image dataset - if len(dataset.meta.video_keys) > 0: - raise ValueError( - f"This operation is for image datasets only. 
Video dataset provided: {dataset.repo_id}" - ) - - # Get all image keys - hf_dataset = dataset.hf_dataset.with_format(None) - img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] - - if len(img_keys) == 0: - raise ValueError(f"No image keys found in dataset {dataset.repo_id}") - - # Determine which episodes to process - if episode_indices is None: - episode_indices = list(range(dataset.meta.total_episodes)) - - if repo_id is None: - repo_id = f"{dataset.repo_id}_video" - - logging.info( - f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}" - ) - logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}") - - # Create new features dict, converting image features to video features - new_features = {} - for key, value in dataset.meta.features.items(): - if key not in img_keys: - new_features[key] = value - else: - # Convert image key to video format - new_features[key] = value.copy() - new_features[key]["dtype"] = "video" # Change dtype from "image" to "video" - # Video info will be updated after episodes are encoded - - # Create new metadata for video dataset - new_meta = LeRobotDatasetMetadata.create( - repo_id=repo_id, - fps=dataset.meta.fps, - features=new_features, - robot_type=dataset.meta.robot_type, - root=output_dir, - use_videos=True, - chunks_size=dataset.meta.chunks_size, - data_files_size_in_mb=dataset.meta.data_files_size_in_mb, - video_files_size_in_mb=dataset.meta.video_files_size_in_mb, - ) - - # Create temporary directory for image extraction - temp_dir = output_dir / "temp_images" - temp_dir.mkdir(parents=True, exist_ok=True) - - # Process each episode - all_episode_metadata = [] - - try: - for ep_idx in tqdm(episode_indices, desc="Converting episodes to videos"): - # Get episode metadata from source - src_episode = dataset.meta.episodes[ep_idx] - - # Encode videos for this episode - video_metadata = encode_episode_videos( - dataset=dataset, - new_meta=new_meta, - episode_index=ep_idx, - vcodec=vcodec, - pix_fmt=pix_fmt, - g=g, - crf=crf, - fast_decode=fast_decode, - temp_dir=temp_dir, - num_image_workers=num_workers, - ) - - # Build episode metadata - episode_meta = { - "episode_index": ep_idx, - "length": src_episode["length"], - "dataset_from_index": ep_idx * src_episode["length"], - "dataset_to_index": (ep_idx + 1) * src_episode["length"], - } - - # Add video metadata - for img_key in img_keys: - episode_meta.update(video_metadata[img_key]) - - # Add data chunk/file info (using same structure as source) - if "data/chunk_index" in src_episode: - episode_meta["data/chunk_index"] = src_episode["data/chunk_index"] - episode_meta["data/file_index"] = src_episode["data/file_index"] - - all_episode_metadata.append(episode_meta) - - # Copy and transform data files (removing image columns) - _copy_data_without_images(dataset, new_meta, episode_indices, img_keys) - - # Save episode metadata - episodes_df = pd.DataFrame(all_episode_metadata) - episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet" - episodes_path.parent.mkdir(parents=True, exist_ok=True) - episodes_df.to_parquet(episodes_path, index=False) - - # Update metadata info - new_meta.info["total_episodes"] = len(episode_indices) - new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata) - new_meta.info["total_tasks"] = dataset.meta.total_tasks - new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"} - - # Update video info for all image keys (now videos) - # We 
need to manually set video info since update_video_info() checks video_keys first - for img_key in img_keys: - if not new_meta.features[img_key].get("info", None): - video_path = new_meta.root / new_meta.video_path.format( - video_key=img_key, chunk_index=0, file_index=0 - ) - new_meta.info["features"][img_key]["info"] = get_video_info(video_path) - - from lerobot.datasets.utils import write_info - - write_info(new_meta.info, new_meta.root) - - # Copy stats and tasks - if dataset.meta.stats is not None: - # Remove image stats - new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys} - write_stats(new_stats, new_meta.root) - - if dataset.meta.tasks is not None: - write_tasks(dataset.meta.tasks, new_meta.root) - - finally: - # Clean up temporary directory - if temp_dir.exists(): - shutil.rmtree(temp_dir) - - logging.info(f"✓ Completed converting {dataset.repo_id} to video format") - logging.info(f"New dataset saved to: {output_dir}") - - # Return new dataset - return LeRobotDataset(repo_id=repo_id, root=output_dir) - - -def _copy_data_without_images( - src_dataset: LeRobotDataset, - dst_meta: LeRobotDatasetMetadata, - episode_indices: list[int], - img_keys: list[str], -) -> None: - """Copy data files without image columns. - - Args: - src_dataset: Source dataset - dst_meta: Destination metadata - episode_indices: Episodes to include - img_keys: Image keys to remove - """ - from lerobot.datasets.utils import DATA_DIR - - data_dir = src_dataset.root / DATA_DIR - parquet_files = sorted(data_dir.glob("*/*.parquet")) - - if not parquet_files: - raise ValueError(f"No parquet files found in {data_dir}") - - episode_set = set(episode_indices) - - for src_path in tqdm(parquet_files, desc="Processing data files"): - df = pd.read_parquet(src_path).reset_index(drop=True) - - # Filter to only include selected episodes - df = df[df["episode_index"].isin(episode_set)].copy() - - if len(df) == 0: - continue - - # Remove image columns - columns_to_drop = [col for col in img_keys if col in df.columns] - if columns_to_drop: - df = df.drop(columns=columns_to_drop) - - # Get chunk and file indices from path - relative_path = src_path.relative_to(src_dataset.root) - chunk_dir = relative_path.parts[1] - file_name = relative_path.parts[2] - chunk_idx = int(chunk_dir.split("-")[1]) - file_idx = int(file_name.split("-")[1].split(".")[0]) - - # Write to destination without pandas index - dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet" - dst_path.parent.mkdir(parents=True, exist_ok=True) - df.to_parquet(dst_path, index=False) - - -def handle_convert_to_video(cfg: EditDatasetConfig) -> None: +def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None: # Note: Parser may create any config type with the right fields, so we access fields directly # instead of checking isinstance() dataset = LeRobotDataset(cfg.repo_id, root=cfg.root) @@ -664,8 +308,12 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: if cfg.new_repo_id: # Use new_repo_id for both local storage and hub push output_repo_id = cfg.new_repo_id - output_dir = Path(cfg.root) / cfg.new_repo_id if cfg.root else HF_LEROBOT_HOME / cfg.new_repo_id - logging.info(f"Saving to new dataset: {cfg.new_repo_id}") + # Place new dataset as a sibling to the original dataset + # Get the parent of the actual dataset root (not cfg.root which might be the lerobot cache dir) + # Extract just the dataset name (after last slash) for the local directory + local_dir_name = cfg.new_repo_id.split("/")[-1] + 
output_dir = dataset.root.parent / local_dir_name + logging.info(f"Saving to new dataset: {cfg.new_repo_id} at {output_dir}") elif output_dir_config: # Use custom output directory for local-only storage output_dir = Path(output_dir_config) @@ -675,12 +323,15 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: else: # Auto-generate name: append "_video" to original repo_id output_repo_id = f"{cfg.repo_id}_video" - output_dir = Path(cfg.root) / output_repo_id if cfg.root else HF_LEROBOT_HOME / output_repo_id + # Place new dataset as a sibling to the original dataset + # Extract just the dataset name (after last slash) for the local directory + local_dir_name = output_repo_id.split("/")[-1] + output_dir = dataset.root.parent / local_dir_name logging.info(f"Saving to auto-generated location: {output_dir}") logging.info(f"Converting dataset {cfg.repo_id} to video format") - new_dataset = convert_dataset_to_videos( + new_dataset = convert_image_to_video_dataset( dataset=dataset, output_dir=output_dir, repo_id=output_repo_id, @@ -691,6 +342,8 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: fast_decode=getattr(cfg.operation, "fast_decode", 0), episode_indices=getattr(cfg.operation, "episode_indices", None), num_workers=getattr(cfg.operation, "num_workers", 4), + max_episodes_per_batch=getattr(cfg.operation, "max_episodes_per_batch", None), + max_frames_per_batch=getattr(cfg.operation, "max_frames_per_batch", None), ) logging.info("Video dataset created successfully!") @@ -718,8 +371,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None: handle_merge(cfg) elif operation_type == "remove_feature": handle_remove_feature(cfg) - elif operation_type == "convert_to_video": - handle_convert_to_video(cfg) + elif operation_type == "convert_image_to_video": + handle_convert_image_to_video(cfg) else: raise ValueError( f"Unknown operation type: {operation_type}\n" diff --git a/tests/async_inference/test_e2e.py b/tests/async_inference/test_e2e.py index 11941ce32..54ca29b48 100644 --- a/tests/async_inference/test_e2e.py +++ b/tests/async_inference/test_e2e.py @@ -144,12 +144,18 @@ def test_async_inference_e2e(monkeypatch): client = RobotClient(client_config) assert client.start(), "Client failed initial handshake with the server" - # Track action chunks received without modifying RobotClient - action_chunks_received = {"count": 0} + # Track action chunks received and verify device type + action_chunks_received = {"count": 0, "actions_on_cpu": True} original_aggregate = client._aggregate_action_queues def counting_aggregate(*args, **kwargs): action_chunks_received["count"] += 1 + # Check that all received actions are on CPU + if args: + for timed_action in args[0]: # args[0] is the list of TimedAction + action_tensor = timed_action.get_action() + if action_tensor.device.type != "cpu": + action_chunks_received["actions_on_cpu"] = False return original_aggregate(*args, **kwargs) monkeypatch.setattr(client, "_aggregate_action_queues", counting_aggregate) diff --git a/tests/datasets/test_aggregate.py b/tests/datasets/test_aggregate.py index b710a3a4b..031c29d60 100644 --- a/tests/datasets/test_aggregate.py +++ b/tests/datasets/test_aggregate.py @@ -16,6 +16,7 @@ from unittest.mock import patch +import datasets import torch from lerobot.datasets.aggregate import aggregate_datasets @@ -380,3 +381,147 @@ def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory): for key in aggr_ds.meta.video_keys: assert key in item, f"Video key {key} missing from item {i}" assert 
diff --git a/tests/datasets/test_aggregate.py b/tests/datasets/test_aggregate.py
index b710a3a4b..031c29d60 100644
--- a/tests/datasets/test_aggregate.py
+++ b/tests/datasets/test_aggregate.py
@@ -16,6 +16,7 @@
 from unittest.mock import patch

+import datasets
 import torch

 from lerobot.datasets.aggregate import aggregate_datasets
@@ -380,3 +381,147 @@ def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory):
         for key in aggr_ds.meta.video_keys:
             assert key in item, f"Video key {key} missing from item {i}"
             assert item[key].shape[0] == 3, f"Expected 3 channels for video key {key}"
+
+
+def assert_image_schema_preserved(aggr_ds):
+    """Assert that the HuggingFace Image feature schema is preserved in aggregated parquet files.
+
+    This verifies the fix for a bug where image columns were written with a generic
+    struct schema {'bytes': Value('binary'), 'path': Value('string')} instead of
+    the proper Image() feature type, causing the HuggingFace Hub viewer to display
+    raw dict objects instead of image thumbnails.
+    """
+    image_keys = aggr_ds.meta.image_keys
+    if not image_keys:
+        return
+
+    # Check that parquet files have proper Image schema
+    data_dir = aggr_ds.root / "data"
+    parquet_files = list(data_dir.rglob("*.parquet"))
+    assert len(parquet_files) > 0, "No parquet files found in aggregated dataset"
+
+    for parquet_file in parquet_files:
+        # Load with HuggingFace datasets to check schema
+        ds = datasets.Dataset.from_parquet(str(parquet_file))
+
+        for image_key in image_keys:
+            feature = ds.features.get(image_key)
+            assert feature is not None, f"Image key '{image_key}' not found in parquet schema"
+            assert isinstance(feature, datasets.Image), (
+                f"Image key '{image_key}' should have Image() feature type, "
+                f"but got {type(feature).__name__}: {feature}. "
+                "This indicates image schema was not preserved during aggregation."
+            )
+
+
+def assert_image_frames_integrity(aggr_ds, ds_0, ds_1):
+    """Assert that image frames are correctly preserved after aggregation."""
+    image_keys = aggr_ds.meta.image_keys
+    if not image_keys:
+        return
+
+    def images_equal(img1, img2):
+        return torch.allclose(img1, img2)
+
+    # Test the section corresponding to the first dataset (ds_0)
+    for i in range(len(ds_0)):
+        assert aggr_ds[i]["index"] == i, (
+            f"Frame index at position {i} should be {i}, but got {aggr_ds[i]['index']}"
+        )
+        for key in image_keys:
+            assert images_equal(aggr_ds[i][key], ds_0[i][key]), (
+                f"Image frames at position {i} should be equal between aggregated and ds_0"
+            )
+
+    # Test the section corresponding to the second dataset (ds_1)
+    for i in range(len(ds_0), len(ds_0) + len(ds_1)):
+        assert aggr_ds[i]["index"] == i, (
+            f"Frame index at position {i} should be {i}, but got {aggr_ds[i]['index']}"
+        )
+        for key in image_keys:
+            assert images_equal(aggr_ds[i][key], ds_1[i - len(ds_0)][key]), (
+                f"Image frames at position {i} should be equal between aggregated and ds_1"
+            )
+
+
+def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
+    """Test that aggregation of image-based datasets preserves the HuggingFace Image schema.
+
+    This test specifically verifies that:
+    1. Image-based datasets can be aggregated correctly
+    2. The HuggingFace Image() feature type is preserved in parquet files
+    3. Image data integrity is maintained across aggregation
+    4. Images can be properly decoded after aggregation
+
+    This catches the bug where to_parquet_with_hf_images() was not passing
+    the features schema, causing image columns to be written as generic
+    struct types instead of Image() types.
+    """
+    ds_0_num_frames = 50
+    ds_1_num_frames = 75
+    ds_0_num_episodes = 2
+    ds_1_num_episodes = 3
+
+    # Create two image-based datasets (use_videos=False)
+    ds_0 = lerobot_dataset_factory(
+        root=tmp_path / "image_0",
+        repo_id=f"{DUMMY_REPO_ID}_image_0",
+        total_episodes=ds_0_num_episodes,
+        total_frames=ds_0_num_frames,
+        use_videos=False,  # Image-based dataset
+    )
+    ds_1 = lerobot_dataset_factory(
+        root=tmp_path / "image_1",
+        repo_id=f"{DUMMY_REPO_ID}_image_1",
+        total_episodes=ds_1_num_episodes,
+        total_frames=ds_1_num_frames,
+        use_videos=False,  # Image-based dataset
+    )
+
+    # Verify source datasets have image keys
+    assert len(ds_0.meta.image_keys) > 0, "ds_0 should have image keys"
+    assert len(ds_1.meta.image_keys) > 0, "ds_1 should have image keys"
+
+    # Aggregate the datasets
+    aggregate_datasets(
+        repo_ids=[ds_0.repo_id, ds_1.repo_id],
+        roots=[ds_0.root, ds_1.root],
+        aggr_repo_id=f"{DUMMY_REPO_ID}_image_aggr",
+        aggr_root=tmp_path / "image_aggr",
+    )
+
+    # Load the aggregated dataset
+    with (
+        patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
+        patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
+    ):
+        mock_get_safe_version.return_value = "v3.0"
+        mock_snapshot_download.return_value = str(tmp_path / "image_aggr")
+        aggr_ds = LeRobotDataset(f"{DUMMY_REPO_ID}_image_aggr", root=tmp_path / "image_aggr")
+
+    # Verify aggregated dataset has image keys
+    assert len(aggr_ds.meta.image_keys) > 0, "Aggregated dataset should have image keys"
+    assert aggr_ds.meta.image_keys == ds_0.meta.image_keys, "Image keys should match source datasets"
+
+    # Run standard aggregation assertions
+    expected_total_episodes = ds_0_num_episodes + ds_1_num_episodes
+    expected_total_frames = ds_0_num_frames + ds_1_num_frames
+
+    assert_episode_and_frame_counts(aggr_ds, expected_total_episodes, expected_total_frames)
+    assert_dataset_content_integrity(aggr_ds, ds_0, ds_1)
+    assert_metadata_consistency(aggr_ds, ds_0, ds_1)
+    assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1)
+
+    # Image-specific assertions
+    assert_image_schema_preserved(aggr_ds)
+    assert_image_frames_integrity(aggr_ds, ds_0, ds_1)
+
+    # Verify images can be accessed and have correct shape
+    sample_item = aggr_ds[0]
+    for image_key in aggr_ds.meta.image_keys:
+        img = sample_item[image_key]
+        assert isinstance(img, torch.Tensor), f"Image {image_key} should be a tensor"
+        assert img.dim() == 3, f"Image {image_key} should have 3 dimensions (C, H, W)"
+        assert img.shape[0] == 3, f"Image {image_key} should have 3 channels"
+
+    assert_dataset_iteration_works(aggr_ds)
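The docstrings above pin the Hub-viewer regression on parquet files written without an explicit `Image()` feature schema. A small, self-contained sketch of why the schema matters when round-tripping through parquet (assumes `datasets` and `Pillow` are installed; the column name is illustrative):

```python
import datasets
from PIL import Image as PILImage

# Write a one-row dataset with an explicit Image() feature...
features = datasets.Features({"observation.image": datasets.Image()})
ds = datasets.Dataset.from_dict(
    {"observation.image": [PILImage.new("RGB", (2, 2))]},
    features=features,
)
ds.to_parquet("/tmp/with_schema.parquet")

# ...and the Image() type survives the parquet round-trip, so viewers can
# render thumbnails instead of raw {'bytes': ..., 'path': ...} structs.
reloaded = datasets.Dataset.from_parquet("/tmp/with_schema.parquet")
assert isinstance(reloaded.features["observation.image"], datasets.Image)
```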
+ """ + ds_0_num_frames = 50 + ds_1_num_frames = 75 + ds_0_num_episodes = 2 + ds_1_num_episodes = 3 + + # Create two image-based datasets (use_videos=False) + ds_0 = lerobot_dataset_factory( + root=tmp_path / "image_0", + repo_id=f"{DUMMY_REPO_ID}_image_0", + total_episodes=ds_0_num_episodes, + total_frames=ds_0_num_frames, + use_videos=False, # Image-based dataset + ) + ds_1 = lerobot_dataset_factory( + root=tmp_path / "image_1", + repo_id=f"{DUMMY_REPO_ID}_image_1", + total_episodes=ds_1_num_episodes, + total_frames=ds_1_num_frames, + use_videos=False, # Image-based dataset + ) + + # Verify source datasets have image keys + assert len(ds_0.meta.image_keys) > 0, "ds_0 should have image keys" + assert len(ds_1.meta.image_keys) > 0, "ds_1 should have image keys" + + # Aggregate the datasets + aggregate_datasets( + repo_ids=[ds_0.repo_id, ds_1.repo_id], + roots=[ds_0.root, ds_1.root], + aggr_repo_id=f"{DUMMY_REPO_ID}_image_aggr", + aggr_root=tmp_path / "image_aggr", + ) + + # Load the aggregated dataset + with ( + patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version, + patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download, + ): + mock_get_safe_version.return_value = "v3.0" + mock_snapshot_download.return_value = str(tmp_path / "image_aggr") + aggr_ds = LeRobotDataset(f"{DUMMY_REPO_ID}_image_aggr", root=tmp_path / "image_aggr") + + # Verify aggregated dataset has image keys + assert len(aggr_ds.meta.image_keys) > 0, "Aggregated dataset should have image keys" + assert aggr_ds.meta.image_keys == ds_0.meta.image_keys, "Image keys should match source datasets" + + # Run standard aggregation assertions + expected_total_episodes = ds_0_num_episodes + ds_1_num_episodes + expected_total_frames = ds_0_num_frames + ds_1_num_frames + + assert_episode_and_frame_counts(aggr_ds, expected_total_episodes, expected_total_frames) + assert_dataset_content_integrity(aggr_ds, ds_0, ds_1) + assert_metadata_consistency(aggr_ds, ds_0, ds_1) + assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1) + + # Image-specific assertions + assert_image_schema_preserved(aggr_ds) + assert_image_frames_integrity(aggr_ds, ds_0, ds_1) + + # Verify images can be accessed and have correct shape + sample_item = aggr_ds[0] + for image_key in aggr_ds.meta.image_keys: + img = sample_item[image_key] + assert isinstance(img, torch.Tensor), f"Image {image_key} should be a tensor" + assert img.dim() == 3, f"Image {image_key} should have 3 dimensions (C, H, W)" + assert img.shape[0] == 3, f"Image {image_key} should have 3 channels" + + assert_dataset_iteration_works(aggr_ds) diff --git a/tests/datasets/test_dataset_tools.py b/tests/datasets/test_dataset_tools.py index 3a4516fc8..35a369de9 100644 --- a/tests/datasets/test_dataset_tools.py +++ b/tests/datasets/test_dataset_tools.py @@ -29,7 +29,7 @@ from lerobot.datasets.dataset_tools import ( remove_feature, split_dataset, ) -from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos +from lerobot.scripts.lerobot_edit_dataset import convert_image_to_video_dataset @pytest.fixture @@ -1050,7 +1050,7 @@ def test_modify_features_preserves_file_structure(sample_dataset, tmp_path): assert "reward" in modified_dataset.meta.features -def test_convert_dataset_to_videos(tmp_path): +def test_convert_image_to_video_dataset(tmp_path): """Test converting lerobot/pusht_image dataset to video format.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset @@ -1071,7 +1071,7 @@ def 
@@ -1113,7 +1113,7 @@ def test_convert_dataset_to_videos(tmp_path):
     shutil.rmtree(output_dir)


-def test_convert_dataset_to_videos_subset_episodes(tmp_path):
+def test_convert_image_to_video_dataset_subset_episodes(tmp_path):
     """Test converting only specific episodes from lerobot/pusht_image to video format."""
     from lerobot.datasets.lerobot_dataset import LeRobotDataset

@@ -1132,7 +1132,7 @@ def test_convert_dataset_to_videos_subset_episodes(tmp_path):
     # Convert only episode 0 to video (subset of loaded episodes)
     episode_indices = [0]

-    video_dataset = convert_dataset_to_videos(
+    video_dataset = convert_image_to_video_dataset(
         dataset=source_dataset,
         output_dir=output_dir,
         repo_id="lerobot/pusht_video_subset",