From d38eb89f7117d0ea945e1a0d686f03501b49508e Mon Sep 17 00:00:00 2001 From: Caroline Pascal Date: Tue, 19 May 2026 14:46:14 +0200 Subject: [PATCH] feat(video re-encoding): Adding utility and dataset edition tool for video re-encoding (#3611) * feat(utility): adding video re-encode utility * feat(edit): adding a new lerobot-edit-dataset tool to re-encode all the videos of a dataset * chore(format): formatting code * chore(review): fix Claude reviews * test(reencode dataset): adding missing test for reencode dataset --- src/lerobot/datasets/__init__.py | 2 + src/lerobot/datasets/dataset_tools.py | 84 ++++++++++++++++++- src/lerobot/datasets/video_utils.py | 86 ++++++++++++++++++++ src/lerobot/scripts/lerobot_edit_dataset.py | 89 +++++++++++++++++++++ tests/datasets/test_dataset_tools.py | 42 ++++++++++ tests/datasets/test_video_encoding.py | 49 +++++++++--- 6 files changed, 342 insertions(+), 10 deletions(-) diff --git a/src/lerobot/datasets/__init__.py b/src/lerobot/datasets/__init__.py index e4e3ccdf6..2a67858d2 100644 --- a/src/lerobot/datasets/__init__.py +++ b/src/lerobot/datasets/__init__.py @@ -31,6 +31,7 @@ from .dataset_tools import ( modify_features, modify_tasks, recompute_stats, + reencode_dataset, remove_feature, split_dataset, ) @@ -91,6 +92,7 @@ __all__ = [ "modify_features", "modify_tasks", "recompute_stats", + "reencode_dataset", "remove_feature", "resolve_delta_timestamps", "safe_stop_image_writer", diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 489914fbc..adbb841c4 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -26,7 +26,7 @@ This module provides utilities for: import logging import shutil from collections.abc import Callable -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from pathlib import Path import datasets @@ -61,11 +61,13 @@ from .utils import ( DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_DATA_PATH, DEFAULT_EPISODES_PATH, + VIDEO_DIR, update_chunk_file_indices, ) from .video_utils import ( encode_video_frames, get_video_info, + reencode_video, ) @@ -1884,3 +1886,83 @@ def convert_image_to_video_dataset( # Return new dataset return LeRobotDataset(repo_id=repo_id, root=output_dir) + + +def _reencode_video_worker(args: tuple) -> Path: + """Picklable worker for :func:`reencode_dataset`'s process pool.""" + video_path, camera_encoder, encoder_threads = args + reencode_video( + input_video_path=video_path, + output_video_path=video_path, + camera_encoder=camera_encoder, + encoder_threads=encoder_threads, + overwrite=True, + ) + return video_path + + +def reencode_dataset( + dataset: LeRobotDataset, + camera_encoder: VideoEncoderConfig, + encoder_threads: int | None = None, + num_workers: int | None = None, +) -> LeRobotDataset: + """Re-encode every video in a dataset with a new set of encoding parameters. + + Videos are re-encoded in-place and the video information in ``info.json`` is refreshed. + + Args: + dataset: An existing :class:`LeRobotDataset` whose videos will be + re-encoded. + camera_encoder: Target encoder configuration applied to every video + file. + encoder_threads: Per-encoder thread count forwarded to + :func:`reencode_video`. ``None`` lets the codec decide. + num_workers: Number of parallel processes. ``None`` or ``0`` means + sequential (no multiprocessing); ``1+`` spawns a + :class:`~concurrent.futures.ProcessPoolExecutor`. + + Returns: + The same :class:`LeRobotDataset` instance with its metadata updated + on disk. + """ + meta = dataset.meta + video_paths_list = [] + + # Only re-encode if the videos are not already encoded with the given video encoding parameters + for video_key in meta.video_keys: + current_info = meta.info.features[video_key].get("info", {}) + current_encoder = VideoEncoderConfig.from_video_info(current_info) + if current_encoder != camera_encoder: + video_paths_list.extend((meta.root / VIDEO_DIR / video_key).rglob("*.mp4")) + else: + logging.info(f"{video_key} videos are already encoded with {camera_encoder}. Nothing to do.") + + if len(video_paths_list) == 0: + logging.warning("Dataset has no videos to re-encode.") + return dataset + logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}") + + worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list] + if num_workers and num_workers > 1: + with ProcessPoolExecutor(max_workers=num_workers) as pool: + futures = [pool.submit(_reencode_video_worker, args) for args in worker_args] + for future in tqdm( + as_completed(futures), + total=len(futures), + desc="Re-encoding videos", + ): + future.result() + else: + for args in tqdm(worker_args, desc="Re-encoding videos"): + _reencode_video_worker(args) + + # Refresh video info in metadata for every video key. + for vid_key in meta.video_keys: + video_path = meta.root / meta.get_video_file_path(0, vid_key) + meta.info.features[vid_key]["info"] = get_video_info(video_path, camera_encoder=camera_encoder) + + write_info(meta.info, meta.root) + logging.info("Dataset metadata updated.") + + return dataset diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index e823a406c..99122381a 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -403,6 +403,92 @@ def encode_video_frames( raise OSError(f"Video encoding did not work. File not found: {video_path}.") +def reencode_video( + input_video_path: Path | str, + output_video_path: Path | str, + camera_encoder: VideoEncoderConfig | None = None, + encoder_threads: int | None = None, + log_level: int | None = av.logging.WARNING, + overwrite: bool = False, +) -> None: + """Re-encode a video file using the given encoder configuration. + + Args: + input_video_path: Existing video file to read. + output_video_path: Path for the re-encoded file. + camera_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`. + encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`. + log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING. + overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning. + """ + + camera_encoder = camera_encoder or camera_encoder_defaults() + + output_video_path = Path(output_video_path) + + if output_video_path.exists() and not overwrite: + logger.warning(f"Video file already exists: {output_video_path}. Skipping re-encode.") + return + + output_video_path.parent.mkdir(parents=True, exist_ok=True) + + video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True) + vcodec = camera_encoder.vcodec + pix_fmt = camera_encoder.pix_fmt + + with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file: + tmp_output_video_path = tmp_named_file.name + + if log_level is not None: + logging.getLogger("libav").setLevel(log_level) + + try: + with av.open(input_video_path, mode="r") as src: + try: + in_stream = src.streams.video[0] + except IndexError as e: + raise ValueError(f"No video stream in {input_video_path}") from e + + fps = ( + in_stream.base_rate + ) # We allow fractional fps though LeRobotDataset only supports integer fps + width = int(in_stream.width) + height = int(in_stream.height) + + with av.open( + tmp_output_video_path, + mode="w", + options={ + "movflags": "faststart" + }, # faststart is to move the metadata to the beginning of the file to speed up loading + ) as dst: + out_stream = dst.add_stream(vcodec, fps, options=video_options) + out_stream.pix_fmt = pix_fmt + out_stream.width = width + out_stream.height = height + + for frame in src.decode(in_stream): + frame = frame.reformat(width=width, height=height, format=pix_fmt) + packet = out_stream.encode(frame) + if packet: + dst.mux(packet) + + packet = out_stream.encode() + if packet: + dst.mux(packet) + + shutil.move(tmp_output_video_path, output_video_path) + except Exception: + Path(tmp_output_video_path).unlink(missing_ok=True) + raise + finally: + if log_level is not None: + av.logging.restore_default_callback() + + if not output_video_path.exists(): + raise OSError(f"Video re-encoding did not work. File not found: {output_video_path}.") + + def concatenate_video_files( input_video_paths: list[Path | str], output_video_path: Path, diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index eb6a57870..3c1edbb31 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -178,6 +178,31 @@ Recompute stats for relative actions and push to hub: --operation.num_workers 4 \ --push_to_hub true +Re-encode all videos in a dataset (saves to lerobot/pusht_reencoded by default): + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --operation.type reencode_videos \ + --operation.camera_encoder.vcodec h264 \ + --operation.camera_encoder.pix_fmt yuv420p \ + --operation.camera_encoder.crf 23 + +Re-encode videos into a new dataset using 4 parallel processes: + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --new_repo_id lerobot/pusht_h264 \ + --operation.type reencode_videos \ + --operation.camera_encoder.vcodec h264 \ + --operation.camera_encoder.crf 23 \ + --operation.num_workers 4 + +Re-encode videos in-place (overwrites original dataset): + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --new_repo_id lerobot/pusht \ + --operation.type reencode_videos \ + --operation.camera_encoder.vcodec h264 \ + --operation.overwrite true + Using JSON config file: lerobot-edit-dataset \ --config_path path/to/edit_config.json @@ -200,6 +225,7 @@ from lerobot.datasets import ( merge_datasets, modify_tasks, recompute_stats, + reencode_dataset, remove_feature, split_dataset, ) @@ -268,6 +294,15 @@ class RecomputeStatsConfig(OperationConfig): overwrite: bool = False +@OperationConfig.register_subclass("reencode_videos") +@dataclass +class ReencodeVideosConfig(OperationConfig): + camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults) + num_workers: int = 0 + encoder_threads: int | None = None + overwrite: bool = False + + @OperationConfig.register_subclass("info") @dataclass class InfoConfig(OperationConfig): @@ -634,6 +669,58 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None: dataset.push_to_hub() +def handle_reencode_videos(cfg: EditDatasetConfig) -> None: + if not isinstance(cfg.operation, ReencodeVideosConfig): + raise ValueError("Operation config must be ReencodeVideosConfig") + + output_repo_id, input_root, output_root = _resolve_io_paths( + cfg.repo_id, + cfg.new_repo_id, + cfg.root, + cfg.new_root, + default_new_repo_id=f"{cfg.repo_id}_reencoded", + ) + in_place = output_root == input_root + + if in_place and not cfg.operation.overwrite: + raise ValueError( + f"reencode_videos would overwrite the dataset in-place at {input_root}. " + "Pass --operation.overwrite true to allow in-place modification, " + "or use --new_repo_id / --new_root to write to a different location. " + f"Default output repo_id when neither is set: '{cfg.repo_id}_reencoded'." + ) + + if in_place: + logging.warning( + f"Overwriting dataset videos in-place at {input_root}. The original videos will be lost." + ) + dataset = LeRobotDataset(cfg.repo_id, root=input_root) + else: + logging.info(f"Copying dataset from {input_root} to {output_root}") + if output_root.exists(): + backup_path = output_root.with_name(output_root.name + "_old") + logging.warning(f"Output directory {output_root} already exists. Moving to {backup_path}") + if backup_path.exists(): + shutil.rmtree(backup_path) + shutil.move(output_root, backup_path) + shutil.copytree(input_root, output_root) + dataset = LeRobotDataset(output_repo_id, root=output_root) + + logging.info(f"Re-encoding videos in {output_repo_id} with {cfg.operation.camera_encoder}") + reencode_dataset( + dataset, + camera_encoder=cfg.operation.camera_encoder, + encoder_threads=cfg.operation.encoder_threads, + num_workers=cfg.operation.num_workers, + ) + + logging.info(f"All videos re-encoded at {dataset.root}") + + if cfg.push_to_hub: + logging.info(f"Pushing to hub as {output_repo_id}...") + dataset.push_to_hub() + + def _get_dataset_size(repo_path): import os @@ -707,6 +794,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None: handle_convert_image_to_video(cfg) elif operation_type == "recompute_stats": handle_recompute_stats(cfg) + elif operation_type == "reencode_videos": + handle_reencode_videos(cfg) elif operation_type == "info": handle_info(cfg) else: diff --git a/tests/datasets/test_dataset_tools.py b/tests/datasets/test_dataset_tools.py index 032fd4f7c..d36312920 100644 --- a/tests/datasets/test_dataset_tools.py +++ b/tests/datasets/test_dataset_tools.py @@ -23,6 +23,7 @@ import torch pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])") + from lerobot.configs import VideoEncoderConfig from lerobot.datasets.dataset_tools import ( add_features, @@ -31,9 +32,12 @@ from lerobot.datasets.dataset_tools import ( merge_datasets, modify_features, modify_tasks, + reencode_dataset, remove_feature, split_dataset, ) +from lerobot.datasets.io_utils import load_info +from tests.datasets.test_video_encoding import _add_frames, require_h264, require_libsvtav1 @pytest.fixture @@ -1326,3 +1330,41 @@ def test_convert_image_to_video_dataset_subset_episodes(tmp_path): if output_dir.exists(): shutil.rmtree(output_dir) + + +# ─── reencode_dataset ───────────────────────────────────────────────── + + +@require_libsvtav1 +@require_h264 +def test_reencode_dataset_multi_key_multiprocessing( + tmp_path, empty_lerobot_dataset_factory, features_factory +): + """Re-encode a two-camera dataset with num_workers=2 and verify metadata refresh.""" + features = features_factory(use_videos=True) + initial_cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12) + dataset = empty_lerobot_dataset_factory( + root=tmp_path / "ds", + features=features, + use_videos=True, + camera_encoder=initial_cfg, + ) + + _add_frames(dataset, num_frames=4) + dataset.save_episode() + _add_frames(dataset, num_frames=4) + dataset.save_episode() + dataset.finalize() + + assert len(dataset.meta.video_keys) == 2 + + target_cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv420p") + + result = reencode_dataset(dataset, camera_encoder=target_cfg, num_workers=2) + + assert result is dataset + + persisted_info = load_info(dataset.root) + for vk in dataset.meta.video_keys: + persisted_encoder = VideoEncoderConfig.from_video_info(persisted_info.features[vk].get("info", {})) + assert persisted_encoder == target_cfg diff --git a/tests/datasets/test_video_encoding.py b/tests/datasets/test_video_encoding.py index 224f2405b..1af61e9f9 100644 --- a/tests/datasets/test_video_encoding.py +++ b/tests/datasets/test_video_encoding.py @@ -35,6 +35,7 @@ from lerobot.datasets.video_utils import ( concatenate_video_files, encode_video_frames, get_video_info, + reencode_video, ) from tests.fixtures.constants import DUMMY_VIDEO_INFO @@ -347,16 +348,22 @@ def _read_feature_info(dataset: LeRobotDataset) -> dict: return info["features"][VIDEO_KEY]["info"] -def _add_frames(dataset: LeRobotDataset, num_frames: int) -> None: - shape = dataset.meta.features[VIDEO_KEY]["shape"] +def _add_frames(dataset: LeRobotDataset, num_frames: int, video_keys: list[str] | None = None) -> None: + from lerobot.utils.constants import DEFAULT_FEATURES + + if video_keys is None: + video_keys = dataset.meta.video_keys for _ in range(num_frames): - dataset.add_frame( - { - VIDEO_KEY: np.random.randint(0, 256, shape, dtype=np.uint8), - "action": np.zeros(2, dtype=np.float32), - "task": "test", - } - ) + frame: dict = {"task": "test"} + for key, ft in dataset.meta.features.items(): + if key in DEFAULT_FEATURES: + continue + shape = ft["shape"] + if key in video_keys: + frame[key] = np.random.randint(0, 256, shape, dtype=np.uint8) + else: + frame[key] = np.zeros(shape, dtype=np.float32) + dataset.add_frame(frame) class TestGetVideoInfo: @@ -474,6 +481,30 @@ class TestEncodeVideoFrames: assert info["video.extra_options"] == {} +class TestReencodeVideo: + @require_libsvtav1 + @require_h264 + def test_reencode_video(self, tmp_path): + src = TEST_ARTIFACTS_DIR / "clip_4frames.mp4" + out = tmp_path / "reencoded.mp4" + cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv444p") + reencode_video(src, out, camera_encoder=cfg, overwrite=True) + + assert out.exists() + with av.open(str(out)) as container: + n_frames = sum(1 for _ in container.decode(video=0)) + assert n_frames == 4 + + info = get_video_info(out, camera_encoder=cfg) + assert info["video.codec"] == "h264" + assert info["video.pix_fmt"] == "yuv444p" + assert info["video.height"] == 64 + assert info["video.width"] == 96 + assert info["video.fps"] == 30 + assert info["video.g"] == 6 + assert info["video.crf"] == 23 + + class TestConcatenateVideoFiles: def test_two_clips_frame_count(self, tmp_path): """Output frame count equals the sum of the two input frame counts."""