mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-21 11:39:50 +00:00
fix(datasets): bound VideoDecoderCache to prevent OOM on large datasets (#3614)
VideoDecoderCache used an unbounded dict keyed on absolute path, with no eviction in the standard LeRobotDataset path. With shuffled iteration over datasets that have many distinct mp4 files, every DataLoader worker accumulated one cached (VideoDecoder, fsspec file handle) pair per distinct path it had ever touched. Per-entry cost is ~3-5 MB of host RAM plus one open FD; at ~8 k entries this is roughly 30 GB per worker. This was hit in the wild during a SmolVLA training run on a 4,195-episode SO-101 dataset (8,390 mp4s, two cameras per episode). dmesg showed anon-rss climbing to 34.9 GB on a single pt_data_worker before the OOM killer fired ~30 min into training; with --num_workers=8 the per-worker peak halved to 17.9 GB, which is the expected inverse-scaling signature when the leak is per-decode and the workload is split across workers. The working workaround on the affected platform was --dataset.video_backend=pyav, because the pyav path opens/closes per call and never touches this cache. Switch the backing store to an OrderedDict and evict LRU entries when the cap is reached, closing the evicted file handle inside the lock so we do not leak FDs either. Default cap is DEFAULT_DECODER_CACHE_SIZE = 100, overridable via LEROBOT_VIDEO_DECODER_CACHE_SIZE or by passing max_size= to the constructor; max_size=None restores the legacy unbounded behaviour for callers that need it. Validation on the original failing workload (decode_video_frames_torchcodec called over real mp4s from the affected SO-101 dataset): unbounded: 300 files -> +1087 MB host RSS, cache=300, still climbing cap=50: 500 files -> +266 MB host RSS, cache=50, stable cap=50: 2000 calls -> +312 MB host RSS, cache=50, stable cap=100: 1000 calls -> +470 MB host RSS, cache=100, stable Three independent seeded runs at cap=50 agreed to within 1% (263 / 266 / 265 MB delta), and the 2000-call multi-pass run shows RSS plateaus after the cap is reached instead of drifting. Tests in tests/datasets/test_video_decoder_cache.py cover: default-is-bounded, size cap, LRU ordering, FD close on eviction, FD close on clear(), cache-hit invariance, max_size=None fallback, and env-var override. No regressions in test_video_encoding.py, test_streaming.py, or test_dataset_reader.py (73 prior tests still pass alongside the 8 new ones).
This commit is contained in:
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``.
|
||||
|
||||
These cover the LRU bounding + file-handle release behaviour added to prevent
|
||||
unbounded growth when iterating over datasets with many distinct video files
|
||||
(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset).
|
||||
"""
|
||||
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])")
|
||||
|
||||
from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402
|
||||
|
||||
TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos"
|
||||
SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
|
||||
|
||||
|
||||
def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]:
|
||||
"""Copy the small reference mp4 to ``n`` distinct paths.
|
||||
|
||||
The cache keys on absolute path, so distinct paths force distinct cache entries
|
||||
even though the file contents are identical.
|
||||
"""
|
||||
assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}"
|
||||
paths = []
|
||||
for i in range(n):
|
||||
dst = tmp_path / f"clip_{i:04d}.mp4"
|
||||
shutil.copyfile(SRC_CLIP, dst)
|
||||
paths.append(dst)
|
||||
return paths
|
||||
|
||||
|
||||
class TestVideoDecoderCacheBounded:
|
||||
def test_default_cache_is_bounded(self):
|
||||
"""The default cache must have a finite ``max_size`` to bound RSS growth."""
|
||||
cache = VideoDecoderCache()
|
||||
assert cache.max_size is not None, "default cache must be bounded"
|
||||
assert cache.max_size > 0
|
||||
|
||||
def test_size_capped_at_max_size(self, tmp_path):
|
||||
"""``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound."""
|
||||
paths = _make_distinct_clips(tmp_path, n=5)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 2
|
||||
|
||||
def test_evicts_least_recently_used(self, tmp_path):
|
||||
"""Re-accessing an entry must promote it; the LRU entry is the one evicted."""
|
||||
paths = _make_distinct_clips(tmp_path, n=3)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
|
||||
cache.get_decoder(paths[0])
|
||||
cache.get_decoder(paths[1])
|
||||
cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU
|
||||
cache.get_decoder(paths[2]) # should evict paths[1]
|
||||
|
||||
assert str(paths[0]) in cache # MRU stays
|
||||
assert str(paths[1]) not in cache # LRU evicted
|
||||
assert str(paths[2]) in cache # newest stays
|
||||
|
||||
def test_eviction_closes_file_handle(self, tmp_path):
|
||||
"""Evicting an entry must close its fsspec file handle (otherwise we leak FDs)."""
|
||||
paths = _make_distinct_clips(tmp_path, n=2)
|
||||
cache = VideoDecoderCache(max_size=1)
|
||||
|
||||
cache.get_decoder(paths[0])
|
||||
# Reach into the cache to capture the handle before it is evicted. This is
|
||||
# the only assertion in the suite that touches a private attribute, and it
|
||||
# is the most direct way to prove the file descriptor is actually released.
|
||||
evicted_handle = cache._cache[str(paths[0])][1]
|
||||
assert evicted_handle.closed is False
|
||||
|
||||
cache.get_decoder(paths[1]) # forces eviction of paths[0]
|
||||
|
||||
assert evicted_handle.closed is True
|
||||
|
||||
def test_clear_closes_all_file_handles(self, tmp_path):
|
||||
"""``clear()`` must close every cached file handle."""
|
||||
paths = _make_distinct_clips(tmp_path, n=3)
|
||||
cache = VideoDecoderCache(max_size=10)
|
||||
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
handles = [entry[1] for entry in cache._cache.values()]
|
||||
assert all(not h.closed for h in handles)
|
||||
|
||||
cache.clear()
|
||||
|
||||
assert cache.size() == 0
|
||||
assert all(h.closed for h in handles)
|
||||
|
||||
def test_hit_does_not_reopen_or_evict(self, tmp_path):
|
||||
"""A cache hit must return the same decoder instance without touching the cap."""
|
||||
paths = _make_distinct_clips(tmp_path, n=1)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
|
||||
first = cache.get_decoder(paths[0])
|
||||
second = cache.get_decoder(paths[0])
|
||||
|
||||
assert first is second
|
||||
assert cache.size() == 1
|
||||
|
||||
def test_unbounded_when_max_size_none(self, tmp_path):
|
||||
"""``max_size=None`` preserves the legacy unbounded behaviour."""
|
||||
paths = _make_distinct_clips(tmp_path, n=4)
|
||||
cache = VideoDecoderCache(max_size=None)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 4
|
||||
|
||||
def test_env_var_overrides_default(self, tmp_path, monkeypatch):
|
||||
"""``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``."""
|
||||
monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3")
|
||||
cache = VideoDecoderCache()
|
||||
assert cache.max_size == 3
|
||||
|
||||
paths = _make_distinct_clips(tmp_path, n=5)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 3
|
||||
Reference in New Issue
Block a user