Compare commits

...

2 Commits

Author SHA1 Message Date
Roham Z. Nobari dfdc48a7f1 fix(datasets): bound VideoDecoderCache to prevent OOM on large datasets (#3614)
VideoDecoderCache used an unbounded dict keyed on absolute path, with no
eviction in the standard LeRobotDataset path. With shuffled iteration over
datasets that have many distinct mp4 files, every DataLoader worker
accumulated one cached (VideoDecoder, fsspec file handle) pair per distinct
path it had ever touched. Per-entry cost is ~3-5 MB of host RAM plus one
open FD; at ~8 k entries this is roughly 30 GB per worker.

This was hit in the wild during a SmolVLA training run on a 4,195-episode
SO-101 dataset (8,390 mp4s, two cameras per episode). dmesg showed
anon-rss climbing to 34.9 GB on a single pt_data_worker before the OOM
killer fired ~30 min into training; with --num_workers=8 the per-worker
peak halved to 17.9 GB, which is the expected inverse-scaling signature
when the leak is per-decode and the workload is split across workers. The
working workaround on the affected platform was --dataset.video_backend=pyav,
because the pyav path opens/closes per call and never touches this cache.

Switch the backing store to an OrderedDict and evict LRU entries when the
cap is reached, closing the evicted file handle inside the lock so we do
not leak FDs either. Default cap is DEFAULT_DECODER_CACHE_SIZE = 100,
overridable via LEROBOT_VIDEO_DECODER_CACHE_SIZE or by passing max_size=
to the constructor; max_size=None restores the legacy unbounded behaviour
for callers that need it.

Validation on the original failing workload (decode_video_frames_torchcodec
called over real mp4s from the affected SO-101 dataset):

  unbounded:    300 files  ->  +1087 MB host RSS,  cache=300, still climbing
  cap=50:       500 files  ->   +266 MB host RSS,  cache=50,  stable
  cap=50:      2000 calls  ->   +312 MB host RSS,  cache=50,  stable
  cap=100:     1000 calls  ->   +470 MB host RSS,  cache=100, stable

Three independent seeded runs at cap=50 agreed to within 1% (263 / 266 /
265 MB delta), and the 2000-call multi-pass run shows RSS plateaus after
the cap is reached instead of drifting.

Tests in tests/datasets/test_video_decoder_cache.py cover:
default-is-bounded, size cap, LRU ordering, FD close on eviction, FD close
on clear(), cache-hit invariance, max_size=None fallback, and env-var
override. No regressions in test_video_encoding.py, test_streaming.py, or
test_dataset_reader.py (73 prior tests still pass alongside the 8 new ones).
2026-05-19 16:54:25 +02:00
四七 6a8878a639 fix(datasets): normalize shape=(1,) numeric values before HF encoding (#3344)
* fix(datasets): normalize shape=(1,) numeric values before save

* test(datasets): cover shape=(1,) int/bool and finalize

Co-authored-by: Copilot <copilot@github.com>
2026-05-19 16:53:19 +02:00
4 changed files with 271 additions and 17 deletions
+8 -1
View File
@@ -250,7 +250,14 @@ class DatasetWriter:
for key, ft in self._meta.features.items():
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]:
continue
episode_buffer[key] = np.stack(episode_buffer[key])
stacked_values = np.stack(episode_buffer[key])
# `shape=(1,)` numeric features are serialized as `datasets.Value`, which expects scalars.
# Normalizing to `(N,)` keeps save semantics stable across dependency versions.
if tuple(ft["shape"]) == (1,) and ft["dtype"] != "string":
stacked_values = stacked_values.reshape(episode_length)
episode_buffer[key] = stacked_values
# Wait for image writer to end, so that episode stats over images can be computed
self._wait_image_writer()
+87 -16
View File
@@ -17,11 +17,13 @@ import contextlib
import glob
import importlib
import logging
import os
import queue
import shutil
import tempfile
import threading
import warnings
from collections import OrderedDict
from dataclasses import asdict, dataclass, field
from fractions import Fraction
from pathlib import Path
@@ -191,15 +193,70 @@ def decode_video_frames_pyav(
return closest_frames
class VideoDecoderCache:
"""Thread-safe cache for video decoders to avoid expensive re-initialization."""
DEFAULT_DECODER_CACHE_SIZE = 100
"""Default LRU capacity for :class:`VideoDecoderCache`.
def __init__(self):
self._cache: dict[str, tuple[Any, Any]] = {}
Sized to comfortably hold a small rolling window of episodes worth of decoders
(typical recipes: 2-4 cameras per episode × tens of episodes in flight) while
bounding host RAM. Each cached entry retains a torchcodec ``VideoDecoder`` plus
an open ``fsspec`` file handle on the order of a few MB per entry. Override
via the ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var or by passing ``max_size``
to the constructor (``None`` restores the legacy unbounded behaviour).
"""
def _default_max_cache_size() -> int | None:
raw = os.environ.get("LEROBOT_VIDEO_DECODER_CACHE_SIZE")
if raw is None:
return DEFAULT_DECODER_CACHE_SIZE
raw = raw.strip().lower()
if raw in ("", "none", "unbounded", "-1"):
return None
try:
value = int(raw)
except ValueError as e:
raise ValueError(
f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be an integer, 'none', or '-1'; got {raw!r}"
) from e
if value <= 0:
raise ValueError(f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be positive; got {value}")
return value
class VideoDecoderCache:
"""Thread-safe LRU cache for torchcodec ``VideoDecoder`` instances.
Cached entries hold a ``VideoDecoder`` plus the open ``fsspec`` file handle
backing it. When the cache is full and a new path is requested, the
least-recently-used entry is evicted and its file handle is closed. This
bounds host-RAM growth when iterating over datasets with many distinct
video files (otherwise each ``DataLoader`` worker pins every decoder it has
ever opened until the process exits).
Args:
max_size: Maximum number of decoders to retain. ``None`` disables
eviction and restores legacy unbounded behaviour. Defaults to the
value of ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` if set, otherwise
:data:`DEFAULT_DECODER_CACHE_SIZE`.
"""
_SENTINEL: ClassVar[object] = object()
def __init__(self, max_size: int | None | object = _SENTINEL):
if max_size is VideoDecoderCache._SENTINEL:
max_size = _default_max_cache_size()
if max_size is not None and max_size <= 0:
raise ValueError(f"max_size must be positive or None; got {max_size}")
self.max_size: int | None = max_size # type: ignore[assignment]
self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict()
self._lock = Lock()
def __contains__(self, video_path: object) -> bool:
with self._lock:
return str(video_path) in self._cache
def get_decoder(self, video_path: str):
"""Get a cached decoder or create a new one."""
"""Get a cached decoder or create a new one, evicting LRU if at capacity."""
if importlib.util.find_spec("torchcodec"):
from torchcodec.decoders import VideoDecoder
else:
@@ -211,22 +268,36 @@ class VideoDecoderCache:
video_path = str(video_path)
with self._lock:
if video_path not in self._cache:
file_handle = fsspec.open(video_path).__enter__()
try:
decoder = VideoDecoder(file_handle, seek_mode="approximate")
except Exception:
file_handle.close()
raise
self._cache[video_path] = (decoder, file_handle)
entry = self._cache.get(video_path)
if entry is not None:
self._cache.move_to_end(video_path)
return entry[0]
return self._cache[video_path][0]
file_handle = fsspec.open(video_path).__enter__()
try:
decoder = VideoDecoder(file_handle, seek_mode="approximate")
except Exception:
file_handle.close()
raise
self._cache[video_path] = (decoder, file_handle)
# Evict LRU entries until we are back under the cap. We close
# evicted file handles immediately; the associated ``VideoDecoder``
# is released to the GC when its last reference goes away.
if self.max_size is not None:
while len(self._cache) > self.max_size:
_evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False)
with contextlib.suppress(Exception):
evicted_handle.close()
return decoder
def clear(self):
"""Clear the cache and close file handles."""
"""Clear the cache and close all file handles."""
with self._lock:
for _, file_handle in self._cache.values():
file_handle.close()
with contextlib.suppress(Exception):
file_handle.close()
self._cache.clear()
def size(self) -> int:
+36
View File
@@ -24,6 +24,7 @@ import torch
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import datasets
from huggingface_hub import HfApi
from PIL import Image
from safetensors.torch import load_file
@@ -360,6 +361,41 @@ def test_add_frame_image_pil(image_dataset):
assert dataset[0]["image"].shape == torch.Size(DUMMY_CHW)
@pytest.mark.parametrize(
"dtype,np_dtype,values,assert_fn",
[
("float32", np.float32, [1.0, 2.0], np.testing.assert_allclose),
("int64", np.int64, [1, 2], np.testing.assert_array_equal),
("bool", np.bool_, [True, False], np.testing.assert_array_equal),
],
ids=["float32", "int64", "bool"],
)
def test_save_episode_shape_1_scalar_is_scalarized_before_hf_encoding(
tmp_path, empty_lerobot_dataset_factory, monkeypatch, dtype, np_dtype, values, assert_fn
):
features = {"state": {"dtype": dtype, "shape": (1,), "names": None}}
dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features)
dataset.add_frame({"state": np.array([values[0]], dtype=np_dtype), "task": "Dummy task"})
dataset.add_frame({"state": np.array([values[1]], dtype=np_dtype), "task": "Dummy task"})
captured = {}
original_from_dict = datasets.Dataset.from_dict
def _from_dict_spy(cls, mapping, *args, **kwargs):
captured["state"] = mapping["state"]
return original_from_dict(mapping, *args, **kwargs)
monkeypatch.setattr(datasets.Dataset, "from_dict", classmethod(_from_dict_spy))
dataset.save_episode()
dataset.finalize()
assert "state" in captured
assert isinstance(captured["state"], np.ndarray)
assert captured["state"].shape == (2,)
assert_fn(captured["state"], np.array(values, dtype=np_dtype))
def test_set_image_transforms_applies_transparently(image_dataset):
dataset = image_dataset
dataset.add_frame({"image": np.random.rand(*DUMMY_CHW), "task": "Dummy task"})
+140
View File
@@ -0,0 +1,140 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``.
These cover the LRU bounding + file-handle release behaviour added to prevent
unbounded growth when iterating over datasets with many distinct video files
(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset).
"""
import shutil
from pathlib import Path
import pytest
pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])")
from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402
TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos"
SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]:
"""Copy the small reference mp4 to ``n`` distinct paths.
The cache keys on absolute path, so distinct paths force distinct cache entries
even though the file contents are identical.
"""
assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}"
paths = []
for i in range(n):
dst = tmp_path / f"clip_{i:04d}.mp4"
shutil.copyfile(SRC_CLIP, dst)
paths.append(dst)
return paths
class TestVideoDecoderCacheBounded:
def test_default_cache_is_bounded(self):
"""The default cache must have a finite ``max_size`` to bound RSS growth."""
cache = VideoDecoderCache()
assert cache.max_size is not None, "default cache must be bounded"
assert cache.max_size > 0
def test_size_capped_at_max_size(self, tmp_path):
"""``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound."""
paths = _make_distinct_clips(tmp_path, n=5)
cache = VideoDecoderCache(max_size=2)
for p in paths:
cache.get_decoder(p)
assert cache.size() == 2
def test_evicts_least_recently_used(self, tmp_path):
"""Re-accessing an entry must promote it; the LRU entry is the one evicted."""
paths = _make_distinct_clips(tmp_path, n=3)
cache = VideoDecoderCache(max_size=2)
cache.get_decoder(paths[0])
cache.get_decoder(paths[1])
cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU
cache.get_decoder(paths[2]) # should evict paths[1]
assert str(paths[0]) in cache # MRU stays
assert str(paths[1]) not in cache # LRU evicted
assert str(paths[2]) in cache # newest stays
def test_eviction_closes_file_handle(self, tmp_path):
"""Evicting an entry must close its fsspec file handle (otherwise we leak FDs)."""
paths = _make_distinct_clips(tmp_path, n=2)
cache = VideoDecoderCache(max_size=1)
cache.get_decoder(paths[0])
# Reach into the cache to capture the handle before it is evicted. This is
# the only assertion in the suite that touches a private attribute, and it
# is the most direct way to prove the file descriptor is actually released.
evicted_handle = cache._cache[str(paths[0])][1]
assert evicted_handle.closed is False
cache.get_decoder(paths[1]) # forces eviction of paths[0]
assert evicted_handle.closed is True
def test_clear_closes_all_file_handles(self, tmp_path):
"""``clear()`` must close every cached file handle."""
paths = _make_distinct_clips(tmp_path, n=3)
cache = VideoDecoderCache(max_size=10)
for p in paths:
cache.get_decoder(p)
handles = [entry[1] for entry in cache._cache.values()]
assert all(not h.closed for h in handles)
cache.clear()
assert cache.size() == 0
assert all(h.closed for h in handles)
def test_hit_does_not_reopen_or_evict(self, tmp_path):
"""A cache hit must return the same decoder instance without touching the cap."""
paths = _make_distinct_clips(tmp_path, n=1)
cache = VideoDecoderCache(max_size=2)
first = cache.get_decoder(paths[0])
second = cache.get_decoder(paths[0])
assert first is second
assert cache.size() == 1
def test_unbounded_when_max_size_none(self, tmp_path):
"""``max_size=None`` preserves the legacy unbounded behaviour."""
paths = _make_distinct_clips(tmp_path, n=4)
cache = VideoDecoderCache(max_size=None)
for p in paths:
cache.get_decoder(p)
assert cache.size() == 4
def test_env_var_overrides_default(self, tmp_path, monkeypatch):
"""``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``."""
monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3")
cache = VideoDecoderCache()
assert cache.max_size == 3
paths = _make_distinct_clips(tmp_path, n=5)
for p in paths:
cache.get_decoder(p)
assert cache.size() == 3