Compare commits

...

27 Commits

Author SHA1 Message Date
Pepijn 88843ed675 Fix episode pool stream benchmark pacing 2026-06-22 20:40:33 +02:00
Pepijn f2b5c4a47b Add distributed episode pool benchmark summaries 2026-06-22 17:08:02 +02:00
Pepijn 9202fcea96 Fix pool sampling camera timestamps 2026-06-22 16:44:37 +02:00
Pepijn ef47c35178 Benchmark random sampling from episode pool 2026-06-22 16:26:26 +02:00
Pepijn 6d6c82eb8c Add GOP window range benchmark 2026-06-22 15:10:21 +02:00
Pepijn 9201be92cb Log failed HTTP range attempts 2026-06-22 12:42:30 +02:00
Pepijn 0064a06205 Instrument HfFileSystem range requests 2026-06-22 12:07:59 +02:00
Pepijn 710171ccac Clarify native HTTP exception timing 2026-06-22 12:02:12 +02:00
Pepijn 0f8257443c Retry native HTTP timeout statuses 2026-06-22 11:35:28 +02:00
Pepijn 3a09d0c48a Track native HTTP failed attempt timing 2026-06-22 11:27:08 +02:00
Pepijn 03fc5e3ea9 Allow dynamic range timing counters 2026-06-19 09:43:57 +02:00
Pepijn 28c3e095bf Report native HTTP chunk timing 2026-06-19 09:34:22 +02:00
Pepijn 5bfb749a9b Log episode cache fill progress 2026-06-18 18:43:43 +02:00
Pepijn 51c023a7a1 Tune native HTTP range diagnostics 2026-06-17 21:50:05 +02:00
Pepijn 51ea18cb7a Allow native HTTP sidecar range diagnostics 2026-06-17 21:36:57 +02:00
Pepijn 04ab43b8d2 Report range read timing breakdown 2026-06-17 21:20:08 +02:00
Pepijn cdfe192491 Remove random frame benchmark path 2026-06-17 21:14:42 +02:00
Pepijn 3451e53452 Use HfFileSystem for sidecar episode benchmark 2026-06-17 21:01:43 +02:00
Pepijn 30849ce74f Report memory usage in cache benchmarks 2026-06-17 20:54:12 +02:00
Pepijn 7d6907c444 Add random frame range fetch benchmark 2026-06-17 20:48:46 +02:00
Pepijn d99e1fe89d Report episode cache fill stage timings 2026-06-17 20:29:57 +02:00
Pepijn 7fcde61b69 Report full dataset estimate in episode cache benchmark 2026-06-17 20:25:21 +02:00
Pepijn bdfe8f8ce9 Use full MP4 sidecar for episode cache benchmark 2026-06-17 20:22:04 +02:00
Pepijn 34d0495d03 Retry transient native HTTP range failures 2026-06-17 20:19:54 +02:00
Pepijn 834c282631 Make episode cache benchmark fetch-only by default 2026-06-17 20:16:30 +02:00
Pepijn f132885cbc Pin Hub range cache and datasets main sources 2026-06-17 19:46:41 +02:00
Pepijn d0686be2f5 Add episode video streaming byte cache 2026-06-17 19:31:02 +02:00
8 changed files with 3583 additions and 16 deletions
+3
View File
@@ -355,6 +355,8 @@ explicit = true
[tool.uv.sources]
torch = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
torchvision = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
huggingface-hub = { git = "https://github.com/huggingface/huggingface_hub.git", branch = "feat/hffs-cache-cdn-range-reads" }
datasets = { git = "https://github.com/huggingface/datasets.git", branch = "main" }
[tool.setuptools.package-data]
lerobot = ["envs/*.json", "annotations/steerable_pipeline/prompts/*.txt"]
@@ -421,6 +423,7 @@ exclude_dirs = [
skips = ["B101", "B311", "B404", "B603", "B615"]
[tool.typos]
default.extend-words = { trak = "trak" }
default.extend-ignore-re = [
"(?Rm)^.*(#|//)\\s*spellchecker:disable-line$", # spellchecker:disable-line
"(?s)(#|//)\\s*spellchecker:off.*?\\n\\s*(#|//)\\s*spellchecker:on", # spellchecker:<on|off>
File diff suppressed because it is too large Load Diff
+93
View File
@@ -0,0 +1,93 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
from __future__ import annotations
import argparse
import time
from pathlib import Path
import fsspec
from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata
from lerobot.datasets.episode_video_streaming import EpisodeVideoManifest, assert_hf_hub_range_cache_branch
DEFAULT_REPO = "allenai/MolmoAct2-BimanualYAM-Dataset"
DEFAULT_REVISION = "e9f21ae15074330839f2ac25ed4b49d76dfa1f9c"
DEFAULT_DATA_ROOT = "hf://buckets/pepijn223/MolmoAct2-BimanualYAM-Dataset-bucket"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Build a reusable MP4 byte-index sidecar for streaming.")
parser.add_argument("--repo-id", default=DEFAULT_REPO)
parser.add_argument("--revision", default=DEFAULT_REVISION)
parser.add_argument("--data-root", default=DEFAULT_DATA_ROOT)
parser.add_argument("--output", required=True)
parser.add_argument("--episodes", type=int, default=None)
parser.add_argument("--workers", type=int, default=8)
parser.add_argument("--range-backend", choices=("fsspec", "native-http"), default="native-http")
parser.add_argument("--max-probe-mb", type=int, default=64)
parser.add_argument(
"--no-push", action="store_true", help="Do not upload the sidecar to data_root/meta/mp4-sidecars."
)
parser.add_argument("--no-hub-branch-assert", action="store_true")
return parser.parse_args()
def push_sidecar(local_path: str, data_root: str) -> list[str]:
if not data_root.startswith("hf://"):
return []
local = Path(local_path)
fs = fsspec.filesystem("hf")
remote_dir = f"{data_root.rstrip('/')}/meta/mp4-sidecars"
remote_paths = [f"{remote_dir}/{local.name}"]
for remote in remote_paths:
fs.put(str(local), remote)
return remote_paths
def main() -> None:
args = parse_args()
if args.data_root.startswith("hf://") and not args.no_hub_branch_assert:
assert_hf_hub_range_cache_branch()
meta = LeRobotDatasetMetadata(args.repo_id, revision=args.revision)
meta.ensure_readable()
total = (
int(meta.total_episodes) if args.episodes is None else min(args.episodes, int(meta.total_episodes))
)
rel_paths = sorted(
{str(meta.get_video_file_path(ep_idx, key)) for ep_idx in range(total) for key in meta.video_keys}
)
start = time.perf_counter()
EpisodeVideoManifest.write_file_sidecar(
args.output,
rel_paths,
args.data_root,
range_backend=args.range_backend,
workers=args.workers,
max_probe_bytes=args.max_probe_mb * 1024 * 1024,
)
elapsed = time.perf_counter() - start
print(f"wrote {args.output}")
print(f"episodes={total} files={len(rel_paths)} elapsed_s={elapsed:.2f}")
if args.no_push:
print("push_skipped: --no-push")
else:
pushed = push_sidecar(args.output, args.data_root)
for remote in pushed:
print(f"pushed {remote}")
if __name__ == "__main__":
main()
+65
View File
@@ -0,0 +1,65 @@
#!/usr/bin/env python
from __future__ import annotations
import argparse
import json
from pathlib import Path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Summarize distributed episode pool benchmark JSON files.")
parser.add_argument("summaries", nargs="+", help="Rank summary JSON files.")
return parser.parse_args()
def _load(path: str) -> dict:
return json.loads(Path(path).read_text())
def _fmt(value: float) -> str:
return f"{value:.1f}"
def main() -> None:
args = parse_args()
rows = [_load(path) for path in args.summaries]
rows.sort(key=lambda row: int(row.get("distributed_shard_index", 0)))
total_bytes = sum(float(row.get("fetch_bytes", 0.0)) for row in rows)
max_fetch_s = max(float(row.get("fetch_s", 0.0)) for row in rows)
aggregate_mib_s = total_bytes / max_fetch_s / 1024**2 if max_fetch_s > 0 else float("inf")
summed_rank_mib_s = sum(float(row.get("fetch_mib_s", 0.0)) for row in rows)
total_decode_samples_s = sum(float(row.get("pool_decode_training_samples_s", 0.0)) for row in rows)
total_stream_samples_s = sum(float(row.get("pool_stream_actual_samples_s", 0.0)) for row in rows)
kept_up = all(bool(row.get("pool_stream_kept_up", 0.0)) for row in rows)
print("| Aggregate | value |")
print("|---|---:|")
print(f"| ranks | {len(rows)} |")
print(f"| total fetched GiB | {total_bytes / 1024**3:.2f} |")
print(f"| aggregate fetch MiB/s | {_fmt(aggregate_mib_s)} |")
print(f"| summed rank fetch MiB/s | {_fmt(summed_rank_mib_s)} |")
if total_decode_samples_s:
print(f"| aggregate resident decode samples/s | {_fmt(total_decode_samples_s)} |")
if total_stream_samples_s:
print(f"| aggregate stream samples/s | {_fmt(total_stream_samples_s)} |")
print(f"| all ranks kept up | {'yes' if kept_up else 'no'} |")
print()
print("| Rank | host | fetch MiB/s | fetch s | GiB | decode samples/s | stream samples/s | kept up |")
print("|---:|---|---:|---:|---:|---:|---:|---|")
for row in rows:
rank = int(row.get("distributed_shard_index", 0))
print(
f"| {rank} | {row.get('hostname', '')} | "
f"{_fmt(float(row.get('fetch_mib_s', 0.0)))} | "
f"{_fmt(float(row.get('fetch_s', 0.0)))} | "
f"{float(row.get('fetch_gib', 0.0)):.2f} | "
f"{_fmt(float(row.get('pool_decode_training_samples_s', 0.0)))} | "
f"{_fmt(float(row.get('pool_stream_actual_samples_s', 0.0)))} | "
f"{'yes' if row.get('pool_stream_kept_up', 0.0) else 'no'} |"
)
if __name__ == "__main__":
main()
File diff suppressed because it is too large Load Diff
+666
View File
@@ -0,0 +1,666 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
from __future__ import annotations
import struct
from collections.abc import Callable, Iterable
from dataclasses import dataclass
import numpy as np
@dataclass(frozen=True)
class Box:
type: bytes
start: int
header_size: int
end: int
@property
def payload_start(self) -> int:
return self.start + self.header_size
@property
def size(self) -> int:
return self.end - self.start
@dataclass(frozen=True)
class Mp4SampleSlice:
sample_lo: int
sample_hi: int
byte_offset: int
byte_length: int
source_start_pts: float
@dataclass(frozen=True)
class Mp4Index:
file_path: str
file_size: int
ftyp: bytes
moov_offset: int
mdat_offset: int
mdat_payload_offset: int
mdat_payload_size: int
faststart: bool
codec: str
timescale: int
duration: int
track_id: int
width: int
height: int
stsd_body: bytes
sample_pts: np.ndarray
sample_durations: np.ndarray
sample_sizes: np.ndarray
sample_offsets: np.ndarray
sync_samples: np.ndarray
def sample_slice(
self,
from_ts: float,
to_ts: float,
*,
keyframe_pad_s: float = 0.1,
keyframe_pad_fraction: float = 0.05,
file_size: int | None = None,
) -> Mp4SampleSlice:
if to_ts < from_ts:
raise ValueError(f"Invalid timestamp span: {from_ts=} {to_ts=}")
if len(self.sample_pts) == 0:
raise ValueError(f"{self.file_path} contains no indexed samples")
pad = max(keyframe_pad_s, (to_ts - from_ts) * keyframe_pad_fraction)
lo_ts = max(0.0, from_ts - pad)
hi_ts = to_ts + pad
lo = int(np.searchsorted(self.sample_pts, lo_ts, side="left"))
hi = int(np.searchsorted(self.sample_pts, hi_ts, side="right")) - 1
lo = min(max(lo, 0), len(self.sample_pts) - 1)
hi = min(max(hi, lo), len(self.sample_pts) - 1)
if len(self.sync_samples):
prev_sync = self.sync_samples[self.sync_samples <= lo]
if len(prev_sync):
lo = int(prev_sync[-1])
else:
lo = int(self.sync_samples[0])
if lo > hi:
hi = lo
offsets = self.sample_offsets[lo : hi + 1]
sizes = self.sample_sizes[lo : hi + 1]
slice_lo = int(offsets.min())
slice_hi = int((offsets + sizes).max())
if file_size is not None:
slice_hi = min(slice_hi, int(file_size))
return Mp4SampleSlice(
sample_lo=lo,
sample_hi=hi,
byte_offset=slice_lo,
byte_length=slice_hi - slice_lo,
source_start_pts=float(self.sample_pts[lo]),
)
def to_dict(self) -> dict:
return {
"file_path": self.file_path,
"file_size": self.file_size,
"ftyp": self.ftyp.hex(),
"moov_offset": self.moov_offset,
"mdat_offset": self.mdat_offset,
"mdat_payload_offset": self.mdat_payload_offset,
"mdat_payload_size": self.mdat_payload_size,
"faststart": self.faststart,
"codec": self.codec,
"timescale": self.timescale,
"duration": self.duration,
"track_id": self.track_id,
"width": self.width,
"height": self.height,
"stsd_body": self.stsd_body.hex(),
}
@classmethod
def from_dict(cls, data: dict, arrays: dict[str, np.ndarray]) -> Mp4Index:
return cls(
file_path=data["file_path"],
file_size=int(data["file_size"]),
ftyp=bytes.fromhex(data["ftyp"]),
moov_offset=int(data["moov_offset"]),
mdat_offset=int(data["mdat_offset"]),
mdat_payload_offset=int(data["mdat_payload_offset"]),
mdat_payload_size=int(data["mdat_payload_size"]),
faststart=bool(data["faststart"]),
codec=data["codec"],
timescale=int(data["timescale"]),
duration=int(data["duration"]),
track_id=int(data["track_id"]),
width=int(data["width"]),
height=int(data["height"]),
stsd_body=bytes.fromhex(data["stsd_body"]),
sample_pts=arrays["sample_pts"],
sample_durations=arrays["sample_durations"],
sample_sizes=arrays["sample_sizes"],
sample_offsets=arrays["sample_offsets"],
sync_samples=arrays["sync_samples"],
)
def fetch_mp4_index(
path: str,
read_range: Callable[[str, int, int], bytes],
*,
file_size: int,
header_probe_bytes: int = 4 * 1024 * 1024,
max_probe_bytes: int = 64 * 1024 * 1024,
) -> Mp4Index:
probe_size = min(header_probe_bytes, file_size)
while True:
data = read_range(path, 0, probe_size)
top = list(iter_boxes(data, 0, len(data), absolute_base=0, allow_truncated=True))
has_mdat = any(box.type == b"mdat" for box in top)
has_moov = any(box.type == b"moov" and box.end <= len(data) for box in top)
if has_mdat and has_moov:
return parse_mp4_index(path, data, file_size=file_size)
if probe_size >= min(max_probe_bytes, file_size):
if has_mdat and not has_moov:
tail_index = _fetch_tail_moov_index(path, read_range, data, top, file_size, max_probe_bytes)
if tail_index is not None:
return tail_index
missing = []
if not has_mdat:
missing.append("mdat")
if not has_moov:
missing.append("moov")
raise ValueError(
f"Could not find complete {'/'.join(missing)} in first {probe_size} bytes of {path}"
)
probe_size = min(probe_size * 2, max_probe_bytes, file_size)
def _fetch_tail_moov_index(
path: str,
read_range: Callable[[str, int, int], bytes],
prefix: bytes,
top_boxes: list[Box],
file_size: int,
max_probe_bytes: int,
) -> Mp4Index | None:
mdat_box = _one(top_boxes, b"mdat")
if mdat_box is None or mdat_box.end >= file_size:
return None
tail_offset = mdat_box.end
tail_length = min(max_probe_bytes, file_size - tail_offset)
tail = read_range(path, tail_offset, tail_length)
tail_boxes = list(iter_boxes(tail, 0, len(tail), absolute_base=tail_offset, allow_truncated=True))
moov_box = next(
(box for box in tail_boxes if box.type == b"moov" and box.end <= tail_offset + len(tail)), None
)
if moov_box is None:
return None
ftyp_box = _one(top_boxes, b"ftyp", required=False)
ftyp = (
prefix[ftyp_box.start : ftyp_box.end]
if ftyp_box is not None
else _box(b"ftyp", b"isom\0\0\2\0isomiso2mp41")
)
moov_start = moov_box.payload_start - tail_offset
moov_end = moov_box.end - tail_offset
return _parse_mp4_index_from_layout(
path,
file_size=file_size,
ftyp=ftyp,
moov_offset=moov_box.start,
moov=tail[moov_start:moov_end],
mdat_box=mdat_box,
)
def parse_mp4_index(path: str, data: bytes, *, file_size: int | None = None) -> Mp4Index:
if file_size is None:
file_size = len(data)
top = list(iter_boxes(data, 0, len(data), absolute_base=0, allow_truncated=True))
ftyp_box = _one(top, b"ftyp", required=False)
moov_box = _one(top, b"moov")
mdat_box = _one(top, b"mdat")
if moov_box.end > len(data):
raise ValueError(f"{path}: moov box is truncated")
moov = data[moov_box.payload_start : moov_box.end]
ftyp = (
data[ftyp_box.start : ftyp_box.end]
if ftyp_box is not None
else _box(b"ftyp", b"isom\0\0\2\0isomiso2mp41")
)
return _parse_mp4_index_from_layout(
path,
file_size=file_size,
ftyp=ftyp,
moov_offset=moov_box.start,
moov=moov,
mdat_box=mdat_box,
)
def _parse_mp4_index_from_layout(
path: str,
*,
file_size: int,
ftyp: bytes,
moov_offset: int,
moov: bytes,
mdat_box: Box,
) -> Mp4Index:
mvhd_timescale, mvhd_duration = _parse_mvhd(_find_descendant(moov, [b"mvhd"]))
trak_box, trak_payload = _find_video_trak(moov)
_ = trak_box
tkhd = _parse_tkhd(_find_descendant(trak_payload, [b"tkhd"]))
mdhd_timescale, mdhd_duration = _parse_mdhd(_find_descendant(trak_payload, [b"mdia", b"mdhd"]))
stbl = _find_descendant(trak_payload, [b"mdia", b"minf", b"stbl"])
stsd = _find_child(stbl, b"stsd")
stsd_body = stbl[stsd.payload_start : stsd.end]
codec = _parse_stsd_codec(stsd_body)
stts = _parse_stts(_payload(stbl, b"stts"))
sample_sizes = _parse_stsz(_payload(stbl, b"stsz"))
stsc = _parse_stsc(_payload(stbl, b"stsc"))
chunk_offsets = _parse_chunk_offsets(stbl)
sync_samples = _parse_stss(stbl, len(sample_sizes))
sample_durations = _expand_stts(stts, len(sample_sizes))
sample_pts_units = np.empty(len(sample_durations), dtype=np.int64)
if len(sample_durations):
sample_pts_units[0] = 0
if len(sample_durations) > 1:
sample_pts_units[1:] = np.cumsum(sample_durations[:-1], dtype=np.int64)
sample_pts = sample_pts_units.astype(np.float64) / float(mdhd_timescale)
sample_offsets = _sample_offsets(stsc, chunk_offsets, sample_sizes)
return Mp4Index(
file_path=path,
file_size=file_size,
ftyp=ftyp,
moov_offset=moov_offset,
mdat_offset=mdat_box.start,
mdat_payload_offset=mdat_box.payload_start,
mdat_payload_size=mdat_box.end - mdat_box.payload_start
if mdat_box.end <= file_size
else file_size - mdat_box.payload_start,
faststart=moov_offset < mdat_box.start,
codec=codec,
timescale=mdhd_timescale,
duration=mdhd_duration or mvhd_duration,
track_id=tkhd["track_id"],
width=tkhd["width"],
height=tkhd["height"],
stsd_body=stsd_body,
sample_pts=sample_pts,
sample_durations=sample_durations,
sample_sizes=sample_sizes,
sample_offsets=sample_offsets,
sync_samples=sync_samples,
)
def synthesize_mp4(index: Mp4Index, sample_slice: Mp4SampleSlice, mdat_payload: bytes) -> bytes:
lo = sample_slice.sample_lo
hi = sample_slice.sample_hi + 1
if lo < 0 or hi > len(index.sample_sizes) or lo >= hi:
raise ValueError(f"Invalid sample range [{lo}, {hi}) for {index.file_path}")
offsets = index.sample_offsets[lo:hi]
sizes = index.sample_sizes[lo:hi]
rel_offsets = offsets - sample_slice.byte_offset
if int(rel_offsets.min()) != 0:
raise ValueError("Sample slice must start at the minimum referenced sample offset")
if int((rel_offsets + sizes).max()) > len(mdat_payload):
raise ValueError("Sample slice does not cover all referenced samples")
durations = index.sample_durations[lo:hi]
sync = index.sync_samples[(index.sync_samples >= lo) & (index.sync_samples < hi)] - lo + 1
moov = _make_moov(index, durations, sizes, rel_offsets, sync, mdat_data_offset=0)
header_size = len(index.ftyp) + len(moov)
moov = _make_moov(index, durations, sizes, rel_offsets, sync, mdat_data_offset=header_size + 8)
return index.ftyp + moov + _box(b"mdat", mdat_payload)
def iter_boxes(
data: bytes,
start: int,
end: int,
*,
absolute_base: int = 0,
allow_truncated: bool = False,
) -> Iterable[Box]:
pos = start
while pos + 8 <= end:
size = struct.unpack_from(">I", data, pos)[0]
typ = data[pos + 4 : pos + 8]
header_size = 8
if size == 1:
if pos + 16 > end:
break
size = struct.unpack_from(">Q", data, pos + 8)[0]
header_size = 16
elif size == 0:
size = end - pos
if size < header_size:
break
box_end = pos + size
if box_end > end and not allow_truncated:
break
yield Box(typ, absolute_base + pos, header_size, absolute_base + box_end)
pos = box_end
def _find_video_trak(moov: bytes) -> tuple[Box, bytes]:
for trak in _children(moov, 0, len(moov)):
if trak.type != b"trak":
continue
payload = moov[trak.payload_start : trak.end]
hdlr = _find_descendant(payload, [b"mdia", b"hdlr"])
if hdlr[8:12] == b"vide":
return trak, payload
raise ValueError("No video track found")
def _find_descendant(data: bytes, path: list[bytes]) -> bytes:
current = data
for typ in path:
box = _find_child(current, typ)
current = current[box.payload_start : box.end]
return current
def _find_child(data: bytes, typ: bytes) -> Box:
for box in _children(data, 0, len(data)):
if box.type == typ:
return box
raise ValueError(f"Missing MP4 box {typ.decode('latin1')}")
def _children(data: bytes, start: int, end: int) -> Iterable[Box]:
return iter_boxes(data, start, end, absolute_base=0)
def _one(boxes: list[Box], typ: bytes, *, required: bool = True) -> Box | None:
matches = [box for box in boxes if box.type == typ]
if not matches and required:
raise ValueError(f"Missing MP4 box {typ.decode('latin1')}")
return matches[0] if matches else None
def _payload(parent: bytes, typ: bytes) -> bytes:
box = _find_child(parent, typ)
return parent[box.payload_start : box.end]
def _parse_mvhd(payload: bytes) -> tuple[int, int]:
version = payload[0]
if version == 1:
return struct.unpack_from(">IQ", payload, 20)
return struct.unpack_from(">II", payload, 12)
def _parse_mdhd(payload: bytes) -> tuple[int, int]:
version = payload[0]
if version == 1:
return struct.unpack_from(">IQ", payload, 20)
return struct.unpack_from(">II", payload, 12)
def _parse_tkhd(payload: bytes) -> dict[str, int]:
version = payload[0]
if version == 1:
track_id = struct.unpack_from(">I", payload, 20)[0]
duration = struct.unpack_from(">Q", payload, 28)[0]
width, height = struct.unpack_from(">II", payload, 88)
else:
track_id = struct.unpack_from(">I", payload, 12)[0]
duration = struct.unpack_from(">I", payload, 20)[0]
width, height = struct.unpack_from(">II", payload, 76)
return {"track_id": track_id, "duration": duration, "width": width >> 16, "height": height >> 16}
def _parse_stsd_codec(stsd_body: bytes) -> str:
if len(stsd_body) < 16:
return "unknown"
return stsd_body[12:16].decode("latin1")
def _parse_stts(payload: bytes) -> list[tuple[int, int]]:
count = struct.unpack_from(">I", payload, 4)[0]
out = []
offset = 8
for _ in range(count):
out.append(struct.unpack_from(">II", payload, offset))
offset += 8
return out
def _expand_stts(entries: list[tuple[int, int]], sample_count: int) -> np.ndarray:
values = np.empty(sample_count, dtype=np.int64)
pos = 0
for count, delta in entries:
values[pos : pos + count] = delta
pos += count
if pos != sample_count:
raise ValueError(f"stts describes {pos} samples, stsz describes {sample_count}")
return values
def _parse_stsz(payload: bytes) -> np.ndarray:
sample_size, sample_count = struct.unpack_from(">II", payload, 4)
if sample_size:
return np.full(sample_count, sample_size, dtype=np.int64)
offset = 12
values = np.empty(sample_count, dtype=np.int64)
for idx in range(sample_count):
values[idx] = struct.unpack_from(">I", payload, offset)[0]
offset += 4
return values
def _parse_stsc(payload: bytes) -> list[tuple[int, int, int]]:
count = struct.unpack_from(">I", payload, 4)[0]
out = []
offset = 8
for _ in range(count):
out.append(struct.unpack_from(">III", payload, offset))
offset += 12
return out
def _parse_chunk_offsets(stbl: bytes) -> np.ndarray:
with_stco = None
with_co64 = None
for box in _children(stbl, 0, len(stbl)):
if box.type == b"stco":
with_stco = stbl[box.payload_start : box.end]
elif box.type == b"co64":
with_co64 = stbl[box.payload_start : box.end]
if with_co64 is not None:
count = struct.unpack_from(">I", with_co64, 4)[0]
return np.array(
[struct.unpack_from(">Q", with_co64, 8 + idx * 8)[0] for idx in range(count)], dtype=np.int64
)
if with_stco is None:
raise ValueError("Missing stco/co64 chunk offsets")
count = struct.unpack_from(">I", with_stco, 4)[0]
return np.array(
[struct.unpack_from(">I", with_stco, 8 + idx * 4)[0] for idx in range(count)], dtype=np.int64
)
def _parse_stss(stbl: bytes, sample_count: int) -> np.ndarray:
for box in _children(stbl, 0, len(stbl)):
if box.type == b"stss":
payload = stbl[box.payload_start : box.end]
count = struct.unpack_from(">I", payload, 4)[0]
return np.array(
[struct.unpack_from(">I", payload, 8 + idx * 4)[0] - 1 for idx in range(count)],
dtype=np.int64,
)
return np.arange(sample_count, dtype=np.int64)
def _sample_offsets(
stsc: list[tuple[int, int, int]], chunk_offsets: np.ndarray, sample_sizes: np.ndarray
) -> np.ndarray:
if not stsc:
raise ValueError("stsc is empty")
offsets = np.empty(len(sample_sizes), dtype=np.int64)
sample_idx = 0
for entry_idx, (first_chunk, samples_per_chunk, _desc_idx) in enumerate(stsc):
next_first = stsc[entry_idx + 1][0] if entry_idx + 1 < len(stsc) else len(chunk_offsets) + 1
for chunk_number in range(first_chunk, next_first):
if chunk_number < 1 or chunk_number > len(chunk_offsets):
raise ValueError("stsc references a chunk outside stco/co64")
chunk_pos = int(chunk_offsets[chunk_number - 1])
for _ in range(samples_per_chunk):
if sample_idx >= len(sample_sizes):
return offsets
offsets[sample_idx] = chunk_pos
chunk_pos += int(sample_sizes[sample_idx])
sample_idx += 1
if sample_idx != len(sample_sizes):
raise ValueError(f"stsc describes {sample_idx} samples, stsz describes {len(sample_sizes)}")
return offsets
def _make_moov(
index: Mp4Index,
durations: np.ndarray,
sizes: np.ndarray,
rel_offsets: np.ndarray,
sync_samples: np.ndarray,
*,
mdat_data_offset: int,
) -> bytes:
duration = int(durations.sum())
stco_values = [int(mdat_data_offset + value) for value in rel_offsets]
if any(value > 0xFFFFFFFF for value in stco_values):
offset_box = _co64(stco_values)
else:
offset_box = _stco(stco_values)
stbl = _box(
b"stbl",
_box(b"stsd", index.stsd_body)
+ _stts(durations)
+ _stsc_one_sample_per_chunk(len(sizes))
+ _stsz(sizes)
+ offset_box
+ (_stss(sync_samples) if len(sync_samples) else b""),
)
minf = _box(b"minf", _vmhd() + _dinf() + stbl)
mdia = _box(b"mdia", _mdhd(index.timescale, duration) + _hdlr() + minf)
trak = _box(b"trak", _tkhd(index.track_id, duration, index.width, index.height) + mdia)
return _box(b"moov", _mvhd(index.timescale, duration, index.track_id + 1) + trak)
def _full_box(typ: bytes, version: int, flags: int, payload: bytes = b"") -> bytes:
return _box(typ, bytes([version]) + flags.to_bytes(3, "big") + payload)
def _box(typ: bytes, payload: bytes) -> bytes:
size = len(payload) + 8
if size <= 0xFFFFFFFF:
return struct.pack(">I4s", size, typ) + payload
return struct.pack(">I4sQ", 1, typ, size + 8) + payload
def _mvhd(timescale: int, duration: int, next_track_id: int) -> bytes:
matrix = struct.pack(">9I", 0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000)
payload = (
struct.pack(">IIII", 0, 0, timescale, duration)
+ struct.pack(">IHH", 0x00010000, 0x0100, 0)
+ b"\0" * 8
+ matrix
+ b"\0" * 24
+ struct.pack(">I", next_track_id)
)
return _full_box(b"mvhd", 0, 0, payload)
def _tkhd(track_id: int, duration: int, width: int, height: int) -> bytes:
matrix = struct.pack(">9I", 0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000)
payload = (
struct.pack(">IIIII", 0, 0, track_id, 0, duration)
+ b"\0" * 8
+ struct.pack(">hhhh", 0, 0, 0, 0)
+ matrix
+ struct.pack(">II", width << 16, height << 16)
)
return _full_box(b"tkhd", 0, 7, payload)
def _mdhd(timescale: int, duration: int) -> bytes:
return _full_box(b"mdhd", 0, 0, struct.pack(">IIIIH", 0, 0, timescale, duration, 0x55C4) + b"\0\0")
def _hdlr() -> bytes:
return _full_box(b"hdlr", 0, 0, b"\0" * 4 + b"vide" + b"\0" * 12 + b"VideoHandler\0")
def _vmhd() -> bytes:
return _full_box(b"vmhd", 0, 1, struct.pack(">HHHH", 0, 0, 0, 0))
def _dinf() -> bytes:
url = _full_box(b"url ", 0, 1)
dref = _full_box(b"dref", 0, 0, struct.pack(">I", 1) + url)
return _box(b"dinf", dref)
def _stts(durations: np.ndarray) -> bytes:
runs = []
for duration in durations.tolist():
if runs and runs[-1][1] == int(duration):
runs[-1][0] += 1
else:
runs.append([1, int(duration)])
payload = struct.pack(">I", len(runs)) + b"".join(
struct.pack(">II", count, delta) for count, delta in runs
)
return _full_box(b"stts", 0, 0, payload)
def _stsc_one_sample_per_chunk(sample_count: int) -> bytes:
return _full_box(b"stsc", 0, 0, struct.pack(">IIII", 1, 1, 1, 1))
def _stsz(sizes: np.ndarray) -> bytes:
return _full_box(
b"stsz",
0,
0,
struct.pack(">II", 0, len(sizes)) + b"".join(struct.pack(">I", int(size)) for size in sizes.tolist()),
)
def _stco(values: list[int]) -> bytes:
return _full_box(
b"stco", 0, 0, struct.pack(">I", len(values)) + b"".join(struct.pack(">I", v) for v in values)
)
def _co64(values: list[int]) -> bytes:
return _full_box(
b"co64", 0, 0, struct.pack(">I", len(values)) + b"".join(struct.pack(">Q", v) for v in values)
)
def _stss(values: np.ndarray) -> bytes:
return _full_box(
b"stss",
0,
0,
struct.pack(">I", len(values)) + b"".join(struct.pack(">I", int(value)) for value in values.tolist()),
)
@@ -0,0 +1,121 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
import json
import struct
import numpy as np
import pytest
from lerobot.datasets.episode_video_streaming import assert_hf_hub_range_cache_branch
from lerobot.datasets.mp4 import (
_box,
_co64,
_dinf,
_hdlr,
_mdhd,
_mvhd,
_stco,
_stsc_one_sample_per_chunk,
_stss,
_stsz,
_stts,
_tkhd,
_vmhd,
parse_mp4_index,
synthesize_mp4,
)
def _minimal_mp4(sample_offsets: list[int], *, use_co64: bool = False) -> bytes:
ftyp = _box(b"ftyp", b"isom\0\0\2\0isomiso2mp41")
sizes = np.array([10, 10, 10], dtype=np.int64)
durations = np.array([1000, 1000, 1000], dtype=np.int64)
stsd_body = struct.pack(">II", 0, 1) + struct.pack(">I4s", 16, b"avc1") + b"\0" * 8
offsets = _co64(sample_offsets) if use_co64 else _stco(sample_offsets)
stbl = _box(
b"stbl",
_box(b"stsd", stsd_body)
+ _stts(durations)
+ _stsc_one_sample_per_chunk(len(sizes))
+ _stsz(sizes)
+ offsets
+ _stss(np.array([1], dtype=np.int64)),
)
minf = _box(b"minf", _vmhd() + _dinf() + stbl)
mdia = _box(b"mdia", _mdhd(1000, 3000) + _hdlr() + minf)
trak = _box(b"trak", _tkhd(1, 3000, 64, 48) + mdia)
moov = _box(b"moov", _mvhd(1000, 3000, 2) + trak)
mdat_payload_start = 10_000
free_size = mdat_payload_start - 8 - len(ftyp) - len(moov)
assert free_size >= 8
free = _box(b"free", b"\0" * (free_size - 8))
return ftyp + moov + free + _box(b"mdat", b"x" * 128)
def test_episode_slice_uses_min_max_sample_offsets_for_reordered_chunks():
mp4 = parse_mp4_index("test.mp4", _minimal_mp4([10_000, 10_050, 10_025]))
sample_slice = mp4.sample_slice(0.0, 2.0, keyframe_pad_s=0, keyframe_pad_fraction=0)
assert sample_slice.byte_offset == 10_000
assert sample_slice.byte_length == 60
assert sample_slice.sample_lo == 0
assert sample_slice.sample_hi == 2
def test_synthesized_mp4_rebases_one_chunk_per_sample_offsets():
mp4 = parse_mp4_index("test.mp4", _minimal_mp4([10_000, 10_050, 10_025]))
sample_slice = mp4.sample_slice(0.0, 2.0, keyframe_pad_s=0, keyframe_pad_fraction=0)
mini = synthesize_mp4(mp4, sample_slice, b"x" * sample_slice.byte_length)
mini_index = parse_mp4_index("mini.mp4", mini)
expected = np.array([0, 50, 25], dtype=np.int64) + mini_index.mdat_payload_offset
np.testing.assert_array_equal(mini_index.sample_offsets, expected)
np.testing.assert_array_equal(mini_index.sample_sizes, np.array([10, 10, 10]))
def test_parser_accepts_co64_chunk_offsets():
mp4 = parse_mp4_index("test.mp4", _minimal_mp4([10_000, 10_050, 10_025], use_co64=True))
np.testing.assert_array_equal(mp4.sample_offsets, np.array([10_000, 10_050, 10_025]))
def test_hf_hub_branch_assertion_accepts_requested_revision(monkeypatch):
class FakeDist:
def read_text(self, name):
assert name == "direct_url.json"
return json.dumps(
{
"url": "https://github.com/huggingface/huggingface_hub.git",
"vcs_info": {"requested_revision": "feat/hffs-cache-cdn-range-reads"},
}
)
monkeypatch.setattr(
"lerobot.datasets.episode_video_streaming.metadata.distribution", lambda _: FakeDist()
)
assert_hf_hub_range_cache_branch()
def test_hf_hub_branch_assertion_rejects_plain_install(monkeypatch):
class FakeDist:
def read_text(self, name):
assert name == "direct_url.json"
return json.dumps({"url": "https://github.com/huggingface/huggingface_hub.git"})
monkeypatch.setattr(
"lerobot.datasets.episode_video_streaming.metadata.distribution", lambda _: FakeDist()
)
with pytest.raises(AssertionError):
assert_hf_hub_range_cache_branch()
Generated
+8 -16
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.12"
resolution-markers = [
"(python_full_version >= '3.15' and platform_machine == 'AMD64' and sys_platform == 'linux') or (python_full_version >= '3.15' and platform_machine == 'x86_64' and sys_platform == 'linux')",
@@ -1089,8 +1089,8 @@ wheels = [
[[package]]
name = "datasets"
version = "4.8.5"
source = { registry = "https://pypi.org/simple" }
version = "5.0.1.dev0"
source = { git = "https://github.com/huggingface/datasets.git?branch=main#06fcc085fcdd22fc5cc741954f6187dd879543b6" }
dependencies = [
{ name = "dill" },
{ name = "filelock" },
@@ -1107,10 +1107,6 @@ dependencies = [
{ name = "tqdm" },
{ name = "xxhash" },
]
sdist = { url = "https://files.pythonhosted.org/packages/66/34/14cd8e76f907f7d4dca2334cfeec9f81d30fd15c25a015f99aaea694eaed/datasets-4.8.5.tar.gz", hash = "sha256:0f0c1c3d56ffff2c93b2f4c63c95bac94f3d7e8621aea2a2a576275233bba772", size = 605649, upload-time = "2026-04-27T15:43:57.384Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/65/99/00f3196036501b53032c4b1ab8337a0b978dee832ed276dae3815df4e8b5/datasets-4.8.5-py3-none-any.whl", hash = "sha256:5079900781719c0e063a8efdd2cd95a31ad0c63209178669cd23cf1b926149ff", size = 528973, upload-time = "2026-04-27T15:43:53.702Z" },
]
[[package]]
name = "debugpy"
@@ -1147,7 +1143,7 @@ name = "decord"
version = "0.6.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy", marker = "(platform_machine != 'arm64' and sys_platform == 'darwin') or (platform_machine == 'AMD64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "numpy", marker = "(platform_machine != 'arm64' and platform_machine != 's390x' and sys_platform == 'darwin') or (platform_machine == 'AMD64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux') or (platform_machine != 's390x' and sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/11/79/936af42edf90a7bd4e41a6cac89c913d4b47fa48a26b042d5129a9242ee3/decord-0.6.0-py3-none-manylinux2010_x86_64.whl", hash = "sha256:51997f20be8958e23b7c4061ba45d0efcd86bffd5fe81c695d0befee0d442976", size = 13602299, upload-time = "2021-06-14T21:30:55.486Z" },
@@ -2050,8 +2046,8 @@ wheels = [
[[package]]
name = "huggingface-hub"
version = "1.19.0"
source = { registry = "https://pypi.org/simple" }
version = "1.20.0.dev0"
source = { git = "https://github.com/huggingface/huggingface_hub.git?branch=feat%2Fhffs-cache-cdn-range-reads#5319b287faa73239bb40df16d69c39e5d6daf0f7" }
dependencies = [
{ name = "click" },
{ name = "filelock" },
@@ -2064,10 +2060,6 @@ dependencies = [
{ name = "typer" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/88/27/629cfe58c582f92ded066c4a07d1a057ff617118ab7973200f770bd853cb/huggingface_hub-1.19.0.tar.gz", hash = "sha256:fd771622182d40977272a923953ee3b1b13538f9f8a7f5d78398f10af0f1c0bd", size = 824721, upload-time = "2026-06-11T12:33:18.665Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b2/a5/558da89f66464d8d0229ff497e8b8666977de2d8cf48c28a2862ecf1250f/huggingface_hub-1.19.0-py3-none-any.whl", hash = "sha256:1dc72e1f6b4d6df6b30eb72e57d00514ef453d660f04af2b87f0e67267f31ee0", size = 693398, upload-time = "2026-06-11T12:33:16.695Z" },
]
[[package]]
name = "hydra-core"
@@ -3187,7 +3179,7 @@ requires-dist = [
{ name = "av", marker = "extra == 'av-dep'", specifier = ">=15.0.0,<16.0.0" },
{ name = "cmake", specifier = ">=3.29.0.1,<4.2.0" },
{ name = "contourpy", marker = "extra == 'matplotlib-dep'", specifier = ">=1.3.0,<2.0.0" },
{ name = "datasets", marker = "extra == 'dataset'", specifier = ">=4.7.0,<5.0.0" },
{ name = "datasets", marker = "extra == 'dataset'", git = "https://github.com/huggingface/datasets.git?branch=main" },
{ name = "debugpy", marker = "extra == 'dev'", specifier = ">=1.8.1,<1.9.0" },
{ name = "decord", marker = "(platform_machine == 'AMD64' and extra == 'groot') or (platform_machine == 'x86_64' and extra == 'groot')", specifier = ">=0.6.0,<1.0.0" },
{ name = "deepdiff", marker = "extra == 'deepdiff-dep'", specifier = ">=7.0.1,<9.0.0" },
@@ -3210,7 +3202,7 @@ requires-dist = [
{ name = "hebi-py", marker = "extra == 'phone'", specifier = ">=2.8.0,<2.12.0" },
{ name = "hf-libero", marker = "sys_platform == 'linux' and extra == 'libero'", specifier = ">=0.1.4,<0.2.0" },
{ name = "hidapi", marker = "extra == 'gamepad'", specifier = ">=0.14.0,<0.15.0" },
{ name = "huggingface-hub", specifier = ">=1.0.0,<2.0.0" },
{ name = "huggingface-hub", git = "https://github.com/huggingface/huggingface_hub.git?branch=feat%2Fhffs-cache-cdn-range-reads" },
{ name = "ipykernel", marker = "extra == 'notebook'", specifier = ">=6.0.0,<7.0.0" },
{ name = "jsonlines", marker = "extra == 'dataset'", specifier = ">=4.0.0,<5.0.0" },
{ name = "jupyter", marker = "extra == 'notebook'", specifier = ">=1.0.0,<2.0.0" },