Compare commits

...

26 Commits

Author SHA1 Message Date
Pepijn 38106ea6b4 chore(streaming): drop benchmark and SLURM scaffolding from the PR
The benchmarks/streaming harness (matrix submitter, summarizer, decode
diagnostic) and the robocasa SLURM scripts are cluster-specific tooling,
not part of the streaming feature. The example's --dummy mode covers
throughput measurement for reviewers. Recoverable from git history
(894fc6bfb) for cluster runs. Example docstring de-personalized.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 21:46:43 +02:00
Pepijn 894fc6bfb5 refactor(streaming): rebuild StreamingLeRobotDataset on native datasets primitives
The custom episode pool becomes a pure `datasets` pipeline:

  split_dataset_by_node -> batch(by_column="episode_index")
    -> shuffle(buffer=episode_pool_size)            # episode pool
    -> map(explode + exact delta windows)           # episode -> frames
    -> shuffle(buffer=frame_shuffle_buffer_size)    # frame interleave

and the torch IterableDataset wrapper keeps only per-sample video decode
(decode-on-exit), image transforms, task lookup, and decode/fetch timing.

Replaced by native machinery and deleted: the pooled-episode admission
loop, the refcounted video prefetcher, manual worker shard striding plus
the worker-split suppression patch, the per-(epoch, rank) shard-order
permutation, the per-consumer SplitMix64 RNG, and fast-forward resume.
DataLoader workers are split by `datasets` itself; .shuffle() permutes
shard order per epoch natively; resume delegates to the native
state_dict/load_state_dict (exact with num_workers=0; with workers use
torchdata's StatefulDataLoader, which checkpoints per-worker state
through the same protocol). An in-flight epoch counter ensures a
mid-iteration state_dict records the epoch the stream position belongs
to. Buffer contents are skipped on resume (documented datasets
behavior): never repeats data, drops at most ~pool + frame-buffer frames.

Randomness is unchanged: a batch still mixes up to episode_pool_size
episodes; delta windows are still exact in-episode slices with correct
boundary padding (value-verified against the map-style dataset). The
known trade accepted with this rewrite: no video prefetch-on-admit, so
remote decode pays per-frame range reads at yield time - use a colocated
bucket (data_files_root) at large scale.

The delta-consistency tests gained a scalar-comparison branch: they
silently skipped python-scalar keys before (stale `check` variable),
exposed by the new pipeline's key ordering.

