feat(annotate): Module 1 sees the whole episode as one video block

Replaces keyframe sampling with a single Qwen-VL video block covering
the whole demonstration. The model pools temporally itself and chooses
where to cut subtasks — there is no stride or keyframe-count knob to
tune.

- frames.py: ``FrameProvider`` gains ``video_for_episode(record,
  max_frames)``; ``VideoFrameProvider`` samples up to ``max_frames``
  uniformly across the episode duration; ``_NullProvider`` returns []
  for the no-video fallback. New ``to_video_block`` helper.
- Module 1: drops keyframe sampling. The subtask prompt now goes out as
  ``[{"type":"video", "video":[<frames>]}, {"type":"text", ...}]`` and
  the prompt template asks the model to "watch the whole clip, then
  segment it" with cut points decided from gripper/contact/regrasp
  events the model sees.
- Module1Config: ``keyframes_per_episode`` removed; replaced with
  ``max_video_frames: int = 32`` (model-capacity bound, not annotation
  logic).
- Test: ``test_module1_attaches_video_block_to_subtask_prompt`` locks in
  the single-video-block invariant.
- Stub-VLM markers updated: tests now key on "atomic subtasks" instead
  of the old "Decompose the demonstration" phrase that no longer
  appears in the prompt.
- Docs: updated to describe the whole-episode video-block behavior and
  the no-video fallback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-27 17:08:36 +02:00
parent 4f4dd49972
commit 9d5aa1c63e
8 changed files with 143 additions and 27 deletions
+14 -5
View File
@@ -36,11 +36,20 @@ uv run lerobot-annotate \
--vlm.tensor_parallel_size=2 --vlm.tensor_parallel_size=2
``` ```
The pipeline attaches camera keyframes to every Module 1/2/3 prompt by The pipeline attaches actual camera footage to every Module 1/2/3 prompt
default, decoded from the dataset's first `observation.images.*` stream. by default, decoded from the dataset's first `observation.images.*`
Override with `--vlm.camera_key=observation.images.<name>` to pin a stream. Override with `--vlm.camera_key=observation.images.<name>` to
specific viewpoint. Datasets with no video tracks fall back to text-only pin a specific viewpoint. Datasets with no video tracks fall back to
prompts automatically. text-only prompts automatically.
**Module 1 sees the whole episode as one video block.** Subtask
decomposition gets a `{"type":"video", "video":[<frames>]}` block
covering the entire demonstration; Qwen-VL pools temporally on its own
and decides where to cut. There is no keyframe stride or count knob —
`--module_1.max_video_frames` (default 32) only caps the frames packed
into the video block as a model-capacity bound. Module 2 attaches a
single still frame at the interjection timestamp; Module 3 attaches the
exact emission frame to each VQA pair.
The executor picks `LocalPipelineExecutor` for small datasets and The executor picks `LocalPipelineExecutor` for small datasets and
`SlurmPipelineExecutor` for large ones based on `SlurmPipelineExecutor` for large ones based on
@@ -23,10 +23,17 @@ from typing import Literal
@dataclass @dataclass
class Module1Config: class Module1Config:
"""Module 1 hyperparameters: plan + subtasks + memory.""" """Module 1 hyperparameters: plan + subtasks + memory.
Subtask decomposition sees the **whole episode** as one Qwen-VL video
block — no keyframe stride or count: the model handles temporal pooling
itself and decides where to cut. ``max_video_frames`` only caps the
number of frames packed into the video block (a model-capacity bound,
not an annotation-logic knob).
"""
enabled: bool = True enabled: bool = True
keyframes_per_episode: int = 8 max_video_frames: int = 32
min_subtask_seconds: float = 1.5 min_subtask_seconds: float = 1.5
plan_max_steps: int = 8 plan_max_steps: int = 8
@@ -37,6 +37,15 @@ class FrameProvider(Protocol):
def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]: def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]:
"""Return one PIL.Image per timestamp; empty list if no camera available.""" """Return one PIL.Image per timestamp; empty list if no camera available."""
def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
    """Return up to ``max_frames`` PIL images covering the whole episode.

    Sampling is uniform across the episode duration. The returned list is
    intended to be passed as one ``{"type":"video", "video":<list>}``
    block to a Qwen-VL-compatible model that pools temporally itself.

    Returns an empty list if no camera is available.
    """
