feat(annotate): Module 1 sees the whole episode as one video block

Replaces keyframe sampling with a single Qwen-VL video block covering
the whole demonstration. The model does its own temporal pooling and
chooses where to cut subtasks: no stride, no sample count, no keyframe
knob to tune.

- frames.py: ``FrameProvider`` gains ``video_for_episode(record,
  max_frames)``; ``VideoFrameProvider`` samples up to ``max_frames``
  uniformly across the episode duration; ``_NullProvider`` returns []
  for the no-video fallback. New ``to_video_block`` helper. (Sampling
  sketched after this list.)
- Module 1: drops keyframe sampling. The subtask prompt now goes out as
  ``[{"type":"video", "video":[<frames>]}, {"type":"text", ...}]`` and
  the prompt template asks the model to "watch the whole clip, then
  segment it", with cut points decided from gripper/contact/regrasp
  events the model sees. (Payload sketched after this list.)
- Module1Config: ``keyframes_per_episode`` removed; replaced with
  ``max_video_frames: int = 32`` (model-capacity bound, not annotation
  logic).
- Test: ``test_module1_attaches_video_block_to_subtask_prompt`` locks in
  the single-video-block invariant.
- Stub-VLM markers updated: tests now key on "atomic subtasks" instead
  of the old "Decompose the demonstration" phrase that no longer
  appears in the prompt.
- Docs: updated to describe the whole-episode video-block behavior and
  the no-video fallback.
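
For orientation, a minimal sketch of the uniform sampling described in
the frames.py bullet above. The function name and body here are
assumptions for illustration only; the shipped ``VideoFrameProvider``
decodes real frames and is the source of truth:

    # Hedged sketch, not the shipped frames.py code: picks up to
    # max_frames sample points spread evenly over the episode,
    # endpoints included.
    def uniform_timestamps(duration_s: float, max_frames: int) -> list[float]:
        if max_frames <= 0 or duration_s <= 0:
            return []
        if max_frames == 1:
            return [duration_s / 2.0]  # single frame: take the midpoint
        step = duration_s / (max_frames - 1)
        return [i * step for i in range(max_frames)]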
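
And a sketch of the subtask message the Module 1 bullet describes,
assuming the Qwen-VL chat convention; ``SUBTASK_PROMPT``, ``provider``,
and ``config`` are placeholder names, not identifiers from this change:

    # Placeholder names throughout; only the block shape is what the
    # new test locks in.
    frames = provider.video_for_episode(record, config.max_video_frames)
    messages = [{
        "role": "user",
        "content": [
            {"type": "video", "video": frames},        # one block, whole episode
            {"type": "text", "text": SUBTASK_PROMPT},  # "watch the whole clip, then segment it"
        ],
    }]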

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pepijn
2026-04-27 17:08:36 +02:00
parent 80b7708a61
commit 79ca79cba2
8 changed files with 143 additions and 27 deletions
@@ -45,11 +45,17 @@ class _StubFrameProvider:
     sentinel: Any = field(default_factory=lambda: object())
     calls: list[tuple[int, tuple[float, ...]]] = field(default_factory=list)
+    video_calls: list[tuple[int, int]] = field(default_factory=list)
 
     def frames_at(self, record, timestamps):
         self.calls.append((record.episode_index, tuple(timestamps)))
         return [self.sentinel] * len(timestamps)
 
+    def video_for_episode(self, record, max_frames):
+        self.video_calls.append((record.episode_index, max_frames))
+        n = min(max_frames, len(record.frame_timestamps))
+        return [self.sentinel] * n
 
 
 def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
     def responder(messages):
@@ -62,14 +68,14 @@ def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
 def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path: Path) -> None:
     vlm = make_canned_responder(
         {
-            "Decompose the demonstration": {
+            "atomic subtasks": {
                 "subtasks": [
                     {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.4},
                     {"text": "wipe the counter from left to right", "start": 0.4, "end": 0.8},
                     {"text": "place the sponge into the sink", "start": 0.8, "end": 1.1},
                 ]
             },
-            "write a concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
+            "concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
             "Update the memory": {"memory": "wiped the counter once"},
         },
     )
@@ -168,6 +174,57 @@ def test_module3_vqa_unique_per_frame(single_episode_root: Path, tmp_path: Path)
         assert ts in frame_set
 
 
+def test_module1_attaches_video_block_to_subtask_prompt(fixture_dataset_root: Path, tmp_path: Path) -> None:
+    """Module 1 sends one ``type=video`` block covering the whole episode."""
+    captured: list[list[dict[str, Any]]] = []
+    payload = {
+        "subtasks": [
+            {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.5},
+            {"text": "wipe the counter", "start": 0.5, "end": 1.1},
+        ]
+    }
+    plan_payload = {"plan": "1. grasp\n2. wipe"}
+    memory_payload = {"memory": "wiped once"}
+
+    def responder(messages):
+        captured.append(list(messages))
+        text = ""
+        for m in messages:
+            for block in m.get("content", []):
+                if isinstance(block, dict) and block.get("type") == "text":
+                    text = block.get("text", "")
+        if "concise hierarchical PLAN" in text:
+            return plan_payload
+        if "Update the memory" in text:
+            return memory_payload
+        return payload
+
+    provider = _StubFrameProvider()
+    module = PlanSubtasksMemoryModule(
+        vlm=StubVlmClient(responder=responder),
+        config=Module1Config(max_video_frames=5),
+        frame_provider=provider,
+    )
+    record = next(iter_episodes(fixture_dataset_root))
+    staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
+    module.run_episode(record, staging)
+
+    # The subtask call (the first VLM call) must carry exactly one video block.
+    assert captured, "no VLM calls made"
+    first_call = captured[0]
+    content = first_call[0]["content"]
+    video_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "video"]
+    image_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "image"]
+    text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"]
+    assert len(video_blocks) == 1, f"expected exactly 1 video block, got {content}"
+    assert image_blocks == [], "subtask prompt must not mix image blocks with the video block"
+    assert len(text_blocks) == 1
+
+    # The video block must wrap a list of frames covering the episode.
+    assert isinstance(video_blocks[0]["video"], list)
+    assert len(video_blocks[0]["video"]) <= 5
+    assert provider.video_calls == [(record.episode_index, 5)]
+
 
 def test_module3_attaches_frame_image_block_to_prompt(single_episode_root: Path, tmp_path: Path) -> None:
     """Each VQA prompt must carry a single image block at the emission frame."""
     captured: list[list[dict[str, Any]]] = []