Requires datasets with #8259 (pinned to the merge commit on this
branch). Example updated to per-rank native resume via torchdata's
StatefulDataLoader when available.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 21:03:09 +02:00
Pepijn 984b400e5c build(deps): pin datasets to the datasets#8259 merge commit
The native streaming pipeline calls .shuffle() on top of batch(by_column=...),
which crashes on released datasets 5.0.0 (batch-accumulator flag dropped on
shard/shuffle re-creation). The fix (datasets#8259) is merged but unreleased,
so pin datasets to the merge commit 2c45eab on this branch via [tool.uv.sources].
Drop this pin and bump the floor in `dependencies` once the next datasets
release ships the fix.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 18:28:41 +02:00
Pepijn 4e056081cb feat(streaming): seeded shard-order permutation per (seed, epoch, rank)
Shards were assigned to consumers in file-index order, so a sub-epoch
run over a corpus consolidated source-by-source trains on whatever the
first N% of files contains and drifts curriculum-style as sources change
under it. Permute the rank's shard list with a seeded RNG before worker
striding: a 30%-of-epoch run now sees a uniform 30% sample of files.

The permutation is seeded by (seed, epoch, rank) only - every DataLoader
worker of a rank must derive the identical list, since workers stride it
and disagreement would create overlapping shard assignments. It re-draws
each epoch, is the identity when shuffle=False, and stays deterministic
for fast-forward resume.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:08:26 +02:00
Pepijn a164bb97bd feat(streaming): native datasets-5 episode batching and worker-split suppression
Allow datasets 5.x (pin >=4.7,<6; lockfile moves to 5.0.0) and use its
Arrow-native batch(by_column="episode_index") (huggingface/datasets#8194
sibling, #8172) for episode admission when available - one Arrow
accumulation per episode instead of one Python dict per row - with the
existing row loop as the 4.x fallback. A parity test asserts both paths
group identically.

Also fixes a latent worker bug this surfaced: `datasets` detects torch
DataLoader workers and re-splits its shards internally (_iter_pytorch),
on top of our explicit per-worker shard assignment. That second split
silently drops data whenever a per-worker stream has fewer internal
shards than there are workers (masked so far by single-file test
fixtures), and on datasets 5.0 it crashes by_column batching outright.
The worker context is now hidden from `datasets` while draining streams
we already partitioned (process-local patch, restored on exit).

The multi-shard shuffle buffer (huggingface/datasets#8194) is
intentionally NOT used: frame-level shuffling upstream of episode
grouping would fragment episodes and break delta windows. Its threaded
multi-source prefetch idea remains a follow-up for episode admission if
fetch timings warrant it.

Verified on both datasets 4.8.5 (fallback) and 5.0.0 (native): 27/27
streaming tests each; full datasets suite 469 passed under 5.0.0.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 16:10:53 +02:00
Pepijn 79b547de32 Merge remote episode-pool work into the full pool rewrite
The remote commit (2ab71231c) added an opt-in episode pool, deferred
decode in the legacy buffer path, decode/fetch timing instrumentation,
remote-IO retries (video_utils), and 32MB row-group writing
(dataset_tools). The pool rewrite on this side makes the episode pool
the only iteration path (with prefetch-on-admit, per-consumer seeding,
worker-exact fast-forward resume), so streaming_dataset.py resolves to
the rewrite with the remote instrumentation ported into it:

- 5-slot shared counters + timing_stats() (decode_s_total/fetch_s_total)
- fetch timed around episode admission, decode timed around emission
- benchmark/slurm keep the remote updates, with episode_pool_size as the
  knob (buffer_size deprecated and ignored)

video_utils retries and dataset_tools row groups are taken unchanged.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 15:17:04 +02:00
Pepijn a7b7f4964e fix(streaming): worker-exact resume arithmetic and multi-worker resume test
The fast-forward skip assumed every DataLoader worker delivers batches;
workers that own no shards yield nothing and are stopped, so the batch
round-robin runs over min(num_workers, num_shards) active workers. Use
that effective count (shard-less workers skip nothing). Adds a resume
test under num_workers=2 asserting exact continuation.

Note: the test fixtures write a single parquet file regardless of
data_files_size_in_mb, so worker-splitting tests exercise the degenerate
single-shard layout; multi-shard behavior is covered by the rank-level
split_dataset_by_node tests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 15:11:00 +02:00
Pepijn 1050c2fb6c feat(streaming): episode-pool iteration with decode-on-exit, video prefetch, and exact resume
Replace the shard/Backtrackable/decoded-shuffle-buffer internals with an
episode pool: each (rank x worker) consumer keeps episode_pool_size whole
episodes' tabular rows in RAM and emits uniformly random frames across
them. delta_timestamps windows become exact in-RAM slices with correct
boundary padding (the Backtrackable machinery and its lookback/lookahead
ceilings are gone), and video is decoded only when a sample is emitted,
so pool memory stays tabular-sized instead of buffer_size decoded
samples.

- Prefetch-on-admit: when streaming from a remote source, each pooled
  episode's video files download to a local cache in the background
  (refcounted, since v3 packs several episodes per file; deleted on
  eviction), so decode-on-exit reads local bytes instead of paying
  network seek latency.
- Per-consumer RNG derived from (seed, epoch, rank, worker): consumers
  decorrelated, runs reproducible, epochs reshuffle automatically.
- Deterministic fast-forward resume: load_state_dict takes the trainer's
  {batches_consumed, batch_size}; each worker re-derives its own skip
  from the DataLoader's round-robin batch assignment and replays
  tabular-only (no decode). Exact within an epoch, works with
  num_workers > 0, and the same state file serves every rank. Replaces
  the per-shard HF state_dict approach, which lived in worker processes
  and could not be captured from the trainer.
- Shard-cap default removed (max_num_shards=None uses every parquet
  shard); runtime warnings for non-divisible world sizes (datasets
  degrades to read-everything splitting) and workers left without
  shards.
- episode_pool_size replaces buffer_size (deprecated, ignored with a
  warning); decoder cache sized to the pool working set, capped at 128.

Legacy order-replication tests asserted the old buffer algorithm
step-by-step and are rewritten as behavior contracts (exactly-once
coverage, per-seed determinism, epoch reshuffle). Value-level parity
tests against the map-style dataset pass unchanged.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 15:02:15 +02:00
Pepijn 66ac901632 fix(streaming): do not prepare the dataloader with accelerate
The dataset is already rank-disjoint via split_dataset_by_node;
accelerate's IterableDatasetShard wrapper kept only every Nth batch of
each rank's stream, silently training on 1/N of the data per pass while
decoding all of it. The --dummy benchmark path never prepared the
loader, so benchmarks were unaffected.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 12:21:20 +02:00
Pepijn ce326207e6 Merge remote-tracking branch 'origin/main' into feat/streaming-hf-native 2026-06-11 12:19:32 +02:00
pepijn 2ab71231cd feat(streaming): defer video decode, episode-pool shuffle, and remote-IO retries
- streaming_dataset: defer torchcodec decode until a sample leaves the shuffle
  buffer (buffer now holds ~KB tabular rows, not MB of pixels) and add an opt-in
  episode-pool shuffle (episode_pool_size) with exact in-episode delta lookups;
  expose decode/fetch timing_stats.
- video_utils: retry transient hf:///fsspec/httpx transport errors during
  streaming decode (LEROBOT_REMOTE_IO_MAX_RETRIES).
- dataset_tools: write multiple ~32MB row groups with a page index to bound
  per-shard streaming memory.
- benchmarks/slurm: streaming benchmark + matrix submitter updates.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-11 10:08:28 +00:00
Pepijn 41166b39fb fix(train): synchronize EpisodeAwareSampler shuffling across ranks and gate dataset download per node (#3768)
* fix(datasets): expose a generator on EpisodeAwareSampler for distributed shuffle sync

In distributed training, accelerate can only synchronize the shuffle
permutation across ranks when the sampler exposes a generator attribute.
EpisodeAwareSampler shuffled via the global torch RNG, so disjoint batch
shards relied on every rank's global CPU RNG staying in lockstep forever;
any rank-asymmetric RNG consumption (e.g. eval rollouts on the main
process only) silently desynced the permutations and ranks trained on
overlapping/missing samples.

* fix(train): seed sampler generator and gate dataset download per node

- Pass a generator seeded with cfg.seed to EpisodeAwareSampler so
  accelerator.prepare registers it as the synchronized RNG and the
  shuffle order is reproducible.
- Gate the initial make_dataset call on is_local_main_process instead of
  is_main_process: the global main process only exists on node 0, so on
  every other node all local ranks were downloading the dataset and
  building the Arrow cache concurrently.
2026-06-11 11:07:42 +02:00
Steven Palma 79c6821407 chore(dependecies): update mujoco transitives (#3756) 2026-06-10 12:58:55 +02:00
Pepijn 42d4788e4a fix(streaming): drop undeclared parquet columns that break batch collation
The data_files_root/bucket path reads an unversioned source (e.g. `main`), which can
carry extra annotation columns not in the dataset's feature contract — notably
`language_events`, a variable-length list (length 0..N per frame). Passed through to the
sample, these break default DataLoader collation ("each element in list of batch should
be of equal size"), which is why bucket jobs failed while the hub path (pinned to the
clean v3.0 revision) succeeded.

Drop any hf_dataset column not in meta.features after load. No-op on a clean revision;
removes language_events/language_persistent on main. Verified by reproducing the bucket
code path locally via --data_files_root hf://datasets/<repo> (parquet builder + main
columns): now decodes and collates instead of raising.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 17:24:30 +02:00
Pepijn 2d1c17d971 docs(streaming): note AV1 is LeRobot's default codec (vcodec=libsvtav1)
So the A100/H100 no-AV1-NVDEC limitation applies to most LeRobot v3 datasets, not just
RoboCasa — GPU decode needs an Ada GPU, an hevc/h264-encoded dataset, or a re-encode.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 17:10:18 +02:00
Pepijn 7241f029c6 docs(streaming): A100/H100 NVDEC cannot decode AV1 — correct guidance
NVIDIA's decode support matrix: the compute GPUs A100 (GA100) and H100 (GH100) have no
AV1 NVDEC decoder; only Ada (L4/L40/RTX40) and some Ampere (A10/A40/A16) do. So on
A100/H100 nodes, AV1 datasets must be decoded on CPU or re-encoded to H.265/H.264 — no
torchcodec build enables cuda AV1 decode there. Also distinguish that error from
"Unsupported device: cuda (variant: ffmpeg)", which is a torchcodec-built-without-CUDA
issue. Update diagnose_decode.py message + benchmark README accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 17:08:54 +02:00
Pepijn 06ddc59913 feat(streaming): CONDA_ENV knob for the matrix submitter
Add CONDA_ENV=<name> to run each matrix job via `conda run --no-capture-output -n
<name>` — works inside the dash `sbatch --wrap` without sourcing conda.sh / activating,
and streams logs live. Point it at a conda env with a modern torchcodec (>=0.11) +
datasets (>=4.7); the default cluster `base` env is often too old to decode AV1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 16:55:42 +02:00
Pepijn 23c58f5f9e feat(streaming): decode diagnostic + fail benchmark on 0 frames
- benchmark: raise SystemExit if 0 frames were measured, so a run that produces no
  batches (swallowed decode error, all batches dropped) fails loudly instead of being
  reported green with NaN/zero numbers (the misleading "COMPLETED" CUDA jobs).
- add benchmarks/streaming/diagnose_decode.py: isolates the streaming decode path
  (resolve path -> fsspec.open -> torchcodec VideoDecoder -> get one frame) and prints
  package versions + the first bytes of the handle. Pinpoints decode failures: bad/
  placeholder bytes vs ffmpeg/torchcodec build issue. RoboCasa videos are AV1; the
  failure message calls out AV1 decoder + NVDEC-on-Ada requirements explicitly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 16:40:24 +02:00
Pepijn b0ab57cedc fix(streaming): make matrix sbatch --wrap body POSIX-sh safe
`sbatch --wrap` runs the wrapped body under /bin/sh (dash), which has no
`set -o pipefail`, so every matrix job died on line 1 ("Illegal option -o pipefail")
before reaching the benchmark. The command has no pipes, so drop the bashism and chain
with `&&` (cd-guards the run) — fully POSIX-sh compatible. Runtime env expansion
(${HF_HOME:-$SCRATCH/hf_home}) is preserved.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 16:16:54 +02:00
Pepijn afdc084677 feat(streaming): serial-by-default matrix submitter (afterany dependency chain)
For a bandwidth-sensitive benchmark, concurrent jobs would share the network to the
Hub/bucket and corrupt throughput numbers. Chain the matrix jobs with
--dependency=afterany (captured via `sbatch --parsable`) so SLURM runs exactly one at a
time while keeping each config an isolated job (own log + per-job OOM reporting).
afterany keeps the chain going if one job fails/OOMs. SERIAL=0 restores parallel
submission for OOM-isolation-only testing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 15:55:58 +02:00
Pepijn a32a2c647b feat(streaming): full-matrix SLURM submitter + results summarizer
slurm/run_streaming_matrix.sh fans the benchmark matrix (sources {hub,bucket,
warmed_bucket} x modes {single,sarm} x decode {cpu,cuda}) out as isolated single-GPU
SLURM jobs, so an OOM in one config is contained and reported per-job by SLURM. Worker
count and shuffle buffer are bounded (lower for cuda, which holds a CUDA context + NVDEC
session per worker) to avoid host/VRAM OOM. Source/mode/decode/workers/buffer/account/
partition are env-overridable; SOURCES/MODES/DECODES select subsets.

benchmarks/streaming/summarize_results.py collapses the per-run JSONs into one comparison
table + summary.csv (frames/s/node, first-batch + p50/p95/p99 latency, cache hit-rate).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 15:51:36 +02:00
Pepijn 343ecd7980 feat(streaming): optional GPU (NVDEC) video decode device
Add `video_decode_device` to StreamingLeRobotDataset and a `device` arg to
VideoDecoderCache, passed to torchcodec's VideoDecoder. "cuda" offloads H.264/H.265
decode to the GPU's dedicated NVDEC engine (independent of the training SMs); requires
a CUDA-enabled torchcodec build.

benchmark: `--video_decode_device` flag. With cuda + num_workers>0 it forces the
`spawn` start method (CUDA cannot init in forked workers) and disables CPU pin_memory
(frames are already on-GPU). Decode device is recorded in results and the output
filename. README documents the NVDEC option and its concurrency/IPC caveats.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 15:47:11 +02:00
Pepijn f7c8a526e8 feat(streaming): wallclock benchmark throughput, cross-worker cache stats, bucket source
- benchmark: frames_per_s_node now measures sustained wall-clock throughput over the
  post-warmup window. The previous metric summed inter-batch gaps, which collapse to ~0
  under async prefetch (consumer drains a pre-filled queue) and overstated throughput ~100x.
- VideoDecoderCache gains an optional shared [hits, misses, evictions] counter tensor;
  StreamingLeRobotDataset.video_decoder_cache_stats() aggregates it across DataLoader
  workers (lock-free, approximate; hit_rate preserved). Fixes empty cache stats with workers.
- StreamingLeRobotDataset.data_files_root: read bulk data/ + videos/ from an fsspec root
  (e.g. hf://buckets/<owner>/<name>) while metadata still loads from repo_id. Enables
  bucket / prewarmed-bucket benchmark sources without copying metadata. Exposed as
  benchmark --data_files_root.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 15:25:44 +02:00
Pepijn 77af66a29c fix(streaming): decode video at episode-local timestamp + from_timestamp offset
make_frame used `item["index"] / fps` (a dataset-global value) as the in-file
video timestamp. That only matches the file timeline when the whole dataset is a
single video (as in the test fixtures); on multi-file v3 datasets it decodes
out-of-range frames and crashes (e.g. RoboCasa: "Invalid frame index=23314614 ...
must be less than 41021").

Mirror the map-style reader: use the episode-local `timestamp` column as the base,
clamp delta query timestamps to per-camera episode-local bounds [0, duration], and
shift by the episode's `from_timestamp` per camera at decode time. For single-file
datasets `from_timestamp + timestamp == index / fps`, so existing parity tests are
unaffected; multi-file streaming is now correct.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 14:54:10 +02:00
Pepijn 68fa5d80b0 feat(streaming): multinode example, dataloading benchmark, distributed smoke test
- examples/scaling/train_streaming_multinode.py: Accelerate-based distributed/
  resumable streaming training (no DistributedSampler; rank/world_size auto-resolved),
  checkpoints the dataset stream state, and supports a --dummy pure-dataloading path
  with throughput logging. SLURM launcher in slurm/train_streaming_robocasa.sh.
- benchmarks/streaming/benchmark_streaming.py: dummy-consumer dataloading benchmark
  (single / sarm frame modes) emitting frames/s/node, p50/p95/p99 sample latency,
  first-batch latency, and VideoDecoderCache reuse stats as JSON + CSV. SLURM launcher
  + README documenting the source/node/mode matrix and manual bucket prewarming.
- VideoDecoderCache: add hit/miss/eviction counters and a stats() method so the
  benchmark can surface decoder thrash (no new cache, no eviction-policy change).
- tests/datasets/test_streaming_distributed.py: accelerate-launch smoke test asserting
  per-rank disjointness; skips (does not false-pass) when <2 processes spawn.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 13:48:23 +02:00
Pepijn d1fc8e298c feat(streaming): distributed + resumable HF-native StreamingLeRobotDataset
Add the large-scale streaming pieces that were missing from the frame-streaming
internals, keeping the existing Backtrackable + output-reservoir frame-shuffle:

- split_dataset_by_node(rank, world_size) before the per-shard loop so each rank
  streams a disjoint set of shards (fixes duplicate data across GPUs). rank and
  world_size auto-resolve from Accelerate state / RANK,WORLD_SIZE env / (0, 1).
- get_worker_info() shard splitting so DataLoader workers within a rank don't
  yield duplicate frames.
- Dynamic Backtrackable window (dynamic_bounds=True) sized to the requested
  delta_timestamps, removing the fixed 100-frame ceiling so long horizons (e.g. a
  SARM window ~160 frames) reach real frames instead of silently padding. Fix the
  peek_back off-by-one: history = lookback + 1.
- video_decoder_cache_size knob; default (active_shards + 1) x num_cameras so the
  live decoder working set does not thrash the VideoDecoderCache LRU.
- state_dict()/load_state_dict() for resume (per-shard HF stream state + exhausted
  set + RNG). Reservoir is re-warmed, so resumption is not bit-exact (documented).
- factory.py wires buffer_size from a new DatasetConfig.streaming_buffer_size field
  instead of repurposing max_num_shards as the worker count.

Tests: tests/datasets/test_streaming_native.py covers distributed disjointness,
worker de-duplication, the SARM-length window, resume, schema parity vs map-style,
local video path resolution, and shuffle decorrelation. 21 passed (13 existing + 8).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 13:37:30 +02:00
14 changed files with 1225 additions and 588 deletions
@@ -0,0 +1,192 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Distributed, resumable streaming training on a large HF-hosted dataset.
This example shows how to train (or just stress the data pipeline) over a multi-TB dataset that never
touches local disk, scaling across GPUs and nodes with Accelerate. It demonstrates the large-scale
streaming features of :class:`StreamingLeRobotDataset`:
- per-rank sharding via ``split_dataset_by_node`` (each GPU streams disjoint data; ``rank``/``world_size``
are auto-resolved from the Accelerate state, so nothing needs to be passed explicitly);
- DataLoader-worker shard splitting (no duplicate frames within a rank);
- native `datasets` resume: the loader checkpoints stream state via ``state_dict()`` (``torchdata`` StatefulDataLoader when available, so ``num_workers > 0`` resumes too);
- an explicit video-decoder cache size so the working set of open decoders does not thrash.
Launch with Accelerate (single node, N GPUs):
accelerate launch --num_processes=8 examples/scaling/train_streaming_multinode.py \
--repo_id=lerobot/droid_1.0.1 --batch_size=64
Multinode runs launch the same script with your cluster's accelerate/SLURM setup.
Pass ``--dummy`` to skip the model entirely and measure pure dataloading throughput.
"""
import argparse
import time
from pathlib import Path
import torch
from accelerate import Accelerator
from torch.utils.data import DataLoader
from lerobot.datasets import LeRobotDatasetMetadata, StreamingLeRobotDataset
from lerobot.utils.constants import ACTION
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--repo_id", type=str, default="lerobot/droid_1.0.1")
parser.add_argument(
"--root", type=str, default=None, help="Local/prewarmed dataset root (else stream from Hub)."
)
parser.add_argument("--output_dir", type=str, default="outputs/train/streaming_multinode")
parser.add_argument("--steps", type=int, default=1000)
parser.add_argument("--batch_size", type=int, default=64, help="Per-process batch size.")
parser.add_argument("--num_workers", type=int, default=8)
parser.add_argument(
"--episode_pool_size",
type=int,
default=64,
help="Whole episodes open per consumer (randomness knob).",
)
parser.add_argument("--video_decoder_cache_size", type=int, default=None)
parser.add_argument("--n_action_steps", type=int, default=16, help="Action-chunk length (delta horizon).")
parser.add_argument("--save_freq", type=int, default=200)
parser.add_argument("--log_freq", type=int, default=20)
parser.add_argument("--resume_from", type=str, default=None, help="Checkpoint dir to resume from.")
parser.add_argument("--dummy", action="store_true", help="Skip the model; measure dataloading only.")
return parser.parse_args()
def make_dataloader(
args: argparse.Namespace, meta: LeRobotDatasetMetadata
) -> tuple[DataLoader, StreamingLeRobotDataset]:
# Supervise an action chunk; delta_timestamps drive the SARM-style temporal window.
delta_timestamps = {ACTION: [t / meta.fps for t in range(args.n_action_steps)]}
# rank / world_size are resolved automatically from the Accelerate state inside the dataset.
dataset = StreamingLeRobotDataset(
args.repo_id,
root=args.root,
delta_timestamps=delta_timestamps,
episode_pool_size=args.episode_pool_size,
video_decoder_cache_size=args.video_decoder_cache_size,
tolerance_s=1e-3,
)
# torchdata's StatefulDataLoader checkpoints each worker's dataset state through the
# dataset's native state_dict protocol, making resume work with num_workers > 0. Fall back
# to the plain DataLoader (resume then requires num_workers=0).
try:
from torchdata.stateful_dataloader import StatefulDataLoader
loader_cls = StatefulDataLoader
except ImportError:
loader_cls = DataLoader
loader = loader_cls(
dataset,
batch_size=args.batch_size,
num_workers=args.num_workers,
pin_memory=True,
drop_last=True,
prefetch_factor=2 if args.num_workers > 0 else None,
)
return loader, dataset
def main() -> None:
args = parse_args()
accelerator = Accelerator()
output_dir = Path(args.output_dir)
if accelerator.is_main_process:
output_dir.mkdir(parents=True, exist_ok=True)
meta = LeRobotDatasetMetadata(args.repo_id, root=args.root)
loader, dataset = make_dataloader(args, meta)
if args.dummy:
model = optimizer = None
else:
from lerobot.policies.act import ACTConfig, ACTPolicy
from lerobot.utils.feature_utils import dataset_to_policy_features
features = dataset_to_policy_features(meta.features)
output_features = {k: ft for k, ft in features.items() if k == ACTION}
input_features = {k: ft for k, ft in features.items() if k not in output_features}
cfg = ACTConfig(input_features=input_features, output_features=output_features)
model = ACTPolicy(cfg)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Do NOT prepare the dataloader: the dataset is already rank-disjoint via
# split_dataset_by_node, and accelerate's IterableDatasetShard would keep only every
# world_size-th batch of it (silently training on 1/N of the data while decoding all
# of it). Batches are moved to the device manually in the loop.
model, optimizer = accelerator.prepare(model, optimizer)
# Resume: native datasets stream state, saved per rank. With torchdata's StatefulDataLoader
# the state covers every worker; with the plain DataLoader it is exact for num_workers=0.
can_checkpoint_loader = hasattr(loader, "state_dict")
if args.resume_from is not None:
state_path = Path(args.resume_from) / f"dataset_state_rank{accelerator.process_index}.pt"
state = torch.load(state_path, weights_only=False) # plain dict of stream offsets # nosec B614
if can_checkpoint_loader:
loader.load_state_dict(state)
else:
dataset.load_state_dict(state)
accelerator.print(f"Resumed dataset stream from {state_path}")
step = 0
frames_seen = 0
window_start = time.perf_counter()
done = False
while not done:
for batch in loader:
if model is not None:
batch = {k: (v.to(accelerator.device) if torch.is_tensor(v) else v) for k, v in batch.items()}
loss, _ = model.forward(batch)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
step += 1
frames_seen += args.batch_size
if step % args.log_freq == 0:
elapsed = time.perf_counter() - window_start
fps_per_proc = (args.log_freq * args.batch_size) / max(elapsed, 1e-9)
total_fps = fps_per_proc * accelerator.num_processes
accelerator.print(
f"step {step} | {fps_per_proc:.1f} frames/s/proc | {total_fps:.1f} frames/s total"
+ ("" if model is None else f" | loss {loss.item():.3f}")
)
window_start = time.perf_counter()
if step % args.save_freq == 0:
ckpt = output_dir / f"checkpoint-{step}"
if accelerator.is_main_process:
ckpt.mkdir(parents=True, exist_ok=True)
accelerator.wait_for_everyone()
# Every rank saves its own stream state: shard positions differ per rank.
state = loader.state_dict() if can_checkpoint_loader else dataset.state_dict()
torch.save(state, ckpt / f"dataset_state_rank{accelerator.process_index}.pt")
if model is not None and accelerator.is_main_process:
accelerator.unwrap_model(model).save_pretrained(ckpt)
if step >= args.steps:
done = True
break
accelerator.print(f"End of training: {step} steps, ~{frames_seen} frames/proc")
if __name__ == "__main__":
main()
+8 -4
View File
@@ -95,7 +95,7 @@ dependencies = [
# ── Feature-scoped extras ──────────────────────────────────
dataset = [
"datasets>=4.7.0,<5.0.0",
"datasets>=4.7.0,<6.0.0",
"pandas>=2.0.0,<3.0.0", # NOTE: Transitive dependency of datasets
"pyarrow>=21.0.0,<30.0.0", # NOTE: Transitive dependency of datasets
"lerobot[av-dep]",
@@ -216,7 +216,7 @@ robometer = ["lerobot[transformers-dep]", "lerobot[qwen-vl-utils-dep]", "lerobot
topreward = ["lerobot[transformers-dep]"]
xvla = ["lerobot[transformers-dep]"]
eo1 = ["lerobot[transformers-dep]", "lerobot[qwen-vl-utils-dep]"]
hilserl = ["lerobot[transformers-dep]", "lerobot[dataset]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
hilserl = ["lerobot[transformers-dep]", "lerobot[dataset]", "gym-hil>=0.1.14,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
vla_jepa = ["lerobot[transformers-dep]", "lerobot[diffusers-dep]", "lerobot[qwen-vl-utils-dep]"]
# Features
@@ -231,9 +231,9 @@ video_benchmark = ["scikit-image>=0.23.2,<0.26.0", "pandas>=2.2.2,<2.4.0"]
# Simulation
# NOTE: Explicitly listing scipy helps flatten the dependecy tree.
aloha = ["lerobot[dataset]", "gym-aloha>=0.1.2,<0.2.0", "lerobot[scipy-dep]"]
aloha = ["lerobot[dataset]", "gym-aloha>=0.1.4,<0.2.0", "lerobot[scipy-dep]"]
pusht = ["lerobot[dataset]", "gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk version in gym-pusht instead
libero = ["lerobot[dataset]", "lerobot[transformers-dep]", "hf-libero>=0.1.3,<0.2.0; sys_platform == 'linux'", "lerobot[scipy-dep]"]
libero = ["lerobot[dataset]", "lerobot[transformers-dep]", "hf-libero>=0.1.4,<0.2.0; sys_platform == 'linux'", "lerobot[scipy-dep]"]
metaworld = ["lerobot[dataset]", "metaworld==3.0.0", "lerobot[scipy-dep]"]
# NOTE: vlabench is NOT exposed as a `lerobot` extra. Its only distribution
# is the OpenMOSS/VLABench GitHub repo (package name `VLABench`, no PyPI
@@ -333,6 +333,10 @@ explicit = true
[tool.uv.sources]
torch = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
torchvision = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
# Temporary: the native streaming pipeline needs batch(by_column=...) to survive shard/shuffle
# re-creation, fixed in datasets#8259 (merged, not yet released). Pin to the merge commit until the
# next datasets release ships it, then drop this and bump the floor in `dependencies`.
datasets = { git = "https://github.com/huggingface/datasets.git", rev = "2c45eab1bb975ac3d846f2aa6217b82adec8eba3" }
[tool.setuptools.package-data]
lerobot = ["envs/*.json"]
+4
View File
@@ -39,6 +39,10 @@ class DatasetConfig:
# This reduces memory and speeds up DataLoader IPC. The training pipeline handles the conversion.
return_uint8: bool = False
streaming: bool = False
# Whole episodes each streaming consumer keeps open to shuffle across (the randomness knob).
# Larger mixes more episodes per batch at the cost of cold-start latency; RAM stays small because
# the pool holds tabular rows only. Ignored when streaming is False.
streaming_episode_pool_size: int = 64
def __post_init__(self) -> None:
if self.episodes is not None:
+11 -2
View File
@@ -945,8 +945,17 @@ def _write_parquet(df: pd.DataFrame, path: Path, meta: LeRobotDatasetMetadata) -
ep_dataset = embed_images(ep_dataset)
table = ep_dataset.with_format("arrow")[:]
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
writer.write_table(table)
# Emit several row groups with a page index instead of one giant row group. A single row group forces
# streaming readers to materialize the whole file's columns per open shard; with random-access streaming
# (shuffle + delta windows) across many workers x shards that dominates RAM. Targeting ~32MB-uncompressed
# groups bounds per-shard memory while keeping groups large enough to scan
# efficiently; the page index lets readers skip to the pages they need.
target_row_group_bytes = 32 * 1024 * 1024
row_group_size = max(1, min(table.num_rows, table.num_rows * target_row_group_bytes // max(table.nbytes, 1)))
writer = pq.ParquetWriter(
path, schema=table.schema, compression="snappy", use_dictionary=True, write_page_index=True
)
writer.write_table(table, row_group_size=row_group_size)
writer.close()
+1 -1
View File
@@ -106,7 +106,7 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
delta_timestamps=delta_timestamps,
image_transforms=image_transforms,
revision=cfg.dataset.revision,
max_num_shards=cfg.num_workers,
episode_pool_size=cfg.dataset.streaming_episode_pool_size,
tolerance_s=cfg.tolerance_s,
return_uint8=True,
)
+7 -1
View File
@@ -30,6 +30,7 @@ class EpisodeAwareSampler:
drop_n_first_frames: int = 0,
drop_n_last_frames: int = 0,
shuffle: bool = False,
generator: torch.Generator | None = None,
):
"""Sampler that optionally incorporates episode boundary information.
@@ -41,6 +42,10 @@ class EpisodeAwareSampler:
drop_n_first_frames: Number of frames to drop from the start of each episode.
drop_n_last_frames: Number of frames to drop from the end of each episode.
shuffle: Whether to shuffle the indices.
generator: Generator used for shuffling. Exposing this attribute (even when None) lets
`accelerate` register it as the synchronized RNG in distributed training, so
every rank draws the same permutation and batch shards stay disjoint. When
None, shuffling falls back to the global torch RNG.
"""
if drop_n_first_frames < 0:
raise ValueError(f"drop_n_first_frames must be >= 0, got {drop_n_first_frames}")
@@ -73,10 +78,11 @@ class EpisodeAwareSampler:
self.indices = indices
self.shuffle = shuffle
self.generator = generator
def __iter__(self) -> Iterator[int]:
if self.shuffle:
for i in torch.randperm(len(self.indices)):
for i in torch.randperm(len(self.indices), generator=self.generator):
yield self.indices[i]
else:
for i in self.indices:
+344 -449
View File
@@ -13,16 +13,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import deque
from collections.abc import Callable, Generator, Iterable, Iterator
import logging
import time
from collections.abc import Callable, Iterator
from pathlib import Path
import datasets
import numpy as np
import torch
from datasets import load_dataset
from datasets.distributed import split_dataset_by_node
from lerobot.utils.constants import HF_LEROBOT_HOME, LOOKAHEAD_BACKTRACKTABLE, LOOKBACK_BACKTRACKTABLE
from lerobot.utils.constants import HF_LEROBOT_HOME
from .dataset_metadata import CODEBASE_VERSION, LeRobotDatasetMetadata
from .feature_utils import get_delta_indices
@@ -31,207 +33,56 @@ from .utils import (
check_version_compatibility,
find_float_index,
is_float_in_list,
safe_shard,
)
from .video_utils import (
VideoDecoderCache,
decode_video_frames_torchcodec,
)
logger = logging.getLogger(__name__)
class LookBackError(Exception):
"""
Exception raised when trying to look back in the history of a Backtrackable object.
"""
pass
class LookAheadError(Exception):
"""
Exception raised when trying to look ahead in the future of a Backtrackable object.
"""
pass
class Backtrackable[T]:
"""
Wrap any iterator/iterable so you can step back up to `history` items
and look ahead up to `lookahead` items.
This is useful for streaming datasets where you need to access previous and future items
but can't load the entire dataset into memory.
Example:
-------
```python
ds = load_dataset("c4", "en", streaming=True, split="train")
rev = Backtrackable(ds, history=3, lookahead=2)
x0 = next(rev) # forward
x1 = next(rev)
x2 = next(rev)
# Look ahead
x3_peek = rev.peek_ahead(1) # next item without moving cursor
x4_peek = rev.peek_ahead(2) # two items ahead
# Look back
x1_again = rev.peek_back(1) # previous item without moving cursor
x0_again = rev.peek_back(2) # two items back
# Move backward
x1_back = rev.prev() # back one step
next(rev) # returns x2, continues forward from where we were
```
"""
__slots__ = ("_source", "_back_buf", "_ahead_buf", "_cursor", "_history", "_lookahead")
def __init__(self, iterable: Iterable[T], *, history: int = 1, lookahead: int = 0):
if history < 1:
raise ValueError("history must be >= 1")
if lookahead <= 0:
raise ValueError("lookahead must be > 0")
self._source: Iterator[T] = iter(iterable)
self._back_buf: deque[T] = deque(maxlen=history)
self._ahead_buf: deque[T] = deque(maxlen=lookahead) if lookahead > 0 else deque()
self._cursor: int = 0
self._history = history
self._lookahead = lookahead
def __iter__(self) -> "Backtrackable[T]":
return self
def __next__(self) -> T:
# If we've stepped back, consume from back buffer first
if self._cursor < 0: # -1 means "last item", etc.
self._cursor += 1
return self._back_buf[self._cursor]
# If we have items in the ahead buffer, use them first
item = self._ahead_buf.popleft() if self._ahead_buf else next(self._source)
# Add current item to back buffer and reset cursor
self._back_buf.append(item)
self._cursor = 0
return item
def prev(self) -> T:
"""
Step one item back in history and return it.
Raises IndexError if already at the oldest buffered item.
"""
if len(self._back_buf) + self._cursor <= 1:
raise LookBackError("At start of history")
self._cursor -= 1
return self._back_buf[self._cursor]
def peek_back(self, n: int = 1) -> T:
"""
Look `n` items back (n=1 == previous item) without moving the cursor.
"""
if n < 0 or n + 1 > len(self._back_buf) + self._cursor:
raise LookBackError("peek_back distance out of range")
return self._back_buf[self._cursor - (n + 1)]
def peek_ahead(self, n: int = 1) -> T:
"""
Look `n` items ahead (n=1 == next item) without moving the cursor.
Fills the ahead buffer if necessary.
"""
if n < 1:
raise LookAheadError("peek_ahead distance must be 1 or more")
elif n > self._lookahead:
raise LookAheadError("peek_ahead distance exceeds lookahead limit")
# Fill ahead buffer if we don't have enough items
while len(self._ahead_buf) < n:
try:
item = next(self._source)
self._ahead_buf.append(item)
except StopIteration as err:
raise LookAheadError("peek_ahead: not enough items in source") from err
return self._ahead_buf[n - 1]
def history(self) -> list[T]:
"""
Return a copy of the buffered history (most recent last).
The list length ≤ `history` argument passed at construction.
"""
if self._cursor == 0:
return list(self._back_buf)
# When cursor<0, slice so the order remains chronological
return list(self._back_buf)[: self._cursor or None]
def can_peek_back(self, steps: int = 1) -> bool:
"""
Check if we can go back `steps` items without raising an IndexError.
"""
return steps <= len(self._back_buf) + self._cursor
def can_peek_ahead(self, steps: int = 1) -> bool:
"""
Check if we can peek ahead `steps` items.
This may involve trying to fill the ahead buffer.
"""
if self._lookahead > 0 and steps > self._lookahead:
return False
# Try to fill ahead buffer to check if we can peek that far
try:
while len(self._ahead_buf) < steps:
if self._lookahead > 0 and len(self._ahead_buf) >= self._lookahead:
return False
item = next(self._source)
self._ahead_buf.append(item)
return True
except StopIteration:
return False
# Bound the default frame-level shuffle buffer: rows are tabular-only (~KB each), so this is
# roughly a few hundred MB of host RAM per consumer at the cap.
_MAX_DEFAULT_FRAME_BUFFER = 200_000
class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
"""LeRobotDataset with streaming capabilities.
"""LeRobotDataset with streaming capabilities, built on native HF `datasets` primitives.
This class extends LeRobotDataset to add streaming functionality, allowing data to be streamed
rather than loaded entirely into memory. This is especially useful for large datasets that may
not fit in memory or when you want to quickly explore a dataset without downloading it completely.
The tabular side is a pure `datasets` pipeline::
The key innovation is using a Backtrackable iterator that maintains a bounded buffer of recent
items, allowing us to access previous frames for delta timestamps without loading the entire
dataset into memory.
load_dataset(streaming=True) # parquet shards from the Hub / a bucket
-> split_dataset_by_node(rank, world_size) # disjoint shards per rank
-> batch(by_column="episode_index") # whole episodes
-> shuffle(buffer_size=episode_pool_size) # episode pool (the randomness knob)
-> map(explode + exact delta windows) # episode -> frames, windows are exact
-> shuffle(buffer_size=frame_shuffle_buffer_size) # frame-level interleave
and this class is a thin torch ``IterableDataset`` wrapper around it that decodes video
per emitted sample (decode-on-exit), applies image transforms, and attaches the task
string. DataLoader workers are split natively by `datasets` (disjoint shards per worker),
and resume uses the native ``state_dict`` / ``load_state_dict``.
Randomness: a batch mixes up to ``episode_pool_size`` distinct episodes; delta windows are
exact slices of the resident episode with correct padding at episode boundaries.
Resume: ``state_dict()`` / ``load_state_dict()`` delegate to `datasets`. Samples sitting in
the shuffle buffers at checkpoint time are skipped on resume (documented `datasets`
behavior), so resume never repeats data but may drop up to roughly
``episode_pool_size x episode_len + frame_shuffle_buffer_size`` frames — negligible at
training scale. The contract is exact with ``num_workers=0``; with DataLoader workers use
``torchdata.stateful_dataloader.StatefulDataLoader``, which checkpoints each worker's
dataset state through this same protocol.
Example:
Basic usage:
```python
from lerobot.common.datasets.streaming_dataset import StreamingLeRobotDataset
# Create a streaming dataset with delta timestamps
delta_timestamps = {
"observation.image": [-1.0, -0.5, 0.0], # 1 sec ago, 0.5 sec ago, current
"action": [0.0, 0.1, 0.2], # current, 0.1 sec future, 0.2 sec future
}
dataset = StreamingLeRobotDataset(
repo_id="your-dataset-repo-id",
delta_timestamps=delta_timestamps,
streaming=True,
buffer_size=1000,
delta_timestamps={"action": [0.0, 0.1, 0.2]},
episode_pool_size=64,
)
# Iterate over the dataset
for i, item in enumerate(dataset):
print(f"Sample {i}: Episode {item['episode_index']} Frame {item['frame_index']}")
# item will contain stacked frames according to delta_timestamps
if i >= 10:
break
for sample in dataset:
...
```
"""
@@ -246,12 +97,19 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
revision: str | None = None,
force_cache_sync: bool = False,
streaming: bool = True,
buffer_size: int = 1000,
max_num_shards: int = 16,
episode_pool_size: int | None = 64,
frame_shuffle_buffer_size: int | None = None,
buffer_size: int | None = None,
max_num_shards: int | None = None,
seed: int = 42,
rng: np.random.Generator | None = None,
shuffle: bool = True,
return_uint8: bool = False,
rank: int | None = None,
world_size: int | None = None,
video_decoder_cache_size: int | None = None,
data_files_root: str | None = None,
video_decode_device: str = "cpu",
):
"""Initialize a StreamingLeRobotDataset.
@@ -267,11 +125,32 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
revision (str, optional): Git revision id (branch name, tag, or commit hash).
force_cache_sync (bool, optional): Flag to sync and refresh local files first.
streaming (bool, optional): Whether to stream the dataset or load it all. Defaults to True.
buffer_size (int, optional): Buffer size for shuffling when streaming. Defaults to 1000.
max_num_shards (int, optional): Number of shards to re-shard the input dataset into. Defaults to 16.
episode_pool_size (int, optional): Whole episodes each consumer keeps open to shuffle
across — the randomness knob. Larger mixes more episodes per batch (closer to
map-style uniform) at the cost of cold-start latency and frame-buffer RAM.
Defaults to 64.
frame_shuffle_buffer_size (int | None, optional): Frame-level shuffle buffer after the
episode pool. Defaults to ``episode_pool_size x average episode length`` (capped),
which matches the pool's mixing radius.
buffer_size (int | None, optional): Deprecated; superseded by ``episode_pool_size``.
max_num_shards (int | None, optional): Deprecated; `datasets` handles shard-to-worker
assignment natively.
seed (int, optional): Reproducibility random seed.
rng (np.random.Generator | None, optional): Random number generator.
shuffle (bool, optional): Whether to shuffle the dataset across exhaustions. Defaults to True.
rng (np.random.Generator | None, optional): Deprecated; ignored.
shuffle (bool, optional): Whether to shuffle. False yields episodes in stream order.
rank (int | None, optional): This process' rank for distributed training. Each rank streams
a disjoint set of shards via ``split_dataset_by_node``. When omitted, resolved from
Accelerate (``process_index``) or the ``RANK`` env var, defaulting to 0.
world_size (int | None, optional): Total number of distributed processes. When omitted,
resolved from Accelerate or ``WORLD_SIZE``, defaulting to 1. For an even per-rank split,
``num_shards % world_size == 0`` should hold (warned otherwise).
video_decoder_cache_size (int | None, optional): Max number of open video decoders to retain.
When omitted, sized to the episode pool's working set, capped at 128.
data_files_root (str | None, optional): fsspec root holding the bulk ``data/`` and ``videos/``
trees (e.g. ``hf://buckets/<owner>/<name>``). When set, parquet and video bytes are read
from there while metadata still loads from ``repo_id`` on the Hub.
video_decode_device (str, optional): Device for torchcodec decode. ``"cuda"`` offloads to
NVDEC (needs a CUDA torchcodec build and ``spawn`` DataLoader workers).
"""
super().__init__()
self.repo_id = repo_id
@@ -284,15 +163,37 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
self.tolerance_s = tolerance_s
self.revision = revision if revision else CODEBASE_VERSION
self.seed = seed
self.rng = rng if rng is not None else np.random.default_rng(seed)
if rng is not None:
logger.warning("StreamingLeRobotDataset: `rng` is deprecated and ignored; use `seed`.")
if buffer_size is not None:
logger.warning(
"StreamingLeRobotDataset: `buffer_size` is deprecated and ignored; "
"use `episode_pool_size` (whole episodes, not frames)."
)
if max_num_shards is not None:
logger.warning(
"StreamingLeRobotDataset: `max_num_shards` is deprecated and ignored; "
"`datasets` assigns shards to DataLoader workers natively."
)
self.shuffle = shuffle
self.streaming = streaming
self.buffer_size = buffer_size
self.episode_pool_size = max(1, episode_pool_size) if episode_pool_size else 64
self._return_uint8 = return_uint8
self.rank, self.world_size = self._resolve_distributed(rank, world_size)
self.video_decoder_cache_size = video_decoder_cache_size
self.data_files_root = data_files_root.rstrip("/") if data_files_root else None
self.video_decode_device = video_decode_device
# We cache the video decoders to avoid re-initializing them at each frame (avoiding a ~10x slowdown)
self.video_decoder_cache = None
# Shared [hits, misses, evictions, decode_ns, fetch_ns] tensor so DataLoader workers aggregate
# decoder-cache stats and component timings into one place the main process can read after
# iteration (see video_decoder_cache_stats() / timing_stats()).
self._cache_counters = torch.zeros(5, dtype=torch.int64).share_memory_()
self._epoch = 0
self._in_flight_epoch = 0
if self._requested_root is not None:
self.root.mkdir(exist_ok=True, parents=True)
@@ -314,15 +215,42 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
self.delta_timestamps = delta_timestamps
self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps)
self.hf_dataset: datasets.IterableDataset = load_dataset(
self.repo_id if not self.streaming_from_local else str(self.root),
split="train",
streaming=self.streaming,
data_files="data/*/*.parquet",
revision=self.revision,
if self.data_files_root is not None:
# Bulk data lives in an fsspec root (e.g. an HF storage bucket); metadata stays on the Hub.
self.hf_dataset: datasets.IterableDataset = load_dataset(
"parquet",
split="train",
streaming=self.streaming,
data_files=f"{self.data_files_root}/data/*/*.parquet",
)
else:
self.hf_dataset = load_dataset(
self.repo_id if not self.streaming_from_local else str(self.root),
split="train",
streaming=self.streaming,
data_files="data/*/*.parquet",
revision=self.revision,
)
# Drop any parquet columns not declared in the dataset's feature contract. Some revisions / sources
# (e.g. an unversioned bucket holding `main`) carry extra, possibly variable-length annotation
# columns such as `language_events`; left in, they leak into the sample and break default DataLoader
# collation across frames of differing length. On a clean revision this is a no-op.
known_columns = set(self.meta.features)
extra_columns = [c for c in (self.hf_dataset.column_names or []) if c not in known_columns]
if extra_columns:
self.hf_dataset = self.hf_dataset.remove_columns(extra_columns)
self.num_shards = self.hf_dataset.num_shards
avg_episode_len = max(1, round(self.meta.total_frames / max(1, self.meta.total_episodes)))
self.frame_shuffle_buffer_size = (
frame_shuffle_buffer_size
if frame_shuffle_buffer_size is not None
else min(self.episode_pool_size * avg_episode_len, _MAX_DEFAULT_FRAME_BUFFER)
)
self.num_shards = min(self.hf_dataset.num_shards, max_num_shards)
self._pipeline = self._build_pipeline()
@property
def num_frames(self):
@@ -337,96 +265,223 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
return self.meta.fps
@staticmethod
def _iter_random_indices(
rng: np.random.Generator, buffer_size: int, random_batch_size=100
) -> Iterator[int]:
while True:
yield from (int(i) for i in rng.integers(0, buffer_size, size=random_batch_size))
def _resolve_distributed(rank: int | None, world_size: int | None) -> tuple[int, int]:
"""Resolve (rank, world_size) for distributed streaming.
@staticmethod
def _infinite_generator_over_elements(rng: np.random.Generator, elements: list[int]) -> Iterator[int]:
while True:
yield rng.choice(elements)
Explicit arguments win. Otherwise prefer an already-initialized Accelerate state, then the
``RANK``/``WORLD_SIZE`` env vars set by launchers, and finally fall back to single-process (0, 1).
"""
import os
if rank is not None and world_size is not None:
return rank, world_size
try:
from accelerate.state import PartialState
if PartialState._shared_state: # only read it if already initialized; never initialize here
state = PartialState()
return state.process_index, state.num_processes
except Exception:
logger.debug("Could not resolve distributed state from Accelerate; using env/defaults.")
env_rank = os.environ.get("RANK")
env_world = os.environ.get("WORLD_SIZE")
if env_rank is not None and env_world is not None:
return int(env_rank), int(env_world)
return 0, 1
def _build_pipeline(self) -> datasets.IterableDataset:
"""Assemble the native tabular pipeline (everything except video decode)."""
ds = self.hf_dataset
if self.world_size > 1:
if ds.num_shards % self.world_size != 0:
logger.warning(
f"num_shards ({ds.num_shards}) is not divisible by world_size ({self.world_size}): "
"datasets falls back to example-level splitting where every rank reads (and pays "
"for) the full stream. Re-shard the dataset or adjust world size."
)
ds = split_dataset_by_node(ds, rank=self.rank, world_size=self.world_size)
ds = ds.batch(by_column="episode_index")
episode_columns = list(ds.column_names or self.hf_dataset.column_names or [])
if self.shuffle:
ds = ds.shuffle(seed=self.seed, buffer_size=self.episode_pool_size)
# A row-count-changing batched map must drop the input columns explicitly; the exploded
# frames re-emit them (windowed keys replaced by their delta windows + *_is_pad masks).
ds = ds.map(self._explode_episodes, batched=True, remove_columns=episode_columns)
if self.shuffle:
ds = ds.shuffle(seed=self.seed + 1, buffer_size=max(2, self.frame_shuffle_buffer_size))
return ds
def _tabular_window_keys(self) -> list[str]:
if self.delta_indices is None:
return []
return [key for key in self.delta_indices if key not in self.meta.video_keys]
def _explode_episodes(self, episode_batch: dict[str, list[list]]) -> dict[str, list]:
"""Episode batches -> per-frame rows, with exact tabular delta windows and pad masks.
Runs inside the `datasets` pipeline (plain Python values, no torch). For each windowed key
the original per-frame value is replaced by its delta window (list of values, clamped to
the episode bounds) plus a ``{key}_is_pad`` mask, mirroring the map-style dataset.
"""
window_keys = set(self._tabular_window_keys())
out: dict[str, list] = {key: [] for key in episode_batch if key not in window_keys}
for key in window_keys:
out[key] = []
out[f"{key}_is_pad"] = []
num_episodes = len(episode_batch["episode_index"])
for e in range(num_episodes):
length = len(episode_batch["episode_index"][e])
for key, column in episode_batch.items():
if key in window_keys:
continue
out[key].extend(column[e])
for key in window_keys:
episode_column = episode_batch[key][e]
deltas = self.delta_indices[key]
for t in range(length):
window = []
is_pad = []
for delta in deltas:
j = t + delta
window.append(episode_column[min(max(j, 0), length - 1)])
is_pad.append(not 0 <= j < length)
out[key].append(window)
out[f"{key}_is_pad"].append(is_pad)
return out
def _make_video_decoder_cache(self) -> VideoDecoderCache:
"""Size the decoder cache to the pool's working set (pool episodes x cameras), capped at 128."""
if self.video_decoder_cache_size is not None:
return VideoDecoderCache(
max_size=self.video_decoder_cache_size,
counters=self._cache_counters,
device=self.video_decode_device,
)
num_cameras = len(self.meta.video_keys)
if num_cameras == 0:
return VideoDecoderCache(counters=self._cache_counters, device=self.video_decode_device)
return VideoDecoderCache(
max_size=min((self.episode_pool_size + 1) * num_cameras, 128),
counters=self._cache_counters,
device=self.video_decode_device,
)
# TODO(fracapuano): Implement multi-threaded prefetching to accelerate data loading.
# The current sequential iteration is a bottleneck. A producer-consumer pattern
# could be used with a ThreadPoolExecutor to run `make_frame` (especially video decoding)
# in parallel, feeding a queue from which this iterator will yield processed items.
def __iter__(self) -> Iterator[dict[str, torch.Tensor]]:
if self.video_decoder_cache is None:
self.video_decoder_cache = VideoDecoderCache()
# `datasets` reshuffles (and re-permutes shard order) per epoch from (seed, epoch);
# DataLoader workers each advance their own copy's counter in lockstep. The in-flight
# epoch is tracked separately so a mid-iteration state_dict() records the epoch the
# stream position actually belongs to.
self._in_flight_epoch = self._epoch
self._pipeline.set_epoch(self._in_flight_epoch)
self._epoch += 1
self.video_decoder_cache = self._make_video_decoder_cache()
# keep the same seed across exhaustions if shuffle is False, otherwise shuffle data across exhaustions
rng = np.random.default_rng(self.seed) if not self.shuffle else self.rng
iterator = iter(self._pipeline)
while True:
fetch_start = time.perf_counter_ns()
try:
row = next(iterator)
except StopIteration:
return
finally:
self._cache_counters[4] += time.perf_counter_ns() - fetch_start
yield self._finalize_sample(row)
buffer_indices_generator = self._iter_random_indices(rng, self.buffer_size)
def _finalize_sample(self, row: dict) -> dict:
"""Torch conversion + video decode (decode-on-exit) + transforms + task for one frame."""
window_keys = self._tabular_window_keys()
pad_masks = {f"{key}_is_pad": torch.BoolTensor(row.pop(f"{key}_is_pad")) for key in window_keys}
item = item_to_torch(row)
item.update(pad_masks)
idx_to_backtrack_dataset = {
idx: self._make_backtrackable_dataset(safe_shard(self.hf_dataset, idx, self.num_shards))
for idx in range(self.num_shards)
if len(self.meta.video_keys) > 0:
ep_idx = int(item["episode_index"])
current_ts = float(item["timestamp"])
# Per-camera episode-local bounds [0, duration]: out-of-episode deltas pad instead of
# decoding against a neighbouring episode sharing the same video file.
episode_boundaries_ts = {
key: (
0.0,
self.meta.episodes[ep_idx][f"videos/{key}/to_timestamp"]
- self.meta.episodes[ep_idx][f"videos/{key}/from_timestamp"],
)
for key in self.meta.video_keys
}
original_timestamps = self._make_timestamps_from_indices(current_ts, self.delta_indices)
query_timestamps = self._get_query_timestamps(
current_ts, self.delta_indices, episode_boundaries_ts
)
decode_start = time.perf_counter_ns()
video_frames = self._query_videos(query_timestamps, ep_idx)
self._cache_counters[3] += time.perf_counter_ns() - decode_start
if self.image_transforms is not None:
for cam in self.meta.camera_keys:
video_frames[cam] = self.image_transforms(video_frames[cam])
item.update(video_frames)
if self.delta_indices is not None:
item.update(
self._get_video_frame_padding_mask(video_frames, query_timestamps, original_timestamps)
)
item["task"] = self.meta.tasks.iloc[int(item["task_index"])].name
return item
def set_epoch(self, epoch: int) -> None:
"""Set the epoch the next ``__iter__`` will use (reshuffles the native pipeline)."""
self._epoch = epoch
def state_dict(self) -> dict:
"""Native `datasets` stream state. Exact contract with ``num_workers=0``; with DataLoader
workers use ``torchdata.stateful_dataloader.StatefulDataLoader`` (it checkpoints each
worker's copy through this protocol). Samples in the shuffle buffers are skipped on
resume (never repeated), bounded by the pool + frame buffer sizes.
"""
return {"pipeline": self._pipeline.state_dict(), "epoch": self._in_flight_epoch}
def load_state_dict(self, state_dict: dict) -> None:
# Resume continues inside the recorded epoch: the next __iter__ replays that epoch's
# shuffle order from the restored stream position, then advances normally.
self._epoch = int(state_dict.get("epoch", 0))
self._pipeline.load_state_dict(state_dict["pipeline"])
def video_decoder_cache_stats(self) -> dict[str, int | float]:
"""Decoder-cache reuse aggregated across DataLoader workers via the shared counter tensor.
Unlike ``self.video_decoder_cache.stats()`` (which only reflects the main process), this sums
hits/misses/evictions over every worker. Counts are lock-free across processes, so treat them as
approximate; the ``hit_rate`` ratio is preserved.
"""
hits, misses, evictions = (int(x) for x in self._cache_counters[:3].tolist())
total = hits + misses
return {
"hits": hits,
"misses": misses,
"evictions": evictions,
"hit_rate": round(hits / total, 4) if total else 0.0,
}
# This buffer is populated while iterating on the dataset's shards
# the logic is to add 2 levels of randomness:
# (1) sample one shard at random from the ones available, and
# (2) sample one frame from the shard sampled at (1)
frames_buffer = []
while available_shards := list(idx_to_backtrack_dataset.keys()):
shard_key = next(self._infinite_generator_over_elements(rng, available_shards))
backtrack_dataset = idx_to_backtrack_dataset[shard_key] # selects which shard to iterate on
try:
for frame in self.make_frame(backtrack_dataset):
if len(frames_buffer) == self.buffer_size:
i = next(buffer_indices_generator) # samples a element from the buffer
yield frames_buffer[i]
frames_buffer[i] = frame
else:
frames_buffer.append(frame)
break # random shard sampled, switch shard
except (
RuntimeError,
StopIteration,
): # NOTE: StopIteration inside a generator throws a RuntimeError since python 3.7
del idx_to_backtrack_dataset[shard_key] # Remove exhausted shard, onto another shard
# Once shards are all exhausted, shuffle the buffer and yield the remaining frames
rng.shuffle(frames_buffer)
yield from frames_buffer
def _get_window_steps(
self, delta_timestamps: dict[str, list[float]] | None = None, dynamic_bounds: bool = False
) -> tuple[int, int]:
if delta_timestamps is None:
return 1, 1
if not dynamic_bounds:
# Fix the windows
lookback = LOOKBACK_BACKTRACKTABLE
lookahead = LOOKAHEAD_BACKTRACKTABLE
else:
# Dynamically adjust the windows based on the given delta_timesteps
all_timestamps = sum(delta_timestamps.values(), [])
lookback = min(all_timestamps) * self.fps
lookahead = max(all_timestamps) * self.fps
# When lookback is >=0 it means no negative timesteps have been provided
lookback = 0 if lookback >= 0 else (lookback * -1)
return lookback, lookahead
def _make_backtrackable_dataset(self, dataset: datasets.IterableDataset) -> Backtrackable:
lookback, lookahead = self._get_window_steps(self.delta_timestamps)
return Backtrackable(dataset, history=lookback, lookahead=lookahead)
def timing_stats(self) -> dict[str, float]:
"""Cumulative seconds spent in video decode and in the upstream tabular pipeline (parquet
fetch + grouping + shuffles + explode), summed across DataLoader workers via the shared
counter tensor. These overlap in wall-clock (workers run in parallel), so compare them to
``num_workers x wallclock`` for time fractions.
"""
decode_ns, fetch_ns = (int(x) for x in self._cache_counters[3:5].tolist())
return {"decode_s_total": round(decode_ns / 1e9, 2), "fetch_s_total": round(fetch_ns / 1e9, 2)}
def _make_timestamps_from_indices(
self, start_ts: float, indices: dict[str, list[int]] | None = None
) -> dict[str, list[float]]:
if indices is not None:
return {
key: (
start_ts + torch.tensor(indices[key]) / self.fps
).tolist() # NOTE: why not delta_timestamps directly?
key: (start_ts + torch.tensor(indices[key]) / self.fps).tolist()
for key in self.delta_timestamps
}
else:
@@ -463,65 +518,6 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
return padding_mask
def make_frame(self, dataset_iterator: Backtrackable) -> Generator:
"""Makes a frame starting from a dataset iterator"""
item = next(dataset_iterator)
item = item_to_torch(item)
updates = [] # list of "updates" to apply to the item retrieved from hf_dataset (w/o camera features)
# Get episode index from the item
ep_idx = item["episode_index"]
# "timestamp" restarts from 0 for each episode, whereas we need a global timestep within the single .mp4 file (given by index/fps)
current_ts = item["index"] / self.fps
episode_boundaries_ts = {
key: (
self.meta.episodes[ep_idx][f"videos/{key}/from_timestamp"],
self.meta.episodes[ep_idx][f"videos/{key}/to_timestamp"],
)
for key in self.meta.video_keys
}
# Apply delta querying logic if necessary
if self.delta_indices is not None:
query_result, padding = self._get_delta_frames(dataset_iterator, item)
updates.append(query_result)
updates.append(padding)
# Load video frames, when needed
if len(self.meta.video_keys) > 0:
original_timestamps = self._make_timestamps_from_indices(current_ts, self.delta_indices)
# Some timestamps might not result available considering the episode's boundaries
query_timestamps = self._get_query_timestamps(
current_ts, self.delta_indices, episode_boundaries_ts
)
video_frames = self._query_videos(query_timestamps, ep_idx)
if self.image_transforms is not None:
image_keys = self.meta.camera_keys
for cam in image_keys:
video_frames[cam] = self.image_transforms(video_frames[cam])
updates.append(video_frames)
if self.delta_indices is not None:
# We always return the same number of frames. Unavailable frames are padded.
padding_mask = self._get_video_frame_padding_mask(
video_frames, query_timestamps, original_timestamps
)
updates.append(padding_mask)
result = item.copy()
for update in updates:
result.update(update)
result["task"] = self.meta.tasks.iloc[item["task_index"]].name
yield result
def _get_query_timestamps(
self,
current_ts: float,
@@ -552,11 +548,20 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
item = {}
for video_key, query_ts in query_timestamps.items():
root = self.meta.url_root if self.streaming and not self.streaming_from_local else self.root
video_path = f"{root}/{self.meta.get_video_file_path(ep_idx, video_key)}"
# query_ts is episode-local; shift to the absolute in-file timeline by the episode's offset.
from_timestamp = self.meta.episodes[ep_idx][f"videos/{video_key}/from_timestamp"]
shifted_query_ts = [from_timestamp + ts for ts in query_ts]
rel_path = str(self.meta.get_video_file_path(ep_idx, video_key))
if self.data_files_root is not None:
root = self.data_files_root
elif self.streaming and not self.streaming_from_local:
root = self.meta.url_root
else:
root = self.root
video_path = f"{root}/{rel_path}"
frames = decode_video_frames_torchcodec(
video_path,
query_ts,
shifted_query_ts,
self.tolerance_s,
decoder_cache=self.video_decoder_cache,
return_uint8=self._return_uint8,
@@ -566,116 +571,6 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
return item
def _get_delta_frames(self, dataset_iterator: Backtrackable, current_item: dict):
# TODO(fracapuano): Modularize this function, refactor the code
"""Get frames with delta offsets using the backtrackable iterator.
Args:
current_item (dict): Current item from the iterator.
ep_idx (int): Episode index.
Returns:
tuple: (query_result, padding) - frames at delta offsets and padding info.
"""
current_episode_idx = current_item["episode_index"]
# Prepare results
query_result = {}
padding = {}
for key, delta_indices in self.delta_indices.items():
if key in self.meta.video_keys:
continue # visual frames are decoded separately
target_frames = []
is_pad = []
# Create a results dictionary to store frames in processing order, then reconstruct original order for stacking
delta_results = {}
# Separate and sort deltas by difficulty (easier operations first)
negative_deltas = sorted([d for d in delta_indices if d < 0], reverse=True) # [-1, -2, -3, ...]
positive_deltas = sorted([d for d in delta_indices if d > 0]) # [1, 2, 3, ...]
zero_deltas = [d for d in delta_indices if d == 0]
# Process zero deltas (current frame)
for delta in zero_deltas:
delta_results[delta] = (
current_item[key],
False,
)
# Process negative deltas in order of increasing difficulty
lookback_failed = False
last_successful_frame = current_item[key]
for delta in negative_deltas:
if lookback_failed:
delta_results[delta] = (last_successful_frame, True)
continue
try:
steps_back = abs(delta)
if dataset_iterator.can_peek_back(steps_back):
past_item = dataset_iterator.peek_back(steps_back)
past_item = item_to_torch(past_item)
if past_item["episode_index"] == current_episode_idx:
delta_results[delta] = (past_item[key], False)
last_successful_frame = past_item[key]
else:
raise LookBackError("Retrieved frame is from different episode!")
else:
raise LookBackError("Cannot go back further than the history buffer!")
except LookBackError:
delta_results[delta] = (last_successful_frame, True)
lookback_failed = True # All subsequent negative deltas will also fail
# Process positive deltas in order of increasing difficulty
lookahead_failed = False
last_successful_frame = current_item[key]
for delta in positive_deltas:
if lookahead_failed:
delta_results[delta] = (last_successful_frame, True)
continue
try:
if dataset_iterator.can_peek_ahead(delta):
future_item = dataset_iterator.peek_ahead(delta)
future_item = item_to_torch(future_item)
if future_item["episode_index"] == current_episode_idx:
delta_results[delta] = (future_item[key], False)
last_successful_frame = future_item[key]
else:
raise LookAheadError("Retrieved frame is from different episode!")
else:
raise LookAheadError("Cannot go ahead further than the lookahead buffer!")
except LookAheadError:
delta_results[delta] = (last_successful_frame, True)
lookahead_failed = True # All subsequent positive deltas will also fail
# Reconstruct original order for stacking
for delta in delta_indices:
frame, is_padded = delta_results[delta]
# add batch dimension for stacking
target_frames.append(frame) # frame.unsqueeze(0))
is_pad.append(is_padded)
# Stack frames and add to results
if target_frames:
query_result[key] = torch.stack(target_frames)
padding[f"{key}_is_pad"] = torch.BoolTensor(is_pad)
return query_result, padding
def _validate_delta_timestamp_keys(self, delta_timestamps: dict[list[float]]) -> None:
"""
Validate that all keys in delta_timestamps correspond to actual features in the dataset.
+158 -13
View File
@@ -22,6 +22,7 @@ import queue
import shutil
import tempfile
import threading
import time
import warnings
from collections import OrderedDict
from dataclasses import asdict, dataclass, field
@@ -47,6 +48,92 @@ from lerobot.utils.import_utils import get_safe_default_video_backend
logger = logging.getLogger(__name__)
DEFAULT_REMOTE_IO_MAX_RETRIES = 5
"""Retry budget for transient hf:// / fsspec / httpx transport errors during streaming video decode.
Streaming a dataset from an HF bucket/CDN issues many small range requests and occasionally hits a
transient transport failure (timeout, dropped connection, 408/5xx). The right response is to rebuild
the connection and retry rather than crash the DataLoader worker. Override via
``LEROBOT_REMOTE_IO_MAX_RETRIES``; set to ``0`` to disable retries (fail fast).
"""
# Transient transport failures from the hf:// -> fsspec -> httpx stack. We match on text because the
# concrete exception types live in optional deps (httpx, huggingface_hub) and vary across versions.
# "client has been closed" is the important one: once a shared httpx client is closed by a single
# failed read, every subsequent read in that worker fails until the fsspec instance cache is cleared.
_RETRYABLE_TRANSPORT_FRAGMENTS = (
"client has been closed",
"server disconnected",
"remoteprotocolerror",
"unexpected_eof",
"eof occurred in violation of protocol",
"connection reset",
"connection aborted",
"connection broken",
"incompleteread",
"read operation timed out",
"timed out",
"request time-out",
"408",
"502",
"503",
"504",
)
def _remote_io_max_retries() -> int:
raw = os.environ.get("LEROBOT_REMOTE_IO_MAX_RETRIES")
if raw is None:
return DEFAULT_REMOTE_IO_MAX_RETRIES
try:
return max(0, int(raw))
except ValueError as e:
raise ValueError(f"LEROBOT_REMOTE_IO_MAX_RETRIES must be an integer; got {raw!r}") from e
def _is_retryable_transport_error(exc: BaseException) -> bool:
"""True if ``exc`` looks like a transient remote-IO failure worth retrying (vs a real bug)."""
text = f"{type(exc).__name__}: {exc}".lower()
return any(fragment in text for fragment in _RETRYABLE_TRANSPORT_FRAGMENTS)
def _recover_remote_io(decoder_cache: "VideoDecoderCache", video_path: str) -> None:
"""Drop the dead decoder for ``video_path`` and force a fresh fsspec client before a retry.
fsspec caches one filesystem instance per (protocol, args), and that instance owns the httpx
client a failed read may have closed. Clearing the instance cache makes the next ``fsspec.open``
build a new client, which is what breaks the "client has been closed" cascade.
"""
decoder_cache.invalidate(video_path)
with contextlib.suppress(Exception):
fsspec.AbstractFileSystem.clear_instance_cache()
def _retry_remote_io(operation, on_retry, max_retries: int, base_delay: float = 0.5, max_delay: float = 10.0):
"""Run ``operation()``, retrying transient transport errors after ``on_retry()`` + capped backoff.
Non-transport errors (decode / index / timestamp issues) propagate immediately so real bugs are
never masked by retries.
"""
attempt = 0
while True:
try:
return operation()
except Exception as e:
if attempt >= max_retries or not _is_retryable_transport_error(e):
raise
attempt += 1
logger.warning(
"Transient remote-IO error (%s: %s); rebuilding connection and retrying (%d/%d).",
type(e).__name__,
e,
attempt,
max_retries,
)
on_retry()
time.sleep(min(base_delay * 2 ** (attempt - 1), max_delay))
def decode_video_frames(
video_path: Path | str,
timestamps: list[float],
@@ -242,7 +329,12 @@ class VideoDecoderCache:
_SENTINEL: ClassVar[object] = object()
def __init__(self, max_size: int | None | object = _SENTINEL):
def __init__(
self,
max_size: int | None | object = _SENTINEL,
counters: "torch.Tensor | None" = None,
device: str = "cpu",
):
if max_size is VideoDecoderCache._SENTINEL:
max_size = _default_max_cache_size()
if max_size is not None and max_size <= 0:
@@ -250,6 +342,18 @@ class VideoDecoderCache:
self.max_size: int | None = max_size # type: ignore[assignment]
self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict()
self._lock = Lock()
# Decode device for the underlying torchcodec VideoDecoder. "cuda" offloads H.264/H.265 decode to
# the GPU's dedicated NVDEC engine (independent of the SMs used for training); requires a
# CUDA-enabled torchcodec/FFmpeg build. See https://developer.nvidia.com/video-codec-sdk.
self.device = device
# Observability counters (cheap, updated under the lock) for benchmarking decoder reuse.
self.hits = 0
self.misses = 0
self.evictions = 0
# Optional shared [hits, misses, evictions] tensor so DataLoader workers aggregate into one place
# (the per-worker `self.*` ints are invisible to the main process). Lock-free across processes, so
# treat the aggregate as approximate; the hit-rate ratio is preserved.
self._counters = counters
def __contains__(self, video_path: object) -> bool:
with self._lock:
@@ -271,11 +375,21 @@ class VideoDecoderCache:
entry = self._cache.get(video_path)
if entry is not None:
self._cache.move_to_end(video_path)
self.hits += 1
if self._counters is not None:
self._counters[0] += 1
return entry[0]
file_handle = fsspec.open(video_path).__enter__()
self.misses += 1
if self._counters is not None:
self._counters[1] += 1
# Bound per-handle buffering: with many decoders kept open at once (one per camera per active
# shard, across all workers), the default fsspec read cache balloons RAM on remote backends
# like hf:// buckets. A small readahead cache caps each handle's footprint without hurting the
# mostly-sequential reads torchcodec issues.
file_handle = fsspec.open(video_path, cache_type="readahead", block_size=2**20).__enter__()
try:
decoder = VideoDecoder(file_handle, seek_mode="approximate")
decoder = VideoDecoder(file_handle, seek_mode="approximate", device=self.device)
except Exception:
file_handle.close()
raise
@@ -287,6 +401,9 @@ class VideoDecoderCache:
if self.max_size is not None:
while len(self._cache) > self.max_size:
_evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False)
self.evictions += 1
if self._counters is not None:
self._counters[2] += 1
with contextlib.suppress(Exception):
evicted_handle.close()
@@ -300,11 +417,35 @@ class VideoDecoderCache:
file_handle.close()
self._cache.clear()
def invalidate(self, video_path: str) -> None:
"""Drop and close the cached decoder for a path whose connection went bad.
After a transport error the cached ``fsspec`` handle (and the httpx client behind it) is dead;
removing the entry forces the next :meth:`get_decoder` to re-open a fresh handle.
"""
with self._lock:
entry = self._cache.pop(str(video_path), None)
if entry is not None:
with contextlib.suppress(Exception):
entry[1].close()
def size(self) -> int:
"""Return the number of cached decoders."""
with self._lock:
return len(self._cache)
def stats(self) -> dict[str, int | float]:
"""Return reuse counters (hits/misses/evictions, hit rate, current size) for benchmarking."""
with self._lock:
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"evictions": self.evictions,
"hit_rate": self.hits / total if total else 0.0,
"size": len(self._cache),
}
class FrameTimestampError(ValueError):
"""Helper error to indicate the retrieved timestamps exceed the queried ones"""
@@ -343,20 +484,24 @@ def decode_video_frames_torchcodec(
if decoder_cache is None:
decoder_cache = _default_decoder_cache
# Use cached decoder instead of creating new one each time
decoder = decoder_cache.get_decoder(str(video_path))
def _decode_frames():
# Both opening the decoder and reading frames go over the network for hf:// paths, so wrap the
# whole unit: a transient transport error retries by dropping the dead handle and rebuilding
# the connection (see _retry_remote_io / _recover_remote_io) instead of killing the worker.
decoder = decoder_cache.get_decoder(str(video_path))
average_fps = decoder.metadata.average_fps
frame_indices = [round(ts * average_fps) for ts in timestamps]
return decoder.get_frames_at(indices=frame_indices)
frames_batch = _retry_remote_io(
_decode_frames,
on_retry=lambda: _recover_remote_io(decoder_cache, str(video_path)),
max_retries=_remote_io_max_retries(),
)
loaded_ts = []
loaded_frames = []
# get metadata for frame information
metadata = decoder.metadata
average_fps = metadata.average_fps
# convert timestamps to frame indices
frame_indices = [round(ts * average_fps) for ts in timestamps]
# retrieve frames based on indices
frames_batch = decoder.get_frames_at(indices=frame_indices)
for frame, pts in zip(frames_batch.data, frames_batch.pts_seconds, strict=True):
loaded_frames.append(frame)
loaded_ts.append(pts.item())
+15 -5
View File
@@ -232,15 +232,18 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
# Dataset loading synchronization: main process downloads first to avoid race conditions
if is_main_process:
logging.info("Creating dataset")
# Dataset loading synchronization: each node's local main process downloads first to avoid
# race conditions (the global main process only exists on node 0, so gating on it would let
# all ranks of the other nodes download and build the Arrow cache concurrently).
if accelerator.is_local_main_process:
if is_main_process:
logging.info("Creating dataset")
dataset = make_dataset(cfg)
accelerator.wait_for_everyone()
# Now all other processes can safely load the dataset
if not is_main_process:
# Now all other processes can safely load the dataset from the local cache
if not accelerator.is_local_main_process:
dataset = make_dataset(cfg)
# Create environment used for evaluating checkpoints during training on simulation data.
@@ -386,12 +389,19 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
# create dataloader for offline training
if hasattr(active_cfg, "drop_n_last_frames"):
shuffle = False
# A dedicated generator (rather than the global torch RNG) lets accelerator.prepare
# synchronize the shuffle permutation across ranks, keeping batch shards disjoint even
# when ranks consume the global RNG asymmetrically (e.g. eval on the main process only).
sampler_generator = torch.Generator()
if cfg.seed is not None:
sampler_generator.manual_seed(cfg.seed)
sampler = EpisodeAwareSampler(
dataset.meta.episodes["dataset_from_index"],
dataset.meta.episodes["dataset_to_index"],
episode_indices_to_use=dataset.episodes,
drop_n_last_frames=active_cfg.drop_n_last_frames,
shuffle=True,
generator=sampler_generator,
)
else:
shuffle = True
+24
View File
@@ -114,6 +114,30 @@ def test_shuffle():
assert set(sampler) == {0, 1, 2, 3, 4, 5}
def test_shuffle_with_generator_is_deterministic():
# Two samplers shuffling with same-seed generators must yield identical permutations.
# This is what keeps batch shards disjoint across ranks in distributed training, where
# accelerate synchronizes the sampler's generator state instead of the global torch RNG.
sampler_a = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
sampler_b = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
assert list(sampler_a) == list(sampler_b)
# Desyncing the global RNG must not affect the permutation.
sampler_c = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
order_before = list(sampler_c)
sampler_c.generator.manual_seed(42)
torch.randperm(1000) # consume global RNG, as rank-asymmetric code (e.g. eval) would
assert list(sampler_c) == order_before
def test_generator_attribute_defaults_to_none():
# accelerate detects synchronizable samplers via `hasattr(sampler, "generator")`,
# so the attribute must exist even when no generator is passed.
sampler = EpisodeAwareSampler([0], [6], shuffle=True)
assert sampler.generator is None
assert set(sampler) == {0, 1, 2, 3, 4, 5}
def test_negative_drop_first_frames_raises():
with pytest.raises(ValueError, match="drop_n_first_frames must be >= 0"):
EpisodeAwareSampler([0], [10], drop_n_first_frames=-1)
+30 -95
View File
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pytest
import torch
@@ -25,52 +24,6 @@ from lerobot.utils.constants import ACTION
from tests.fixtures.constants import DUMMY_REPO_ID
def get_frames_expected_order(streaming_ds: StreamingLeRobotDataset) -> list[int]:
"""Replicates the shuffling logic of StreamingLeRobotDataset to get the expected order of indices."""
rng = np.random.default_rng(streaming_ds.seed)
buffer_size = streaming_ds.buffer_size
num_shards = streaming_ds.num_shards
shards_indices = []
for shard_idx in range(num_shards):
shard = streaming_ds.hf_dataset.shard(num_shards, index=shard_idx)
shard_indices = [item["index"] for item in shard]
shards_indices.append(shard_indices)
shard_iterators = {i: iter(s) for i, s in enumerate(shards_indices)}
buffer_indices_generator = streaming_ds._iter_random_indices(rng, buffer_size)
frames_buffer = []
expected_indices = []
while shard_iterators: # While there are still available shards
available_shard_keys = list(shard_iterators.keys())
if not available_shard_keys:
break
# Call _infinite_generator_over_elements with current available shards (key difference!)
shard_key = next(streaming_ds._infinite_generator_over_elements(rng, available_shard_keys))
try:
frame_index = next(shard_iterators[shard_key])
if len(frames_buffer) == buffer_size:
i = next(buffer_indices_generator)
expected_indices.append(frames_buffer[i])
frames_buffer[i] = frame_index
else:
frames_buffer.append(frame_index)
except StopIteration:
del shard_iterators[shard_key] # Remove exhausted shard
rng.shuffle(frames_buffer)
expected_indices.extend(frames_buffer)
return expected_indices
def test_single_frame_consistency(tmp_path, lerobot_dataset_factory):
"""Test if are correctly accessed"""
ds_num_frames = 400
@@ -120,10 +73,9 @@ def test_single_frame_consistency(tmp_path, lerobot_dataset_factory):
[False, True],
)
def test_frames_order_over_epochs(tmp_path, lerobot_dataset_factory, shuffle):
"""Test if streamed frames correspond to shuffling operations over in-memory dataset."""
"""Each epoch covers every frame exactly once; shuffle reshuffles across epochs."""
ds_num_frames = 400
ds_num_episodes = 10
buffer_size = 100
seed = 42
n_epochs = 3
@@ -138,25 +90,17 @@ def test_frames_order_over_epochs(tmp_path, lerobot_dataset_factory, shuffle):
)
streaming_ds = StreamingLeRobotDataset(
repo_id=repo_id, root=local_path, buffer_size=buffer_size, seed=seed, shuffle=shuffle
repo_id=repo_id, root=local_path, episode_pool_size=4, seed=seed, shuffle=shuffle
)
first_epoch_indices = [frame["index"] for frame in streaming_ds]
expected_indices = get_frames_expected_order(streaming_ds)
assert first_epoch_indices == expected_indices, "First epoch indices do not match expected indices"
expected_indices = get_frames_expected_order(streaming_ds)
for _ in range(n_epochs):
streaming_indices = [frame["index"] for frame in streaming_ds]
frames_match = all(
s_index == e_index for s_index, e_index in zip(streaming_indices, expected_indices, strict=True)
)
if shuffle:
assert not frames_match
else:
assert frames_match
epochs = [[int(frame["index"]) for frame in streaming_ds] for _ in range(n_epochs)]
for epoch_indices in epochs:
assert sorted(epoch_indices) == list(range(ds_num_frames)), "epoch did not cover every frame once"
if shuffle:
assert epochs[0] != epochs[1], "shuffle did not reshuffle across epochs"
assert epochs[0] != list(range(ds_num_frames)), "shuffle left the stream in sequential order"
else:
assert epochs[0] == epochs[1] == epochs[2], "unshuffled epochs must repeat the same order"
@pytest.mark.parametrize(
@@ -164,15 +108,11 @@ def test_frames_order_over_epochs(tmp_path, lerobot_dataset_factory, shuffle):
[False, True],
)
def test_frames_order_with_shards(tmp_path, lerobot_dataset_factory, shuffle):
"""Test if streamed frames correspond to shuffling operations over in-memory dataset with multiple shards."""
"""Multi-shard streams keep exactly-once coverage and deterministic per-seed order."""
ds_num_frames = 100
ds_num_episodes = 10
buffer_size = 10
seed = 42
n_epochs = 3
data_file_size_mb = 0.001
chunks_size = 1
local_path = tmp_path / "test"
@@ -187,31 +127,21 @@ def test_frames_order_with_shards(tmp_path, lerobot_dataset_factory, shuffle):
chunks_size=chunks_size,
)
streaming_ds = StreamingLeRobotDataset(
repo_id=repo_id,
root=local_path,
buffer_size=buffer_size,
seed=seed,
shuffle=shuffle,
max_num_shards=4,
)
first_epoch_indices = [frame["index"] for frame in streaming_ds]
expected_indices = get_frames_expected_order(streaming_ds)
assert first_epoch_indices == expected_indices, "First epoch indices do not match expected indices"
for _ in range(n_epochs):
streaming_indices = [
frame["index"] for frame in streaming_ds
] # NOTE: this is the same as first_epoch_indices
frames_match = all(
s_index == e_index for s_index, e_index in zip(streaming_indices, expected_indices, strict=True)
def make_ds():
return StreamingLeRobotDataset(
repo_id=repo_id,
root=local_path,
episode_pool_size=3,
seed=seed,
shuffle=shuffle,
max_num_shards=4,
)
if shuffle:
assert not frames_match
else:
assert frames_match
first = [int(frame["index"]) for frame in make_ds()]
again = [int(frame["index"]) for frame in make_ds()]
assert sorted(first) == list(range(ds_num_frames)), "epoch did not cover every frame once"
assert first == again, "same seed must reproduce the same order"
@pytest.mark.parametrize(
@@ -288,6 +218,11 @@ def test_frames_with_delta_consistency(tmp_path, lerobot_dataset_factory, state_
check = torch.allclose(left, right) and left.shape == right.shape
else:
# Scalar numerics: streaming yields python floats/ints where map-style yields
# 0-dim tensors (long-standing accepted difference). Compare by value.
check = float(left) == float(right)
key_checks.append((key, check))
assert all(t[1] for t in key_checks), (
@@ -0,0 +1,100 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""End-to-end distributed streaming smoke test under a real `accelerate launch`.
Mirrors tests/training/test_multi_gpu.py but runs on CPU and only checks the dataloading contract: with
two processes, `split_dataset_by_node` (auto-resolved from the Accelerate state) must give each rank a
disjoint set of frames that together cover the dataset. Skips if the environment can't actually spawn
>= 2 processes (e.g. local macOS multi-CPU), so it never silently passes as a single process.
"""
import json
import shutil
import subprocess
import sys
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("accelerate", reason="accelerate is required (install lerobot[training])")
from tests.fixtures.constants import DUMMY_REPO_ID
WORKER = """
import json, sys
from accelerate import PartialState
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset
root, repo_id, out_dir = sys.argv[1], sys.argv[2], sys.argv[3]
state = PartialState()
ds = StreamingLeRobotDataset(
repo_id=repo_id, root=root, shuffle=False, episode_pool_size=8, max_num_shards=8
)
indices = [int(frame["index"]) for frame in ds]
payload = {"rank": state.process_index, "world": state.num_processes, "indices": indices}
with open(f"{out_dir}/rank_{state.process_index}.json", "w") as f:
json.dump(payload, f)
"""
@pytest.mark.skipif(shutil.which("accelerate") is None, reason="accelerate CLI not available")
def test_accelerate_launch_ranks_are_disjoint(tmp_path, lerobot_dataset_factory):
total_frames = 160
repo_id = f"{DUMMY_REPO_ID}-acc"
root = tmp_path / "ds"
lerobot_dataset_factory(
root=root,
repo_id=repo_id,
total_episodes=8,
total_frames=total_frames,
use_videos=False,
data_files_size_in_mb=0.001,
chunks_size=1,
)
worker = tmp_path / "worker.py"
worker.write_text(WORKER)
out_dir = tmp_path / "out"
out_dir.mkdir()
cmd = [
"accelerate",
"launch",
"--num_processes=2",
"--num_machines=1",
"--mixed_precision=no",
"--dynamo_backend=no",
"--cpu",
str(worker),
str(root),
repo_id,
str(out_dir),
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
assert result.returncode == 0, (
f"accelerate launch failed:\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
)
payloads = [json.loads(p.read_text()) for p in sorted(out_dir.glob("rank_*.json"))]
if len(payloads) < 2 or any(p["world"] < 2 for p in payloads):
pytest.skip("environment did not spawn >= 2 distributed processes (e.g. local macOS multi-CPU)")
rank_sets = [set(p["indices"]) for p in payloads]
assert rank_sets[0].isdisjoint(rank_sets[1]), "ranks streamed overlapping frames under accelerate launch"
assert set().union(*rank_sets) == set(range(total_frames)), "ranks did not jointly cover all frames"
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-v"]))
+314
View File
@@ -0,0 +1,314 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the HF-native large-scale streaming additions: distributed (per-rank) sharding,
DataLoader worker splitting, the episode pool (randomness, coverage, exact deltas), video
prefetching, deterministic fast-forward resume, and schema parity."""
import pytest
import torch
from torch.utils.data import DataLoader
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset
from lerobot.utils.constants import ACTION
from tests.fixtures.constants import DUMMY_REPO_ID
def _make_local_dataset(factory, root, repo_id, *, total_episodes, total_frames, use_videos=False, **kw):
factory(
root=root,
repo_id=repo_id,
total_episodes=total_episodes,
total_frames=total_frames,
use_videos=use_videos,
data_files_size_in_mb=0.001,
chunks_size=1,
**kw,
)
def _stream_indices(ds: StreamingLeRobotDataset) -> list[int]:
return [int(frame["index"]) for frame in ds]
def test_resolve_distributed_prefers_explicit_then_env(monkeypatch):
assert StreamingLeRobotDataset._resolve_distributed(2, 8) == (2, 8)
monkeypatch.delenv("RANK", raising=False)
monkeypatch.delenv("WORLD_SIZE", raising=False)
# No accelerate state, no env -> single process.
assert StreamingLeRobotDataset._resolve_distributed(None, None) == (0, 1)
monkeypatch.setenv("RANK", "3")
monkeypatch.setenv("WORLD_SIZE", "4")
assert StreamingLeRobotDataset._resolve_distributed(None, None) == (3, 4)
def test_split_by_node_disjoint_across_ranks(tmp_path, lerobot_dataset_factory):
"""Each rank must stream a disjoint set of frames, and the ranks together must cover every frame."""
repo_id = f"{DUMMY_REPO_ID}-ranks"
total_frames, total_episodes = 200, 8
_make_local_dataset(
lerobot_dataset_factory,
tmp_path / "ds",
repo_id,
total_episodes=total_episodes,
total_frames=total_frames,
)
world_size = 2
per_rank = []
for rank in range(world_size):
ds = StreamingLeRobotDataset(
repo_id=repo_id,
root=tmp_path / "ds",
shuffle=False,
episode_pool_size=8,
max_num_shards=8,
rank=rank,
world_size=world_size,
)
per_rank.append(set(_stream_indices(ds)))
assert per_rank[0].isdisjoint(per_rank[1]), (
"ranks streamed overlapping frames (duplicate data across GPUs)"
)
assert per_rank[0] | per_rank[1] == set(range(total_frames)), "ranks did not jointly cover all frames"
def test_dataloader_workers_no_duplicates_within_rank(tmp_path, lerobot_dataset_factory):
"""DataLoader workers within a rank must split shards so no frame is yielded twice."""
repo_id = f"{DUMMY_REPO_ID}-workers"
total_frames, total_episodes = 120, 8
_make_local_dataset(
lerobot_dataset_factory,
tmp_path / "ds",
repo_id,
total_episodes=total_episodes,
total_frames=total_frames,
)
ds = StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=False, episode_pool_size=4, max_num_shards=4
)
loader = DataLoader(ds, batch_size=None, num_workers=2)
indices = [int(batch["index"]) for batch in loader]
assert len(indices) == len(set(indices)), "DataLoader workers yielded duplicate frames within a rank"
def test_sarm_window_covers_long_horizon_without_padding(tmp_path, lerobot_dataset_factory):
"""A delta window longer than the old 100-frame ceiling must fetch real frames, not pad them.
SARM uses a window of 8 steps spaced 1s (~160 frames @ fps20). Here fps=30, so +5s = 150 frames > 100.
"""
repo_id = f"{DUMMY_REPO_ID}-sarm"
# A single long episode so a +150-frame lookahead is unambiguously inside the episode (the fixture
# gives episodes variable lengths, so multi-episode boundaries can't be assumed).
episode_frames = 300
_make_local_dataset(
lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=1, total_frames=episode_frames
)
horizon_s = 5.0 # 150 frames @ fps30, well beyond LOOKAHEAD_BACKTRACKTABLE=100
delta_timestamps = {ACTION: [0.0, horizon_s]}
ds = StreamingLeRobotDataset(
repo_id=repo_id,
root=tmp_path / "ds",
shuffle=False,
episode_pool_size=1,
max_num_shards=1,
delta_timestamps=delta_timestamps,
)
horizon_frames = int(round(horizon_s * ds.fps))
assert horizon_frames > 100, "test must exceed the old LOOKAHEAD_BACKTRACKTABLE ceiling"
checked = 0
for frame in ds:
idx = int(frame["index"])
# The +horizon target is inside the single episode -> it must be a real frame, not padding.
if idx + horizon_frames < episode_frames:
assert not bool(frame[f"{ACTION}_is_pad"][-1]), (
f"frame {idx}: +{horizon_frames} target was padded; long delta window did not reach it"
)
checked += 1
assert checked > 0, "test did not exercise any in-episode long-horizon frame"
def test_pool_order_is_deterministic_per_seed(tmp_path, lerobot_dataset_factory):
repo_id = f"{DUMMY_REPO_ID}-seeds"
_make_local_dataset(lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=6, total_frames=120)
def order(seed):
return _stream_indices(
StreamingLeRobotDataset(
repo_id=repo_id,
root=tmp_path / "ds",
shuffle=True,
seed=seed,
episode_pool_size=4,
max_num_shards=2,
)
)
assert order(0) == order(0), "same seed must reproduce the same order"
assert order(0) != order(1), "different seeds should give different orders"
def test_pool_epochs_reshuffle_and_cover(tmp_path, lerobot_dataset_factory):
"""Consecutive passes over the same dataset object reshuffle (epoch advances) but keep coverage."""
repo_id = f"{DUMMY_REPO_ID}-epochs"
total_frames = 120
_make_local_dataset(
lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=6, total_frames=total_frames
)
ds = StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=True, seed=3, episode_pool_size=4, max_num_shards=2
)
epoch_0 = _stream_indices(ds)
epoch_1 = _stream_indices(ds)
assert sorted(epoch_0) == sorted(epoch_1) == list(range(total_frames))
assert epoch_0 != epoch_1, "epoch did not reshuffle"
def test_pool_mixes_episodes(tmp_path, lerobot_dataset_factory):
"""Early samples should already come from several distinct episodes (the pool's purpose)."""
repo_id = f"{DUMMY_REPO_ID}-mix"
_make_local_dataset(lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=8, total_frames=200)
ds = StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=True, seed=0, episode_pool_size=8, max_num_shards=4
)
episodes_in_head = {int(frame["episode_index"]) for _, frame in zip(range(20), ds, strict=False)}
assert len(episodes_in_head) >= 3, f"pool did not mix episodes: {episodes_in_head}"
def test_schema_parity_with_map_style(tmp_path, lerobot_dataset_factory):
"""Streamed samples must have the same keys / shapes / dtypes as map-style LeRobotDataset."""
repo_id = f"{DUMMY_REPO_ID}-parity"
map_ds = lerobot_dataset_factory(
root=tmp_path / "ds", repo_id=repo_id, total_episodes=4, total_frames=80, use_videos=True
)
stream_ds = StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=False, episode_pool_size=4, max_num_shards=2
)
map_frame = map_ds[0]
stream_frame = next(iter(stream_ds))
assert set(stream_frame) == set(map_frame), set(stream_frame) ^ set(map_frame)
for key, value in stream_frame.items():
ref = map_frame[key]
if isinstance(value, torch.Tensor):
assert isinstance(ref, torch.Tensor) and value.shape == ref.shape and value.dtype == ref.dtype, (
f"{key}: stream {tuple(value.shape)}/{value.dtype} vs map {tuple(ref.shape)}/{ref.dtype}"
)
elif isinstance(value, str):
assert isinstance(ref, str), f"{key}: {type(value)} vs {type(ref)}"
else:
# Scalar numerics: streaming yields python floats where map-style yields 0-dim tensors
# (a long-standing, accepted difference). Compare by value rather than exact type.
assert float(value) == float(ref), f"{key}: {value} vs {ref}"
def test_video_path_resolution_local(tmp_path, lerobot_dataset_factory, monkeypatch):
"""For a local (prewarmed) root, video decode must be issued against the local path, not hf://."""
import lerobot.datasets.streaming_dataset as sd
repo_id = f"{DUMMY_REPO_ID}-vpath"
lerobot_dataset_factory(
root=tmp_path / "ds", repo_id=repo_id, total_episodes=2, total_frames=40, use_videos=True
)
ds = StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=False, episode_pool_size=1, max_num_shards=1
)
seen_paths = []
def fake_decode(video_path, query_ts, *args, **kwargs):
seen_paths.append(str(video_path))
return torch.zeros(len(query_ts), 3, 64, 96)
monkeypatch.setattr(sd, "decode_video_frames_torchcodec", fake_decode)
next(iter(ds))
assert seen_paths, "no video decode was issued"
assert all(str(ds.root) in p and not p.startswith("hf://") for p in seen_paths), seen_paths
def test_shuffle_decorrelates_output_order(tmp_path, lerobot_dataset_factory):
"""With shuffle on, streamed frame order must differ from the underlying sequential order."""
repo_id = f"{DUMMY_REPO_ID}-shuf"
_make_local_dataset(lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=8, total_frames=200)
ordered = _stream_indices(
StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=False, episode_pool_size=1, max_num_shards=1
)
)
shuffled = _stream_indices(
StreamingLeRobotDataset(
repo_id=repo_id, root=tmp_path / "ds", shuffle=True, episode_pool_size=8, max_num_shards=4, seed=0
)
)
assert sorted(shuffled) == sorted(ordered), "shuffling changed the set of frames"
assert shuffled != ordered, "shuffle did not decorrelate output order"
def test_native_resume_never_repeats_and_loss_is_bounded(tmp_path, lerobot_dataset_factory):
"""Native state_dict resume: no sample is re-yielded; loss is bounded by the shuffle buffers."""
repo_id = f"{DUMMY_REPO_ID}-native-resume"
total_frames = 100
_make_local_dataset(
lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=5, total_frames=total_frames
)
def fresh_ds():
return StreamingLeRobotDataset(
repo_id=repo_id,
root=tmp_path / "ds",
shuffle=True,
seed=7,
episode_pool_size=2,
frame_shuffle_buffer_size=8,
)
ds = fresh_ds()
it = iter(ds)
consumed = [int(next(it)["index"]) for _ in range(30)]
state = ds.state_dict()
resumed_ds = fresh_ds()
resumed_ds.load_state_dict(state)
rest = [int(frame["index"]) for frame in resumed_ds]
assert not set(consumed) & set(rest), "resume re-yielded already-seen frames"
# in-flight buffer contents are skipped on resume (documented datasets behavior):
# bounded by the episode pool (2 episodes of <= ~30 frames here) + frame buffer (8)
covered = len(set(consumed) | set(rest))
max_in_flight = 2 * 30 + 8
assert covered >= total_frames - max_in_flight
assert covered + len(consumed) >= total_frames - max_in_flight
def test_pipeline_uses_native_primitives(tmp_path, lerobot_dataset_factory):
"""The tabular pipeline is pure datasets: batch(by_column) + shuffle + map + shuffle."""
repo_id = f"{DUMMY_REPO_ID}-native-pipe"
_make_local_dataset(lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=4, total_frames=80)
ds = StreamingLeRobotDataset(repo_id=repo_id, root=tmp_path / "ds", shuffle=True, episode_pool_size=2)
import datasets as hf_datasets
assert isinstance(ds._pipeline, hf_datasets.IterableDataset)
state = ds._pipeline.state_dict() # the native resume protocol is available end-to-end
assert state is not None
Generated
+17 -18
View File
@@ -1084,8 +1084,8 @@ wheels = [
[[package]]
name = "datasets"
version = "4.8.5"
source = { registry = "https://pypi.org/simple" }
version = "5.0.1.dev0"
source = { git = "https://github.com/huggingface/datasets.git?rev=2c45eab1bb975ac3d846f2aa6217b82adec8eba3#2c45eab1bb975ac3d846f2aa6217b82adec8eba3" }
dependencies = [
{ name = "dill" },
{ name = "filelock" },
@@ -1102,10 +1102,6 @@ dependencies = [
{ name = "tqdm" },
{ name = "xxhash" },
]
sdist = { url = "https://files.pythonhosted.org/packages/66/34/14cd8e76f907f7d4dca2334cfeec9f81d30fd15c25a015f99aaea694eaed/datasets-4.8.5.tar.gz", hash = "sha256:0f0c1c3d56ffff2c93b2f4c63c95bac94f3d7e8621aea2a2a576275233bba772", size = 605649, upload-time = "2026-04-27T15:43:57.384Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/65/99/00f3196036501b53032c4b1ab8337a0b978dee832ed276dae3815df4e8b5/datasets-4.8.5-py3-none-any.whl", hash = "sha256:5079900781719c0e063a8efdd2cd95a31ad0c63209178669cd23cf1b926149ff", size = 528973, upload-time = "2026-04-27T15:43:53.702Z" },
]
[[package]]
name = "debugpy"
@@ -1764,7 +1760,7 @@ wheels = [
[[package]]
name = "gym-aloha"
version = "0.1.3"
version = "0.1.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "dm-control" },
@@ -1772,14 +1768,14 @@ dependencies = [
{ name = "imageio", extra = ["ffmpeg"] },
{ name = "mujoco" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b5/5e/4bb7204730501c2f645e0532a2df4339206948b2882f77cbf0eaf75bc5fe/gym_aloha-0.1.3.tar.gz", hash = "sha256:b794b246a2e6da6ce5f75e152f553fbd4412704bc217fe6311d0ede3bb72a75e", size = 443468, upload-time = "2025-10-09T14:02:35.024Z" }
sdist = { url = "https://files.pythonhosted.org/packages/4a/c5/a5b8bdbddfcadec0b52b50e6d1a70325e09e6b594e5f55929d67d9122e2c/gym_aloha-0.1.4.tar.gz", hash = "sha256:0dc4e645045aeb3e74e3c320872d28df6dc93a8751d6ab2f266a2ca11323131f", size = 443466, upload-time = "2026-06-10T09:13:25.525Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/57/6c/10da397177c48ce360efa66ec21b10b10ef5fa2766256fcd8d7d9b5fa6fc/gym_aloha-0.1.3-py3-none-any.whl", hash = "sha256:a94e5747e71307897ded7ae17ed97fab05e814dcb714a16d320f110444f9d0c3", size = 447908, upload-time = "2025-10-09T14:02:33.253Z" },
{ url = "https://files.pythonhosted.org/packages/35/e3/3afd0e517a503aabe255bf65f5136490acb79c43189e8d56a3aa63081a10/gym_aloha-0.1.4-py3-none-any.whl", hash = "sha256:d9044290fbccddf0be4246b5287cf0eb6b9ddee545a3d222ce8d78c93ce7125e", size = 447908, upload-time = "2026-06-10T09:13:23.868Z" },
]
[[package]]
name = "gym-hil"
version = "0.1.13"
version = "0.1.14"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gymnasium" },
@@ -1789,9 +1785,9 @@ dependencies = [
{ name = "pygame" },
{ name = "pynput" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f3/41/e89c87b3c66fb2f8ab5818bff4aa552977911eabaee7c12a8a336dcc406f/gym_hil-0.1.13.tar.gz", hash = "sha256:b9eab7a0acc811f181254e3ad72865830fdbb292c236895f374135d3d62f1b27", size = 5668001, upload-time = "2025-10-21T09:57:24.01Z" }
sdist = { url = "https://files.pythonhosted.org/packages/0c/64/b5cfe59d6a69d20497218f01ad2bdaa2a5a72b850bdb1a445d804ecc9948/gym_hil-0.1.14.tar.gz", hash = "sha256:aeee688dcb3ec72e7bcbe604df4a3f990cce49c8a2da469dd67c3a4eeb4c6bbb", size = 5667991, upload-time = "2026-06-10T09:16:38.98Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c2/8d/9e3ab53f9aac7bd542f339efd0a9283fa76e034474987e0705379274dfcf/gym_hil-0.1.13-py3-none-any.whl", hash = "sha256:b6444fc43ce1a68ce403df14f99100d9c903ae05d822959e9cd0b76a50b93320", size = 5750805, upload-time = "2025-10-21T09:57:22.068Z" },
{ url = "https://files.pythonhosted.org/packages/72/97/a7a9c3886306a89046ba5c989bc8b79008e7ec973228bad1fa20d7a94bba/gym_hil-0.1.14-py3-none-any.whl", hash = "sha256:9a2799d47a4561e0b0bb8d37fb3d84934657240be328d13991ea06758726533d", size = 5750805, upload-time = "2026-06-10T09:16:36.827Z" },
]
[[package]]
@@ -1881,7 +1877,7 @@ sdist = { url = "https://files.pythonhosted.org/packages/e6/3e/ffad88145b342d5a9
[[package]]
name = "hf-libero"
version = "0.1.3"
version = "0.1.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "bddl", marker = "sys_platform == 'linux'" },
@@ -1902,7 +1898,10 @@ dependencies = [
{ name = "transformers", marker = "sys_platform == 'linux'" },
{ name = "wandb", marker = "sys_platform == 'linux'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/7e/ca/7f1c90aedcd067d608681cf03469ae548990ba0806f68a67927dcc801f04/hf_libero-0.1.3.tar.gz", hash = "sha256:0d6b9a215a658db86f66c03d063d6d877d2e9f96d2d326cfa9f43ba4da4a6d5a", size = 2960521, upload-time = "2025-11-03T17:58:00.003Z" }
sdist = { url = "https://files.pythonhosted.org/packages/af/aa/4e9eb8715e0bff9cb6553db563a35d253393097d446f82bd53575e8b253d/hf_libero-0.1.4.tar.gz", hash = "sha256:c058d67ad5a2b589529c14d614282ef4cca3a7763dafa134f58a6c9039657e34", size = 2961319, upload-time = "2026-06-10T09:56:13.994Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/79/c286b894c051988d062241682834df915c945bcf51009ffdffbe5ecf69bf/hf_libero-0.1.4-py3-none-any.whl", hash = "sha256:207f76e2f28bff30f78132223d8592fe8f64b1f8fd90ce7024948ada0d7e2c27", size = 3169084, upload-time = "2026-06-10T09:56:12.441Z" },
]
[[package]]
name = "hf-xet"
@@ -3075,7 +3074,7 @@ requires-dist = [
{ name = "av", marker = "extra == 'av-dep'", specifier = ">=15.0.0,<16.0.0" },
{ name = "cmake", specifier = ">=3.29.0.1,<4.2.0" },
{ name = "contourpy", marker = "extra == 'matplotlib-dep'", specifier = ">=1.3.0,<2.0.0" },
{ name = "datasets", marker = "extra == 'dataset'", specifier = ">=4.7.0,<5.0.0" },
{ name = "datasets", marker = "extra == 'dataset'", git = "https://github.com/huggingface/datasets.git?rev=2c45eab1bb975ac3d846f2aa6217b82adec8eba3" },
{ name = "debugpy", marker = "extra == 'dev'", specifier = ">=1.8.1,<1.9.0" },
{ name = "decord", marker = "(platform_machine == 'AMD64' and extra == 'groot') or (platform_machine == 'x86_64' and extra == 'groot')", specifier = ">=0.6.0,<1.0.0" },
{ name = "deepdiff", marker = "extra == 'deepdiff-dep'", specifier = ">=7.0.1,<9.0.0" },
@@ -3090,12 +3089,12 @@ requires-dist = [
{ name = "flash-attn", marker = "sys_platform != 'darwin' and extra == 'groot'", specifier = ">=2.5.9,<3.0.0" },
{ name = "grpcio", marker = "extra == 'grpcio-dep'", specifier = "==1.73.1" },
{ name = "grpcio-tools", marker = "extra == 'dev'", specifier = "==1.73.1" },
{ name = "gym-aloha", marker = "extra == 'aloha'", specifier = ">=0.1.2,<0.2.0" },
{ name = "gym-hil", marker = "extra == 'hilserl'", specifier = ">=0.1.13,<0.2.0" },
{ name = "gym-aloha", marker = "extra == 'aloha'", specifier = ">=0.1.4,<0.2.0" },
{ name = "gym-hil", marker = "extra == 'hilserl'", specifier = ">=0.1.14,<0.2.0" },
{ name = "gym-pusht", marker = "extra == 'pusht'", specifier = ">=0.1.5,<0.2.0" },
{ name = "gymnasium", specifier = ">=1.1.1,<2.0.0" },
{ name = "hebi-py", marker = "extra == 'phone'", specifier = ">=2.8.0,<2.12.0" },
{ name = "hf-libero", marker = "sys_platform == 'linux' and extra == 'libero'", specifier = ">=0.1.3,<0.2.0" },
{ name = "hf-libero", marker = "sys_platform == 'linux' and extra == 'libero'", specifier = ">=0.1.4,<0.2.0" },
{ name = "hidapi", marker = "extra == 'gamepad'", specifier = ">=0.14.0,<0.15.0" },
{ name = "huggingface-hub", specifier = ">=1.0.0,<2.0.0" },
{ name = "ipykernel", marker = "extra == 'notebook'", specifier = ">=6.0.0,<7.0.0" },