mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-11 14:49:43 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1de2a4a828 |
@@ -0,0 +1,234 @@
|
||||
import argparse
|
||||
import datetime
|
||||
import os
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset
|
||||
|
||||
|
||||
def profile_throughput_indexed(
|
||||
dataset: LeRobotDataset, num_samples: int, warmup_iters: int = 3
|
||||
) -> np.ndarray:
|
||||
"""Measure per-item access time on an indexable LeRobotDataset.
|
||||
|
||||
Accesses dataset[i % len(dataset)] for ``num_samples`` iterations, with an initial warmup.
|
||||
"""
|
||||
next_times = np.zeros(num_samples)
|
||||
total = len(dataset)
|
||||
|
||||
# warmup
|
||||
for k in range(warmup_iters):
|
||||
_ = dataset[k % total]
|
||||
|
||||
for j in tqdm(range(num_samples), desc="Profiling dataset throughput", unit="item"):
|
||||
start_time = time.perf_counter()
|
||||
_ = dataset[j % total]
|
||||
end_time = time.perf_counter()
|
||||
next_times[j] = end_time - start_time
|
||||
|
||||
return next_times
|
||||
|
||||
|
||||
def profile_throughput(
|
||||
dataset: StreamingLeRobotDataset, num_samples: int, warmup_iters: int = 3
|
||||
) -> np.ndarray:
|
||||
"""Measure ``.next()`` call latency on a streaming dataset.
|
||||
|
||||
Performs a configurable warmup. This does not numerically "normalize" times; it simply
|
||||
avoids including initialization overhead in the timing window.
|
||||
"""
|
||||
next_times = np.zeros(num_samples)
|
||||
iter_dataset = iter(dataset)
|
||||
|
||||
# warmup
|
||||
for _ in range(warmup_iters):
|
||||
_ = next(iter_dataset)
|
||||
|
||||
for j in tqdm(range(num_samples), desc="Profiling throughput", unit="call"):
|
||||
start_time = time.perf_counter()
|
||||
_sample = next(iter_dataset)
|
||||
end_time = time.perf_counter()
|
||||
next_times[j] = end_time - start_time
|
||||
|
||||
return next_times
|
||||
|
||||
|
||||
def profile_init(dataset_factory: Callable[[], StreamingLeRobotDataset], num_runs: int) -> np.ndarray:
|
||||
"""Measure time-to-first-sample by re-instantiating the dataset ``num_runs`` times.
|
||||
|
||||
Using a factory avoids unsafe ``deepcopy`` of objects that may own threads or file handles.
|
||||
"""
|
||||
init_times = np.zeros(num_runs)
|
||||
for i in tqdm(range(num_runs), desc="Profiling init", unit="run"):
|
||||
fresh_dataset = dataset_factory()
|
||||
iter_dataset = iter(fresh_dataset)
|
||||
start_time = time.perf_counter()
|
||||
_ = next(iter_dataset)
|
||||
end_time = time.perf_counter()
|
||||
init_times[i] = end_time - start_time
|
||||
|
||||
return init_times
|
||||
|
||||
|
||||
def profile_randomness(dataset: StreamingLeRobotDataset, num_samples: int) -> float:
|
||||
"""Measure how random the sample order is via correlation.
|
||||
|
||||
Returns a Pearson correlation between retrieved frame index and iteration index.
|
||||
- ~0: random order
|
||||
- ~+1: strictly increasing (in-order)
|
||||
- ~-1: strictly decreasing (reverse order)
|
||||
"""
|
||||
frame_indices = np.zeros(num_samples, dtype=float)
|
||||
iter_indices = np.arange(num_samples, dtype=float)
|
||||
|
||||
iter_dataset = iter(dataset)
|
||||
|
||||
for i in tqdm(range(num_samples), desc="Profiling randomness", unit="sample"):
|
||||
sample = next(iter_dataset)
|
||||
if "index" in sample:
|
||||
frame_idx_value = sample["index"]
|
||||
elif "frame_index" in sample:
|
||||
frame_idx_value = sample["frame_index"]
|
||||
else:
|
||||
raise KeyError("Sample is missing 'index' (or 'frame_index') required to compute randomness.")
|
||||
frame_indices[i] = float(frame_idx_value)
|
||||
|
||||
# Guard against degenerate cases
|
||||
if num_samples < 2 or np.std(frame_indices) == 0 or np.std(iter_indices) == 0:
|
||||
return np.nan, None
|
||||
|
||||
correlation = float(np.corrcoef(frame_indices, iter_indices)[0, 1])
|
||||
return correlation
|
||||
|
||||
|
||||
def profile_streaming_dataset(
|
||||
repo_id: str,
|
||||
delta_timestamps: dict[str, list[float]] | None = None,
|
||||
num_samples: int = 100,
|
||||
warmup_iters: int = 10,
|
||||
buffer_size: int = 1000,
|
||||
) -> tuple[np.ndarray, np.ndarray, float]:
|
||||
"""Run init, throughput, and randomness profiles on a StreamingLeRobotDataset."""
|
||||
|
||||
def dataset_factory() -> StreamingLeRobotDataset:
|
||||
return StreamingLeRobotDataset(repo_id, delta_timestamps=delta_timestamps, buffer_size=buffer_size)
|
||||
|
||||
# Measure init by repeated instantiation
|
||||
init_times = profile_init(dataset_factory, num_runs=warmup_iters)
|
||||
|
||||
# Throughput and randomness on a single fresh dataset instance
|
||||
dataset = dataset_factory()
|
||||
next_times = profile_throughput(dataset, num_samples=num_samples, warmup_iters=warmup_iters)
|
||||
correlation = profile_randomness(dataset, num_samples=num_samples)
|
||||
|
||||
return init_times, next_times, correlation
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Profile StreamingLeRobotDataset performance metrics.")
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
default="lerobot/svla_so101_pickplace",
|
||||
help="Dataset repo_id to profile.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-samples",
|
||||
type=int,
|
||||
default=1000,
|
||||
help="Number of samples to measure for throughput/randomness.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--warmup-iters",
|
||||
type=int,
|
||||
default=10,
|
||||
help="Number of iterations for init and throughput warmup.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--buffer-size",
|
||||
type=int,
|
||||
default=1000,
|
||||
help="Buffer size for the streaming dataset.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--with-delta-timestamps",
|
||||
action="store_true",
|
||||
help="Profile with delta timestamps.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--compare-with-local",
|
||||
action="store_true",
|
||||
help="Also profile local LeRobotDataset throughput for comparison.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--outdir",
|
||||
type=str,
|
||||
default=os.path.join("outputs", "benchmarks"),
|
||||
help="Directory to write CSVs/PNGs to.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
delta_timestamps = (
|
||||
None
|
||||
if not args.with_delta_timestamps
|
||||
else {
|
||||
"observation.state": [-2.0, -1.0, -0.5, 0.0, 0.5, 1.0],
|
||||
"action": [
|
||||
-0.1,
|
||||
0.0,
|
||||
0.1,
|
||||
0.2,
|
||||
0.3,
|
||||
0.4,
|
||||
0.5,
|
||||
0.6,
|
||||
0.7,
|
||||
0.8,
|
||||
0.9,
|
||||
1.0,
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
init_times, next_times, correlation = profile_streaming_dataset(
|
||||
repo_id=args.repo_id,
|
||||
delta_timestamps=delta_timestamps,
|
||||
num_samples=args.num_samples,
|
||||
warmup_iters=args.warmup_iters,
|
||||
buffer_size=args.buffer_size,
|
||||
)
|
||||
|
||||
os.makedirs(args.outdir, exist_ok=True)
|
||||
|
||||
repo_id_str = args.repo_id.replace("/", "-")
|
||||
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
name_suffix = f"{repo_id_str}_buf{args.buffer_size}_{date_str}"
|
||||
|
||||
# Visualization disabled by default; figures are not created or saved.
|
||||
|
||||
init_df = pd.DataFrame({"init_times": init_times})
|
||||
next_df = pd.DataFrame({"next_times": next_times})
|
||||
correlation_df = pd.DataFrame({"correlation": [correlation]})
|
||||
|
||||
init_df.to_csv(os.path.join(args.outdir, f"init_times_{name_suffix}.csv"), index=False)
|
||||
next_df.to_csv(os.path.join(args.outdir, f"next_times_{name_suffix}.csv"), index=False)
|
||||
correlation_df.to_csv(os.path.join(args.outdir, f"correlation_{name_suffix}.csv"), index=False)
|
||||
|
||||
if args.compare_with_local:
|
||||
# Profile local non-streaming dataset throughput for comparison
|
||||
local_ds = LeRobotDataset(args.repo_id, delta_timestamps=delta_timestamps)
|
||||
local_next_times = profile_throughput_indexed(
|
||||
local_ds, num_samples=args.num_samples, warmup_iters=args.warmup_iters
|
||||
)
|
||||
local_df = pd.DataFrame({"next_times": local_next_times})
|
||||
local_df.to_csv(
|
||||
os.path.join(args.outdir, f"next_times_local_{repo_id_str}_{date_str}.csv"),
|
||||
index=False,
|
||||
)
|
||||
Reference in New Issue
Block a user