From e8503f5fe647743d300def4b068f9857f07f3b79 Mon Sep 17 00:00:00 2001
From: CarolinePascal
Date: Fri, 15 May 2026 22:54:23 +0200
Subject: [PATCH] feat(edit): adding a new lerobot-edit-dataset tool to re-encode all the videos of a dataset

---
 src/lerobot/datasets/__init__.py            |  2 +
 src/lerobot/datasets/dataset_tools.py       | 78 +++++++++++++++-
 src/lerobot/scripts/lerobot_edit_dataset.py | 98 +++++++++++++++++++++
 3 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/src/lerobot/datasets/__init__.py b/src/lerobot/datasets/__init__.py
index b51ef0222..70f7c7b63 100644
--- a/src/lerobot/datasets/__init__.py
+++ b/src/lerobot/datasets/__init__.py
@@ -31,6 +31,7 @@ from .dataset_tools import (
     modify_features,
     modify_tasks,
     recompute_stats,
+    reencode_dataset,
     remove_feature,
     split_dataset,
 )
@@ -77,6 +78,7 @@ __all__ = [
     "modify_features",
     "modify_tasks",
     "recompute_stats",
+    "reencode_dataset",
     "remove_feature",
     "resolve_delta_timestamps",
     "safe_stop_image_writer",
diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py
index 489914fbc..6af190a89 100644
--- a/src/lerobot/datasets/dataset_tools.py
+++ b/src/lerobot/datasets/dataset_tools.py
@@ -26,7 +26,7 @@ This module provides utilities for:
 import logging
 import shutil
 from collections.abc import Callable
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
 from pathlib import Path
 
 import datasets
@@ -61,11 +61,13 @@ from .utils import (
     DEFAULT_DATA_FILE_SIZE_IN_MB,
     DEFAULT_DATA_PATH,
     DEFAULT_EPISODES_PATH,
+    VIDEO_DIR,
     update_chunk_file_indices,
 )
 from .video_utils import (
     encode_video_frames,
     get_video_info,
+    reencode_video,
 )
 
 
@@ -1884,3 +1886,77 @@ def convert_image_to_video_dataset(
 
     # Return new dataset
     return LeRobotDataset(repo_id=repo_id, root=output_dir)
+
+
+def _reencode_video_worker(args: tuple) -> Path:
+    """Picklable worker for :func:`reencode_dataset`'s process pool."""
+    video_path, camera_encoder, encoder_threads = args
+    # Input and output paths are identical: the video file is overwritten in place.
+    reencode_video(
+        input_video_path=video_path,
+        output_video_path=video_path,
+        camera_encoder=camera_encoder,
+        encoder_threads=encoder_threads,
+        overwrite=True,
+    )
+    return video_path
+
+
+def reencode_dataset(
+    dataset: LeRobotDataset,
+    camera_encoder: VideoEncoderConfig,
+    encoder_threads: int | None = None,
+    num_workers: int | None = None,
+) -> LeRobotDataset:
+    """Re-encode every video in a dataset with a new set of encoding parameters.
+
+    Videos are re-encoded in-place and the video information in ``info.json`` is refreshed.
+
+    Args:
+        dataset: An existing :class:`LeRobotDataset` whose videos will be
+            re-encoded.
+        camera_encoder: Target encoder configuration applied to every video
+            file.
+        encoder_threads: Per-encoder thread count forwarded to
+            :func:`reencode_video`. ``None`` lets the codec decide.
+        num_workers: Number of parallel processes. ``None`` or ``0`` means
+            sequential (no multiprocessing); ``1+`` spawns a
+            :class:`~concurrent.futures.ProcessPoolExecutor`.
+
+    Returns:
+        The same :class:`LeRobotDataset` instance with its metadata updated
+        on disk.
+    """
+    meta = dataset.meta
+    video_paths_list = sorted((meta.root / VIDEO_DIR).rglob("*.mp4"))
+    if not video_paths_list:
+        logging.warning("Dataset has no videos to re-encode.")
+        return dataset
+    logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}")
+
+    # One picklable argument tuple per video file, consumed by ``_reencode_video_worker``.
+    worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list]
+    if num_workers and num_workers >= 1:
+        with ProcessPoolExecutor(max_workers=num_workers) as pool:
+            futures = [pool.submit(_reencode_video_worker, args) for args in worker_args]
+            for future in tqdm(
+                as_completed(futures),
+                total=len(futures),
+                desc="Re-encoding videos",
+            ):
+                future.result()  # re-raises any exception raised in the worker process
+    else:
+        for args in tqdm(worker_args, desc="Re-encoding videos"):
+            _reencode_video_worker(args)
+
+    # Refresh the info of every video key from the file returned by get_video_file_path(0, key).
+    for vid_key in meta.video_keys:
+        video_path = meta.root / meta.get_video_file_path(0, vid_key)
+        meta.info.features[vid_key]["info"] = get_video_info(
+            video_path, camera_encoder=camera_encoder
+        )
+
+    write_info(meta.info, meta.root)
+    logging.info("Dataset metadata updated.")
+
+    return dataset
diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py
index eb6a57870..3cfe8bd4f 100644
--- a/src/lerobot/scripts/lerobot_edit_dataset.py
+++ b/src/lerobot/scripts/lerobot_edit_dataset.py
@@ -178,6 +178,23 @@ Recompute stats for relative actions and push to hub:
         --operation.num_workers 4 \
         --push_to_hub true
 
