diff --git a/slurm/run_streaming_matrix.sh b/slurm/run_streaming_matrix.sh index 85527b978..5e9ad6ae8 100755 --- a/slurm/run_streaming_matrix.sh +++ b/slurm/run_streaming_matrix.sh @@ -9,8 +9,13 @@ # per-job by SLURM (check `sacct -j --format=JobID,State,MaxRSS,ReqMem`). Submit from a login node # inside the repo: bash slurm/run_streaming_matrix.sh # +# SERIAL (default 1): chain the jobs with --dependency=afterany so SLURM runs exactly ONE at a time. This +# is important for a bandwidth benchmark — concurrent jobs would share the network to the Hub/bucket and +# corrupt every throughput number. `afterany` means a failed/OOM'd job does not stall the chain. Set +# SERIAL=0 to let the scheduler run them in parallel (only for OOM-isolation testing, not for throughput). +# # Knobs (env overrides): -# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS +# REPO_ID, BUCKET, WARM_BUCKET, OUT_DIR, NUM_BATCHES, TIME, MEM, GPUS, SERIAL # CPU_WORKERS / CPU_BUFFER (cpu-decode jobs) GPU_WORKERS / GPU_BUFFER (cuda-decode jobs, kept low to # bound VRAM + NVDEC sessions). RUN ("python" by default; set RUN="uv run python" if using uv). # SOURCES / MODES / DECODES to run a subset (e.g. SOURCES="hub bucket" DECODES="cpu"). @@ -26,6 +31,7 @@ NUM_BATCHES=${NUM_BATCHES:-200} TIME=${TIME:-01:00:00} MEM=${MEM:-64G} GPUS=${GPUS:-1} +SERIAL=${SERIAL:-1} # 1 = run one job at a time (correct for bandwidth measurement) CPU_WORKERS=${CPU_WORKERS:-8} GPU_WORKERS=${GPU_WORKERS:-2} # low on purpose: each cuda worker holds a CUDA context + NVDEC session CPU_BUFFER=${CPU_BUFFER:-4000} @@ -48,6 +54,7 @@ data_root_for () { } n=0 +prev_jid="" for SOURCE in $SOURCES; do DATA_ROOT=$(data_root_for "$SOURCE") ROOTFLAG="" @@ -55,10 +62,14 @@ for SOURCE in $SOURCES; do for MODE in $MODES; do for DECODE in $DECODES; do if [ "$DECODE" = cpu ]; then W=$CPU_WORKERS; B=$CPU_BUFFER; else W=$GPU_WORKERS; B=$GPU_BUFFER; fi - sbatch \ + # Run strictly after the previous job so only one job touches the network at a time. + DEPFLAG="" + if [ "$SERIAL" = 1 ] && [ -n "$prev_jid" ]; then DEPFLAG="--dependency=afterany:$prev_jid"; fi + jid=$(sbatch --parsable \ --job-name="bench_${SOURCE}_${MODE}_${DECODE}" \ --nodes=1 --ntasks=1 --gpus="$GPUS" --cpus-per-task=$((W + 4)) \ --mem="$MEM" --time="$TIME" --output="$REPO_DIR/logs/%x-%j.out" \ + $DEPFLAG \ ${ACCOUNT:+--account=$ACCOUNT} ${PARTITION:+--partition=$PARTITION} ${QOS:+--qos=$QOS} \ --wrap "set -euo pipefail; cd '$REPO_DIR'; \ export TOKENIZERS_PARALLELISM=false HF_HOME=\${HF_HOME:-\$SCRATCH/hf_home}; \ @@ -66,12 +77,17 @@ for SOURCE in $SOURCES; do --repo_id $REPO_ID $ROOTFLAG \ --mode $MODE --source $SOURCE --video_decode_device $DECODE \ --batch_size $BATCH_SIZE --num_workers $W --buffer_size $B \ - --num_batches $NUM_BATCHES --out_dir $OUT_DIR" + --num_batches $NUM_BATCHES --out_dir $OUT_DIR") + jid=${jid%%;*} # strip ';cluster' suffix on federated setups + echo "submitted job $jid bench_${SOURCE}_${MODE}_${DECODE}${DEPFLAG:+ (after $prev_jid)}" + prev_jid=$jid n=$((n + 1)) done done done -echo "Submitted $n jobs. Watch: squeue -u \$USER" -echo "Results land in $OUT_DIR/__bs${BATCH_SIZE}_w_.{json,csv}" -echo "After they finish, summarize: python benchmarks/streaming/summarize_results.py $OUT_DIR" +echo +echo "Submitted $n jobs ($([ "$SERIAL" = 1 ] && echo 'serial chain — one runs at a time' || echo 'parallel'))." +echo "Watch: squeue -u \$USER (later jobs show reason '(Dependency)' until their turn)" +echo "Results: $OUT_DIR/__bs${BATCH_SIZE}_w_.{json,csv}" +echo "Summarize when done: $RUN benchmarks/streaming/summarize_results.py $OUT_DIR"