Compare commits

1 Commit

Author             SHA1        Message                                   Date
Francesco Capuano  1de2a4a828  add: profiler for streaming performance  2025-09-18 17:28:14 +02:00
@@ -0,0 +1,234 @@
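# (Reviewer sketch) A module docstring along these lines would document the script's
# purpose and a typical invocation; the file path in the example is hypothetical.
"""Profile a StreamingLeRobotDataset: init latency, ``next()`` throughput, and sample-order randomness.

Example:
    python benchmarks/profile_streaming.py --repo-id lerobot/svla_so101_pickplace --num-samples 500
"""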
import argparse
import datetime
import os
import time
from collections.abc import Callable

import numpy as np
import pandas as pd
from tqdm import tqdm

from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset


def profile_throughput_indexed(
    dataset: LeRobotDataset, num_samples: int, warmup_iters: int = 3
) -> np.ndarray:
    """Measure per-item access time on an indexable LeRobotDataset.

    Accesses ``dataset[i % len(dataset)]`` for ``num_samples`` iterations, with an initial warmup.
    """
    next_times = np.zeros(num_samples)
    total = len(dataset)
    # Warmup: exclude one-off costs (cache population, decoder setup) from the timings.
    for k in range(warmup_iters):
        _ = dataset[k % total]
    for j in tqdm(range(num_samples), desc="Profiling dataset throughput", unit="item"):
        start_time = time.perf_counter()
        _ = dataset[j % total]
        end_time = time.perf_counter()
        next_times[j] = end_time - start_time
    return next_times


def profile_throughput(
    dataset: StreamingLeRobotDataset, num_samples: int, warmup_iters: int = 3
) -> np.ndarray:
    """Measure ``next()`` call latency on a streaming dataset.

    Performs a configurable warmup. This does not numerically "normalize" times; it simply
    avoids including initialization overhead in the timing window.
    """
    next_times = np.zeros(num_samples)
    iter_dataset = iter(dataset)
    # Warmup: let the streaming buffer fill before timing begins.
    for _ in range(warmup_iters):
        _ = next(iter_dataset)
    for j in tqdm(range(num_samples), desc="Profiling throughput", unit="call"):
        start_time = time.perf_counter()
        _sample = next(iter_dataset)
        end_time = time.perf_counter()
        next_times[j] = end_time - start_time
    return next_times


def profile_init(dataset_factory: Callable[[], StreamingLeRobotDataset], num_runs: int) -> np.ndarray:
    """Measure time-to-first-sample by re-instantiating the dataset ``num_runs`` times.

    Using a factory avoids an unsafe ``deepcopy`` of objects that may own threads or file handles.
    """
    init_times = np.zeros(num_runs)
    for i in tqdm(range(num_runs), desc="Profiling init", unit="run"):
        fresh_dataset = dataset_factory()
        iter_dataset = iter(fresh_dataset)
        start_time = time.perf_counter()
        _ = next(iter_dataset)
        end_time = time.perf_counter()
        init_times[i] = end_time - start_time
    return init_times


def profile_randomness(dataset: StreamingLeRobotDataset, num_samples: int) -> float:
    """Measure how random the sample order is via correlation.

    Returns the Pearson correlation between retrieved frame index and iteration index:
    - ~0: random order
    - ~+1: strictly increasing (in-order)
    - ~-1: strictly decreasing (reverse order)
    """
    frame_indices = np.zeros(num_samples, dtype=float)
    iter_indices = np.arange(num_samples, dtype=float)
    iter_dataset = iter(dataset)
    for i in tqdm(range(num_samples), desc="Profiling randomness", unit="sample"):
        sample = next(iter_dataset)
        if "index" in sample:
            frame_idx_value = sample["index"]
        elif "frame_index" in sample:
            frame_idx_value = sample["frame_index"]
        else:
            raise KeyError("Sample is missing 'index' (or 'frame_index') required to compute randomness.")
        frame_indices[i] = float(frame_idx_value)
    # Guard against degenerate cases where the correlation is undefined.
    if num_samples < 2 or np.std(frame_indices) == 0 or np.std(iter_indices) == 0:
        return float("nan")
    return float(np.corrcoef(frame_indices, iter_indices)[0, 1])
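
# Sanity check for the correlation metric (illustrative; not used by the profiler):
#   np.corrcoef(np.arange(100.0), np.arange(100.0))[0, 1]        -> +1.0 (in-order)
#   np.corrcoef(np.arange(100.0)[::-1], np.arange(100.0))[0, 1]  -> -1.0 (reversed)
# A well-shuffled stream lands near 0.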


def profile_streaming_dataset(
    repo_id: str,
    delta_timestamps: dict[str, list[float]] | None = None,
    num_samples: int = 100,
    warmup_iters: int = 10,
    buffer_size: int = 1000,
) -> tuple[np.ndarray, np.ndarray, float]:
    """Run init, throughput, and randomness profiles on a StreamingLeRobotDataset."""

    def dataset_factory() -> StreamingLeRobotDataset:
        return StreamingLeRobotDataset(repo_id, delta_timestamps=delta_timestamps, buffer_size=buffer_size)

    # Measure init by repeated instantiation.
    init_times = profile_init(dataset_factory, num_runs=warmup_iters)
    # Throughput and randomness on a single fresh dataset instance.
    dataset = dataset_factory()
    next_times = profile_throughput(dataset, num_samples=num_samples, warmup_iters=warmup_iters)
    correlation = profile_randomness(dataset, num_samples=num_samples)
    return init_times, next_times, correlation


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Profile StreamingLeRobotDataset performance metrics.")
    parser.add_argument(
        "--repo-id",
        type=str,
        default="lerobot/svla_so101_pickplace",
        help="Dataset repo_id to profile.",
    )
    parser.add_argument(
        "--num-samples",
        type=int,
        default=1000,
        help="Number of samples to measure for throughput/randomness.",
    )
    parser.add_argument(
        "--warmup-iters",
        type=int,
        default=10,
        help="Number of iterations for init and throughput warmup.",
    )
    parser.add_argument(
        "--buffer-size",
        type=int,
        default=1000,
        help="Buffer size for the streaming dataset.",
    )
    parser.add_argument(
        "--with-delta-timestamps",
        action="store_true",
        help="Profile with delta timestamps.",
    )
    parser.add_argument(
        "--compare-with-local",
        action="store_true",
        help="Also profile local LeRobotDataset throughput for comparison.",
    )
    parser.add_argument(
        "--outdir",
        type=str,
        default=os.path.join("outputs", "benchmarks"),
        help="Directory to write CSV outputs to.",
    )
    args = parser.parse_args()
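
    # Delta timestamps ask the dataset to return, for each key, frames at the given
    # offsets (in seconds) relative to the sampled frame, so enabling them also
    # exercises the temporally windowed fetch path.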
    delta_timestamps = (
        None
        if not args.with_delta_timestamps
        else {
            "observation.state": [-2.0, -1.0, -0.5, 0.0, 0.5, 1.0],
            "action": [-0.1, 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
        }
    )
    init_times, next_times, correlation = profile_streaming_dataset(
        repo_id=args.repo_id,
        delta_timestamps=delta_timestamps,
        num_samples=args.num_samples,
        warmup_iters=args.warmup_iters,
        buffer_size=args.buffer_size,
    )

    os.makedirs(args.outdir, exist_ok=True)
    repo_id_str = args.repo_id.replace("/", "-")
    date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    name_suffix = f"{repo_id_str}_buf{args.buffer_size}_{date_str}"

    # Visualization disabled by default; figures are not created or saved.
    init_df = pd.DataFrame({"init_times": init_times})
    next_df = pd.DataFrame({"next_times": next_times})
    correlation_df = pd.DataFrame({"correlation": [correlation]})
    init_df.to_csv(os.path.join(args.outdir, f"init_times_{name_suffix}.csv"), index=False)
    next_df.to_csv(os.path.join(args.outdir, f"next_times_{name_suffix}.csv"), index=False)
    correlation_df.to_csv(os.path.join(args.outdir, f"correlation_{name_suffix}.csv"), index=False)
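
    # (Sketch) Print a quick summary alongside the CSVs, using only arrays computed above.
    p50, p95 = np.percentile(next_times, [50, 95])
    print(f"init (time-to-first-sample): mean={init_times.mean():.3f} s over {len(init_times)} runs")
    print(f"next(): p50={p50 * 1e3:.2f} ms, p95={p95 * 1e3:.2f} ms, ~{1.0 / next_times.mean():.1f} samples/s")
    print(f"sample-order correlation: {correlation:.3f}")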
if args.compare_with_local:
# Profile local non-streaming dataset throughput for comparison
local_ds = LeRobotDataset(args.repo_id, delta_timestamps=delta_timestamps)
local_next_times = profile_throughput_indexed(
local_ds, num_samples=args.num_samples, warmup_iters=args.warmup_iters
)
local_df = pd.DataFrame({"next_times": local_next_times})
local_df.to_csv(
os.path.join(args.outdir, f"next_times_local_{repo_id_str}_{date_str}.csv"),
index=False,
)
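        # (Sketch) Side-by-side of local indexed access vs. streaming next() latency,
        # again using only arrays computed above.
        print(
            f"local dataset[i]: mean={local_next_times.mean() * 1e3:.2f} ms | "
            f"streaming next(): mean={next_times.mean() * 1e3:.2f} ms"
        )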