"""
Benchmark memory footprint and inference latency of a policy on arbitrary devices.

This script loads a pretrained policy directly (similar to the async inference server)
and generates dummy input data based on the policy's input_features to perform
accurate benchmarking without requiring datasets.
"""

import argparse
import os
import statistics
import time
from datetime import datetime
from pathlib import Path

import torch

# NOTE: `psutil` and the `lerobot` modules are imported inside the functions that
# use them, so the pure helpers below can be imported (and unit-tested) without
# the full robot stack installed.


def bytes_to_human(n: int) -> str:
    """Format a byte count using binary (1024-based) units with two decimals.

    Negative values (e.g. an RSS delta after memory was released) are scaled by
    magnitude and keep their sign, so ``-2048`` renders as ``"-2.00 KB"``.
    """
    sign = "-" if n < 0 else ""
    size = float(abs(n))
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if size < 1024:
            return f"{sign}{size:.2f} {unit}"
        size /= 1024
    return f"{sign}{size:.2f} PB"


def percentile(values: list[float], p: float) -> float:
    """Return the ``p``-th percentile of ``values`` using linear interpolation.

    Args:
        values: Samples sorted in ascending order (the function does not sort them).
        p: Percentile in the closed range [0, 100].

    Returns:
        The interpolated percentile, or NaN when ``values`` is empty.

    Raises:
        ValueError: If ``p`` lies outside [0, 100].
    """
    if not 0.0 <= p <= 100.0:
        raise ValueError(f"percentile must be within [0, 100], got {p}")
    if not values:
        return float("nan")
    k = (len(values) - 1) * (p / 100.0)
    lower = int(k)
    upper = min(lower + 1, len(values) - 1)
    if lower == upper:
        return values[lower]
    return values[lower] + (values[upper] - values[lower]) * (k - lower)


def generate_dummy_observation(input_features: dict, device: str = "cpu") -> dict:
    """Generate dummy observation data based on policy input features.

    Args:
        input_features: Mapping from feature name to an object exposing ``shape``
            and ``type`` attributes (read from ``policy.config.input_features``).
        device: Torch device on which the tensors are allocated.

    Returns:
        A dict with one float32 tensor per feature, each with a leading batch
        dimension of size 1, plus an empty ``"task"`` string for
        language-conditioned policies.
    """
    from lerobot.configs.types import FeatureType

    dummy_obs = {}
    for key, feature in input_features.items():
        # Images: uniform values in [0, 1) (already normalized).
        # Everything else (state/action/env/unknown): standard normal.
        sampler = torch.rand if feature.type == FeatureType.VISUAL else torch.randn
        tensor = sampler(feature.shape, dtype=torch.float32, device=device)
        # Add batch dimension
        dummy_obs[key] = tensor.unsqueeze(0)

    # Add task string for language-conditioned policies
    dummy_obs["task"] = ""

    return dummy_obs


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line interface of the benchmark."""
    parser = argparse.ArgumentParser(description="Policy inference benchmark")
    parser.add_argument(
        "--policy-id", type=str, required=True, help="Model ID or local path to pretrained policy"
    )
    parser.add_argument(
        "--policy-type", type=str, required=True, help="Type of policy (smolvla, act, diffusion, etc.)"
    )
    parser.add_argument(
        "--device", type=str, default="mps", choices=["cuda", "cpu", "mps"], help="Device to run on"
    )
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument("--num-trials", type=int, default=10, help="Number of timing trials")
    parser.add_argument("--forwards-per-trial", type=int, default=10, help="Number of forwards per trial")
    parser.add_argument("--warmup", type=int, default=2, help="Warmup forwards (not timed)")
    parser.add_argument(
        "--output-dir", type=str, default="outputs/benchmarks", help="Directory to save benchmark results"
    )
    return parser


def _resolve_device(requested: str) -> str:
    """Return ``requested`` if that backend is usable, otherwise warn and return ``"cpu"``."""
    device = requested.lower()
    if device == "cuda" and not torch.cuda.is_available():
        print("[!] CUDA requested but unavailable. Falling back to CPU.")
        return "cpu"
    if device == "mps" and not (hasattr(torch.backends, "mps") and torch.backends.mps.is_available()):
        print("[!] MPS requested but unavailable. Falling back to CPU.")
        return "cpu"
    return device


def _sync(device: str) -> None:
    """Block until work queued on ``device`` has finished; no-op on CPU."""
    if device == "cuda" and torch.cuda.is_available():
        torch.cuda.synchronize()
    elif device == "mps":
        # Older PyTorch builds do not expose torch.mps.synchronize; skip quietly there.
        mps_sync = getattr(getattr(torch, "mps", None), "synchronize", None)
        if mps_sync is not None:
            mps_sync()


def _time_forwards(policy, observation: dict, device: str, num_forwards: int) -> list[float]:
    """Run ``num_forwards`` timed calls of ``policy.predict_action_chunk``.

    Returns:
        One latency per forward, in milliseconds, in execution order.
    """
    with torch.no_grad():
        if device == "cuda":
            # CUDA events time the GPU stream itself; read them back after one final sync.
            events = []
            for _ in range(num_forwards):
                start_event = torch.cuda.Event(enable_timing=True)
                end_event = torch.cuda.Event(enable_timing=True)
                start_event.record()
                policy.predict_action_chunk(observation)
                end_event.record()
                events.append((start_event, end_event))
            torch.cuda.synchronize()
            return [start_event.elapsed_time(end_event) for start_event, end_event in events]

        # CPU / MPS: wall-clock timing bracketed by explicit device syncs.
        # torch.utils.benchmark.Timer is avoided on purpose: it defaults to
        # num_threads=1 (unlike the warmup and real inference) and, as far as its
        # source shows, only synchronizes CUDA, so asynchronous MPS work would be
        # under-measured.
        per_forward_ms = []
        for _ in range(num_forwards):
            _sync(device)
            t0 = time.perf_counter()
            policy.predict_action_chunk(observation)
            _sync(device)
            per_forward_ms.append((time.perf_counter() - t0) * 1000.0)
        return per_forward_ms


def _format_report(results: dict, per_forward_ms: list[float]) -> str:
    """Render the full text report that is written to the log file."""
    device = results["device"]
    use_cuda = device == "cuda"

    if use_cuda:
        device_available = torch.cuda.is_available()
    elif device == "mps":
        device_available = torch.backends.mps.is_available()
    else:
        device_available = True

    raw_preview = ", ".join(f"{t:.3f}" for t in per_forward_ms[:20])
    ellipsis = "..." if len(per_forward_ms) > 20 else ""
    timing_method = "CUDA Events" if use_cuda else "time.perf_counter with device synchronization"
    finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    report = f"""
=== LeRobot Policy Inference Benchmark ===
Timestamp: {results["timestamp"]}
Policy: {results["policy_type"]} ({results["policy_id"]})
Device: {results["device"]}
Seed: {results["seed"]}

=== Model Information ===
Parameters: {results["num_params"]:,}
Input Features: {", ".join(results["input_features"])}
Output Features: {", ".join(results["output_features"])}

=== Benchmark Configuration ===
Trials: {results["num_trials"]}
Forwards per Trial: {results["forwards_per_trial"]}
Warmup: {results["warmup"]}
Total Measurements: {len(per_forward_ms)}

=== Latency Results (ms) ===
Mean: {results["latency_mean_ms"]:.3f}
Std Dev: {results["latency_std_ms"]:.3f}
Min: {results["latency_min_ms"]:.3f}
Max: {results["latency_max_ms"]:.3f}
P50: {results["latency_p50_ms"]:.3f}
P95: {results["latency_p95_ms"]:.3f}

=== Memory Footprint ===
CPU RSS Before: {bytes_to_human(results["cpu_rss_before"])}
CPU RSS After: {bytes_to_human(results["cpu_rss_after"])} (Δ {bytes_to_human(results["cpu_rss_delta"])})
"""

    if use_cuda:
        report += f"CUDA Peak: {bytes_to_human(results['cuda_peak_alloc'])} (reset before timing)\n"

    report += f"""
=== Raw Timing Data (first 20 measurements, ms) ===
{raw_preview}
{ellipsis}

=== Summary Statistics ===
Timing Method: {timing_method}
Device Available: {device_available}
PyTorch Version: {torch.__version__}

