diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index b5f52cb1c..6e4cd9563 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -7,7 +7,8 @@ ## What the pipeline produces -Three modules write into a per-episode staging tree, then a single writer +A vocabulary-discovery phase derives a small canonical wording, then three +modules write into a per-episode staging tree, then a single writer rewrites the data shards in place: | Style / atom | Column | Module | @@ -20,6 +21,20 @@ rewrites the data shards in place: | speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`| | `vqa` (user / assistant pair) | `language_events` | `vqa` | +The `plan` module is constrained to a **canonical vocabulary** discovered +once per dataset by the `vocabulary` module (phase 0). It watches a few +sample episode videos (`--vocabulary.sample_episodes`, default `3`) and +asks the VLM to derive a small set of imperative subtask labels +(~`--vocabulary.n_subtask_target`, default `10`) and first-person memory +milestones (~`--vocabulary.n_memory_target`, default `6`) that recur +across the demos. The result lands at +`meta/canonical_vocabulary.json` (human-readable / hand-editable) and is +reused on every subsequent run. The `plan` module then constrains both +subtask + memory generation to those exact strings — the downstream +low-level policy sees a small, repeatable target distribution instead of +thousands of LLM paraphrases. Disable with `--vocabulary.enabled=False` +to fall back to free-form generation. + The writer does **not** add a `tools` column to the parquet — the tool catalog lives at `meta/info.json["tools"]` instead (see [Tools](./tools)). After every annotation run the pipeline ensures the diff --git a/src/lerobot/annotations/steerable_pipeline/__init__.py b/src/lerobot/annotations/steerable_pipeline/__init__.py index a8da5e05e..02d819604 100644 --- a/src/lerobot/annotations/steerable_pipeline/__init__.py +++ b/src/lerobot/annotations/steerable_pipeline/__init__.py @@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite: from .config import AnnotationPipelineConfig from .validator import StagingValidator, ValidationReport +from .vocabulary import ( + VOCABULARY_FILENAME, + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) from .writer import LanguageColumnsWriter __all__ = [ + "VOCABULARY_FILENAME", "AnnotationPipelineConfig", "LanguageColumnsWriter", "StagingValidator", "ValidationReport", + "Vocabulary", + "VocabularyDiscoveryModule", + "load_vocabulary", + "save_vocabulary", + "vocabulary_path", ] diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index c609cd286..d80e4149a 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -21,6 +21,42 @@ from pathlib import Path from typing import Any +@dataclass +class VocabularyConfig: + """Phase 0 — dataset-level canonical vocabulary discovery. + + Watches the first ``sample_episodes`` episode videos and asks the VLM + to derive a small canonical vocabulary (~``n_subtask_target`` subtask + labels + ~``n_memory_target`` memory milestones) that every episode + in the dataset will reuse. The output lands at + ``meta/canonical_vocabulary.json`` and feeds phase 1's subtask + + memory generation as both a prompt-side constraint and a post-VLM + validation gate. + + Why this exists: free-form LLM rephrasing per episode produces near- + unique subtask strings, which makes the downstream low-level policy's + conditioning effectively noise — at inference the policy generates a + *new* paraphrase the action expert has never seen and produces tiny + cautious actions. Forcing every episode onto the same small set of + canonical strings gives the action expert dense supervision per + string and a small target distribution to learn against. + + Set ``enabled=False`` to fall back to free-form generation (original + behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary + file from being clobbered on re-runs. + """ + + enabled: bool = True + sample_episodes: int = 3 + n_subtask_target: int = 10 + n_memory_target: int = 6 + max_video_frames_per_episode: int = 32 + # When True (default), an existing meta/canonical_vocabulary.json is + # loaded as-is and no VLM call is made — lets operators hand-edit the + # file. Set False to always rediscover from the sample episodes. + reuse_existing: bool = True + + @dataclass class PlanConfig: """``plan`` module: plan + subtasks + memory + task augmentation. @@ -186,6 +222,7 @@ class AnnotationPipelineConfig: seed: int = 1729 + vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig) plan: PlanConfig = field(default_factory=PlanConfig) interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig) vqa: VqaConfig = field(default_factory=VqaConfig) diff --git a/src/lerobot/annotations/steerable_pipeline/executor.py b/src/lerobot/annotations/steerable_pipeline/executor.py index ad46d4750..5c725fa65 100644 --- a/src/lerobot/annotations/steerable_pipeline/executor.py +++ b/src/lerobot/annotations/steerable_pipeline/executor.py @@ -15,8 +15,14 @@ # limitations under the License. """In-process executor that runs the annotation phases. -The executor plans **six phases** in the dependency order from the plan: +The executor plans **seven phases** in the dependency order from the plan: + phase 0: vocabulary discovery — derive a small canonical vocabulary + from the first few sample-episode videos (subtask labels + + memory milestones) and persist it next to the dataset; the + ``plan`` module then constrains every per-episode generation + to those strings, so the downstream policy sees a small, + repeatable conditioning distribution phase 1: ``plan`` module (plan + subtasks + memory) phase 2: ``interjections`` module (interjections + speech) phase 3: ``plan`` plan-update pass — re-runs plan emission at every @@ -88,6 +94,7 @@ class Executor: vqa: Any # GeneralVqaModule writer: LanguageColumnsWriter validator: StagingValidator + vocabulary: Any = None # VocabularyDiscoveryModule | None def run(self, root: Path) -> PipelineRunSummary: records = list(iter_episodes(root, only_episodes=self.config.only_episodes)) @@ -102,6 +109,10 @@ class Executor: phases: list[PhaseResult] = [] + # Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary`` + # so subsequent per-episode plan calls see the canonical labels. + phases.append(self._run_vocabulary_phase(records, root)) + # Phase 1: ``plan`` module (plan + subtasks + memory) phases.append(self._run_module_phase("plan", records, staging_dir, self.plan)) # Phase 2: ``interjections`` module (interjections + speech). It @@ -172,6 +183,62 @@ class Executor: flush=True, ) + def _run_vocabulary_phase( + self, records: list[EpisodeRecord], root: Path + ) -> PhaseResult: + """Discover (or load) the canonical vocabulary, wire it into ``self.plan``. + + Returns a ``PhaseResult`` whose ``episodes_processed`` is the number + of sample episodes consulted (0 when disabled or no VLM call was + needed); ``episodes_skipped`` is always ``0`` because vocabulary is + a once-per-dataset artifact, not a per-episode product. + """ + from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415 + + if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False): + print( + "[annotate] phase=vocabulary skipped (module disabled or unset)", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + existing = load_vocabulary(root) + if existing is not None and self.config.vocabulary.reuse_existing: + print( + f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} " + f"({len(existing.subtasks)} subtask labels, " + f"{len(existing.memory_milestones)} memory milestones)", + flush=True, + ) + self.plan.vocabulary = existing + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records))) + print( + f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...", + flush=True, + ) + t0 = time.time() + vocab = self.vocabulary.discover(records[:sample_n], existing=existing) + if vocab is None: + print( + "[annotate] phase=vocabulary returned no vocabulary — " + "plan module will fall back to free-form generation", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + save_path = save_vocabulary(root, vocab) + print( + f"[annotate] phase=vocabulary wrote {save_path} " + f"({len(vocab.subtasks)} subtask labels, " + f"{len(vocab.memory_milestones)} memory milestones) in " + f"{time.time() - t0:.1f}s", + flush=True, + ) + self.plan.vocabulary = vocab + return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0) + def _run_module_phase( self, name: str, diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 0218e79b1..218d98ad1 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging from collections.abc import Sequence from dataclasses import dataclass, field from pathlib import Path @@ -34,6 +35,9 @@ from ..prompts import load as load_prompt from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame from ..staging import EpisodeStaging from ..vlm_client import VlmClient +from ..vocabulary import Vocabulary + +logger = logging.getLogger(__name__) @dataclass @@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule: vlm: VlmClient config: PlanConfig frame_provider: FrameProvider = field(default_factory=null_provider) + vocabulary: Vocabulary | None = None + """When set, the module constrains subtask + memory generation to the + canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery) + populates this once per dataset; ``None`` falls back to free-form + generation (original behaviour).""" @property def enabled(self) -> bool: @@ -311,6 +320,7 @@ class PlanSubtasksMemoryModule: min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", + vocabulary_block=self._subtask_vocabulary_block(), ) spans = self._vlm_field(self._video_message(record, prompt), "subtasks") if not spans: @@ -330,12 +340,100 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start + if not text: + continue + text = self._canonicalize_subtask(text) if not text: continue cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) return cleaned + # ------------------------------------------------------------------ + # Canonical-vocabulary helpers + # ------------------------------------------------------------------ + + def _subtask_vocabulary_block(self) -> str: + """Bullet-list of canonical subtasks the VLM must pick from. + + Returns an empty string when no vocabulary is configured — + ``module_1_subtasks.txt`` then falls back to its free-form + rules (original behaviour). + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return "" + bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) + return ( + "You MUST choose each subtask label verbatim from this canonical " + "vocabulary — pick the closest match for each phase of the demo, " + "and reuse the SAME string every time that phase recurs. The " + "low-level policy is conditioned on these exact strings; any " + "novel paraphrase you invent will make its conditioning OOD.\n" + "Canonical subtask labels:\n" + f"{bullets}\n\n" + ) + + def _memory_vocabulary_block(self) -> str: + """Bullet-list of canonical memory milestones the VLM must pick from.""" + if self.vocabulary is None or not self.vocabulary.memory_milestones: + return "" + bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones) + return ( + "Compose the memory by picking ONLY from this canonical milestone " + "list — append a milestone (or rewrite the running memory to " + "compress past ones) using these exact phrases. Do not invent new " + "wording: every paraphrase weakens the downstream conditioning.\n" + "Canonical memory milestones:\n" + f"{bullets}\n\n" + ) + + def _canonicalize_subtask(self, text: str) -> str: + """Snap ``text`` to the closest canonical subtask string, or drop it. + + Without a vocabulary, the original text passes through. With a + vocabulary, an exact case-insensitive match wins; failing that, + the best Jaccard overlap on the word set is used as a tolerant + fuzzy match (handles articles / minor reorderings). If nothing + clears the floor, the subtask is dropped — better to skip a + phase than to feed the action expert an off-distribution string. + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return text.strip() + candidates = self.vocabulary.subtasks + cleaned = text.strip() + lowered = cleaned.lower() + for candidate in candidates: + if candidate.lower() == lowered: + return candidate + # Jaccard fallback: token-set overlap, drop articles + adverbs. + ignore = {"the", "a", "an", "to", "into", "from", "of", "on", "over", "at"} + words = {w for w in lowered.replace(",", " ").split() if w and w not in ignore} + if not words: + return "" + best: tuple[float, str] | None = None + for candidate in candidates: + cand_words = { + w for w in candidate.lower().replace(",", " ").split() if w and w not in ignore + } + if not cand_words: + continue + inter = len(words & cand_words) + union = len(words | cand_words) + score = inter / union if union else 0.0 + if best is None or score > best[0]: + best = (score, candidate) + # Floor: require at least ~half the tokens to overlap. Below that + # the VLM is hallucinating a novel phrase; drop rather than warp + # it into something semantically wrong. + if best is None or best[0] < 0.5: + logger.warning( + "subtask %r did not match any canonical label (best=%s) — dropping", + cleaned, + best, + ) + return "" + return best[1] + def _generate_plan( self, record: EpisodeRecord, # noqa: ARG002 (kept for signature stability) @@ -397,6 +495,7 @@ class PlanSubtasksMemoryModule: prior_memory=prior_memory or "(none)", completed_subtask=completed, remaining_subtasks=", ".join(remaining) if remaining else "(none)", + vocabulary_block=self._memory_vocabulary_block(), ) memory = self._vlm_field(self._text_message(prompt), "memory") return memory.strip() if isinstance(memory, str) else "" diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt new file mode 100644 index 000000000..f867c34b5 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt @@ -0,0 +1,46 @@ +You are inspecting {n_episodes} sample episode video(s) from a teleoperated +robot dataset. Every episode in the dataset performs the SAME task; the +user originally asked: "{episode_task}". + +Watch all the clips and produce a SHORT canonical vocabulary that every +episode in this dataset will reuse. The downstream low-level policy is +conditioned on these strings — duplicate phrasings (e.g. "grasp blue +cube" vs "pick up the blue cube") would destroy the conditioning, so +pick one wording per concept and reuse it everywhere. + +You output two lists: + +1. `subtasks`: imperative, telegraphic commands the robot can execute. + - Verb-first. Drop articles, adverbs, qualifiers. + - Consistent object nouns (if the task says "cube", every subtask says + "cube" — never "block" / "object"). + - Atomic — one skill per subtask (gripper-open events, contact, regrasps, + transitions all become cut points). + - Aim for ~{n_subtask_target} labels. Fewer is better than more. + - Good: "move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "release blue cube", "retract arm". + - Bad: "the robot arm moves towards the blue cube" (third person, + too long), "carefully pick up the cube" (adverb, article), + "carrying the yellow cube over the green basket" (gerund — should + be imperative "transport yellow cube to green basket"). + +2. `memory_milestones`: first-person past-tense sentences the running + memory composes from. Each subtask phase that produces a lasting + change should have a milestone; transient motions (move, retract) + should NOT. + - First person, past tense. Start with "I". + - One sentence. Functional outcome only — no grasp / motion detail. + - Aim for ~{n_memory_target} milestones. + - Good: "I picked up the blue cube.", "I placed the blue cube in + the green box.", "I wiped the counter." + - Bad: "The robot arm grasped the blue cube." (third person), + "I carefully grasped the blue cube with the parallel gripper." + (irrelevant detail), "I moved towards the blue cube." (transient + motion — should be omitted, not memorialised). + +Output strictly valid JSON of shape: + + {{ + "subtasks": ["", ...], + "memory_milestones": ["I .", ...] + }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt index b5278368b..d066b9f73 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt @@ -13,7 +13,7 @@ Previous memory: {prior_memory} Just-completed subtask: "{completed_subtask}" Remaining subtasks (for relevance judgement only): {remaining_subtasks} -Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the +{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the robot has accomplished so far — the running story it would tell itself. Authoring rules: diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt index c530b1340..12bbcfba2 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt @@ -4,9 +4,9 @@ The user originally asked: "{episode_task}" You are shown the entire demonstration as a single video. Watch the whole clip, then segment it into a list of consecutive atomic subtasks -the robot performs. Write short, telegraphic action labels. +the robot performs. -Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: +{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: - Each subtask = one atomic skill the low-level policy can execute. - Write each subtask as an IMPERATIVE COMMAND, starting with a verb: diff --git a/src/lerobot/annotations/steerable_pipeline/vocabulary.py b/src/lerobot/annotations/steerable_pipeline/vocabulary.py new file mode 100644 index 000000000..8787ec372 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/vocabulary.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Dataset-level canonical vocabulary discovery (Phase 0). + +The downstream consumer of these annotations is a low-level action expert +conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing +gives near-unique strings per occurrence, which collapses the action +expert's conditioning to noise and makes runtime subtask-paraphrase drift +catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical +vocabulary per environment (~10 strings) that every episode reuses; this +module derives that vocabulary automatically from the first few episode +videos and persists it next to the dataset. + +Pipeline-level flow: + + Phase 0 (here): watch N sample episodes → produce vocabulary.json + Phase 1 (plan module): reuse vocabulary on every episode, both as + prompt-side constraint *and* post-VLM validation + +The vocabulary is JSON, lives at ``/meta/canonical_vocabulary.json``, +and is human-inspectable / hand-editable — if the discovered set is wrong, +operators edit the file and re-run the pipeline without phase 0. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from .config import VocabularyConfig +from .frames import FrameProvider, null_provider, to_video_block +from .prompts import load as load_prompt +from .reader import EpisodeRecord +from .vlm_client import VlmClient + +logger = logging.getLogger(__name__) + +VOCABULARY_FILENAME = "canonical_vocabulary.json" + + +@dataclass +class Vocabulary: + """Canonical phrasings shared across every episode of one dataset. + + Both lists are strict: per-episode subtask + memory generation pick + from these strings only; the downstream policy then has a small, + repeatable target distribution to learn instead of thousands of + LLM paraphrases. + """ + + subtasks: tuple[str, ...] + """Imperative subtask labels — what the low-level policy is conditioned + on. Verb-first, telegraphic, consistent object nouns. Example: + ``("move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "retract arm")``. + """ + + memory_milestones: tuple[str, ...] + """First-person past-tense milestone sentences — building blocks for + the running memory string. Example: ``("I picked up the blue cube.", + "I placed the blue cube in the green box.")``. Each milestone maps + 1:1 onto a completed subtask phase; ``memory_at_step_k`` is the + concatenation of milestones for completed phases. + """ + + def to_json(self) -> dict[str, list[str]]: + return { + "subtasks": list(self.subtasks), + "memory_milestones": list(self.memory_milestones), + } + + @classmethod + def from_json(cls, payload: dict[str, Any]) -> Vocabulary: + subtasks = tuple( + str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip() + ) + memory_milestones = tuple( + str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip() + ) + return cls(subtasks=subtasks, memory_milestones=memory_milestones) + + def is_empty(self) -> bool: + return not self.subtasks and not self.memory_milestones + + +def vocabulary_path(root: Path) -> Path: + """Return the canonical on-disk location for the vocabulary file.""" + return root / "meta" / VOCABULARY_FILENAME + + +def load_vocabulary(root: Path) -> Vocabulary | None: + """Read ``/meta/canonical_vocabulary.json`` if present. + + Returns ``None`` when the file does not exist — callers fall back to + free-form (unconstrained) subtask + memory generation, preserving the + pipeline's behaviour on datasets that never ran phase 0. + """ + path = vocabulary_path(root) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc) + return None + if not isinstance(payload, dict): + logger.warning("%s is not a JSON object — ignoring", path) + return None + vocab = Vocabulary.from_json(payload) + if vocab.is_empty(): + return None + return vocab + + +def save_vocabulary(root: Path, vocab: Vocabulary) -> Path: + """Atomically persist ``vocab`` to ``/meta/canonical_vocabulary.json``.""" + path = vocabulary_path(root) + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text( + json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + tmp.replace(path) + return path + + +@dataclass +class VocabularyDiscoveryModule: + """Derive a dataset-level canonical vocabulary from sample episodes. + + Phase 0 of the executor: pulls ``config.sample_episodes`` episode + videos, packs them into one Qwen-VL multi-video prompt, and asks the + model to enumerate the small set of canonical subtask labels + + memory milestones that recur across them. The output is persisted + to ``meta/canonical_vocabulary.json`` and consumed by phase 1. + """ + + vlm: VlmClient + config: VocabularyConfig + frame_provider: FrameProvider = field(default_factory=null_provider) + + @property + def enabled(self) -> bool: + return self.config.enabled + + def discover( + self, + records: Sequence[EpisodeRecord], + *, + existing: Vocabulary | None = None, + ) -> Vocabulary | None: + """Run vocabulary discovery against the first N sample episodes. + + ``existing`` short-circuits the VLM call when ``config.reuse_existing`` + is True and an on-disk vocabulary is already present — keeps re-runs + cheap and lets operators hand-edit the file without it getting + overwritten. + """ + if existing is not None and self.config.reuse_existing: + logger.info( + "vocabulary: reusing existing (%d subtasks, %d memory milestones)", + len(existing.subtasks), + len(existing.memory_milestones), + ) + return existing + + sample = list(records[: max(1, int(self.config.sample_episodes))]) + if not sample: + return None + + task_hint = next((r.episode_task for r in sample if r.episode_task), "") + prompt = load_prompt("module_0_vocabulary").format( + episode_task=task_hint or "(unspecified)", + n_episodes=len(sample), + n_subtask_target=int(self.config.n_subtask_target), + n_memory_target=int(self.config.n_memory_target), + ) + # Pack one video block per sample episode so the VLM sees the + # variation across episodes (different starting poses, different + # object placements) rather than overfitting to one trajectory. + content: list[dict[str, Any]] = [] + for record in sample: + video_frames = self.frame_provider.video_for_episode( + record, int(self.config.max_video_frames_per_episode) + ) + if video_frames: + content.extend(to_video_block(video_frames)) + content.append({"type": "text", "text": prompt}) + messages = [{"role": "user", "content": content}] + + result = self.vlm.generate_json([messages])[0] + if not isinstance(result, dict): + logger.warning("vocabulary: VLM did not return a JSON object — skipping") + return None + + vocab = Vocabulary.from_json(result) + if vocab.is_empty(): + logger.warning("vocabulary: VLM returned an empty vocabulary — skipping") + return None + logger.info( + "vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes", + len(vocab.subtasks), + len(vocab.memory_milestones), + len(sample), + ) + return vocab diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 7fee1f052..52309b827 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import ( ) from lerobot.annotations.steerable_pipeline.validator import StagingValidator from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client +from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter from lerobot.configs import parser @@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider ) vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider) + vocabulary = VocabularyDiscoveryModule( + vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider + ) writer = LanguageColumnsWriter() validator = StagingValidator( dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None, @@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: plan=plan, interjections=interjections, vqa=vqa, + vocabulary=vocabulary, writer=writer, validator=validator, ) diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py new file mode 100644 index 000000000..a9f080a16 --- /dev/null +++ b/tests/annotations/test_vocabulary.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Vocabulary-discovery phase (phase 0) tests.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from lerobot.annotations.steerable_pipeline.config import ( + PlanConfig, + VocabularyConfig, +) +from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule +from lerobot.annotations.steerable_pipeline.reader import iter_episodes +from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging +from lerobot.annotations.steerable_pipeline.vocabulary import ( + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) + +from ._helpers import make_canned_responder + + +_CANONICAL_SUBTASKS = ( + "grasp blue cube", + "place blue cube in box", + "retract arm", +) +_CANONICAL_MEMORY = ( + "I picked up the blue cube.", + "I placed the blue cube in the box.", +) + + +# --------------------------------------------------------------------------- +# Vocabulary dataclass + on-disk round-trip +# --------------------------------------------------------------------------- + + +def test_vocabulary_roundtrip(tmp_path: Path) -> None: + vocab = Vocabulary( + subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY + ) + save_path = save_vocabulary(tmp_path, vocab) + assert save_path == vocabulary_path(tmp_path) + assert save_path.exists() + + loaded = load_vocabulary(tmp_path) + assert loaded is not None + assert loaded.subtasks == _CANONICAL_SUBTASKS + assert loaded.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None: + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("{ not valid json", encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +# --------------------------------------------------------------------------- +# Discovery module +# --------------------------------------------------------------------------- + + +def test_vocabulary_discovery_calls_vlm_and_returns_vocab( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder( + { + "canonical vocabulary": { + "subtasks": list(_CANONICAL_SUBTASKS), + "memory_milestones": list(_CANONICAL_MEMORY), + } + } + ) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2)) + records = list(iter_episodes(fixture_dataset_root)) + vocab = module.discover(records) + assert vocab is not None + assert vocab.subtasks == _CANONICAL_SUBTASKS + assert vocab.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None: + """``reuse_existing=True`` short-circuits the VLM call entirely.""" + + def _explode(_messages): # pragma: no cover - must not be called + raise AssertionError("VLM should not be invoked when reusing existing vocabulary") + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=_explode) + module = VocabularyDiscoveryModule( + vlm=vlm, config=VocabularyConfig(reuse_existing=True) + ) + records = list(iter_episodes(fixture_dataset_root)) + existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",)) + vocab = module.discover(records, existing=existing) + assert vocab is existing + + +def test_vocabulary_discovery_empty_payload_returns_none( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}}) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig()) + records = list(iter_episodes(fixture_dataset_root)) + assert module.discover(records) is None + + +# --------------------------------------------------------------------------- +# PlanSubtasksMemoryModule consumes the vocabulary +# --------------------------------------------------------------------------- + + +def test_plan_module_inlines_vocab_into_subtask_prompt( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + captured: list[str] = [] + + def responder(messages): + # Find the last user text block and stash it for inspection. + for message in messages: + content = message.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + captured.append(block.get("text", "")) + # Return canned subtasks; pick the first two canonical strings so + # the validator accepts them. + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + {"text": "place blue cube in box", "start": 0.4, "end": 0.9}, + ] + } + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + # The subtask prompt (and the memory prompt) carries the canonical + # bullet list so the VLM can't paraphrase them away. + assert any("Canonical subtask labels:" in t for t in captured) + assert any("grasp blue cube" in t for t in captured) + + +def test_plan_module_canonicalizes_paraphrased_subtask( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """Off-vocab paraphrase with high token overlap snaps to canonical form.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # paraphrase of "grasp blue cube" — overlapping tokens + {"text": "grasp the blue cube", "start": 0.0, "end": 0.4}, + # paraphrase of "place blue cube in box" — high overlap + {"text": "place the blue cube into the box", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # Both paraphrases snapped onto canonical strings. + assert subtask_texts == ["grasp blue cube", "place blue cube in box"] + + +def test_plan_module_drops_off_vocab_subtask( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """A subtask with low overlap to every canonical label is dropped.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # in-vocab + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + # off-vocab hallucination — no token overlap above the + # Jaccard floor; should be dropped. + {"text": "perform a fancy macarena dance", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["grasp blue cube"] + + +def test_plan_module_without_vocab_passes_through( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """No vocabulary configured → original free-form behavior is preserved.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + {"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0}, + ] + } + + vlm = StubVlmClient(responder=responder) + module = PlanSubtasksMemoryModule( + vlm=vlm, config=PlanConfig(n_task_rephrasings=0) + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["any free-form text the VLM wants"]