mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-20 02:59:50 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dfdc48a7f1 | |||
| 6a8878a639 |
@@ -250,7 +250,14 @@ class DatasetWriter:
|
||||
for key, ft in self._meta.features.items():
|
||||
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]:
|
||||
continue
|
||||
episode_buffer[key] = np.stack(episode_buffer[key])
|
||||
stacked_values = np.stack(episode_buffer[key])
|
||||
|
||||
# `shape=(1,)` numeric features are serialized as `datasets.Value`, which expects scalars.
|
||||
# Normalizing to `(N,)` keeps save semantics stable across dependency versions.
|
||||
if tuple(ft["shape"]) == (1,) and ft["dtype"] != "string":
|
||||
stacked_values = stacked_values.reshape(episode_length)
|
||||
|
||||
episode_buffer[key] = stacked_values
|
||||
|
||||
# Wait for image writer to end, so that episode stats over images can be computed
|
||||
self._wait_image_writer()
|
||||
|
||||
@@ -17,11 +17,13 @@ import contextlib
|
||||
import glob
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import warnings
|
||||
from collections import OrderedDict
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from fractions import Fraction
|
||||
from pathlib import Path
|
||||
@@ -191,15 +193,70 @@ def decode_video_frames_pyav(
|
||||
return closest_frames
|
||||
|
||||
|
||||
class VideoDecoderCache:
|
||||
"""Thread-safe cache for video decoders to avoid expensive re-initialization."""
|
||||
DEFAULT_DECODER_CACHE_SIZE = 100
|
||||
"""Default LRU capacity for :class:`VideoDecoderCache`.
|
||||
|
||||
def __init__(self):
|
||||
self._cache: dict[str, tuple[Any, Any]] = {}
|
||||
Sized to comfortably hold a small rolling window of episodes worth of decoders
|
||||
(typical recipes: 2-4 cameras per episode × tens of episodes in flight) while
|
||||
bounding host RAM. Each cached entry retains a torchcodec ``VideoDecoder`` plus
|
||||
an open ``fsspec`` file handle — on the order of a few MB per entry. Override
|
||||
via the ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var or by passing ``max_size``
|
||||
to the constructor (``None`` restores the legacy unbounded behaviour).
|
||||
"""
|
||||
|
||||
|
||||
def _default_max_cache_size() -> int | None:
|
||||
raw = os.environ.get("LEROBOT_VIDEO_DECODER_CACHE_SIZE")
|
||||
if raw is None:
|
||||
return DEFAULT_DECODER_CACHE_SIZE
|
||||
raw = raw.strip().lower()
|
||||
if raw in ("", "none", "unbounded", "-1"):
|
||||
return None
|
||||
try:
|
||||
value = int(raw)
|
||||
except ValueError as e:
|
||||
raise ValueError(
|
||||
f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be an integer, 'none', or '-1'; got {raw!r}"
|
||||
) from e
|
||||
if value <= 0:
|
||||
raise ValueError(f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be positive; got {value}")
|
||||
return value
|
||||
|
||||
|
||||
class VideoDecoderCache:
|
||||
"""Thread-safe LRU cache for torchcodec ``VideoDecoder`` instances.
|
||||
|
||||
Cached entries hold a ``VideoDecoder`` plus the open ``fsspec`` file handle
|
||||
backing it. When the cache is full and a new path is requested, the
|
||||
least-recently-used entry is evicted and its file handle is closed. This
|
||||
bounds host-RAM growth when iterating over datasets with many distinct
|
||||
video files (otherwise each ``DataLoader`` worker pins every decoder it has
|
||||
ever opened until the process exits).
|
||||
|
||||
Args:
|
||||
max_size: Maximum number of decoders to retain. ``None`` disables
|
||||
eviction and restores legacy unbounded behaviour. Defaults to the
|
||||
value of ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` if set, otherwise
|
||||
:data:`DEFAULT_DECODER_CACHE_SIZE`.
|
||||
"""
|
||||
|
||||
_SENTINEL: ClassVar[object] = object()
|
||||
|
||||
def __init__(self, max_size: int | None | object = _SENTINEL):
|
||||
if max_size is VideoDecoderCache._SENTINEL:
|
||||
max_size = _default_max_cache_size()
|
||||
if max_size is not None and max_size <= 0:
|
||||
raise ValueError(f"max_size must be positive or None; got {max_size}")
|
||||
self.max_size: int | None = max_size # type: ignore[assignment]
|
||||
self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict()
|
||||
self._lock = Lock()
|
||||
|
||||
def __contains__(self, video_path: object) -> bool:
|
||||
with self._lock:
|
||||
return str(video_path) in self._cache
|
||||
|
||||
def get_decoder(self, video_path: str):
|
||||
"""Get a cached decoder or create a new one."""
|
||||
"""Get a cached decoder or create a new one, evicting LRU if at capacity."""
|
||||
if importlib.util.find_spec("torchcodec"):
|
||||
from torchcodec.decoders import VideoDecoder
|
||||
else:
|
||||
@@ -211,22 +268,36 @@ class VideoDecoderCache:
|
||||
video_path = str(video_path)
|
||||
|
||||
with self._lock:
|
||||
if video_path not in self._cache:
|
||||
file_handle = fsspec.open(video_path).__enter__()
|
||||
try:
|
||||
decoder = VideoDecoder(file_handle, seek_mode="approximate")
|
||||
except Exception:
|
||||
file_handle.close()
|
||||
raise
|
||||
self._cache[video_path] = (decoder, file_handle)
|
||||
entry = self._cache.get(video_path)
|
||||
if entry is not None:
|
||||
self._cache.move_to_end(video_path)
|
||||
return entry[0]
|
||||
|
||||
return self._cache[video_path][0]
|
||||
file_handle = fsspec.open(video_path).__enter__()
|
||||
try:
|
||||
decoder = VideoDecoder(file_handle, seek_mode="approximate")
|
||||
except Exception:
|
||||
file_handle.close()
|
||||
raise
|
||||
self._cache[video_path] = (decoder, file_handle)
|
||||
|
||||
# Evict LRU entries until we are back under the cap. We close
|
||||
# evicted file handles immediately; the associated ``VideoDecoder``
|
||||
# is released to the GC when its last reference goes away.
|
||||
if self.max_size is not None:
|
||||
while len(self._cache) > self.max_size:
|
||||
_evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False)
|
||||
with contextlib.suppress(Exception):
|
||||
evicted_handle.close()
|
||||
|
||||
return decoder
|
||||
|
||||
def clear(self):
|
||||
"""Clear the cache and close file handles."""
|
||||
"""Clear the cache and close all file handles."""
|
||||
with self._lock:
|
||||
for _, file_handle in self._cache.values():
|
||||
file_handle.close()
|
||||
with contextlib.suppress(Exception):
|
||||
file_handle.close()
|
||||
self._cache.clear()
|
||||
|
||||
def size(self) -> int:
|
||||
|
||||
@@ -24,6 +24,7 @@ import torch
|
||||
|
||||
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
|
||||
|
||||
import datasets
|
||||
from huggingface_hub import HfApi
|
||||
from PIL import Image
|
||||
from safetensors.torch import load_file
|
||||
@@ -360,6 +361,41 @@ def test_add_frame_image_pil(image_dataset):
|
||||
assert dataset[0]["image"].shape == torch.Size(DUMMY_CHW)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"dtype,np_dtype,values,assert_fn",
|
||||
[
|
||||
("float32", np.float32, [1.0, 2.0], np.testing.assert_allclose),
|
||||
("int64", np.int64, [1, 2], np.testing.assert_array_equal),
|
||||
("bool", np.bool_, [True, False], np.testing.assert_array_equal),
|
||||
],
|
||||
ids=["float32", "int64", "bool"],
|
||||
)
|
||||
def test_save_episode_shape_1_scalar_is_scalarized_before_hf_encoding(
|
||||
tmp_path, empty_lerobot_dataset_factory, monkeypatch, dtype, np_dtype, values, assert_fn
|
||||
):
|
||||
features = {"state": {"dtype": dtype, "shape": (1,), "names": None}}
|
||||
dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features)
|
||||
dataset.add_frame({"state": np.array([values[0]], dtype=np_dtype), "task": "Dummy task"})
|
||||
dataset.add_frame({"state": np.array([values[1]], dtype=np_dtype), "task": "Dummy task"})
|
||||
|
||||
captured = {}
|
||||
original_from_dict = datasets.Dataset.from_dict
|
||||
|
||||
def _from_dict_spy(cls, mapping, *args, **kwargs):
|
||||
captured["state"] = mapping["state"]
|
||||
return original_from_dict(mapping, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(datasets.Dataset, "from_dict", classmethod(_from_dict_spy))
|
||||
|
||||
dataset.save_episode()
|
||||
dataset.finalize()
|
||||
|
||||
assert "state" in captured
|
||||
assert isinstance(captured["state"], np.ndarray)
|
||||
assert captured["state"].shape == (2,)
|
||||
assert_fn(captured["state"], np.array(values, dtype=np_dtype))
|
||||
|
||||
|
||||
def test_set_image_transforms_applies_transparently(image_dataset):
|
||||
dataset = image_dataset
|
||||
dataset.add_frame({"image": np.random.rand(*DUMMY_CHW), "task": "Dummy task"})
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``.
|
||||
|
||||
These cover the LRU bounding + file-handle release behaviour added to prevent
|
||||
unbounded growth when iterating over datasets with many distinct video files
|
||||
(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset).
|
||||
"""
|
||||
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])")
|
||||
|
||||
from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402
|
||||
|
||||
TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos"
|
||||
SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
|
||||
|
||||
|
||||
def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]:
|
||||
"""Copy the small reference mp4 to ``n`` distinct paths.
|
||||
|
||||
The cache keys on absolute path, so distinct paths force distinct cache entries
|
||||
even though the file contents are identical.
|
||||
"""
|
||||
assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}"
|
||||
paths = []
|
||||
for i in range(n):
|
||||
dst = tmp_path / f"clip_{i:04d}.mp4"
|
||||
shutil.copyfile(SRC_CLIP, dst)
|
||||
paths.append(dst)
|
||||
return paths
|
||||
|
||||
|
||||
class TestVideoDecoderCacheBounded:
|
||||
def test_default_cache_is_bounded(self):
|
||||
"""The default cache must have a finite ``max_size`` to bound RSS growth."""
|
||||
cache = VideoDecoderCache()
|
||||
assert cache.max_size is not None, "default cache must be bounded"
|
||||
assert cache.max_size > 0
|
||||
|
||||
def test_size_capped_at_max_size(self, tmp_path):
|
||||
"""``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound."""
|
||||
paths = _make_distinct_clips(tmp_path, n=5)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 2
|
||||
|
||||
def test_evicts_least_recently_used(self, tmp_path):
|
||||
"""Re-accessing an entry must promote it; the LRU entry is the one evicted."""
|
||||
paths = _make_distinct_clips(tmp_path, n=3)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
|
||||
cache.get_decoder(paths[0])
|
||||
cache.get_decoder(paths[1])
|
||||
cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU
|
||||
cache.get_decoder(paths[2]) # should evict paths[1]
|
||||
|
||||
assert str(paths[0]) in cache # MRU stays
|
||||
assert str(paths[1]) not in cache # LRU evicted
|
||||
assert str(paths[2]) in cache # newest stays
|
||||
|
||||
def test_eviction_closes_file_handle(self, tmp_path):
|
||||
"""Evicting an entry must close its fsspec file handle (otherwise we leak FDs)."""
|
||||
paths = _make_distinct_clips(tmp_path, n=2)
|
||||
cache = VideoDecoderCache(max_size=1)
|
||||
|
||||
cache.get_decoder(paths[0])
|
||||
# Reach into the cache to capture the handle before it is evicted. This is
|
||||
# the only assertion in the suite that touches a private attribute, and it
|
||||
# is the most direct way to prove the file descriptor is actually released.
|
||||
evicted_handle = cache._cache[str(paths[0])][1]
|
||||
assert evicted_handle.closed is False
|
||||
|
||||
cache.get_decoder(paths[1]) # forces eviction of paths[0]
|
||||
|
||||
assert evicted_handle.closed is True
|
||||
|
||||
def test_clear_closes_all_file_handles(self, tmp_path):
|
||||
"""``clear()`` must close every cached file handle."""
|
||||
paths = _make_distinct_clips(tmp_path, n=3)
|
||||
cache = VideoDecoderCache(max_size=10)
|
||||
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
handles = [entry[1] for entry in cache._cache.values()]
|
||||
assert all(not h.closed for h in handles)
|
||||
|
||||
cache.clear()
|
||||
|
||||
assert cache.size() == 0
|
||||
assert all(h.closed for h in handles)
|
||||
|
||||
def test_hit_does_not_reopen_or_evict(self, tmp_path):
|
||||
"""A cache hit must return the same decoder instance without touching the cap."""
|
||||
paths = _make_distinct_clips(tmp_path, n=1)
|
||||
cache = VideoDecoderCache(max_size=2)
|
||||
|
||||
first = cache.get_decoder(paths[0])
|
||||
second = cache.get_decoder(paths[0])
|
||||
|
||||
assert first is second
|
||||
assert cache.size() == 1
|
||||
|
||||
def test_unbounded_when_max_size_none(self, tmp_path):
|
||||
"""``max_size=None`` preserves the legacy unbounded behaviour."""
|
||||
paths = _make_distinct_clips(tmp_path, n=4)
|
||||
cache = VideoDecoderCache(max_size=None)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 4
|
||||
|
||||
def test_env_var_overrides_default(self, tmp_path, monkeypatch):
|
||||
"""``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``."""
|
||||
monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3")
|
||||
cache = VideoDecoderCache()
|
||||
assert cache.max_size == 3
|
||||
|
||||
paths = _make_distinct_clips(tmp_path, n=5)
|
||||
for p in paths:
|
||||
cache.get_decoder(p)
|
||||
assert cache.size() == 3
|
||||
Reference in New Issue
Block a user