From 343ecd7980fcbdebbc277ade439301e71beb499d Mon Sep 17 00:00:00 2001 From: Pepijn Date: Tue, 9 Jun 2026 15:47:11 +0200 Subject: [PATCH] feat(streaming): optional GPU (NVDEC) video decode device Add `video_decode_device` to StreamingLeRobotDataset and a `device` arg to VideoDecoderCache, passed to torchcodec's VideoDecoder. "cuda" offloads H.264/H.265 decode to the GPU's dedicated NVDEC engine (independent of the training SMs); requires a CUDA-enabled torchcodec build. benchmark: `--video_decode_device` flag. With cuda + num_workers>0 it forces the `spawn` start method (CUDA cannot init in forked workers) and disables CPU pin_memory (frames are already on-GPU). Decode device is recorded in results and the output filename. README documents the NVDEC option and its concurrency/IPC caveats. Co-Authored-By: Claude Opus 4.8 (1M context) --- benchmarks/streaming/README.md | 30 ++++++++++++++++++++- benchmarks/streaming/benchmark_streaming.py | 18 +++++++++++-- src/lerobot/datasets/streaming_dataset.py | 19 ++++++++++--- src/lerobot/datasets/video_utils.py | 13 +++++++-- 4 files changed, 72 insertions(+), 8 deletions(-) diff --git a/benchmarks/streaming/README.md b/benchmarks/streaming/README.md index 5f8d048ed..d598e555a 100644 --- a/benchmarks/streaming/README.md +++ b/benchmarks/streaming/README.md @@ -29,7 +29,35 @@ sbatch slurm/benchmark_streaming_robocasa.sh | Nodes | 1 and 2 (per-node throughput should be independent) | | Frame mode | `single` (1 frame, all cameras; target ≥ 120 frames/s/node) · `sarm` (8 steps spaced 1s; target ≥ 320 frames/s/node) | -`--source` is a label only; the actual source is whatever `--repo_id` / `--root` point at. +`--source` is a label only; the actual source is whatever `--repo_id` / `--root` / `--data_files_root` +point at. + +### GPU (NVDEC) decoding + +By default video is decoded on the **CPU** in each DataLoader worker, so throughput is CPU-decode-bound and +scales with `--num_workers` (capped by the dataset's `num_shards`). Pass `--video_decode_device cuda` to +offload H.264/H.265 decode to the GPU's dedicated **NVDEC** engine, which runs independently of the SMs used +for training (see ). This requires a CUDA-enabled torchcodec +build, and because CUDA cannot initialize in forked workers the benchmark switches to the `spawn` start +method automatically when `--num_workers > 0`. + +```bash +# GPU/NVDEC decode, 6 workers, bucket source +python benchmarks/streaming/benchmark_streaming.py \ + --repo_id pepijn223/robocasa_pretrain_human300_v4 \ + --data_files_root hf://buckets/pepijn223/robocasa-stream \ + --mode sarm --batch_size 64 --num_workers 6 --num_batches 200 \ + --video_decode_device cuda --source bucket +``` + +Caveats with `cuda` + many workers: each worker creates its own CUDA context (VRAM overhead) and NVDEC has a +limited number of concurrent decode sessions per GPU; if you hit session/IPC limits, reduce `--num_workers` +or compare against `--num_workers 0` (single-process NVDEC, which often saturates the decode engine on its +own). Result files include the decode device in their name (`..._w6_cuda.json`). + +Reference data root: bucket sources resolve through `--data_files_root hf://buckets//` (metadata +still loads from `--repo_id`). The local `single`/`sarm` CPU baselines on this dataset were ~176 / ~212 +frames/s/node at `--num_workers 3` (3 cameras, fps 20). ## Metrics emitted (JSON + CSV) diff --git a/benchmarks/streaming/benchmark_streaming.py b/benchmarks/streaming/benchmark_streaming.py index e0f981088..4db13b345 100644 --- a/benchmarks/streaming/benchmark_streaming.py +++ b/benchmarks/streaming/benchmark_streaming.py @@ -64,6 +64,13 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--num_workers", type=int, default=8) parser.add_argument("--buffer_size", type=int, default=2000) parser.add_argument("--video_decoder_cache_size", type=int, default=None) + parser.add_argument( + "--video_decode_device", + type=str, + default="cpu", + help="Decode device passed to torchcodec. 'cuda' offloads decode to the GPU's NVDEC engine " + "(needs a CUDA-enabled torchcodec build). With num_workers>0 this forces the 'spawn' start method.", + ) parser.add_argument("--num_batches", type=int, default=200) parser.add_argument("--warmup_batches", type=int, default=5, help="Excluded from steady-state stats.") parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu") @@ -81,6 +88,7 @@ def build_dataset(args: argparse.Namespace, meta: LeRobotDatasetMetadata) -> Str delta_timestamps=delta_timestamps, buffer_size=args.buffer_size, video_decoder_cache_size=args.video_decoder_cache_size, + video_decode_device=args.video_decode_device, tolerance_s=1e-3, ) @@ -99,13 +107,18 @@ def main() -> None: meta = LeRobotDatasetMetadata(args.repo_id, root=args.root) dataset = build_dataset(args, meta) + gpu_decode = args.video_decode_device.startswith("cuda") loader = DataLoader( dataset, batch_size=args.batch_size, num_workers=args.num_workers, - pin_memory=device.type == "cuda", + # GPU-decoded frames are already on the GPU, so CPU pinning is irrelevant (and pinning CUDA + # tensors errors). Pin only when decode is on CPU and we copy to a CUDA device. + pin_memory=device.type == "cuda" and not gpu_decode, drop_last=True, prefetch_factor=2 if args.num_workers > 0 else None, + # CUDA cannot initialize in forked workers; NVDEC decode in workers needs the spawn start method. + multiprocessing_context="spawn" if gpu_decode and args.num_workers > 0 else None, ) sample_latencies_ms: list[float] = [] @@ -152,6 +165,7 @@ def main() -> None: "num_cameras": len(meta.video_keys), "fps": meta.fps, "device": str(device), + "video_decode_device": args.video_decode_device, "frames_measured": frames, "first_batch_latency_s": round(first_batch_latency_s or float("nan"), 4), "frames_per_s_node": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0, @@ -167,7 +181,7 @@ def main() -> None: out_dir = Path(args.out_dir) out_dir.mkdir(parents=True, exist_ok=True) - tag = f"{args.source}_{args.mode}_bs{args.batch_size}_w{args.num_workers}" + tag = f"{args.source}_{args.mode}_bs{args.batch_size}_w{args.num_workers}_{args.video_decode_device}" (out_dir / f"{tag}.json").write_text(json.dumps(results, indent=2)) flat = {k: (json.dumps(v) if isinstance(v, dict) else v) for k, v in results.items()} with open(out_dir / f"{tag}.csv", "w", newline="") as f: diff --git a/src/lerobot/datasets/streaming_dataset.py b/src/lerobot/datasets/streaming_dataset.py index 9a368a957..e7a7b6bb3 100644 --- a/src/lerobot/datasets/streaming_dataset.py +++ b/src/lerobot/datasets/streaming_dataset.py @@ -262,6 +262,7 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset): world_size: int | None = None, video_decoder_cache_size: int | None = None, data_files_root: str | None = None, + video_decode_device: str = "cpu", ): """Initialize a StreamingLeRobotDataset. @@ -296,6 +297,11 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset): video frames are read from there while metadata still loads from ``repo_id`` on the Hub. Resolves through fsspec exactly like ``hf://``; use it to benchmark bucket / prewarmed-bucket sources without copying the (small) metadata. + video_decode_device (str, optional): Device for video decoding, passed to the torchcodec + ``VideoDecoder``. Defaults to ``"cpu"``. Set to ``"cuda"`` to offload H.264/H.265 decode to + the GPU's dedicated NVDEC engine (independent of the training SMs), which requires a + CUDA-enabled torchcodec build. Note: ``"cuda"`` decode inside ``DataLoader`` workers needs + the ``spawn`` start method (CUDA cannot init in forked workers). """ super().__init__() self.repo_id = repo_id @@ -319,6 +325,7 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset): self.rank, self.world_size = self._resolve_distributed(rank, world_size) self.video_decoder_cache_size = video_decoder_cache_size self.data_files_root = data_files_root.rstrip("/") if data_files_root else None + self.video_decode_device = video_decode_device # We cache the video decoders to avoid re-initializing them at each frame (avoiding a ~10x slowdown) self.video_decoder_cache = None @@ -425,12 +432,18 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset): margin so the round-robin never evicts a still-live decoder. """ if self.video_decoder_cache_size is not None: - return VideoDecoderCache(max_size=self.video_decoder_cache_size, counters=self._cache_counters) + return VideoDecoderCache( + max_size=self.video_decoder_cache_size, + counters=self._cache_counters, + device=self.video_decode_device, + ) num_cameras = len(self.meta.video_keys) if num_cameras == 0: - return VideoDecoderCache(counters=self._cache_counters) + return VideoDecoderCache(counters=self._cache_counters, device=self.video_decode_device) return VideoDecoderCache( - max_size=(num_active_shards + 1) * num_cameras, counters=self._cache_counters + max_size=(num_active_shards + 1) * num_cameras, + counters=self._cache_counters, + device=self.video_decode_device, ) # TODO(fracapuano): Implement multi-threaded prefetching to accelerate data loading. diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 1ed1b909c..c0b8ebd1d 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -242,7 +242,12 @@ class VideoDecoderCache: _SENTINEL: ClassVar[object] = object() - def __init__(self, max_size: int | None | object = _SENTINEL, counters: "torch.Tensor | None" = None): + def __init__( + self, + max_size: int | None | object = _SENTINEL, + counters: "torch.Tensor | None" = None, + device: str = "cpu", + ): if max_size is VideoDecoderCache._SENTINEL: max_size = _default_max_cache_size() if max_size is not None and max_size <= 0: @@ -250,6 +255,10 @@ class VideoDecoderCache: self.max_size: int | None = max_size # type: ignore[assignment] self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict() self._lock = Lock() + # Decode device for the underlying torchcodec VideoDecoder. "cuda" offloads H.264/H.265 decode to + # the GPU's dedicated NVDEC engine (independent of the SMs used for training); requires a + # CUDA-enabled torchcodec/FFmpeg build. See https://developer.nvidia.com/video-codec-sdk. + self.device = device # Observability counters (cheap, updated under the lock) for benchmarking decoder reuse. self.hits = 0 self.misses = 0 @@ -289,7 +298,7 @@ class VideoDecoderCache: self._counters[1] += 1 file_handle = fsspec.open(video_path).__enter__() try: - decoder = VideoDecoder(file_handle, seek_mode="approximate") + decoder = VideoDecoder(file_handle, seek_mode="approximate", device=self.device) except Exception: file_handle.close() raise