feat(streaming): serial-by-default matrix submitter (afterany dependency chain)

For a bandwidth-sensitive benchmark, concurrent jobs would share the network to the
Hub/bucket and corrupt throughput numbers. Chain the matrix jobs with
--dependency=afterany (captured via `sbatch --parsable`) so SLURM runs exactly one at a
time while keeping each config an isolated job (own log + per-job OOM reporting).
afterany keeps the chain going if one job fails/OOMs. SERIAL=0 restores parallel
submission for OOM-isolation-only testing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-09 15:55:58 +02:00
parent a32a2c647b
commit afdc084677
+22 -6
View File
@@ -9,8 +9,13 @@
# per-job by SLURM (check `sacct -j <id> --format=JobID,State,MaxRSS,ReqMem`). Submit from a login node
# inside the repo: bash slurm/run_streaming_matrix.sh
#
# SERIAL (default 1): chain the jobs with --dependency=afterany so SLURM runs exactly ONE at a time. This
# is important for a bandwidth benchmark — concurrent jobs would share the network to the Hub/bucket and
# corrupt every throughput number. `afterany` means a failed/OOM'd job does not stall the chain. Set
# SERIAL=0 to let the scheduler run them in parallel (only for OOM-isolation testing, not for throughput).
#
# Knobs (env overrides):
# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS
# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS, SERIAL
# CPU_WORKERS / CPU_BUFFER (cpu-decode jobs) GPU_WORKERS / GPU_BUFFER (cuda-decode jobs, kept low to
# bound VRAM + NVDEC sessions). RUN ("python" by default; set RUN="uv run python" if using uv).
# SOURCES / MODES / DECODES to run a subset (e.g. SOURCES="hub bucket" DECODES="cpu").
@@ -26,6 +31,7 @@ NUM_BATCHES=${NUM_BATCHES:-200}
TIME=${TIME:-01:00:00}
MEM=${MEM:-64G}
GPUS=${GPUS:-1}
SERIAL=${SERIAL:-1} # 1 = run one job at a time (correct for bandwidth measurement)
CPU_WORKERS=${CPU_WORKERS:-8}
GPU_WORKERS=${GPU_WORKERS:-2} # low on purpose: each cuda worker holds a CUDA context + NVDEC session
CPU_BUFFER=${CPU_BUFFER:-4000}
@@ -48,6 +54,7 @@ data_root_for () {
}
n=0
prev_jid=""
for SOURCE in $SOURCES; do
DATA_ROOT=$(data_root_for "$SOURCE")
ROOTFLAG=""
@@ -55,10 +62,14 @@ for SOURCE in $SOURCES; do
for MODE in $MODES; do
for DECODE in $DECODES; do
if [ "$DECODE" = cpu ]; then W=$CPU_WORKERS; B=$CPU_BUFFER; else W=$GPU_WORKERS; B=$GPU_BUFFER; fi
sbatch \
# Run strictly after the previous job so only one job touches the network at a time.
DEPFLAG=""
if [ "$SERIAL" = 1 ] && [ -n "$prev_jid" ]; then DEPFLAG="--dependency=afterany:$prev_jid"; fi
jid=$(sbatch --parsable \
--job-name="bench_${SOURCE}_${MODE}_${DECODE}" \
--nodes=1 --ntasks=1 --gpus="$GPUS" --cpus-per-task=$((W + 4)) \
--mem="$MEM" --time="$TIME" --output="$REPO_DIR/logs/%x-%j.out" \
$DEPFLAG \
${ACCOUNT:+--account=$ACCOUNT} ${PARTITION:+--partition=$PARTITION} ${QOS:+--qos=$QOS} \
--wrap "set -euo pipefail; cd '$REPO_DIR'; \
export TOKENIZERS_PARALLELISM=false HF_HOME=\${HF_HOME:-\$SCRATCH/hf_home}; \
@@ -66,12 +77,17 @@ for SOURCE in $SOURCES; do
--repo_id $REPO_ID $ROOTFLAG \
--mode $MODE --source $SOURCE --video_decode_device $DECODE \
--batch_size $BATCH_SIZE --num_workers $W --buffer_size $B \
--num_batches $NUM_BATCHES --out_dir $OUT_DIR"
--num_batches $NUM_BATCHES --out_dir $OUT_DIR")
jid=${jid%%;*} # strip ';cluster' suffix on federated setups
echo "submitted job $jid bench_${SOURCE}_${MODE}_${DECODE}${DEPFLAG:+ (after $prev_jid)}"
prev_jid=$jid
n=$((n + 1))
done
done
done
echo "Submitted $n jobs. Watch: squeue -u \$USER"
echo "Results land in $OUT_DIR/<source>_<mode>_bs${BATCH_SIZE}_w<workers>_<decode>.{json,csv}"
echo "After they finish, summarize: python benchmarks/streaming/summarize_results.py $OUT_DIR"
echo
echo "Submitted $n jobs ($([ "$SERIAL" = 1 ] && echo 'serial chain — one runs at a time' || echo 'parallel'))."
echo "Watch: squeue -u \$USER (later jobs show reason '(Dependency)' until their turn)"
echo "Results: $OUT_DIR/<source>_<mode>_bs${BATCH_SIZE}_w<workers>_<decode>.{json,csv}"
echo "Summarize when done: $RUN benchmarks/streaming/summarize_results.py $OUT_DIR"