diff --git a/benchmarks/streaming/README.md b/benchmarks/streaming/README.md new file mode 100644 index 000000000..5f8d048ed --- /dev/null +++ b/benchmarks/streaming/README.md @@ -0,0 +1,53 @@ +# Streaming dataloading benchmark + +Measures **dataloading only** (no model) for `StreamingLeRobotDataset`: parquet read + video decode + +delta windowing + shuffle. A dummy consumer pulls batches and moves them to the device, so the numbers +isolate the data pipeline. Use it to compare sources (Hub vs. storage bucket vs. prewarmed bucket), +frame modes, and node counts, and to catch p95/p99 video-decode regressions. + +## Run + +```bash +python benchmarks/streaming/benchmark_streaming.py \ + --repo_id pepijn223/robocasa_pretrain_human300_v4 \ + --mode sarm --batch_size 64 --num_workers 12 --num_batches 200 \ + --source hub --out_dir benchmarks/streaming/results +``` + +Multinode (per-node throughput) goes through Accelerate under SLURM: + +```bash +sbatch slurm/benchmark_streaming_robocasa.sh +``` + +## Matrix + +| Axis | Values | +| ---------- | -------------------------------------------------------------------------------------------------------------------- | +| Source | `hub` (verify now), `bucket`, `warmed_bucket` (bucket + prewarming; with user's help later) | +| Baseline | current `main` `StreamingLeRobotDataset` on Hub streaming | +| Nodes | 1 and 2 (per-node throughput should be independent) | +| Frame mode | `single` (1 frame, all cameras; target ≥ 120 frames/s/node) · `sarm` (8 steps spaced 1s; target ≥ 320 frames/s/node) | + +`--source` is a label only; the actual source is whatever `--repo_id` / `--root` point at. + +## Metrics emitted (JSON + CSV) + +`frames_per_s_node`, `samples_per_s`, `first_batch_latency_s`, `p50/p95/p99_sample_latency_ms`, +`wallclock_s`, and `video_decoder_cache` (`hits`, `misses`, `evictions`, `hit_rate`, `size`). A low +cache `hit_rate` with high `p99` is the decoder-thrash signature — raise `--video_decoder_cache_size` +or `--buffer_size`, or reduce `num_workers`. + +## Bucket sources & prewarming (manual) + +Prewarming is a **server-side** Hugging Face storage-bucket feature — there is no client script. To +benchmark the `warmed_bucket` source: + +1. Attach a storage bucket to the dataset and enable it (see + ). Buckets resolve through `fsspec`, the same as + `hf://`, so no code change is needed — point `--repo_id`/`--revision` (or `--root`) at the bucket. +2. Enable **prewarming** in the bucket settings and wait for warm-up to complete. +3. Run the benchmark with `--source warmed_bucket`. Compare against the cold `--source bucket` and the + `--source hub` baseline. + +Manual only — not run in CI. diff --git a/benchmarks/streaming/benchmark_streaming.py b/benchmarks/streaming/benchmark_streaming.py new file mode 100644 index 000000000..187b1770e --- /dev/null +++ b/benchmarks/streaming/benchmark_streaming.py @@ -0,0 +1,169 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dataloading-only benchmark for StreamingLeRobotDataset. + +A dummy consumer pulls batches and moves them to the device; no model runs, so the numbers isolate the +data pipeline (parquet read + video decode + delta windowing + shuffle). Reports per-node throughput and +sample-latency percentiles, plus video-decoder-cache reuse stats, and emits JSON + CSV. + +Frame modes (matching the streaming design targets): + - ``single``: one frame, all cameras (target >= 120 frames/s/node). + - ``sarm``: an 8-step window spaced 1s (delta over 8s) (target >= 320 frames/s/node). + +Example (stream from the Hub, single node): + + python benchmarks/streaming/benchmark_streaming.py \ + --repo_id pepijn223/robocasa_pretrain_human300_v4 --mode sarm \ + --batch_size 64 --num_workers 12 --num_batches 200 --out_dir benchmarks/streaming/results + +Distributed / multinode runs go through Accelerate; see ``slurm/benchmark_streaming_robocasa.sh``. Set +``--source`` purely for labeling the output (``hub`` / ``bucket`` / ``warmed_bucket``); the actual source +is whatever ``--repo_id``/``--root`` point at. See the README for bucket prewarming. +""" + +import argparse +import csv +import json +import statistics +import time +from pathlib import Path + +import torch +from torch.utils.data import DataLoader + +from lerobot.datasets import LeRobotDatasetMetadata, StreamingLeRobotDataset +from lerobot.utils.constants import ACTION + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--repo_id", type=str, required=True) + parser.add_argument("--root", type=str, default=None, help="Local/prewarmed root (else stream from Hub).") + parser.add_argument("--mode", choices=["single", "sarm"], default="single") + parser.add_argument("--source", type=str, default="hub", help="Label only: hub | bucket | warmed_bucket.") + parser.add_argument("--batch_size", type=int, default=64) + parser.add_argument("--num_workers", type=int, default=8) + parser.add_argument("--buffer_size", type=int, default=2000) + parser.add_argument("--video_decoder_cache_size", type=int, default=None) + parser.add_argument("--num_batches", type=int, default=200) + parser.add_argument("--warmup_batches", type=int, default=5, help="Excluded from steady-state stats.") + parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu") + parser.add_argument("--out_dir", type=str, default="benchmarks/streaming/results") + return parser.parse_args() + + +def build_dataset(args: argparse.Namespace, meta: LeRobotDatasetMetadata) -> StreamingLeRobotDataset: + # sarm: an 8-step window spaced 1s => an 8s delta window (the SARM stress case). + delta_timestamps = {ACTION: [float(t) for t in range(8)]} if args.mode == "sarm" else None + return StreamingLeRobotDataset( + args.repo_id, + root=args.root, + delta_timestamps=delta_timestamps, + buffer_size=args.buffer_size, + video_decoder_cache_size=args.video_decoder_cache_size, + tolerance_s=1e-3, + ) + + +def percentile(values: list[float], pct: float) -> float: + if not values: + return float("nan") + ordered = sorted(values) + k = max(0, min(len(ordered) - 1, int(round((pct / 100.0) * (len(ordered) - 1))))) + return ordered[k] + + +def main() -> None: + args = parse_args() + device = torch.device(args.device) + meta = LeRobotDatasetMetadata(args.repo_id, root=args.root) + dataset = build_dataset(args, meta) + + loader = DataLoader( + dataset, + batch_size=args.batch_size, + num_workers=args.num_workers, + pin_memory=device.type == "cuda", + drop_last=True, + prefetch_factor=2 if args.num_workers > 0 else None, + ) + + sample_latencies_ms: list[float] = [] + frames = 0 + first_batch_latency_s = None + + t_start = time.perf_counter() + t_prev = t_start + for i, batch in enumerate(loader): + # Dummy consume: move tensors to the device, mimicking what a real trainer would do. + for value in batch.values(): + if torch.is_tensor(value): + value.to(device, non_blocking=device.type == "cuda") + now = time.perf_counter() + if first_batch_latency_s is None: + first_batch_latency_s = now - t_start + + if i >= args.warmup_batches: + per_sample_ms = (now - t_prev) / args.batch_size * 1000.0 + sample_latencies_ms.append(per_sample_ms) + frames += args.batch_size + t_prev = now + if i + 1 >= args.num_batches: + break + + elapsed = time.perf_counter() - t_start + steady_elapsed_s = sum(sample_latencies_ms) / 1000.0 + cache_stats = dataset.video_decoder_cache.stats() if dataset.video_decoder_cache is not None else {} + + results = { + "repo_id": args.repo_id, + "source": args.source, + "mode": args.mode, + "batch_size": args.batch_size, + "num_workers": args.num_workers, + "buffer_size": args.buffer_size, + "num_cameras": len(meta.video_keys), + "fps": meta.fps, + "device": str(device), + "frames_measured": frames, + "first_batch_latency_s": round(first_batch_latency_s or float("nan"), 4), + "frames_per_s_node": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0, + "samples_per_s": round(frames / steady_elapsed_s, 2) if steady_elapsed_s else 0.0, + "p50_sample_latency_ms": round(statistics.median(sample_latencies_ms), 3) + if sample_latencies_ms + else None, + "p95_sample_latency_ms": round(percentile(sample_latencies_ms, 95), 3), + "p99_sample_latency_ms": round(percentile(sample_latencies_ms, 99), 3), + "wallclock_s": round(elapsed, 2), + "video_decoder_cache": cache_stats, + } + + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + tag = f"{args.source}_{args.mode}_bs{args.batch_size}_w{args.num_workers}" + (out_dir / f"{tag}.json").write_text(json.dumps(results, indent=2)) + flat = {k: (json.dumps(v) if isinstance(v, dict) else v) for k, v in results.items()} + with open(out_dir / f"{tag}.csv", "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=list(flat)) + writer.writeheader() + writer.writerow(flat) + + print("Command config:", vars(args)) + print(json.dumps(results, indent=2)) + print(f"Wrote {out_dir / tag}.json and .csv") + + +if __name__ == "__main__": + main() diff --git a/examples/scaling/train_streaming_multinode.py b/examples/scaling/train_streaming_multinode.py new file mode 100644 index 000000000..f9e49b02d --- /dev/null +++ b/examples/scaling/train_streaming_multinode.py @@ -0,0 +1,169 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Distributed, resumable streaming training on a large HF-hosted dataset. + +This example shows how to train (or just stress the data pipeline) over a multi-TB dataset that never +touches local disk, scaling across GPUs and nodes with Accelerate. It demonstrates the large-scale +streaming features of :class:`StreamingLeRobotDataset`: + +- per-rank sharding via ``split_dataset_by_node`` (each GPU streams disjoint data; ``rank``/``world_size`` + are auto-resolved from the Accelerate state, so nothing needs to be passed explicitly); +- DataLoader-worker shard splitting (no duplicate frames within a rank); +- resumable streaming via ``dataset.state_dict()`` / ``load_state_dict()`` saved into the checkpoint; +- an explicit video-decoder cache size so the working set of open decoders does not thrash. + +Launch with Accelerate (single node, N GPUs): + + accelerate launch --num_processes=8 examples/scaling/train_streaming_multinode.py \ + --repo_id=pepijn223/robocasa_pretrain_human300_v4 --batch_size=64 + +Multinode runs use the same script under SLURM; see ``slurm/train_streaming_robocasa.sh``. + +Pass ``--dummy`` to skip the model entirely and measure pure dataloading throughput. +""" + +import argparse +import time +from pathlib import Path + +import torch +from accelerate import Accelerator +from torch.utils.data import DataLoader + +from lerobot.datasets import LeRobotDatasetMetadata, StreamingLeRobotDataset +from lerobot.utils.constants import ACTION + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--repo_id", type=str, default="lerobot/droid_1.0.1") + parser.add_argument( + "--root", type=str, default=None, help="Local/prewarmed dataset root (else stream from Hub)." + ) + parser.add_argument("--output_dir", type=str, default="outputs/train/streaming_multinode") + parser.add_argument("--steps", type=int, default=1000) + parser.add_argument("--batch_size", type=int, default=64, help="Per-process batch size.") + parser.add_argument("--num_workers", type=int, default=8) + parser.add_argument( + "--buffer_size", type=int, default=2000, help="Output shuffle-buffer size, in frames." + ) + parser.add_argument("--video_decoder_cache_size", type=int, default=None) + parser.add_argument("--n_action_steps", type=int, default=16, help="Action-chunk length (delta horizon).") + parser.add_argument("--save_freq", type=int, default=200) + parser.add_argument("--log_freq", type=int, default=20) + parser.add_argument("--resume_from", type=str, default=None, help="Checkpoint dir to resume from.") + parser.add_argument("--dummy", action="store_true", help="Skip the model; measure dataloading only.") + return parser.parse_args() + + +def make_dataloader( + args: argparse.Namespace, meta: LeRobotDatasetMetadata +) -> tuple[DataLoader, StreamingLeRobotDataset]: + # Supervise an action chunk; delta_timestamps drive the SARM-style temporal window. + delta_timestamps = {ACTION: [t / meta.fps for t in range(args.n_action_steps)]} + # rank / world_size are resolved automatically from the Accelerate state inside the dataset. + dataset = StreamingLeRobotDataset( + args.repo_id, + root=args.root, + delta_timestamps=delta_timestamps, + buffer_size=args.buffer_size, + video_decoder_cache_size=args.video_decoder_cache_size, + tolerance_s=1e-3, + ) + loader = DataLoader( + dataset, + batch_size=args.batch_size, + num_workers=args.num_workers, + pin_memory=True, + drop_last=True, + prefetch_factor=2 if args.num_workers > 0 else None, + ) + return loader, dataset + + +def main() -> None: + args = parse_args() + accelerator = Accelerator() + output_dir = Path(args.output_dir) + if accelerator.is_main_process: + output_dir.mkdir(parents=True, exist_ok=True) + + meta = LeRobotDatasetMetadata(args.repo_id, root=args.root) + loader, dataset = make_dataloader(args, meta) + + if args.dummy: + model = optimizer = None + else: + from lerobot.policies.act import ACTConfig, ACTPolicy + from lerobot.utils.feature_utils import dataset_to_policy_features + + features = dataset_to_policy_features(meta.features) + output_features = {k: ft for k, ft in features.items() if k == ACTION} + input_features = {k: ft for k, ft in features.items() if k not in output_features} + cfg = ACTConfig(input_features=input_features, output_features=output_features) + model = ACTPolicy(cfg) + optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) + model, optimizer, loader = accelerator.prepare(model, optimizer, loader) + + # Resume: restore the dataset's stream position so we don't replay already-seen data. The state holds + # plain HF stream dicts + RNG state (not tensors), so weights_only=False is required; the file is a + # checkpoint this script wrote itself. + if args.resume_from is not None: + state = torch.load(Path(args.resume_from) / "dataset_state.pt", weights_only=False) # nosec B614 + dataset.load_state_dict(state) + accelerator.print(f"Resumed dataset stream from {args.resume_from}") + + step = 0 + frames_seen = 0 + window_start = time.perf_counter() + done = False + while not done: + for batch in loader: + if model is not None: + batch = {k: (v.to(accelerator.device) if torch.is_tensor(v) else v) for k, v in batch.items()} + loss, _ = model.forward(batch) + accelerator.backward(loss) + optimizer.step() + optimizer.zero_grad() + + step += 1 + frames_seen += args.batch_size + if step % args.log_freq == 0: + elapsed = time.perf_counter() - window_start + fps_per_proc = (args.log_freq * args.batch_size) / max(elapsed, 1e-9) + total_fps = fps_per_proc * accelerator.num_processes + accelerator.print( + f"step {step} | {fps_per_proc:.1f} frames/s/proc | {total_fps:.1f} frames/s total" + + ("" if model is None else f" | loss {loss.item():.3f}") + ) + window_start = time.perf_counter() + + if step % args.save_freq == 0 and accelerator.is_main_process: + ckpt = output_dir / f"checkpoint-{step}" + ckpt.mkdir(parents=True, exist_ok=True) + # Save the dataset stream position alongside the model so a restart resumes mid-stream. + torch.save(dataset.state_dict(), ckpt / "dataset_state.pt") + if model is not None: + accelerator.unwrap_model(model).save_pretrained(ckpt) + + if step >= args.steps: + done = True + break + + accelerator.print(f"End of training: {step} steps, ~{frames_seen} frames/proc") + + +if __name__ == "__main__": + main() diff --git a/slurm/benchmark_streaming_robocasa.sh b/slurm/benchmark_streaming_robocasa.sh new file mode 100644 index 000000000..0ee150dcd --- /dev/null +++ b/slurm/benchmark_streaming_robocasa.sh @@ -0,0 +1,40 @@ +#!/bin/bash +#SBATCH --job-name=bench_stream +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-node=8 +#SBATCH --cpus-per-task=96 +#SBATCH --exclusive +#SBATCH --time=02:00:00 +#SBATCH --output=logs/%x-%j.out + +# Per-node dataloading benchmark for StreamingLeRobotDataset across 1-2 nodes. Each node runs an +# independent dummy-consumer benchmark; per-node throughput should be independent (separate network). +# Results are written per (node, source, mode) under --out_dir. +# +# Submit with: sbatch slurm/benchmark_streaming_robocasa.sh +# Override the source label for cold/warm bucket runs: SOURCE=warmed_bucket sbatch slurm/benchmark_streaming_robocasa.sh + +set -euo pipefail + +REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4} +SOURCE=${SOURCE:-hub} +OUT_DIR=${OUT_DIR:-benchmarks/streaming/results} + +export HF_HOME=${HF_HOME:-$SCRATCH/hf_home} +export TOKENIZERS_PARALLELISM=false + +# One benchmark process per node (each saturates the node's DataLoader workers + network independently). +srun --kill-on-bad-exit=1 bash -c ' +for MODE in single sarm; do + python benchmarks/streaming/benchmark_streaming.py \ + --repo_id '"$REPO_ID"' \ + --source '"$SOURCE"' \ + --mode $MODE \ + --batch_size 64 \ + --num_workers 12 \ + --buffer_size 4000 \ + --num_batches 300 \ + --out_dir '"$OUT_DIR"'/node${SLURM_NODEID} +done +' diff --git a/slurm/train_streaming_robocasa.sh b/slurm/train_streaming_robocasa.sh new file mode 100644 index 000000000..31cfe2f4b --- /dev/null +++ b/slurm/train_streaming_robocasa.sh @@ -0,0 +1,49 @@ +#!/bin/bash +#SBATCH --job-name=stream_robocasa +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-node=8 +#SBATCH --cpus-per-task=96 +#SBATCH --exclusive +#SBATCH --time=24:00:00 +#SBATCH --output=logs/%x-%j.out + +# Multinode streaming training over a large HF-hosted RoboCasa dataset (never touches local disk). +# Launches examples/scaling/train_streaming_multinode.py with Accelerate. Each rank streams a disjoint +# set of shards via split_dataset_by_node (auto-resolved from the Accelerate state), so per-node +# throughput scales independently. For an even split, ensure n_shards % (nodes * gpus_per_node) == 0. +# +# Submit with: sbatch slurm/train_streaming_robocasa.sh + +set -euo pipefail + +REPO_ID=${REPO_ID:-pepijn223/robocasa_pretrain_human300_v4} +GPUS_PER_NODE=8 +NUM_PROCESSES=$((SLURM_NNODES * GPUS_PER_NODE)) + +# Rendezvous: use the first node in the allocation as the main process. +MAIN_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1) +MAIN_PORT=${MAIN_PORT:-29500} + +export HF_HOME=${HF_HOME:-$SCRATCH/hf_home} +# Avoid each rank fighting over the tokenizers' internal thread pool. +export TOKENIZERS_PARALLELISM=false + +srun --kill-on-bad-exit=1 bash -c ' +accelerate launch \ + --num_machines '"$SLURM_NNODES"' \ + --num_processes '"$NUM_PROCESSES"' \ + --machine_rank $SLURM_NODEID \ + --main_process_ip '"$MAIN_ADDR"' \ + --main_process_port '"$MAIN_PORT"' \ + --mixed_precision bf16 \ + --dynamo_backend no \ + examples/scaling/train_streaming_multinode.py \ + --repo_id '"$REPO_ID"' \ + --batch_size 64 \ + --num_workers 12 \ + --buffer_size 4000 \ + --steps 200000 \ + --save_freq 2000 \ + --log_freq 50 +' diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 84ab56e08..a56015d4f 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -250,6 +250,10 @@ class VideoDecoderCache: self.max_size: int | None = max_size # type: ignore[assignment] self._cache: OrderedDict[str, tuple[Any, Any]] = OrderedDict() self._lock = Lock() + # Observability counters (cheap, updated under the lock) for benchmarking decoder reuse. + self.hits = 0 + self.misses = 0 + self.evictions = 0 def __contains__(self, video_path: object) -> bool: with self._lock: @@ -271,8 +275,10 @@ class VideoDecoderCache: entry = self._cache.get(video_path) if entry is not None: self._cache.move_to_end(video_path) + self.hits += 1 return entry[0] + self.misses += 1 file_handle = fsspec.open(video_path).__enter__() try: decoder = VideoDecoder(file_handle, seek_mode="approximate") @@ -287,6 +293,7 @@ class VideoDecoderCache: if self.max_size is not None: while len(self._cache) > self.max_size: _evicted_path, (_evicted_decoder, evicted_handle) = self._cache.popitem(last=False) + self.evictions += 1 with contextlib.suppress(Exception): evicted_handle.close() @@ -305,6 +312,18 @@ class VideoDecoderCache: with self._lock: return len(self._cache) + def stats(self) -> dict[str, int | float]: + """Return reuse counters (hits/misses/evictions, hit rate, current size) for benchmarking.""" + with self._lock: + total = self.hits + self.misses + return { + "hits": self.hits, + "misses": self.misses, + "evictions": self.evictions, + "hit_rate": self.hits / total if total else 0.0, + "size": len(self._cache), + } + class FrameTimestampError(ValueError): """Helper error to indicate the retrieved timestamps exceed the queried ones""" diff --git a/tests/datasets/test_streaming_distributed.py b/tests/datasets/test_streaming_distributed.py new file mode 100644 index 000000000..10ffc5dca --- /dev/null +++ b/tests/datasets/test_streaming_distributed.py @@ -0,0 +1,100 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end distributed streaming smoke test under a real `accelerate launch`. + +Mirrors tests/training/test_multi_gpu.py but runs on CPU and only checks the dataloading contract: with +two processes, `split_dataset_by_node` (auto-resolved from the Accelerate state) must give each rank a +disjoint set of frames that together cover the dataset. Skips if the environment can't actually spawn +>= 2 processes (e.g. local macOS multi-CPU), so it never silently passes as a single process. +""" + +import json +import shutil +import subprocess +import sys + +import pytest + +pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])") +pytest.importorskip("accelerate", reason="accelerate is required (install lerobot[training])") + +from tests.fixtures.constants import DUMMY_REPO_ID + +WORKER = """ +import json, sys +from accelerate import PartialState +from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset + +root, repo_id, out_dir = sys.argv[1], sys.argv[2], sys.argv[3] +state = PartialState() +ds = StreamingLeRobotDataset( + repo_id=repo_id, root=root, shuffle=False, buffer_size=8, max_num_shards=8 +) +indices = [int(frame["index"]) for frame in ds] +payload = {"rank": state.process_index, "world": state.num_processes, "indices": indices} +with open(f"{out_dir}/rank_{state.process_index}.json", "w") as f: + json.dump(payload, f) +""" + + +@pytest.mark.skipif(shutil.which("accelerate") is None, reason="accelerate CLI not available") +def test_accelerate_launch_ranks_are_disjoint(tmp_path, lerobot_dataset_factory): + total_frames = 160 + repo_id = f"{DUMMY_REPO_ID}-acc" + root = tmp_path / "ds" + lerobot_dataset_factory( + root=root, + repo_id=repo_id, + total_episodes=8, + total_frames=total_frames, + use_videos=False, + data_files_size_in_mb=0.001, + chunks_size=1, + ) + + worker = tmp_path / "worker.py" + worker.write_text(WORKER) + out_dir = tmp_path / "out" + out_dir.mkdir() + + cmd = [ + "accelerate", + "launch", + "--num_processes=2", + "--num_machines=1", + "--mixed_precision=no", + "--dynamo_backend=no", + "--cpu", + str(worker), + str(root), + repo_id, + str(out_dir), + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) + assert result.returncode == 0, ( + f"accelerate launch failed:\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}" + ) + + payloads = [json.loads(p.read_text()) for p in sorted(out_dir.glob("rank_*.json"))] + if len(payloads) < 2 or any(p["world"] < 2 for p in payloads): + pytest.skip("environment did not spawn >= 2 distributed processes (e.g. local macOS multi-CPU)") + + rank_sets = [set(p["indices"]) for p in payloads] + assert rank_sets[0].isdisjoint(rank_sets[1]), "ranks streamed overlapping frames under accelerate launch" + assert set().union(*rank_sets) == set(range(total_frames)), "ranks did not jointly cover all frames" + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-v"])) diff --git a/tests/datasets/test_streaming_native.py b/tests/datasets/test_streaming_native.py index cd0317317..fc3d35153 100644 --- a/tests/datasets/test_streaming_native.py +++ b/tests/datasets/test_streaming_native.py @@ -115,8 +115,12 @@ def test_sarm_window_covers_long_horizon_without_padding(tmp_path, lerobot_datas SARM uses a window of 8 steps spaced 1s (~160 frames @ fps20). Here fps=30, so +5s = 150 frames > 100. """ repo_id = f"{DUMMY_REPO_ID}-sarm" - # Two episodes of 200 frames each -> a +150-frame lookahead stays inside an episode for early frames. - _make_local_dataset(lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=2, total_frames=400) + # A single long episode so a +150-frame lookahead is unambiguously inside the episode (the fixture + # gives episodes variable lengths, so multi-episode boundaries can't be assumed). + episode_frames = 300 + _make_local_dataset( + lerobot_dataset_factory, tmp_path / "ds", repo_id, total_episodes=1, total_frames=episode_frames + ) horizon_s = 5.0 # 150 frames @ fps30, well beyond LOOKAHEAD_BACKTRACKTABLE=100 delta_timestamps = {ACTION: [0.0, horizon_s]} @@ -130,11 +134,12 @@ def test_sarm_window_covers_long_horizon_without_padding(tmp_path, lerobot_datas ) horizon_frames = int(round(horizon_s * ds.fps)) + assert horizon_frames > 100, "test must exceed the old LOOKAHEAD_BACKTRACKTABLE ceiling" checked = 0 for frame in ds: idx = int(frame["index"]) - # Only assert on frames whose +horizon target is still inside the same episode. - if int(frame["episode_index"]) == 0 and idx + horizon_frames < 200: + # The +horizon target is inside the single episode -> it must be a real frame, not padding. + if idx + horizon_frames < episode_frames: assert not bool(frame[f"{ACTION}_is_pad"][-1]), ( f"frame {idx}: +{horizon_frames} target was padded; long delta window did not reach it" )