diff --git a/benchmarks/streaming/README.md b/benchmarks/streaming/README.md
deleted file mode 100644
index cce36f329..000000000
--- a/benchmarks/streaming/README.md
+++ /dev/null
@@ -1,91 +0,0 @@
-# Streaming dataloading benchmark
-
-Measures **dataloading only** (no model) for `StreamingLeRobotDataset`: parquet read + video decode +
-delta windowing + shuffle. A dummy consumer pulls batches and moves them to the device, so the numbers
-isolate the data pipeline. Use it to compare sources (Hub vs. storage bucket vs. prewarmed bucket),
-frame modes, and node counts, and to catch p95/p99 video-decode regressions.
-
-## Run
-
-```bash
-python benchmarks/streaming/benchmark_streaming.py \
-  --repo_id pepijn223/robocasa_pretrain_human300_v4 \
-  --mode sarm --batch_size 64 --num_workers 12 --num_batches 200 \
-  --source hub --out_dir benchmarks/streaming/results
-```
-
-Multinode (per-node throughput) goes through Accelerate under SLURM:
-
-```bash
-sbatch slurm/benchmark_streaming_robocasa.sh
-```
-
-## Matrix
-
-| Axis | Values |
-| ---------- | -------------------------------------------------------------------------------------------------------------------- |
-| Source | `hub` (verify now), `bucket`, `warmed_bucket` (bucket + prewarming; with user's help later) |
-| Baseline | current `main` `StreamingLeRobotDataset` on Hub streaming |
-| Nodes | 1 and 2 (per-node throughput should be independent) |
-| Frame mode | `single` (1 frame, all cameras; target ≥ 120 frames/s/node) · `sarm` (8 steps spaced 1s; target ≥ 320 frames/s/node) |
-
-`--source` is a label only; the actual source is whatever `--repo_id` / `--root` / `--data_files_root`
-point at.
-
-### GPU (NVDEC) decoding
-
-By default video is decoded on the **CPU** in each DataLoader worker, so throughput is CPU-decode-bound and
-scales with `--num_workers` (capped by the dataset's `num_shards`). Pass `--video_decode_device cuda` to
-offload H.264/H.265 decode to the GPU's dedicated **NVDEC** engine, which runs independently of the SMs used
-for training (see ). This requires a CUDA-enabled torchcodec
-build, and because CUDA cannot initialize in forked workers the benchmark switches to the `spawn` start
-method automatically when `--num_workers > 0`.
-
-```bash
-# GPU/NVDEC decode, 6 workers, bucket source
-python benchmarks/streaming/benchmark_streaming.py \
-  --repo_id pepijn223/robocasa_pretrain_human300_v4 \
-  --data_files_root hf://buckets/pepijn223/robocasa-stream \
-  --mode sarm --batch_size 64 --num_workers 6 --num_batches 200 \
-  --video_decode_device cuda --source bucket
-```
-
-Caveats with `cuda` + many workers: each worker creates its own CUDA context (VRAM overhead) and NVDEC has a
-limited number of concurrent decode sessions per GPU; if you hit session/IPC limits, reduce `--num_workers`
-or compare against `--num_workers 0` (single-process NVDEC, which often saturates the decode engine on its
-own). Result files include the decode device in their name (`..._w6_cuda.json`).
-
-> **Codec ⇄ NVDEC compatibility (important).** NVDEC can only decode codecs its hardware supports. LeRobot's
-> **default video codec is AV1** (`VideoEncoderConfig.vcodec = "libsvtav1"`), so most v3 datasets are
-> AV1-encoded — and the **A100 and H100 compute GPUs have no AV1 NVDEC decoder**
-> (per NVIDIA's [decode support matrix](https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new));
-> only Ada (L4/L40/RTX40) and a few Ampere cards (A10/A40/A16) do. On A100/H100, AV1 must be decoded on
-> **CPU**, or the dataset re-encoded to H.265/H.264 (which those GPUs' NVDEC do support). Run
-> `diagnose_decode.py --video_decode_device cuda` to check your exact node before relying on `cuda` decode.
-> A `cuda` torchcodec build also needs an FFmpeg with NVDEC; see
-> .
-
-Reference data root: bucket sources resolve through `--data_files_root hf://buckets//` (metadata
-still loads from `--repo_id`). The local `single`/`sarm` CPU baselines on this dataset were ~176 / ~212
-frames/s/node at `--num_workers 3` (3 cameras, fps 20).
-
-## Metrics emitted (JSON + CSV)
-
-`frames_per_s_node`, `samples_per_s`, `first_batch_latency_s`, `p50/p95/p99_sample_latency_ms`,
-`wallclock_s`, and `video_decoder_cache` (`hits`, `misses`, `evictions`, `hit_rate`, `size`). A low
-cache `hit_rate` with high `p99` is the decoder-thrash signature — raise `--video_decoder_cache_size`
-or `--episode_pool_size`, or reduce `num_workers`.
-
-## Bucket sources & prewarming (manual)
-
-Prewarming is a **server-side** Hugging Face storage-bucket feature — there is no client script. To
-benchmark the `warmed_bucket` source:
-
-1. Attach a storage bucket to the dataset and enable it (see
-   ). Buckets resolve through `fsspec`, the same as
-   `hf://`, so no code change is needed — point `--repo_id`/`--revision` (or `--root`) at the bucket.
-2. Enable **prewarming** in the bucket settings and wait for warm-up to complete.
-3. Run the benchmark with `--source warmed_bucket`. Compare against the cold `--source bucket` and the
-   `--source hub` baseline.
-
-Manual only — not run in CI.
diff --git a/benchmarks/streaming/benchmark_streaming.py b/benchmarks/streaming/benchmark_streaming.py
deleted file mode 100644
index dc2a2456b..000000000
--- a/benchmarks/streaming/benchmark_streaming.py
+++ /dev/null
@@ -1,322 +0,0 @@
-# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Dataloading-only benchmark for StreamingLeRobotDataset.
-
-A dummy consumer pulls batches and moves them to the device; no model runs, so the numbers isolate the
-data pipeline (parquet read + video decode + delta windowing + shuffle). Reports per-node throughput and
-sample-latency percentiles, plus video-decoder-cache reuse stats, and emits JSON + CSV.
-
-Frame modes (matching the streaming design targets):
-  - ``single``: one frame, all cameras (target >= 120 frames/s/node).
-  - ``sarm``: an 8-step window spaced 1s (delta over 8s) (target >= 320 frames/s/node).
-
-Example (stream from the Hub, single node):
-
-    python benchmarks/streaming/benchmark_streaming.py \
-        --repo_id pepijn223/robocasa_pretrain_human300_v4 --mode sarm \
-        --batch_size 64 --num_workers 12 --num_batches 200 --out_dir benchmarks/streaming/results
-
-Distributed / multinode runs go through Accelerate; see ``slurm/benchmark_streaming_robocasa.sh``. Set
-``--source`` purely for labeling the output (``hub`` / ``bucket`` / ``warmed_bucket``); the actual source
-is whatever ``--repo_id``/``--root`` point at. See the README for bucket prewarming.
-"""
-
-import argparse
-import csv
-import json
-import os
-import statistics
-import threading
-import time
-from pathlib import Path
-
-import torch
-from torch.utils.data import DataLoader
-
-from lerobot.datasets import LeRobotDatasetMetadata, StreamingLeRobotDataset
-from lerobot.utils.constants import ACTION
-
-
-def _tree_rss_bytes() -> int:
-    """Sum RSS of this process and all its descendants via /proc (Linux only; 0 elsewhere).
-
-    DataLoader workers are separate processes, so the parent's own RSS misses most of the pipeline's
-    memory. Walking the process tree captures the real footprint (parquet buffers + decoders + shuffle).
-    """
-    try:
-        children: dict[int, list[int]] = {}
-        for entry in os.listdir("/proc"):
-            if not entry.isdigit():
-                continue
-            try:
-                with open(f"/proc/{entry}/stat") as f:
-                    ppid = int(f.read().split(") ", 1)[1].split()[1])
-                children.setdefault(ppid, []).append(int(entry))
-            except (OSError, ValueError, IndexError):
-                pass
-        total, stack = 0, [os.getpid()]
-        while stack:
-            cur = stack.pop()
-            try:
-                with open(f"/proc/{cur}/statm") as f:
-                    total += int(f.read().split()[1]) * os.sysconf("SC_PAGE_SIZE")
-            except (OSError, ValueError, IndexError):
-                pass
-            stack.extend(children.get(cur, []))
-        return total
-    except OSError:
-        return 0
-
-
-class PeakRSSSampler:
-    """Background thread tracking peak process-tree RSS for the duration of the `with` block."""
-
-    def __init__(self, interval_s: float = 0.5):
-        self.interval_s = interval_s
-        self.peak_bytes = 0
-        self._stop = threading.Event()
-        self._thread = threading.Thread(target=self._run, daemon=True)
-
-    def _run(self) -> None:
-        while not self._stop.is_set():
-            self.peak_bytes = max(self.peak_bytes, _tree_rss_bytes())
-            self._stop.wait(self.interval_s)
-
-    def __enter__(self) -> "PeakRSSSampler":
-        self._thread.start()
-        return self
-
-    def __exit__(self, *exc) -> None:
-        self._stop.set()
-        self._thread.join(timeout=2)
-
-
-def parse_args() -> argparse.Namespace:
-    parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument("--repo_id", type=str, required=True)
-    parser.add_argument("--root", type=str, default=None, help="Local/prewarmed root (else stream from Hub).")
-    parser.add_argument(
-        "--data_files_root",
-        type=str,
-        default=None,
-        help="fsspec root for bulk data/videos, e.g. hf://buckets//. Metadata still loads "
-        "from --repo_id on the Hub. Use for bucket / warmed_bucket sources.",
-    )
-    parser.add_argument("--mode", choices=["single", "sarm"], default="single")
-    parser.add_argument("--source", type=str, default="hub", help="Label only: hub | bucket | warmed_bucket.")
-    parser.add_argument("--batch_size", type=int, default=64)
-    parser.add_argument("--num_workers", type=int, default=8)
-    parser.add_argument(
-        "--prefetch_factor",
-        type=int,
-        default=2,
-        help="DataLoader batches prefetched per worker. Higher hides IO/decode latency but raises RAM "
-        "(prefetch_factor x num_workers x batch_size decoded frames held in flight). Ignored if num_workers=0.",
-    )
-    parser.add_argument("--buffer_size", type=int, default=None, help="Deprecated; ignored.")
-    parser.add_argument(
-        "--max_num_shards",
-        type=int,
-        default=16,
-        help="Cap on concurrently-open stream shards. Each open shard holds ~one parquet row group in "
-        "RAM; reading from an hf:// bucket buffers ~5x more per shard than hf:// datasets, so lower this "
-        "(e.g. to num_workers) for bucket sources to avoid OOM. All data is still covered via re-sharding.",
-    )
-    parser.add_argument("--video_decoder_cache_size", type=int, default=None)
-    parser.add_argument(
-        "--episode_pool_size",
-        type=int,
-        default=64,
-        help="Whole episodes each consumer keeps open to shuffle across (the randomness knob).",
-    )
-    parser.add_argument(
-        "--video_decode_device",
-        type=str,
-        default="cpu",
-        help="Decode device passed to torchcodec. 'cuda' offloads decode to the GPU's NVDEC engine "
-        "(needs a CUDA-enabled torchcodec build). With num_workers>0 this forces the 'spawn' start method.",
-    )
-    parser.add_argument("--num_batches", type=int, default=200)
-    parser.add_argument("--warmup_batches", type=int, default=5, help="Excluded from steady-state stats.")
-    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
-    parser.add_argument("--out_dir", type=str, default="benchmarks/streaming/results")
-    return parser.parse_args()
-
-
-def build_dataset(args: argparse.Namespace, meta: LeRobotDatasetMetadata) -> StreamingLeRobotDataset:
-    # sarm: an 8-step window spaced 1s => an 8s delta window (the SARM stress case).
-    delta_timestamps = {ACTION: [float(t) for t in range(8)]} if args.mode == "sarm" else None
-    return StreamingLeRobotDataset(
-        args.repo_id,
-        root=args.root,
-        data_files_root=args.data_files_root,
-        delta_timestamps=delta_timestamps,
-        max_num_shards=args.max_num_shards,
-        video_decoder_cache_size=args.video_decoder_cache_size,
-        video_decode_device=args.video_decode_device,
-        episode_pool_size=args.episode_pool_size,
-        tolerance_s=1e-3,
-    )
-
-
-def percentile(values: list[float], pct: float) -> float:
-    if not values:
-        return float("nan")
-    ordered = sorted(values)
-    k = max(0, min(len(ordered) - 1, int(round((pct / 100.0) * (len(ordered) - 1)))))
-    return ordered[k]
-
-
-def main() -> None:
-    args = parse_args()
-    device = torch.device(args.device)
-    meta = LeRobotDatasetMetadata(args.repo_id, root=args.root)
-    dataset = build_dataset(args, meta)
-
-    gpu_decode = args.video_decode_device.startswith("cuda")
-    loader = DataLoader(
-        dataset,
-        batch_size=args.batch_size,
-        num_workers=args.num_workers,
-        # GPU-decoded frames are already on the GPU, so CPU pinning is irrelevant (and pinning CUDA
-        # tensors errors). Pin only when decode is on CPU and we copy to a CUDA device.
-        pin_memory=device.type == "cuda" and not gpu_decode,
-        drop_last=True,
-        prefetch_factor=args.prefetch_factor if args.num_workers > 0 else None,
-        # CUDA cannot initialize in forked workers; NVDEC decode in workers needs the spawn start method.
-        multiprocessing_context="spawn" if gpu_decode and args.num_workers > 0 else None,
-    )
-
-    sample_latencies_ms: list[float] = []
-    episodes_per_batch: list[int] = []  # shuffle-randomness proxy: distinct episodes within a batch
-    frames = 0
-    first_batch_latency_s = None
-    steady_start = None  # wall-clock start of the post-warmup measurement window
-
-    t_start = time.perf_counter()
-    t_prev = t_start
-    with PeakRSSSampler() as rss:
-        for i, batch in enumerate(loader):
-            # Dummy consume: move tensors to the device, mimicking what a real trainer would do.
-            for value in batch.values():
-                if torch.is_tensor(value):
-                    value.to(device, non_blocking=device.type == "cuda")
-            now = time.perf_counter()
-            if first_batch_latency_s is None:
-                first_batch_latency_s = now - t_start
-
-            if i == args.warmup_batches:
-                # Start the steady window here; the slow first batch and the prefetch queue it filled are
-                # excluded so throughput reflects sustained production, not draining a pre-filled queue.
-                steady_start = now
-            elif i > args.warmup_batches:
-                sample_latencies_ms.append((now - t_prev) / args.batch_size * 1000.0)
-                frames += args.batch_size
-                ep = batch.get("episode_index")
-                if torch.is_tensor(ep):
-                    episodes_per_batch.append(int(torch.unique(ep).numel()))
-            t_prev = now
-            if i + 1 >= args.num_batches:
-                break
-    peak_rss_gb = round(rss.peak_bytes / 1e9, 2) if rss.peak_bytes else None
-
-    now = time.perf_counter()
-    elapsed = now - t_start
-    # Wall-clock throughput over the steady window. NOT sum(inter-batch gaps): under async prefetch those
-    # gaps collapse to ~0 (the consumer drains a pre-filled queue) and overstate throughput by ~100x.
-    steady_elapsed_s = (now - steady_start) if steady_start is not None else elapsed
-    cache_stats = dataset.video_decoder_cache_stats()
-    timing = dataset.timing_stats()  # cumulative decode/fetch seconds summed across workers
-    # Image (camera frame) resolution as decoded, e.g. [C, H, W]. Read from the dataset feature contract.
-    image_shape = list(meta.features[meta.video_keys[0]]["shape"]) if meta.video_keys else None
-    # Decode/fetch overlap in wall-clock (workers run in parallel), so normalize against the total worker
-    # budget (num_workers x wallclock) to express each stage as a fraction of available worker time.
-    worker_budget_s = max(args.num_workers, 1) * elapsed
-    decode_pct = round(100 * timing["decode_s_total"] / worker_budget_s, 1) if worker_budget_s else None
-    fetch_pct = round(100 * timing["fetch_s_total"] / worker_budget_s, 1) if worker_budget_s else None
-
-    # A 0-frame run is a failure, not a 0-throughput result: the pipeline produced no batches (decode
-    # error swallowed in workers, all batches dropped by drop_last, etc.). Exit non-zero so the job is
-    # never reported green with NaN/zero numbers.
-    if frames == 0:
-        raise SystemExit(
-            f"FAILED: measured 0 frames over {args.num_batches} requested batches "
-            f"(cache misses={cache_stats.get('misses', 0)}, hits={cache_stats.get('hits', 0)}). "
-            "The data pipeline yielded no usable batches — inspect worker logs for decode errors. "
-            "Try --num_workers 0 to surface the underlying exception directly."
-        )
-
-    results = {
-        "repo_id": args.repo_id,
-        "source": args.source,
-        "mode": args.mode,
-        "batch_size": args.batch_size,
-        "num_workers": args.num_workers,
-        "prefetch_factor": args.prefetch_factor if args.num_workers > 0 else None,
-        "buffer_size": args.buffer_size,
-        "episode_pool_size": args.episode_pool_size,
-        "episodes_per_batch_mean": round(statistics.mean(episodes_per_batch), 1)
-        if episodes_per_batch
-        else None,
-        # Fraction of a batch that is distinct episodes; ~1.0 ≈ map-style uniform, low ≈ correlated.
-        "shuffle_randomness_frac": round(statistics.mean(episodes_per_batch) / args.batch_size, 3)
-        if episodes_per_batch
-        else None,
-        "num_cameras": len(meta.video_keys),
-        "image_shape": image_shape,
-        "fps": meta.fps,
-        "device": str(device),
-        "video_decode_device": args.video_decode_device,
-        "peak_rss_gb": peak_rss_gb,
-        "frames_measured": frames,
-        "first_batch_latency_s": round(first_batch_latency_s or float("nan"), 4),
-        "frames_per_s_node": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0,
-        "samples_per_s": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0,
-        "p50_sample_latency_ms": round(statistics.median(sample_latencies_ms), 3)
-        if sample_latencies_ms
-        else None,
-        "p95_sample_latency_ms": round(percentile(sample_latencies_ms, 95), 3),
-        "p99_sample_latency_ms": round(percentile(sample_latencies_ms, 99), 3),
-        "total_time_s": round(elapsed, 2),
-        "steady_time_s": round(steady_elapsed_s, 2),
-        "wallclock_s": round(elapsed, 2),
-        "decode_s_total": timing["decode_s_total"],
-        "fetch_s_total": timing["fetch_s_total"],
-        "decode_pct_worker_time": decode_pct,
-        "fetch_pct_worker_time": fetch_pct,
-        "video_decoder_cache": cache_stats,
-    }
-
-    out_dir = Path(args.out_dir)
-    out_dir.mkdir(parents=True, exist_ok=True)
-    pool_tag = f"_ep{args.episode_pool_size}" if args.episode_pool_size else ""
-    tag = (
-        f"{args.source}_{args.mode}_bs{args.batch_size}_w{args.num_workers}"
-        f"_pf{args.prefetch_factor}{pool_tag}_{args.video_decode_device}"
-    )
-    (out_dir / f"{tag}.json").write_text(json.dumps(results, indent=2))
-    flat = {k: (json.dumps(v) if isinstance(v, dict) else v) for k, v in results.items()}
-    with open(out_dir / f"{tag}.csv", "w", newline="") as f:
-        writer = csv.DictWriter(f, fieldnames=list(flat))
-        writer.writeheader()
-        writer.writerow(flat)
-
-    print("Command config:", vars(args))
-    print(json.dumps(results, indent=2))
-    print(f"Wrote {out_dir / tag}.json and .csv")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/benchmarks/streaming/diagnose_decode.py b/benchmarks/streaming/diagnose_decode.py
deleted file mode 100644
index f84a1e1c6..000000000
--- a/benchmarks/streaming/diagnose_decode.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Isolate the streaming video-decode path — no SLURM, no DataLoader, no benchmark loop.
-
-Reproduces exactly what StreamingLeRobotDataset does for one video (resolve path -> fsspec.open ->
-torchcodec VideoDecoder -> get one frame) and prints the environment + the first bytes of the handle, so
-a decode failure ("No valid stream found in input file") can be pinpointed: bad/placeholder bytes vs a
-torchcodec/ffmpeg build issue vs a device issue.
-
-    python benchmarks/streaming/diagnose_decode.py --repo_id pepijn223/robocasa_pretrain_human300_v4
-    python benchmarks/streaming/diagnose_decode.py --repo_id … --data_files_root hf://buckets//
-    python benchmarks/streaming/diagnose_decode.py --repo_id … --video_decode_device cuda
-"""
-
-import argparse
-import importlib.metadata as im
-
-import fsspec
-
-from lerobot.datasets import LeRobotDatasetMetadata
-
-
-def _version(pkg: str) -> str:
-    try:
-        return im.version(pkg)
-    except Exception:
-        return "MISSING"
-
-
-def main() -> None:
-    p = argparse.ArgumentParser(description=__doc__)
-    p.add_argument("--repo_id", required=True)
-    p.add_argument("--data_files_root", default=None, help="e.g. hf://buckets//")
-    p.add_argument("--revision", default=None)
-    p.add_argument("--video_decode_device", default="cpu")
-    p.add_argument("--episode", type=int, default=0)
-    args = p.parse_args()
-
-    print("== environment ==")
-    for pkg in ("torchcodec", "av", "huggingface_hub", "hf_xet", "datasets", "fsspec"):
-        print(f" {pkg}: {_version(pkg)}")
-
-    meta = LeRobotDatasetMetadata(args.repo_id, revision=args.revision)
-    video_key = meta.video_keys[0]
-    rel_path = meta.get_video_file_path(args.episode, video_key)
-    root = args.data_files_root.rstrip("/") if args.data_files_root else meta.url_root
-    video_path = f"{root}/{rel_path}"
-    print("\n== target ==")
-    print(f" video_key: {video_key}")
-    print(f" video_path: {video_path}")
-
-    print("\n== fsspec handle ==")
-    try:
-        fh = fsspec.open(video_path).__enter__()
-        head = fh.read(32)
-        print(f" first 32 bytes (hex): {head.hex()}")
-        # A valid MP4/MOV has an 'ftyp' box near the start; anything else (HTML/JSON/empty) means the
-        # handle resolved to a placeholder or error page, not the video bytes.
-        looks_mp4 = b"ftyp" in head
-        print(f" looks like MP4 (contains 'ftyp'): {looks_mp4}")
-        if not looks_mp4:
-            print(f" !! first bytes as text: {head[:32]!r}")
-        fh.seek(0)
-    except Exception as e:
-        print(f" !! fsspec.open/read FAILED: {type(e).__name__}: {e}")
-        return
-
-    print("\n== torchcodec VideoDecoder ==")
-    try:
-        from torchcodec.decoders import VideoDecoder
-
-        decoder = VideoDecoder(fh, seek_mode="approximate", device=args.video_decode_device)
-        md = decoder.metadata
-        print(f" OK: {md.num_frames} frames, {md.average_fps} fps, codec={getattr(md, 'codec', '?')}")
-        frame = decoder.get_frames_at(indices=[0])
-        print(f" decoded frame 0: shape={tuple(frame.data.shape)}, device={frame.data.device}")
-        print("\nDECODE OK — the streaming pipeline can read this video on this machine.")
-    except Exception as e:
-        print(f" !! VideoDecoder FAILED: {type(e).__name__}: {e}")
-        print(
-            "\nDECODE FAILED. If the bytes above look like MP4 (ftyp=True), this is a torchcodec/ffmpeg "
-            "build issue, NOT bad bytes. Common cause for LeRobot v3 datasets: the videos are AV1-encoded "
-            "(see the 'codec' line on a working machine). Then:\n"
-            " - CPU decode needs an ffmpeg built with an AV1 decoder (libdav1d/libaom); a build without it "
-            "reports 'No valid stream found'.\n"
-            " - GPU/NVDEC decode of AV1 is only on AV1-capable NVDEC GPUs: Ada (L4/L40/RTX40) and some "
-            "Ampere (A10/A40/A16). The COMPUTE GPUs A100 and H100 have NO AV1 NVDEC decoder (per NVIDIA's "
-            "support matrix), so no torchcodec build enables cuda decode of AV1 on them.\n"
-            " - 'Unsupported device: cuda (variant: ffmpeg)' instead means torchcodec was built without "
-            "the CUDA backend; install a CUDA-enabled wheel (see README) — but on A100/H100 that still "
-            "won't decode AV1.\n"
-            "Fix: decode on CPU, run NVDEC on an Ada GPU, or re-encode the dataset to H.265/H.264 (which "
-            "A100/H100 NVDEC do support).\n"
-            "If ftyp=False instead, the handle resolved to a placeholder/error page (auth, revision, or Xet "
-            "resolution) rather than the video bytes."
-        )
-
-
-if __name__ == "__main__":
-    main()
diff --git a/benchmarks/streaming/summarize_results.py b/benchmarks/streaming/summarize_results.py
deleted file mode 100755
index ab72f6f8d..000000000
--- a/benchmarks/streaming/summarize_results.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Collapse a directory of benchmark JSON results into one comparison table (and a combined CSV).
-
-python benchmarks/streaming/summarize_results.py benchmarks/streaming/results
-"""
-
-import csv
-import json
-import sys
-from pathlib import Path
-
-COLUMNS = [
-    ("source", "source"),
-    ("mode", "mode"),
-    ("video_decode_device", "decode"),
-    ("num_workers", "workers"),
-    ("batch_size", "bs"),
-    ("frames_per_s_node", "frames/s/node"),
-    ("first_batch_latency_s", "first_batch_s"),
-    ("p50_sample_latency_ms", "p50_ms"),
-    ("p95_sample_latency_ms", "p95_ms"),
-    ("p99_sample_latency_ms", "p99_ms"),
-]
-
-
-def main() -> None:
-    results_dir = Path(sys.argv[1] if len(sys.argv) > 1 else "benchmarks/streaming/results")
-    files = sorted(results_dir.rglob("*.json"))
-    if not files:
-        print(f"No JSON results under {results_dir}")
-        return
-
-    rows = []
-    for f in files:
-        d = json.loads(f.read_text())
-        d["hit_rate"] = d.get("video_decoder_cache", {}).get("hit_rate")
-        rows.append(d)
-
-    rows.sort(key=lambda r: (r.get("source", ""), r.get("mode", ""), r.get("video_decode_device", "")))
-
-    headers = [label for _, label in COLUMNS] + ["cache_hit_rate"]
-    widths = {h: len(h) for h in headers}
-    table = []
-    for r in rows:
-        row = {label: r.get(key, "") for key, label in COLUMNS}
-        row["cache_hit_rate"] = r.get("hit_rate", "")
-        table.append(row)
-        for h in headers:
-            widths[h] = max(widths[h], len(str(row[h])))
-
-    line = " ".join(h.ljust(widths[h]) for h in headers)
-    print(line)
-    print(" ".join("-" * widths[h] for h in headers))
-    for row in table:
-        print(" ".join(str(row[h]).ljust(widths[h]) for h in headers))
-
-    combined = results_dir / "summary.csv"
-    with open(combined, "w", newline="") as fh:
-        writer = csv.DictWriter(fh, fieldnames=headers)
-        writer.writeheader()
-        writer.writerows(table)
-    print(f"\nWrote {combined}")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/scaling/train_streaming_multinode.py b/examples/scaling/train_streaming_multinode.py
index f2983bfbb..5beb29b55 100644
--- a/examples/scaling/train_streaming_multinode.py
+++ b/examples/scaling/train_streaming_multinode.py
@@ -27,9 +27,9 @@ streaming features of :class:`StreamingLeRobotDataset`:
 Launch with Accelerate (single node, N GPUs):
 
     accelerate launch --num_processes=8 examples/scaling/train_streaming_multinode.py \
-        --repo_id=pepijn223/robocasa_pretrain_human300_v4 --batch_size=64
+        --repo_id=lerobot/droid_1.0.1 --batch_size=64
 
-Multinode runs use the same script under SLURM; see ``slurm/train_streaming_robocasa.sh``.
+Multinode runs launch the same script with your cluster's accelerate/SLURM setup.
 
 Pass ``--dummy`` to skip the model entirely and measure pure dataloading throughput.
 """
diff --git a/slurm/benchmark_streaming_robocasa.sh b/slurm/benchmark_streaming_robocasa.sh
deleted file mode 100644
index 01311fd37..000000000
--- a/slurm/benchmark_streaming_robocasa.sh
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/bin/bash
-#SBATCH --job-name=bench_stream
-#SBATCH --nodes=2
-#SBATCH --ntasks-per-node=1
-#SBATCH --gpus-per-node=8
-#SBATCH --cpus-per-task=96
-#SBATCH --exclusive
-#SBATCH --time=02:00:00
-#SBATCH --output=logs/%x-%j.out
-
-# Per-node dataloading benchmark for StreamingLeRobotDataset across 1-2 nodes. Each node runs an
-# independent dummy-consumer benchmark; per-node throughput should be independent (separate network).
-# Results are written per (node, source, mode) under --out_dir.
-#
-# Submit with: sbatch slurm/benchmark_streaming_robocasa.sh
-# Override the source label for cold/warm bucket runs: SOURCE=warmed_bucket sbatch slurm/benchmark_streaming_robocasa.sh
-
-set -euo pipefail
-
-REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
-SOURCE=${SOURCE:-hub}
-OUT_DIR=${OUT_DIR:-benchmarks/streaming/results}
-
-export HF_HOME=${HF_HOME:-$SCRATCH/hf_home}
-export TOKENIZERS_PARALLELISM=false
-
-# One benchmark process per node (each saturates the node's DataLoader workers + network independently).
-srun --kill-on-bad-exit=1 bash -c '
-for MODE in single sarm; do
-  python benchmarks/streaming/benchmark_streaming.py \
-    --repo_id '"$REPO_ID"' \
-    --source '"$SOURCE"' \
-    --mode $MODE \
-    --batch_size 64 \
-    --num_workers 12 \
-    --episode_pool_size 64 \
-    --num_batches 300 \
-    --out_dir '"$OUT_DIR"'/node${SLURM_NODEID}
-done
-'
diff --git a/slurm/run_streaming_matrix.sh b/slurm/run_streaming_matrix.sh
deleted file mode 100755
index ca236ab99..000000000
--- a/slurm/run_streaming_matrix.sh
+++ /dev/null
@@ -1,107 +0,0 @@
-#!/bin/bash
-# Submit the FULL streaming dataloading-benchmark matrix as isolated single-GPU SLURM jobs.
-#
-# sources : hub (Hub streaming) | bucket (cold HF bucket) | warmed_bucket (prewarmed HF bucket)
-# modes : single (1 frame, all cameras) | sarm (8-step / 8s delta window)
-# decode : cpu (torchcodec on CPU, scales with workers) | cuda (NVDEC, offloads decode to the GPU)
-#
-# => 3 x 2 x 2 = 12 jobs. Each runs in its OWN job (1 node, 1 GPU) so an OOM is isolated and reported
-# per-job by SLURM (check `sacct -j --format=JobID,State,MaxRSS,ReqMem`). Submit from a login node
-# inside the repo: bash slurm/run_streaming_matrix.sh
-#
-# SERIAL (default 1): chain the jobs with --dependency=afterany so SLURM runs exactly ONE at a time. This
-# is important for a bandwidth benchmark — concurrent jobs would share the network to the Hub/bucket and
-# corrupt every throughput number. `afterany` means a failed/OOM'd job does not stall the chain. Set
-# SERIAL=0 to let the scheduler run them in parallel (only for OOM-isolation testing, not for throughput).
-#
-# Knobs (env overrides):
-# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS, SERIAL
-# CPU_WORKERS / CPU_BUFFER (cpu-decode jobs) GPU_WORKERS / GPU_BUFFER (cuda-decode jobs, kept low to
-# bound VRAM + NVDEC sessions). RUN ("python" by default; set RUN="uv run python" if using uv).
-# SOURCES / MODES / DECODES to run a subset (e.g. SOURCES="hub bucket" DECODES="cpu").
-# ACCOUNT / PARTITION / QOS passed through to sbatch if set.
-set -euo pipefail
-
-REPO_DIR=$(git rev-parse --show-toplevel)
-REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
-BUCKET=${BUCKET:-hf://buckets/pepijn223/robocasa-stream}
-WARM_BUCKET=${WARM_BUCKET:-hf://buckets/pepijn223/robocasa-stream-warm}
-OUT_DIR=${OUT_DIR:-benchmarks/streaming/results}
-NUM_BATCHES=${NUM_BATCHES:-200}
-TIME=${TIME:-01:00:00}
-MEM=${MEM:-64G}
-GPUS=${GPUS:-1}
-SERIAL=${SERIAL:-1} # 1 = run one job at a time (correct for bandwidth measurement)
-CPU_WORKERS=${CPU_WORKERS:-8}
-GPU_WORKERS=${GPU_WORKERS:-2} # low on purpose: each cuda worker holds a CUDA context + NVDEC session
-CPU_BUFFER=${CPU_BUFFER:-64} # episode pool size (whole episodes per consumer; tabular-only RAM)
-GPU_BUFFER=${GPU_BUFFER:-32} # smaller episode pool bounds in-flight decoded frames
-# Cap concurrently-open stream shards. Each open shard holds ~one parquet row group in RAM, and reading
-# from an hf:// bucket buffers ~5x more per shard than hf:// datasets (~1.2GB vs ~0.26GB). So for bucket
-# sources default to num_workers (1 shard/worker); hub keeps 16. Override globally with MAX_SHARDS.
-MAX_SHARDS=${MAX_SHARDS:-}
-BATCH_SIZE=${BATCH_SIZE:-64}
-PREFETCH=${PREFETCH:-2} # DataLoader batches prefetched per worker (higher = more throughput + RAM)
-RUN=${RUN:-python}
-# CONDA_ENV= runs each job via `conda run -n ` (no activation needed inside the dash --wrap;
-# --no-capture-output streams logs live). Set this to a conda env that has a MODERN torchcodec (>=0.11)
-# + datasets (>=4.7) — the default `base` env on many clusters is too old to decode AV1 / lacks CUDA.
-CONDA_ENV=${CONDA_ENV:-}
-if [ -n "$CONDA_ENV" ] && [ "$RUN" = "python" ]; then
-  RUN="conda run --no-capture-output -n $CONDA_ENV python"
-fi
-
-SOURCES=${SOURCES:-"hub bucket warmed_bucket"}
-MODES=${MODES:-"single sarm"}
-DECODES=${DECODES:-"cpu cuda"}
-
-mkdir -p "$REPO_DIR/logs" "$REPO_DIR/$OUT_DIR"
-
-data_root_for () {
-  case "$1" in
-    hub) echo "" ;;
-    bucket) echo "$BUCKET" ;;
-    warmed_bucket) echo "$WARM_BUCKET" ;;
-  esac
-}
-
-n=0
-prev_jid=""
-for SOURCE in $SOURCES; do
-  DATA_ROOT=$(data_root_for "$SOURCE")
-  ROOTFLAG=""
-  [ -n "$DATA_ROOT" ] && ROOTFLAG="--data_files_root $DATA_ROOT"
-  for MODE in $MODES; do
-    for DECODE in $DECODES; do
-      if [ "$DECODE" = cpu ]; then W=$CPU_WORKERS; B=$CPU_BUFFER; else W=$GPU_WORKERS; B=$GPU_BUFFER; fi
-      if [ -n "$MAX_SHARDS" ]; then S=$MAX_SHARDS; elif [ "$SOURCE" = hub ]; then S=16; else S=$W; fi
-      # Run strictly after the previous job so only one job touches the network at a time.
-      DEPFLAG=""
-      if [ "$SERIAL" = 1 ] && [ -n "$prev_jid" ]; then DEPFLAG="--dependency=afterany:$prev_jid"; fi
-      jid=$(sbatch --parsable \
-        --job-name="bench_${SOURCE}_${MODE}_${DECODE}" \
-        --nodes=1 --ntasks=1 --gpus="$GPUS" --cpus-per-task=$((W + 4)) \
-        --mem="$MEM" --time="$TIME" --output="$REPO_DIR/logs/%x-%j.out" \
-        $DEPFLAG \
-        ${ACCOUNT:+--account=$ACCOUNT} ${PARTITION:+--partition=$PARTITION} ${QOS:+--qos=$QOS} \
-        --wrap "cd '$REPO_DIR' && \
-          export TOKENIZERS_PARALLELISM=false && export HF_HOME=\${HF_HOME:-\$SCRATCH/hf_home} && \
-          $RUN benchmarks/streaming/benchmark_streaming.py \
-            --repo_id $REPO_ID $ROOTFLAG \
-            --mode $MODE --source $SOURCE --video_decode_device $DECODE \
-            --batch_size $BATCH_SIZE --num_workers $W --prefetch_factor $PREFETCH \
-            --episode_pool_size $B --max_num_shards $S \
-            --num_batches $NUM_BATCHES --out_dir $OUT_DIR")
-      jid=${jid%%;*} # strip ';cluster' suffix on federated setups
-      echo "submitted job $jid bench_${SOURCE}_${MODE}_${DECODE}${DEPFLAG:+ (after $prev_jid)}"
-      prev_jid=$jid
-      n=$((n + 1))
-    done
-  done
-done
-
-echo
-echo "Submitted $n jobs ($([ "$SERIAL" = 1 ] && echo 'serial chain; one runs at a time' || echo 'parallel'))."
-echo "Watch: squeue -u \$USER (later jobs show reason '(Dependency)' until their turn)"
-echo "Results: $OUT_DIR/__bs${BATCH_SIZE}_w_pf_.{json,csv}"
-echo "Summarize when done: $RUN benchmarks/streaming/summarize_results.py $OUT_DIR"
diff --git a/slurm/train_streaming_robocasa.sh b/slurm/train_streaming_robocasa.sh
deleted file mode 100644
index f71219dc5..000000000
--- a/slurm/train_streaming_robocasa.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/bash
-#SBATCH --job-name=stream_robocasa
-#SBATCH --nodes=2
-#SBATCH --ntasks-per-node=1
-#SBATCH --gpus-per-node=8
-#SBATCH --cpus-per-task=96
-#SBATCH --exclusive
-#SBATCH --time=24:00:00
-#SBATCH --output=logs/%x-%j.out
-
-# Multinode streaming training over a large HF-hosted RoboCasa dataset (never touches local disk).
-# Launches examples/scaling/train_streaming_multinode.py with Accelerate. Each rank streams a disjoint
-# set of shards via split_dataset_by_node (auto-resolved from the Accelerate state), so per-node
-# throughput scales independently. For an even split, ensure n_shards % (nodes * gpus_per_node) == 0.
-#
-# Submit with: sbatch slurm/train_streaming_robocasa.sh
-
-set -euo pipefail
-
-REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
-GPUS_PER_NODE=8
-NUM_PROCESSES=$((SLURM_NNODES * GPUS_PER_NODE))
-
-# Rendezvous: use the first node in the allocation as the main process.
-MAIN_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
-MAIN_PORT=${MAIN_PORT:-29500}
-
-export HF_HOME=${HF_HOME:-$SCRATCH/hf_home}
-# Avoid each rank fighting over the tokenizers' internal thread pool.
-export TOKENIZERS_PARALLELISM=false
-
-srun --kill-on-bad-exit=1 bash -c '
-accelerate launch \
-  --num_machines '"$SLURM_NNODES"' \
-  --num_processes '"$NUM_PROCESSES"' \
-  --machine_rank $SLURM_NODEID \
-  --main_process_ip '"$MAIN_ADDR"' \
-  --main_process_port '"$MAIN_PORT"' \
-  --mixed_precision bf16 \
-  --dynamo_backend no \
-  examples/scaling/train_streaming_multinode.py \
-  --repo_id '"$REPO_ID"' \
-  --batch_size 64 \
-  --num_workers 12 \
-  --episode_pool_size 64 \
-  --steps 200000 \
-  --save_freq 2000 \
-  --log_freq 50
-'
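
The header comment of `slurm/train_streaming_robocasa.sh` asks for `n_shards % (nodes * gpus_per_node) == 0` so that every rank receives the same number of shards. A minimal sketch of that check as a pre-submit guard is shown below. The function name `shards_split_evenly` and the example shard count of 64 are hypothetical and do not appear in the deleted scripts; how the real shard count would be obtained is left open.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not part of the deleted scripts).
# Succeeds (exit status 0) when n_shards divides evenly across all ranks,
# where ranks = nodes * gpus_per_node, mirroring the rule in the header comment.
shards_split_evenly() {
  local n_shards=$1 nodes=$2 gpus_per_node=$3
  local ranks=$((nodes * gpus_per_node))
  # Reject a zero-rank layout first so the modulo below never divides by zero.
  [ "$ranks" -gt 0 ] && [ $((n_shards % ranks)) -eq 0 ]
}

# Example with an assumed shard count of 64 on the 2-node x 8-GPU layout
# requested by the #SBATCH lines (16 ranks).
if shards_split_evenly 64 2 8; then
  echo "even split"
else
  echo "uneven split"
fi
```

A guard like this could run before `sbatch`, so an uneven layout is caught before the allocation starts rather than showing up later as ranks that receive different numbers of shards.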