feat(streaming): expose video_fetch_workers in the dataloading benchmark

One-flag A/B for the fetch-concurrency fix on the cluster benchmark.

Component-level A/B against the Hub (old EpisodeByteCache @7b6f4f2b1 vs
new, decoder stubbed, 16 episodes x 3 cameras x 6 MB slices of a 518 MB
file, both run orders to bound CDN warmth):

  OLD (4 workers, sequential cams):  22.2 / 24.3 MiB/s
  NEW (16 workers, parallel cams):   47.3 / 45.1 MiB/s   (~2.0x)

on a residential link; the cluster needed 1.44x (465 -> 670 MiB/s
aggregate) to hit its 1000 samples/s target.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-07-03 08:59:30 +02:00
parent bc876949ff
commit f8f7fa0279
@@ -224,6 +224,7 @@ def build_dataset(scenario: str, args: argparse.Namespace):
episode_pool_size=args.episode_pool_size,
max_buffer_input_shards=args.max_buffer_input_shards,
video_decoder_cache_size=args.video_decoder_cache_size,
video_fetch_workers=args.video_fetch_workers,
tolerance_s=1e-3,
# Throughput benchmark: don't gate on the one-row-group-per-episode invariant (a public
# dataset may be collapsed); reshard() still yields per-episode shards where it holds.
@@ -494,6 +495,12 @@ def parse_args() -> argparse.Namespace:
p.add_argument(
"--max_episodes", type=int, default=512, help="Cap mmap_local episodes to the local share."
)
p.add_argument(
"--video_fetch_workers",
type=int,
default=16,
help="Concurrent byte-range fetch threads per consumer (the fetch-throughput knob; was 4).",
)
p.add_argument("--batch_size", type=int, default=64)
p.add_argument("--num_workers", type=int, default=8)
p.add_argument("--prefetch_factor", type=int, default=2)