+Re-encode all videos in a dataset in-place with a new codec:
+    lerobot-edit-dataset \
+        --repo_id lerobot/pusht \
+        --operation.type reencode_videos \
+        --operation.camera_encoder.vcodec h264 \
+        --operation.camera_encoder.pix_fmt yuv420p \
+        --operation.camera_encoder.crf 23
+
+Re-encode videos into a new dataset using 4 parallel processes:
+    lerobot-edit-dataset \
+        --repo_id lerobot/pusht \
+        --new_repo_id lerobot/pusht_h264 \
+        --operation.type reencode_videos \
+        --operation.camera_encoder.vcodec h264 \
+        --operation.camera_encoder.crf 23 \
+        --operation.num_workers 4
+
 Using JSON config file:
     lerobot-edit-dataset \
         --config_path path/to/edit_config.json
@@ -195,11 +212,13 @@ import draccus
 from lerobot.configs import VideoEncoderConfig, camera_encoder_defaults, parser
 from lerobot.datasets import (
     LeRobotDataset,
+    LeRobotDatasetMetadata,
     convert_image_to_video_dataset,
     delete_episodes,
     merge_datasets,
     modify_tasks,
     recompute_stats,
+    reencode_dataset,
     remove_feature,
     split_dataset,
 )
@@ -268,6 +287,17 @@ class RecomputeStatsConfig(OperationConfig):
     overwrite: bool = False
 
 
+@OperationConfig.register_subclass("reencode_videos")
+@dataclass
+class ReencodeVideosConfig(OperationConfig):
+    # Target encoder settings applied to every video file.
+    camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults)
+    # Number of parallel processes; 0 re-encodes the videos sequentially.
+    num_workers: int = 0
+    # Thread count forwarded to each encoder; None lets the codec decide.
+    encoder_threads: int | None = None
+
+
 @OperationConfig.register_subclass("info")
 @dataclass
 class InfoConfig(OperationConfig):
@@ -634,6 +664,72 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None:
         dataset.push_to_hub()
 
 
+def handle_reencode_videos(cfg: EditDatasetConfig) -> None:
+    """Re-encode every video of a dataset with the configured camera encoder.
+
+    When the resolved output path equals the input path, the dataset is first
+    backed up to a sibling ``<name>_old`` directory and re-encoded in place.
+    Otherwise it is copied to the output path and the copy is re-encoded.
+
+    Raises:
+        ValueError: If the operation config is not a ``ReencodeVideosConfig``
+            or if the dataset has no video features.
+    """
+    if not isinstance(cfg.operation, ReencodeVideosConfig):
+        raise ValueError("Operation config must be ReencodeVideosConfig")
+
+    meta = LeRobotDatasetMetadata(cfg.repo_id, root=cfg.root)
+    if not meta.video_keys:
+        raise ValueError("Dataset has no video features — nothing to re-encode.")
+
+    # Skip only when *every* video key already matches the target encoder: cameras of
+    # the same dataset can be encoded differently, so checking a single key is not enough.
+    target_encoder = cfg.operation.camera_encoder
+    if all(
+        VideoEncoderConfig.from_video_info(meta.features[key].get("info", {})) == target_encoder
+        for key in meta.video_keys
+    ):
+        logging.info(
+            f"Videos in {cfg.repo_id} are already encoded with {target_encoder}. "
+            "Nothing to do."
+        )
+        return
+
+    output_repo_id, input_path, output_path = _resolve_io_paths(
+        cfg.repo_id, cfg.new_repo_id, cfg.root, cfg.new_root
+    )
+
+    if output_path == input_path:
+        backup_path = input_path.with_name(input_path.name + "_old")
+        logging.info(f"In-place re-encode — backing up dataset to {backup_path}")
+        # Any previous backup is removed first so that copytree can recreate the directory.
+        if backup_path.exists():
+            shutil.rmtree(backup_path)
+        shutil.copytree(input_path, backup_path)
+    else:
+        logging.info(f"Copying dataset from {input_path} to {output_path}")
+        if output_path.exists():
+            shutil.rmtree(output_path)
+        shutil.copytree(input_path, output_path)
+
+    logging.info(f"Re-encoding videos in {output_repo_id} with {cfg.operation.camera_encoder}")
+
+    dataset = LeRobotDataset(output_repo_id, root=output_path)
+
+    reencode_dataset(
+        dataset,
+        camera_encoder=cfg.operation.camera_encoder,
+        encoder_threads=cfg.operation.encoder_threads,
+        num_workers=cfg.operation.num_workers,
+    )
+
+    logging.info(f"All videos re-encoded at {dataset.root}")
+
+    if cfg.push_to_hub:
+        logging.info(f"Pushing to hub as {output_repo_id}...")
+        dataset.push_to_hub()
+
+
 def _get_dataset_size(repo_path):
     import os
 
@@ -707,6 +803,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
         handle_convert_image_to_video(cfg)
     elif operation_type == "recompute_stats":
         handle_recompute_stats(cfg)
+    elif operation_type == "reencode_videos":
+        handle_reencode_videos(cfg)
     elif operation_type == "info":
         handle_info(cfg)
     else: