mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-20 11:09:59 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dfdc48a7f1 | |||
| 6a8878a639 |
@@ -250,7 +250,14 @@ class DatasetWriter:
|
|||||||
for key, ft in self._meta.features.items():
|
for key, ft in self._meta.features.items():
|
||||||
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]:
|
if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]:
|
||||||
continue
|
continue
|
||||||
episode_buffer[key] = np.stack(episode_buffer[key])
|
stacked_values = np.stack(episode_buffer[key])
|
||||||
|
|
||||||
|
# `shape=(1,)` numeric features are serialized as `datasets.Value`, which expects scalars.
|
||||||
|
# Normalizing to `(N,)` keeps save semantics stable across dependency versions.
|
||||||
|
if tuple(ft["shape"]) == (1,) and ft["dtype"] != "string":
|
||||||
|
stacked_values = stacked_values.reshape(episode_length)
|
||||||
|
|
||||||
|
episode_buffer[key] = stacked_values
|
||||||
|
|
||||||
# Wait for image writer to end, so that episode stats over images can be computed
|
# Wait for image writer to end, so that episode stats over images can be computed
|
||||||
self._wait_image_writer()
|
self._wait_image_writer()
|
||||||
|
|||||||
@@ -17,11 +17,13 @@ import contextlib
|
|||||||
import glob
|
import glob
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import queue
|
import queue
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
import warnings
|
import warnings
|
||||||
|
from collections import OrderedDict
|
||||||
from dataclasses import asdict, dataclass, field
|
from dataclasses import asdict, dataclass, field
|
||||||
from fractions import Fraction
|
from fractions import Fraction
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -191,15 +193,70 @@ def decode_video_frames_pyav(
|
|||||||
return closest_frames
|
return closest_frames
|
||||||
|
|
||||||
|
|
||||||
class VideoDecoderCache:
|
DEFAULT_DECODER_CACHE_SIZE = 100
|
||||||
"""Thread-safe cache for video decoders to avoid expensive re-initialization."""
|
"""Default LRU capacity for :class:`VideoDecoderCache`.
|
||||||
|
|
||||||
def __init__(self):
|
Sized to comfortably hold a small rolling window of episodes worth of decoders
|
||||||
self._cache: dict[str, tuple[Any, Any]] = {}
|
(typical recipes: 2-4 cameras per episode × tens of episodes in flight) while
|
||||||
|
bounding host RAM. Each cached entry retains a torchcodec ``VideoDecoder`` plus
|
||||||
|
an open ``fsspec`` file handle — on the order of a few MB per entry. Override
|
||||||
|
via the ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var or by passing ``max_size``
|
||||||
|
to the constructor (``None`` restores the legacy unbounded behaviour).
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _default_max_cache_size() -> int | None:
|
||||||
|
raw = os.environ.get("LEROBOT_VIDEO_DECODER_CACHE_SIZE")
|
||||||
|
if raw is None:
|
||||||
|
return DEFAULT_DECODER_CACHE_SIZE
|
||||||
|
raw = raw.strip().lower()
|
||||||
|
if raw in ("", "none", "unbounded", "-1"):
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
value = int(raw)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(
|
||||||
|
f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be an integer, 'none', or '-1'; got {raw!r}"
|
||||||
|
) from e
|
||||||
|
if value <= 0:
|
||||||
|
raise ValueError(f"LEROBOT_VIDEO_DECODER_CACHE_SIZE must be positive; got {value}")
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
class VideoDecoderCache:
|
||||||
|
"""Thread-safe LRU cache for torchcodec ``VideoDecoder`` instances.
|
||||||
|
|
||||||
|
Cached entries hold a ``VideoDecoder`` plus the open ``fsspec`` file handle
|
||||||
|
backing it. When the cache is full and a new path is requested, the
|
||||||
|
least-recently-used entry is evicted and its file handle is closed. This
|
||||||
|
bounds host-RAM growth when iterating over datasets with many distinct
|
||||||
|
video files (otherwise each ``DataLoader`` worker pins every decoder it has
|
||||||
|
ever opened until the process exits).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
max_size: Maximum number of decoders to retain. ``None`` disables
|
||||||
|
eviction and restores legacy unbounded behaviour. Defaults to the
|
||||||
|
value of ``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` if set, otherwise
|
||||||
|
:data:`DEFAULT_DECODER_CACHE_SIZE`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_SENTINEL: ClassVar[object] = object()
|
||||||
|
|
||||||
|
def __init__(self, max_size: int | None | object = _SENTINEL):
|
||||||
|
if max_size is VideoDecoderCache._SENTINEL:
|
||||||
|
max_size = _default_max_cache_size()
|
||||||
|
if max_size is not None and max_size <= 0:
|
||||||
|
raise ValueError(f"max_size must be positive or None; got {max_size}")
|
||||||
|
self.max_size: int | None = max_size # type: ignore[assignment]
|
||||||
|
self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict()
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
|
|
||||||
|
def __contains__(self, video_path: object) -> bool:
|
||||||
|
with self._lock:
|
||||||
|
return str(video_path) in self._cache
|
||||||
|
|
||||||
def get_decoder(self, video_path: str):
|
def get_decoder(self, video_path: str):
|
||||||
"""Get a cached decoder or create a new one."""
|
"""Get a cached decoder or create a new one, evicting LRU if at capacity."""
|
||||||
if importlib.util.find_spec("torchcodec"):
|
if importlib.util.find_spec("torchcodec"):
|
||||||
from torchcodec.decoders import VideoDecoder
|
from torchcodec.decoders import VideoDecoder
|
||||||
else:
|
else:
|
||||||
@@ -211,22 +268,36 @@ class VideoDecoderCache:
|
|||||||
video_path = str(video_path)
|
video_path = str(video_path)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if video_path not in self._cache:
|
entry = self._cache.get(video_path)
|
||||||
file_handle = fsspec.open(video_path).__enter__()
|
if entry is not None:
|
||||||
try:
|
self._cache.move_to_end(video_path)
|
||||||
decoder = VideoDecoder(file_handle, seek_mode="approximate")
|
return entry[0]
|
||||||
except Exception:
|
|
||||||
file_handle.close()
|
|
||||||
raise
|
|
||||||
self._cache[video_path] = (decoder, file_handle)
|
|
||||||
|
|
||||||
return self._cache[video_path][0]
|
file_handle = fsspec.open(video_path).__enter__()
|
||||||
|
try:
|
||||||
|
decoder = VideoDecoder(file_handle, seek_mode="approximate")
|
||||||
|
except Exception:
|
||||||
|
file_handle.close()
|
||||||
|
raise
|
||||||
|
self._cache[video_path] = (decoder, file_handle)
|
||||||
|
|
||||||
|
# Evict LRU entries until we are back under the cap. We close
|
||||||
|
# evicted file handles immediately; the associated ``VideoDecoder``
|
||||||
|
# is released to the GC when its last reference goes away.
|
||||||
|
if self.max_size is not None:
|
||||||
|
while len(self._cache) > self.max_size:
|
||||||
|
_evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False)
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
evicted_handle.close()
|
||||||
|
|
||||||
|
return decoder
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
"""Clear the cache and close file handles."""
|
"""Clear the cache and close all file handles."""
|
||||||
with self._lock:
|
with self._lock:
|
||||||
for _, file_handle in self._cache.values():
|
for _, file_handle in self._cache.values():
|
||||||
file_handle.close()
|
with contextlib.suppress(Exception):
|
||||||
|
file_handle.close()
|
||||||
self._cache.clear()
|
self._cache.clear()
|
||||||
|
|
||||||
def size(self) -> int:
|
def size(self) -> int:
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import torch
|
|||||||
|
|
||||||
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
|
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
|
||||||
|
|
||||||
|
import datasets
|
||||||
from huggingface_hub import HfApi
|
from huggingface_hub import HfApi
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from safetensors.torch import load_file
|
from safetensors.torch import load_file
|
||||||
@@ -360,6 +361,41 @@ def test_add_frame_image_pil(image_dataset):
|
|||||||
assert dataset[0]["image"].shape == torch.Size(DUMMY_CHW)
|
assert dataset[0]["image"].shape == torch.Size(DUMMY_CHW)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"dtype,np_dtype,values,assert_fn",
|
||||||
|
[
|
||||||
|
("float32", np.float32, [1.0, 2.0], np.testing.assert_allclose),
|
||||||
|
("int64", np.int64, [1, 2], np.testing.assert_array_equal),
|
||||||
|
("bool", np.bool_, [True, False], np.testing.assert_array_equal),
|
||||||
|
],
|
||||||
|
ids=["float32", "int64", "bool"],
|
||||||
|
)
|
||||||
|
def test_save_episode_shape_1_scalar_is_scalarized_before_hf_encoding(
|
||||||
|
tmp_path, empty_lerobot_dataset_factory, monkeypatch, dtype, np_dtype, values, assert_fn
|
||||||
|
):
|
||||||
|
features = {"state": {"dtype": dtype, "shape": (1,), "names": None}}
|
||||||
|
dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features)
|
||||||
|
dataset.add_frame({"state": np.array([values[0]], dtype=np_dtype), "task": "Dummy task"})
|
||||||
|
dataset.add_frame({"state": np.array([values[1]], dtype=np_dtype), "task": "Dummy task"})
|
||||||
|
|
||||||
|
captured = {}
|
||||||
|
original_from_dict = datasets.Dataset.from_dict
|
||||||
|
|
||||||
|
def _from_dict_spy(cls, mapping, *args, **kwargs):
|
||||||
|
captured["state"] = mapping["state"]
|
||||||
|
return original_from_dict(mapping, *args, **kwargs)
|
||||||
|
|
||||||
|
monkeypatch.setattr(datasets.Dataset, "from_dict", classmethod(_from_dict_spy))
|
||||||
|
|
||||||
|
dataset.save_episode()
|
||||||
|
dataset.finalize()
|
||||||
|
|
||||||
|
assert "state" in captured
|
||||||
|
assert isinstance(captured["state"], np.ndarray)
|
||||||
|
assert captured["state"].shape == (2,)
|
||||||
|
assert_fn(captured["state"], np.array(values, dtype=np_dtype))
|
||||||
|
|
||||||
|
|
||||||
def test_set_image_transforms_applies_transparently(image_dataset):
|
def test_set_image_transforms_applies_transparently(image_dataset):
|
||||||
dataset = image_dataset
|
dataset = image_dataset
|
||||||
dataset.add_frame({"image": np.random.rand(*DUMMY_CHW), "task": "Dummy task"})
|
dataset.add_frame({"image": np.random.rand(*DUMMY_CHW), "task": "Dummy task"})
|
||||||
|
|||||||
@@ -0,0 +1,140 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""Unit tests for ``lerobot.datasets.video_utils.VideoDecoderCache``.
|
||||||
|
|
||||||
|
These cover the LRU bounding + file-handle release behaviour added to prevent
|
||||||
|
unbounded growth when iterating over datasets with many distinct video files
|
||||||
|
(observed: ~35 GB anon-rss per DataLoader worker on an 8 k-file dataset).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
pytest.importorskip("torchcodec", reason="torchcodec is required (install lerobot[dataset])")
|
||||||
|
|
||||||
|
from lerobot.datasets.video_utils import VideoDecoderCache # noqa: E402
|
||||||
|
|
||||||
|
TEST_ARTIFACTS_DIR = Path(__file__).resolve().parent.parent / "artifacts" / "encoded_videos"
|
||||||
|
SRC_CLIP = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
|
||||||
|
|
||||||
|
|
||||||
|
def _make_distinct_clips(tmp_path: Path, n: int) -> list[Path]:
|
||||||
|
"""Copy the small reference mp4 to ``n`` distinct paths.
|
||||||
|
|
||||||
|
The cache keys on absolute path, so distinct paths force distinct cache entries
|
||||||
|
even though the file contents are identical.
|
||||||
|
"""
|
||||||
|
assert SRC_CLIP.exists(), f"missing test artifact {SRC_CLIP}"
|
||||||
|
paths = []
|
||||||
|
for i in range(n):
|
||||||
|
dst = tmp_path / f"clip_{i:04d}.mp4"
|
||||||
|
shutil.copyfile(SRC_CLIP, dst)
|
||||||
|
paths.append(dst)
|
||||||
|
return paths
|
||||||
|
|
||||||
|
|
||||||
|
class TestVideoDecoderCacheBounded:
|
||||||
|
def test_default_cache_is_bounded(self):
|
||||||
|
"""The default cache must have a finite ``max_size`` to bound RSS growth."""
|
||||||
|
cache = VideoDecoderCache()
|
||||||
|
assert cache.max_size is not None, "default cache must be bounded"
|
||||||
|
assert cache.max_size > 0
|
||||||
|
|
||||||
|
def test_size_capped_at_max_size(self, tmp_path):
|
||||||
|
"""``get_decoder`` for >``max_size`` distinct paths must NOT grow without bound."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=5)
|
||||||
|
cache = VideoDecoderCache(max_size=2)
|
||||||
|
for p in paths:
|
||||||
|
cache.get_decoder(p)
|
||||||
|
assert cache.size() == 2
|
||||||
|
|
||||||
|
def test_evicts_least_recently_used(self, tmp_path):
|
||||||
|
"""Re-accessing an entry must promote it; the LRU entry is the one evicted."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=3)
|
||||||
|
cache = VideoDecoderCache(max_size=2)
|
||||||
|
|
||||||
|
cache.get_decoder(paths[0])
|
||||||
|
cache.get_decoder(paths[1])
|
||||||
|
cache.get_decoder(paths[0]) # promote paths[0] to MRU; paths[1] is now LRU
|
||||||
|
cache.get_decoder(paths[2]) # should evict paths[1]
|
||||||
|
|
||||||
|
assert str(paths[0]) in cache # MRU stays
|
||||||
|
assert str(paths[1]) not in cache # LRU evicted
|
||||||
|
assert str(paths[2]) in cache # newest stays
|
||||||
|
|
||||||
|
def test_eviction_closes_file_handle(self, tmp_path):
|
||||||
|
"""Evicting an entry must close its fsspec file handle (otherwise we leak FDs)."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=2)
|
||||||
|
cache = VideoDecoderCache(max_size=1)
|
||||||
|
|
||||||
|
cache.get_decoder(paths[0])
|
||||||
|
# Reach into the cache to capture the handle before it is evicted. This is
|
||||||
|
# the only assertion in the suite that touches a private attribute, and it
|
||||||
|
# is the most direct way to prove the file descriptor is actually released.
|
||||||
|
evicted_handle = cache._cache[str(paths[0])][1]
|
||||||
|
assert evicted_handle.closed is False
|
||||||
|
|
||||||
|
cache.get_decoder(paths[1]) # forces eviction of paths[0]
|
||||||
|
|
||||||
|
assert evicted_handle.closed is True
|
||||||
|
|
||||||
|
def test_clear_closes_all_file_handles(self, tmp_path):
|
||||||
|
"""``clear()`` must close every cached file handle."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=3)
|
||||||
|
cache = VideoDecoderCache(max_size=10)
|
||||||
|
|
||||||
|
for p in paths:
|
||||||
|
cache.get_decoder(p)
|
||||||
|
handles = [entry[1] for entry in cache._cache.values()]
|
||||||
|
assert all(not h.closed for h in handles)
|
||||||
|
|
||||||
|
cache.clear()
|
||||||
|
|
||||||
|
assert cache.size() == 0
|
||||||
|
assert all(h.closed for h in handles)
|
||||||
|
|
||||||
|
def test_hit_does_not_reopen_or_evict(self, tmp_path):
|
||||||
|
"""A cache hit must return the same decoder instance without touching the cap."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=1)
|
||||||
|
cache = VideoDecoderCache(max_size=2)
|
||||||
|
|
||||||
|
first = cache.get_decoder(paths[0])
|
||||||
|
second = cache.get_decoder(paths[0])
|
||||||
|
|
||||||
|
assert first is second
|
||||||
|
assert cache.size() == 1
|
||||||
|
|
||||||
|
def test_unbounded_when_max_size_none(self, tmp_path):
|
||||||
|
"""``max_size=None`` preserves the legacy unbounded behaviour."""
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=4)
|
||||||
|
cache = VideoDecoderCache(max_size=None)
|
||||||
|
for p in paths:
|
||||||
|
cache.get_decoder(p)
|
||||||
|
assert cache.size() == 4
|
||||||
|
|
||||||
|
def test_env_var_overrides_default(self, tmp_path, monkeypatch):
|
||||||
|
"""``LEROBOT_VIDEO_DECODER_CACHE_SIZE`` env var sets the default ``max_size``."""
|
||||||
|
monkeypatch.setenv("LEROBOT_VIDEO_DECODER_CACHE_SIZE", "3")
|
||||||
|
cache = VideoDecoderCache()
|
||||||
|
assert cache.max_size == 3
|
||||||
|
|
||||||
|
paths = _make_distinct_clips(tmp_path, n=5)
|
||||||
|
for p in paths:
|
||||||
|
cache.get_decoder(p)
|
||||||
|
assert cache.size() == 3
|
||||||
Reference in New Issue
Block a user