diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index b5f52cb1c..9d6e66231 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -7,7 +7,8 @@ ## What the pipeline produces -Three modules write into a per-episode staging tree, then a single writer +A vocabulary-discovery phase derives a small canonical wording, then three +modules write into a per-episode staging tree, then a single writer rewrites the data shards in place: | Style / atom | Column | Module | @@ -20,6 +21,21 @@ rewrites the data shards in place: | speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`| | `vqa` (user / assistant pair) | `language_events` | `vqa` | +The `plan` module is constrained to a **canonical vocabulary** discovered +once per dataset by the `vocabulary` module (phase 0). It watches a few +sample episode videos (`--vocabulary.sample_episodes`, default `3`) and +asks the VLM to derive a small set of imperative subtask labels and +first-person memory milestones that recur across the demos. The VLM +picks the right number of entries itself based on what it sees in the +clips — short pick-and-place demos get ~6 subtask labels, longer +multi-step recipes get more. The result lands at +`meta/canonical_vocabulary.json` (human-readable / hand-editable) and +is reused on every subsequent run. The `plan` module then constrains +both subtask + memory generation to those exact strings — the +downstream low-level policy sees a small, repeatable target +distribution instead of thousands of LLM paraphrases. Disable with +`--vocabulary.enabled=False` to fall back to free-form generation. + The writer does **not** add a `tools` column to the parquet — the tool catalog lives at `meta/info.json["tools"]` instead (see [Tools](./tools)). After every annotation run the pipeline ensures the diff --git a/examples/annotations/run_hf_job.py b/examples/annotations/run_hf_job.py index 8dd354c7d..f3e497039 100644 --- a/examples/annotations/run_hf_job.py +++ b/examples/annotations/run_hf_job.py @@ -1,38 +1,22 @@ #!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. """Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE). Spawns one ``h200x2`` job that: 1. installs this branch of ``lerobot`` plus the annotation extras, 2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8, - 3. runs the plan / interjections / vqa modules across the dataset, - 4. uploads the annotated dataset back to ``--repo_id`` (or to - ``--dest_repo_id`` when set). - -``--repo_id`` is the download source and, with ``--push_to_hub=true``, also -the default upload destination — the job annotates the dataset in place. -Pass ``--dest_repo_id`` to push the result to a separate repo instead and -leave the source untouched. + 3. discovers the dataset's canonical subtask + memory vocabulary + from the first 3 sample episodes (phase 0), + 4. runs the plan / interjections / vqa modules across the dataset + (subtasks + memory are constrained to the canonical vocabulary), + 5. uploads the annotated dataset to ``--dest_repo_id`` (when set) + or back to ``--repo_id``. Usage: HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py -Adjust ``CMD`` below to point at your own dataset. +Adjust ``CMD`` below to point at your own dataset / target hub repo. """ import os @@ -48,19 +32,14 @@ CMD = ( "pip install --no-deps " "'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && " "pip install --upgrade-strategy only-if-needed " - # Mirror lerobot's [annotations] runtime deps. ``openai`` is required - # because ``VlmConfig.backend`` defaults to ``"openai"`` (which talks - # to a vllm/transformers/ktransformers OpenAI-compatible server). - "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include " - "toml typing-inspect openai && " + "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect " + "openai && " "export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && " "export VLLM_VIDEO_BACKEND=pyav && " "lerobot-annotate " - # The dataset to annotate. By default it is also the push destination - # (annotate in place); pass --dest_repo_id to push to a separate repo. - "--repo_id=/ " + "--repo_id=imstevenpmwork/super_poulain_draft " + "--dest_repo_id=pepijn223/super_poulain_vocab " "--push_to_hub=true " - # "--dest_repo_id=/ " "--vlm.backend=openai " "--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 " "--vlm.parallel_servers=2 " @@ -69,15 +48,29 @@ CMD = ( "--tensor-parallel-size 1 --max-model-len 32768 " '--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" ' "--vlm.serve_ready_timeout_s=1800 " - "--vlm.client_concurrency=256 " + "--vlm.client_concurrency=128 " "--vlm.max_new_tokens=512 " - "--executor.episode_parallelism=32 " - "--vlm.chat_template_kwargs='{enable_thinking: false}' " + "--vlm.temperature=0.7 " + "--executor.episode_parallelism=16 " + "--vlm.chat_template_kwargs='{\"enable_thinking\": false}' " "--vlm.camera_key=observation.images.wrist " + # Phase 0 — canonical vocabulary discovery from the first N sample + # episodes. The VLM picks the right number of subtask + memory + # entries itself from what it sees; the resulting + # meta/canonical_vocabulary.json constrains every subtask + memory + # string to a small repeatable target distribution. + "--vocabulary.sample_episodes=3 " + # Phase 1 — plan module (subtasks + plan + memory + task_aug). "--plan.frames_per_second=1.0 " "--plan.use_video_url=true " "--plan.use_video_url_fps=1.0 " - "--vqa.K=1 --vqa.vqa_emission_hz=0.2" + "--plan.derive_task_from_video=always " + "--plan.n_task_rephrasings=30 " + # Phase 2 — interjections + speech. + "--interjections.max_interjections_per_episode=6 " + # Phase 4 — general VQA. + "--vqa.K=3 " + "--vqa.vqa_emission_hz=1.0" ) job = run_job( diff --git a/src/lerobot/annotations/steerable_pipeline/__init__.py b/src/lerobot/annotations/steerable_pipeline/__init__.py index a8da5e05e..02d819604 100644 --- a/src/lerobot/annotations/steerable_pipeline/__init__.py +++ b/src/lerobot/annotations/steerable_pipeline/__init__.py @@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite: from .config import AnnotationPipelineConfig from .validator import StagingValidator, ValidationReport +from .vocabulary import ( + VOCABULARY_FILENAME, + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) from .writer import LanguageColumnsWriter __all__ = [ + "VOCABULARY_FILENAME", "AnnotationPipelineConfig", "LanguageColumnsWriter", "StagingValidator", "ValidationReport", + "Vocabulary", + "VocabularyDiscoveryModule", + "load_vocabulary", + "save_vocabulary", + "vocabulary_path", ] diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index f6b9204bc..da07d7998 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -21,6 +21,41 @@ from pathlib import Path from typing import Any +@dataclass +class VocabularyConfig: + """Phase 0 — dataset-level canonical vocabulary discovery. + + Watches the first ``sample_episodes`` episode videos and asks the VLM + to derive a small canonical vocabulary (subtask labels + memory + milestones) that every episode in the dataset will reuse. The VLM + decides the count itself from what it sees in the clips — short + pick-and-place demos get ~6 labels, longer multi-step recipes more. + The output lands at ``meta/canonical_vocabulary.json`` and feeds + phase 1's subtask + memory generation as both a prompt-side + constraint and a post-VLM validation gate. + + Why this exists: free-form LLM rephrasing per episode produces near- + unique subtask strings, which makes the downstream low-level policy's + conditioning effectively noise — at inference the policy generates a + *new* paraphrase the action expert has never seen and produces tiny + cautious actions. Forcing every episode onto the same small set of + canonical strings gives the action expert dense supervision per + string and a small target distribution to learn against. + + Set ``enabled=False`` to fall back to free-form generation (original + behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary + file from being clobbered on re-runs. + """ + + enabled: bool = True + sample_episodes: int = 3 + max_video_frames_per_episode: int = 32 + # When True (default), an existing meta/canonical_vocabulary.json is + # loaded as-is and no VLM call is made — lets operators hand-edit the + # file. Set False to always rediscover from the sample episodes. + reuse_existing: bool = True + + @dataclass class PlanConfig: """``plan`` module: plan + subtasks + memory + task augmentation. @@ -102,7 +137,7 @@ class VlmConfig: # ``openai`` talks to a local OpenAI-compatible server; the CLI # auto-spawns one when ``auto_serve=True``. backend: str = "openai" - model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct" + model_id: str = "Qwen/Qwen3.6-35B-A3B-FP8" # OpenAI-compatible server endpoint; ``EMPTY`` works for local servers. api_base: str = "http://localhost:8000/v1" @@ -186,6 +221,7 @@ class AnnotationPipelineConfig: seed: int = 1729 + vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig) plan: PlanConfig = field(default_factory=PlanConfig) interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig) vqa: VqaConfig = field(default_factory=VqaConfig) diff --git a/src/lerobot/annotations/steerable_pipeline/executor.py b/src/lerobot/annotations/steerable_pipeline/executor.py index ad46d4750..5c725fa65 100644 --- a/src/lerobot/annotations/steerable_pipeline/executor.py +++ b/src/lerobot/annotations/steerable_pipeline/executor.py @@ -15,8 +15,14 @@ # limitations under the License. """In-process executor that runs the annotation phases. -The executor plans **six phases** in the dependency order from the plan: +The executor plans **seven phases** in the dependency order from the plan: + phase 0: vocabulary discovery — derive a small canonical vocabulary + from the first few sample-episode videos (subtask labels + + memory milestones) and persist it next to the dataset; the + ``plan`` module then constrains every per-episode generation + to those strings, so the downstream policy sees a small, + repeatable conditioning distribution phase 1: ``plan`` module (plan + subtasks + memory) phase 2: ``interjections`` module (interjections + speech) phase 3: ``plan`` plan-update pass — re-runs plan emission at every @@ -88,6 +94,7 @@ class Executor: vqa: Any # GeneralVqaModule writer: LanguageColumnsWriter validator: StagingValidator + vocabulary: Any = None # VocabularyDiscoveryModule | None def run(self, root: Path) -> PipelineRunSummary: records = list(iter_episodes(root, only_episodes=self.config.only_episodes)) @@ -102,6 +109,10 @@ class Executor: phases: list[PhaseResult] = [] + # Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary`` + # so subsequent per-episode plan calls see the canonical labels. + phases.append(self._run_vocabulary_phase(records, root)) + # Phase 1: ``plan`` module (plan + subtasks + memory) phases.append(self._run_module_phase("plan", records, staging_dir, self.plan)) # Phase 2: ``interjections`` module (interjections + speech). It @@ -172,6 +183,62 @@ class Executor: flush=True, ) + def _run_vocabulary_phase( + self, records: list[EpisodeRecord], root: Path + ) -> PhaseResult: + """Discover (or load) the canonical vocabulary, wire it into ``self.plan``. + + Returns a ``PhaseResult`` whose ``episodes_processed`` is the number + of sample episodes consulted (0 when disabled or no VLM call was + needed); ``episodes_skipped`` is always ``0`` because vocabulary is + a once-per-dataset artifact, not a per-episode product. + """ + from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415 + + if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False): + print( + "[annotate] phase=vocabulary skipped (module disabled or unset)", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + existing = load_vocabulary(root) + if existing is not None and self.config.vocabulary.reuse_existing: + print( + f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} " + f"({len(existing.subtasks)} subtask labels, " + f"{len(existing.memory_milestones)} memory milestones)", + flush=True, + ) + self.plan.vocabulary = existing + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records))) + print( + f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...", + flush=True, + ) + t0 = time.time() + vocab = self.vocabulary.discover(records[:sample_n], existing=existing) + if vocab is None: + print( + "[annotate] phase=vocabulary returned no vocabulary — " + "plan module will fall back to free-form generation", + flush=True, + ) + return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) + + save_path = save_vocabulary(root, vocab) + print( + f"[annotate] phase=vocabulary wrote {save_path} " + f"({len(vocab.subtasks)} subtask labels, " + f"{len(vocab.memory_milestones)} memory milestones) in " + f"{time.time() - t0:.1f}s", + flush=True, + ) + self.plan.vocabulary = vocab + return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0) + def _run_module_phase( self, name: str, diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 0218e79b1..b9bae607e 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging from collections.abc import Sequence from dataclasses import dataclass, field from pathlib import Path @@ -34,6 +35,9 @@ from ..prompts import load as load_prompt from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame from ..staging import EpisodeStaging from ..vlm_client import VlmClient +from ..vocabulary import Vocabulary + +logger = logging.getLogger(__name__) @dataclass @@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule: vlm: VlmClient config: PlanConfig frame_provider: FrameProvider = field(default_factory=null_provider) + vocabulary: Vocabulary | None = None + """When set, the module constrains subtask + memory generation to the + canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery) + populates this once per dataset; ``None`` falls back to free-form + generation (original behaviour).""" @property def enabled(self) -> bool: @@ -311,8 +320,28 @@ class PlanSubtasksMemoryModule: min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", + vocabulary_block=self._subtask_vocabulary_block(), ) - spans = self._vlm_field(self._video_message(record, prompt), "subtasks") + messages = self._video_message(record, prompt) + spans = self._vlm_field(messages, "subtasks") + # When a vocabulary is in force, do a single targeted retry if + # any returned subtask is off-vocab — strict exact-match only, + # no fuzzy snapping. The retry includes the offending strings + # and the full canonical list so the VLM can correct itself. + if self.vocabulary is not None and self.vocabulary.subtasks and spans: + invalid = self._invalid_subtasks(spans) + if invalid: + logger.info( + "episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once", + record.episode_index, + len(invalid), + invalid, + ) + retry_msg = self._build_subtask_retry_message(messages, invalid) + retried = self._vlm_field(retry_msg, "subtasks") + if retried: + spans = retried + if not spans: return [] # clamp to [t0, t_last] and sort @@ -330,12 +359,197 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start + if not text: + continue + text = self._canonicalize_subtask(text) if not text: continue cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) + cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record) + if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned: + logger.warning( + "episode %d: every VLM subtask was off-vocab even after retry — " + "episode left empty (extend meta/canonical_vocabulary.json to " + "cover the missing phase)", + record.episode_index, + ) return cleaned + @staticmethod + def _dedupe_starts_to_distinct_frames( + spans: list[dict[str, Any]], record: EpisodeRecord + ) -> list[dict[str, Any]]: + """Bump same-frame subtask starts onto distinct frames. + + Two consecutive VLM spans whose ``start`` rounds to the same + source frame (after :func:`snap_to_frame`) would otherwise emit + two ``style=subtask`` rows at the identical persistent + timestamp. The training-time renderer's ``active_at(t, + style=subtask)`` resolver can't disambiguate that and raises + ``Ambiguous resolver for style='subtask'``. + + Walk the (sorted-by-start) spans, snap each to its frame, and + if the snapped frame is already taken push the span onto the + next unused frame so both subtasks survive on distinct + timestamps. If the episode ends before a free frame is found, + the trailing span is dropped with a warning — better than + poisoning the render. + """ + if not spans: + return spans + frames = record.frame_timestamps + if not frames: + return spans + used: set[float] = set() + out: list[dict[str, Any]] = [] + for span in spans: + ts = snap_to_frame(span["start"], frames) + if ts in used: + next_ts = next((f for f in frames if f > ts and f not in used), None) + if next_ts is None: + logger.warning( + "episode %d: subtask %r snapped to occupied frame " + "%.3f and no free later frame exists — dropping", + record.episode_index, + span.get("text"), + ts, + ) + continue + ts = next_ts + used.add(ts) + new_span = {**span, "start": ts} + if float(new_span.get("end", ts)) < ts: + new_span["end"] = ts + out.append(new_span) + return out + + # ------------------------------------------------------------------ + # Canonical-vocabulary helpers + # ------------------------------------------------------------------ + + def _subtask_vocabulary_block(self) -> str: + """Bullet-list of canonical subtasks the VLM must pick from. + + Returns an empty string when no vocabulary is configured — + ``module_1_subtasks.txt`` then falls back to its free-form + rules (original behaviour). + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return "" + bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) + return ( + "You MUST choose each subtask label verbatim from this canonical " + "vocabulary — pick the closest match for each phase of the demo, " + "and reuse the SAME string every time that phase recurs. The " + "low-level policy is conditioned on these exact strings; any " + "novel paraphrase you invent will make its conditioning OOD.\n" + "Canonical subtask labels:\n" + f"{bullets}\n\n" + ) + + def _memory_vocabulary_block(self) -> str: + """Bullet-list of canonical memory milestones the VLM must pick from.""" + if self.vocabulary is None or not self.vocabulary.memory_milestones: + return "" + bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones) + return ( + "Compose the memory by picking ONLY from this canonical milestone " + "list — append a milestone (or rewrite the running memory to " + "compress past ones) using these exact phrases. Do not invent new " + "wording: every paraphrase weakens the downstream conditioning.\n" + "Canonical memory milestones:\n" + f"{bullets}\n\n" + ) + + _NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"}) + + def _canonicalize_subtask(self, text: str) -> str: + """Validate ``text`` against the canonical vocabulary; no fuzzy snap. + + Without a vocabulary, the original text passes through. With a + vocabulary, accept the span only if its normalised form (lower- + cased, articles stripped, whitespace collapsed) matches a + canonical entry exactly — the canonical wording is returned so + the supervised string is byte-identical across episodes. + + Off-vocab spans are dropped (empty string). Upstream + ``_generate_subtasks`` triggers a targeted retry before reaching + the drop path; this function never snaps or warps a span into + a different label. + """ + if self.vocabulary is None or not self.vocabulary.subtasks: + return text.strip() + normalised = self._normalize(text) + if not normalised: + return "" + for candidate in self.vocabulary.subtasks: + if self._normalize(candidate) == normalised: + return candidate + return "" + + @classmethod + def _normalize(cls, text: str) -> str: + """Lowercase, strip articles, collapse whitespace, drop punctuation.""" + words = [ + w.strip(".,:;\"'!?()") + for w in text.lower().replace(",", " ").split() + ] + return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS) + + def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]: + """Return the unique off-vocab subtask strings the VLM produced.""" + seen: list[str] = [] + for span in spans: + text = str((span or {}).get("text") or "").strip() + if not text: + continue + if self._canonicalize_subtask(text): + continue + if text not in seen: + seen.append(text) + return seen + + def _build_subtask_retry_message( + self, original_messages: list[dict[str, Any]], invalid: list[str] + ) -> list[dict[str, Any]]: + """Compose a one-shot correction prompt naming the off-vocab strings.""" + assert self.vocabulary is not None + canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) + invalid_list = "\n".join(f"- {s!r}" for s in invalid) + correction = ( + "Your previous response included subtask labels that are NOT in " + "the canonical vocabulary:\n" + f"{invalid_list}\n\n" + "Re-emit the same segmentation (same number of spans, same start/end " + "timestamps where they were valid) but replace every off-vocab " + "label with the EXACT canonical string for that phase, copied " + "verbatim from this list:\n" + f"{canonical}\n\n" + "Strict rules:\n" + "- Output strings must be byte-for-byte identical to entries above.\n" + "- No articles, no adverbs, no extra words.\n" + "- If a phase truly has no canonical match, omit that span entirely.\n" + "Return the same JSON shape as before." + ) + # Append the correction as an additional user turn; the model + # sees the original prompt + its prior output is implied by the + # conversation context (the VLM client is stateless, so we + # re-send the original content plus this correction). + retry_messages = [ + { + "role": m.get("role", "user"), + "content": ( + m.get("content") + if isinstance(m.get("content"), str) + else list(m.get("content") or []) + ), + } + for m in original_messages + ] + retry_messages.append({"role": "user", "content": correction}) + return retry_messages + def _generate_plan( self, record: EpisodeRecord, # noqa: ARG002 (kept for signature stability) @@ -397,6 +611,7 @@ class PlanSubtasksMemoryModule: prior_memory=prior_memory or "(none)", completed_subtask=completed, remaining_subtasks=", ".join(remaining) if remaining else "(none)", + vocabulary_block=self._memory_vocabulary_block(), ) memory = self._vlm_field(self._text_message(prompt), "memory") return memory.strip() if isinstance(memory, str) else "" diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt new file mode 100644 index 000000000..00c29be4e --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt @@ -0,0 +1,53 @@ +You are inspecting {n_episodes} sample episode video(s) from a teleoperated +robot dataset. Every episode in the dataset performs the SAME task; the +user originally asked: "{episode_task}". + +Watch all the clips and produce a SHORT canonical vocabulary that every +episode in this dataset will reuse. The downstream low-level policy is +conditioned on these strings — duplicate phrasings (e.g. "grasp blue +cube" vs "pick up the blue cube") would destroy the conditioning, so +pick one wording per concept and reuse it everywhere. + +Decide how many entries each list needs YOURSELF based on what you see — +the smallest set that still covers every recurring phase in the demos. +A simple two-object pick-and-place might need ~6 subtask labels and 2 +memory milestones; a long multi-step recipe needs more. Err on the side +of FEWER — extra entries that don't recur across episodes weaken the +conditioning. + +You output two lists: + +1. `subtasks`: imperative, telegraphic commands the robot can execute. + - Verb-first. Drop articles, adverbs, qualifiers. + - Consistent object nouns (if the task says "cube", every subtask says + "cube" — never "block" / "object"). + - Atomic — one skill per subtask (gripper-open events, contact, regrasps, + transitions all become cut points). + - Each label must recur across the demos. If you see a motion only + once across all sample clips, it probably isn't a canonical phase. + - Good: "move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "release blue cube", "retract arm". + - Bad: "the robot arm moves towards the blue cube" (third person, + too long), "carefully pick up the cube" (adverb, article), + "carrying the yellow cube over the green basket" (gerund — should + be imperative "transport yellow cube to green basket"). + +2. `memory_milestones`: first-person past-tense sentences the running + memory composes from. Each subtask phase that produces a lasting + change should have a milestone; transient motions (move, retract) + should NOT. + - First person, past tense. Start with "I". + - One sentence. Functional outcome only — no grasp / motion detail. + - Good: "I picked up the blue cube.", "I placed the blue cube in + the green box.", "I wiped the counter." + - Bad: "The robot arm grasped the blue cube." (third person), + "I carefully grasped the blue cube with the parallel gripper." + (irrelevant detail), "I moved towards the blue cube." (transient + motion — should be omitted, not memorialised). + +Output strictly valid JSON of shape: + + {{ + "subtasks": ["", ...], + "memory_milestones": ["I .", ...] + }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt index b5278368b..d066b9f73 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt @@ -13,7 +13,7 @@ Previous memory: {prior_memory} Just-completed subtask: "{completed_subtask}" Remaining subtasks (for relevance judgement only): {remaining_subtasks} -Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the +{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the robot has accomplished so far — the running story it would tell itself. Authoring rules: diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt index c530b1340..12bbcfba2 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt @@ -4,9 +4,9 @@ The user originally asked: "{episode_task}" You are shown the entire demonstration as a single video. Watch the whole clip, then segment it into a list of consecutive atomic subtasks -the robot performs. Write short, telegraphic action labels. +the robot performs. -Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: +{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: - Each subtask = one atomic skill the low-level policy can execute. - Write each subtask as an IMPERATIVE COMMAND, starting with a verb: diff --git a/src/lerobot/annotations/steerable_pipeline/vocabulary.py b/src/lerobot/annotations/steerable_pipeline/vocabulary.py new file mode 100644 index 000000000..121cef849 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/vocabulary.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Dataset-level canonical vocabulary discovery (Phase 0). + +The downstream consumer of these annotations is a low-level action expert +conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing +gives near-unique strings per occurrence, which collapses the action +expert's conditioning to noise and makes runtime subtask-paraphrase drift +catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical +vocabulary per environment (~10 strings) that every episode reuses; this +module derives that vocabulary automatically from the first few episode +videos and persists it next to the dataset. + +Pipeline-level flow: + + Phase 0 (here): watch N sample episodes → produce vocabulary.json + Phase 1 (plan module): reuse vocabulary on every episode, both as + prompt-side constraint *and* post-VLM validation + +The vocabulary is JSON, lives at ``/meta/canonical_vocabulary.json``, +and is human-inspectable / hand-editable — if the discovered set is wrong, +operators edit the file and re-run the pipeline without phase 0. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from .config import VocabularyConfig +from .frames import FrameProvider, null_provider, to_video_block +from .prompts import load as load_prompt +from .reader import EpisodeRecord +from .vlm_client import VlmClient + +logger = logging.getLogger(__name__) + +VOCABULARY_FILENAME = "canonical_vocabulary.json" + + +@dataclass +class Vocabulary: + """Canonical phrasings shared across every episode of one dataset. + + Both lists are strict: per-episode subtask + memory generation pick + from these strings only; the downstream policy then has a small, + repeatable target distribution to learn instead of thousands of + LLM paraphrases. + """ + + subtasks: tuple[str, ...] + """Imperative subtask labels — what the low-level policy is conditioned + on. Verb-first, telegraphic, consistent object nouns. Example: + ``("move to blue cube", "grasp blue cube", "lift blue cube", + "place blue cube in box", "retract arm")``. + """ + + memory_milestones: tuple[str, ...] + """First-person past-tense milestone sentences — building blocks for + the running memory string. Example: ``("I picked up the blue cube.", + "I placed the blue cube in the green box.")``. Each milestone maps + 1:1 onto a completed subtask phase; ``memory_at_step_k`` is the + concatenation of milestones for completed phases. + """ + + def to_json(self) -> dict[str, list[str]]: + return { + "subtasks": list(self.subtasks), + "memory_milestones": list(self.memory_milestones), + } + + @classmethod + def from_json(cls, payload: dict[str, Any]) -> Vocabulary: + subtasks = tuple( + str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip() + ) + memory_milestones = tuple( + str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip() + ) + return cls(subtasks=subtasks, memory_milestones=memory_milestones) + + def is_empty(self) -> bool: + return not self.subtasks and not self.memory_milestones + + +def vocabulary_path(root: Path) -> Path: + """Return the canonical on-disk location for the vocabulary file.""" + return root / "meta" / VOCABULARY_FILENAME + + +def load_vocabulary(root: Path) -> Vocabulary | None: + """Read ``/meta/canonical_vocabulary.json`` if present. + + Returns ``None`` when the file does not exist — callers fall back to + free-form (unconstrained) subtask + memory generation, preserving the + pipeline's behaviour on datasets that never ran phase 0. + """ + path = vocabulary_path(root) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc) + return None + if not isinstance(payload, dict): + logger.warning("%s is not a JSON object — ignoring", path) + return None + vocab = Vocabulary.from_json(payload) + if vocab.is_empty(): + return None + return vocab + + +def save_vocabulary(root: Path, vocab: Vocabulary) -> Path: + """Atomically persist ``vocab`` to ``/meta/canonical_vocabulary.json``.""" + path = vocabulary_path(root) + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text( + json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + tmp.replace(path) + return path + + +@dataclass +class VocabularyDiscoveryModule: + """Derive a dataset-level canonical vocabulary from sample episodes. + + Phase 0 of the executor: pulls ``config.sample_episodes`` episode + videos, packs them into one Qwen-VL multi-video prompt, and asks the + model to enumerate the small set of canonical subtask labels + + memory milestones that recur across them. The output is persisted + to ``meta/canonical_vocabulary.json`` and consumed by phase 1. + """ + + vlm: VlmClient + config: VocabularyConfig + frame_provider: FrameProvider = field(default_factory=null_provider) + + @property + def enabled(self) -> bool: + return self.config.enabled + + def discover( + self, + records: Sequence[EpisodeRecord], + *, + existing: Vocabulary | None = None, + ) -> Vocabulary | None: + """Run vocabulary discovery against the first N sample episodes. + + ``existing`` short-circuits the VLM call when ``config.reuse_existing`` + is True and an on-disk vocabulary is already present — keeps re-runs + cheap and lets operators hand-edit the file without it getting + overwritten. + """ + if existing is not None and self.config.reuse_existing: + logger.info( + "vocabulary: reusing existing (%d subtasks, %d memory milestones)", + len(existing.subtasks), + len(existing.memory_milestones), + ) + return existing + + sample = list(records[: max(1, int(self.config.sample_episodes))]) + if not sample: + return None + + task_hint = next((r.episode_task for r in sample if r.episode_task), "") + prompt = load_prompt("module_0_vocabulary").format( + episode_task=task_hint or "(unspecified)", + n_episodes=len(sample), + ) + # Pack one video block per sample episode so the VLM sees the + # variation across episodes (different starting poses, different + # object placements) rather than overfitting to one trajectory. + content: list[dict[str, Any]] = [] + for record in sample: + video_frames = self.frame_provider.video_for_episode( + record, int(self.config.max_video_frames_per_episode) + ) + if video_frames: + content.extend(to_video_block(video_frames)) + content.append({"type": "text", "text": prompt}) + messages = [{"role": "user", "content": content}] + + result = self.vlm.generate_json([messages])[0] + if not isinstance(result, dict): + logger.warning("vocabulary: VLM did not return a JSON object — skipping") + return None + + vocab = Vocabulary.from_json(result) + if vocab.is_empty(): + logger.warning("vocabulary: VLM returned an empty vocabulary — skipping") + return None + logger.info( + "vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes", + len(vocab.subtasks), + len(vocab.memory_milestones), + len(sample), + ) + return vocab diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 86d3ab3fa..52309b827 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import ( ) from lerobot.annotations.steerable_pipeline.validator import StagingValidator from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client +from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter from lerobot.configs import parser @@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider ) vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider) + vocabulary = VocabularyDiscoveryModule( + vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider + ) writer = LanguageColumnsWriter() validator = StagingValidator( dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None, @@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: plan=plan, interjections=interjections, vqa=vqa, + vocabulary=vocabulary, writer=writer, validator=validator, ) @@ -140,7 +145,7 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None: exist_ok=True, ) print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True) - api.upload_folder( + commit_info = api.upload_folder( folder_path=str(root), repo_id=repo_id, repo_type="dataset", @@ -169,13 +174,18 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None: version_tag = ds_version except Exception as exc: # noqa: BLE001 print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True) + revision = getattr(commit_info, "oid", None) + tag_kwargs = { + "repo_id": repo_id, + "tag": version_tag, + "repo_type": "dataset", + "exist_ok": True, + } + if revision is not None: + tag_kwargs["revision"] = revision + try: - api.create_tag( - repo_id=repo_id, - tag=version_tag, - repo_type="dataset", - exist_ok=True, - ) + api.create_tag(**tag_kwargs) print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True) except Exception as exc: # noqa: BLE001 print( diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py new file mode 100644 index 000000000..7b820834d --- /dev/null +++ b/tests/annotations/test_vocabulary.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Vocabulary-discovery phase (phase 0) tests.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from lerobot.annotations.steerable_pipeline.config import ( + PlanConfig, + VocabularyConfig, +) +from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule +from lerobot.annotations.steerable_pipeline.reader import iter_episodes +from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging +from lerobot.annotations.steerable_pipeline.vocabulary import ( + Vocabulary, + VocabularyDiscoveryModule, + load_vocabulary, + save_vocabulary, + vocabulary_path, +) + +from ._helpers import make_canned_responder + + +_CANONICAL_SUBTASKS = ( + "grasp blue cube", + "place blue cube in box", + "retract arm", +) +_CANONICAL_MEMORY = ( + "I picked up the blue cube.", + "I placed the blue cube in the box.", +) + + +# --------------------------------------------------------------------------- +# Vocabulary dataclass + on-disk round-trip +# --------------------------------------------------------------------------- + + +def test_vocabulary_roundtrip(tmp_path: Path) -> None: + vocab = Vocabulary( + subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY + ) + save_path = save_vocabulary(tmp_path, vocab) + assert save_path == vocabulary_path(tmp_path) + assert save_path.exists() + + loaded = load_vocabulary(tmp_path) + assert loaded is not None + assert loaded.subtasks == _CANONICAL_SUBTASKS + assert loaded.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None: + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("{ not valid json", encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None: + path = vocabulary_path(tmp_path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8") + assert load_vocabulary(tmp_path) is None + + +# --------------------------------------------------------------------------- +# Discovery module +# --------------------------------------------------------------------------- + + +def test_vocabulary_discovery_calls_vlm_and_returns_vocab( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder( + { + "canonical vocabulary": { + "subtasks": list(_CANONICAL_SUBTASKS), + "memory_milestones": list(_CANONICAL_MEMORY), + } + } + ) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2)) + records = list(iter_episodes(fixture_dataset_root)) + vocab = module.discover(records) + assert vocab is not None + assert vocab.subtasks == _CANONICAL_SUBTASKS + assert vocab.memory_milestones == _CANONICAL_MEMORY + + +def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None: + """``reuse_existing=True`` short-circuits the VLM call entirely.""" + + def _explode(_messages): # pragma: no cover - must not be called + raise AssertionError("VLM should not be invoked when reusing existing vocabulary") + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=_explode) + module = VocabularyDiscoveryModule( + vlm=vlm, config=VocabularyConfig(reuse_existing=True) + ) + records = list(iter_episodes(fixture_dataset_root)) + existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",)) + vocab = module.discover(records, existing=existing) + assert vocab is existing + + +def test_vocabulary_discovery_empty_payload_returns_none( + fixture_dataset_root: Path, +) -> None: + vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}}) + module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig()) + records = list(iter_episodes(fixture_dataset_root)) + assert module.discover(records) is None + + +# --------------------------------------------------------------------------- +# PlanSubtasksMemoryModule consumes the vocabulary +# --------------------------------------------------------------------------- + + +def test_plan_module_inlines_vocab_into_subtask_prompt( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + captured: list[str] = [] + + def responder(messages): + # Find the last user text block and stash it for inspection. + for message in messages: + content = message.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + captured.append(block.get("text", "")) + # Return canned subtasks; pick the first two canonical strings so + # the validator accepts them. + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + {"text": "place blue cube in box", "start": 0.4, "end": 0.9}, + ] + } + + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + # The subtask prompt (and the memory prompt) carries the canonical + # bullet list so the VLM can't paraphrase them away. + assert any("Canonical subtask labels:" in t for t in captured) + assert any("grasp blue cube" in t for t in captured) + + +def test_plan_module_accepts_article_only_difference( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """Articles like 'the'/'a'/'an' are stripped during validation.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + # Same canonical phrase modulo "the" — should be accepted. + {"text": "grasp the blue cube", "start": 0.0, "end": 0.4}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["grasp blue cube"] + + +def test_plan_module_retries_when_subtask_off_vocab( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """One-shot retry replaces an off-vocab paraphrase with the canonical form.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + call_count = {"n": 0} + + def responder(messages): + call_count["n"] += 1 + # First call: returns an off-vocab paraphrase. + if call_count["n"] == 1: + return { + "subtasks": [ + # paraphrase, not in vocab + {"text": "pick up blue cube", "start": 0.0, "end": 0.4}, + ] + } + # Second call (the retry): should contain the correction prompt; + # respond with the canonical phrase exactly. + last_user_text = "" + for message in messages: + content = message.get("content") + if isinstance(content, str): + last_user_text = content + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + last_user_text = block.get("text", "") + assert "NOT in the canonical vocabulary" in last_user_text + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["grasp blue cube"] + # The retry must have fired exactly once. + assert call_count["n"] == 2 + + +def test_plan_module_drops_off_vocab_subtask_after_retry( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """If the VLM stays off-vocab even after the retry, the bad span is dropped.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + call_count = {"n": 0} + + def responder(_messages): + call_count["n"] += 1 + # Both calls return the same off-vocab span — the model can't + # be corrected. The second call also returns one in-vocab span + # so the episode isn't empty; this lets us check that the + # off-vocab span is dropped without affecting the in-vocab one. + if call_count["n"] == 1: + return { + "subtasks": [ + {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, + {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, + ] + } + return { + "subtasks": [ + {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, + {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # Retry fired exactly once; bad span dropped, good span kept. + assert call_count["n"] == 2 + assert subtask_texts == ["grasp blue cube"] + + +def test_plan_module_bumps_collocated_subtasks_to_distinct_frames( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """Two subtasks whose starts snap to the same frame get split onto two frames. + + Without this guard, both spans would emit ``style=subtask`` rows at the + identical persistent timestamp; the training-time renderer's + ``active_at(t, style=subtask)`` then raises an ambiguity error. + """ + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + # Two canonical labels with starts within one frame of each other — + # both snap to the same source frame, so the dedupe pass must bump + # the later one to the next frame. + return { + "subtasks": [ + {"text": "grasp blue cube", "start": 0.40, "end": 0.42}, + {"text": "place blue cube in box", "start": 0.41, "end": 0.50}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_rows = [r for r in rows if r["style"] == "subtask"] + # Both subtasks present, both on distinct timestamps. + assert len(subtask_rows) == 2 + timestamps = [r["timestamp"] for r in subtask_rows] + assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}" + # Order preserved: the chronologically earlier span keeps the earlier + # frame, the later one was bumped onto the next available frame. + assert subtask_rows[0]["content"] == "grasp blue cube" + assert subtask_rows[1]["content"] == "place blue cube in box" + assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"] + + +def test_plan_module_empty_when_all_off_vocab_after_retry( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """All-off-vocab spans → episode comes out empty (no silent fuzzy snap).""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + # Returns the same off-vocab spans on both attempts. + return { + "subtasks": [ + {"text": "make a smoothie", "start": 0.0, "end": 0.4}, + {"text": "consult the wizard", "start": 0.4, "end": 0.9}, + ] + } + + vlm = StubVlmClient(responder=responder) + vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) + module = PlanSubtasksMemoryModule( + vlm=vlm, + config=PlanConfig(n_task_rephrasings=0), + vocabulary=vocab, + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + # No subtask gets fabricated — better to leave the episode empty + # so the operator notices the vocabulary gap than to silently + # warp the labels. + assert subtask_texts == [] + + +def test_plan_module_without_vocab_passes_through( + fixture_dataset_root: Path, tmp_path: Path +) -> None: + """No vocabulary configured → original free-form behavior is preserved.""" + from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient + + def responder(_messages): + return { + "subtasks": [ + {"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0}, + ] + } + + vlm = StubVlmClient(responder=responder) + module = PlanSubtasksMemoryModule( + vlm=vlm, config=PlanConfig(n_task_rephrasings=0) + ) + record = next(iter_episodes(fixture_dataset_root)) + staging = EpisodeStaging(tmp_path / "stage", record.episode_index) + module.run_episode(record, staging) + rows = staging.read("plan") + subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] + assert subtask_texts == ["any free-form text the VLM wants"] diff --git a/tests/scripts/test_lerobot_annotate.py b/tests/scripts/test_lerobot_annotate.py new file mode 100644 index 000000000..c98ee7cb3 --- /dev/null +++ b/tests/scripts/test_lerobot_annotate.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +import json +from types import SimpleNamespace + + +def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch): + from lerobot.scripts.lerobot_annotate import _push_to_hub + + root = tmp_path / "dataset" + (root / "meta").mkdir(parents=True) + (root / "meta" / "info.json").write_text(json.dumps({"codebase_version": "v3.0"})) + + calls = {} + + class FakeHfApi: + def create_repo(self, **kwargs): + calls["create_repo"] = kwargs + + def upload_folder(self, **kwargs): + calls["upload_folder"] = kwargs + return SimpleNamespace(oid="abc123") + + def create_tag(self, **kwargs): + calls["create_tag"] = kwargs + + monkeypatch.setattr("huggingface_hub.HfApi", FakeHfApi) + + cfg = SimpleNamespace( + repo_id="source/dataset", + dest_repo_id="annotated/dataset", + push_private=True, + push_commit_message=None, + ) + + _push_to_hub(root, cfg) + + assert calls["create_repo"] == { + "repo_id": "annotated/dataset", + "repo_type": "dataset", + "private": True, + "exist_ok": True, + } + assert calls["upload_folder"]["repo_id"] == "annotated/dataset" + assert calls["create_tag"] == { + "repo_id": "annotated/dataset", + "tag": "v3.0", + "repo_type": "dataset", + "exist_ok": True, + "revision": "abc123", + }