diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index bb11d0e14..106300a4b 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -189,11 +189,11 @@ class AnnotationPipelineConfig: skip_validation: bool = False only_episodes: tuple[int, ...] | None = None - # Video decode backend for keyframe extraction. When unset, decoding tries - # the platform default (torchcodec when installed) and falls back to - # ``pyav`` on failure. Set to ``"pyav"`` to skip torchcodec entirely — - # useful in containers where torchcodec cannot decode ("Operation not - # permitted"). + # Keyframe decode backend. When unset, decoding tries the platform default + # (torchcodec when installed) then falls back to the ffmpeg CLI, which + # decodes AV1 and isolates crashes to a child process. Set to one of + # ``"torchcodec"``, ``"ffmpeg"``, or ``"pyav"`` to pin a single backend — + # e.g. ``"ffmpeg"`` in containers where torchcodec cannot decode. video_backend: str | None = None # When True, upload the annotated dataset to the Hugging Face Hub: diff --git a/src/lerobot/annotations/steerable_pipeline/frames.py b/src/lerobot/annotations/steerable_pipeline/frames.py index e918e94cf..c51ec21f0 100644 --- a/src/lerobot/annotations/steerable_pipeline/frames.py +++ b/src/lerobot/annotations/steerable_pipeline/frames.py @@ -135,11 +135,11 @@ class VideoFrameProvider: camera_key: str | None = None tolerance_s: float = 1e-2 cache_size: int = 256 - # Video decode backend forwarded to ``decode_video_frames``. When ``None``, - # decoding tries the platform default (torchcodec when installed) and - # falls back to ``pyav`` if it raises — some containers ship a torchcodec - # that cannot push packets to the decoder ("Operation not permitted"). - # Set explicitly (e.g. ``"pyav"``) to skip that probe. + # Keyframe decode backend. When ``None``, decoding tries the platform + # default (torchcodec when installed) then falls back to the ffmpeg CLI. + # Set explicitly to one of ``"torchcodec"``, ``"ffmpeg"``, or ``"pyav"`` + # to pin a single backend — e.g. ``"ffmpeg"`` to skip a torchcodec that + # cannot decode the dataset's codec ("Operation not permitted"). video_backend: str | None = None _meta: Any = field(default=None, init=False, repr=False) _cache: dict = field(default_factory=dict, init=False, repr=False) @@ -303,19 +303,24 @@ class VideoFrameProvider: shifted = [from_timestamp + ts for ts in timestamps] video_path = self.root / self._meta.get_video_file_path(episode_index, camera_key) - # Build the decoder chain. torchcodec is fast but unusable in some - # containers (vllm-openai: "Operation not permitted"); lerobot's - # ``pyav`` backend routes through ``torchvision.io.VideoReader``, - # removed in torchvision 0.23+. ``_decode_frames_av`` talks to the - # ``av`` package directly and is the always-available fallback. + # Build the decoder chain. In-process decoders are fragile here: + # torchcodec raises in some containers (vllm-openai: "Operation not + # permitted"), lerobot's ``pyav`` backend routes through + # ``torchvision.io.VideoReader`` (removed in torchvision 0.23+), and + # PyAV can outright SIGSEGV on the AV1 streams modern LeRobot + # datasets use. ``_decode_frames_ffmpeg`` shells out to the ffmpeg + # CLI — it decodes AV1 and a crash stays isolated to the child + # process — so it is the always-available fallback. if self.video_backend: chain = [self.video_backend] else: - chain = (["torchcodec"] if get_safe_default_codec() == "torchcodec" else []) + ["pyav"] + chain = (["torchcodec"] if get_safe_default_codec() == "torchcodec" else []) + ["ffmpeg"] exc: Exception | None = None for backend in chain: try: + if backend == "ffmpeg": + return _decode_frames_ffmpeg(video_path, shifted) if backend in ("pyav", "av"): return _decode_frames_av(video_path, shifted) # Stacked ``(N, C, H, W)`` uint8 tensor; one row per timestamp. @@ -361,14 +366,52 @@ def make_frame_provider( return provider +def _decode_frames_ffmpeg(video_path: Path, timestamps: list[float]) -> list[Any]: + """Decode the frames nearest to ``timestamps`` via the ffmpeg CLI. + + Runs one ``ffmpeg`` process per timestamp, seeking with ``-ss`` and + piping a single PNG to stdout. Unlike the in-process decoders this + survives a hostile container: a full ffmpeg build decodes AV1 (the codec + modern LeRobot datasets use) where torchcodec raises and PyAV can + SIGSEGV, and a crash stays isolated to the child process — a non-zero + exit is a catchable error, not a segfault of the whole job. Returns one + ``(C, H, W)`` uint8 tensor per timestamp. + """ + import io # noqa: PLC0415 + import subprocess # noqa: PLC0415 + + import numpy as np # noqa: PLC0415 + + frames: list[Any] = [] + for ts in timestamps: + proc = subprocess.run( + [ + "ffmpeg", "-nostdin", "-loglevel", "error", + "-ss", f"{max(ts, 0.0):.3f}", + "-i", str(video_path), + "-frames:v", "1", + "-f", "image2pipe", "-vcodec", "png", "pipe:1", + ], + capture_output=True, + check=True, + timeout=120, + ) + if not proc.stdout: + raise RuntimeError(f"ffmpeg returned no frame for t={ts:.3f}s of {video_path}") + img = PIL.Image.open(io.BytesIO(proc.stdout)).convert("RGB") + frames.append(torch.from_numpy(np.asarray(img).copy()).permute(2, 0, 1).contiguous()) + return frames + + def _decode_frames_av(video_path: Path, timestamps: list[float]) -> list[Any]: """Decode the frames nearest to ``timestamps`` using PyAV directly. lerobot's ``decode_video_frames(backend="pyav")`` routes through ``torchvision.io.VideoReader``, removed in torchvision 0.23+. This helper - talks to the ``av`` package directly so keyframe extraction keeps working - on modern torch/torchvision stacks and in containers where torchcodec - cannot decode. Returns one ``(C, H, W)`` uint8 tensor per timestamp. + talks to the ``av`` package directly. Note PyAV can SIGSEGV on AV1 + streams in some builds — prefer ``_decode_frames_ffmpeg`` as the default + fallback; this stays available behind ``video_backend="pyav"``. Returns + one ``(C, H, W)`` uint8 tensor per timestamp. """ import av # noqa: PLC0415 diff --git a/tests/annotations/test_frames.py b/tests/annotations/test_frames.py index 61b941579..07b8b2c33 100644 --- a/tests/annotations/test_frames.py +++ b/tests/annotations/test_frames.py @@ -41,6 +41,7 @@ pytest.importorskip("datasets", reason="datasets is required (install lerobot[da from lerobot.annotations.steerable_pipeline.frames import ( # noqa: E402 VideoFrameProvider, _decode_frames_av, + _decode_frames_ffmpeg, ) @@ -120,3 +121,26 @@ def test_decode_frames_av_raises_on_missing_file(tmp_path: Path) -> None: """A missing video surfaces as an exception the caller can fall back on.""" with pytest.raises(Exception): # noqa: B017, PT011 _decode_frames_av(tmp_path / "does_not_exist.mp4", [0.0]) + + +def test_decode_frames_ffmpeg_returns_one_uint8_frame_per_timestamp(sample_video: Path) -> None: + """``_decode_frames_ffmpeg`` shells out to the ffmpeg CLI — the always- + available fallback that decodes AV1 and isolates crashes to a child + process. + """ + timestamps = [0.0, 1.0, 2.5] + frames = _decode_frames_ffmpeg(sample_video, timestamps) + + assert len(frames) == len(timestamps) + for frame in frames: + assert isinstance(frame, torch.Tensor) + assert frame.dtype == torch.uint8 + assert frame.shape == (3, 120, 160) + + +def test_decode_frames_ffmpeg_raises_on_missing_file(tmp_path: Path) -> None: + """A missing video raises (non-zero ffmpeg exit), never crashes the job.""" + if shutil.which("ffmpeg") is None: + pytest.skip("ffmpeg not available") + with pytest.raises(Exception): # noqa: B017, PT011 + _decode_frames_ffmpeg(tmp_path / "does_not_exist.mp4", [0.0])