From 79688a09f2e88f830fb6fdf57bbcda4b35acc36f Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Tue, 20 Jan 2026 11:04:22 +0100 Subject: [PATCH] improve(dataset-tools): image2video editing tools : Multiple episodes per video file (#2811) * improve image2video * add episodes video encoding * fix mypy failing * iterate on review * nit * remove max, and let it be optional * iterate more * update docs * fix test --------- Co-authored-by: Michel Aractingi --- docs/source/using_dataset_tools.mdx | 19 +- src/lerobot/datasets/dataset_tools.py | 562 +++++++++++++++++++- src/lerobot/scripts/lerobot_edit_dataset.py | 411 ++------------ tests/datasets/test_dataset_tools.py | 10 +- 4 files changed, 611 insertions(+), 391 deletions(-) diff --git a/docs/source/using_dataset_tools.mdx b/docs/source/using_dataset_tools.mdx index 29e16ea0a..9e662604e 100644 --- a/docs/source/using_dataset_tools.mdx +++ b/docs/source/using_dataset_tools.mdx @@ -95,26 +95,26 @@ Convert an image-based dataset to video format, creating a new LeRobotDataset wh # Local-only: Save to a custom output directory (no hub push) lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir /path/to/output/pusht_video # Save with new repo_id (local storage) lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video + --operation.type convert_image_to_video # Convert and push to Hugging Face Hub lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --push_to_hub true # Convert with custom video codec and quality settings lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.vcodec libsvtav1 \ 
--operation.pix_fmt yuv420p \ @@ -124,16 +124,23 @@ lerobot-edit-dataset \ # Convert only specific episodes lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.episode_indices "[0, 1, 2, 5, 10]" # Convert with multiple workers for parallel processing lerobot-edit-dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir outputs/pusht_video \ --operation.num_workers 8 + +# Convert with batch limits for memory-constrained systems +lerobot-edit-dataset \ + --repo_id lerobot/pusht_image \ + --operation.type convert_image_to_video \ + --operation.max_episodes_per_batch 50 \ + --operation.max_frames_per_batch 10000 ``` **Parameters:** diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 2fb68dca1..e2928e2a6 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -26,6 +26,7 @@ This module provides utilities for: import logging import shutil from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import datasets @@ -51,7 +52,8 @@ from lerobot.datasets.utils import ( write_stats, write_tasks, ) -from lerobot.utils.constants import HF_LEROBOT_HOME +from lerobot.datasets.video_utils import encode_video_frames, get_video_info +from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE def _load_episode_with_stats(src_dataset: LeRobotDataset, episode_idx: int) -> dict: @@ -1083,3 +1085,561 @@ def _copy_episodes_metadata_and_stats( else: if src_dataset.meta.stats: write_stats(src_dataset.meta.stats, dst_meta.root) + + +def _save_episode_images_for_video( + dataset: LeRobotDataset, + imgs_dir: Path, + img_key: str, + episode_index: int, + num_workers: int = 4, +) -> None: + """Save images from a
specific episode and camera to disk for video encoding. + + Args: + dataset: The LeRobot dataset to extract images from + imgs_dir: Directory to save images to + img_key: The image key (camera) to extract + episode_index: Index of the episode to save + num_workers: Number of threads for parallel image saving + """ + # Create directory + imgs_dir.mkdir(parents=True, exist_ok=True) + + # Get dataset without torch format for PIL image access + hf_dataset = dataset.hf_dataset.with_format(None) + + # Select only this camera's images + imgs_dataset = hf_dataset.select_columns(img_key) + + # Get episode start and end indices + from_idx = dataset.meta.episodes["dataset_from_index"][episode_index] + to_idx = dataset.meta.episodes["dataset_to_index"][episode_index] + + # Get all items for this episode + episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) + + # Define function to save a single image + def save_single_image(i_item_tuple): + i, item = i_item_tuple + img = item[img_key] + # Use frame-XXXXXX.png format to match encode_video_frames expectations + img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100) + return i + + # Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png) + items = list(enumerate(episode_dataset)) + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(save_single_image, item) for item in items] + for future in as_completed(futures): + future.result() # This will raise any exceptions that occurred + + +def _save_batch_episodes_images( + dataset: LeRobotDataset, + imgs_dir: Path, + img_key: str, + episode_indices: list[int], + num_workers: int = 4, +) -> list[float]: + """Save images from multiple episodes to disk for batch video encoding. 
+ + Args: + dataset: The LeRobot dataset to extract images from + imgs_dir: Directory to save images to + img_key: The image key (camera) to extract + episode_indices: List of episode indices to save + num_workers: Number of threads for parallel image saving + + Returns: + List of episode durations in seconds + """ + imgs_dir.mkdir(parents=True, exist_ok=True) + hf_dataset = dataset.hf_dataset.with_format(None) + imgs_dataset = hf_dataset.select_columns(img_key) + + # Define function to save a single image with global frame index + # Defined once outside the loop to avoid repeated closure creation + def save_single_image(i_item_tuple, base_frame_idx, img_key_param): + i, item = i_item_tuple + img = item[img_key_param] + # Use global frame index for naming + img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.png"), quality=100) + return i + + episode_durations = [] + frame_idx = 0 + + for ep_idx in episode_indices: + # Get episode range + from_idx = dataset.meta.episodes["dataset_from_index"][ep_idx] + to_idx = dataset.meta.episodes["dataset_to_index"][ep_idx] + episode_length = to_idx - from_idx + episode_durations.append(episode_length / dataset.fps) + + # Get episode images + episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) + + # Save images + items = list(enumerate(episode_dataset)) + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(save_single_image, item, frame_idx, img_key) for item in items] + for future in as_completed(futures): + future.result() + + frame_idx += episode_length + + return episode_durations + + +def _iter_episode_batches( + episode_indices: list[int], + episode_lengths: dict[int, int], + size_per_frame_mb: float, + video_file_size_limit: float, + max_episodes: int | None, + max_frames: int | None, +): + """Generator that yields batches of episode indices for video encoding. 
+ + Groups episodes into batches that respect size and memory constraints: + - Stays under video file size limit + - Respects maximum episodes per batch (if specified) + - Respects maximum frames per batch (if specified) + + Args: + episode_indices: List of episode indices to batch + episode_lengths: Dictionary mapping episode index to episode length + size_per_frame_mb: Estimated size per frame in MB + video_file_size_limit: Maximum video file size in MB + max_episodes: Maximum number of episodes per batch (None = no limit) + max_frames: Maximum number of frames per batch (None = no limit) + + Yields: + List of episode indices for each batch + """ + batch_episodes = [] + estimated_size = 0.0 + total_frames = 0 + + for ep_idx in episode_indices: + ep_length = episode_lengths[ep_idx] + ep_estimated_size = ep_length * size_per_frame_mb + + # we check if adding this episode would exceed any constraint + would_exceed_size = estimated_size > 0 and estimated_size + ep_estimated_size >= video_file_size_limit + would_exceed_episodes = max_episodes is not None and len(batch_episodes) >= max_episodes + would_exceed_frames = max_frames is not None and total_frames + ep_length > max_frames + + if batch_episodes and (would_exceed_size or would_exceed_episodes or would_exceed_frames): + # yield current batch before adding this episode + yield batch_episodes + # start a new batch with current episode + batch_episodes = [ep_idx] + estimated_size = ep_estimated_size + total_frames = ep_length + else: + # add to current batch + batch_episodes.append(ep_idx) + estimated_size += ep_estimated_size + total_frames += ep_length + + # yield final batch if not empty + if batch_episodes: + yield batch_episodes + + +def _estimate_frame_size_via_calibration( + dataset: LeRobotDataset, + img_key: str, + episode_indices: list[int], + temp_dir: Path, + fps: int, + vcodec: str, + pix_fmt: str, + g: int, + crf: int, + fast_decode: int, + num_calibration_frames: int = 30, +) -> float: + """Estimate 
MB per frame by encoding a small calibration sample. + + Encodes a representative sample of frames using the exact codec parameters + to measure actual compression ratio, which is more accurate than heuristics. + + Args: + dataset: Source dataset with images. + img_key: Image key to calibrate (e.g., "observation.images.top"). + episode_indices: List of episode indices being processed. + temp_dir: Temporary directory for calibration files. + fps: Frames per second for video encoding. + vcodec: Video codec (libsvtav1, h264, hevc). + pix_fmt: Pixel format (yuv420p, etc.). + g: GOP size (group of pictures). + crf: Constant Rate Factor (quality). + fast_decode: Fast decode tuning parameter. + num_calibration_frames: Number of frames to use for calibration (default: 30). + + Returns: + Estimated size in MB per frame based on actual encoding. + """ + calibration_dir = temp_dir / "calibration" / img_key + calibration_dir.mkdir(parents=True, exist_ok=True) + + try: + # Select a representative episode (prefer middle episode if available) + calibration_ep_idx = episode_indices[len(episode_indices) // 2] + + # Get episode range + from_idx = dataset.meta.episodes["dataset_from_index"][calibration_ep_idx] + to_idx = dataset.meta.episodes["dataset_to_index"][calibration_ep_idx] + episode_length = to_idx - from_idx + + # Use up to num_calibration_frames from this episode + num_frames = min(num_calibration_frames, episode_length) + + # Get frames from dataset + hf_dataset = dataset.hf_dataset.with_format(None) + sample_indices = range(from_idx, from_idx + num_frames) + + # Save calibration frames + for i, idx in enumerate(sample_indices): + img = hf_dataset[idx][img_key] + img.save(str(calibration_dir / f"frame-{i:06d}.png"), quality=100) + + # Encode calibration video + calibration_video_path = calibration_dir / "calibration.mp4" + encode_video_frames( + imgs_dir=calibration_dir, + video_path=calibration_video_path, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, 
+ fast_decode=fast_decode, + overwrite=True, + ) + + # Measure actual compressed size + video_size_bytes = calibration_video_path.stat().st_size + video_size_mb = video_size_bytes / BYTES_PER_MIB + size_per_frame_mb = video_size_mb / num_frames + + logging.info( + f" Calibration: {num_frames} frames -> {video_size_mb:.2f} MB " + f"= {size_per_frame_mb:.4f} MB/frame for {img_key}" + ) + + return size_per_frame_mb + + finally: + # Clean up calibration files + if calibration_dir.exists(): + shutil.rmtree(calibration_dir) + + +def _copy_data_without_images( + src_dataset: LeRobotDataset, + dst_meta: LeRobotDatasetMetadata, + episode_indices: list[int], + img_keys: list[str], +) -> None: + """Copy data files without image columns. + + Args: + src_dataset: Source dataset + dst_meta: Destination metadata + episode_indices: Episodes to include + img_keys: Image keys to remove + """ + from lerobot.datasets.utils import DATA_DIR + + data_dir = src_dataset.root / DATA_DIR + parquet_files = sorted(data_dir.glob("*/*.parquet")) + + if not parquet_files: + raise ValueError(f"No parquet files found in {data_dir}") + + episode_set = set(episode_indices) + + for src_path in tqdm(parquet_files, desc="Processing data files"): + df = pd.read_parquet(src_path).reset_index(drop=True) + + # Filter to only include selected episodes + df = df[df["episode_index"].isin(episode_set)].copy() + + if len(df) == 0: + continue + + # Remove image columns + columns_to_drop = [col for col in img_keys if col in df.columns] + if columns_to_drop: + df = df.drop(columns=columns_to_drop) + + # Get chunk and file indices from path + relative_path = src_path.relative_to(src_dataset.root) + chunk_dir = relative_path.parts[1] + file_name = relative_path.parts[2] + chunk_idx = int(chunk_dir.split("-")[1]) + file_idx = int(file_name.split("-")[1].split(".")[0]) + + # Write to destination without pandas index + dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet" + 
dst_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(dst_path, index=False) + + +# Video conversion constants +BYTES_PER_KIB = 1024 +BYTES_PER_MIB = BYTES_PER_KIB * BYTES_PER_KIB + + +def convert_image_to_video_dataset( + dataset: LeRobotDataset, + output_dir: Path, + repo_id: str | None = None, + vcodec: str = "libsvtav1", + pix_fmt: str = "yuv420p", + g: int = 2, + crf: int = 30, + fast_decode: int = 0, + episode_indices: list[int] | None = None, + num_workers: int = 4, + max_episodes_per_batch: int | None = None, + max_frames_per_batch: int | None = None, +) -> LeRobotDataset: + """Convert image-to-video dataset. + + Creates a new LeRobotDataset with images encoded as videos, following the proper + LeRobot dataset structure with videos stored in chunked MP4 files. + + Args: + dataset: The source LeRobot dataset with images + output_dir: Directory to save the new video dataset + repo_id: Repository ID for the new dataset (default: original_id + "_video") + vcodec: Video codec (default: libsvtav1) + pix_fmt: Pixel format (default: yuv420p) + g: Group of pictures size (default: 2) + crf: Constant rate factor (default: 30) + fast_decode: Fast decode tuning (default: 0) + episode_indices: List of episode indices to convert (None = all episodes) + num_workers: Number of threads for parallel processing (default: 4) + max_episodes_per_batch: Maximum episodes per video batch to avoid memory issues (None = no limit) + max_frames_per_batch: Maximum frames per video batch to avoid memory issues (None = no limit) + + Returns: + New LeRobotDataset with images encoded as videos + """ + # Check that it's an image dataset + if len(dataset.meta.video_keys) > 0: + raise ValueError( + f"This operation is for image datasets only. 
Video dataset provided: {dataset.repo_id}" + ) + + # Get all image keys + hf_dataset = dataset.hf_dataset.with_format(None) + img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] + + if len(img_keys) == 0: + raise ValueError(f"No image keys found in dataset {dataset.repo_id}") + + # Determine which episodes to process + if episode_indices is None: + episode_indices = list(range(dataset.meta.total_episodes)) + + if repo_id is None: + repo_id = f"{dataset.repo_id}_video" + + logging.info( + f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}" + ) + logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}") + + # Create new features dict, converting image features to video features + new_features = {} + for key, value in dataset.meta.features.items(): + if key not in img_keys: + new_features[key] = value + else: + # Convert image key to video format + new_features[key] = value.copy() + new_features[key]["dtype"] = "video" # Change dtype from "image" to "video" + # Video info will be updated after episodes are encoded + + # Create new metadata for video dataset + new_meta = LeRobotDatasetMetadata.create( + repo_id=repo_id, + fps=dataset.meta.fps, + features=new_features, + robot_type=dataset.meta.robot_type, + root=output_dir, + use_videos=True, + chunks_size=dataset.meta.chunks_size, + data_files_size_in_mb=dataset.meta.data_files_size_in_mb, + video_files_size_in_mb=dataset.meta.video_files_size_in_mb, + ) + + # Create temporary directory for image extraction + temp_dir = output_dir / "temp_images" + temp_dir.mkdir(parents=True, exist_ok=True) + + # Process all episodes and batch encode videos + # Use dictionary for O(1) episode metadata lookups instead of O(n) linear search + all_episode_metadata = {} + fps = int(dataset.fps) + + try: + # Build episode metadata entries first + logging.info("Building episode metadata...") + cumulative_frame_idx = 0 + for ep_idx in 
episode_indices: + src_episode = dataset.meta.episodes[ep_idx] + ep_length = src_episode["length"] + ep_meta = { + "episode_index": ep_idx, + "length": ep_length, + "dataset_from_index": cumulative_frame_idx, + "dataset_to_index": cumulative_frame_idx + ep_length, + } + if "data/chunk_index" in src_episode: + ep_meta["data/chunk_index"] = src_episode["data/chunk_index"] + ep_meta["data/file_index"] = src_episode["data/file_index"] + all_episode_metadata[ep_idx] = ep_meta + cumulative_frame_idx += ep_length + + # Process each camera and batch encode multiple episodes together + video_file_size_limit = new_meta.video_files_size_in_mb + + # Pre-compute episode lengths for batching + episode_lengths = {ep_idx: dataset.meta.episodes["length"][ep_idx] for ep_idx in episode_indices} + + for img_key in tqdm(img_keys, desc="Processing cameras"): + # Estimate size per frame by encoding a small calibration sample + # This provides accurate compression ratio for the specific codec parameters + size_per_frame_mb = _estimate_frame_size_via_calibration( + dataset=dataset, + img_key=img_key, + episode_indices=episode_indices, + temp_dir=temp_dir, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, + fast_decode=fast_decode, + ) + + logging.info(f"Processing camera: {img_key}") + chunk_idx, file_idx = 0, 0 + cumulative_timestamp = 0.0 + + # Process episodes in batches to stay under size limit + for batch_episodes in _iter_episode_batches( + episode_indices=episode_indices, + episode_lengths=episode_lengths, + size_per_frame_mb=size_per_frame_mb, + video_file_size_limit=video_file_size_limit, + max_episodes=max_episodes_per_batch, + max_frames=max_frames_per_batch, + ): + total_frames_in_batch = sum(episode_lengths[idx] for idx in batch_episodes) + logging.info( + f" Encoding batch of {len(batch_episodes)} episodes " + f"({batch_episodes[0]}-{batch_episodes[-1]}) = {total_frames_in_batch} frames" + ) + + # Save images for all episodes in this batch + imgs_dir = temp_dir 
/ f"batch_{chunk_idx}_{file_idx}" / img_key + episode_durations = _save_batch_episodes_images( + dataset=dataset, + imgs_dir=imgs_dir, + img_key=img_key, + episode_indices=batch_episodes, + num_workers=num_workers, + ) + + # Encode all batched episodes into single video + video_path = new_meta.root / new_meta.video_path.format( + video_key=img_key, chunk_index=chunk_idx, file_index=file_idx + ) + video_path.parent.mkdir(parents=True, exist_ok=True) + + encode_video_frames( + imgs_dir=imgs_dir, + video_path=video_path, + fps=fps, + vcodec=vcodec, + pix_fmt=pix_fmt, + g=g, + crf=crf, + fast_decode=fast_decode, + overwrite=True, + ) + + # Clean up temporary images + shutil.rmtree(imgs_dir) + + # Update metadata for each episode in the batch + for ep_idx, duration in zip(batch_episodes, episode_durations, strict=True): + from_timestamp = cumulative_timestamp + to_timestamp = cumulative_timestamp + duration + cumulative_timestamp = to_timestamp + + # Find episode metadata entry and add video metadata (O(1) dictionary lookup) + ep_meta = all_episode_metadata[ep_idx] + ep_meta[f"videos/{img_key}/chunk_index"] = chunk_idx + ep_meta[f"videos/{img_key}/file_index"] = file_idx + ep_meta[f"videos/{img_key}/from_timestamp"] = from_timestamp + ep_meta[f"videos/{img_key}/to_timestamp"] = to_timestamp + + # Move to next video file for next batch + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, new_meta.chunks_size) + cumulative_timestamp = 0.0 + + # Copy and transform data files (removing image columns) + _copy_data_without_images(dataset, new_meta, episode_indices, img_keys) + + # Save episode metadata + episodes_df = pd.DataFrame(list(all_episode_metadata.values())) + episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet" + episodes_path.parent.mkdir(parents=True, exist_ok=True) + episodes_df.to_parquet(episodes_path, index=False) + + # Update metadata info + new_meta.info["total_episodes"] = len(episode_indices) + 
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata.values()) + new_meta.info["total_tasks"] = dataset.meta.total_tasks + new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"} + + # Update video info for all image keys (now videos) + # We need to manually set video info since update_video_info() checks video_keys first + for img_key in img_keys: + if not new_meta.features[img_key].get("info", None): + video_path = new_meta.root / new_meta.video_path.format( + video_key=img_key, chunk_index=0, file_index=0 + ) + new_meta.info["features"][img_key]["info"] = get_video_info(video_path) + + write_info(new_meta.info, new_meta.root) + + # Copy stats and tasks + if dataset.meta.stats is not None: + # Remove image stats + new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys} + write_stats(new_stats, new_meta.root) + + if dataset.meta.tasks is not None: + write_tasks(dataset.meta.tasks, new_meta.root) + + finally: + # Clean up temporary directory + if temp_dir.exists(): + shutil.rmtree(temp_dir) + + logging.info(f"Completed converting {dataset.repo_id} to video format") + logging.info(f"New dataset saved to: {output_dir}") + + # Return new dataset + return LeRobotDataset(repo_id=repo_id, root=output_dir) diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index e835b1de6..4ba6ce44f 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -66,23 +66,23 @@ Remove camera feature: --operation.type remove_feature \ --operation.feature_names "['observation.images.top']" -Convert image dataset to video format (saves locally): +Convert image dataset to video format and save locally: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --operation.output_dir /path/to/output/pusht_video -Convert image dataset 
and save with new repo_id: +Convert image dataset to video format and save with new repo_id: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video + --operation.type convert_image_to_video -Convert and push to hub: +Convert image dataset to video format and push to hub: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ --new_repo_id lerobot/pusht_video \ - --operation.type convert_to_video \ + --operation.type convert_image_to_video \ --push_to_hub true Using JSON config file: @@ -92,24 +92,19 @@ Using JSON config file: import logging import shutil -from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path -import pandas as pd -from tqdm import tqdm - from lerobot.configs import parser from lerobot.datasets.dataset_tools import ( + convert_image_to_video_dataset, delete_episodes, merge_datasets, remove_feature, split_dataset, ) -from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata -from lerobot.datasets.utils import write_stats, write_tasks -from lerobot.datasets.video_utils import encode_video_frames, get_video_info -from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.utils import init_logging @@ -138,8 +133,8 @@ class RemoveFeatureConfig: @dataclass -class ConvertToVideoConfig: - type: str = "convert_to_video" +class ConvertImageToVideoConfig: + type: str = "convert_image_to_video" output_dir: str | None = None vcodec: str = "libsvtav1" pix_fmt: str = "yuv420p" @@ -148,12 +143,16 @@ class ConvertToVideoConfig: fast_decode: int = 0 episode_indices: list[int] | None = None num_workers: int = 4 + max_episodes_per_batch: int | None = None + max_frames_per_batch: int | None = None @dataclass class 
EditDatasetConfig: repo_id: str - operation: DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertToVideoConfig + operation: ( + DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertImageToVideoConfig + ) root: str | None = None new_repo_id: str | None = None push_to_hub: bool = False @@ -297,362 +296,7 @@ def handle_remove_feature(cfg: EditDatasetConfig) -> None: LeRobotDataset(output_repo_id, root=output_dir).push_to_hub() -def save_episode_images_for_video( - dataset: LeRobotDataset, - imgs_dir: Path, - img_key: str, - episode_index: int, - num_workers: int = 4, -) -> None: - """Save images from a specific episode and camera to disk for video encoding. - - Args: - dataset: The LeRobot dataset to extract images from - imgs_dir: Directory to save images to - img_key: The image key (camera) to extract - episode_index: Index of the episode to save - num_workers: Number of threads for parallel image saving - """ - # Create directory - imgs_dir.mkdir(parents=True, exist_ok=True) - - # Get dataset without torch format for PIL image access - hf_dataset = dataset.hf_dataset.with_format(None) - - # Select only this camera's images - imgs_dataset = hf_dataset.select_columns(img_key) - - # Get episode start and end indices - from_idx = dataset.meta.episodes["dataset_from_index"][episode_index] - to_idx = dataset.meta.episodes["dataset_to_index"][episode_index] - - # Get all items for this episode - episode_dataset = imgs_dataset.select(range(from_idx, to_idx)) - - # Define function to save a single image - def save_single_image(i_item_tuple): - i, item = i_item_tuple - img = item[img_key] - # Use frame-XXXXXX.png format to match encode_video_frames expectations - img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100) - return i - - # Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png) - items = list(enumerate(episode_dataset)) - - with ThreadPoolExecutor(max_workers=num_workers) as 
executor: - futures = [executor.submit(save_single_image, item) for item in items] - for future in as_completed(futures): - future.result() # This will raise any exceptions that occurred - - -def encode_episode_videos( - dataset: LeRobotDataset, - new_meta: LeRobotDatasetMetadata, - episode_index: int, - vcodec: str, - pix_fmt: str, - g: int, - crf: int, - fast_decode: int, - temp_dir: Path, - num_image_workers: int = 4, -) -> dict[str, dict]: - """Encode videos for a single episode and return video metadata. - - Args: - dataset: Source dataset with images - new_meta: Metadata object for the new video dataset - episode_index: Episode index to process - vcodec: Video codec - pix_fmt: Pixel format - g: Group of pictures size - crf: Constant rate factor - fast_decode: Fast decode tuning - temp_dir: Temporary directory for images - num_image_workers: Number of workers for saving images - - Returns: - Dictionary mapping video keys to their metadata (chunk_index, file_index, timestamps) - """ - hf_dataset = dataset.hf_dataset.with_format(None) - img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] - - video_metadata = {} - fps = int(dataset.fps) # Convert to int for PyAV compatibility - episode_length = dataset.meta.episodes["length"][episode_index] - episode_duration = episode_length / dataset.fps # Use original fps for duration calculation - - for img_key in img_keys: - # Save images temporarily - imgs_dir = temp_dir / f"episode_{episode_index:06d}" / img_key - save_episode_images_for_video(dataset, imgs_dir, img_key, episode_index, num_image_workers) - - # Determine chunk and file indices - # For simplicity, we'll put each episode in its own file - chunk_idx = episode_index // new_meta.chunks_size - file_idx = episode_index % new_meta.chunks_size - - # Create video path in the new dataset structure - video_path = new_meta.root / new_meta.video_path.format( - video_key=img_key, chunk_index=chunk_idx, file_index=file_idx - ) - 
video_path.parent.mkdir(parents=True, exist_ok=True) - - # Encode video - encode_video_frames( - imgs_dir=imgs_dir, - video_path=video_path, - fps=fps, - vcodec=vcodec, - pix_fmt=pix_fmt, - g=g, - crf=crf, - fast_decode=fast_decode, - overwrite=True, - ) - - # Clean up temporary images - shutil.rmtree(imgs_dir) - - # Store video metadata - video_metadata[img_key] = { - f"videos/{img_key}/chunk_index": chunk_idx, - f"videos/{img_key}/file_index": file_idx, - f"videos/{img_key}/from_timestamp": 0.0, - f"videos/{img_key}/to_timestamp": episode_duration, - } - - return video_metadata - - -def convert_dataset_to_videos( - dataset: LeRobotDataset, - output_dir: Path, - repo_id: str | None = None, - vcodec: str = "libsvtav1", - pix_fmt: str = "yuv420p", - g: int = 2, - crf: int = 30, - fast_decode: int = 0, - episode_indices: list[int] | None = None, - num_workers: int = 4, -) -> LeRobotDataset: - """Convert image-based dataset to video-based dataset. - - Creates a new LeRobotDataset with videos instead of images, following the proper - LeRobot dataset structure with videos stored in chunked MP4 files. - - Args: - dataset: The source LeRobot dataset with images - output_dir: Directory to save the new video dataset - repo_id: Repository ID for the new dataset (default: original_id + "_video") - vcodec: Video codec (default: libsvtav1) - pix_fmt: Pixel format (default: yuv420p) - g: Group of pictures size (default: 2) - crf: Constant rate factor (default: 30) - fast_decode: Fast decode tuning (default: 0) - episode_indices: List of episode indices to convert (None = all episodes) - num_workers: Number of threads for parallel processing (default: 4) - - Returns: - New LeRobotDataset with videos - """ - # Check that it's an image dataset - if len(dataset.meta.video_keys) > 0: - raise ValueError( - f"This operation is for image datasets only. 
Video dataset provided: {dataset.repo_id}" - ) - - # Get all image keys - hf_dataset = dataset.hf_dataset.with_format(None) - img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)] - - if len(img_keys) == 0: - raise ValueError(f"No image keys found in dataset {dataset.repo_id}") - - # Determine which episodes to process - if episode_indices is None: - episode_indices = list(range(dataset.meta.total_episodes)) - - if repo_id is None: - repo_id = f"{dataset.repo_id}_video" - - logging.info( - f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}" - ) - logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}") - - # Create new features dict, converting image features to video features - new_features = {} - for key, value in dataset.meta.features.items(): - if key not in img_keys: - new_features[key] = value - else: - # Convert image key to video format - new_features[key] = value.copy() - new_features[key]["dtype"] = "video" # Change dtype from "image" to "video" - # Video info will be updated after episodes are encoded - - # Create new metadata for video dataset - new_meta = LeRobotDatasetMetadata.create( - repo_id=repo_id, - fps=dataset.meta.fps, - features=new_features, - robot_type=dataset.meta.robot_type, - root=output_dir, - use_videos=True, - chunks_size=dataset.meta.chunks_size, - data_files_size_in_mb=dataset.meta.data_files_size_in_mb, - video_files_size_in_mb=dataset.meta.video_files_size_in_mb, - ) - - # Create temporary directory for image extraction - temp_dir = output_dir / "temp_images" - temp_dir.mkdir(parents=True, exist_ok=True) - - # Process each episode - all_episode_metadata = [] - - try: - for ep_idx in tqdm(episode_indices, desc="Converting episodes to videos"): - # Get episode metadata from source - src_episode = dataset.meta.episodes[ep_idx] - - # Encode videos for this episode - video_metadata = encode_episode_videos( - dataset=dataset, - 
new_meta=new_meta, - episode_index=ep_idx, - vcodec=vcodec, - pix_fmt=pix_fmt, - g=g, - crf=crf, - fast_decode=fast_decode, - temp_dir=temp_dir, - num_image_workers=num_workers, - ) - - # Build episode metadata - episode_meta = { - "episode_index": ep_idx, - "length": src_episode["length"], - "dataset_from_index": ep_idx * src_episode["length"], - "dataset_to_index": (ep_idx + 1) * src_episode["length"], - } - - # Add video metadata - for img_key in img_keys: - episode_meta.update(video_metadata[img_key]) - - # Add data chunk/file info (using same structure as source) - if "data/chunk_index" in src_episode: - episode_meta["data/chunk_index"] = src_episode["data/chunk_index"] - episode_meta["data/file_index"] = src_episode["data/file_index"] - - all_episode_metadata.append(episode_meta) - - # Copy and transform data files (removing image columns) - _copy_data_without_images(dataset, new_meta, episode_indices, img_keys) - - # Save episode metadata - episodes_df = pd.DataFrame(all_episode_metadata) - episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet" - episodes_path.parent.mkdir(parents=True, exist_ok=True) - episodes_df.to_parquet(episodes_path, index=False) - - # Update metadata info - new_meta.info["total_episodes"] = len(episode_indices) - new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata) - new_meta.info["total_tasks"] = dataset.meta.total_tasks - new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"} - - # Update video info for all image keys (now videos) - # We need to manually set video info since update_video_info() checks video_keys first - for img_key in img_keys: - if not new_meta.features[img_key].get("info", None): - video_path = new_meta.root / new_meta.video_path.format( - video_key=img_key, chunk_index=0, file_index=0 - ) - new_meta.info["features"][img_key]["info"] = get_video_info(video_path) - - from lerobot.datasets.utils import write_info - - write_info(new_meta.info, 
new_meta.root) - - # Copy stats and tasks - if dataset.meta.stats is not None: - # Remove image stats - new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys} - write_stats(new_stats, new_meta.root) - - if dataset.meta.tasks is not None: - write_tasks(dataset.meta.tasks, new_meta.root) - - finally: - # Clean up temporary directory - if temp_dir.exists(): - shutil.rmtree(temp_dir) - - logging.info(f"✓ Completed converting {dataset.repo_id} to video format") - logging.info(f"New dataset saved to: {output_dir}") - - # Return new dataset - return LeRobotDataset(repo_id=repo_id, root=output_dir) - - -def _copy_data_without_images( - src_dataset: LeRobotDataset, - dst_meta: LeRobotDatasetMetadata, - episode_indices: list[int], - img_keys: list[str], -) -> None: - """Copy data files without image columns. - - Args: - src_dataset: Source dataset - dst_meta: Destination metadata - episode_indices: Episodes to include - img_keys: Image keys to remove - """ - from lerobot.datasets.utils import DATA_DIR - - data_dir = src_dataset.root / DATA_DIR - parquet_files = sorted(data_dir.glob("*/*.parquet")) - - if not parquet_files: - raise ValueError(f"No parquet files found in {data_dir}") - - episode_set = set(episode_indices) - - for src_path in tqdm(parquet_files, desc="Processing data files"): - df = pd.read_parquet(src_path).reset_index(drop=True) - - # Filter to only include selected episodes - df = df[df["episode_index"].isin(episode_set)].copy() - - if len(df) == 0: - continue - - # Remove image columns - columns_to_drop = [col for col in img_keys if col in df.columns] - if columns_to_drop: - df = df.drop(columns=columns_to_drop) - - # Get chunk and file indices from path - relative_path = src_path.relative_to(src_dataset.root) - chunk_dir = relative_path.parts[1] - file_name = relative_path.parts[2] - chunk_idx = int(chunk_dir.split("-")[1]) - file_idx = int(file_name.split("-")[1].split(".")[0]) - - # Write to destination without pandas index - 
dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet" - dst_path.parent.mkdir(parents=True, exist_ok=True) - df.to_parquet(dst_path, index=False) - - -def handle_convert_to_video(cfg: EditDatasetConfig) -> None: +def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None: # Note: Parser may create any config type with the right fields, so we access fields directly # instead of checking isinstance() dataset = LeRobotDataset(cfg.repo_id, root=cfg.root) @@ -664,8 +308,12 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: if cfg.new_repo_id: # Use new_repo_id for both local storage and hub push output_repo_id = cfg.new_repo_id - output_dir = Path(cfg.root) / cfg.new_repo_id if cfg.root else HF_LEROBOT_HOME / cfg.new_repo_id - logging.info(f"Saving to new dataset: {cfg.new_repo_id}") + # Place new dataset as a sibling to the original dataset + # Get the parent of the actual dataset root (not cfg.root which might be the lerobot cache dir) + # Extract just the dataset name (after last slash) for the local directory + local_dir_name = cfg.new_repo_id.split("/")[-1] + output_dir = dataset.root.parent / local_dir_name + logging.info(f"Saving to new dataset: {cfg.new_repo_id} at {output_dir}") elif output_dir_config: # Use custom output directory for local-only storage output_dir = Path(output_dir_config) @@ -675,12 +323,15 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: else: # Auto-generate name: append "_video" to original repo_id output_repo_id = f"{cfg.repo_id}_video" - output_dir = Path(cfg.root) / output_repo_id if cfg.root else HF_LEROBOT_HOME / output_repo_id + # Place new dataset as a sibling to the original dataset + # Extract just the dataset name (after last slash) for the local directory + local_dir_name = output_repo_id.split("/")[-1] + output_dir = dataset.root.parent / local_dir_name logging.info(f"Saving to auto-generated location: {output_dir}") logging.info(f"Converting dataset 
{cfg.repo_id} to video format") - new_dataset = convert_dataset_to_videos( + new_dataset = convert_image_to_video_dataset( dataset=dataset, output_dir=output_dir, repo_id=output_repo_id, @@ -691,6 +342,8 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None: fast_decode=getattr(cfg.operation, "fast_decode", 0), episode_indices=getattr(cfg.operation, "episode_indices", None), num_workers=getattr(cfg.operation, "num_workers", 4), + max_episodes_per_batch=getattr(cfg.operation, "max_episodes_per_batch", None), + max_frames_per_batch=getattr(cfg.operation, "max_frames_per_batch", None), ) logging.info("Video dataset created successfully!") @@ -718,8 +371,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None: handle_merge(cfg) elif operation_type == "remove_feature": handle_remove_feature(cfg) - elif operation_type == "convert_to_video": - handle_convert_to_video(cfg) + elif operation_type == "convert_image_to_video": + handle_convert_image_to_video(cfg) else: raise ValueError( f"Unknown operation type: {operation_type}\n" diff --git a/tests/datasets/test_dataset_tools.py b/tests/datasets/test_dataset_tools.py index 3a4516fc8..35a369de9 100644 --- a/tests/datasets/test_dataset_tools.py +++ b/tests/datasets/test_dataset_tools.py @@ -29,7 +29,7 @@ from lerobot.datasets.dataset_tools import ( remove_feature, split_dataset, ) -from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos +from lerobot.scripts.lerobot_edit_dataset import convert_image_to_video_dataset @pytest.fixture @@ -1050,7 +1050,7 @@ def test_modify_features_preserves_file_structure(sample_dataset, tmp_path): assert "reward" in modified_dataset.meta.features -def test_convert_dataset_to_videos(tmp_path): +def test_convert_image_to_video_dataset(tmp_path): """Test converting lerobot/pusht_image dataset to video format.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset @@ -1071,7 +1071,7 @@ def test_convert_dataset_to_videos(tmp_path): assert "observation.image" in 
source_dataset.meta.features # Convert to video dataset (only first 2 episodes for speed) - video_dataset = convert_dataset_to_videos( + video_dataset = convert_image_to_video_dataset( dataset=source_dataset, output_dir=output_dir, repo_id="lerobot/pusht_video", @@ -1113,7 +1113,7 @@ def test_convert_dataset_to_videos(tmp_path): shutil.rmtree(output_dir) -def test_convert_dataset_to_videos_subset_episodes(tmp_path): +def test_convert_image_to_video_dataset_subset_episodes(tmp_path): """Test converting only specific episodes from lerobot/pusht_image to video format.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset @@ -1132,7 +1132,7 @@ def test_convert_dataset_to_videos_subset_episodes(tmp_path): # Convert only episode 0 to video (subset of loaded episodes) episode_indices = [0] - video_dataset = convert_dataset_to_videos( + video_dataset = convert_image_to_video_dataset( dataset=source_dataset, output_dir=output_dir, repo_id="lerobot/pusht_video_subset",