Mirror of https://github.com/huggingface/lerobot.git
Commit 1de2a4a828: new file added
@@ -0,0 +1,234 @@
import argparse
import datetime
import os
import time
from collections.abc import Callable

import numpy as np
import pandas as pd
from tqdm import tqdm

from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset


def profile_throughput_indexed(
    dataset: LeRobotDataset, num_samples: int, warmup_iters: int = 3
) -> np.ndarray:
    """Measure per-item access time on an indexable LeRobotDataset.

    Accesses ``dataset[i % len(dataset)]`` for ``num_samples`` iterations, with an initial warmup.
    """
    next_times = np.zeros(num_samples)
    total = len(dataset)

    # warmup
    for k in range(warmup_iters):
        _ = dataset[k % total]

    for j in tqdm(range(num_samples), desc="Profiling dataset throughput", unit="item"):
        start_time = time.perf_counter()
        _ = dataset[j % total]
        end_time = time.perf_counter()
        next_times[j] = end_time - start_time

    return next_times


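# Usage sketch (illustrative, not executed by this script): how one might call
# profile_throughput_indexed on a local dataset and summarize the per-item latencies.
# The repo_id below is just this script's default dataset, not a requirement.
#
#   ds = LeRobotDataset("lerobot/svla_so101_pickplace")
#   times = profile_throughput_indexed(ds, num_samples=200)
#   print(f"mean {times.mean() * 1e3:.2f} ms/item, {1.0 / times.mean():.1f} items/s")

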
def profile_throughput(
    dataset: StreamingLeRobotDataset, num_samples: int, warmup_iters: int = 3
) -> np.ndarray:
    """Measure ``next()`` call latency on a streaming dataset.

    Performs a configurable warmup. This does not numerically "normalize" times; it simply
    avoids including initialization overhead in the timing window.
    """
    next_times = np.zeros(num_samples)
    iter_dataset = iter(dataset)

    # warmup
    for _ in range(warmup_iters):
        _ = next(iter_dataset)

    for j in tqdm(range(num_samples), desc="Profiling throughput", unit="call"):
        start_time = time.perf_counter()
        _sample = next(iter_dataset)
        end_time = time.perf_counter()
        next_times[j] = end_time - start_time

    return next_times


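# Usage sketch (illustrative, not executed by this script): profiling next() latency on a
# streaming dataset. The constructor arguments mirror the ones used further down in this
# script; the repo_id is only the script's default and can be swapped for any dataset.
#
#   ds = StreamingLeRobotDataset("lerobot/svla_so101_pickplace", buffer_size=1000)
#   times = profile_throughput(ds, num_samples=500, warmup_iters=10)
#   print(f"mean {times.mean() * 1e3:.2f} ms/call, {1.0 / times.mean():.1f} samples/s")

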
def profile_init(dataset_factory: Callable[[], StreamingLeRobotDataset], num_runs: int) -> np.ndarray:
    """Measure time-to-first-sample by re-instantiating the dataset ``num_runs`` times.

    Using a factory avoids unsafe ``deepcopy`` of objects that may own threads or file handles.
    """
    init_times = np.zeros(num_runs)
    for i in tqdm(range(num_runs), desc="Profiling init", unit="run"):
        fresh_dataset = dataset_factory()
        iter_dataset = iter(fresh_dataset)
        start_time = time.perf_counter()
        _ = next(iter_dataset)
        end_time = time.perf_counter()
        init_times[i] = end_time - start_time

    return init_times


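# Usage sketch (illustrative, not executed by this script): any zero-argument callable works
# as the factory, so a functools.partial over the constructor is a convenient alternative to
# the closure used in profile_streaming_dataset below. The repo_id is illustrative only.
#
#   from functools import partial
#   factory = partial(StreamingLeRobotDataset, "lerobot/svla_so101_pickplace", buffer_size=1000)
#   init_times = profile_init(factory, num_runs=10)

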
def profile_randomness(dataset: StreamingLeRobotDataset, num_samples: int) -> float:
    """Measure how random the sample order is via correlation.

    Returns the Pearson correlation between the retrieved frame index and the iteration index:
    - ~0: random order
    - ~+1: strictly increasing (in-order)
    - ~-1: strictly decreasing (reverse order)
    """
    frame_indices = np.zeros(num_samples, dtype=float)
    iter_indices = np.arange(num_samples, dtype=float)

    iter_dataset = iter(dataset)

    for i in tqdm(range(num_samples), desc="Profiling randomness", unit="sample"):
        sample = next(iter_dataset)
        if "index" in sample:
            frame_idx_value = sample["index"]
        elif "frame_index" in sample:
            frame_idx_value = sample["frame_index"]
        else:
            raise KeyError("Sample is missing 'index' (or 'frame_index') required to compute randomness.")
        frame_indices[i] = float(frame_idx_value)

    # Guard against degenerate cases where the correlation is undefined
    if num_samples < 2 or np.std(frame_indices) == 0 or np.std(iter_indices) == 0:
        return float("nan")

    correlation = float(np.corrcoef(frame_indices, iter_indices)[0, 1])
    return correlation


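# Worked example (illustrative, not executed by this script): what the correlation metric
# reports on synthetic index orders, using plain numpy rather than a dataset.
#
#   steps = np.arange(100.0)
#   in_order = np.arange(100.0)
#   shuffled = np.random.default_rng(0).permutation(100).astype(float)
#   np.corrcoef(in_order, steps)[0, 1]   # ~ +1.0 (sequential reads)
#   np.corrcoef(shuffled, steps)[0, 1]   # ~  0.0 (well-shuffled reads)

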
def profile_streaming_dataset(
    repo_id: str,
    delta_timestamps: dict[str, list[float]] | None = None,
    num_samples: int = 100,
    warmup_iters: int = 10,
    buffer_size: int = 1000,
) -> tuple[np.ndarray, np.ndarray, float]:
    """Run init, throughput, and randomness profiles on a StreamingLeRobotDataset."""

    def dataset_factory() -> StreamingLeRobotDataset:
        return StreamingLeRobotDataset(repo_id, delta_timestamps=delta_timestamps, buffer_size=buffer_size)

    # Measure init by repeated instantiation
    init_times = profile_init(dataset_factory, num_runs=warmup_iters)

    # Throughput and randomness on a single fresh dataset instance
    dataset = dataset_factory()
    next_times = profile_throughput(dataset, num_samples=num_samples, warmup_iters=warmup_iters)
    correlation = profile_randomness(dataset, num_samples=num_samples)

    return init_times, next_times, correlation


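# Usage sketch (illustrative, not executed by this script): summarizing the three
# measurements returned by profile_streaming_dataset; the repo_id is the script's default.
#
#   init_times, next_times, corr = profile_streaming_dataset("lerobot/svla_so101_pickplace")
#   p50, p95 = np.percentile(next_times, [50, 95])
#   print(f"time-to-first-sample: {init_times.mean():.2f} s")
#   print(f"next() p50 {p50 * 1e3:.2f} ms, p95 {p95 * 1e3:.2f} ms")
#   print(f"order correlation: {corr:.3f} (near 0 means well shuffled)")

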
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Profile StreamingLeRobotDataset performance metrics.")
    parser.add_argument(
        "--repo-id",
        type=str,
        default="lerobot/svla_so101_pickplace",
        help="Dataset repo_id to profile.",
    )
    parser.add_argument(
        "--num-samples",
        type=int,
        default=1000,
        help="Number of samples to measure for throughput/randomness.",
    )
    parser.add_argument(
        "--warmup-iters",
        type=int,
        default=10,
        help="Number of iterations for init and throughput warmup.",
    )
    parser.add_argument(
        "--buffer-size",
        type=int,
        default=1000,
        help="Buffer size for the streaming dataset.",
    )
    parser.add_argument(
        "--with-delta-timestamps",
        action="store_true",
        help="Profile with delta timestamps.",
    )
    parser.add_argument(
        "--compare-with-local",
        action="store_true",
        help="Also profile local LeRobotDataset throughput for comparison.",
    )
    parser.add_argument(
        "--outdir",
        type=str,
        default=os.path.join("outputs", "benchmarks"),
        help="Directory to write CSV outputs to.",
    )

    args = parser.parse_args()

    delta_timestamps = (
        None
        if not args.with_delta_timestamps
        else {
            "observation.state": [-2.0, -1.0, -0.5, 0.0, 0.5, 1.0],
            "action": [-0.1, 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
        }
    )

    init_times, next_times, correlation = profile_streaming_dataset(
        repo_id=args.repo_id,
        delta_timestamps=delta_timestamps,
        num_samples=args.num_samples,
        warmup_iters=args.warmup_iters,
        buffer_size=args.buffer_size,
    )

    os.makedirs(args.outdir, exist_ok=True)

    repo_id_str = args.repo_id.replace("/", "-")
    date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    name_suffix = f"{repo_id_str}_buf{args.buffer_size}_{date_str}"

    # Visualization disabled by default; figures are not created or saved.

    init_df = pd.DataFrame({"init_times": init_times})
    next_df = pd.DataFrame({"next_times": next_times})
    correlation_df = pd.DataFrame({"correlation": [correlation]})

    init_df.to_csv(os.path.join(args.outdir, f"init_times_{name_suffix}.csv"), index=False)
    next_df.to_csv(os.path.join(args.outdir, f"next_times_{name_suffix}.csv"), index=False)
    correlation_df.to_csv(os.path.join(args.outdir, f"correlation_{name_suffix}.csv"), index=False)

    if args.compare_with_local:
        # Profile local non-streaming dataset throughput for comparison
        local_ds = LeRobotDataset(args.repo_id, delta_timestamps=delta_timestamps)
        local_next_times = profile_throughput_indexed(
            local_ds, num_samples=args.num_samples, warmup_iters=args.warmup_iters
        )
        local_df = pd.DataFrame({"next_times": local_next_times})
        local_df.to_csv(
            os.path.join(args.outdir, f"next_times_local_{repo_id_str}_{date_str}.csv"),
            index=False,
        )
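# Example invocation (illustrative; the diff does not name the file, so the script name below
# is an assumption). All flags correspond to the argparse options defined above.
#
#   python profile_streaming_dataset.py --repo-id lerobot/svla_so101_pickplace \
#       --num-samples 500 --warmup-iters 10 --buffer-size 1000 \
#       --with-delta-timestamps --compare-with-local --outdir outputs/benchmarks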