# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Collapse a directory of benchmark JSON results into one comparison table (and a combined CSV).

python benchmarks/streaming/summarize_results.py benchmarks/streaming/results
"""

import csv
import json
import sys
from pathlib import Path

# (key in the result JSON, column header used in the table and the CSV)
COLUMNS = [
    ("source", "source"),
    ("mode", "mode"),
    ("video_decode_device", "decode"),
    ("num_workers", "workers"),
    ("batch_size", "bs"),
    ("frames_per_s_node", "frames/s/node"),
    ("first_batch_latency_s", "first_batch_s"),
    ("p50_sample_latency_ms", "p50_ms"),
    ("p95_sample_latency_ms", "p95_ms"),
    ("p99_sample_latency_ms", "p99_ms"),
]

# Result fields the rows are ordered by, most significant first.
_SORT_KEYS = ("source", "mode", "video_decode_device")


def _cell(value):
    """Return *value* unchanged, except that a missing/null value becomes an empty string."""
    return "" if value is None else value


def _load_results(files: list[Path]) -> list[dict]:
    """Parse every result file into a dict, adding a flattened ``hit_rate`` entry.

    Files that cannot be read, are not valid JSON, or do not hold a JSON object are
    skipped with a warning on stderr so that one bad file does not abort the summary.
    """
    rows = []
    for path in files:
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as err:  # ValueError covers JSON and decoding errors
            print(f"Skipping {path}: {err}", file=sys.stderr)
            continue
        if not isinstance(data, dict):
            print(f"Skipping {path}: expected a JSON object, got {type(data).__name__}", file=sys.stderr)
            continue
        # "video_decoder_cache" may be absent, null, or an object; only an object has a hit rate.
        cache = data.get("video_decoder_cache")
        data["hit_rate"] = cache.get("hit_rate") if isinstance(cache, dict) else None
        rows.append(data)
    return rows


def main() -> None:
    """Print a comparison table of all results and write it to ``summary.csv``.

    The results directory is taken from the first command-line argument and defaults to
    ``benchmarks/streaming/results``. It is searched recursively for ``*.json`` files;
    ``summary.csv`` is written directly inside it.
    """
    results_dir = Path(sys.argv[1] if len(sys.argv) > 1 else "benchmarks/streaming/results")
    rows = _load_results(sorted(results_dir.rglob("*.json")))
    if not rows:
        print(f"No JSON results under {results_dir}")
        return

    # Compare as strings so a null or non-string field cannot raise TypeError while sorting.
    rows.sort(key=lambda r: tuple(str(_cell(r.get(k))) for k in _SORT_KEYS))

    headers = [label for _, label in COLUMNS] + ["cache_hit_rate"]
    widths = {h: len(h) for h in headers}
    table = []
    for r in rows:
        row = {label: _cell(r.get(key)) for key, label in COLUMNS}
        row["cache_hit_rate"] = _cell(r["hit_rate"])
        table.append(row)
        for h in headers:
            widths[h] = max(widths[h], len(str(row[h])))

    print(" ".join(h.ljust(widths[h]) for h in headers))
    print(" ".join("-" * widths[h] for h in headers))
    for row in table:
        print(" ".join(str(row[h]).ljust(widths[h]) for h in headers))

    combined = results_dir / "summary.csv"
    with open(combined, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=headers)
        writer.writeheader()
        writer.writerows(table)
    print(f"\nWrote {combined}")


if __name__ == "__main__":
    main()
#   ACCOUNT / PARTITION / QOS passed through to sbatch if set.
set -euo pipefail

# Everything is resolved relative to the repository root, so the script must run inside the repo.
REPO_DIR=$(git rev-parse --show-toplevel)
REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4}
BUCKET=${BUCKET:-hf://buckets/pepijn223/robocasa-stream}
WARM_BUCKET=${WARM_BUCKET:-hf://buckets/pepijn223/robocasa-stream-warm}
OUT_DIR=${OUT_DIR:-benchmarks/streaming/results}
NUM_BATCHES=${NUM_BATCHES:-200}
TIME=${TIME:-01:00:00}
MEM=${MEM:-64G}
GPUS=${GPUS:-1}
CPU_WORKERS=${CPU_WORKERS:-8}
GPU_WORKERS=${GPU_WORKERS:-2}   # low on purpose: each cuda worker holds a CUDA context + NVDEC session
CPU_BUFFER=${CPU_BUFFER:-4000}
GPU_BUFFER=${GPU_BUFFER:-1000}  # smaller buffer bounds on-GPU frame memory
BATCH_SIZE=${BATCH_SIZE:-64}
RUN=${RUN:-python}

SOURCES=${SOURCES:-"hub bucket warmed_bucket"}
MODES=${MODES:-"single sarm"}
DECODES=${DECODES:-"cpu cuda"}

mkdir -p "$REPO_DIR/logs" "$REPO_DIR/$OUT_DIR"

# Print the --data_files_root value for the source named in $1 (empty for "hub", which gets no
# root flag). Returns 1 with a message on stderr for any other name, so a typo in SOURCES is not
# silently submitted with an empty data root.
data_root_for () {
  case "$1" in
    hub) echo "" ;;
    bucket) echo "$BUCKET" ;;
    warmed_bucket) echo "$WARM_BUCKET" ;;
    *) echo "Unknown source '$1' (expected: hub | bucket | warmed_bucket)" >&2; return 1 ;;
  esac
}

# Validate every requested source up front: under `set -e` a failure here stops the script
# before the first sbatch call, instead of after part of the matrix has been queued.
for SOURCE in $SOURCES; do
  data_root_for "$SOURCE" > /dev/null
done

n=0
for SOURCE in $SOURCES; do
  DATA_ROOT=$(data_root_for "$SOURCE")
  ROOTFLAG=""
  [ -n "$DATA_ROOT" ] && ROOTFLAG="--data_files_root $DATA_ROOT"
  for MODE in $MODES; do
    for DECODE in $DECODES; do
      # cpu decode gets the CPU worker/buffer settings; any other decode value gets the GPU ones.
      if [ "$DECODE" = cpu ]; then W=$CPU_WORKERS; B=$CPU_BUFFER; else W=$GPU_WORKERS; B=$GPU_BUFFER; fi
      # sbatch runs the --wrap string from an `sh` script, which is not bash everywhere. The
      # wrapped command has no pipeline, so it uses `set -eu` rather than `set -euo pipefail`
      # (an `sh` without pipefail would abort the job on that option).
      # \${HF_HOME...} and \$SCRATCH are escaped so they expand inside the job, not at submit time.
      sbatch \
        --job-name="bench_${SOURCE}_${MODE}_${DECODE}" \
        --nodes=1 --ntasks=1 --gpus="$GPUS" --cpus-per-task=$((W + 4)) \
        --mem="$MEM" --time="$TIME" --output="$REPO_DIR/logs/%x-%j.out" \
        ${ACCOUNT:+--account=$ACCOUNT} ${PARTITION:+--partition=$PARTITION} ${QOS:+--qos=$QOS} \
        --wrap "set -eu; cd '$REPO_DIR'; \
          export TOKENIZERS_PARALLELISM=false HF_HOME=\${HF_HOME:-\$SCRATCH/hf_home}; \
          $RUN benchmarks/streaming/benchmark_streaming.py \
            --repo_id $REPO_ID $ROOTFLAG \
            --mode $MODE --source $SOURCE --video_decode_device $DECODE \
            --batch_size $BATCH_SIZE --num_workers $W --buffer_size $B \
            --num_batches $NUM_BATCHES --out_dir $OUT_DIR"
      n=$((n + 1))
    done
  done
done

echo "Submitted $n jobs. Watch: squeue -u \$USER"
# NOTE(review): the file-name pattern in the next message looks like it lost its placeholder
# fields — confirm against the names benchmark_streaming.py actually writes.
echo "Results land in $OUT_DIR/__bs${BATCH_SIZE}_w_.{json,csv}"
echo "After they finish, summarize: python benchmarks/streaming/summarize_results.py $OUT_DIR"