From d99e1fe89d483f9562d84d3d6feb621516805976 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 17 Jun 2026 20:29:57 +0200 Subject: [PATCH] Report episode cache fill stage timings --- scripts/bench_episode_byte_cache.py | 15 ++++++ .../datasets/episode_video_streaming.py | 46 +++++++++++++++++-- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/scripts/bench_episode_byte_cache.py b/scripts/bench_episode_byte_cache.py index 8c0245ade..4b399532e 100644 --- a/scripts/bench_episode_byte_cache.py +++ b/scripts/bench_episode_byte_cache.py @@ -308,14 +308,21 @@ def run_fetch_pool( open_decoders=False, ) as cache: elapsed = _fill_cache(cache, episodes) + timings = cache.timing_summary() byte_count = _bytes_for(manifest, episodes) episode_mb = byte_count / len(episodes) / 1024**2 + job_count = max(timings["jobs"], 1.0) return { "fetch_s": elapsed, "fetch_mbps": byte_count / elapsed / 1024**2, "fetch_episodes_s": len(episodes) / elapsed, "episode_mb": episode_mb, "avg_mb_miss": byte_count / (len(episodes) * len(manifest.video_keys)) / 1024**2, + "jobs": timings["jobs"], + "lookup_ms": timings["lookup_s"] * 1000 / job_count, + "range_fetch_ms": timings["fetch_s"] * 1000 / job_count, + "synthesize_ms": timings["synthesize_s"] * 1000 / job_count, + "store_ms": timings["store_s"] * 1000 / job_count, } @@ -563,6 +570,14 @@ def run_indexed_strategy( f"{_format_duration(estimated_benchmark_s)} | {_format_duration(estimated_dataset_s)} | " f"{fetch_pool['avg_mb_miss']:.1f} | {args.workers} workers, no decoder open/frame decode |" ) + print() + print("| Camera Job Stage | avg ms/job |") + print("|---|---:|") + print(f"| manifest lookup | {fetch_pool['lookup_ms']:.3f} |") + print(f"| remote byte-range fetch | {fetch_pool['range_fetch_ms']:.3f} |") + print(f"| synthesize mini-MP4 | {fetch_pool['synthesize_ms']:.3f} |") + print(f"| store in shared cache | {fetch_pool['store_ms']:.3f} |") + print(f"| camera jobs | {fetch_pool['jobs']:.0f} |") if args.include_decode: timestamps = _timestamps(manifest, episodes, args.frames_per_episode, args.seed + 1) diff --git a/src/lerobot/datasets/episode_video_streaming.py b/src/lerobot/datasets/episode_video_streaming.py index aeda64f68..dff8aa596 100644 --- a/src/lerobot/datasets/episode_video_streaming.py +++ b/src/lerobot/datasets/episode_video_streaming.py @@ -550,6 +550,13 @@ class EpisodeByteCache: self._futures: dict[tuple[int, str], Future[dict[str, Any]]] = {} self._bytes = 0 self._lock = threading.Lock() + self._timing_totals = { + "lookup_s": 0.0, + "fetch_s": 0.0, + "synthesize_s": 0.0, + "store_s": 0.0, + "jobs": 0.0, + } def close(self) -> None: self._pool.shutdown(wait=True) @@ -597,6 +604,10 @@ class EpisodeByteCache: fps = metadata.num_frames / duration return decoder.get_frames_at(indices=[round(ts * fps) for ts in local_ts]).data + def timing_summary(self) -> dict[str, float]: + with self._lock: + return dict(self._timing_totals) + def _submit(self, episode_index: int, camera_key: str) -> Future[dict[str, Any]]: key = (episode_index, camera_key) with self._lock: @@ -619,6 +630,7 @@ class EpisodeByteCache: return entry future = self._submit(episode_index, camera_key) entry = future.result() + store_start = time.perf_counter() with self._lock: self._futures.pop(key, None) existing = self._cache.get(key) @@ -628,6 +640,13 @@ class EpisodeByteCache: self._cache[key] = entry self._bytes += len(entry["bytes"]) self._evict_locked() + timings = entry.pop("_timings", None) + if timings is not None: + self._timing_totals["lookup_s"] += timings["lookup_s"] + self._timing_totals["fetch_s"] += timings["fetch_s"] + self._timing_totals["synthesize_s"] += timings["synthesize_s"] + self._timing_totals["store_s"] += time.perf_counter() - store_start + self._timing_totals["jobs"] += 1 return entry def _evict_locked(self) -> None: @@ -636,17 +655,36 @@ class EpisodeByteCache: self._bytes -= len(entry["bytes"]) def _fetch_and_synthesize(self, episode_index: int, camera_key: str) -> dict[str, Any]: + lookup_start = time.perf_counter() span = self.manifest.lookup(episode_index, camera_key) file_record = self.manifest.file_lookup(span.file_id) + sample_slice = Mp4SampleSlice( + sample_lo=span.sample_lo, + sample_hi=span.sample_hi, + byte_offset=span.mdat_offset, + byte_length=span.mdat_length, + source_start_pts=span.source_start_pts, + ) + lookup_s = time.perf_counter() - lookup_start + fetch_start = time.perf_counter() payload = self.fetcher.read_range(file_record.file_path, span.mdat_offset, span.mdat_length) + fetch_s = time.perf_counter() - fetch_start if len(payload) != span.mdat_length: raise OSError( f"Short read for {file_record.file_path}: expected {span.mdat_length}, got {len(payload)}" ) - mp4_bytes = synthesize_mp4( - file_record.mp4, self.manifest.sample_slice(episode_index, camera_key), payload - ) - entry: dict[str, Any] = {"bytes": mp4_bytes, "decoder": None} + synthesize_start = time.perf_counter() + mp4_bytes = synthesize_mp4(file_record.mp4, sample_slice, payload) + synthesize_s = time.perf_counter() - synthesize_start + entry: dict[str, Any] = { + "bytes": mp4_bytes, + "decoder": None, + "_timings": { + "lookup_s": lookup_s, + "fetch_s": fetch_s, + "synthesize_s": synthesize_s, + }, + } if self.open_decoders: entry["decoder"] = open_video_decoder(io.BytesIO(mp4_bytes)) return entry