@dataclass @dataclass
class _NullProvider: class _NullProvider:
@@ -45,6 +54,9 @@ class _NullProvider:
def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]: def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]:
return [] return []
def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
    """Return no frames; this provider always yields an empty list."""
    return []
def null_provider() -> FrameProvider: def null_provider() -> FrameProvider:
return _NullProvider() return _NullProvider()
@@ -133,6 +145,27 @@ class VideoFrameProvider:
out.append(Image.fromarray(hwc, mode="RGB")) out.append(Image.fromarray(hwc, mode="RGB"))
return out return out
def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
    """Return up to ``max_frames`` images uniformly sampled across the episode.

    The whole episode duration is covered; the model picks subtask
    boundaries from the temporal pooling it does internally.

    When the episode has no more than ``max_frames`` frames, every frame
    timestamp is requested as-is. Otherwise timestamps are spaced evenly in
    time between the first and last frame timestamp, and both endpoints are
    requested exactly.

    Returns an empty list when ``max_frames`` is not positive, when no
    camera key is set, or when the record has no frame timestamps.
    """
    stamps = record.frame_timestamps
    if max_frames <= 0 or self.camera_key is None or not stamps:
        return []

    # Short episode: nothing to subsample, request every frame.
    if max_frames >= len(stamps):
        return self.frames_at(record, list(stamps))

    first = float(stamps[0])
    last = float(stamps[-1])

    # A single sample, or a degenerate (zero/negative) duration, collapses
    # to repeated requests for the first timestamp.
    if max_frames == 1 or last <= first:
        return self.frames_at(record, [first] * max_frames)

    step = (last - first) / (max_frames - 1)
    timestamps = [first + i * step for i in range(max_frames - 1)]
    # Pin the final sample to the last frame timestamp instead of computing
    # ``first + (n - 1) * step``: float rounding can push that product a
    # hair past the end of the episode.
    timestamps.append(last)
    return self.frames_at(record, timestamps)
def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvider: def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvider:
"""Build a :class:`VideoFrameProvider` if videos are present, else null.""" """Build a :class:`VideoFrameProvider` if videos are present, else null."""
@@ -148,3 +181,14 @@ def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvi
def to_image_blocks(images: list[Any]) -> list[dict[str, Any]]: def to_image_blocks(images: list[Any]) -> list[dict[str, Any]]:
"""Convert PIL images to Qwen-VL-compatible content blocks.""" """Convert PIL images to Qwen-VL-compatible content blocks."""
return [{"type": "image", "image": img} for img in images] return [{"type": "image", "image": img} for img in images]
def to_video_block(images: list[Any]) -> list[dict[str, Any]]:
    """Package a list of PIL frames as a single Qwen-VL ``video`` block.

    An empty ``images`` list produces ``[]`` rather than a block with no
    frames, so callers can unpack the result straight into a content array
    without checking for emptiness first.
    """
    return [{"type": "video", "video": list(images)}] if images else []