Benchmark completed successfully at {finished_at}
"""
    return report


def main():
    """Benchmark a pretrained policy's inference latency and memory, then save a report.

    Parses the CLI, loads the policy, runs warmup and timed forwards on dummy
    observations, writes a text report under ``--output-dir`` and prints a short
    summary to the console.
    """
    args = _build_arg_parser().parse_args()

    # Heavy / project imports are deferred until the arguments are known to be valid.
    import psutil

    from lerobot.policies.factory import get_policy_class
    from lerobot.policies.pretrained import PreTrainedPolicy

    # Resolve device availability
    device = _resolve_device(args.device)
    use_cuda = device == "cuda"

    # Seed & deterministic-ish setup
    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed_all(args.seed)
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = False  # leave False to avoid perf cliffs

    # Create output directory and log file
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    policy_name = args.policy_id.replace("/", "_").replace("\\", "_")
    log_file = output_dir / f"benchmark_{args.policy_type}_{policy_name}_{device}_{timestamp}.txt"

    # Load policy directly from pretrained (similar to async inference server)
    print(f"Loading policy {args.policy_type} from {args.policy_id}...")
    policy_class = get_policy_class(args.policy_type)
    policy: PreTrainedPolicy = policy_class.from_pretrained(args.policy_id)
    policy.eval()
    policy.to(device)

    input_feature_names = list(policy.config.input_features.keys())
    output_feature_names = list(policy.config.output_features.keys())
    print(f"Policy loaded on {device}")
    print(f"Input features: {input_feature_names}")
    print(f"Output features: {output_feature_names}")

    # Generate dummy observation based on policy input features
    dummy_observation = generate_dummy_observation(policy.config.input_features, device)

    # Warmup (to stabilize kernels/caches)
    print("Warming up...")
    with torch.no_grad():
        for _ in range(args.warmup):
            policy.predict_action_chunk(dummy_observation)
    _sync(device)

    # Memory footprint before timing
    process = psutil.Process(os.getpid())
    rss_before = process.memory_info().rss
    if use_cuda:
        torch.cuda.reset_peak_memory_stats()

    print(f"Running benchmark: {args.num_trials} trials x {args.forwards_per_trial} forwards...")
    per_forward_ms = _time_forwards(
        policy, dummy_observation, device, args.num_trials * args.forwards_per_trial
    )

    # Memory footprint after timing
    rss_after = process.memory_info().rss
    rss_delta = rss_after - rss_before
    cuda_peak = torch.cuda.max_memory_allocated() if use_cuda else 0

    # Sort timing results for percentile calculations
    per_forward_ms_sorted = sorted(per_forward_ms)

    mean_ms = statistics.fmean(per_forward_ms) if per_forward_ms else float("nan")
    std_ms = statistics.pstdev(per_forward_ms) if len(per_forward_ms) > 1 else 0.0
    min_ms = per_forward_ms_sorted[0] if per_forward_ms_sorted else float("nan")
    max_ms = per_forward_ms_sorted[-1] if per_forward_ms_sorted else float("nan")
    p50_ms = percentile(per_forward_ms_sorted, 50)
    p95_ms = percentile(per_forward_ms_sorted, 95)

    # Model size
    num_params = sum(p.numel() for p in policy.parameters())

    # Prepare results for logging
    results = {
        "timestamp": datetime.now().isoformat(),
        "policy_type": args.policy_type,
        "policy_id": args.policy_id,
        "device": device,
        "num_trials": args.num_trials,
        "forwards_per_trial": args.forwards_per_trial,
        "warmup": args.warmup,
        "seed": args.seed,
        "num_params": num_params,
        "latency_mean_ms": mean_ms,
        "latency_std_ms": std_ms,
        "latency_min_ms": min_ms,
        "latency_max_ms": max_ms,
        "latency_p50_ms": p50_ms,
        "latency_p95_ms": p95_ms,
        "cpu_rss_before": rss_before,
        "cpu_rss_after": rss_after,
        "cpu_rss_delta": rss_delta,
        "cuda_peak_alloc": cuda_peak,
        "input_features": input_feature_names,
        "output_features": output_feature_names,
    }

    # The report contains non-ASCII characters ("Δ"); pin the encoding so the write
    # does not depend on the platform's default locale.
    log_file.write_text(_format_report(results, per_forward_ms), encoding="utf-8")

    # Print to console (shorter version)
    print("\n=== Inference Benchmark Results ===")
    print(f"Policy: {args.policy_type} ({args.policy_id})")
    print(f"Device: {device}")
    print(f"Trials: {args.num_trials} | Forwards/Trial: {args.forwards_per_trial} | Warmup: {args.warmup}")
    print(f"Model params: {num_params:,}")

    print("\nLatency per forward (ms):")
    print(f" mean: {mean_ms:.3f} std: {std_ms:.3f}")
    print(f" min: {min_ms:.3f} max: {max_ms:.3f}")
    print(f" p50: {p50_ms:.3f} p95: {p95_ms:.3f}")

    print("\nMemory footprint:")
    print(f" CPU RSS before: {bytes_to_human(rss_before)}")
    print(f" CPU RSS after : {bytes_to_human(rss_after)} (Δ {bytes_to_human(rss_delta)})")
    if use_cuda:
        print(
            f" CUDA peak allocated: {bytes_to_human(cuda_peak)} "
            f"(reset by reset_peak_memory_stats before timing)"
        )

    print(f"\nResults saved to: {log_file}")
    print("Benchmark completed successfully!")


if __name__ == "__main__":
    main()