feat(utility): adding video re-encode utility

This commit is contained in:
CarolinePascal
2026-05-15 21:54:12 +02:00
parent 01dcb4c292
commit 7559641c55
2 changed files with 107 additions and 0 deletions
+82
View File
@@ -403,6 +403,88 @@ def encode_video_frames(
raise OSError(f"Video encoding did not work. File not found: {video_path}.") raise OSError(f"Video encoding did not work. File not found: {video_path}.")
def reencode_video(
    input_video_path: Path | str,
    output_video_path: Path | str,
    camera_encoder: VideoEncoderConfig | None = None,
    encoder_threads: int | None = None,
    log_level: int | None = av.logging.WARNING,
    overwrite: bool = False,
) -> None:
    """Re-encode a video file using the given encoder configuration.

    The first video stream of ``input_video_path`` is decoded frame by frame, converted to the
    target pixel format and encoded with the codec and options taken from ``camera_encoder``.
    Width, height and frame rate are copied from the input stream. Encoding is written to a
    temporary file which is moved to ``output_video_path`` only once encoding has finished, so
    a failed run never leaves a partially written file at the destination.

    Args:
        input_video_path: Existing video file to read.
        output_video_path: Path for the re-encoded file.
        camera_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`.
        encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`.
        log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING.
        overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning.

    Raises:
        ValueError: If the input has no video stream, or its frame rate cannot be determined.
        OSError: If the output file does not exist after re-encoding.
    """
    camera_encoder = camera_encoder or camera_encoder_defaults()
    output_video_path = Path(output_video_path)

    if output_video_path.exists() and not overwrite:
        logger.warning(f"Video file already exists: {output_video_path}. Skipping re-encode.")
        return

    output_video_path.parent.mkdir(parents=True, exist_ok=True)

    video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True)
    vcodec = camera_encoder.vcodec
    pix_fmt = camera_encoder.pix_fmt

    # Only the path is needed here: the handle is closed right away and the file is re-opened
    # by PyAV below. ``delete=False`` keeps the file on disk after the handle is closed.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file:
        tmp_output_video_path = tmp_named_file.name

    if log_level is not None:
        logging.getLogger("libav").setLevel(log_level)

    try:
        with av.open(input_video_path, mode="r") as src:
            try:
                in_stream = src.streams.video[0]
            except IndexError as e:
                raise ValueError(f"No video stream in {input_video_path}") from e

            # Keep the exact (possibly fractional) frame rate: truncating it to an int would
            # turn e.g. 30000/1001 into 29 fps and change the output time base.
            frame_rate = in_stream.base_rate or in_stream.average_rate
            if not frame_rate:
                raise ValueError(f"Could not determine the frame rate of {input_video_path}")
            width = int(in_stream.width)
            height = int(in_stream.height)

            with av.open(
                tmp_output_video_path,
                mode="w",
                options={"movflags": "faststart"},  # faststart is to move the metadata to the beginning of the file to speed up loading
            ) as dst:
                out_stream = dst.add_stream(vcodec, frame_rate, options=video_options)
                out_stream.pix_fmt = pix_fmt
                out_stream.width = width
                out_stream.height = height

                for frame in src.decode(in_stream):
                    frame = frame.reformat(width=width, height=height, format=pix_fmt)
                    packet = out_stream.encode(frame)
                    if packet:
                        dst.mux(packet)

                # Calling encode() without a frame flushes the packets still buffered in the encoder.
                packet = out_stream.encode()
                if packet:
                    dst.mux(packet)

        # Moving inside the guarded region ensures the temporary file is also removed when the
        # move itself fails (e.g. destination not writable).
        shutil.move(tmp_output_video_path, output_video_path)
    except BaseException:
        # BaseException so that an interrupted encode (e.g. Ctrl-C) does not leak the temporary file.
        Path(tmp_output_video_path).unlink(missing_ok=True)
        raise
    finally:
        if log_level is not None:
            av.logging.restore_default_callback()

    if not output_video_path.exists():
        raise OSError(f"Video re-encoding did not work. File not found: {output_video_path}.")
def concatenate_video_files( def concatenate_video_files(
input_video_paths: list[Path | str], input_video_paths: list[Path | str],
output_video_path: Path, output_video_path: Path,
+25
View File
@@ -17,6 +17,7 @@
"""Unit tests for ``lerobot.datasets.video_utils`` encoding functions and ``lerobot.configs.video.VideoEncoderConfig`` config class.""" """Unit tests for ``lerobot.datasets.video_utils`` encoding functions and ``lerobot.configs.video.VideoEncoderConfig`` config class."""
import json import json
import shutil
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
@@ -35,6 +36,7 @@ from lerobot.datasets.video_utils import (
concatenate_video_files, concatenate_video_files,
encode_video_frames, encode_video_frames,
get_video_info, get_video_info,
reencode_video,
) )
from tests.fixtures.constants import DUMMY_VIDEO_INFO from tests.fixtures.constants import DUMMY_VIDEO_INFO
@@ -474,6 +476,29 @@ class TestEncodeVideoFrames:
assert info["video.extra_options"] == {} assert info["video.extra_options"] == {}
class TestReencodeVideo:
    """Tests for ``reencode_video``."""

    @require_libsvtav1
    @require_h264
    def test_reencode_video(self, tmp_path):
        """Re-encoding the 4-frame clip to h264 keeps every frame and applies the encoder config."""
        source_clip = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
        reencoded_clip = tmp_path / "reencoded.mp4"
        encoder_cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv444p")

        reencode_video(source_clip, reencoded_clip, camera_encoder=encoder_cfg, overwrite=True)

        assert reencoded_clip.exists()

        # Count the decoded frames one by one instead of holding them in memory.
        frame_count = 0
        with av.open(str(reencoded_clip)) as container:
            for _ in container.decode(video=0):
                frame_count += 1
        assert frame_count == 4

        # Stream metadata must reflect the requested encoder settings and the source geometry.
        info = get_video_info(reencoded_clip, camera_encoder=encoder_cfg)
        expected_info = {
            "video.codec": "h264",
            "video.pix_fmt": "yuv444p",
            "video.height": 64,
            "video.width": 96,
            "video.fps": 30,
            "video.g": 6,
            "video.crf": 23,
        }
        for key, expected_value in expected_info.items():
            assert info[key] == expected_value
class TestConcatenateVideoFiles: class TestConcatenateVideoFiles:
def test_two_clips_frame_count(self, tmp_path): def test_two_clips_frame_count(self, tmp_path):
"""Output frame count equals the sum of the two input frame counts.""" """Output frame count equals the sum of the two input frame counts."""