diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 99122381a..84ab56e08 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -17,11 +17,13 @@ import contextlib import glob import importlib import logging +import os import queue import shutil import tempfile import threading import warnings +from collections import OrderedDict from dataclasses import asdict, dataclass, field from fractions import Fraction from pathlib import Path @@ -191,15 +193,70 @@ def decode_video_frames_pyav( return closest_frames -class VideoDecoderCache: - """Thread-safe cache for video decoders to avoid expensive re-initialization.""" +DEFAULT_DECODER_CACHE_SIZE = 100 +"""Default LRU capacity for :class:`VideoDecoderCache`. - def __init__(self): - self._cache: dict[str, tuple[Any, Any]] = {} +Sized to comfortably hold a small rolling window of episodes worth of decoders +(typical recipes: 2-4 cameras per episode × tens of episodes in flight) while +bounding host RAM. Each cached entry retains a torchcodec ``VideoDecoder`` plus +an open ``fsspec`` file handle — on the order of a few MB per entry. Override +via the ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var or by passing ``max_size`` +to the constructor (``None`` restores the legacy unbounded behaviour). +""" + + +def _default_max_cache_size() -> int | None: + raw = os.environ.get("LEROBOT_VIDEO_DECODER_CACHE_SIZE") + if raw is None: + return DEFAULT_DECODER_CACHE_SIZE + raw = raw.strip().lower() + if raw in ("", "none", "unbounded", "-1"): + return None + try: + value = int(raw) + except ValueError as e: + raise ValueError( + f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be an integer, 'none', or '-1'; got {raw!r}" + ) from e + if value <= 0: + raise ValueError(f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be positive; got {value}") + return value + + +class VideoDecoderCache: + """Thread-safe LRU cache for torchcodec ``VideoDecoder`` instances. + + Cached entries hold a ``VideoDecoder`` plus the open ``fsspec`` file handle + backing it. When the cache is full and a new path is requested, the + least-recently-used entry is evicted and its file handle is closed. This + bounds host-RAM growth when iterating over datasets with many distinct + video files (otherwise each ``DataLoader`` worker pins every decoder it has + ever opened until the process exits). + + Args: + max_size: Maximum number of decoders to retain. ``None`` disables + eviction and restores legacy unbounded behaviour. Defaults to the + value of ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` if set, otherwise + :data:`DEFAULT_DECODER_CACHE_SIZE`. + """ + + _SENTINEL: ClassVar[object] = object() + + def __init__(self, max_size: int | None | object = _SENTINEL): + if max_size is VideoDecoderCache._SENTINEL: + max_size = _default_max_cache_size() + if max_size is not None and max_size <= 0: + raise ValueError(f"max_size must be positive or None; got {max_size}") + self.max_size: int | None = max_size # type: ignore[assignment] + self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict() self._lock = Lock() + def __contains__(self, video_path: object) -> bool: + with self._lock: + return str(video_path) in self._cache + def get_decoder(self, video_path: str): - """Get a cached decoder or create a new one.""" + """Get a cached decoder or create a new one, evicting LRU if at capacity.""" if importlib.util.find_spec("torchcodec"): from torchcodec.decoders import VideoDecoder else: @@ -211,22 +268,36 @@ class VideoDecoderCache: video_path = str(video_path) with self._lock: - if video_path not in self._cache: - file_handle = fsspec.open(video_path).__enter__() - try: - decoder = VideoDecoder(file_handle, seek_mode="approximate") - except Exception: - file_handle.close() - raise - self._cache[video_path] = (decoder, file_handle) + entry = self._cache.get(video_path) + if entry is not None: + self._cache.move_to_end(video_path) + return entry[0] - return self._cache[video_path][0] + file_handle = fsspec.open(video_path).__enter__() + try: + decoder = VideoDecoder(file_handle, seek_mode="approximate") + except Exception: + file_handle.close() + raise + self._cache[video_path] = (decoder, file_handle) + + # Evict LRU entries until we are back under the cap. We close + # evicted file handles immediately; the associated ``VideoDecoder`` + # is released to the GC when its last reference goes away. + if self.max_size is not None: + while len(self._cache) > self.max_size: + _evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False) + with contextlib.suppress(Exception): + evicted_handle.close() + + return decoder def clear(self): - """Clear the cache and close file handles.""" + """Clear the cache and close all file handles.""" with self._lock: for _, file_handle in self._cache.values(): - file_handle.close() + with contextlib.suppress(Exception): + file_handle.close() self._cache.clear() def size(self) -> int: diff --git a/tests/datasets/test_video_decoder_cache.py b/tests/datasets/test_video_decoder_cache.py new file mode 100644 index 000000000..6e69f8403 --- /dev/null +++ b/tests/datasets/test_video_decoder_cache.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``. + +These cover the LRU bounding + file-handle release behaviour added to prevent +unbounded growth when iterating over datasets with many distinct video files +(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset). +""" + +import shutil +from pathlib import Path + +import pytest + +pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])") + +from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402 + +TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos" +SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4" + + +def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]: + """Copy the small reference mp4 to ``n`` distinct paths. + + The cache keys on absolute path, so distinct paths force distinct cache entries + even though the file contents are identical. + """ + assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}" + paths = [] + for i in range(n): + dst = tmp_path / f"clip_{i:04d}.mp4" + shutil.copyfile(SRC_CLIP, dst) + paths.append(dst) + return paths + + +class TestVideoDecoderCacheBounded: + def test_default_cache_is_bounded(self): + """The default cache must have a finite ``max_size`` to bound RSS growth.""" + cache = VideoDecoderCache() + assert cache.max_size is not None, "default cache must be bounded" + assert cache.max_size > 0 + + def test_size_capped_at_max_size(self, tmp_path): + """``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound.""" + paths = _make_distinct_clips(tmp_path, n=5) + cache = VideoDecoderCache(max_size=2) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 2 + + def test_evicts_least_recently_used(self, tmp_path): + """Re-accessing an entry must promote it; the LRU entry is the one evicted.""" + paths = _make_distinct_clips(tmp_path, n=3) + cache = VideoDecoderCache(max_size=2) + + cache.get_decoder(paths[0]) + cache.get_decoder(paths[1]) + cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU + cache.get_decoder(paths[2]) # should evict paths[1] + + assert str(paths[0]) in cache # MRU stays + assert str(paths[1]) not in cache # LRU evicted + assert str(paths[2]) in cache # newest stays + + def test_eviction_closes_file_handle(self, tmp_path): + """Evicting an entry must close its fsspec file handle (otherwise we leak FDs).""" + paths = _make_distinct_clips(tmp_path, n=2) + cache = VideoDecoderCache(max_size=1) + + cache.get_decoder(paths[0]) + # Reach into the cache to capture the handle before it is evicted. This is + # the only assertion in the suite that touches a private attribute, and it + # is the most direct way to prove the file descriptor is actually released. + evicted_handle = cache._cache[str(paths[0])][1] + assert evicted_handle.closed is False + + cache.get_decoder(paths[1]) # forces eviction of paths[0] + + assert evicted_handle.closed is True + + def test_clear_closes_all_file_handles(self, tmp_path): + """``clear()`` must close every cached file handle.""" + paths = _make_distinct_clips(tmp_path, n=3) + cache = VideoDecoderCache(max_size=10) + + for p in paths: + cache.get_decoder(p) + handles = [entry[1] for entry in cache._cache.values()] + assert all(not h.closed for h in handles) + + cache.clear() + + assert cache.size() == 0 + assert all(h.closed for h in handles) + + def test_hit_does_not_reopen_or_evict(self, tmp_path): + """A cache hit must return the same decoder instance without touching the cap.""" + paths = _make_distinct_clips(tmp_path, n=1) + cache = VideoDecoderCache(max_size=2) + + first = cache.get_decoder(paths[0]) + second = cache.get_decoder(paths[0]) + + assert first is second + assert cache.size() == 1 + + def test_unbounded_when_max_size_none(self, tmp_path): + """``max_size=None`` preserves the legacy unbounded behaviour.""" + paths = _make_distinct_clips(tmp_path, n=4) + cache = VideoDecoderCache(max_size=None) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 4 + + def test_env_var_overrides_default(self, tmp_path, monkeypatch): + """``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``.""" + monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3") + cache = VideoDecoderCache() + assert cache.max_size == 3 + + paths = _make_distinct_clips(tmp_path, n=5) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 3