@@ -22,9 +22,9 @@ from dataclasses import dataclass, field
from typing import Any from typing import Any
from ..config import Module1Config from ..config import Module1Config
from ..frames import FrameProvider, null_provider, to_image_blocks from ..frames import FrameProvider, null_provider, to_video_block
from ..prompts import load as load_prompt from ..prompts import load as load_prompt
from ..reader import EpisodeRecord, keyframe_indices from ..reader import EpisodeRecord
from ..staging import EpisodeStaging from ..staging import EpisodeStaging
from ..vlm_client import VlmClient from ..vlm_client import VlmClient
@@ -151,17 +151,14 @@ class PlanSubtasksMemoryModule:
if record.row_count == 0 or not record.frame_timestamps: if record.row_count == 0 or not record.frame_timestamps:
return [] return []
episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0] episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0]
keyframe_local = keyframe_indices(record, self.config.keyframes_per_episode) video_frames = self.frame_provider.video_for_episode(record, self.config.max_video_frames)
keyframe_ts = [float(record.frame_timestamps[i]) for i in keyframe_local]
images = self.frame_provider.frames_at(record, keyframe_ts)
prompt = load_prompt("module_1_subtasks").format( prompt = load_prompt("module_1_subtasks").format(
episode_task=record.episode_task, episode_task=record.episode_task,
num_keyframes=len(keyframe_local),
min_subtask_seconds=self.config.min_subtask_seconds, min_subtask_seconds=self.config.min_subtask_seconds,
max_steps=self.config.plan_max_steps, max_steps=self.config.plan_max_steps,
episode_duration=f"{episode_duration:.3f}", episode_duration=f"{episode_duration:.3f}",
) )
content = [*to_image_blocks(images), {"type": "text", "text": prompt}] content = [*to_video_block(video_frames), {"type": "text", "text": prompt}]
messages = [{"role": "user", "content": content}] messages = [{"role": "user", "content": content}]
result = self.vlm.generate_json([messages])[0] result = self.vlm.generate_json([messages])[0]
spans = result.get("subtasks") if isinstance(result, dict) else None spans = result.get("subtasks") if isinstance(result, dict) else None
@@ -2,20 +2,22 @@ You are labeling a teleoperated robot demonstration.
The user originally asked: "{episode_task}" The user originally asked: "{episode_task}"
You will be shown {num_keyframes} keyframes spaced evenly across the You are shown the entire demonstration as a single video. Watch the
episode. Decompose the demonstration into a list of consecutive atomic whole clip, then segment it into a list of consecutive atomic subtasks
subtasks the robot performs. the robot performs.
Authoring rules — based on Hi Robot (Shi 2025) atom granularity and Pi0.7 Authoring rules — based on Hi Robot (Shi 2025) atom granularity and
(Physical Intelligence 2025) "how, not what" detail: Pi0.7 (Physical Intelligence 2025) "how, not what" detail:
- Each subtask is one atomic skill the low-level policy can execute, e.g. - Each subtask is one atomic skill the low-level policy can execute,
"pick up one piece of lettuce", "place the bowl into the box", e.g. "pick up one piece of lettuce", "place the bowl into the box",
"move the right arm to the left". "move the right arm to the left".
- Capture HOW the subtask is performed, not only WHAT — e.g. prefer - Capture HOW the subtask is performed, not only WHAT — e.g. prefer
"grasp the handle of the sponge with the left hand" to "pick up the "grasp the handle of the sponge with the left hand" to "pick up the
sponge". sponge".
- Subtasks are non-overlapping and must cover the full episode in order. - Subtasks are non-overlapping and cover the full episode in order.
Choose the cut points yourself based on what you see in the video
(gripper open/close events, contact, regrasps, transitions).
- Each subtask spans at least {min_subtask_seconds} seconds. - Each subtask spans at least {min_subtask_seconds} seconds.
- Do not exceed {max_steps} subtasks total. - Do not exceed {max_steps} subtasks total.
- Every subtask's [start_time, end_time] must lie within - Every subtask's [start_time, end_time] must lie within
+1 -1
View File
@@ -79,7 +79,7 @@ def _stub_responder(messages):
text = block.get("text", "") text = block.get("text", "")
elif isinstance(content, str): elif isinstance(content, str):
text = content text = content
if "Decompose the demonstration" in text: if "atomic subtasks" in text:
return { return {
"subtasks": [ "subtasks": [
{"text": "grasp the bottle", "start": 0.0, "end": 1.0}, {"text": "grasp the bottle", "start": 0.0, "end": 1.0},
+59 -2
View File
@@ -45,11 +45,17 @@ class _StubFrameProvider:
sentinel: Any = field(default_factory=lambda: object()) sentinel: Any = field(default_factory=lambda: object())
calls: list[tuple[int, tuple[float, ...]]] = field(default_factory=list) calls: list[tuple[int, tuple[float, ...]]] = field(default_factory=list)
video_calls: list[tuple[int, int]] = field(default_factory=list)
def frames_at(self, record, timestamps): def frames_at(self, record, timestamps):
self.calls.append((record.episode_index, tuple(timestamps))) self.calls.append((record.episode_index, tuple(timestamps)))
return [self.sentinel] * len(timestamps) return [self.sentinel] * len(timestamps)
def video_for_episode(self, record, max_frames):
    """Log the request and return one sentinel per frame, capped at ``max_frames``."""
    self.video_calls.append((record.episode_index, max_frames))
    available = len(record.frame_timestamps)
    frame_count = max_frames if max_frames < available else available
    return [self.sentinel for _ in range(frame_count)]
def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any): def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
def responder(messages): def responder(messages):
@@ -62,14 +68,14 @@ def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path: Path) -> None: def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path: Path) -> None:
vlm = make_canned_responder( vlm = make_canned_responder(
{ {
"Decompose the demonstration": { "atomic subtasks": {
"subtasks": [ "subtasks": [
{"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.4}, {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.4},
{"text": "wipe the counter from left to right", "start": 0.4, "end": 0.8}, {"text": "wipe the counter from left to right", "start": 0.4, "end": 0.8},
{"text": "place the sponge into the sink", "start": 0.8, "end": 1.1}, {"text": "place the sponge into the sink", "start": 0.8, "end": 1.1},
] ]
}, },
"write a concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"}, "concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
"Update the memory": {"memory": "wiped the counter once"}, "Update the memory": {"memory": "wiped the counter once"},
}, },
) )
@@ -168,6 +174,57 @@ def test_module3_vqa_unique_per_frame(single_episode_root: Path, tmp_path: Path)
assert ts in frame_set assert ts in frame_set
def test_module1_attaches_video_block_to_subtask_prompt(fixture_dataset_root: Path, tmp_path: Path) -> None:
    """Module 1 sends one ``type=video`` block covering the whole episode."""
    # Every message list handed to the stub VLM, in call order.
    captured: list[list[dict[str, Any]]] = []
    # Canned replies: ``payload`` is the fallback, returned for any prompt
    # that matches neither the plan nor the memory marker below.
    payload = {
        "subtasks": [
            {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.5},
            {"text": "wipe the counter", "start": 0.5, "end": 1.1},
        ]
    }
    plan_payload = {"plan": "1. grasp\n2. wipe"}
    memory_payload = {"memory": "wiped once"}

    def responder(messages):
        # Record the call, then route on the prompt text.
        captured.append(list(messages))
        text = ""
        for m in messages:
            for block in m.get("content", []):
                # Keeps the last text block seen across all messages.
                if isinstance(block, dict) and block.get("type") == "text":
                    text = block.get("text", "")
        if "concise hierarchical PLAN" in text:
            return plan_payload
        if "Update the memory" in text:
            return memory_payload
        return payload

    provider = _StubFrameProvider()
    module = PlanSubtasksMemoryModule(
        vlm=StubVlmClient(responder=responder),
        config=Module1Config(max_video_frames=5),
        frame_provider=provider,
    )
    record = next(iter_episodes(fixture_dataset_root))
    staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
    module.run_episode(record, staging)

    # the subtask call (the first VLM call) must carry exactly one video block
    assert captured, "no VLM calls made"
    first_call = captured[0]
    content = first_call[0]["content"]
    # Partition the first message's content blocks by their ``type`` field.
    video_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "video"]
    image_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "image"]
    text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"]
    assert len(video_blocks) == 1, f"expected exactly 1 video block, got {content}"
    assert image_blocks == [], "subtask prompt must not mix image blocks with the video block"
    assert len(text_blocks) == 1
    # video block must wrap a list of frames, no longer than the configured cap
    assert isinstance(video_blocks[0]["video"], list)
    assert len(video_blocks[0]["video"]) <= 5
    # The provider was asked for the episode video exactly once, with
    # ``max_video_frames`` passed through unchanged.
    assert provider.video_calls == [(record.episode_index, 5)]
