Compare commits

...

5 Commits

Author SHA1 Message Date
CarolinePascal 6e01006d94 test(reencode dataset): adding missing test for reencode dataset 2026-05-17 23:23:10 +02:00
CarolinePascal 5547757cea chore(review): fix Claude reviews 2026-05-17 23:22:27 +02:00
CarolinePascal 197fcdb807 chore(format): formatting code 2026-05-15 23:01:59 +02:00
CarolinePascal e8503f5fe6 feat(edit): adding a new lerobot-edit-dataset tool to re-encode all the videos of a dataset 2026-05-15 22:54:23 +02:00
CarolinePascal 7559641c55 feat(utility): adding video re-encode utility 2026-05-15 21:54:12 +02:00
6 changed files with 342 additions and 10 deletions
+2
View File
@@ -31,6 +31,7 @@ from .dataset_tools import (
modify_features,
modify_tasks,
recompute_stats,
reencode_dataset,
remove_feature,
split_dataset,
)
@@ -77,6 +78,7 @@ __all__ = [
"modify_features",
"modify_tasks",
"recompute_stats",
"reencode_dataset",
"remove_feature",
"resolve_delta_timestamps",
"safe_stop_image_writer",
+83 -1
View File
@@ -26,7 +26,7 @@ This module provides utilities for:
import logging
import shutil
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
import datasets
@@ -61,11 +61,13 @@ from .utils import (
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
DEFAULT_EPISODES_PATH,
VIDEO_DIR,
update_chunk_file_indices,
)
from .video_utils import (
encode_video_frames,
get_video_info,
reencode_video,
)
@@ -1884,3 +1886,83 @@ def convert_image_to_video_dataset(
# Return new dataset
return LeRobotDataset(repo_id=repo_id, root=output_dir)
def _reencode_video_worker(args: tuple) -> Path:
"""Picklable worker for :func:`reencode_dataset`'s process pool."""
video_path, camera_encoder, encoder_threads = args
reencode_video(
input_video_path=video_path,
output_video_path=video_path,
camera_encoder=camera_encoder,
encoder_threads=encoder_threads,
overwrite=True,
)
return video_path
def reencode_dataset(
dataset: LeRobotDataset,
camera_encoder: VideoEncoderConfig,
encoder_threads: int | None = None,
num_workers: int | None = None,
) -> LeRobotDataset:
"""Re-encode every video in a dataset with a new set of encoding parameters.
Videos are re-encoded in-place and the video information in ``info.json`` is refreshed.
Args:
dataset: An existing :class:`LeRobotDataset` whose videos will be
re-encoded.
camera_encoder: Target encoder configuration applied to every video
file.
encoder_threads: Per-encoder thread count forwarded to
:func:`reencode_video`. ``None`` lets the codec decide.
num_workers: Number of parallel processes. ``None`` or ``0`` means
sequential (no multiprocessing); ``1+`` spawns a
:class:`~concurrent.futures.ProcessPoolExecutor`.
Returns:
The same :class:`LeRobotDataset` instance with its metadata updated
on disk.
"""
meta = dataset.meta
video_paths_list = []
# Only re-encode if the videos are not already encoded with the given video encoding parameters
for video_key in meta.video_keys:
current_info = meta.info.features[video_key].get("info", {})
current_encoder = VideoEncoderConfig.from_video_info(current_info)
if current_encoder != camera_encoder:
video_paths_list.extend((meta.root / VIDEO_DIR / video_key).rglob("*.mp4"))
else:
logging.info(f"{video_key} videos are already encoded with {camera_encoder}. Nothing to do.")
if len(video_paths_list) == 0:
logging.warning("Dataset has no videos to re-encode.")
return dataset
logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}")
worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list]
if num_workers and num_workers > 1:
with ProcessPoolExecutor(max_workers=num_workers) as pool:
futures = [pool.submit(_reencode_video_worker, args) for args in worker_args]
for future in tqdm(
as_completed(futures),
total=len(futures),
desc="Re-encoding videos",
):
future.result()
else:
for args in tqdm(worker_args, desc="Re-encoding videos"):
_reencode_video_worker(args)
# Refresh video info in metadata for every video key.
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(0, vid_key)
meta.info.features[vid_key]["info"] = get_video_info(video_path, camera_encoder=camera_encoder)
write_info(meta.info, meta.root)
logging.info("Dataset metadata updated.")
return dataset
+86
View File
@@ -403,6 +403,92 @@ def encode_video_frames(
raise OSError(f"Video encoding did not work. File not found: {video_path}.")
def reencode_video(
input_video_path: Path | str,
output_video_path: Path | str,
camera_encoder: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
) -> None:
"""Re-encode a video file using the given encoder configuration.
Args:
input_video_path: Existing video file to read.
output_video_path: Path for the re-encoded file.
camera_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`.
encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`.
log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING.
overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning.
"""
camera_encoder = camera_encoder or camera_encoder_defaults()
output_video_path = Path(output_video_path)
if output_video_path.exists() and not overwrite:
logger.warning(f"Video file already exists: {output_video_path}. Skipping re-encode.")
return
output_video_path.parent.mkdir(parents=True, exist_ok=True)
video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True)
vcodec = camera_encoder.vcodec
pix_fmt = camera_encoder.pix_fmt
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file:
tmp_output_video_path = tmp_named_file.name
if log_level is not None:
logging.getLogger("libav").setLevel(log_level)
try:
with av.open(input_video_path, mode="r") as src:
try:
in_stream = src.streams.video[0]
except IndexError as e:
raise ValueError(f"No video stream in {input_video_path}") from e
fps = (
in_stream.base_rate
) # We allow fractional fps though LeRobotDataset only supports integer fps
width = int(in_stream.width)
height = int(in_stream.height)
with av.open(
tmp_output_video_path,
mode="w",
options={
"movflags": "faststart"
}, # faststart is to move the metadata to the beginning of the file to speed up loading
) as dst:
out_stream = dst.add_stream(vcodec, fps, options=video_options)
out_stream.pix_fmt = pix_fmt
out_stream.width = width
out_stream.height = height
for frame in src.decode(in_stream):
frame = frame.reformat(width=width, height=height, format=pix_fmt)
packet = out_stream.encode(frame)
if packet:
dst.mux(packet)
packet = out_stream.encode()
if packet:
dst.mux(packet)
shutil.move(tmp_output_video_path, output_video_path)
except Exception:
Path(tmp_output_video_path).unlink(missing_ok=True)
raise
finally:
if log_level is not None:
av.logging.restore_default_callback()
if not output_video_path.exists():
raise OSError(f"Video re-encoding did not work. File not found: {output_video_path}.")
def concatenate_video_files(
input_video_paths: list[Path | str],
output_video_path: Path,
@@ -178,6 +178,31 @@ Recompute stats for relative actions and push to hub:
--operation.num_workers 4 \
--push_to_hub true
Re-encode all videos in a dataset (saves to lerobot/pusht_reencoded by default):
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type reencode_videos \
--operation.camera_encoder.vcodec h264 \
--operation.camera_encoder.pix_fmt yuv420p \
--operation.camera_encoder.crf 23
Re-encode videos into a new dataset using 4 parallel processes:
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_h264 \
--operation.type reencode_videos \
--operation.camera_encoder.vcodec h264 \
--operation.camera_encoder.crf 23 \
--operation.num_workers 4
Re-encode videos in-place (overwrites original dataset):
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht \
--operation.type reencode_videos \
--operation.camera_encoder.vcodec h264 \
--operation.overwrite true
Using JSON config file:
lerobot-edit-dataset \
--config_path path/to/edit_config.json
@@ -200,6 +225,7 @@ from lerobot.datasets import (
merge_datasets,
modify_tasks,
recompute_stats,
reencode_dataset,
remove_feature,
split_dataset,
)
@@ -268,6 +294,15 @@ class RecomputeStatsConfig(OperationConfig):
overwrite: bool = False
@OperationConfig.register_subclass("reencode_videos")
@dataclass
class ReencodeVideosConfig(OperationConfig):
camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults)
num_workers: int = 0
encoder_threads: int | None = None
overwrite: bool = False
@OperationConfig.register_subclass("info")
@dataclass
class InfoConfig(OperationConfig):
@@ -634,6 +669,58 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None:
dataset.push_to_hub()
def handle_reencode_videos(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, ReencodeVideosConfig):
raise ValueError("Operation config must be ReencodeVideosConfig")
output_repo_id, input_root, output_root = _resolve_io_paths(
cfg.repo_id,
cfg.new_repo_id,
cfg.root,
cfg.new_root,
default_new_repo_id=f"{cfg.repo_id}_reencoded",
)
in_place = output_root == input_root
if in_place and not cfg.operation.overwrite:
raise ValueError(
f"reencode_videos would overwrite the dataset in-place at {input_root}. "
"Pass --operation.overwrite true to allow in-place modification, "
"or use --new_repo_id / --new_root to write to a different location. "
f"Default output repo_id when neither is set: '{cfg.repo_id}_reencoded'."
)
if in_place:
logging.warning(
f"Overwriting dataset videos in-place at {input_root}. The original videos will be lost."
)
dataset = LeRobotDataset(cfg.repo_id, root=input_root)
else:
logging.info(f"Copying dataset from {input_root} to {output_root}")
if output_root.exists():
backup_path = output_root.with_name(output_root.name + "_old")
logging.warning(f"Output directory {output_root} already exists. Moving to {backup_path}")
if backup_path.exists():
shutil.rmtree(backup_path)
shutil.move(output_root, backup_path)
shutil.copytree(input_root, output_root)
dataset = LeRobotDataset(output_repo_id, root=output_root)
logging.info(f"Re-encoding videos in {output_repo_id} with {cfg.operation.camera_encoder}")
reencode_dataset(
dataset,
camera_encoder=cfg.operation.camera_encoder,
encoder_threads=cfg.operation.encoder_threads,
num_workers=cfg.operation.num_workers,
)
logging.info(f"All videos re-encoded at {dataset.root}")
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {output_repo_id}...")
dataset.push_to_hub()
def _get_dataset_size(repo_path):
import os
@@ -707,6 +794,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
handle_convert_image_to_video(cfg)
elif operation_type == "recompute_stats":
handle_recompute_stats(cfg)
elif operation_type == "reencode_videos":
handle_reencode_videos(cfg)
elif operation_type == "info":
handle_info(cfg)
else:
+42
View File
@@ -23,6 +23,7 @@ import torch
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from lerobot.configs import VideoEncoderConfig
from lerobot.datasets.dataset_tools import (
add_features,
@@ -31,9 +32,12 @@ from lerobot.datasets.dataset_tools import (
merge_datasets,
modify_features,
modify_tasks,
reencode_dataset,
remove_feature,
split_dataset,
)
from lerobot.datasets.io_utils import load_info
from tests.datasets.test_video_encoding import _add_frames, require_h264, require_libsvtav1
@pytest.fixture
@@ -1326,3 +1330,41 @@ def test_convert_image_to_video_dataset_subset_episodes(tmp_path):
if output_dir.exists():
shutil.rmtree(output_dir)
# ─── reencode_dataset ─────────────────────────────────────────────────
@require_libsvtav1
@require_h264
def test_reencode_dataset_multi_key_multiprocessing(
tmp_path, empty_lerobot_dataset_factory, features_factory
):
"""Re-encode a two-camera dataset with num_workers=2 and verify metadata refresh."""
features = features_factory(use_videos=True)
initial_cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds",
features=features,
use_videos=True,
camera_encoder=initial_cfg,
)
_add_frames(dataset, num_frames=4)
dataset.save_episode()
_add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
assert len(dataset.meta.video_keys) == 2
target_cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv420p")
result = reencode_dataset(dataset, camera_encoder=target_cfg, num_workers=2)
assert result is dataset
persisted_info = load_info(dataset.root)
for vk in dataset.meta.video_keys:
persisted_encoder = VideoEncoderConfig.from_video_info(persisted_info.features[vk].get("info", {}))
assert persisted_encoder == target_cfg
+40 -9
View File
@@ -35,6 +35,7 @@ from lerobot.datasets.video_utils import (
concatenate_video_files,
encode_video_frames,
get_video_info,
reencode_video,
)
from tests.fixtures.constants import DUMMY_VIDEO_INFO
@@ -347,16 +348,22 @@ def _read_feature_info(dataset: LeRobotDataset) -> dict:
return info["features"][VIDEO_KEY]["info"]
def _add_frames(dataset: LeRobotDataset, num_frames: int) -> None:
shape = dataset.meta.features[VIDEO_KEY]["shape"]
def _add_frames(dataset: LeRobotDataset, num_frames: int, video_keys: list[str] | None = None) -> None:
from lerobot.utils.constants import DEFAULT_FEATURES
if video_keys is None:
video_keys = dataset.meta.video_keys
for _ in range(num_frames):
dataset.add_frame(
{
VIDEO_KEY: np.random.randint(0, 256, shape, dtype=np.uint8),
"action": np.zeros(2, dtype=np.float32),
"task": "test",
}
)
frame: dict = {"task": "test"}
for key, ft in dataset.meta.features.items():
if key in DEFAULT_FEATURES:
continue
shape = ft["shape"]
if key in video_keys:
frame[key] = np.random.randint(0, 256, shape, dtype=np.uint8)
else:
frame[key] = np.zeros(shape, dtype=np.float32)
dataset.add_frame(frame)
class TestGetVideoInfo:
@@ -474,6 +481,30 @@ class TestEncodeVideoFrames:
assert info["video.extra_options"] == {}
class TestReencodeVideo:
@require_libsvtav1
@require_h264
def test_reencode_video(self, tmp_path):
src = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
out = tmp_path / "reencoded.mp4"
cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv444p")
reencode_video(src, out, camera_encoder=cfg, overwrite=True)
assert out.exists()
with av.open(str(out)) as container:
n_frames = sum(1 for _ in container.decode(video=0))
assert n_frames == 4
info = get_video_info(out, camera_encoder=cfg)
assert info["video.codec"] == "h264"
assert info["video.pix_fmt"] == "yuv444p"
assert info["video.height"] == 64
assert info["video.width"] == 96
assert info["video.fps"] == 30
assert info["video.g"] == 6
assert info["video.crf"] == 23
class TestConcatenateVideoFiles:
def test_two_clips_frame_count(self, tmp_path):
"""Output frame count equals the sum of the two input frame counts."""