Reshard parquet per row group (1 shard == 1 row group == 1 episode) and feed the
episode-pool shuffle with max_buffer_input_shards so the pool is a uniform random
sample of the corpus, independent of episodes-per-file. Add validate_row_groups
guardrails (collapsed-row-group + distributed divisibility), require datasets>=5.0.0,
make the test fixture write one row group per episode, and plumb max_buffer_input_shards
through the dataloading benchmark.
Co-authored-by: Cursor <cursoragent@cursor.com>
Single-file SLURM-oriented benchmark comparing the map-style and native
streaming loaders on single-image samples: a self-submitting serial chain
that measures peak RSS, samples/s (and decoded frames/s), fetch-vs-decode
split, shuffle randomness, and p50/p95/p99 sample latency over a fixed
wall-clock window, including a 2-node split_dataset_by_node leg.
Co-authored-by: Cursor <cursoragent@cursor.com>
Raise the default episode_pool_size to 1024 (DatasetConfig + StreamingLeRobotDataset)
for better default shuffle quality at scale.
Streaming is now a first-class option of the main train script: when cfg.dataset.streaming
is set, the dataloader is not handed to accelerate (the dataset is already rank-disjoint via
split_dataset_by_node, so IterableDatasetShard would drop (N-1)/N of each rank's stream),
batches are moved to device manually, and the episode-aware sampler is skipped. Remove the
standalone examples/scaling/train_streaming_multinode.py example in favor of this wiring.
Co-authored-by: Cursor <cursoragent@cursor.com>
The benchmarks/streaming harness (matrix submitter, summarizer, decode
diagnostic) and the robocasa SLURM scripts are cluster-specific tooling,
not part of the streaming feature. The example's --dummy mode covers
throughput measurement for reviewers. Recoverable from git history
(894fc6bfb) for cluster runs. Example docstring de-personalized.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The custom episode pool becomes a pure `datasets` pipeline:
split_dataset_by_node -> batch(by_column="episode_index")
-> shuffle(buffer=episode_pool_size) # episode pool
-> map(explode + exact delta windows) # episode -> frames
-> shuffle(buffer=frame_shuffle_buffer_size) # frame interleave
and the torch IterableDataset wrapper keeps only per-sample video decode
(decode-on-exit), image transforms, task lookup, and decode/fetch timing.
Replaced by native machinery and deleted: the pooled-episode admission
loop, the refcounted video prefetcher, manual worker shard striding plus
the worker-split suppression patch, the per-(epoch, rank) shard-order
permutation, the per-consumer SplitMix64 RNG, and fast-forward resume.
DataLoader workers are split by `datasets` itself; .shuffle() permutes
shard order per epoch natively; resume delegates to the native
state_dict/load_state_dict (exact with num_workers=0; with workers use
torchdata's StatefulDataLoader, which checkpoints per-worker state
through the same protocol). An in-flight epoch counter ensures a
mid-iteration state_dict records the epoch the stream position belongs
to. Buffer contents are skipped on resume (documented datasets
behavior): never repeats data, drops at most ~pool + frame-buffer frames.
Randomness is unchanged: a batch still mixes up to episode_pool_size
episodes; delta windows are still exact in-episode slices with correct
boundary padding (value-verified against the map-style dataset). The
known trade accepted with this rewrite: no video prefetch-on-admit, so
remote decode pays per-frame range reads at yield time - use a colocated
bucket (data_files_root) at large scale.
The delta-consistency tests gained a scalar-comparison branch: they
silently skipped python-scalar keys before (stale `check` variable),
exposed by the new pipeline's key ordering.
Requires datasets with #8259 (pinned to the merge commit on this
branch). Example updated to per-rank native resume via torchdata's
StatefulDataLoader when available.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Replace the shard/Backtrackable/decoded-shuffle-buffer internals with an
episode pool: each (rank x worker) consumer keeps episode_pool_size whole
episodes' tabular rows in RAM and emits uniformly random frames across
them. delta_timestamps windows become exact in-RAM slices with correct
boundary padding (the Backtrackable machinery and its lookback/lookahead
ceilings are gone), and video is decoded only when a sample is emitted,
so pool memory stays tabular-sized instead of buffer_size decoded
samples.
- Prefetch-on-admit: when streaming from a remote source, each pooled
episode's video files download to a local cache in the background
(refcounted, since v3 packs several episodes per file; deleted on
eviction), so decode-on-exit reads local bytes instead of paying
network seek latency.
- Per-consumer RNG derived from (seed, epoch, rank, worker): consumers
decorrelated, runs reproducible, epochs reshuffle automatically.
- Deterministic fast-forward resume: load_state_dict takes the trainer's
{batches_consumed, batch_size}; each worker re-derives its own skip
from the DataLoader's round-robin batch assignment and replays
tabular-only (no decode). Exact within an epoch, works with
num_workers > 0, and the same state file serves every rank. Replaces
the per-shard HF state_dict approach, which lived in worker processes
and could not be captured from the trainer.
- Shard-cap default removed (max_num_shards=None uses every parquet
shard); runtime warnings for non-divisible world sizes (datasets
degrades to read-everything splitting) and workers left without
shards.
- episode_pool_size replaces buffer_size (deprecated, ignored with a
warning); decoder cache sized to the pool working set, capped at 128.
Legacy order-replication tests asserted the old buffer algorithm
step-by-step and are rewritten as behavior contracts (exactly-once
coverage, per-seed determinism, epoch reshuffle). Value-level parity
tests against the map-style dataset pass unchanged.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The dataset is already rank-disjoint via split_dataset_by_node;
accelerate's IterableDatasetShard wrapper kept only every Nth batch of
each rank's stream, silently training on 1/N of the data per pass while
decoding all of it. The --dummy benchmark path never prepared the
loader, so benchmarks were unaffected.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
- examples/scaling/train_streaming_multinode.py: Accelerate-based distributed/
resumable streaming training (no DistributedSampler; rank/world_size auto-resolved),
checkpoints the dataset stream state, and supports a --dummy pure-dataloading path
with throughput logging. SLURM launcher in slurm/train_streaming_robocasa.sh.
- benchmarks/streaming/benchmark_streaming.py: dummy-consumer dataloading benchmark
(single / sarm frame modes) emitting frames/s/node, p50/p95/p99 sample latency,
first-batch latency, and VideoDecoderCache reuse stats as JSON + CSV. SLURM launcher
+ README documenting the source/node/mode matrix and manual bucket prewarming.
- VideoDecoderCache: add hit/miss/eviction counters and a stats() method so the
benchmark can surface decoder thrash (no new cache, no eviction-policy change).
- tests/datasets/test_streaming_distributed.py: accelerate-launch smoke test asserting
per-rank disjointness; skips (does not false-pass) when <2 processes spawn.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>