def test_module3_attaches_frame_image_block_to_prompt(single_episode_root: Path, tmp_path: Path) -> None: def test_module3_attaches_frame_image_block_to_prompt(single_episode_root: Path, tmp_path: Path) -> None:
"""Each VQA prompt must carry a single image block at the emission frame.""" """Each VQA prompt must carry a single image block at the emission frame."""
captured: list[list[dict[str, Any]]] = [] captured: list[list[dict[str, Any]]] = []
@@ -49,14 +49,14 @@ _RECIPE_PATH = (
def _build_executor() -> Executor: def _build_executor() -> Executor:
vlm = make_canned_responder( vlm = make_canned_responder(
{ {
"Decompose the demonstration": { "atomic subtasks": {
"subtasks": [ "subtasks": [
{"text": "grasp the bottle", "start": 0.0, "end": 0.5}, {"text": "grasp the bottle", "start": 0.0, "end": 0.5},
{"text": "pour into the cup", "start": 0.5, "end": 1.0}, {"text": "pour into the cup", "start": 0.5, "end": 1.0},
{"text": "place the bottle down", "start": 1.0, "end": 1.5}, {"text": "place the bottle down", "start": 1.0, "end": 1.5},
] ]
}, },
"write a concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"}, "concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"},
"Update the memory": {"memory": "poured once"}, "Update the memory": {"memory": "poured once"},
"acknowledgement the robot": {"text": "Sure."}, "acknowledgement the robot": {"text": "Sure."},
"ONE realistic interruption": { "ONE realistic interruption": {