From dfdc48a7f131c89ade51e322f5c11180b8509c72 Mon Sep 17 00:00:00 2001 From: "Roham Z. Nobari" Date: Tue, 19 May 2026 16:54:25 +0200 Subject: [PATCH] fix(datasets): bound VideoDecoderCache to prevent OOM on large datasets (#3614) VideoDecoderCache used an unbounded dict keyed on absolute path, with no eviction in the standard LeRobotDataset path. With shuffled iteration over datasets that have many distinct mp4 files, every DataLoader worker accumulated one cached (VideoDecoder, fsspec file handle) pair per distinct path it had ever touched. Per-entry cost is ~3-5 MB of host RAM plus one open FD; at ~8 k entries this is roughly 30 GB per worker. This was hit in the wild during a SmolVLA training run on a 4,195-episode SO-101 dataset (8,390 mp4s, two cameras per episode). dmesg showed anon-rss climbing to 34.9 GB on a single pt_data_worker before the OOM killer fired ~30 min into training; with --num_workers=8 the per-worker peak halved to 17.9 GB, which is the expected inverse-scaling signature when the leak is per-decode and the workload is split across workers. The working workaround on the affected platform was --dataset.video_backend=pyav, because the pyav path opens/closes per call and never touches this cache. Switch the backing store to an OrderedDict and evict LRU entries when the cap is reached, closing the evicted file handle inside the lock so we do not leak FDs either. Default cap is DEFAULT_DECODER_CACHE_SIZE = 100, overridable via LEROBOT_VIDEO_DECODER_CACHE_SIZE or by passing max_size= to the constructor; max_size=None restores the legacy unbounded behaviour for callers that need it. Validation on the original failing workload (decode_video_frames_torchcodec called over real mp4s from the affected SO-101 dataset): unbounded: 300 files -> +1087 MB host RSS, cache=300, still climbing cap=50: 500 files -> +266 MB host RSS, cache=50, stable cap=50: 2000 calls -> +312 MB host RSS, cache=50, stable cap=100: 1000 calls -> +470 MB host RSS, cache=100, stable Three independent seeded runs at cap=50 agreed to within 1% (263 / 266 / 265 MB delta), and the 2000-call multi-pass run shows RSS plateaus after the cap is reached instead of drifting. Tests in tests/datasets/test_video_decoder_cache.py cover: default-is-bounded, size cap, LRU ordering, FD close on eviction, FD close on clear(), cache-hit invariance, max_size=None fallback, and env-var override. No regressions in test_video_encoding.py, test_streaming.py, or test_dataset_reader.py (73 prior tests still pass alongside the 8 new ones). --- src/lerobot/datasets/video_utils.py | 103 ++++++++++++--- tests/datasets/test_video_decoder_cache.py | 140 +++++++++++++++++++++ 2 files changed, 227 insertions(+), 16 deletions(-) create mode 100644 tests/datasets/test_video_decoder_cache.py diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 99122381a..84ab56e08 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -17,11 +17,13 @@ import contextlib import glob import importlib import logging +import os import queue import shutil import tempfile import threading import warnings +from collections import OrderedDict from dataclasses import asdict, dataclass, field from fractions import Fraction from pathlib import Path @@ -191,15 +193,70 @@ def decode_video_frames_pyav( return closest_frames -class VideoDecoderCache: - """Thread-safe cache for video decoders to avoid expensive re-initialization.""" +DEFAULT_DECODER_CACHE_SIZE = 100 +"""Default LRU capacity for :class:`VideoDecoderCache`. - def __init__(self): - self._cache: dict[str, tuple[Any, Any]] = {} +Sized to comfortably hold a small rolling window of episodes worth of decoders +(typical recipes: 2-4 cameras per episode × tens of episodes in flight) while +bounding host RAM. Each cached entry retains a torchcodec ``VideoDecoder`` plus +an open ``fsspec`` file handle — on the order of a few MB per entry. Override +via the ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var or by passing ``max_size`` +to the constructor (``None`` restores the legacy unbounded behaviour). +""" + + +def _default_max_cache_size() -> int | None: + raw = os.environ.get("LEROBOT_VIDEO_DECODER_CACHE_SIZE") + if raw is None: + return DEFAULT_DECODER_CACHE_SIZE + raw = raw.strip().lower() + if raw in ("", "none", "unbounded", "-1"): + return None + try: + value = int(raw) + except ValueError as e: + raise ValueError( + f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be an integer, 'none', or '-1'; got {raw!r}" + ) from e + if value <= 0: + raise ValueError(f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be positive; got {value}") + return value + + +class VideoDecoderCache: + """Thread-safe LRU cache for torchcodec ``VideoDecoder`` instances. + + Cached entries hold a ``VideoDecoder`` plus the open ``fsspec`` file handle + backing it. When the cache is full and a new path is requested, the + least-recently-used entry is evicted and its file handle is closed. This + bounds host-RAM growth when iterating over datasets with many distinct + video files (otherwise each ``DataLoader`` worker pins every decoder it has + ever opened until the process exits). + + Args: + max_size: Maximum number of decoders to retain. ``None`` disables + eviction and restores legacy unbounded behaviour. Defaults to the + value of ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` if set, otherwise + :data:`DEFAULT_DECODER_CACHE_SIZE`. + """ + + _SENTINEL: ClassVar[object] = object() + + def __init__(self, max_size: int | None | object = _SENTINEL): + if max_size is VideoDecoderCache._SENTINEL: + max_size = _default_max_cache_size() + if max_size is not None and max_size <= 0: + raise ValueError(f"max_size must be positive or None; got {max_size}") + self.max_size: int | None = max_size # type: ignore[assignment] + self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict() self._lock = Lock() + def __contains__(self, video_path: object) -> bool: + with self._lock: + return str(video_path) in self._cache + def get_decoder(self, video_path: str): - """Get a cached decoder or create a new one.""" + """Get a cached decoder or create a new one, evicting LRU if at capacity.""" if importlib.util.find_spec("torchcodec"): from torchcodec.decoders import VideoDecoder else: @@ -211,22 +268,36 @@ class VideoDecoderCache: video_path = str(video_path) with self._lock: - if video_path not in self._cache: - file_handle = fsspec.open(video_path).__enter__() - try: - decoder = VideoDecoder(file_handle, seek_mode="approximate") - except Exception: - file_handle.close() - raise - self._cache[video_path] = (decoder, file_handle) + entry = self._cache.get(video_path) + if entry is not None: + self._cache.move_to_end(video_path) + return entry[0] - return self._cache[video_path][0] + file_handle = fsspec.open(video_path).__enter__() + try: + decoder = VideoDecoder(file_handle, seek_mode="approximate") + except Exception: + file_handle.close() + raise + self._cache[video_path] = (decoder, file_handle) + + # Evict LRU entries until we are back under the cap. We close + # evicted file handles immediately; the associated ``VideoDecoder`` + # is released to the GC when its last reference goes away. + if self.max_size is not None: + while len(self._cache) > self.max_size: + _evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False) + with contextlib.suppress(Exception): + evicted_handle.close() + + return decoder def clear(self): - """Clear the cache and close file handles.""" + """Clear the cache and close all file handles.""" with self._lock: for _, file_handle in self._cache.values(): - file_handle.close() + with contextlib.suppress(Exception): + file_handle.close() self._cache.clear() def size(self) -> int: diff --git a/tests/datasets/test_video_decoder_cache.py b/tests/datasets/test_video_decoder_cache.py new file mode 100644 index 000000000..6e69f8403 --- /dev/null +++ b/tests/datasets/test_video_decoder_cache.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``. + +These cover the LRU bounding + file-handle release behaviour added to prevent +unbounded growth when iterating over datasets with many distinct video files +(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset). +""" + +import shutil +from pathlib import Path + +import pytest + +pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])") + +from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402 + +TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos" +SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4" + + +def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]: + """Copy the small reference mp4 to ``n`` distinct paths. + + The cache keys on absolute path, so distinct paths force distinct cache entries + even though the file contents are identical. + """ + assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}" + paths = [] + for i in range(n): + dst = tmp_path / f"clip_{i:04d}.mp4" + shutil.copyfile(SRC_CLIP, dst) + paths.append(dst) + return paths + + +class TestVideoDecoderCacheBounded: + def test_default_cache_is_bounded(self): + """The default cache must have a finite ``max_size`` to bound RSS growth.""" + cache = VideoDecoderCache() + assert cache.max_size is not None, "default cache must be bounded" + assert cache.max_size > 0 + + def test_size_capped_at_max_size(self, tmp_path): + """``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound.""" + paths = _make_distinct_clips(tmp_path, n=5) + cache = VideoDecoderCache(max_size=2) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 2 + + def test_evicts_least_recently_used(self, tmp_path): + """Re-accessing an entry must promote it; the LRU entry is the one evicted.""" + paths = _make_distinct_clips(tmp_path, n=3) + cache = VideoDecoderCache(max_size=2) + + cache.get_decoder(paths[0]) + cache.get_decoder(paths[1]) + cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU + cache.get_decoder(paths[2]) # should evict paths[1] + + assert str(paths[0]) in cache # MRU stays + assert str(paths[1]) not in cache # LRU evicted + assert str(paths[2]) in cache # newest stays + + def test_eviction_closes_file_handle(self, tmp_path): + """Evicting an entry must close its fsspec file handle (otherwise we leak FDs).""" + paths = _make_distinct_clips(tmp_path, n=2) + cache = VideoDecoderCache(max_size=1) + + cache.get_decoder(paths[0]) + # Reach into the cache to capture the handle before it is evicted. This is + # the only assertion in the suite that touches a private attribute, and it + # is the most direct way to prove the file descriptor is actually released. + evicted_handle = cache._cache[str(paths[0])][1] + assert evicted_handle.closed is False + + cache.get_decoder(paths[1]) # forces eviction of paths[0] + + assert evicted_handle.closed is True + + def test_clear_closes_all_file_handles(self, tmp_path): + """``clear()`` must close every cached file handle.""" + paths = _make_distinct_clips(tmp_path, n=3) + cache = VideoDecoderCache(max_size=10) + + for p in paths: + cache.get_decoder(p) + handles = [entry[1] for entry in cache._cache.values()] + assert all(not h.closed for h in handles) + + cache.clear() + + assert cache.size() == 0 + assert all(h.closed for h in handles) + + def test_hit_does_not_reopen_or_evict(self, tmp_path): + """A cache hit must return the same decoder instance without touching the cap.""" + paths = _make_distinct_clips(tmp_path, n=1) + cache = VideoDecoderCache(max_size=2) + + first = cache.get_decoder(paths[0]) + second = cache.get_decoder(paths[0]) + + assert first is second + assert cache.size() == 1 + + def test_unbounded_when_max_size_none(self, tmp_path): + """``max_size=None`` preserves the legacy unbounded behaviour.""" + paths = _make_distinct_clips(tmp_path, n=4) + cache = VideoDecoderCache(max_size=None) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 4 + + def test_env_var_overrides_default(self, tmp_path, monkeypatch): + """``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``.""" + monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3") + cache = VideoDecoderCache() + assert cache.max_size == 3 + + paths = _make_distinct_clips(tmp_path, n=5) + for p in paths: + cache.get_decoder(p) + assert cache.size() == 3