From 79ca79cba29a75c32d58e0e37a096a725af163b0 Mon Sep 17 00:00:00 2001
From: Pepijn
Date: Mon, 27 Apr 2026 17:08:36 +0200
Subject: [PATCH] feat(annotate): Module 1 sees the whole episode as one video block
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replaces keyframe sampling with a single Qwen-VL video block covering the
whole demonstration. The model pools temporally itself and chooses where to
cut subtasks — no stride, no keyframe-count knob to tune.

- frames.py: ``FrameProvider`` gains ``video_for_episode(record, max_frames)``;
  ``VideoFrameProvider`` samples up to ``max_frames`` uniformly across the
  episode duration; ``_NullProvider`` returns [] for the no-video fallback.
  New ``to_video_block`` helper.
- Module 1: drops keyframe sampling. The subtask prompt now goes out as
  ``[{"type":"video", "video":[<frames>]}, {"type":"text", ...}]`` and the
  prompt template asks the model to "watch the whole clip, then segment it"
  with cut points decided from gripper/contact/regrasp events the model sees.
- Module1Config: ``keyframes_per_episode`` removed; replaced with
  ``max_video_frames: int = 32`` (model-capacity bound, not annotation logic).
- Test: ``test_module1_attaches_video_block_to_subtask_prompt`` locks in the
  single-video-block invariant.
- Stub-VLM markers updated: tests now key on "atomic subtasks" instead of the
  old "Decompose the demonstration" phrase, which no longer appears in the
  prompt.
- Docs: updated to describe the whole-episode video-block behavior and the
  no-video fallback.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 docs/source/annotation_pipeline.mdx           | 19 ++++--
 .../annotations/steerable_pipeline/config.py  | 11 +++-
 .../annotations/steerable_pipeline/frames.py  | 44 +++++++++++++
 .../modules/plan_subtasks_memory.py           | 11 ++--
 .../prompts/module_1_subtasks.txt             | 18 +++---
 tests/annotations/run_e2e_smoke.py            |  2 +-
 tests/annotations/test_modules.py             | 61 ++++++++++++++++++-
 .../test_pipeline_recipe_render.py            |  4 +-
 8 files changed, 143 insertions(+), 27 deletions(-)

diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx
index 7104e7224..f896849c2 100644
--- a/docs/source/annotation_pipeline.mdx
+++ b/docs/source/annotation_pipeline.mdx
@@ -36,11 +36,20 @@ uv run lerobot-annotate \
     --vlm.tensor_parallel_size=2
 ```
 
-The pipeline attaches camera keyframes to every Module 1/2/3 prompt by
-default, decoded from the dataset's first `observation.images.*` stream.
-Override with `--vlm.camera_key=observation.images.<camera>` to pin a
-specific viewpoint. Datasets with no video tracks fall back to text-only
-prompts automatically.
+The pipeline attaches actual camera footage to every Module 1/2/3 prompt
+by default, decoded from the dataset's first `observation.images.*`
+stream. Override with `--vlm.camera_key=observation.images.<camera>` to
+pin a specific viewpoint. Datasets with no video tracks fall back to
+text-only prompts automatically.
+
+**Module 1 sees the whole episode as one video block.** Subtask
+decomposition gets a `{"type":"video", "video":[<frames>]}` block
+covering the entire demonstration; Qwen-VL pools temporally on its own
+and decides where to cut. There is no keyframe stride or count knob —
+`--module_1.max_video_frames` (default 32) only caps the frames packed
+into the video block as a model-capacity bound. Module 2 attaches a
+single still frame at the interjection timestamp; Module 3 attaches the
+exact emission frame to each VQA pair.
 The executor picks `LocalPipelineExecutor` for small datasets and
 `SlurmPipelineExecutor` for large ones based on
diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py
index 97b0e273d..f7d80c28a 100644
--- a/src/lerobot/annotations/steerable_pipeline/config.py
+++ b/src/lerobot/annotations/steerable_pipeline/config.py
@@ -23,10 +23,17 @@ from typing import Literal
 
 
 @dataclass
 class Module1Config:
-    """Module 1 hyperparameters: plan + subtasks + memory."""
+    """Module 1 hyperparameters: plan + subtasks + memory.
+
+    Subtask decomposition sees the **whole episode** as one Qwen-VL video
+    block — no keyframe stride or count: the model handles temporal pooling
+    itself and decides where to cut. ``max_video_frames`` only caps the
+    number of frames packed into the video block (a model-capacity bound,
+    not an annotation-logic knob).
+    """
 
     enabled: bool = True
-    keyframes_per_episode: int = 8
+    max_video_frames: int = 32
     min_subtask_seconds: float = 1.5
     plan_max_steps: int = 8
diff --git a/src/lerobot/annotations/steerable_pipeline/frames.py b/src/lerobot/annotations/steerable_pipeline/frames.py
index a28f30180..68c7cd025 100644
--- a/src/lerobot/annotations/steerable_pipeline/frames.py
+++ b/src/lerobot/annotations/steerable_pipeline/frames.py
@@ -37,6 +37,15 @@ class FrameProvider(Protocol):
     def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]:
         """Return one PIL.Image per timestamp; empty list if no camera available."""
 
+    def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
+        """Return up to ``max_frames`` PIL images covering the whole episode.
+
+        Sampling is uniform across the episode duration. The returned list is
+        intended to be passed as one ``{"type":"video", "video": <frames>}``
+        block to a Qwen-VL-compatible model that pools temporally itself.
+        Empty list if no camera available.
+        """
+
 
 @dataclass
 class _NullProvider:
@@ -45,6 +54,9 @@ class _NullProvider:
     def frames_at(self, record: EpisodeRecord, timestamps: list[float]) -> list[Any]:
         return []
 
+    def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
+        return []
+
 
 def null_provider() -> FrameProvider:
     return _NullProvider()
@@ -133,6 +145,27 @@ class VideoFrameProvider:
             out.append(Image.fromarray(hwc, mode="RGB"))
         return out
 
+    def video_for_episode(self, record: EpisodeRecord, max_frames: int) -> list[Any]:
+        """Return up to ``max_frames`` images uniformly sampled across the episode.
+
+        The whole episode duration is covered; the model picks subtask
+        boundaries from the temporal pooling it does internally.
+        """
+        if max_frames <= 0 or self.camera_key is None or not record.frame_timestamps:
+            return []
+        n_frames = min(max_frames, len(record.frame_timestamps))
+        if n_frames == len(record.frame_timestamps):
+            timestamps = list(record.frame_timestamps)
+        else:
+            t0 = record.frame_timestamps[0]
+            t_last = record.frame_timestamps[-1]
+            if t_last <= t0:
+                timestamps = [float(t0)] * n_frames
+            else:
+                step = (t_last - t0) / (n_frames - 1) if n_frames > 1 else 0.0
+                timestamps = [float(t0 + i * step) for i in range(n_frames)]
+        return self.frames_at(record, timestamps)
+
 
 def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvider:
     """Build a :class:`VideoFrameProvider` if videos are present, else null."""
@@ -148,3 +181,14 @@ def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvi
 def to_image_blocks(images: list[Any]) -> list[dict[str, Any]]:
     """Convert PIL images to Qwen-VL-compatible content blocks."""
     return [{"type": "image", "image": img} for img in images]
+
+
+def to_video_block(images: list[Any]) -> list[dict[str, Any]]:
+    """Wrap a list of PIL images as one Qwen-VL video block.
+
+    Returns ``[]`` when the list is empty, so the caller can splat the result
+    into a content array without a separate emptiness check.
+    """
+    if not images:
+        return []
+    return [{"type": "video", "video": list(images)}]
diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py
index bbb6f6a86..fff7b308b 100644
--- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py
+++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py
@@ -22,9 +22,9 @@ from dataclasses import dataclass, field
 from typing import Any
 
 from ..config import Module1Config
-from ..frames import FrameProvider, null_provider, to_image_blocks
+from ..frames import FrameProvider, null_provider, to_video_block
 from ..prompts import load as load_prompt
-from ..reader import EpisodeRecord, keyframe_indices
+from ..reader import EpisodeRecord
 from ..staging import EpisodeStaging
 from ..vlm_client import VlmClient
 
@@ -151,17 +151,14 @@ class PlanSubtasksMemoryModule:
         if record.row_count == 0 or not record.frame_timestamps:
             return []
         episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0]
-        keyframe_local = keyframe_indices(record, self.config.keyframes_per_episode)
-        keyframe_ts = [float(record.frame_timestamps[i]) for i in keyframe_local]
-        images = self.frame_provider.frames_at(record, keyframe_ts)
+        video_frames = self.frame_provider.video_for_episode(record, self.config.max_video_frames)
         prompt = load_prompt("module_1_subtasks").format(
             episode_task=record.episode_task,
-            num_keyframes=len(keyframe_local),
             min_subtask_seconds=self.config.min_subtask_seconds,
             max_steps=self.config.plan_max_steps,
             episode_duration=f"{episode_duration:.3f}",
         )
-        content = [*to_image_blocks(images), {"type": "text", "text": prompt}]
+        content = [*to_video_block(video_frames), {"type": "text", "text": prompt}]
         messages = [{"role": "user", "content": content}]
         result = self.vlm.generate_json([messages])[0]
         spans = result.get("subtasks") if isinstance(result, dict) else None
diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt
index 523312123..5d7c9cc8d 100644
--- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt
+++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt
@@ -2,20 +2,22 @@ You are labeling a teleoperated robot demonstration.
 
 The user originally asked: "{episode_task}"
 
-You will be shown {num_keyframes} keyframes spaced evenly across the
-episode. Decompose the demonstration into a list of consecutive atomic
-subtasks the robot performs.
+You are shown the entire demonstration as a single video. Watch the
+whole clip, then segment it into a list of consecutive atomic subtasks
+the robot performs.
 
-Authoring rules — based on Hi Robot (Shi 2025) atom granularity and Pi0.7
-(Physical Intelligence 2025) "how, not what" detail:
+Authoring rules — based on Hi Robot (Shi 2025) atom granularity and
+Pi0.7 (Physical Intelligence 2025) "how, not what" detail:
 
-- Each subtask is one atomic skill the low-level policy can execute, e.g.
-  "pick up one piece of lettuce", "place the bowl into the box",
+- Each subtask is one atomic skill the low-level policy can execute,
+  e.g. "pick up one piece of lettuce", "place the bowl into the box",
   "move the right arm to the left".
 - Capture HOW the subtask is performed, not only WHAT — e.g. prefer
   "grasp the handle of the sponge with the left hand" to "pick up the
   sponge".
-- Subtasks are non-overlapping and must cover the full episode in order.
+- Subtasks are non-overlapping and cover the full episode in order.
+  Choose the cut points yourself based on what you see in the video
+  (gripper open/close events, contact, regrasps, transitions).
 - Each subtask spans at least {min_subtask_seconds} seconds.
 - Do not exceed {max_steps} subtasks total.
 - Every subtask's [start_time, end_time] must lie within
diff --git a/tests/annotations/run_e2e_smoke.py b/tests/annotations/run_e2e_smoke.py
index 6d35266f7..1696c81c0 100644
--- a/tests/annotations/run_e2e_smoke.py
+++ b/tests/annotations/run_e2e_smoke.py
@@ -79,7 +79,7 @@ def _stub_responder(messages):
                 text = block.get("text", "")
         elif isinstance(content, str):
             text = content
-    if "Decompose the demonstration" in text:
+    if "atomic subtasks" in text:
        return {
            "subtasks": [
                {"text": "grasp the bottle", "start": 0.0, "end": 1.0},
diff --git a/tests/annotations/test_modules.py b/tests/annotations/test_modules.py
index 96ab6b9b3..a9dcd77de 100644
--- a/tests/annotations/test_modules.py
+++ b/tests/annotations/test_modules.py
@@ -45,11 +45,17 @@ class _StubFrameProvider:
     sentinel: Any = field(default_factory=lambda: object())
     calls: list[tuple[int, tuple[float, ...]]] = field(default_factory=list)
+    video_calls: list[tuple[int, int]] = field(default_factory=list)
 
     def frames_at(self, record, timestamps):
         self.calls.append((record.episode_index, tuple(timestamps)))
         return [self.sentinel] * len(timestamps)
 
+    def video_for_episode(self, record, max_frames):
+        self.video_calls.append((record.episode_index, max_frames))
+        n = min(max_frames, len(record.frame_timestamps))
+        return [self.sentinel] * n
+
 
 def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
     def responder(messages):
@@ -62,14 +68,14 @@ def _spy_responder(captured: list[list[dict[str, Any]]], reply: Any):
 def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path: Path) -> None:
     vlm = make_canned_responder(
         {
-            "Decompose the demonstration": {
+            "atomic subtasks": {
                 "subtasks": [
                     {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.4},
                     {"text": "wipe the counter from left to right", "start": 0.4, "end": 0.8},
                     {"text": "place the sponge into the sink", "start": 0.8, "end": 1.1},
                 ]
             },
-            "write a concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
+            "concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
             "Update the memory": {"memory": "wiped the counter once"},
         },
     )
@@ -168,6 +174,57 @@ def test_module3_vqa_unique_per_frame(single_episode_root: Path, tmp_path: Path)
         assert ts in frame_set
 
 
+def test_module1_attaches_video_block_to_subtask_prompt(fixture_dataset_root: Path, tmp_path: Path) -> None:
+    """Module 1 sends one ``type=video`` block covering the whole episode."""
+    captured: list[list[dict[str, Any]]] = []
+    payload = {
+        "subtasks": [
+            {"text": "grasp the handle of the sponge", "start": 0.0, "end": 0.5},
+            {"text": "wipe the counter", "start": 0.5, "end": 1.1},
+        ]
+    }
+    plan_payload = {"plan": "1. grasp\n2. wipe"}
+    memory_payload = {"memory": "wiped once"}
+
+    def responder(messages):
+        captured.append(list(messages))
+        text = ""
+        for m in messages:
+            for block in m.get("content", []):
+                if isinstance(block, dict) and block.get("type") == "text":
+                    text = block.get("text", "")
+        if "concise hierarchical PLAN" in text:
+            return plan_payload
+        if "Update the memory" in text:
+            return memory_payload
+        return payload
+
+    provider = _StubFrameProvider()
+    module = PlanSubtasksMemoryModule(
+        vlm=StubVlmClient(responder=responder),
+        config=Module1Config(max_video_frames=5),
+        frame_provider=provider,
+    )
+    record = next(iter_episodes(fixture_dataset_root))
+    staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
+    module.run_episode(record, staging)
+
+    # the subtask call (the first VLM call) must carry exactly one video block
+    assert captured, "no VLM calls made"
+    first_call = captured[0]
+    content = first_call[0]["content"]
+    video_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "video"]
+    image_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "image"]
+    text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"]
+    assert len(video_blocks) == 1, f"expected exactly 1 video block, got {content}"
+    assert image_blocks == [], "subtask prompt must not mix image blocks with the video block"
+    assert len(text_blocks) == 1
+    # video block must wrap a list of frames covering the episode
+    assert isinstance(video_blocks[0]["video"], list)
+    assert len(video_blocks[0]["video"]) <= 5
+    assert provider.video_calls == [(record.episode_index, 5)]
+
+
 def test_module3_attaches_frame_image_block_to_prompt(single_episode_root: Path, tmp_path: Path) -> None:
     """Each VQA prompt must carry a single image block at the emission frame."""
     captured: list[list[dict[str, Any]]] = []
diff --git a/tests/annotations/test_pipeline_recipe_render.py b/tests/annotations/test_pipeline_recipe_render.py
index d881f9961..80f9d01a8 100644
--- a/tests/annotations/test_pipeline_recipe_render.py
+++ b/tests/annotations/test_pipeline_recipe_render.py
@@ -49,14 +49,14 @@ _RECIPE_PATH = (
 
 def _build_executor() -> Executor:
     vlm = make_canned_responder(
         {
-            "Decompose the demonstration": {
+            "atomic subtasks": {
                 "subtasks": [
                     {"text": "grasp the bottle", "start": 0.0, "end": 0.5},
                     {"text": "pour into the cup", "start": 0.5, "end": 1.0},
                     {"text": "place the bottle down", "start": 1.0, "end": 1.5},
                 ]
             },
-            "write a concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"},
+            "concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"},
             "Update the memory": {"memory": "poured once"},
             "acknowledgement the robot": {"text": "Sure."},
             "ONE realistic interruption": {
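
Usage sketch, not part of the patch: a minimal illustration of how the new pieces
(``video_for_episode``, ``to_video_block``, ``max_video_frames``) combine into a
Module 1 chat payload. The import paths are inferred from the file layout in the
diff, and the inline prompt string is only a stand-in for the real
``module_1_subtasks.txt`` template rendered via ``load_prompt``; treat both as
assumptions rather than the shipped API.

    from pathlib import Path
    from typing import Any

    # Import paths inferred from the diff above; adjust if the package layout differs.
    from lerobot.annotations.steerable_pipeline.config import Module1Config
    from lerobot.annotations.steerable_pipeline.frames import make_frame_provider, to_video_block
    from lerobot.annotations.steerable_pipeline.reader import EpisodeRecord


    def build_module1_messages(
        root: Path, record: EpisodeRecord, config: Module1Config
    ) -> list[dict[str, Any]]:
        """Assemble the Module 1 payload: one whole-episode video block plus the text prompt."""
        # Falls back to the null provider when the dataset has no video tracks.
        provider = make_frame_provider(root)
        frames = provider.video_for_episode(record, config.max_video_frames)
        # Placeholder prompt text; the real module renders module_1_subtasks.txt instead.
        prompt = f"Segment this demonstration into atomic subtasks: {record.episode_task}"
        # to_video_block returns [] for an empty frame list, so text-only datasets
        # degrade gracefully to a text-only prompt.
        content = [*to_video_block(frames), {"type": "text", "text": prompt}]
        return [{"role": "user", "content": content}]

In the patched module, ``PlanSubtasksMemoryModule`` performs this assembly
internally and sends the resulting messages through ``VlmClient.generate_json``.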