chore(streaming): drop benchmark and SLURM scaffolding from the PR

The benchmarks/streaming harness (matrix submitter, summarizer, decode
diagnostic) and the robocasa SLURM scripts are cluster-specific tooling,
not part of the streaming feature. The example's --dummy mode covers
throughput measurement for reviewers. Recoverable from git history
(894fc6bfb) for cluster runs. Example docstring de-personalized.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-11 21:46:43 +02:00
parent 894fc6bfb5
commit 38106ea6b4
8 changed files with 2 additions and 802 deletions
-91
View File
@@ -1,91 +0,0 @@
# Streaming dataloading benchmark
Measures **dataloading only** (no model) for `StreamingLeRobotDataset`: parquet read + video decode +
delta windowing + shuffle. A dummy consumer pulls batches and moves them to the device, so the numbers
isolate the data pipeline. Use it to compare sources (Hub vs. storage bucket vs. prewarmed bucket),
frame modes, and node counts, and to catch p95/p99 video-decode regressions.
## Run
```bash
python benchmarks/streaming/benchmark_streaming.py \
--repo_id pepijn223/robocasa_pretrain_human300_v4 \
--mode sarm --batch_size 64 --num_workers 12 --num_batches 200 \
--source hub --out_dir benchmarks/streaming/results
```
Multinode (per-node throughput) goes through Accelerate under SLURM:
```bash
sbatch slurm/benchmark_streaming_robocasa.sh
```
## Matrix
| Axis | Values |
| ---------- | -------------------------------------------------------------------------------------------------------------------- |
| Source | `hub` (verify now), `bucket`, `warmed_bucket` (bucket + prewarming; with user's help later) |
| Baseline | current `main` `StreamingLeRobotDataset` on Hub streaming |
| Nodes | 1 and 2 (per-node throughput should be independent) |
| Frame mode | `single` (1 frame, all cameras; target ≥ 120 frames/s/node) · `sarm` (8 steps spaced 1s; target ≥ 320 frames/s/node) |
`--source` is a label only; the actual source is whatever `--repo_id` / `--root` / `--data_files_root`
point at.
### GPU (NVDEC) decoding
By default video is decoded on the **CPU** in each DataLoader worker, so throughput is CPU-decode-bound and
scales with `--num_workers` (capped by the dataset's `num_shards`). Pass `--video_decode_device cuda` to
offload H.264/H.265 decode to the GPU's dedicated **NVDEC** engine, which runs independently of the SMs used
for training (see <https://developer.nvidia.com/video-codec-sdk>). This requires a CUDA-enabled torchcodec
build, and because CUDA cannot initialize in forked workers the benchmark switches to the `spawn` start
method automatically when `--num_workers > 0`.
```bash
# GPU/NVDEC decode, 6 workers, bucket source
python benchmarks/streaming/benchmark_streaming.py \
--repo_id pepijn223/robocasa_pretrain_human300_v4 \
--data_files_root hf://buckets/pepijn223/robocasa-stream \
--mode sarm --batch_size 64 --num_workers 6 --num_batches 200 \
--video_decode_device cuda --source bucket
```
Caveats with `cuda` + many workers: each worker creates its own CUDA context (VRAM overhead) and NVDEC has a
limited number of concurrent decode sessions per GPU; if you hit session/IPC limits, reduce `--num_workers`
or compare against `--num_workers 0` (single-process NVDEC, which often saturates the decode engine on its
own). Result files include the decode device in their name (`..._w6_cuda.json`).
> **Codec ⇄ NVDEC compatibility (important).** NVDEC can only decode codecs its hardware supports. LeRobot's
> **default video codec is AV1** (`VideoEncoderConfig.vcodec = "libsvtav1"`), so most v3 datasets are
> AV1-encoded — and the **A100 and H100 compute GPUs have no AV1 NVDEC decoder**
> (per NVIDIA's [decode support matrix](https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new));
> only Ada (L4/L40/RTX40) and a few Ampere cards (A10/A40/A16) do. On A100/H100, AV1 must be decoded on
> **CPU**, or the dataset re-encoded to H.265/H.264 (which those GPUs' NVDEC do support). Run
> `diagnose_decode.py --video_decode_device cuda` to check your exact node before relying on `cuda` decode.
> A `cuda` torchcodec build also needs an FFmpeg with NVDEC; see
> <https://github.com/meta-pytorch/torchcodec#installing-cuda-enabled-torchcodec>.
Reference data root: bucket sources resolve through `--data_files_root hf://buckets/<owner>/<name>` (metadata
still loads from `--repo_id`). The local `single`/`sarm` CPU baselines on this dataset were ~176 / ~212
frames/s/node at `--num_workers 3` (3 cameras, fps 20).
## Metrics emitted (JSON + CSV)
`frames_per_s_node`, `samples_per_s`, `first_batch_latency_s`, `p50/p95/p99_sample_latency_ms`,
`wallclock_s`, and `video_decoder_cache` (`hits`, `misses`, `evictions`, `hit_rate`, `size`). A low
cache `hit_rate` with high `p99` is the decoder-thrash signature — raise `--video_decoder_cache_size`
or `--episode_pool_size`, or reduce `num_workers`.
## Bucket sources & prewarming (manual)
Prewarming is a **server-side** Hugging Face storage-bucket feature — there is no client script. To
benchmark the `warmed_bucket` source:
1. Attach a storage bucket to the dataset and enable it (see
<https://huggingface.co/docs/hub/storage-buckets>). Buckets resolve through `fsspec`, the same as
`hf://`, so no code change is needed — point `--repo_id`/`--revision` (or `--root`) at the bucket.
2. Enable **prewarming** in the bucket settings and wait for warm-up to complete.
3. Run the benchmark with `--source warmed_bucket`. Compare against the cold `--source bucket` and the
`--source hub` baseline.
Manual only — not run in CI.
-322
View File
@@ -1,322 +0,0 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataloading-only benchmark for StreamingLeRobotDataset.
A dummy consumer pulls batches and moves them to the device; no model runs, so the numbers isolate the
data pipeline (parquet read + video decode + delta windowing + shuffle). Reports per-node throughput and
sample-latency percentiles, plus video-decoder-cache reuse stats, and emits JSON + CSV.
Frame modes (matching the streaming design targets):
- ``single``: one frame, all cameras (target >= 120 frames/s/node).
- ``sarm``: an 8-step window spaced 1s (delta over 8s) (target >= 320 frames/s/node).
Example (stream from the Hub, single node):
python benchmarks/streaming/benchmark_streaming.py \
--repo_id pepijn223/robocasa_pretrain_human300_v4 --mode sarm \
--batch_size 64 --num_workers 12 --num_batches 200 --out_dir benchmarks/streaming/results
Distributed / multinode runs go through Accelerate; see ``slurm/benchmark_streaming_robocasa.sh``. Set
``--source`` purely for labeling the output (``hub`` / ``bucket`` / ``warmed_bucket``); the actual source
is whatever ``--repo_id``/``--root`` point at. See the README for bucket prewarming.
"""
import argparse
import csv
import json
import os
import statistics
import threading
import time
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from lerobot.datasets import LeRobotDatasetMetadata, StreamingLeRobotDataset
from lerobot.utils.constants import ACTION
def _tree_rss_bytes() -> int:
"""Sum RSS of this process and all its descendants via /proc (Linux only; 0 elsewhere).
DataLoader workers are separate processes, so the parent's own RSS misses most of the pipeline's
memory. Walking the process tree captures the real footprint (parquet buffers + decoders + shuffle).
"""
try:
children: dict[int, list[int]] = {}
for entry in os.listdir("/proc"):
if not entry.isdigit():
continue
try:
with open(f"/proc/{entry}/stat") as f:
ppid = int(f.read().split(") ", 1)[1].split()[1])
children.setdefault(ppid, []).append(int(entry))
except (OSError, ValueError, IndexError):
pass
total, stack = 0, [os.getpid()]
while stack:
cur = stack.pop()
try:
with open(f"/proc/{cur}/statm") as f:
total += int(f.read().split()[1]) * os.sysconf("SC_PAGE_SIZE")
except (OSError, ValueError, IndexError):
pass
stack.extend(children.get(cur, []))
return total
except OSError:
return 0
class PeakRSSSampler:
"""Background thread tracking peak process-tree RSS for the duration of the `with` block."""
def __init__(self, interval_s: float = 0.5):
self.interval_s = interval_s
self.peak_bytes = 0
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True)
def _run(self) -> None:
while not self._stop.is_set():
self.peak_bytes = max(self.peak_bytes, _tree_rss_bytes())
self._stop.wait(self.interval_s)
def __enter__(self) -> "PeakRSSSampler":
self._thread.start()
return self
def __exit__(self, *exc) -> None:
self._stop.set()
self._thread.join(timeout=2)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--repo_id", type=str, required=True)
parser.add_argument("--root", type=str, default=None, help="Local/prewarmed root (else stream from Hub).")
parser.add_argument(
"--data_files_root",
type=str,
default=None,
help="fsspec root for bulk data/videos, e.g. hf://buckets/<owner>/<name>. Metadata still loads "
"from --repo_id on the Hub. Use for bucket / warmed_bucket sources.",
)
parser.add_argument("--mode", choices=["single", "sarm"], default="single")
parser.add_argument("--source", type=str, default="hub", help="Label only: hub | bucket | warmed_bucket.")
parser.add_argument("--batch_size", type=int, default=64)
parser.add_argument("--num_workers", type=int, default=8)
parser.add_argument(
"--prefetch_factor",
type=int,
default=2,
help="DataLoader batches prefetched per worker. Higher hides IO/decode latency but raises RAM "
"(prefetch_factor x num_workers x batch_size decoded frames held in flight). Ignored if num_workers=0.",
)
parser.add_argument("--buffer_size", type=int, default=None, help="Deprecated; ignored.")
parser.add_argument(
"--max_num_shards",
type=int,
default=16,
help="Cap on concurrently-open stream shards. Each open shard holds ~one parquet row group in "
"RAM; reading from an hf:// bucket buffers ~5x more per shard than hf:// datasets, so lower this "
"(e.g. to num_workers) for bucket sources to avoid OOM. All data is still covered via re-sharding.",
)
parser.add_argument("--video_decoder_cache_size", type=int, default=None)
parser.add_argument(
"--episode_pool_size",
type=int,
default=64,
help="Whole episodes each consumer keeps open to shuffle across (the randomness knob).",
)
parser.add_argument(
"--video_decode_device",
type=str,
default="cpu",
help="Decode device passed to torchcodec. 'cuda' offloads decode to the GPU's NVDEC engine "
"(needs a CUDA-enabled torchcodec build). With num_workers>0 this forces the 'spawn' start method.",
)
parser.add_argument("--num_batches", type=int, default=200)
parser.add_argument("--warmup_batches", type=int, default=5, help="Excluded from steady-state stats.")
parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
parser.add_argument("--out_dir", type=str, default="benchmarks/streaming/results")
return parser.parse_args()
def build_dataset(args: argparse.Namespace, meta: LeRobotDatasetMetadata) -> StreamingLeRobotDataset:
# sarm: an 8-step window spaced 1s => an 8s delta window (the SARM stress case).
delta_timestamps = {ACTION: [float(t) for t in range(8)]} if args.mode == "sarm" else None
return StreamingLeRobotDataset(
args.repo_id,
root=args.root,
data_files_root=args.data_files_root,
delta_timestamps=delta_timestamps,
max_num_shards=args.max_num_shards,
video_decoder_cache_size=args.video_decoder_cache_size,
video_decode_device=args.video_decode_device,
episode_pool_size=args.episode_pool_size,
tolerance_s=1e-3,
)
def percentile(values: list[float], pct: float) -> float:
if not values:
return float("nan")
ordered = sorted(values)
k = max(0, min(len(ordered) - 1, int(round((pct / 100.0) * (len(ordered) - 1)))))
return ordered[k]
def main() -> None:
args = parse_args()
device = torch.device(args.device)
meta = LeRobotDatasetMetadata(args.repo_id, root=args.root)
dataset = build_dataset(args, meta)
gpu_decode = args.video_decode_device.startswith("cuda")
loader = DataLoader(
dataset,
batch_size=args.batch_size,
num_workers=args.num_workers,
# GPU-decoded frames are already on the GPU, so CPU pinning is irrelevant (and pinning CUDA
# tensors errors). Pin only when decode is on CPU and we copy to a CUDA device.
pin_memory=device.type == "cuda" and not gpu_decode,
drop_last=True,
prefetch_factor=args.prefetch_factor if args.num_workers > 0 else None,
# CUDA cannot initialize in forked workers; NVDEC decode in workers needs the spawn start method.
multiprocessing_context="spawn" if gpu_decode and args.num_workers > 0 else None,
)
sample_latencies_ms: list[float] = []
episodes_per_batch: list[int] = [] # shuffle-randomness proxy: distinct episodes within a batch
frames = 0
first_batch_latency_s = None
steady_start = None # wall-clock start of the post-warmup measurement window
t_start = time.perf_counter()
t_prev = t_start
with PeakRSSSampler() as rss:
for i, batch in enumerate(loader):
# Dummy consume: move tensors to the device, mimicking what a real trainer would do.
for value in batch.values():
if torch.is_tensor(value):
value.to(device, non_blocking=device.type == "cuda")
now = time.perf_counter()
if first_batch_latency_s is None:
first_batch_latency_s = now - t_start
if i == args.warmup_batches:
# Start the steady window here; the slow first batch and the prefetch queue it filled are
# excluded so throughput reflects sustained production, not draining a pre-filled queue.
steady_start = now
elif i > args.warmup_batches:
sample_latencies_ms.append((now - t_prev) / args.batch_size * 1000.0)
frames += args.batch_size
ep = batch.get("episode_index")
if torch.is_tensor(ep):
episodes_per_batch.append(int(torch.unique(ep).numel()))
t_prev = now
if i + 1 >= args.num_batches:
break
peak_rss_gb = round(rss.peak_bytes / 1e9, 2) if rss.peak_bytes else None
now = time.perf_counter()
elapsed = now - t_start
# Wall-clock throughput over the steady window. NOT sum(inter-batch gaps): under async prefetch those
# gaps collapse to ~0 (the consumer drains a pre-filled queue) and overstate throughput by ~100x.
steady_elapsed_s = (now - steady_start) if steady_start is not None else elapsed
cache_stats = dataset.video_decoder_cache_stats()
timing = dataset.timing_stats() # cumulative decode/fetch seconds summed across workers
# Image (camera frame) resolution as decoded, e.g. [C, H, W]. Read from the dataset feature contract.
image_shape = list(meta.features[meta.video_keys[0]]["shape"]) if meta.video_keys else None
# Decode/fetch overlap in wall-clock (workers run in parallel), so normalize against the total worker
# budget (num_workers x wallclock) to express each stage as a fraction of available worker time.
worker_budget_s = max(args.num_workers, 1) * elapsed
decode_pct = round(100 * timing["decode_s_total"] / worker_budget_s, 1) if worker_budget_s else None
fetch_pct = round(100 * timing["fetch_s_total"] / worker_budget_s, 1) if worker_budget_s else None
# A 0-frame run is a failure, not a 0-throughput result: the pipeline produced no batches (decode
# error swallowed in workers, all batches dropped by drop_last, etc.). Exit non-zero so the job is
# never reported green with NaN/zero numbers.
if frames == 0:
raise SystemExit(
f"FAILED: measured 0 frames over {args.num_batches} requested batches "
f"(cache misses={cache_stats.get('misses', 0)}, hits={cache_stats.get('hits', 0)}). "
"The data pipeline yielded no usable batches — inspect worker logs for decode errors. "
"Try --num_workers 0 to surface the underlying exception directly."
)
results = {
"repo_id": args.repo_id,
"source": args.source,
"mode": args.mode,
"batch_size": args.batch_size,
"num_workers": args.num_workers,
"prefetch_factor": args.prefetch_factor if args.num_workers > 0 else None,
"buffer_size": args.buffer_size,
"episode_pool_size": args.episode_pool_size,
"episodes_per_batch_mean": round(statistics.mean(episodes_per_batch), 1)
if episodes_per_batch
else None,
# Fraction of a batch that is distinct episodes; ~1.0 ≈ map-style uniform, low ≈ correlated.
"shuffle_randomness_frac": round(statistics.mean(episodes_per_batch) / args.batch_size, 3)
if episodes_per_batch
else None,
"num_cameras": len(meta.video_keys),
"image_shape": image_shape,
"fps": meta.fps,
"device": str(device),
"video_decode_device": args.video_decode_device,
"peak_rss_gb": peak_rss_gb,
"frames_measured": frames,
"first_batch_latency_s": round(first_batch_latency_s or float("nan"), 4),
"frames_per_s_node": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0,
"samples_per_s": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0,
"p50_sample_latency_ms": round(statistics.median(sample_latencies_ms), 3)
if sample_latencies_ms
else None,
"p95_sample_latency_ms": round(percentile(sample_latencies_ms, 95), 3),
"p99_sample_latency_ms": round(percentile(sample_latencies_ms, 99), 3),
"total_time_s": round(elapsed, 2),
"steady_time_s": round(steady_elapsed_s, 2),
"wallclock_s": round(elapsed, 2),
"decode_s_total": timing["decode_s_total"],
"fetch_s_total": timing["fetch_s_total"],
"decode_pct_worker_time": decode_pct,
"fetch_pct_worker_time": fetch_pct,
"video_decoder_cache": cache_stats,
}
out_dir = Path(args.out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
pool_tag = f"_ep{args.episode_pool_size}" if args.episode_pool_size else ""
tag = (
f"{args.source}_{args.mode}_bs{args.batch_size}_w{args.num_workers}"
f"_pf{args.prefetch_factor}{pool_tag}_{args.video_decode_device}"
)
(out_dir / f"{tag}.json").write_text(json.dumps(results, indent=2))
flat = {k: (json.dumps(v) if isinstance(v, dict) else v) for k, v in results.items()}
with open(out_dir / f"{tag}.csv", "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(flat))
writer.writeheader()
writer.writerow(flat)
print("Command config:", vars(args))
print(json.dumps(results, indent=2))
print(f"Wrote {out_dir / tag}.json and .csv")
if __name__ == "__main__":
main()
-112
View File
@@ -1,112 +0,0 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Isolate the streaming video-decode path — no SLURM, no DataLoader, no benchmark loop.
Reproduces exactly what StreamingLeRobotDataset does for one video (resolve path -> fsspec.open ->
torchcodec VideoDecoder -> get one frame) and prints the environment + the first bytes of the handle, so
a decode failure ("No valid stream found in input file") can be pinpointed: bad/placeholder bytes vs a
torchcodec/ffmpeg build issue vs a device issue.
python benchmarks/streaming/diagnose_decode.py --repo_id pepijn223/robocasa_pretrain_human300_v4
python benchmarks/streaming/diagnose_decode.py --repo_id … --data_files_root hf://buckets/<o>/<n>
python benchmarks/streaming/diagnose_decode.py --repo_id … --video_decode_device cuda
"""
import argparse
import importlib.metadata as im
import fsspec
from lerobot.datasets import LeRobotDatasetMetadata
def _version(pkg: str) -> str:
try:
return im.version(pkg)
except Exception:
return "MISSING"
def main() -> None:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--repo_id", required=True)
p.add_argument("--data_files_root", default=None, help="e.g. hf://buckets/<owner>/<name>")
p.add_argument("--revision", default=None)
p.add_argument("--video_decode_device", default="cpu")
p.add_argument("--episode", type=int, default=0)
args = p.parse_args()
print("== environment ==")
for pkg in ("torchcodec", "av", "huggingface_hub", "hf_xet", "datasets", "fsspec"):
print(f" {pkg}: {_version(pkg)}")
meta = LeRobotDatasetMetadata(args.repo_id, revision=args.revision)
video_key = meta.video_keys[0]
rel_path = meta.get_video_file_path(args.episode, video_key)
root = args.data_files_root.rstrip("/") if args.data_files_root else meta.url_root
video_path = f"{root}/{rel_path}"
print("\n== target ==")
print(f" video_key: {video_key}")
print(f" video_path: {video_path}")
print("\n== fsspec handle ==")
try:
fh = fsspec.open(video_path).__enter__()
head = fh.read(32)
print(f" first 32 bytes (hex): {head.hex()}")
# A valid MP4/MOV has an 'ftyp' box near the start; anything else (HTML/JSON/empty) means the
# handle resolved to a placeholder or error page, not the video bytes.
looks_mp4 = b"ftyp" in head
print(f" looks like MP4 (contains 'ftyp'): {looks_mp4}")
if not looks_mp4:
print(f" !! first bytes as text: {head[:32]!r}")
fh.seek(0)
except Exception as e:
print(f" !! fsspec.open/read FAILED: {type(e).__name__}: {e}")
return
print("\n== torchcodec VideoDecoder ==")
try:
from torchcodec.decoders import VideoDecoder
decoder = VideoDecoder(fh, seek_mode="approximate", device=args.video_decode_device)
md = decoder.metadata
print(f" OK: {md.num_frames} frames, {md.average_fps} fps, codec={getattr(md, 'codec', '?')}")
frame = decoder.get_frames_at(indices=[0])
print(f" decoded frame 0: shape={tuple(frame.data.shape)}, device={frame.data.device}")
print("\nDECODE OK — the streaming pipeline can read this video on this machine.")
except Exception as e:
print(f" !! VideoDecoder FAILED: {type(e).__name__}: {e}")
print(
"\nDECODE FAILED. If the bytes above look like MP4 (ftyp=True), this is a torchcodec/ffmpeg "
"build issue, NOT bad bytes. Common cause for LeRobot v3 datasets: the videos are AV1-encoded "
"(see the 'codec' line on a working machine). Then:\n"
" - CPU decode needs an ffmpeg built with an AV1 decoder (libdav1d/libaom); a build without it "
"reports 'No valid stream found'.\n"
" - GPU/NVDEC decode of AV1 is only on AV1-capable NVDEC GPUs: Ada (L4/L40/RTX40) and some "
"Ampere (A10/A40/A16). The COMPUTE GPUs A100 and H100 have NO AV1 NVDEC decoder (per NVIDIA's "
"support matrix), so no torchcodec build enables cuda decode of AV1 on them.\n"
" - 'Unsupported device: cuda (variant: ffmpeg)' instead means torchcodec was built without "
"the CUDA backend; install a CUDA-enabled wheel (see README) — but on A100/H100 that still "
"won't decode AV1.\n"
"Fix: decode on CPU, run NVDEC on an Ada GPU, or re-encode the dataset to H.265/H.264 (which "
"A100/H100 NVDEC do support).\n"
"If ftyp=False instead, the handle resolved to a placeholder/error page (auth, revision, or Xet "
"resolution) rather than the video bytes."
)
if __name__ == "__main__":
main()
-79
View File
@@ -1,79 +0,0 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Collapse a directory of benchmark JSON results into one comparison table (and a combined CSV).
python benchmarks/streaming/summarize_results.py benchmarks/streaming/results
"""
import csv
import json
import sys
from pathlib import Path
COLUMNS = [
("source", "source"),
("mode", "mode"),
("video_decode_device", "decode"),
("num_workers", "workers"),
("batch_size", "bs"),
("frames_per_s_node", "frames/s/node"),
("first_batch_latency_s", "first_batch_s"),
("p50_sample_latency_ms", "p50_ms"),
("p95_sample_latency_ms", "p95_ms"),
("p99_sample_latency_ms", "p99_ms"),
]
def main() -> None:
results_dir = Path(sys.argv[1] if len(sys.argv) > 1 else "benchmarks/streaming/results")
files = sorted(results_dir.rglob("*.json"))
if not files:
print(f"No JSON results under {results_dir}")
return
rows = []
for f in files:
d = json.loads(f.read_text())
d["hit_rate"] = d.get("video_decoder_cache", {}).get("hit_rate")
rows.append(d)
rows.sort(key=lambda r: (r.get("source", ""), r.get("mode", ""), r.get("video_decode_device", "")))
headers = [label for _, label in COLUMNS] + ["cache_hit_rate"]
widths = {h: len(h) for h in headers}
table = []
for r in rows:
row = {label: r.get(key, "") for key, label in COLUMNS}
row["cache_hit_rate"] = r.get("hit_rate", "")
table.append(row)
for h in headers:
widths[h] = max(widths[h], len(str(row[h])))
line = " ".join(h.ljust(widths[h]) for h in headers)
print(line)
print(" ".join("-" * widths[h] for h in headers))
for row in table:
print(" ".join(str(row[h]).ljust(widths[h]) for h in headers))
combined = results_dir / "summary.csv"
with open(combined, "w", newline="") as fh:
writer = csv.DictWriter(fh, fieldnames=headers)
writer.writeheader()
writer.writerows(table)
print(f"\nWrote {combined}")
if __name__ == "__main__":
main()
@@ -27,9 +27,9 @@ streaming features of :class:`StreamingLeRobotDataset`:
Launch with Accelerate (single node, N GPUs):
accelerate launch --num_processes=8 examples/scaling/train_streaming_multinode.py \
--repo_id=pepijn223/robocasa_pretrain_human300_v4 --batch_size=64
--repo_id=lerobot/droid_1.0.1 --batch_size=64
Multinode runs use the same script under SLURM; see ``slurm/train_streaming_robocasa.sh``.
Multinode runs launch the same script with your cluster's accelerate/SLURM setup.
Pass ``--dummy`` to skip the model entirely and measure pure dataloading throughput.
"""
-40
View File
@@ -1,40 +0,0 @@
#!/bin/bash
#SBATCH --job-name=bench_stream
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --cpus-per-task=96
#SBATCH --exclusive
#SBATCH --time=02:00:00
#SBATCH --output=logs/%x-%j.out
# Per-node dataloading benchmark for StreamingLeRobotDataset across 1-2 nodes. Each node runs an
# independent dummy-consumer benchmark; per-node throughput should be independent (separate network).
# Results are written per (node, source, mode) under --out_dir.
#
# Submit with: sbatch slurm/benchmark_streaming_robocasa.sh
# Override the source label for cold/warm bucket runs: SOURCE=warmed_bucket sbatch slurm/benchmark_streaming_robocasa.sh
set -euo pipefail
REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
SOURCE=${SOURCE:-hub}
OUT_DIR=${OUT_DIR:-benchmarks/streaming/results}
export HF_HOME=${HF_HOME:-$SCRATCH/hf_home}
export TOKENIZERS_PARALLELISM=false
# One benchmark process per node (each saturates the node's DataLoader workers + network independently).
srun --kill-on-bad-exit=1 bash -c '
for MODE in single sarm; do
python benchmarks/streaming/benchmark_streaming.py \
--repo_id '"$REPO_ID"' \
--source '"$SOURCE"' \
--mode $MODE \
--batch_size 64 \
--num_workers 12 \
--episode_pool_size 64 \
--num_batches 300 \
--out_dir '"$OUT_DIR"'/node${SLURM_NODEID}
done
'
-107
View File
@@ -1,107 +0,0 @@
#!/bin/bash
# Submit the FULL streaming dataloading-benchmark matrix as isolated single-GPU SLURM jobs.
#
# sources : hub (Hub streaming) | bucket (cold HF bucket) | warmed_bucket (prewarmed HF bucket)
# modes : single (1 frame, all cameras) | sarm (8-step / 8s delta window)
# decode : cpu (torchcodec on CPU, scales with workers) | cuda (NVDEC, offloads decode to the GPU)
#
# => 3 x 2 x 2 = 12 jobs. Each runs in its OWN job (1 node, 1 GPU) so an OOM is isolated and reported
# per-job by SLURM (check `sacct -j <id> --format=JobID,State,MaxRSS,ReqMem`). Submit from a login node
# inside the repo: bash slurm/run_streaming_matrix.sh
#
# SERIAL (default 1): chain the jobs with --dependency=afterany so SLURM runs exactly ONE at a time. This
# is important for a bandwidth benchmark — concurrent jobs would share the network to the Hub/bucket and
# corrupt every throughput number. `afterany` means a failed/OOM'd job does not stall the chain. Set
# SERIAL=0 to let the scheduler run them in parallel (only for OOM-isolation testing, not for throughput).
#
# Knobs (env overrides):
# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS, SERIAL
# CPU_WORKERS / CPU_BUFFER (cpu-decode jobs) GPU_WORKERS / GPU_BUFFER (cuda-decode jobs, kept low to
# bound VRAM + NVDEC sessions). RUN ("python" by default; set RUN="uv run python" if using uv).
# SOURCES / MODES / DECODES to run a subset (e.g. SOURCES="hub bucket" DECODES="cpu").
# ACCOUNT / PARTITION / QOS passed through to sbatch if set.
set -euo pipefail
REPO_DIR=$(git rev-parse --show-toplevel)
REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
BUCKET=${BUCKET:-hf://buckets/pepijn223/robocasa-stream}
WARM_BUCKET=${WARM_BUCKET:-hf://buckets/pepijn223/robocasa-stream-warm}
OUT_DIR=${OUT_DIR:-benchmarks/streaming/results}
NUM_BATCHES=${NUM_BATCHES:-200}
TIME=${TIME:-01:00:00}
MEM=${MEM:-64G}
GPUS=${GPUS:-1}
SERIAL=${SERIAL:-1} # 1 = run one job at a time (correct for bandwidth measurement)
CPU_WORKERS=${CPU_WORKERS:-8}
GPU_WORKERS=${GPU_WORKERS:-2} # low on purpose: each cuda worker holds a CUDA context + NVDEC session
CPU_BUFFER=${CPU_BUFFER:-64} # episode pool size (whole episodes per consumer; tabular-only RAM)
GPU_BUFFER=${GPU_BUFFER:-32} # smaller episode pool bounds in-flight decoded frames
# Cap concurrently-open stream shards. Each open shard holds ~one parquet row group in RAM, and reading
# from an hf:// bucket buffers ~5x more per shard than hf:// datasets (~1.2GB vs ~0.26GB). So for bucket
# sources default to num_workers (1 shard/worker); hub keeps 16. Override globally with MAX_SHARDS.
MAX_SHARDS=${MAX_SHARDS:-}
BATCH_SIZE=${BATCH_SIZE:-64}
PREFETCH=${PREFETCH:-2} # DataLoader batches prefetched per worker (higher = more throughput + RAM)
RUN=${RUN:-python}
# CONDA_ENV=<name> runs each job via `conda run -n <name>` (no activation needed inside the dash --wrap;
# --no-capture-output streams logs live). Set this to a conda env that has a MODERN torchcodec (>=0.11)
# + datasets (>=4.7) — the default `base` env on many clusters is too old to decode AV1 / lacks CUDA.
CONDA_ENV=${CONDA_ENV:-}
if [ -n "$CONDA_ENV" ] && [ "$RUN" = "python" ]; then
RUN="conda run --no-capture-output -n $CONDA_ENV python"
fi
SOURCES=${SOURCES:-"hub bucket warmed_bucket"}
MODES=${MODES:-"single sarm"}
DECODES=${DECODES:-"cpu cuda"}
mkdir -p "$REPO_DIR/logs" "$REPO_DIR/$OUT_DIR"
data_root_for () {
case "$1" in
hub) echo "" ;;
bucket) echo "$BUCKET" ;;
warmed_bucket) echo "$WARM_BUCKET" ;;
esac
}
n=0
prev_jid=""
for SOURCE in $SOURCES; do
DATA_ROOT=$(data_root_for "$SOURCE")
ROOTFLAG=""
[ -n "$DATA_ROOT" ] && ROOTFLAG="--data_files_root $DATA_ROOT"
for MODE in $MODES; do
for DECODE in $DECODES; do
if [ "$DECODE" = cpu ]; then W=$CPU_WORKERS; B=$CPU_BUFFER; else W=$GPU_WORKERS; B=$GPU_BUFFER; fi
if [ -n "$MAX_SHARDS" ]; then S=$MAX_SHARDS; elif [ "$SOURCE" = hub ]; then S=16; else S=$W; fi
# Run strictly after the previous job so only one job touches the network at a time.
DEPFLAG=""
if [ "$SERIAL" = 1 ] && [ -n "$prev_jid" ]; then DEPFLAG="--dependency=afterany:$prev_jid"; fi
jid=$(sbatch --parsable \
--job-name="bench_${SOURCE}_${MODE}_${DECODE}" \
--nodes=1 --ntasks=1 --gpus="$GPUS" --cpus-per-task=$((W + 4)) \
--mem="$MEM" --time="$TIME" --output="$REPO_DIR/logs/%x-%j.out" \
$DEPFLAG \
${ACCOUNT:+--account=$ACCOUNT} ${PARTITION:+--partition=$PARTITION} ${QOS:+--qos=$QOS} \
--wrap "cd '$REPO_DIR' && \
export TOKENIZERS_PARALLELISM=false && export HF_HOME=\${HF_HOME:-\$SCRATCH/hf_home} && \
$RUN benchmarks/streaming/benchmark_streaming.py \
--repo_id $REPO_ID $ROOTFLAG \
--mode $MODE --source $SOURCE --video_decode_device $DECODE \
--batch_size $BATCH_SIZE --num_workers $W --prefetch_factor $PREFETCH \
--episode_pool_size $B --max_num_shards $S \
--num_batches $NUM_BATCHES --out_dir $OUT_DIR")
jid=${jid%%;*} # strip ';cluster' suffix on federated setups
echo "submitted job $jid bench_${SOURCE}_${MODE}_${DECODE}${DEPFLAG:+ (after $prev_jid)}"
prev_jid=$jid
n=$((n + 1))
done
done
done
echo
echo "Submitted $n jobs ($([ "$SERIAL" = 1 ] && echo 'serial chain — one runs at a time' || echo 'parallel'))."
echo "Watch: squeue -u \$USER (later jobs show reason '(Dependency)' until their turn)"
echo "Results: $OUT_DIR/<source>_<mode>_bs${BATCH_SIZE}_w<workers>_pf<prefetch>_<decode>.{json,csv}"
echo "Summarize when done: $RUN benchmarks/streaming/summarize_results.py $OUT_DIR"
-49
View File
@@ -1,49 +0,0 @@
#!/bin/bash
#SBATCH --job-name=stream_robocasa
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --cpus-per-task=96
#SBATCH --exclusive
#SBATCH --time=24:00:00
#SBATCH --output=logs/%x-%j.out
# Multinode streaming training over a large HF-hosted RoboCasa dataset (never touches local disk).
# Launches examples/scaling/train_streaming_multinode.py with Accelerate. Each rank streams a disjoint
# set of shards via split_dataset_by_node (auto-resolved from the Accelerate state), so per-node
# throughput scales independently. For an even split, ensure n_shards % (nodes * gpus_per_node) == 0.
#
# Submit with: sbatch slurm/train_streaming_robocasa.sh
set -euo pipefail
REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
GPUS_PER_NODE=8
NUM_PROCESSES=$((SLURM_NNODES * GPUS_PER_NODE))
# Rendezvous: use the first node in the allocation as the main process.
MAIN_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
MAIN_PORT=${MAIN_PORT:-29500}
export HF_HOME=${HF_HOME:-$SCRATCH/hf_home}
# Avoid each rank fighting over the tokenizers' internal thread pool.
export TOKENIZERS_PARALLELISM=false
srun --kill-on-bad-exit=1 bash -c '
accelerate launch \
--num_machines '"$SLURM_NNODES"' \
--num_processes '"$NUM_PROCESSES"' \
--machine_rank $SLURM_NODEID \
--main_process_ip '"$MAIN_ADDR"' \
--main_process_port '"$MAIN_PORT"' \
--mixed_precision bf16 \
--dynamo_backend no \
examples/scaling/train_streaming_multinode.py \
--repo_id '"$REPO_ID"' \
--batch_size 64 \
--num_workers 12 \
--episode_pool_size 64 \
--steps 200000 \
--save_freq 2000 \
--log_freq 50
'