mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-18 00:37:10 +00:00
Report episode cache fill stage timings
This commit is contained in:
@@ -308,14 +308,21 @@ def run_fetch_pool(
|
||||
open_decoders=False,
|
||||
) as cache:
|
||||
elapsed = _fill_cache(cache, episodes)
|
||||
timings = cache.timing_summary()
|
||||
byte_count = _bytes_for(manifest, episodes)
|
||||
episode_mb = byte_count / len(episodes) / 1024**2
|
||||
job_count = max(timings["jobs"], 1.0)
|
||||
return {
|
||||
"fetch_s": elapsed,
|
||||
"fetch_mbps": byte_count / elapsed / 1024**2,
|
||||
"fetch_episodes_s": len(episodes) / elapsed,
|
||||
"episode_mb": episode_mb,
|
||||
"avg_mb_miss": byte_count / (len(episodes) * len(manifest.video_keys)) / 1024**2,
|
||||
"jobs": timings["jobs"],
|
||||
"lookup_ms": timings["lookup_s"] * 1000 / job_count,
|
||||
"range_fetch_ms": timings["fetch_s"] * 1000 / job_count,
|
||||
"synthesize_ms": timings["synthesize_s"] * 1000 / job_count,
|
||||
"store_ms": timings["store_s"] * 1000 / job_count,
|
||||
}
|
||||
|
||||
|
||||
@@ -563,6 +570,14 @@ def run_indexed_strategy(
|
||||
f"{_format_duration(estimated_benchmark_s)} | {_format_duration(estimated_dataset_s)} | "
|
||||
f"{fetch_pool['avg_mb_miss']:.1f} | {args.workers} workers, no decoder open/frame decode |"
|
||||
)
|
||||
print()
|
||||
print("| Camera Job Stage | avg ms/job |")
|
||||
print("|---|---:|")
|
||||
print(f"| manifest lookup | {fetch_pool['lookup_ms']:.3f} |")
|
||||
print(f"| remote byte-range fetch | {fetch_pool['range_fetch_ms']:.3f} |")
|
||||
print(f"| synthesize mini-MP4 | {fetch_pool['synthesize_ms']:.3f} |")
|
||||
print(f"| store in shared cache | {fetch_pool['store_ms']:.3f} |")
|
||||
print(f"| camera jobs | {fetch_pool['jobs']:.0f} |")
|
||||
|
||||
if args.include_decode:
|
||||
timestamps = _timestamps(manifest, episodes, args.frames_per_episode, args.seed + 1)
|
||||
|
||||
@@ -550,6 +550,13 @@ class EpisodeByteCache:
|
||||
self._futures: dict[tuple[int, str], Future[dict[str, Any]]] = {}
|
||||
self._bytes = 0
|
||||
self._lock = threading.Lock()
|
||||
self._timing_totals = {
|
||||
"lookup_s": 0.0,
|
||||
"fetch_s": 0.0,
|
||||
"synthesize_s": 0.0,
|
||||
"store_s": 0.0,
|
||||
"jobs": 0.0,
|
||||
}
|
||||
|
||||
def close(self) -> None:
|
||||
self._pool.shutdown(wait=True)
|
||||
@@ -597,6 +604,10 @@ class EpisodeByteCache:
|
||||
fps = metadata.num_frames / duration
|
||||
return decoder.get_frames_at(indices=[round(ts * fps) for ts in local_ts]).data
|
||||
|
||||
def timing_summary(self) -> dict[str, float]:
|
||||
with self._lock:
|
||||
return dict(self._timing_totals)
|
||||
|
||||
def _submit(self, episode_index: int, camera_key: str) -> Future[dict[str, Any]]:
|
||||
key = (episode_index, camera_key)
|
||||
with self._lock:
|
||||
@@ -619,6 +630,7 @@ class EpisodeByteCache:
|
||||
return entry
|
||||
future = self._submit(episode_index, camera_key)
|
||||
entry = future.result()
|
||||
store_start = time.perf_counter()
|
||||
with self._lock:
|
||||
self._futures.pop(key, None)
|
||||
existing = self._cache.get(key)
|
||||
@@ -628,6 +640,13 @@ class EpisodeByteCache:
|
||||
self._cache[key] = entry
|
||||
self._bytes += len(entry["bytes"])
|
||||
self._evict_locked()
|
||||
timings = entry.pop("_timings", None)
|
||||
if timings is not None:
|
||||
self._timing_totals["lookup_s"] += timings["lookup_s"]
|
||||
self._timing_totals["fetch_s"] += timings["fetch_s"]
|
||||
self._timing_totals["synthesize_s"] += timings["synthesize_s"]
|
||||
self._timing_totals["store_s"] += time.perf_counter() - store_start
|
||||
self._timing_totals["jobs"] += 1
|
||||
return entry
|
||||
|
||||
def _evict_locked(self) -> None:
|
||||
@@ -636,17 +655,36 @@ class EpisodeByteCache:
|
||||
self._bytes -= len(entry["bytes"])
|
||||
|
||||
def _fetch_and_synthesize(self, episode_index: int, camera_key: str) -> dict[str, Any]:
|
||||
lookup_start = time.perf_counter()
|
||||
span = self.manifest.lookup(episode_index, camera_key)
|
||||
file_record = self.manifest.file_lookup(span.file_id)
|
||||
sample_slice = Mp4SampleSlice(
|
||||
sample_lo=span.sample_lo,
|
||||
sample_hi=span.sample_hi,
|
||||
byte_offset=span.mdat_offset,
|
||||
byte_length=span.mdat_length,
|
||||
source_start_pts=span.source_start_pts,
|
||||
)
|
||||
lookup_s = time.perf_counter() - lookup_start
|
||||
fetch_start = time.perf_counter()
|
||||
payload = self.fetcher.read_range(file_record.file_path, span.mdat_offset, span.mdat_length)
|
||||
fetch_s = time.perf_counter() - fetch_start
|
||||
if len(payload) != span.mdat_length:
|
||||
raise OSError(
|
||||
f"Short read for {file_record.file_path}: expected {span.mdat_length}, got {len(payload)}"
|
||||
)
|
||||
mp4_bytes = synthesize_mp4(
|
||||
file_record.mp4, self.manifest.sample_slice(episode_index, camera_key), payload
|
||||
)
|
||||
entry: dict[str, Any] = {"bytes": mp4_bytes, "decoder": None}
|
||||
synthesize_start = time.perf_counter()
|
||||
mp4_bytes = synthesize_mp4(file_record.mp4, sample_slice, payload)
|
||||
synthesize_s = time.perf_counter() - synthesize_start
|
||||
entry: dict[str, Any] = {
|
||||
"bytes": mp4_bytes,
|
||||
"decoder": None,
|
||||
"_timings": {
|
||||
"lookup_s": lookup_s,
|
||||
"fetch_s": fetch_s,
|
||||
"synthesize_s": synthesize_s,
|
||||
},
|
||||
}
|
||||
if self.open_decoders:
|
||||
entry["decoder"] = open_video_decoder(io.BytesIO(mp4_bytes))
|
||||
return entry
|
||||
|
||||
Reference in New Issue
Block a user