feat(annotate): emit VQA per-camera and propagate camera field

Module 3 now produces one (vqa, user) + (vqa, assistant) pair per
emission tick *per camera* rather than only against the dataset's first
camera. Each emitted row carries the `camera` field added in PR 1
(language-columns), so the resolver can disambiguate per-camera VQA via
`emitted_at(t, style=vqa, role=assistant, camera=...)` without ambiguity.

- `frames.py`: `FrameProvider` Protocol gains a `camera_keys` property
  and a `camera_key=` argument on `frames_at` / `video_for_episode`.
  `VideoFrameProvider` exposes every `observation.images.*` key the
  dataset declares (not just the first) and keys its decode cache on
  `(episode, camera, timestamp)` so per-camera reads don't collide.
  Module 1 / 2 keep their old single-camera behaviour by leaving
  `camera_key=None` (falls back to the default camera).
- `modules/general_vqa.py`: `run_episode` iterates `frame_provider
  .camera_keys` for each emission tick, builds one prompt per camera,
  batches all of them through the VLM, and stamps the resulting rows
  with `camera=<that key>`. Empty `camera_keys` (null provider) makes
  the module a no-op rather than silently emitting untagged rows.
- `writer.py`: `_normalize_persistent_row` / `_normalize_event_row`
  carry `camera` through and call `validate_camera_field` so the
  invariant is enforced at the writer boundary. Event sort key now
  includes `camera` for deterministic ordering when several cameras
  share `(timestamp, style, role)`. `speech_atom` sets `camera=None`.
- `validator.py`: `StagingValidator` gains a `dataset_camera_keys`
  field; `_check_camera_field` enforces the invariant and cross-checks
  every view-dependent row's `camera` against the dataset's known video
  keys. New `_check_vqa_uniqueness_per_frame_camera` flags duplicate
  `(vqa, role)` pairs at the same `(t, camera)`.
- `lerobot_annotate.py`: passes the live frame provider's
  `camera_keys` into the validator so the cross-check uses the actual
  dataset camera set.
- Tests: `_StubFrameProvider` exposes `camera_keys` and accepts the new
  `camera_key=` kwarg. `test_module3_vqa_unique_per_frame_and_camera`
  configures two cameras and asserts both are represented, that every
  emitted row has a `camera` tag, and that uniqueness holds per
  `(timestamp, camera, role)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-30 10:48:33 +02:00
parent d0388e1142
commit 1217fdb6f0
6 changed files with 265 additions and 52 deletions
+37 -15
View File
@@ -44,15 +44,20 @@ class _StubFrameProvider:
"""Returns one sentinel object per requested timestamp."""
sentinel: Any = field(default_factory=lambda: object())
calls: list[tuple[int, tuple[float, ...]]] = field(default_factory=list)
video_calls: list[tuple[int, int]] = field(default_factory=list)
cameras: tuple[str, ...] = ("observation.images.top",)
calls: list[tuple[int, tuple[float, ...], str | None]] = field(default_factory=list)
video_calls: list[tuple[int, int, str | None]] = field(default_factory=list)
def frames_at(self, record, timestamps):
self.calls.append((record.episode_index, tuple(timestamps)))
@property
def camera_keys(self) -> list[str]:
return list(self.cameras)
def frames_at(self, record, timestamps, camera_key=None):
self.calls.append((record.episode_index, tuple(timestamps), camera_key))
return [self.sentinel] * len(timestamps)
def video_for_episode(self, record, max_frames):
self.video_calls.append((record.episode_index, max_frames))
def video_for_episode(self, record, max_frames, camera_key=None):
self.video_calls.append((record.episode_index, max_frames, camera_key))
n = min(max_frames, len(record.frame_timestamps))
return [self.sentinel] * n
@@ -148,7 +153,7 @@ def test_module2_mid_episode_emits_paired_interjection_and_speech(
assert any(abs(s["timestamp"] - inter_t) < 1e-9 for s in speeches)
def test_module3_vqa_unique_per_frame(single_episode_root: Path, tmp_path: Path) -> None:
def test_module3_vqa_unique_per_frame_and_camera(single_episode_root: Path, tmp_path: Path) -> None:
payload = {
"question": "How many cups?",
"answer": {"label": "cup", "count": 2, "note": "white & blue"},
@@ -158,19 +163,34 @@ def test_module3_vqa_unique_per_frame(single_episode_root: Path, tmp_path: Path)
vlm=vlm,
config=Module3Config(vqa_emission_hz=1.0, K=3),
seed=1,
frame_provider=_StubFrameProvider(
cameras=("observation.images.top", "observation.images.wrist")
),
)
record = next(iter_episodes(single_episode_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("module_3")
user_ts = [r["timestamp"] for r in rows if r["role"] == "user" and r["style"] == "vqa"]
assistant_ts = [r["timestamp"] for r in rows if r["role"] == "assistant" and r["style"] == "vqa"]
# at most one user (vqa) per frame; same for assistant
assert len(user_ts) == len(set(user_ts))
assert len(assistant_ts) == len(set(assistant_ts))
# every vqa row must carry a camera tag and one of the configured cameras
for r in rows:
assert r["style"] == "vqa"
assert r.get("camera") in {"observation.images.top", "observation.images.wrist"}
# at most one (vqa, user) and one (vqa, assistant) per (timestamp, camera)
user_keys = [
(r["timestamp"], r["camera"]) for r in rows if r["role"] == "user" and r["style"] == "vqa"
]
assistant_keys = [
(r["timestamp"], r["camera"])
for r in rows
if r["role"] == "assistant" and r["style"] == "vqa"
]
assert len(user_keys) == len(set(user_keys))
assert len(assistant_keys) == len(set(assistant_keys))
# both cameras must be represented
assert {c for _, c in user_keys} == {"observation.images.top", "observation.images.wrist"}
# every emitted timestamp must be an exact source frame timestamp
frame_set = set(record.frame_timestamps)
for ts in user_ts + assistant_ts:
for ts, _ in user_keys + assistant_keys:
assert ts in frame_set
@@ -254,11 +274,12 @@ def test_module3_attaches_frame_image_block_to_prompt(single_episode_root: Path,
assert len(image_blocks) == 1, f"expected 1 image block per VQA prompt, got {content}"
assert image_blocks[0]["image"] is provider.sentinel
assert len(text_blocks) == 1
# provider was called once per emission with the exact emission timestamp
for ep_idx, ts_tuple in provider.calls:
# provider was called once per emission per camera with the exact emission timestamp
for ep_idx, ts_tuple, camera in provider.calls:
assert ep_idx == record.episode_index
assert len(ts_tuple) == 1
assert ts_tuple[0] in record.frame_timestamps
assert camera in provider.cameras
def test_module3_assistant_content_is_valid_json(single_episode_root: Path, tmp_path: Path) -> None:
@@ -271,6 +292,7 @@ def test_module3_assistant_content_is_valid_json(single_episode_root: Path, tmp_
vlm=vlm,
config=Module3Config(vqa_emission_hz=1.0, K=2),
seed=2,
frame_provider=_StubFrameProvider(),
)
record = next(iter_episodes(single_episode_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)