From 2ea0da2d9ff6ac9f60a41da6b9ce7f6714284463 Mon Sep 17 00:00:00 2001 From: pepijn Date: Tue, 19 May 2026 12:44:35 +0000 Subject: [PATCH 1/8] fix(annotate): tag uploaded dataset revision Co-authored-by: Cursor --- src/lerobot/scripts/lerobot_annotate.py | 19 +++++---- tests/scripts/test_lerobot_annotate.py | 51 +++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 7 deletions(-) create mode 100644 tests/scripts/test_lerobot_annotate.py diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 86d3ab3fa..7fee1f052 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -140,7 +140,7 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None: exist_ok=True, ) print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True) - api.upload_folder( + commit_info = api.upload_folder( folder_path=str(root), repo_id=repo_id, repo_type="dataset", @@ -169,13 +169,18 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None: version_tag = ds_version except Exception as exc: # noqa: BLE001 print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True) + revision = getattr(commit_info, "oid", None) + tag_kwargs = { + "repo_id": repo_id, + "tag": version_tag, + "repo_type": "dataset", + "exist_ok": True, + } + if revision is not None: + tag_kwargs["revision"] = revision + try: - api.create_tag( - repo_id=repo_id, - tag=version_tag, - repo_type="dataset", - exist_ok=True, - ) + api.create_tag(**tag_kwargs) print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True) except Exception as exc: # noqa: BLE001 print( diff --git a/tests/scripts/test_lerobot_annotate.py b/tests/scripts/test_lerobot_annotate.py new file mode 100644 index 000000000..c98ee7cb3 --- /dev/null +++ b/tests/scripts/test_lerobot_annotate.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +import json +from types import SimpleNamespace + + +def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch): + from lerobot.scripts.lerobot_annotate import _push_to_hub + + root = tmp_path / "dataset" + (root / "meta").mkdir(parents=True) + (root / "meta" / "info.json").write_text(json.dumps({"codebase_version": "v3.0"})) + + calls = {} + + class FakeHfApi: + def create_repo(self, **kwargs): + calls["create_repo"] = kwargs + + def upload_folder(self, **kwargs): + calls["upload_folder"] = kwargs + return SimpleNamespace(oid="abc123") + + def create_tag(self, **kwargs): + calls["create_tag"] = kwargs + + monkeypatch.setattr("huggingface_hub.HfApi", FakeHfApi) + + cfg = SimpleNamespace( + repo_id="source/dataset", + dest_repo_id="annotated/dataset", + push_private=True, + push_commit_message=None, + ) + + _push_to_hub(root, cfg) + + assert calls["create_repo"] == { + "repo_id": "annotated/dataset", + "repo_type": "dataset", + "private": True, + "exist_ok": True, + } + assert calls["upload_folder"]["repo_id"] == "annotated/dataset" + assert calls["create_tag"] == { + "repo_id": "annotated/dataset", + "tag": "v3.0", + "repo_type": "dataset", + "exist_ok": True, + "revision": "abc123", + } From a0233f53f48f3cd3391a5c823e3d91407b7045d4 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 20 May 2026 11:46:59 +0200 Subject: [PATCH 2/8] feat(annotate): default VLM to Qwen3.6-35B-A3B-FP8 Match the production target used in examples/annotations/run_hf_job.py. Per Scale Labs' dense-captioning ablations, model capacity dominates prompt-engineering gains; defaulting to the larger model avoids shipping a worst-tier configuration out of the box. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/lerobot/annotations/steerable_pipeline/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index f6b9204bc..c609cd286 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -102,7 +102,7 @@ class VlmConfig: # ``openai`` talks to a local OpenAI-compatible server; the CLI # auto-spawns one when ``auto_serve=True``. backend: str = "openai" - model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct" + model_id: str = "Qwen/Qwen3.6-35B-A3B-FP8" # OpenAI-compatible server endpoint; ``EMPTY`` works for local servers. api_base: str = "http://localhost:8000/v1" From 86a7edc590c16a0f4c7e0fc7a72376d88f3a8407 Mon Sep 17 00:00:00 2001 From: pepijn Date: Fri, 22 May 2026 11:40:05 +0000 Subject: [PATCH 3/8] =?UTF-8?q?feat(annotate):=20phase=200=20=E2=80=94=20d?= =?UTF-8?q?erive=20canonical=20vocabulary=20from=20sample=20episodes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pipeline previously emitted near-unique subtask + memory phrasings per episode (free-form LLM rephrasing). On the downstream low-level policy that collapses the action expert's conditioning to noise: every episode pairs a different paraphrase with similar motions, so the expert learns a flat scene-prior that ignores the subtask string — then at inference the high-level head invents *yet another* paraphrase and the expert produces tiny "uncertain hover" chunks. Add a vocabulary-discovery phase (phase 0) that runs once per dataset: - watches the first ``vocabulary.sample_episodes`` (default 3) episode videos as one Qwen-VL prompt, - asks the VLM to derive ~``n_subtask_target`` canonical imperative subtask labels and ~``n_memory_target`` first-person past-tense memory milestones that recur across the demos, - persists them to ``meta/canonical_vocabulary.json`` (human- inspectable, hand-editable), and - wires the resulting ``Vocabulary`` into the ``plan`` module so every per-episode subtask + memory call is constrained to those exact strings (both as prompt-side instructions *and* post-VLM validation: paraphrases snap to the closest canonical entry via token-set overlap; below a 0.5 Jaccard floor the subtask is dropped rather than warped into something semantically wrong). Operator workflow: - first run discovers the vocabulary, writes the JSON, and runs the ``plan`` module against it, - subsequent runs reuse the on-disk file (``reuse_existing=True`` default) so hand-edits stick, - set ``--vocabulary.enabled=False`` to fall back to free-form generation (the original behaviour). The discovery prompt forbids gerunds / third-person / adverbs and caps the lists to the requested counts, matching the Hi-Robot / π0.6-MEM convention of small per-environment vocabularies. The ``plan`` module's subtask + memory prompts grow a conditional ``{vocabulary_block}`` slot rendered only when a vocabulary is present; without one the templates collapse to their previous free-form form. Tests: 11 new unit tests under tests/annotations/test_vocabulary.py cover the on-disk round-trip, discovery against the fixture dataset, ``reuse_existing`` short-circuit, paraphrase canonicalisation, off- vocab subtask dropping, and the no-vocabulary pass-through path. Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- docs/source/annotation_pipeline.mdx | 17 +- .../steerable_pipeline/__init__.py | 14 + .../annotations/steerable_pipeline/config.py | 37 +++ .../steerable_pipeline/executor.py | 69 ++++- .../modules/plan_subtasks_memory.py | 99 +++++++ .../prompts/module_0_vocabulary.txt | 46 +++ .../prompts/module_1_memory.txt | 2 +- .../prompts/module_1_subtasks.txt | 4 +- .../steerable_pipeline/vocabulary.py | 224 +++++++++++++++ src/lerobot/scripts/lerobot_annotate.py | 5 + tests/annotations/test_vocabulary.py | 271 ++++++++++++++++++ 11 files changed, 783 insertions(+), 5 deletions(-) create mode 100644 src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt create mode 100644 src/lerobot/annotations/steerable_pipeline/vocabulary.py create mode 100644 tests/annotations/test_vocabulary.py diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index b5f52cb1c..6e4cd9563 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -7,7 +7,8 @@ ## What the pipeline produces -Three modules write into a per-episode staging tree, then a single writer +A vocabulary-discovery phase derives a small canonical wording, then three +modules write into a per-episode staging tree, then a single writer rewrites the data shards in place: | Style / atom | Column | Module | @@ -20,6 +21,20 @@ rewrites the data shards in place: | speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`| | `vqa` (user / assistant pair) | `language_events` | `vqa` | +The `plan` module is constrained to a **canonical vocabulary** discovered +once per dataset by the `vocabulary` module (phase 0). It watches a few +sample episode videos (`--vocabulary.sample_episodes`, default `3`) and +asks the VLM to derive a small set of imperative subtask labels +(~`--vocabulary.n_subtask_target`, default `10`) and first-person memory +milestones (~`--vocabulary.n_memory_target`, default `6`) that recur +across the demos. The result lands at +`meta/canonical_vocabulary.json` (human-readable / hand-editable) and is +reused on every subsequent run. The `plan` module then constrains both +subtask + memory generation to those exact strings — the downstream +low-level policy sees a small, repeatable target distribution instead of +thousands of LLM paraphrases. Disable with `--vocabulary.enabled=False` +to fall back to free-form generation. + The writer does **not** add a `tools` column to the parquet — the tool catalog lives at `meta/info.json["tools"]` instead (see [Tools](./tools)). After every annotation run the pipeline ensures the diff --git a/src/lerobot/annotations/steerable_pipeline/__init__.py b/src/lerobot/annotations/steerable_pipeline/__init__.py index a8da5e05e..02d819604 100644 --- a/src/lerobot/annotations/steerable_pipeline/__init__.py +++ b/src/lerobot/annotations/steerable_pipeline/__init__.py @@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite: from .config import AnnotationPipelineConfig from .validator import StagingValidator, ValidationReport +from .vocabulary import ( + VOCABULARY_FILENAME, + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) from .writer import LanguageColumnsWriter __all__ = [ + "VOCABULARY_FILENAME", "AnnotationPipelineConfig", "LanguageColumnsWriter", "StagingValidator", "ValidationReport", + "Vocabulary", + "VocabularyDiscoveryModule", + "load_vocabulary", + "save_vocabulary", + "vocabulary_path", ] diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index c609cd286..d80e4149a 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -21,6 +21,42 @@ from pathlib import Path from typing import Any +@dataclass +class VocabularyConfig: + """Phase 0 — dataset-level canonical vocabulary discovery. + + Watches the first ``sample_episodes`` episode videos and asks the VLM + to derive a small canonical vocabulary (~``n_subtask_target`` subtask + labels + ~``n_memory_target`` memory milestones) that every episode + in the dataset will reuse. The output lands at + ``meta/canonical_vocabulary.json`` and feeds phase 1's subtask + + memory generation as both a prompt-side constraint and a post-VLM + validation gate. + + Why this exists: free-form LLM rephrasing per episode produces near- + unique subtask strings, which makes the downstream low-level policy's + conditioning effectively noise — at inference the policy generates a + *new* paraphrase the action expert has never seen and produces tiny + cautious actions. Forcing every episode onto the same small set of + canonical strings gives the action expert dense supervision per + string and a small target distribution to learn against. + + Set ``enabled=False`` to fall back to free-form generation (original + behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary + file from being clobbered on re-runs. + """ + + enabled: bool = True + sample_episodes: int = 3 + n_subtask_target: int = 10 + n_memory_target: int = 6 + max_video_frames_per_episode: int = 32 + # When True (default), an existing meta/canonical_vocabulary.json is + # loaded as-is and no VLM call is made — lets operators hand-edit the + # file. Set False to always rediscover from the sample episodes. + reuse_existing: bool = True + + @dataclass class PlanConfig: """``plan`` module: plan + subtasks + memory + task augmentation. @@ -186,6 +222,7 @@ class AnnotationPipelineConfig: seed: int = 1729 + vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig) plan: PlanConfig = field(default_factory=PlanConfig) interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig) vqa: VqaConfig = field(default_factory=VqaConfig) diff --git a/src/lerobot/annotations/steerable_pipeline/executor.py b/src/lerobot/annotations/steerable_pipeline/executor.py index ad46d4750..5c725fa65 100644 --- a/src/lerobot/annotations/steerable_pipeline/executor.py +++ b/src/lerobot/annotations/steerable_pipeline/executor.py @@ -15,8 +15,14 @@ # limitations under the License. """In-process executor that runs the annotation phases. -The executor plans **six phases** in the dependency order from the plan: +The executor plans **seven phases** in the dependency order from the plan: + phase 0: vocabulary discovery — derive a small canonical vocabulary + from the first few sample-episode videos (subtask labels + + memory milestones) and persist it next to the dataset; the + ``plan`` module then constrains every per-episode generation + to those strings, so the downstream policy sees a small, + repeatable conditioning distribution phase 1: ``plan`` module (plan + subtasks + memory) phase 2: ``interjections`` module (interjections + speech) phase 3: ``plan`` plan-update pass — re-runs plan emission at every @@ -88,6 +94,7 @@ class Executor: vqa: Any # GeneralVqaModule writer: LanguageColumnsWriter validator: StagingValidator + vocabulary: Any = None # VocabularyDiscoveryModule | None def run(self, root: Path) -> PipelineRunSummary: records = list(iter_episodes(root, only_episodes=self.config.only_episodes)) @@ -102,6 +109,10 @@ class Executor: phases: list[PhaseResult] = [] + # Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary`` + # so subsequent per-episode plan calls see the canonical labels. + phases.append(self._run_vocabulary_phase(records, root)) + # Phase 1: ``plan`` module (plan + subtasks + memory) phases.append(self._run_module_phase("plan", records, staging_dir, self.plan)) # Phase 2: ``interjections`` module (interjections + speech). It @@ -172,6 +183,62 @@ class Executor: flush=True, ) + def _run_vocabulary_phase( + self, records: list[EpisodeRecord], root: Path + ) -> PhaseResult: + """Discover (or load) the canonical vocabulary, wire it into ``self.plan``. + + Returns a ``PhaseResult`` whose ``episodes_processed`` is the number + of sample episodes consulted (0 when disabled or no VLM call was + needed); ``episodes_skipped`` is always ``0`` because vocabulary is + a once-per-dataset artifact, not a per-episode product. + """ + from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415 + + if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False): + print( + "[annotate] phase=vocabulary skipped (module disabled or unset)", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + existing = load_vocabulary(root) + if existing is not None and self.config.vocabulary.reuse_existing: + print( + f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} " + f"({len(existing.subtasks)} subtask labels, " + f"{len(existing.memory_milestones)} memory milestones)", + flush=True, + ) + self.plan.vocabulary = existing + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records))) + print( + f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...", + flush=True, + ) + t0 = time.time() + vocab = self.vocabulary.discover(records[:sample_n], existing=existing) + if vocab is None: + print( + "[annotate] phase=vocabulary returned no vocabulary — " + "plan module will fall back to free-form generation", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + save_path = save_vocabulary(root, vocab) + print( + f"[annotate] phase=vocabulary wrote {save_path} " + f"({len(vocab.subtasks)} subtask labels, " + f"{len(vocab.memory_milestones)} memory milestones) in " + f"{time.time() - t0:.1f}s", + flush=True, + ) + self.plan.vocabulary = vocab + return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0) + def _run_module_phase( self, name: str, diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 0218e79b1..218d98ad1 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging from collections.abc import Sequence from dataclasses import dataclass, field from pathlib import Path @@ -34,6 +35,9 @@ from ..prompts import load as load_prompt from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame from ..staging import EpisodeStaging from ..vlm_client import VlmClient +from ..vocabulary import Vocabulary + +logger = logging.getLogger(__name__) @dataclass @@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule: vlm: VlmClient config: PlanConfig frame_provider: FrameProvider = field(default_factory=null_provider) + vocabulary: Vocabulary | None = None + """When set, the module constrains subtask + memory generation to the + canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery) + populates this once per dataset; ``None`` falls back to free-form + generation (original behaviour).""" @property def enabled(self) -> bool: @@ -311,6 +320,7 @@ class PlanSubtasksMemoryModule: min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", + vocabulary_block=self._subtask_vocabulary_block(), ) spans = self._vlm_field(self._video_message(record, prompt), "subtasks") if not spans: @@ -330,12 +340,100 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start + if not text: + continue + text = self._canonicalize_subtask(text) if not text: continue cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) return cleaned + # ------------------------------------------------------------------ + # Canonical-vocabulary helpers + # ------------------------------------------------------------------ + + def _subtask_vocabulary_block(self) -> str: + """Bullet-list of canonical subtasks the VLM must pick from. + + Returns an empty string when no vocabulary is configured — + ``module_1_subtasks.txt`` then falls back to its free-form + rules (original behaviour). + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return "" + bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) + return ( + "You MUST choose each subtask label verbatim from this canonical " + "vocabulary — pick the closest match for each phase of the demo, " + "and reuse the SAME string every time that phase recurs. The " + "low-level policy is conditioned on these exact strings; any " + "novel paraphrase you invent will make its conditioning OOD.\n" + "Canonical subtask labels:\n" + f"{bullets}\n\n" + ) + + def _memory_vocabulary_block(self) -> str: + """Bullet-list of canonical memory milestones the VLM must pick from.""" + if self.vocabulary is None or not self.vocabulary.memory_milestones: + return "" + bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones) + return ( + "Compose the memory by picking ONLY from this canonical milestone " + "list — append a milestone (or rewrite the running memory to " + "compress past ones) using these exact phrases. Do not invent new " + "wording: every paraphrase weakens the downstream conditioning.\n" + "Canonical memory milestones:\n" + f"{bullets}\n\n" + ) + + def _canonicalize_subtask(self, text: str) -> str: + """Snap ``text`` to the closest canonical subtask string, or drop it. + + Without a vocabulary, the original text passes through. With a + vocabulary, an exact case-insensitive match wins; failing that, + the best Jaccard overlap on the word set is used as a tolerant + fuzzy match (handles articles / minor reorderings). If nothing + clears the floor, the subtask is dropped — better to skip a + phase than to feed the action expert an off-distribution string. + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return text.strip() + candidates = self.vocabulary.subtasks + cleaned = text.strip() + lowered = cleaned.lower() + for candidate in candidates: + if candidate.lower() == lowered: + return candidate + # Jaccard fallback: token-set overlap, drop articles + adverbs. + ignore = {"the", "a", "an", "to", "into", "from", "of", "on", "over", "at"} + words = {w for w in lowered.replace(",", " ").split() if w and w not in ignore} + if not words: + return "" + best: tuple[float, str] | None = None + for candidate in candidates: + cand_words = { + w for w in candidate.lower().replace(",", " ").split() if w and w not in ignore + } + if not cand_words: + continue + inter = len(words & cand_words) + union = len(words | cand_words) + score = inter / union if union else 0.0 + if best is None or score > best[0]: + best = (score, candidate) + # Floor: require at least ~half the tokens to overlap. Below that + # the VLM is hallucinating a novel phrase; drop rather than warp + # it into something semantically wrong. + if best is None or best[0] < 0.5: + logger.warning( + "subtask %r did not match any canonical label (best=%s) — dropping", + cleaned, + best, + ) + return "" + return best[1] + def _generate_plan( self, record: EpisodeRecord, # noqa: ARG002 (kept for signature stability) @@ -397,6 +495,7 @@ class PlanSubtasksMemoryModule: prior_memory=prior_memory or "(none)", completed_subtask=completed, remaining_subtasks=", ".join(remaining) if remaining else "(none)", + vocabulary_block=self._memory_vocabulary_block(), ) memory = self._vlm_field(self._text_message(prompt), "memory") return memory.strip() if isinstance(memory, str) else "" diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt new file mode 100644 index 000000000..f867c34b5 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt @@ -0,0 +1,46 @@ +You are inspecting {n_episodes} sample episode video(s) from a teleoperated +robot dataset. Every episode in the dataset performs the SAME task; the +user originally asked: "{episode_task}". + +Watch all the clips and produce a SHORT canonical vocabulary that every +episode in this dataset will reuse. The downstream low-level policy is +conditioned on these strings — duplicate phrasings (e.g. "grasp blue +cube" vs "pick up the blue cube") would destroy the conditioning, so +pick one wording per concept and reuse it everywhere. + +You output two lists: + +1. `subtasks`: imperative, telegraphic commands the robot can execute. + - Verb-first. Drop articles, adverbs, qualifiers. + - Consistent object nouns (if the task says "cube", every subtask says + "cube" — never "block" / "object"). + - Atomic — one skill per subtask (gripper-open events, contact, regrasps, + transitions all become cut points). + - Aim for ~{n_subtask_target} labels. Fewer is better than more. + - Good: "move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "release blue cube", "retract arm". + - Bad: "the robot arm moves towards the blue cube" (third person, + too long), "carefully pick up the cube" (adverb, article), + "carrying the yellow cube over the green basket" (gerund — should + be imperative "transport yellow cube to green basket"). + +2. `memory_milestones`: first-person past-tense sentences the running + memory composes from. Each subtask phase that produces a lasting + change should have a milestone; transient motions (move, retract) + should NOT. + - First person, past tense. Start with "I". + - One sentence. Functional outcome only — no grasp / motion detail. + - Aim for ~{n_memory_target} milestones. + - Good: "I picked up the blue cube.", "I placed the blue cube in + the green box.", "I wiped the counter." + - Bad: "The robot arm grasped the blue cube." (third person), + "I carefully grasped the blue cube with the parallel gripper." + (irrelevant detail), "I moved towards the blue cube." (transient + motion — should be omitted, not memorialised). + +Output strictly valid JSON of shape: + + {{ + "subtasks": ["", ...], + "memory_milestones": ["I .", ...] + }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt index b5278368b..d066b9f73 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt @@ -13,7 +13,7 @@ Previous memory: {prior_memory} Just-completed subtask: "{completed_subtask}" Remaining subtasks (for relevance judgement only): {remaining_subtasks} -Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the +{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the robot has accomplished so far — the running story it would tell itself. Authoring rules: diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt index c530b1340..12bbcfba2 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt @@ -4,9 +4,9 @@ The user originally asked: "{episode_task}" You are shown the entire demonstration as a single video. Watch the whole clip, then segment it into a list of consecutive atomic subtasks -the robot performs. Write short, telegraphic action labels. +the robot performs. -Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: +{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: - Each subtask = one atomic skill the low-level policy can execute. - Write each subtask as an IMPERATIVE COMMAND, starting with a verb: diff --git a/src/lerobot/annotations/steerable_pipeline/vocabulary.py b/src/lerobot/annotations/steerable_pipeline/vocabulary.py new file mode 100644 index 000000000..8787ec372 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/vocabulary.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Dataset-level canonical vocabulary discovery (Phase 0). + +The downstream consumer of these annotations is a low-level action expert +conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing +gives near-unique strings per occurrence, which collapses the action +expert's conditioning to noise and makes runtime subtask-paraphrase drift +catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical +vocabulary per environment (~10 strings) that every episode reuses; this +module derives that vocabulary automatically from the first few episode +videos and persists it next to the dataset. + +Pipeline-level flow: + + Phase 0 (here): watch N sample episodes → produce vocabulary.json + Phase 1 (plan module): reuse vocabulary on every episode, both as + prompt-side constraint *and* post-VLM validation + +The vocabulary is JSON, lives at ``/meta/canonical_vocabulary.json``, +and is human-inspectable / hand-editable — if the discovered set is wrong, +operators edit the file and re-run the pipeline without phase 0. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from .config import VocabularyConfig +from .frames import FrameProvider, null_provider, to_video_block +from .prompts import load as load_prompt +from .reader import EpisodeRecord +from .vlm_client import VlmClient + +logger = logging.getLogger(__name__) + +VOCABULARY_FILENAME = "canonical_vocabulary.json" + + +@dataclass +class Vocabulary: + """Canonical phrasings shared across every episode of one dataset. + + Both lists are strict: per-episode subtask + memory generation pick + from these strings only; the downstream policy then has a small, + repeatable target distribution to learn instead of thousands of + LLM paraphrases. + """ + + subtasks: tuple[str, ...] + """Imperative subtask labels — what the low-level policy is conditioned + on. Verb-first, telegraphic, consistent object nouns. Example: + ``("move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "retract arm")``. + """ + + memory_milestones: tuple[str, ...] + """First-person past-tense milestone sentences — building blocks for + the running memory string. Example: ``("I picked up the blue cube.", + "I placed the blue cube in the green box.")``. Each milestone maps + 1:1 onto a completed subtask phase; ``memory_at_step_k`` is the + concatenation of milestones for completed phases. + """ + + def to_json(self) -> dict[str, list[str]]: + return { + "subtasks": list(self.subtasks), + "memory_milestones": list(self.memory_milestones), + } + + @classmethod + def from_json(cls, payload: dict[str, Any]) -> Vocabulary: + subtasks = tuple( + str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip() + ) + memory_milestones = tuple( + str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip() + ) + return cls(subtasks=subtasks, memory_milestones=memory_milestones) + + def is_empty(self) -> bool: + return not self.subtasks and not self.memory_milestones + + +def vocabulary_path(root: Path) -> Path: + """Return the canonical on-disk location for the vocabulary file.""" + return root / "meta" / VOCABULARY_FILENAME + + +def load_vocabulary(root: Path) -> Vocabulary | None: + """Read ``/meta/canonical_vocabulary.json`` if present. + + Returns ``None`` when the file does not exist — callers fall back to + free-form (unconstrained) subtask + memory generation, preserving the + pipeline's behaviour on datasets that never ran phase 0. + """ + path = vocabulary_path(root) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc) + return None + if not isinstance(payload, dict): + logger.warning("%s is not a JSON object — ignoring", path) + return None + vocab = Vocabulary.from_json(payload) + if vocab.is_empty(): + return None + return vocab + + +def save_vocabulary(root: Path, vocab: Vocabulary) -> Path: + """Atomically persist ``vocab`` to ``/meta/canonical_vocabulary.json``.""" + path = vocabulary_path(root) + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text( + json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + tmp.replace(path) + return path + + +@dataclass +class VocabularyDiscoveryModule: + """Derive a dataset-level canonical vocabulary from sample episodes. + + Phase 0 of the executor: pulls ``config.sample_episodes`` episode + videos, packs them into one Qwen-VL multi-video prompt, and asks the + model to enumerate the small set of canonical subtask labels + + memory milestones that recur across them. The output is persisted + to ``meta/canonical_vocabulary.json`` and consumed by phase 1. + """ + + vlm: VlmClient + config: VocabularyConfig + frame_provider: FrameProvider = field(default_factory=null_provider) + + @property + def enabled(self) -> bool: + return self.config.enabled + + def discover( + self, + records: Sequence[EpisodeRecord], + *, + existing: Vocabulary | None = None, + ) -> Vocabulary | None: + """Run vocabulary discovery against the first N sample episodes. + + ``existing`` short-circuits the VLM call when ``config.reuse_existing`` + is True and an on-disk vocabulary is already present — keeps re-runs + cheap and lets operators hand-edit the file without it getting + overwritten. + """ + if existing is not None and self.config.reuse_existing: + logger.info( + "vocabulary: reusing existing (%d subtasks, %d memory milestones)", + len(existing.subtasks), + len(existing.memory_milestones), + ) + return existing + + sample = list(records[: max(1, int(self.config.sample_episodes))]) + if not sample: + return None + + task_hint = next((r.episode_task for r in sample if r.episode_task), "") + prompt = load_prompt("module_0_vocabulary").format( + episode_task=task_hint or "(unspecified)", + n_episodes=len(sample), + n_subtask_target=int(self.config.n_subtask_target), + n_memory_target=int(self.config.n_memory_target), + ) + # Pack one video block per sample episode so the VLM sees the + # variation across episodes (different starting poses, different + # object placements) rather than overfitting to one trajectory. + content: list[dict[str, Any]] = [] + for record in sample: + video_frames = self.frame_provider.video_for_episode( + record, int(self.config.max_video_frames_per_episode) + ) + if video_frames: + content.extend(to_video_block(video_frames)) + content.append({"type": "text", "text": prompt}) + messages = [{"role": "user", "content": content}] + + result = self.vlm.generate_json([messages])[0] + if not isinstance(result, dict): + logger.warning("vocabulary: VLM did not return a JSON object — skipping") + return None + + vocab = Vocabulary.from_json(result) + if vocab.is_empty(): + logger.warning("vocabulary: VLM returned an empty vocabulary — skipping") + return None + logger.info( + "vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes", + len(vocab.subtasks), + len(vocab.memory_milestones), + len(sample), + ) + return vocab diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 7fee1f052..52309b827 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import ( ) from lerobot.annotations.steerable_pipeline.validator import StagingValidator from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client +from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter from lerobot.configs import parser @@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider ) vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider) + vocabulary = VocabularyDiscoveryModule( + vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider + ) writer = LanguageColumnsWriter() validator = StagingValidator( dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None, @@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: plan=plan, interjections=interjections, vqa=vqa, + vocabulary=vocabulary, writer=writer, validator=validator, ) diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py new file mode 100644 index 000000000..a9f080a16 --- /dev/null +++ b/tests/annotations/test_vocabulary.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Vocabulary-discovery phase (phase 0) tests.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from lerobot.annotations.steerable_pipeline.config import ( + PlanConfig, + VocabularyConfig, +) +from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule +from lerobot.annotations.steerable_pipeline.reader import iter_episodes +from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging +from lerobot.annotations.steerable_pipeline.vocabulary import ( + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) + +from ._helpers import make_canned_responder + + +_CANONICAL_SUBTASKS = ( + "grasp blue cube", + "place blue cube in box", + "retract arm", +) +_CANONICAL_MEMORY = ( + "I picked up the blue cube.", + "I placed the blue cube in the box.", +) + + +# --------------------------------------------------------------------------- +# Vocabulary dataclass + on-disk round-trip +# --------------------------------------------------------------------------- + + +def test_vocabulary_roundtrip(tmp_path: Path) -> None: + vocab = Vocabulary( + subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY + ) + save_path = save_vocabulary(tmp_path, vocab) + assert save_path == vocabulary_path(tmp_path) + assert save_path.exists() + + loaded = load_vocabulary(tmp_path) + assert loaded is not None + assert loaded.subtasks == _CANONICAL_SUBTASKS + assert loaded.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None: + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("{ not valid json", encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +# --------------------------------------------------------------------------- +# Discovery module +# --------------------------------------------------------------------------- + + +def test_vocabulary_discovery_calls_vlm_and_returns_vocab( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder( + { + "canonical vocabulary": { + "subtasks": list(_CANONICAL_SUBTASKS), + "memory_milestones": list(_CANONICAL_MEMORY), + } + } + ) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2)) + records = list(iter_episodes(fixture_dataset_root)) + vocab = module.discover(records) + assert vocab is not None + assert vocab.subtasks == _CANONICAL_SUBTASKS + assert vocab.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None: + """``reuse_existing=True`` short-circuits the VLM call entirely.""" + + def _explode(_messages): # pragma: no cover - must not be called + raise AssertionError("VLM should not be invoked when reusing existing vocabulary") + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=_explode) + module = VocabularyDiscoveryModule( + vlm=vlm, config=VocabularyConfig(reuse_existing=True) + ) + records = list(iter_episodes(fixture_dataset_root)) + existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",)) + vocab = module.discover(records, existing=existing) + assert vocab is existing + + +def test_vocabulary_discovery_empty_payload_returns_none( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}}) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig()) + records = list(iter_episodes(fixture_dataset_root)) + assert module.discover(records) is None + + +# --------------------------------------------------------------------------- +# PlanSubtasksMemoryModule consumes the vocabulary +# --------------------------------------------------------------------------- + + +def test_plan_module_inlines_vocab_into_subtask_prompt( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + captured: list[str] = [] + + def responder(messages): + # Find the last user text block and stash it for inspection. + for message in messages: + content = message.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + captured.append(block.get("text", "")) + # Return canned subtasks; pick the first two canonical strings so + # the validator accepts them. + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + {"text": "place blue cube in box", "start": 0.4, "end": 0.9}, + ] + } + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + # The subtask prompt (and the memory prompt) carries the canonical + # bullet list so the VLM can't paraphrase them away. + assert any("Canonical subtask labels:" in t for t in captured) + assert any("grasp blue cube" in t for t in captured) + + +def test_plan_module_canonicalizes_paraphrased_subtask( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """Off-vocab paraphrase with high token overlap snaps to canonical form.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # paraphrase of "grasp blue cube" — overlapping tokens + {"text": "grasp the blue cube", "start": 0.0, "end": 0.4}, + # paraphrase of "place blue cube in box" — high overlap + {"text": "place the blue cube into the box", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # Both paraphrases snapped onto canonical strings. + assert subtask_texts == ["grasp blue cube", "place blue cube in box"] + + +def test_plan_module_drops_off_vocab_subtask( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """A subtask with low overlap to every canonical label is dropped.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # in-vocab + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + # off-vocab hallucination — no token overlap above the + # Jaccard floor; should be dropped. + {"text": "perform a fancy macarena dance", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["grasp blue cube"] + + +def test_plan_module_without_vocab_passes_through( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """No vocabulary configured → original free-form behavior is preserved.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + {"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0}, + ] + } + + vlm = StubVlmClient(responder=responder) + module = PlanSubtasksMemoryModule( + vlm=vlm, config=PlanConfig(n_task_rephrasings=0) + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["any free-form text the VLM wants"] From 369ab17110829fca54c33717ad47929830ba3dbf Mon Sep 17 00:00:00 2001 From: pepijn Date: Fri, 22 May 2026 11:43:06 +0000 Subject: [PATCH 4/8] fix(annotate): update run_hf_job CLI args for renamed namespaces + phase 0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three stale things in the launcher script: - ``--module_1/2/3.*`` no longer exist; review commit fd18beb renamed the CLI namespaces to ``--plan/interjections/vqa``. Forwarded all eight existing args to their new names. - ``--push_to_hub`` is now a bool; the destination repo lives at ``--dest_repo_id``. Split the single positional into both args. - ``openai`` was missing from the pip install list, which the prior review review (claude bot, 2026-05-08) flagged — the default vlm backend is ``openai`` so the job would have ImportError'd. Added. Also expose the new phase 0 (canonical vocabulary discovery) knobs explicitly: ``--vocabulary.sample_episodes``, ``--n_subtask_target``, ``--n_memory_target``. Defaults are sane (3 / 10 / 6) but worth flagging in the example so the operator knows what they're running. Update the docstring + section comments to match the current phase layout (vocabulary → plan → interjections → vqa → writer). Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- examples/annotations/run_hf_job.py | 66 ++++++++++++++---------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/examples/annotations/run_hf_job.py b/examples/annotations/run_hf_job.py index 8dd354c7d..905811d14 100644 --- a/examples/annotations/run_hf_job.py +++ b/examples/annotations/run_hf_job.py @@ -1,38 +1,22 @@ #!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. """Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE). Spawns one ``h200x2`` job that: 1. installs this branch of ``lerobot`` plus the annotation extras, 2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8, - 3. runs the plan / interjections / vqa modules across the dataset, - 4. uploads the annotated dataset back to ``--repo_id`` (or to - ``--dest_repo_id`` when set). - -``--repo_id`` is the download source and, with ``--push_to_hub=true``, also -the default upload destination — the job annotates the dataset in place. -Pass ``--dest_repo_id`` to push the result to a separate repo instead and -leave the source untouched. + 3. discovers the dataset's canonical subtask + memory vocabulary + from the first 3 sample episodes (phase 0), + 4. runs the plan / interjections / vqa modules across the dataset + (subtasks + memory are constrained to the canonical vocabulary), + 5. uploads the annotated dataset to ``--dest_repo_id`` (when set) + or back to ``--repo_id``. Usage: HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py -Adjust ``CMD`` below to point at your own dataset. +Adjust ``CMD`` below to point at your own dataset / target hub repo. """ import os @@ -48,19 +32,14 @@ CMD = ( "pip install --no-deps " "'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && " "pip install --upgrade-strategy only-if-needed " - # Mirror lerobot's [annotations] runtime deps. ``openai`` is required - # because ``VlmConfig.backend`` defaults to ``"openai"`` (which talks - # to a vllm/transformers/ktransformers OpenAI-compatible server). - "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include " - "toml typing-inspect openai && " + "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect " + "openai && " "export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && " "export VLLM_VIDEO_BACKEND=pyav && " "lerobot-annotate " - # The dataset to annotate. By default it is also the push destination - # (annotate in place); pass --dest_repo_id to push to a separate repo. - "--repo_id=/ " + "--repo_id=imstevenpmwork/super_poulain_draft " + "--dest_repo_id=pepijn223/super_poulain_vocab " "--push_to_hub=true " - # "--dest_repo_id=/ " "--vlm.backend=openai " "--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 " "--vlm.parallel_servers=2 " @@ -69,15 +48,30 @@ CMD = ( "--tensor-parallel-size 1 --max-model-len 32768 " '--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" ' "--vlm.serve_ready_timeout_s=1800 " - "--vlm.client_concurrency=256 " + "--vlm.client_concurrency=128 " "--vlm.max_new_tokens=512 " - "--executor.episode_parallelism=32 " - "--vlm.chat_template_kwargs='{enable_thinking: false}' " + "--vlm.temperature=0.7 " + "--executor.episode_parallelism=16 " + "--vlm.chat_template_kwargs='{\"enable_thinking\": false}' " "--vlm.camera_key=observation.images.wrist " + # Phase 0 — canonical vocabulary discovery from the first N sample + # episodes. The resulting meta/canonical_vocabulary.json constrains + # every subtask + memory string to a small repeatable target + # distribution; tune the counts for your task complexity. + "--vocabulary.sample_episodes=3 " + "--vocabulary.n_subtask_target=10 " + "--vocabulary.n_memory_target=6 " + # Phase 1 — plan module (subtasks + plan + memory + task_aug). "--plan.frames_per_second=1.0 " "--plan.use_video_url=true " "--plan.use_video_url_fps=1.0 " - "--vqa.K=1 --vqa.vqa_emission_hz=0.2" + "--plan.derive_task_from_video=always " + "--plan.n_task_rephrasings=30 " + # Phase 2 — interjections + speech. + "--interjections.max_interjections_per_episode=6 " + # Phase 4 — general VQA. + "--vqa.K=3 " + "--vqa.vqa_emission_hz=1.0" ) job = run_job( From 54221ceea245dbc0769155d77d7142242afe842f Mon Sep 17 00:00:00 2001 From: pepijn Date: Fri, 22 May 2026 11:46:31 +0000 Subject: [PATCH 5/8] feat(annotate): let the VLM decide vocabulary size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hardcoding ``n_subtask_target=10`` and ``n_memory_target=6`` baked task complexity into the config — a simple pick-and-place needs ~6, a multi-step recipe needs ~20. The VLM already sees the clips, so let it pick the count itself from what's recurring across episodes. Drop both knobs from ``VocabularyConfig`` and the ``module_0_vocabulary`` prompt template. The prompt now says "decide the count yourself based on what you see — the smallest set that still covers every recurring phase" and adds an "each label must recur across the demos" rule so the VLM filters out one-off motions. Update the launcher script + docs to remove the old knobs. Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- docs/source/annotation_pipeline.mdx | 21 ++++++++++--------- examples/annotations/run_hf_job.py | 9 ++++---- .../annotations/steerable_pipeline/config.py | 15 +++++++------ .../prompts/module_0_vocabulary.txt | 11 ++++++++-- .../steerable_pipeline/vocabulary.py | 2 -- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index 6e4cd9563..9d6e66231 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -24,16 +24,17 @@ rewrites the data shards in place: The `plan` module is constrained to a **canonical vocabulary** discovered once per dataset by the `vocabulary` module (phase 0). It watches a few sample episode videos (`--vocabulary.sample_episodes`, default `3`) and -asks the VLM to derive a small set of imperative subtask labels -(~`--vocabulary.n_subtask_target`, default `10`) and first-person memory -milestones (~`--vocabulary.n_memory_target`, default `6`) that recur -across the demos. The result lands at -`meta/canonical_vocabulary.json` (human-readable / hand-editable) and is -reused on every subsequent run. The `plan` module then constrains both -subtask + memory generation to those exact strings — the downstream -low-level policy sees a small, repeatable target distribution instead of -thousands of LLM paraphrases. Disable with `--vocabulary.enabled=False` -to fall back to free-form generation. +asks the VLM to derive a small set of imperative subtask labels and +first-person memory milestones that recur across the demos. The VLM +picks the right number of entries itself based on what it sees in the +clips — short pick-and-place demos get ~6 subtask labels, longer +multi-step recipes get more. The result lands at +`meta/canonical_vocabulary.json` (human-readable / hand-editable) and +is reused on every subsequent run. The `plan` module then constrains +both subtask + memory generation to those exact strings — the +downstream low-level policy sees a small, repeatable target +distribution instead of thousands of LLM paraphrases. Disable with +`--vocabulary.enabled=False` to fall back to free-form generation. The writer does **not** add a `tools` column to the parquet — the tool catalog lives at `meta/info.json["tools"]` instead (see diff --git a/examples/annotations/run_hf_job.py b/examples/annotations/run_hf_job.py index 905811d14..f3e497039 100644 --- a/examples/annotations/run_hf_job.py +++ b/examples/annotations/run_hf_job.py @@ -55,12 +55,11 @@ CMD = ( "--vlm.chat_template_kwargs='{\"enable_thinking\": false}' " "--vlm.camera_key=observation.images.wrist " # Phase 0 — canonical vocabulary discovery from the first N sample - # episodes. The resulting meta/canonical_vocabulary.json constrains - # every subtask + memory string to a small repeatable target - # distribution; tune the counts for your task complexity. + # episodes. The VLM picks the right number of subtask + memory + # entries itself from what it sees; the resulting + # meta/canonical_vocabulary.json constrains every subtask + memory + # string to a small repeatable target distribution. "--vocabulary.sample_episodes=3 " - "--vocabulary.n_subtask_target=10 " - "--vocabulary.n_memory_target=6 " # Phase 1 — plan module (subtasks + plan + memory + task_aug). "--plan.frames_per_second=1.0 " "--plan.use_video_url=true " diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index d80e4149a..da07d7998 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -26,12 +26,13 @@ class VocabularyConfig: """Phase 0 — dataset-level canonical vocabulary discovery. Watches the first ``sample_episodes`` episode videos and asks the VLM - to derive a small canonical vocabulary (~``n_subtask_target`` subtask - labels + ~``n_memory_target`` memory milestones) that every episode - in the dataset will reuse. The output lands at - ``meta/canonical_vocabulary.json`` and feeds phase 1's subtask + - memory generation as both a prompt-side constraint and a post-VLM - validation gate. + to derive a small canonical vocabulary (subtask labels + memory + milestones) that every episode in the dataset will reuse. The VLM + decides the count itself from what it sees in the clips — short + pick-and-place demos get ~6 labels, longer multi-step recipes more. + The output lands at ``meta/canonical_vocabulary.json`` and feeds + phase 1's subtask + memory generation as both a prompt-side + constraint and a post-VLM validation gate. Why this exists: free-form LLM rephrasing per episode produces near- unique subtask strings, which makes the downstream low-level policy's @@ -48,8 +49,6 @@ class VocabularyConfig: enabled: bool = True sample_episodes: int = 3 - n_subtask_target: int = 10 - n_memory_target: int = 6 max_video_frames_per_episode: int = 32 # When True (default), an existing meta/canonical_vocabulary.json is # loaded as-is and no VLM call is made — lets operators hand-edit the diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt index f867c34b5..00c29be4e 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt @@ -8,6 +8,13 @@ conditioned on these strings — duplicate phrasings (e.g. "grasp blue cube" vs "pick up the blue cube") would destroy the conditioning, so pick one wording per concept and reuse it everywhere. +Decide how many entries each list needs YOURSELF based on what you see — +the smallest set that still covers every recurring phase in the demos. +A simple two-object pick-and-place might need ~6 subtask labels and 2 +memory milestones; a long multi-step recipe needs more. Err on the side +of FEWER — extra entries that don't recur across episodes weaken the +conditioning. + You output two lists: 1. `subtasks`: imperative, telegraphic commands the robot can execute. @@ -16,7 +23,8 @@ You output two lists: "cube" — never "block" / "object"). - Atomic — one skill per subtask (gripper-open events, contact, regrasps, transitions all become cut points). - - Aim for ~{n_subtask_target} labels. Fewer is better than more. + - Each label must recur across the demos. If you see a motion only + once across all sample clips, it probably isn't a canonical phase. - Good: "move to blue cube", "grasp blue cube", "lift blue cube", "place blue cube in box", "release blue cube", "retract arm". - Bad: "the robot arm moves towards the blue cube" (third person, @@ -30,7 +38,6 @@ You output two lists: should NOT. - First person, past tense. Start with "I". - One sentence. Functional outcome only — no grasp / motion detail. - - Aim for ~{n_memory_target} milestones. - Good: "I picked up the blue cube.", "I placed the blue cube in the green box.", "I wiped the counter." - Bad: "The robot arm grasped the blue cube." (third person), diff --git a/src/lerobot/annotations/steerable_pipeline/vocabulary.py b/src/lerobot/annotations/steerable_pipeline/vocabulary.py index 8787ec372..121cef849 100644 --- a/src/lerobot/annotations/steerable_pipeline/vocabulary.py +++ b/src/lerobot/annotations/steerable_pipeline/vocabulary.py @@ -190,8 +190,6 @@ class VocabularyDiscoveryModule: prompt = load_prompt("module_0_vocabulary").format( episode_task=task_hint or "(unspecified)", n_episodes=len(sample), - n_subtask_target=int(self.config.n_subtask_target), - n_memory_target=int(self.config.n_memory_target), ) # Pack one video block per sample episode so the VLM sees the # variation across episodes (different starting poses, different From 336af85c09b57ae21672d8c56f71d11a53fa4042 Mon Sep 17 00:00:00 2001 From: pepijn Date: Fri, 22 May 2026 12:44:03 +0000 Subject: [PATCH 6/8] fix(annotate): never leave an episode with zero canonical subtasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the canonical vocabulary is enabled and the VLM produces spans that don't overlap any canonical label, the previous Jaccard-floor (0.5) dropped them and the episode came out with no subtasks at all — invisible to the downstream policy. Observed on ``pepijn223/super_poulain_vocab``: some episodes had empty subtask columns because every VLM-emitted phrase scored below 0.5 against the discovered vocabulary. Two-pass canonicalisation: - First pass keeps the Jaccard floor (lowered from 0.5 → 0.25, to let mild paraphrases through) and drops everything below. - If that first pass leaves the episode with **zero** subtasks, fall back to a second pass that always snaps each VLM span to its nearest canonical label by Jaccard (no floor). The episode ends up with subtasks even when the vocabulary missed a phase — a slightly-wrong canonical label is still closer to the right motion than nothing at all. - Log loudly when the fallback fires so the operator can spot coverage gaps in ``meta/canonical_vocabulary.json``. - Log a per-episode count at INFO when some (but not all) spans were dropped so it's visible without spamming the run output. Promote the Jaccard floor + ignore-tokens to class constants so they're a single edit point. Add ``force=True`` parameter to ``_canonicalize_subtask`` for the no-floor fallback path. New test ``test_plan_module_snaps_when_all_off_vocab`` covers the fallback; existing ``test_plan_module_drops_off_vocab_subtask`` is adjusted to keep at least one in-vocab span so the floor path can still fire and is exercised. All 12 vocabulary tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- .../modules/plan_subtasks_memory.py | 95 ++++++++++++++----- tests/annotations/test_vocabulary.py | 48 +++++++++- 2 files changed, 119 insertions(+), 24 deletions(-) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 218d98ad1..ed226bd02 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -328,7 +328,7 @@ class PlanSubtasksMemoryModule: # clamp to [t0, t_last] and sort t0 = record.frame_timestamps[0] t_last = record.frame_timestamps[-1] - cleaned: list[dict[str, Any]] = [] + raw: list[dict[str, Any]] = [] for span in spans: try: start = float(span["start"]) @@ -340,12 +340,45 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start - if not text: - continue - text = self._canonicalize_subtask(text) - if not text: - continue - cleaned.append({"text": text, "start": start, "end": end}) + if text: + raw.append({"text": text, "start": start, "end": end}) + + # Without a vocabulary, free-form spans pass through unchanged. + if self.vocabulary is None or not self.vocabulary.subtasks: + raw.sort(key=lambda s: s["start"]) + return raw + + # With a vocabulary, snap each span to the closest canonical + # label. Two-pass: first try the normal Jaccard floor (drops + # off-topic hallucinations); if that leaves the episode with + # zero subtasks, fall back to snap-without-floor so the episode + # is never silently emptied — a wrong canonical label is still + # closer to the right phase than nothing at all. + cleaned: list[dict[str, Any]] = [] + for span in raw: + mapped = self._canonicalize_subtask(span["text"]) + if mapped: + cleaned.append({**span, "text": mapped}) + if not cleaned and raw: + logger.warning( + "episode %d: every VLM subtask was off-vocabulary " + "(%d spans); snapping to closest canonical label anyway " + "(check meta/canonical_vocabulary.json for missing phases)", + record.episode_index, + len(raw), + ) + for span in raw: + mapped = self._canonicalize_subtask(span["text"], force=True) + if mapped: + cleaned.append({**span, "text": mapped}) + elif len(cleaned) < len(raw): + logger.info( + "episode %d: %d/%d subtasks survived canonicalisation; " + "the rest were off-vocabulary", + record.episode_index, + len(cleaned), + len(raw), + ) cleaned.sort(key=lambda s: s["start"]) return cleaned @@ -387,15 +420,28 @@ class PlanSubtasksMemoryModule: f"{bullets}\n\n" ) - def _canonicalize_subtask(self, text: str) -> str: - """Snap ``text`` to the closest canonical subtask string, or drop it. + _CANONICALIZE_JACCARD_FLOOR: float = 0.25 + _CANONICALIZE_IGNORE_TOKENS: frozenset[str] = frozenset( + {"the", "a", "an", "to", "into", "from", "of", "on", "over", "at"} + ) + + def _canonicalize_subtask(self, text: str, *, force: bool = False) -> str: + """Snap ``text`` to the closest canonical subtask string. Without a vocabulary, the original text passes through. With a vocabulary, an exact case-insensitive match wins; failing that, the best Jaccard overlap on the word set is used as a tolerant - fuzzy match (handles articles / minor reorderings). If nothing - clears the floor, the subtask is dropped — better to skip a - phase than to feed the action expert an off-distribution string. + fuzzy match (handles articles / minor reorderings). + + Behaviour at the Jaccard floor depends on ``force``: + - ``force=False`` (default): below ``_CANONICALIZE_JACCARD_FLOOR`` + the subtask is dropped. ``_generate_subtasks`` runs this first + to filter genuine off-topic hallucinations. + - ``force=True``: always snap, no floor. ``_generate_subtasks`` + uses this in a second pass when the first pass would otherwise + empty the episode — a slightly-wrong canonical label is still + closer to the right phase than no subtask at all, which makes + the whole episode invisible to the downstream policy. """ if self.vocabulary is None or not self.vocabulary.subtasks: return text.strip() @@ -406,14 +452,17 @@ class PlanSubtasksMemoryModule: if candidate.lower() == lowered: return candidate # Jaccard fallback: token-set overlap, drop articles + adverbs. - ignore = {"the", "a", "an", "to", "into", "from", "of", "on", "over", "at"} - words = {w for w in lowered.replace(",", " ").split() if w and w not in ignore} + words = { + w for w in lowered.replace(",", " ").split() + if w and w not in self._CANONICALIZE_IGNORE_TOKENS + } if not words: return "" best: tuple[float, str] | None = None for candidate in candidates: cand_words = { - w for w in candidate.lower().replace(",", " ").split() if w and w not in ignore + w for w in candidate.lower().replace(",", " ").split() + if w and w not in self._CANONICALIZE_IGNORE_TOKENS } if not cand_words: continue @@ -422,14 +471,16 @@ class PlanSubtasksMemoryModule: score = inter / union if union else 0.0 if best is None or score > best[0]: best = (score, candidate) - # Floor: require at least ~half the tokens to overlap. Below that - # the VLM is hallucinating a novel phrase; drop rather than warp - # it into something semantically wrong. - if best is None or best[0] < 0.5: - logger.warning( - "subtask %r did not match any canonical label (best=%s) — dropping", + if best is None: + return "" + if not force and best[0] < self._CANONICALIZE_JACCARD_FLOOR: + logger.info( + "subtask %r dropped — best canonical match %r scored %.2f " + "(< %.2f Jaccard floor)", cleaned, - best, + best[1], + best[0], + self._CANONICALIZE_JACCARD_FLOOR, ) return "" return best[1] diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py index a9f080a16..20d22a50d 100644 --- a/tests/annotations/test_vocabulary.py +++ b/tests/annotations/test_vocabulary.py @@ -217,13 +217,20 @@ def test_plan_module_canonicalizes_paraphrased_subtask( def test_plan_module_drops_off_vocab_subtask( fixture_dataset_root: Path, tmp_path: Path ) -> None: - """A subtask with low overlap to every canonical label is dropped.""" + """A subtask with low overlap to every canonical label is dropped. + + Drop only kicks in when *at least one* other subtask survives — if + every span would be dropped the episode would come out empty, so + ``_generate_subtasks`` falls back to snap-without-floor; that path + is exercised by ``test_plan_module_snaps_when_all_off_vocab``. + """ from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient def responder(_messages): return { "subtasks": [ - # in-vocab + # in-vocab — keeps the episode non-empty so the floor + # is allowed to drop the next span. {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, # off-vocab hallucination — no token overlap above the # Jaccard floor; should be dropped. @@ -246,6 +253,43 @@ def test_plan_module_drops_off_vocab_subtask( assert subtask_texts == ["grasp blue cube"] +def test_plan_module_snaps_when_all_off_vocab( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """All-off-vocab spans snap to nearest canonical instead of emptying the episode.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # Both off-vocab — would normally be dropped. The + # fallback should snap each to its best canonical match + # rather than leave the episode with no subtasks at all. + {"text": "make a smoothie", "start": 0.0, "end": 0.4}, + {"text": "consult the wizard", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # Two off-vocab spans → two canonical subtasks (snapped to nearest + # by Jaccard with no floor). The exact canonical choice doesn't + # matter — only that the episode came out with subtasks rather + # than empty. + assert len(subtask_texts) == 2 + assert all(s in _CANONICAL_SUBTASKS for s in subtask_texts) + + def test_plan_module_without_vocab_passes_through( fixture_dataset_root: Path, tmp_path: Path ) -> None: From a15e16c0721aeee195ce5e5b6f5851479d1ce418 Mon Sep 17 00:00:00 2001 From: pepijn Date: Sat, 23 May 2026 09:57:27 +0000 Subject: [PATCH 7/8] fix(annotate): replace fuzzy subtask snapping with strict match + one-shot retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Jaccard-overlap snap was warping VLM output into wrong canonical labels — e.g. an off-vocab "consult the wizard" span would silently become "grasp blue cube" if that scored highest. Even with a higher floor the operator can't tell which subtasks were paraphrases vs genuine mislabels in the resulting dataset. Replace with strict exact-match validation + a single targeted retry: 1. Generate subtasks as before. 2. If any returned subtask's normalised form (lowercased, articles stripped, whitespace collapsed) isn't in the canonical vocab, fire one retry call naming the offending strings and re-sending the full canonical list. The retry prompt requires byte-identical output from the vocab. 3. After the retry, validate again. Spans still off-vocab are dropped — no fuzzy snapping ever produces a different canonical label than the VLM actually emitted. 4. If every span ends up off-vocab even after the retry, warn loudly so the operator extends ``meta/canonical_vocabulary.json`` to cover the missing phase. The episode is left with empty subtasks rather than silently fabricated ones — visibility > sweep-under- the-rug. Promote ``_NORMALIZE_STRIP_TOKENS`` to a class constant and split the normalisation helper out so the retry-validation and the final canonicalisation share one source of truth. Tests: - test_plan_module_accepts_article_only_difference: "grasp the blue cube" still maps to canonical "grasp blue cube" (article-tolerant). - test_plan_module_retries_when_subtask_off_vocab: paraphrase triggers the retry which the VLM corrects in pass 2. - test_plan_module_drops_off_vocab_subtask_after_retry: VLM that refuses to correct → bad span dropped, in-vocab span kept. - test_plan_module_empty_when_all_off_vocab_after_retry: every span off-vocab → episode left empty (no warping). All 13 vocabulary tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- .../modules/plan_subtasks_memory.py | 208 ++++++++++-------- tests/annotations/test_vocabulary.py | 164 +++++++++----- 2 files changed, 220 insertions(+), 152 deletions(-) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index ed226bd02..7d55a5d8d 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -322,13 +322,32 @@ class PlanSubtasksMemoryModule: episode_duration=f"{episode_duration:.3f}", vocabulary_block=self._subtask_vocabulary_block(), ) - spans = self._vlm_field(self._video_message(record, prompt), "subtasks") + messages = self._video_message(record, prompt) + spans = self._vlm_field(messages, "subtasks") + # When a vocabulary is in force, do a single targeted retry if + # any returned subtask is off-vocab — strict exact-match only, + # no fuzzy snapping. The retry includes the offending strings + # and the full canonical list so the VLM can correct itself. + if self.vocabulary is not None and self.vocabulary.subtasks and spans: + invalid = self._invalid_subtasks(spans) + if invalid: + logger.info( + "episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once", + record.episode_index, + len(invalid), + invalid, + ) + retry_msg = self._build_subtask_retry_message(messages, invalid) + retried = self._vlm_field(retry_msg, "subtasks") + if retried: + spans = retried + if not spans: return [] # clamp to [t0, t_last] and sort t0 = record.frame_timestamps[0] t_last = record.frame_timestamps[-1] - raw: list[dict[str, Any]] = [] + cleaned: list[dict[str, Any]] = [] for span in spans: try: start = float(span["start"]) @@ -340,46 +359,20 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start - if text: - raw.append({"text": text, "start": start, "end": end}) - - # Without a vocabulary, free-form spans pass through unchanged. - if self.vocabulary is None or not self.vocabulary.subtasks: - raw.sort(key=lambda s: s["start"]) - return raw - - # With a vocabulary, snap each span to the closest canonical - # label. Two-pass: first try the normal Jaccard floor (drops - # off-topic hallucinations); if that leaves the episode with - # zero subtasks, fall back to snap-without-floor so the episode - # is never silently emptied — a wrong canonical label is still - # closer to the right phase than nothing at all. - cleaned: list[dict[str, Any]] = [] - for span in raw: - mapped = self._canonicalize_subtask(span["text"]) - if mapped: - cleaned.append({**span, "text": mapped}) - if not cleaned and raw: - logger.warning( - "episode %d: every VLM subtask was off-vocabulary " - "(%d spans); snapping to closest canonical label anyway " - "(check meta/canonical_vocabulary.json for missing phases)", - record.episode_index, - len(raw), - ) - for span in raw: - mapped = self._canonicalize_subtask(span["text"], force=True) - if mapped: - cleaned.append({**span, "text": mapped}) - elif len(cleaned) < len(raw): - logger.info( - "episode %d: %d/%d subtasks survived canonicalisation; " - "the rest were off-vocabulary", - record.episode_index, - len(cleaned), - len(raw), - ) + if not text: + continue + text = self._canonicalize_subtask(text) + if not text: + continue + cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) + if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned: + logger.warning( + "episode %d: every VLM subtask was off-vocab even after retry — " + "episode left empty (extend meta/canonical_vocabulary.json to " + "cover the missing phase)", + record.episode_index, + ) return cleaned # ------------------------------------------------------------------ @@ -420,70 +413,93 @@ class PlanSubtasksMemoryModule: f"{bullets}\n\n" ) - _CANONICALIZE_JACCARD_FLOOR: float = 0.25 - _CANONICALIZE_IGNORE_TOKENS: frozenset[str] = frozenset( - {"the", "a", "an", "to", "into", "from", "of", "on", "over", "at"} - ) + _NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"}) - def _canonicalize_subtask(self, text: str, *, force: bool = False) -> str: - """Snap ``text`` to the closest canonical subtask string. + def _canonicalize_subtask(self, text: str) -> str: + """Validate ``text`` against the canonical vocabulary; no fuzzy snap. Without a vocabulary, the original text passes through. With a - vocabulary, an exact case-insensitive match wins; failing that, - the best Jaccard overlap on the word set is used as a tolerant - fuzzy match (handles articles / minor reorderings). + vocabulary, accept the span only if its normalised form (lower- + cased, articles stripped, whitespace collapsed) matches a + canonical entry exactly — the canonical wording is returned so + the supervised string is byte-identical across episodes. - Behaviour at the Jaccard floor depends on ``force``: - - ``force=False`` (default): below ``_CANONICALIZE_JACCARD_FLOOR`` - the subtask is dropped. ``_generate_subtasks`` runs this first - to filter genuine off-topic hallucinations. - - ``force=True``: always snap, no floor. ``_generate_subtasks`` - uses this in a second pass when the first pass would otherwise - empty the episode — a slightly-wrong canonical label is still - closer to the right phase than no subtask at all, which makes - the whole episode invisible to the downstream policy. + Off-vocab spans are dropped (empty string). Upstream + ``_generate_subtasks`` triggers a targeted retry before reaching + the drop path; this function never snaps or warps a span into + a different label. """ if self.vocabulary is None or not self.vocabulary.subtasks: return text.strip() - candidates = self.vocabulary.subtasks - cleaned = text.strip() - lowered = cleaned.lower() - for candidate in candidates: - if candidate.lower() == lowered: + normalised = self._normalize(text) + if not normalised: + return "" + for candidate in self.vocabulary.subtasks: + if self._normalize(candidate) == normalised: return candidate - # Jaccard fallback: token-set overlap, drop articles + adverbs. - words = { - w for w in lowered.replace(",", " ").split() - if w and w not in self._CANONICALIZE_IGNORE_TOKENS - } - if not words: - return "" - best: tuple[float, str] | None = None - for candidate in candidates: - cand_words = { - w for w in candidate.lower().replace(",", " ").split() - if w and w not in self._CANONICALIZE_IGNORE_TOKENS - } - if not cand_words: + return "" + + @classmethod + def _normalize(cls, text: str) -> str: + """Lowercase, strip articles, collapse whitespace, drop punctuation.""" + words = [ + w.strip(".,:;\"'!?()") + for w in text.lower().replace(",", " ").split() + ] + return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS) + + def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]: + """Return the unique off-vocab subtask strings the VLM produced.""" + seen: list[str] = [] + for span in spans: + text = str((span or {}).get("text") or "").strip() + if not text: continue - inter = len(words & cand_words) - union = len(words | cand_words) - score = inter / union if union else 0.0 - if best is None or score > best[0]: - best = (score, candidate) - if best is None: - return "" - if not force and best[0] < self._CANONICALIZE_JACCARD_FLOOR: - logger.info( - "subtask %r dropped — best canonical match %r scored %.2f " - "(< %.2f Jaccard floor)", - cleaned, - best[1], - best[0], - self._CANONICALIZE_JACCARD_FLOOR, - ) - return "" - return best[1] + if self._canonicalize_subtask(text): + continue + if text not in seen: + seen.append(text) + return seen + + def _build_subtask_retry_message( + self, original_messages: list[dict[str, Any]], invalid: list[str] + ) -> list[dict[str, Any]]: + """Compose a one-shot correction prompt naming the off-vocab strings.""" + assert self.vocabulary is not None + canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) + invalid_list = "\n".join(f"- {s!r}" for s in invalid) + correction = ( + "Your previous response included subtask labels that are NOT in " + "the canonical vocabulary:\n" + f"{invalid_list}\n\n" + "Re-emit the same segmentation (same number of spans, same start/end " + "timestamps where they were valid) but replace every off-vocab " + "label with the EXACT canonical string for that phase, copied " + "verbatim from this list:\n" + f"{canonical}\n\n" + "Strict rules:\n" + "- Output strings must be byte-for-byte identical to entries above.\n" + "- No articles, no adverbs, no extra words.\n" + "- If a phase truly has no canonical match, omit that span entirely.\n" + "Return the same JSON shape as before." + ) + # Append the correction as an additional user turn; the model + # sees the original prompt + its prior output is implied by the + # conversation context (the VLM client is stateless, so we + # re-send the original content plus this correction). + retry_messages = [ + { + "role": m.get("role", "user"), + "content": ( + m.get("content") + if isinstance(m.get("content"), str) + else list(m.get("content") or []) + ), + } + for m in original_messages + ] + retry_messages.append({"role": "user", "content": correction}) + return retry_messages def _generate_plan( self, diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py index 20d22a50d..1f1c046fe 100644 --- a/tests/annotations/test_vocabulary.py +++ b/tests/annotations/test_vocabulary.py @@ -182,59 +182,17 @@ def test_plan_module_inlines_vocab_into_subtask_prompt( assert any("grasp blue cube" in t for t in captured) -def test_plan_module_canonicalizes_paraphrased_subtask( +def test_plan_module_accepts_article_only_difference( fixture_dataset_root: Path, tmp_path: Path ) -> None: - """Off-vocab paraphrase with high token overlap snaps to canonical form.""" + """Articles like 'the'/'a'/'an' are stripped during validation.""" from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient def responder(_messages): return { "subtasks": [ - # paraphrase of "grasp blue cube" — overlapping tokens + # Same canonical phrase modulo "the" — should be accepted. {"text": "grasp the blue cube", "start": 0.0, "end": 0.4}, - # paraphrase of "place blue cube in box" — high overlap - {"text": "place the blue cube into the box", "start": 0.4, "end": 0.9}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - # Both paraphrases snapped onto canonical strings. - assert subtask_texts == ["grasp blue cube", "place blue cube in box"] - - -def test_plan_module_drops_off_vocab_subtask( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """A subtask with low overlap to every canonical label is dropped. - - Drop only kicks in when *at least one* other subtask survives — if - every span would be dropped the episode would come out empty, so - ``_generate_subtasks`` falls back to snap-without-floor; that path - is exercised by ``test_plan_module_snaps_when_all_off_vocab``. - """ - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - def responder(_messages): - return { - "subtasks": [ - # in-vocab — keeps the episode non-empty so the floor - # is allowed to drop the next span. - {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, - # off-vocab hallucination — no token overlap above the - # Jaccard floor; should be dropped. - {"text": "perform a fancy macarena dance", "start": 0.4, "end": 0.9}, ] } @@ -253,18 +211,114 @@ def test_plan_module_drops_off_vocab_subtask( assert subtask_texts == ["grasp blue cube"] -def test_plan_module_snaps_when_all_off_vocab( +def test_plan_module_retries_when_subtask_off_vocab( fixture_dataset_root: Path, tmp_path: Path ) -> None: - """All-off-vocab spans snap to nearest canonical instead of emptying the episode.""" + """One-shot retry replaces an off-vocab paraphrase with the canonical form.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + call_count = {"n": 0} + + def responder(messages): + call_count["n"] += 1 + # First call: returns an off-vocab paraphrase. + if call_count["n"] == 1: + return { + "subtasks": [ + # paraphrase, not in vocab + {"text": "pick up blue cube", "start": 0.0, "end": 0.4}, + ] + } + # Second call (the retry): should contain the correction prompt; + # respond with the canonical phrase exactly. + last_user_text = "" + for message in messages: + content = message.get("content") + if isinstance(content, str): + last_user_text = content + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + last_user_text = block.get("text", "") + assert "NOT in the canonical vocabulary" in last_user_text + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["grasp blue cube"] + # The retry must have fired exactly once. + assert call_count["n"] == 2 + + +def test_plan_module_drops_off_vocab_subtask_after_retry( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """If the VLM stays off-vocab even after the retry, the bad span is dropped.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + call_count = {"n": 0} + + def responder(_messages): + call_count["n"] += 1 + # Both calls return the same off-vocab span — the model can't + # be corrected. The second call also returns one in-vocab span + # so the episode isn't empty; this lets us check that the + # off-vocab span is dropped without affecting the in-vocab one. + if call_count["n"] == 1: + return { + "subtasks": [ + {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, + {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, + ] + } + return { + "subtasks": [ + {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, + {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # Retry fired exactly once; bad span dropped, good span kept. + assert call_count["n"] == 2 + assert subtask_texts == ["grasp blue cube"] + + +def test_plan_module_empty_when_all_off_vocab_after_retry( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """All-off-vocab spans → episode comes out empty (no silent fuzzy snap).""" from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient def responder(_messages): + # Returns the same off-vocab spans on both attempts. return { "subtasks": [ - # Both off-vocab — would normally be dropped. The - # fallback should snap each to its best canonical match - # rather than leave the episode with no subtasks at all. {"text": "make a smoothie", "start": 0.0, "end": 0.4}, {"text": "consult the wizard", "start": 0.4, "end": 0.9}, ] @@ -282,12 +336,10 @@ def test_plan_module_snaps_when_all_off_vocab( module.run_episode(record, staging) rows = staging.read("plan") subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - # Two off-vocab spans → two canonical subtasks (snapped to nearest - # by Jaccard with no floor). The exact canonical choice doesn't - # matter — only that the episode came out with subtasks rather - # than empty. - assert len(subtask_texts) == 2 - assert all(s in _CANONICAL_SUBTASKS for s in subtask_texts) + # No subtask gets fabricated — better to leave the episode empty + # so the operator notices the vocabulary gap than to silently + # warp the labels. + assert subtask_texts == [] def test_plan_module_without_vocab_passes_through( From 471b2b1b1dee5fde8dba9482bc14bd56c09e8666 Mon Sep 17 00:00:00 2001 From: pepijn Date: Sat, 23 May 2026 19:31:44 +0000 Subject: [PATCH 8/8] fix(annotate): bump same-frame subtasks onto distinct frames MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If two consecutive VLM-emitted subtask spans have ``start`` timestamps that round to the same source frame after ``snap_to_frame`` (e.g. on short episodes the VLM sometimes nominates two ~adjacent action boundaries within one 30 Hz step), the writer emits two ``style=subtask`` rows at the identical persistent timestamp. The training-time renderer's default binding ``subtask: active_at(t, style=subtask)`` then raises: ValueError: Ambiguous resolver for style='subtask'; add role=..., tool_name=..., or camera=... to disambiguate. … and the whole training run dies on the first batch. Observed concretely on ``pepijn223/super_poulain_vocab2`` (job 22159979): episodes 3 and 30 each had two subtask rows at the same timestamp (``release yellow cube`` + ``retract arm`` snapping to the same frame). Add ``_dedupe_starts_to_distinct_frames`` to walk the cleaned span list and, whenever a snapped start collides with one already used, push the later span onto the next free frame timestamp. Both subtasks survive on distinct timestamps; the renderer can now disambiguate. If the episode genuinely has no later free frame (extremely unlikely — would require a same-timestamp collision on the very last frame of the episode), the later span is dropped with a warning rather than left to poison the render. New test ``test_plan_module_bumps_collocated_subtasks_to_distinct_frames`` locks in the contract; full vocabulary suite is 14/14 green. Co-Authored-By: Claude Opus 4.7 (1M context) Co-authored-by: Cursor --- .../modules/plan_subtasks_memory.py | 49 +++++++++++++++++++ tests/annotations/test_vocabulary.py | 45 +++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 7d55a5d8d..b9bae607e 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -366,6 +366,7 @@ class PlanSubtasksMemoryModule: continue cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) + cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record) if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned: logger.warning( "episode %d: every VLM subtask was off-vocab even after retry — " @@ -375,6 +376,54 @@ class PlanSubtasksMemoryModule: ) return cleaned + @staticmethod + def _dedupe_starts_to_distinct_frames( + spans: list[dict[str, Any]], record: EpisodeRecord + ) -> list[dict[str, Any]]: + """Bump same-frame subtask starts onto distinct frames. + + Two consecutive VLM spans whose ``start`` rounds to the same + source frame (after :func:`snap_to_frame`) would otherwise emit + two ``style=subtask`` rows at the identical persistent + timestamp. The training-time renderer's ``active_at(t, + style=subtask)`` resolver can't disambiguate that and raises + ``Ambiguous resolver for style='subtask'``. + + Walk the (sorted-by-start) spans, snap each to its frame, and + if the snapped frame is already taken push the span onto the + next unused frame so both subtasks survive on distinct + timestamps. If the episode ends before a free frame is found, + the trailing span is dropped with a warning — better than + poisoning the render. + """ + if not spans: + return spans + frames = record.frame_timestamps + if not frames: + return spans + used: set[float] = set() + out: list[dict[str, Any]] = [] + for span in spans: + ts = snap_to_frame(span["start"], frames) + if ts in used: + next_ts = next((f for f in frames if f > ts and f not in used), None) + if next_ts is None: + logger.warning( + "episode %d: subtask %r snapped to occupied frame " + "%.3f and no free later frame exists — dropping", + record.episode_index, + span.get("text"), + ts, + ) + continue + ts = next_ts + used.add(ts) + new_span = {**span, "start": ts} + if float(new_span.get("end", ts)) < ts: + new_span["end"] = ts + out.append(new_span) + return out + # ------------------------------------------------------------------ # Canonical-vocabulary helpers # ------------------------------------------------------------------ diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py index 1f1c046fe..7b820834d 100644 --- a/tests/annotations/test_vocabulary.py +++ b/tests/annotations/test_vocabulary.py @@ -309,6 +309,51 @@ def test_plan_module_drops_off_vocab_subtask_after_retry( assert subtask_texts == ["grasp blue cube"] +def test_plan_module_bumps_collocated_subtasks_to_distinct_frames( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """Two subtasks whose starts snap to the same frame get split onto two frames. + + Without this guard, both spans would emit ``style=subtask`` rows at the + identical persistent timestamp; the training-time renderer's + ``active_at(t, style=subtask)`` then raises an ambiguity error. + """ + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + # Two canonical labels with starts within one frame of each other — + # both snap to the same source frame, so the dedupe pass must bump + # the later one to the next frame. + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.40, "end": 0.42}, + {"text": "place blue cube in box", "start": 0.41, "end": 0.50}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_rows = [r for r in rows if r["style"] == "subtask"] + # Both subtasks present, both on distinct timestamps. + assert len(subtask_rows) == 2 + timestamps = [r["timestamp"] for r in subtask_rows] + assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}" + # Order preserved: the chronologically earlier span keeps the earlier + # frame, the later one was bumped onto the next available frame. + assert subtask_rows[0]["content"] == "grasp blue cube" + assert subtask_rows[1]["content"] == "place blue cube in box" + assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"] + + def test_plan_module_empty_when_all_off_vocab_after_retry( fixture_dataset_root: Path, tmp_path: Path ) -> None: