feat(trim): adding optional trimming option in reencode_video (#3779)

* feat(trim): adding optional trimming option in reencode_video

* tests(trim): add triming test

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
This commit is contained in:
Caroline Pascal
2026-06-12 11:29:26 +02:00
committed by GitHub
parent 87242cfced
commit 0e9bd9e6fb
2 changed files with 34 additions and 1 deletions
+21 -1
View File
@@ -481,8 +481,10 @@ def reencode_video(
encoder_threads: int | None = None,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
start_time_s: float | None = None,
end_time_s: float | None = None,
) -> None:
"""Re-encode a video file using the given encoder configuration.
"""Re-encode a video file, optionally trimming it to ``[start_time_s, end_time_s)``.
Args:
input_video_path: Existing video file to read.
@@ -491,10 +493,17 @@ def reencode_video(
encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`.
log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING.
overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning.
start_time_s: When set, trim the output to start at this timestamp (seconds).
end_time_s: When set, trim the output to end at this timestamp (seconds, exclusive).
"""
camera_encoder = camera_encoder or camera_encoder_defaults()
if (start_time_s is not None and start_time_s < 0) or (end_time_s is not None and end_time_s < 0):
raise ValueError(f"Trim times must be non-negative, got start={start_time_s}, end={end_time_s}.")
if start_time_s is not None and end_time_s is not None and end_time_s <= start_time_s:
raise ValueError(f"end_time_s ({end_time_s}) must be greater than start_time_s ({start_time_s}).")
output_video_path = Path(output_video_path)
if output_video_path.exists() and not overwrite:
@@ -526,6 +535,10 @@ def reencode_video(
width = int(in_stream.width)
height = int(in_stream.height)
# Seek to the keyframe at or before start_time_s to avoid reading from the start.
if start_time_s is not None:
src.seek(int(start_time_s * av.time_base), backward=True)
with av.open(
tmp_output_video_path,
mode="w",
@@ -539,7 +552,14 @@ def reencode_video(
out_stream.height = height
for frame in src.decode(in_stream):
frame_time_s = frame.time
if start_time_s is not None and frame_time_s < start_time_s:
continue
if end_time_s is not None and frame_time_s >= end_time_s:
break
frame = frame.reformat(width=width, height=height, format=pix_fmt)
if start_time_s is not None:
frame.pts = None # reset timestamps so the trimmed output starts at t=0
packet = out_stream.encode(frame)
if packet:
dst.mux(packet)
+13
View File
@@ -504,6 +504,19 @@ class TestReencodeVideo:
assert info["video.g"] == 6
assert info["video.crf"] == 23
@require_h264
def test_reencode_video_trim_window(self, tmp_path):
src = TEST_ARTIFACTS_DIR / "clip_6frames.mp4"
out = tmp_path / "trim_window.mp4"
cfg = VideoEncoderConfig(vcodec="h264")
reencode_video(src, out, camera_encoder=cfg, start_time_s=0.05, end_time_s=0.12, overwrite=True)
with av.open(str(out)) as container:
frames = list(container.decode(video=0))
# Only the frames at 0.067 and 0.1 s fall inside [0.05, 0.12).
assert len(frames) == 2
assert frames[0].time == pytest.approx(0.0, abs=1e-3)
class TestConcatenateVideoFiles:
def test_two_clips_frame_count(self, tmp_path):