Merge origin/feat/language-annotation-pipeline (8 fix(annotate) commits + vocabulary phase)

This commit is contained in:
Pepijn
2026-05-25 15:47:25 +02:00
13 changed files with 1139 additions and 50 deletions
+17 -1
View File
@@ -7,7 +7,8 @@
## What the pipeline produces ## What the pipeline produces
Three modules write into a per-episode staging tree, then a single writer A vocabulary-discovery phase derives a small canonical wording, then three
modules write into a per-episode staging tree, then a single writer
rewrites the data shards in place: rewrites the data shards in place:
| Style / atom | Column | Module | | Style / atom | Column | Module |
@@ -20,6 +21,21 @@ rewrites the data shards in place:
| speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`| | speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`|
| `vqa` (user / assistant pair) | `language_events` | `vqa` | | `vqa` (user / assistant pair) | `language_events` | `vqa` |
The `plan` module is constrained to a **canonical vocabulary** discovered
once per dataset by the `vocabulary` module (phase 0). It watches a few
sample episode videos (`--vocabulary.sample_episodes`, default `3`) and
asks the VLM to derive a small set of imperative subtask labels and
first-person memory milestones that recur across the demos. The VLM
picks the right number of entries itself based on what it sees in the
clips — short pick-and-place demos get ~6 subtask labels, longer
multi-step recipes get more. The result lands at
`meta/canonical_vocabulary.json` (human-readable / hand-editable) and
is reused on every subsequent run. The `plan` module then constrains
both subtask + memory generation to those exact strings — the
downstream low-level policy sees a small, repeatable target
distribution instead of thousands of LLM paraphrases. Disable with
`--vocabulary.enabled=False` to fall back to free-form generation.
The writer does **not** add a `tools` column to the parquet — the tool The writer does **not** add a `tools` column to the parquet — the tool
catalog lives at `meta/info.json["tools"]` instead (see catalog lives at `meta/info.json["tools"]` instead (see
[Tools](./tools)). After every annotation run the pipeline ensures the [Tools](./tools)). After every annotation run the pipeline ensures the
+29 -36
View File
@@ -1,38 +1,22 @@
#!/usr/bin/env python #!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE). """Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE).
Spawns one ``h200x2`` job that: Spawns one ``h200x2`` job that:
1. installs this branch of ``lerobot`` plus the annotation extras, 1. installs this branch of ``lerobot`` plus the annotation extras,
2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8, 2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8,
3. runs the plan / interjections / vqa modules across the dataset, 3. discovers the dataset's canonical subtask + memory vocabulary
4. uploads the annotated dataset back to ``--repo_id`` (or to from the first 3 sample episodes (phase 0),
``--dest_repo_id`` when set). 4. runs the plan / interjections / vqa modules across the dataset
(subtasks + memory are constrained to the canonical vocabulary),
``--repo_id`` is the download source and, with ``--push_to_hub=true``, also 5. uploads the annotated dataset to ``--dest_repo_id`` (when set)
the default upload destination — the job annotates the dataset in place. or back to ``--repo_id``.
Pass ``--dest_repo_id`` to push the result to a separate repo instead and
leave the source untouched.
Usage: Usage:
HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py
Adjust ``CMD`` below to point at your own dataset. Adjust ``CMD`` below to point at your own dataset / target hub repo.
""" """
import os import os
@@ -48,19 +32,14 @@ CMD = (
"pip install --no-deps " "pip install --no-deps "
"'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && " "'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && "
"pip install --upgrade-strategy only-if-needed " "pip install --upgrade-strategy only-if-needed "
# Mirror lerobot's [annotations] runtime deps. ``openai`` is required "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect "
# because ``VlmConfig.backend`` defaults to ``"openai"`` (which talks "openai && "
# to a vllm/transformers/ktransformers OpenAI-compatible server).
"datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include "
"toml typing-inspect openai && "
"export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && " "export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && "
"export VLLM_VIDEO_BACKEND=pyav && " "export VLLM_VIDEO_BACKEND=pyav && "
"lerobot-annotate " "lerobot-annotate "
# The dataset to annotate. By default it is also the push destination "--repo_id=imstevenpmwork/super_poulain_draft "
# (annotate in place); pass --dest_repo_id to push to a separate repo. "--dest_repo_id=pepijn223/super_poulain_vocab "
"--repo_id=<your-org>/<your-dataset> "
"--push_to_hub=true " "--push_to_hub=true "
# "--dest_repo_id=<your-org>/<your-annotated-dataset> "
"--vlm.backend=openai " "--vlm.backend=openai "
"--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 " "--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 "
"--vlm.parallel_servers=2 " "--vlm.parallel_servers=2 "
@@ -69,15 +48,29 @@ CMD = (
"--tensor-parallel-size 1 --max-model-len 32768 " "--tensor-parallel-size 1 --max-model-len 32768 "
'--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" ' '--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" '
"--vlm.serve_ready_timeout_s=1800 " "--vlm.serve_ready_timeout_s=1800 "
"--vlm.client_concurrency=256 " "--vlm.client_concurrency=128 "
"--vlm.max_new_tokens=512 " "--vlm.max_new_tokens=512 "
"--executor.episode_parallelism=32 " "--vlm.temperature=0.7 "
"--vlm.chat_template_kwargs='{enable_thinking: false}' " "--executor.episode_parallelism=16 "
"--vlm.chat_template_kwargs='{\"enable_thinking\": false}' "
"--vlm.camera_key=observation.images.wrist " "--vlm.camera_key=observation.images.wrist "
# Phase 0 — canonical vocabulary discovery from the first N sample
# episodes. The VLM picks the right number of subtask + memory
# entries itself from what it sees; the resulting
# meta/canonical_vocabulary.json constrains every subtask + memory
# string to a small repeatable target distribution.
"--vocabulary.sample_episodes=3 "
# Phase 1 — plan module (subtasks + plan + memory + task_aug).
"--plan.frames_per_second=1.0 " "--plan.frames_per_second=1.0 "
"--plan.use_video_url=true " "--plan.use_video_url=true "
"--plan.use_video_url_fps=1.0 " "--plan.use_video_url_fps=1.0 "
"--vqa.K=1 --vqa.vqa_emission_hz=0.2" "--plan.derive_task_from_video=always "
"--plan.n_task_rephrasings=30 "
# Phase 2 — interjections + speech.
"--interjections.max_interjections_per_episode=6 "
# Phase 4 — general VQA.
"--vqa.K=3 "
"--vqa.vqa_emission_hz=1.0"
) )
job = run_job( job = run_job(
@@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite:
from .config import AnnotationPipelineConfig from .config import AnnotationPipelineConfig
from .validator import StagingValidator, ValidationReport from .validator import StagingValidator, ValidationReport
from .vocabulary import (
VOCABULARY_FILENAME,
Vocabulary,
VocabularyDiscoveryModule,
load_vocabulary,
save_vocabulary,
vocabulary_path,
)
from .writer import LanguageColumnsWriter from .writer import LanguageColumnsWriter
__all__ = [ __all__ = [
"VOCABULARY_FILENAME",
"AnnotationPipelineConfig", "AnnotationPipelineConfig",
"LanguageColumnsWriter", "LanguageColumnsWriter",
"StagingValidator", "StagingValidator",
"ValidationReport", "ValidationReport",
"Vocabulary",
"VocabularyDiscoveryModule",
"load_vocabulary",
"save_vocabulary",
"vocabulary_path",
] ]
@@ -21,6 +21,41 @@ from pathlib import Path
from typing import Any from typing import Any
@dataclass
class VocabularyConfig:
"""Phase 0 — dataset-level canonical vocabulary discovery.
Watches the first ``sample_episodes`` episode videos and asks the VLM
to derive a small canonical vocabulary (subtask labels + memory
milestones) that every episode in the dataset will reuse. The VLM
decides the count itself from what it sees in the clips — short
pick-and-place demos get ~6 labels, longer multi-step recipes more.
The output lands at ``meta/canonical_vocabulary.json`` and feeds
phase 1's subtask + memory generation as both a prompt-side
constraint and a post-VLM validation gate.
Why this exists: free-form LLM rephrasing per episode produces near-
unique subtask strings, which makes the downstream low-level policy's
conditioning effectively noise — at inference the policy generates a
*new* paraphrase the action expert has never seen and produces tiny
cautious actions. Forcing every episode onto the same small set of
canonical strings gives the action expert dense supervision per
string and a small target distribution to learn against.
Set ``enabled=False`` to fall back to free-form generation (original
behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary
file from being clobbered on re-runs.
"""
enabled: bool = True
sample_episodes: int = 3
max_video_frames_per_episode: int = 32
# When True (default), an existing meta/canonical_vocabulary.json is
# loaded as-is and no VLM call is made — lets operators hand-edit the
# file. Set False to always rediscover from the sample episodes.
reuse_existing: bool = True
@dataclass @dataclass
class PlanConfig: class PlanConfig:
"""``plan`` module: plan + subtasks + memory + task augmentation. """``plan`` module: plan + subtasks + memory + task augmentation.
@@ -102,7 +137,7 @@ class VlmConfig:
# ``openai`` talks to a local OpenAI-compatible server; the CLI # ``openai`` talks to a local OpenAI-compatible server; the CLI
# auto-spawns one when ``auto_serve=True``. # auto-spawns one when ``auto_serve=True``.
backend: str = "openai" backend: str = "openai"
model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct" model_id: str = "Qwen/Qwen3.6-35B-A3B-FP8"
# OpenAI-compatible server endpoint; ``EMPTY`` works for local servers. # OpenAI-compatible server endpoint; ``EMPTY`` works for local servers.
api_base: str = "http://localhost:8000/v1" api_base: str = "http://localhost:8000/v1"
@@ -186,6 +221,7 @@ class AnnotationPipelineConfig:
seed: int = 1729 seed: int = 1729
vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig)
plan: PlanConfig = field(default_factory=PlanConfig) plan: PlanConfig = field(default_factory=PlanConfig)
interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig) interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig)
vqa: VqaConfig = field(default_factory=VqaConfig) vqa: VqaConfig = field(default_factory=VqaConfig)
@@ -15,8 +15,14 @@
# limitations under the License. # limitations under the License.
"""In-process executor that runs the annotation phases. """In-process executor that runs the annotation phases.
The executor plans **six phases** in the dependency order from the plan: The executor plans **seven phases** in the dependency order from the plan:
phase 0: vocabulary discovery — derive a small canonical vocabulary
from the first few sample-episode videos (subtask labels +
memory milestones) and persist it next to the dataset; the
``plan`` module then constrains every per-episode generation
to those strings, so the downstream policy sees a small,
repeatable conditioning distribution
phase 1: ``plan`` module (plan + subtasks + memory) phase 1: ``plan`` module (plan + subtasks + memory)
phase 2: ``interjections`` module (interjections + speech) phase 2: ``interjections`` module (interjections + speech)
phase 3: ``plan`` plan-update pass — re-runs plan emission at every phase 3: ``plan`` plan-update pass — re-runs plan emission at every
@@ -88,6 +94,7 @@ class Executor:
vqa: Any # GeneralVqaModule vqa: Any # GeneralVqaModule
writer: LanguageColumnsWriter writer: LanguageColumnsWriter
validator: StagingValidator validator: StagingValidator
vocabulary: Any = None # VocabularyDiscoveryModule | None
def run(self, root: Path) -> PipelineRunSummary: def run(self, root: Path) -> PipelineRunSummary:
records = list(iter_episodes(root, only_episodes=self.config.only_episodes)) records = list(iter_episodes(root, only_episodes=self.config.only_episodes))
@@ -102,6 +109,10 @@ class Executor:
phases: list[PhaseResult] = [] phases: list[PhaseResult] = []
# Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary``
# so subsequent per-episode plan calls see the canonical labels.
phases.append(self._run_vocabulary_phase(records, root))
# Phase 1: ``plan`` module (plan + subtasks + memory) # Phase 1: ``plan`` module (plan + subtasks + memory)
phases.append(self._run_module_phase("plan", records, staging_dir, self.plan)) phases.append(self._run_module_phase("plan", records, staging_dir, self.plan))
# Phase 2: ``interjections`` module (interjections + speech). It # Phase 2: ``interjections`` module (interjections + speech). It
@@ -172,6 +183,62 @@ class Executor:
flush=True, flush=True,
) )
def _run_vocabulary_phase(
self, records: list[EpisodeRecord], root: Path
) -> PhaseResult:
"""Discover (or load) the canonical vocabulary, wire it into ``self.plan``.
Returns a ``PhaseResult`` whose ``episodes_processed`` is the number
of sample episodes consulted (0 when disabled or no VLM call was
needed); ``episodes_skipped`` is always ``0`` because vocabulary is
a once-per-dataset artifact, not a per-episode product.
"""
from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415
if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False):
print(
"[annotate] phase=vocabulary skipped (module disabled or unset)",
flush=True,
)
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
existing = load_vocabulary(root)
if existing is not None and self.config.vocabulary.reuse_existing:
print(
f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} "
f"({len(existing.subtasks)} subtask labels, "
f"{len(existing.memory_milestones)} memory milestones)",
flush=True,
)
self.plan.vocabulary = existing
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records)))
print(
f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...",
flush=True,
)
t0 = time.time()
vocab = self.vocabulary.discover(records[:sample_n], existing=existing)
if vocab is None:
print(
"[annotate] phase=vocabulary returned no vocabulary — "
"plan module will fall back to free-form generation",
flush=True,
)
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
save_path = save_vocabulary(root, vocab)
print(
f"[annotate] phase=vocabulary wrote {save_path} "
f"({len(vocab.subtasks)} subtask labels, "
f"{len(vocab.memory_milestones)} memory milestones) in "
f"{time.time() - t0:.1f}s",
flush=True,
)
self.plan.vocabulary = vocab
return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0)
def _run_module_phase( def _run_module_phase(
self, self,
name: str, name: str,
@@ -17,6 +17,7 @@
from __future__ import annotations from __future__ import annotations
import logging
from collections.abc import Sequence from collections.abc import Sequence
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
@@ -34,6 +35,9 @@ from ..prompts import load as load_prompt
from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame
from ..staging import EpisodeStaging from ..staging import EpisodeStaging
from ..vlm_client import VlmClient from ..vlm_client import VlmClient
from ..vocabulary import Vocabulary
logger = logging.getLogger(__name__)
@dataclass @dataclass
@@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule:
vlm: VlmClient vlm: VlmClient
config: PlanConfig config: PlanConfig
frame_provider: FrameProvider = field(default_factory=null_provider) frame_provider: FrameProvider = field(default_factory=null_provider)
vocabulary: Vocabulary | None = None
"""When set, the module constrains subtask + memory generation to the
canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery)
populates this once per dataset; ``None`` falls back to free-form
generation (original behaviour)."""
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
@@ -311,8 +320,28 @@ class PlanSubtasksMemoryModule:
min_subtask_seconds=self.config.min_subtask_seconds, min_subtask_seconds=self.config.min_subtask_seconds,
max_steps=self.config.plan_max_steps, max_steps=self.config.plan_max_steps,
episode_duration=f"{episode_duration:.3f}", episode_duration=f"{episode_duration:.3f}",
vocabulary_block=self._subtask_vocabulary_block(),
) )
spans = self._vlm_field(self._video_message(record, prompt), "subtasks") messages = self._video_message(record, prompt)
spans = self._vlm_field(messages, "subtasks")
# When a vocabulary is in force, do a single targeted retry if
# any returned subtask is off-vocab — strict exact-match only,
# no fuzzy snapping. The retry includes the offending strings
# and the full canonical list so the VLM can correct itself.
if self.vocabulary is not None and self.vocabulary.subtasks and spans:
invalid = self._invalid_subtasks(spans)
if invalid:
logger.info(
"episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once",
record.episode_index,
len(invalid),
invalid,
)
retry_msg = self._build_subtask_retry_message(messages, invalid)
retried = self._vlm_field(retry_msg, "subtasks")
if retried:
spans = retried
if not spans: if not spans:
return [] return []
# clamp to [t0, t_last] and sort # clamp to [t0, t_last] and sort
@@ -330,12 +359,197 @@ class PlanSubtasksMemoryModule:
end = max(t0, min(end, t_last)) end = max(t0, min(end, t_last))
if end < start: if end < start:
start, end = end, start start, end = end, start
if not text:
continue
text = self._canonicalize_subtask(text)
if not text: if not text:
continue continue
cleaned.append({"text": text, "start": start, "end": end}) cleaned.append({"text": text, "start": start, "end": end})
cleaned.sort(key=lambda s: s["start"]) cleaned.sort(key=lambda s: s["start"])
cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record)
if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned:
logger.warning(
"episode %d: every VLM subtask was off-vocab even after retry — "
"episode left empty (extend meta/canonical_vocabulary.json to "
"cover the missing phase)",
record.episode_index,
)
return cleaned return cleaned
@staticmethod
def _dedupe_starts_to_distinct_frames(
spans: list[dict[str, Any]], record: EpisodeRecord
) -> list[dict[str, Any]]:
"""Bump same-frame subtask starts onto distinct frames.
Two consecutive VLM spans whose ``start`` rounds to the same
source frame (after :func:`snap_to_frame`) would otherwise emit
two ``style=subtask`` rows at the identical persistent
timestamp. The training-time renderer's ``active_at(t,
style=subtask)`` resolver can't disambiguate that and raises
``Ambiguous resolver for style='subtask'``.
Walk the (sorted-by-start) spans, snap each to its frame, and
if the snapped frame is already taken push the span onto the
next unused frame so both subtasks survive on distinct
timestamps. If the episode ends before a free frame is found,
the trailing span is dropped with a warning — better than
poisoning the render.
"""
if not spans:
return spans
frames = record.frame_timestamps
if not frames:
return spans
used: set[float] = set()
out: list[dict[str, Any]] = []
for span in spans:
ts = snap_to_frame(span["start"], frames)
if ts in used:
next_ts = next((f for f in frames if f > ts and f not in used), None)
if next_ts is None:
logger.warning(
"episode %d: subtask %r snapped to occupied frame "
"%.3f and no free later frame exists — dropping",
record.episode_index,
span.get("text"),
ts,
)
continue
ts = next_ts
used.add(ts)
new_span = {**span, "start": ts}
if float(new_span.get("end", ts)) < ts:
new_span["end"] = ts
out.append(new_span)
return out
# ------------------------------------------------------------------
# Canonical-vocabulary helpers
# ------------------------------------------------------------------
def _subtask_vocabulary_block(self) -> str:
"""Bullet-list of canonical subtasks the VLM must pick from.
Returns an empty string when no vocabulary is configured —
``module_1_subtasks.txt`` then falls back to its free-form
rules (original behaviour).
"""
if self.vocabulary is None or not self.vocabulary.subtasks:
return ""
bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
return (
"You MUST choose each subtask label verbatim from this canonical "
"vocabulary — pick the closest match for each phase of the demo, "
"and reuse the SAME string every time that phase recurs. The "
"low-level policy is conditioned on these exact strings; any "
"novel paraphrase you invent will make its conditioning OOD.\n"
"Canonical subtask labels:\n"
f"{bullets}\n\n"
)
def _memory_vocabulary_block(self) -> str:
"""Bullet-list of canonical memory milestones the VLM must pick from."""
if self.vocabulary is None or not self.vocabulary.memory_milestones:
return ""
bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones)
return (
"Compose the memory by picking ONLY from this canonical milestone "
"list — append a milestone (or rewrite the running memory to "
"compress past ones) using these exact phrases. Do not invent new "
"wording: every paraphrase weakens the downstream conditioning.\n"
"Canonical memory milestones:\n"
f"{bullets}\n\n"
)
_NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"})
def _canonicalize_subtask(self, text: str) -> str:
"""Validate ``text`` against the canonical vocabulary; no fuzzy snap.
Without a vocabulary, the original text passes through. With a
vocabulary, accept the span only if its normalised form (lower-
cased, articles stripped, whitespace collapsed) matches a
canonical entry exactly — the canonical wording is returned so
the supervised string is byte-identical across episodes.
Off-vocab spans are dropped (empty string). Upstream
``_generate_subtasks`` triggers a targeted retry before reaching
the drop path; this function never snaps or warps a span into
a different label.
"""
if self.vocabulary is None or not self.vocabulary.subtasks:
return text.strip()
normalised = self._normalize(text)
if not normalised:
return ""
for candidate in self.vocabulary.subtasks:
if self._normalize(candidate) == normalised:
return candidate
return ""
@classmethod
def _normalize(cls, text: str) -> str:
"""Lowercase, strip articles, collapse whitespace, drop punctuation."""
words = [
w.strip(".,:;\"'!?()")
for w in text.lower().replace(",", " ").split()
]
return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS)
def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]:
"""Return the unique off-vocab subtask strings the VLM produced."""
seen: list[str] = []
for span in spans:
text = str((span or {}).get("text") or "").strip()
if not text:
continue
if self._canonicalize_subtask(text):
continue
if text not in seen:
seen.append(text)
return seen
def _build_subtask_retry_message(
self, original_messages: list[dict[str, Any]], invalid: list[str]
) -> list[dict[str, Any]]:
"""Compose a one-shot correction prompt naming the off-vocab strings."""
assert self.vocabulary is not None
canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
invalid_list = "\n".join(f"- {s!r}" for s in invalid)
correction = (
"Your previous response included subtask labels that are NOT in "
"the canonical vocabulary:\n"
f"{invalid_list}\n\n"
"Re-emit the same segmentation (same number of spans, same start/end "
"timestamps where they were valid) but replace every off-vocab "
"label with the EXACT canonical string for that phase, copied "
"verbatim from this list:\n"
f"{canonical}\n\n"
"Strict rules:\n"
"- Output strings must be byte-for-byte identical to entries above.\n"
"- No articles, no adverbs, no extra words.\n"
"- If a phase truly has no canonical match, omit that span entirely.\n"
"Return the same JSON shape as before."
)
# Append the correction as an additional user turn; the model
# sees the original prompt + its prior output is implied by the
# conversation context (the VLM client is stateless, so we
# re-send the original content plus this correction).
retry_messages = [
{
"role": m.get("role", "user"),
"content": (
m.get("content")
if isinstance(m.get("content"), str)
else list(m.get("content") or [])
),
}
for m in original_messages
]
retry_messages.append({"role": "user", "content": correction})
return retry_messages
def _generate_plan( def _generate_plan(
self, self,
record: EpisodeRecord, # noqa: ARG002 (kept for signature stability) record: EpisodeRecord, # noqa: ARG002 (kept for signature stability)
@@ -397,6 +611,7 @@ class PlanSubtasksMemoryModule:
prior_memory=prior_memory or "(none)", prior_memory=prior_memory or "(none)",
completed_subtask=completed, completed_subtask=completed,
remaining_subtasks=", ".join(remaining) if remaining else "(none)", remaining_subtasks=", ".join(remaining) if remaining else "(none)",
vocabulary_block=self._memory_vocabulary_block(),
) )
memory = self._vlm_field(self._text_message(prompt), "memory") memory = self._vlm_field(self._text_message(prompt), "memory")
return memory.strip() if isinstance(memory, str) else "" return memory.strip() if isinstance(memory, str) else ""
@@ -0,0 +1,53 @@
You are inspecting {n_episodes} sample episode video(s) from a teleoperated
robot dataset. Every episode in the dataset performs the SAME task; the
user originally asked: "{episode_task}".
Watch all the clips and produce a SHORT canonical vocabulary that every
episode in this dataset will reuse. The downstream low-level policy is
conditioned on these strings — duplicate phrasings (e.g. "grasp blue
cube" vs "pick up the blue cube") would destroy the conditioning, so
pick one wording per concept and reuse it everywhere.
Decide how many entries each list needs YOURSELF based on what you see —
the smallest set that still covers every recurring phase in the demos.
A simple two-object pick-and-place might need ~6 subtask labels and 2
memory milestones; a long multi-step recipe needs more. Err on the side
of FEWER — extra entries that don't recur across episodes weaken the
conditioning.
You output two lists:
1. `subtasks`: imperative, telegraphic commands the robot can execute.
- Verb-first. Drop articles, adverbs, qualifiers.
- Consistent object nouns (if the task says "cube", every subtask says
"cube" — never "block" / "object").
- Atomic — one skill per subtask (gripper-open events, contact, regrasps,
transitions all become cut points).
- Each label must recur across the demos. If you see a motion only
once across all sample clips, it probably isn't a canonical phase.
- Good: "move to blue cube", "grasp blue cube", "lift blue cube",
"place blue cube in box", "release blue cube", "retract arm".
- Bad: "the robot arm moves towards the blue cube" (third person,
too long), "carefully pick up the cube" (adverb, article),
"carrying the yellow cube over the green basket" (gerund — should
be imperative "transport yellow cube to green basket").
2. `memory_milestones`: first-person past-tense sentences the running
memory composes from. Each subtask phase that produces a lasting
change should have a milestone; transient motions (move, retract)
should NOT.
- First person, past tense. Start with "I".
- One sentence. Functional outcome only — no grasp / motion detail.
- Good: "I picked up the blue cube.", "I placed the blue cube in
the green box.", "I wiped the counter."
- Bad: "The robot arm grasped the blue cube." (third person),
"I carefully grasped the blue cube with the parallel gripper."
(irrelevant detail), "I moved towards the blue cube." (transient
motion — should be omitted, not memorialised).
Output strictly valid JSON of shape:
{{
"subtasks": ["<verb phrase>", ...],
"memory_milestones": ["I <past-tense sentence>.", ...]
}}
@@ -13,7 +13,7 @@ Previous memory: {prior_memory}
Just-completed subtask: "{completed_subtask}" Just-completed subtask: "{completed_subtask}"
Remaining subtasks (for relevance judgement only): {remaining_subtasks} Remaining subtasks (for relevance judgement only): {remaining_subtasks}
Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the {vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the
robot has accomplished so far — the running story it would tell itself. robot has accomplished so far — the running story it would tell itself.
Authoring rules: Authoring rules:
@@ -4,9 +4,9 @@ The user originally asked: "{episode_task}"
You are shown the entire demonstration as a single video. Watch the You are shown the entire demonstration as a single video. Watch the
whole clip, then segment it into a list of consecutive atomic subtasks whole clip, then segment it into a list of consecutive atomic subtasks
the robot performs. Write short, telegraphic action labels. the robot performs.
Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: {vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts:
- Each subtask = one atomic skill the low-level policy can execute. - Each subtask = one atomic skill the low-level policy can execute.
- Write each subtask as an IMPERATIVE COMMAND, starting with a verb: - Write each subtask as an IMPERATIVE COMMAND, starting with a verb:
@@ -0,0 +1,222 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataset-level canonical vocabulary discovery (Phase 0).
The downstream consumer of these annotations is a low-level action expert
conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing
gives near-unique strings per occurrence, which collapses the action
expert's conditioning to noise and makes runtime subtask-paraphrase drift
catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical
vocabulary per environment (~10 strings) that every episode reuses; this
module derives that vocabulary automatically from the first few episode
videos and persists it next to the dataset.
Pipeline-level flow:
Phase 0 (here): watch N sample episodes produce vocabulary.json
Phase 1 (plan module): reuse vocabulary on every episode, both as
prompt-side constraint *and* post-VLM validation
The vocabulary is JSON, lives at ``<root>/meta/canonical_vocabulary.json``,
and is human-inspectable / hand-editable if the discovered set is wrong,
operators edit the file and re-run the pipeline without phase 0.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .config import VocabularyConfig
from .frames import FrameProvider, null_provider, to_video_block
from .prompts import load as load_prompt
from .reader import EpisodeRecord
from .vlm_client import VlmClient
logger = logging.getLogger(__name__)
VOCABULARY_FILENAME = "canonical_vocabulary.json"
@dataclass
class Vocabulary:
"""Canonical phrasings shared across every episode of one dataset.
Both lists are strict: per-episode subtask + memory generation pick
from these strings only; the downstream policy then has a small,
repeatable target distribution to learn instead of thousands of
LLM paraphrases.
"""
subtasks: tuple[str, ...]
"""Imperative subtask labels — what the low-level policy is conditioned
on. Verb-first, telegraphic, consistent object nouns. Example:
``("move to blue cube", "grasp blue cube", "lift blue cube",
"place blue cube in box", "retract arm")``.
"""
memory_milestones: tuple[str, ...]
"""First-person past-tense milestone sentences — building blocks for
the running memory string. Example: ``("I picked up the blue cube.",
"I placed the blue cube in the green box.")``. Each milestone maps
1:1 onto a completed subtask phase; ``memory_at_step_k`` is the
concatenation of milestones for completed phases.
"""
def to_json(self) -> dict[str, list[str]]:
return {
"subtasks": list(self.subtasks),
"memory_milestones": list(self.memory_milestones),
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> Vocabulary:
subtasks = tuple(
str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip()
)
memory_milestones = tuple(
str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip()
)
return cls(subtasks=subtasks, memory_milestones=memory_milestones)
def is_empty(self) -> bool:
return not self.subtasks and not self.memory_milestones
def vocabulary_path(root: Path) -> Path:
"""Return the canonical on-disk location for the vocabulary file."""
return root / "meta" / VOCABULARY_FILENAME
def load_vocabulary(root: Path) -> Vocabulary | None:
"""Read ``<root>/meta/canonical_vocabulary.json`` if present.
Returns ``None`` when the file does not exist callers fall back to
free-form (unconstrained) subtask + memory generation, preserving the
pipeline's behaviour on datasets that never ran phase 0.
"""
path = vocabulary_path(root)
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc)
return None
if not isinstance(payload, dict):
logger.warning("%s is not a JSON object — ignoring", path)
return None
vocab = Vocabulary.from_json(payload)
if vocab.is_empty():
return None
return vocab
def save_vocabulary(root: Path, vocab: Vocabulary) -> Path:
"""Atomically persist ``vocab`` to ``<root>/meta/canonical_vocabulary.json``."""
path = vocabulary_path(root)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(
json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
tmp.replace(path)
return path
@dataclass
class VocabularyDiscoveryModule:
"""Derive a dataset-level canonical vocabulary from sample episodes.
Phase 0 of the executor: pulls ``config.sample_episodes`` episode
videos, packs them into one Qwen-VL multi-video prompt, and asks the
model to enumerate the small set of canonical subtask labels +
memory milestones that recur across them. The output is persisted
to ``meta/canonical_vocabulary.json`` and consumed by phase 1.
"""
vlm: VlmClient
config: VocabularyConfig
frame_provider: FrameProvider = field(default_factory=null_provider)
@property
def enabled(self) -> bool:
return self.config.enabled
def discover(
self,
records: Sequence[EpisodeRecord],
*,
existing: Vocabulary | None = None,
) -> Vocabulary | None:
"""Run vocabulary discovery against the first N sample episodes.
``existing`` short-circuits the VLM call when ``config.reuse_existing``
is True and an on-disk vocabulary is already present keeps re-runs
cheap and lets operators hand-edit the file without it getting
overwritten.
"""
if existing is not None and self.config.reuse_existing:
logger.info(
"vocabulary: reusing existing (%d subtasks, %d memory milestones)",
len(existing.subtasks),
len(existing.memory_milestones),
)
return existing
sample = list(records[: max(1, int(self.config.sample_episodes))])
if not sample:
return None
task_hint = next((r.episode_task for r in sample if r.episode_task), "")
prompt = load_prompt("module_0_vocabulary").format(
episode_task=task_hint or "(unspecified)",
n_episodes=len(sample),
)
# Pack one video block per sample episode so the VLM sees the
# variation across episodes (different starting poses, different
# object placements) rather than overfitting to one trajectory.
content: list[dict[str, Any]] = []
for record in sample:
video_frames = self.frame_provider.video_for_episode(
record, int(self.config.max_video_frames_per_episode)
)
if video_frames:
content.extend(to_video_block(video_frames))
content.append({"type": "text", "text": prompt})
messages = [{"role": "user", "content": content}]
result = self.vlm.generate_json([messages])[0]
if not isinstance(result, dict):
logger.warning("vocabulary: VLM did not return a JSON object — skipping")
return None
vocab = Vocabulary.from_json(result)
if vocab.is_empty():
logger.warning("vocabulary: VLM returned an empty vocabulary — skipping")
return None
logger.info(
"vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes",
len(vocab.subtasks),
len(vocab.memory_milestones),
len(sample),
)
return vocab
+17 -7
View File
@@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import (
) )
from lerobot.annotations.steerable_pipeline.validator import StagingValidator from lerobot.annotations.steerable_pipeline.validator import StagingValidator
from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client
from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule
from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter
from lerobot.configs import parser from lerobot.configs import parser
@@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider
) )
vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider) vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider)
vocabulary = VocabularyDiscoveryModule(
vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider
)
writer = LanguageColumnsWriter() writer = LanguageColumnsWriter()
validator = StagingValidator( validator = StagingValidator(
dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None, dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None,
@@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
plan=plan, plan=plan,
interjections=interjections, interjections=interjections,
vqa=vqa, vqa=vqa,
vocabulary=vocabulary,
writer=writer, writer=writer,
validator=validator, validator=validator,
) )
@@ -140,7 +145,7 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
exist_ok=True, exist_ok=True,
) )
print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True) print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True)
api.upload_folder( commit_info = api.upload_folder(
folder_path=str(root), folder_path=str(root),
repo_id=repo_id, repo_id=repo_id,
repo_type="dataset", repo_type="dataset",
@@ -169,13 +174,18 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
version_tag = ds_version version_tag = ds_version
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True) print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True)
revision = getattr(commit_info, "oid", None)
tag_kwargs = {
"repo_id": repo_id,
"tag": version_tag,
"repo_type": "dataset",
"exist_ok": True,
}
if revision is not None:
tag_kwargs["revision"] = revision
try: try:
api.create_tag( api.create_tag(**tag_kwargs)
repo_id=repo_id,
tag=version_tag,
repo_type="dataset",
exist_ok=True,
)
print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True) print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
print( print(
+412
View File
@@ -0,0 +1,412 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Vocabulary-discovery phase (phase 0) tests."""
from __future__ import annotations
import json
from pathlib import Path
from lerobot.annotations.steerable_pipeline.config import (
PlanConfig,
VocabularyConfig,
)
from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule
from lerobot.annotations.steerable_pipeline.reader import iter_episodes
from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging
from lerobot.annotations.steerable_pipeline.vocabulary import (
Vocabulary,
VocabularyDiscoveryModule,
load_vocabulary,
save_vocabulary,
vocabulary_path,
)
from ._helpers import make_canned_responder
_CANONICAL_SUBTASKS = (
"grasp blue cube",
"place blue cube in box",
"retract arm",
)
_CANONICAL_MEMORY = (
"I picked up the blue cube.",
"I placed the blue cube in the box.",
)
# ---------------------------------------------------------------------------
# Vocabulary dataclass + on-disk round-trip
# ---------------------------------------------------------------------------
def test_vocabulary_roundtrip(tmp_path: Path) -> None:
vocab = Vocabulary(
subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY
)
save_path = save_vocabulary(tmp_path, vocab)
assert save_path == vocabulary_path(tmp_path)
assert save_path.exists()
loaded = load_vocabulary(tmp_path)
assert loaded is not None
assert loaded.subtasks == _CANONICAL_SUBTASKS
assert loaded.memory_milestones == _CANONICAL_MEMORY
def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None:
assert load_vocabulary(tmp_path) is None
def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None:
path = vocabulary_path(tmp_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("{ not valid json", encoding="utf-8")
assert load_vocabulary(tmp_path) is None
def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None:
path = vocabulary_path(tmp_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8")
assert load_vocabulary(tmp_path) is None
# ---------------------------------------------------------------------------
# Discovery module
# ---------------------------------------------------------------------------
def test_vocabulary_discovery_calls_vlm_and_returns_vocab(
fixture_dataset_root: Path,
) -> None:
vlm = make_canned_responder(
{
"canonical vocabulary": {
"subtasks": list(_CANONICAL_SUBTASKS),
"memory_milestones": list(_CANONICAL_MEMORY),
}
}
)
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2))
records = list(iter_episodes(fixture_dataset_root))
vocab = module.discover(records)
assert vocab is not None
assert vocab.subtasks == _CANONICAL_SUBTASKS
assert vocab.memory_milestones == _CANONICAL_MEMORY
def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None:
"""``reuse_existing=True`` short-circuits the VLM call entirely."""
def _explode(_messages): # pragma: no cover - must not be called
raise AssertionError("VLM should not be invoked when reusing existing vocabulary")
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
vlm = StubVlmClient(responder=_explode)
module = VocabularyDiscoveryModule(
vlm=vlm, config=VocabularyConfig(reuse_existing=True)
)
records = list(iter_episodes(fixture_dataset_root))
existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",))
vocab = module.discover(records, existing=existing)
assert vocab is existing
def test_vocabulary_discovery_empty_payload_returns_none(
fixture_dataset_root: Path,
) -> None:
vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}})
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig())
records = list(iter_episodes(fixture_dataset_root))
assert module.discover(records) is None
# ---------------------------------------------------------------------------
# PlanSubtasksMemoryModule consumes the vocabulary
# ---------------------------------------------------------------------------
def test_plan_module_inlines_vocab_into_subtask_prompt(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
captured: list[str] = []
def responder(messages):
# Find the last user text block and stash it for inspection.
for message in messages:
content = message.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
captured.append(block.get("text", ""))
# Return canned subtasks; pick the first two canonical strings so
# the validator accepts them.
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
{"text": "place blue cube in box", "start": 0.4, "end": 0.9},
]
}
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
# The subtask prompt (and the memory prompt) carries the canonical
# bullet list so the VLM can't paraphrase them away.
assert any("Canonical subtask labels:" in t for t in captured)
assert any("grasp blue cube" in t for t in captured)
def test_plan_module_accepts_article_only_difference(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""Articles like 'the'/'a'/'an' are stripped during validation."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
return {
"subtasks": [
# Same canonical phrase modulo "the" — should be accepted.
{"text": "grasp the blue cube", "start": 0.0, "end": 0.4},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["grasp blue cube"]
def test_plan_module_retries_when_subtask_off_vocab(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""One-shot retry replaces an off-vocab paraphrase with the canonical form."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
call_count = {"n": 0}
def responder(messages):
call_count["n"] += 1
# First call: returns an off-vocab paraphrase.
if call_count["n"] == 1:
return {
"subtasks": [
# paraphrase, not in vocab
{"text": "pick up blue cube", "start": 0.0, "end": 0.4},
]
}
# Second call (the retry): should contain the correction prompt;
# respond with the canonical phrase exactly.
last_user_text = ""
for message in messages:
content = message.get("content")
if isinstance(content, str):
last_user_text = content
elif isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
last_user_text = block.get("text", "")
assert "NOT in the canonical vocabulary" in last_user_text
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["grasp blue cube"]
# The retry must have fired exactly once.
assert call_count["n"] == 2
def test_plan_module_drops_off_vocab_subtask_after_retry(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""If the VLM stays off-vocab even after the retry, the bad span is dropped."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
call_count = {"n": 0}
def responder(_messages):
call_count["n"] += 1
# Both calls return the same off-vocab span — the model can't
# be corrected. The second call also returns one in-vocab span
# so the episode isn't empty; this lets us check that the
# off-vocab span is dropped without affecting the in-vocab one.
if call_count["n"] == 1:
return {
"subtasks": [
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
]
}
return {
"subtasks": [
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
# Retry fired exactly once; bad span dropped, good span kept.
assert call_count["n"] == 2
assert subtask_texts == ["grasp blue cube"]
def test_plan_module_bumps_collocated_subtasks_to_distinct_frames(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""Two subtasks whose starts snap to the same frame get split onto two frames.
Without this guard, both spans would emit ``style=subtask`` rows at the
identical persistent timestamp; the training-time renderer's
``active_at(t, style=subtask)`` then raises an ambiguity error.
"""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
# Two canonical labels with starts within one frame of each other —
# both snap to the same source frame, so the dedupe pass must bump
# the later one to the next frame.
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.40, "end": 0.42},
{"text": "place blue cube in box", "start": 0.41, "end": 0.50},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_rows = [r for r in rows if r["style"] == "subtask"]
# Both subtasks present, both on distinct timestamps.
assert len(subtask_rows) == 2
timestamps = [r["timestamp"] for r in subtask_rows]
assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}"
# Order preserved: the chronologically earlier span keeps the earlier
# frame, the later one was bumped onto the next available frame.
assert subtask_rows[0]["content"] == "grasp blue cube"
assert subtask_rows[1]["content"] == "place blue cube in box"
assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"]
def test_plan_module_empty_when_all_off_vocab_after_retry(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""All-off-vocab spans → episode comes out empty (no silent fuzzy snap)."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
# Returns the same off-vocab spans on both attempts.
return {
"subtasks": [
{"text": "make a smoothie", "start": 0.0, "end": 0.4},
{"text": "consult the wizard", "start": 0.4, "end": 0.9},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
# No subtask gets fabricated — better to leave the episode empty
# so the operator notices the vocabulary gap than to silently
# warp the labels.
assert subtask_texts == []
def test_plan_module_without_vocab_passes_through(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""No vocabulary configured → original free-form behavior is preserved."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
return {
"subtasks": [
{"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0},
]
}
vlm = StubVlmClient(responder=responder)
module = PlanSubtasksMemoryModule(
vlm=vlm, config=PlanConfig(n_task_rephrasings=0)
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["any free-form text the VLM wants"]
+51
View File
@@ -0,0 +1,51 @@
#!/usr/bin/env python
import json
from types import SimpleNamespace
def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch):
from lerobot.scripts.lerobot_annotate import _push_to_hub
root = tmp_path / "dataset"
(root / "meta").mkdir(parents=True)
(root / "meta" / "info.json").write_text(json.dumps({"codebase_version": "v3.0"}))
calls = {}
class FakeHfApi:
def create_repo(self, **kwargs):
calls["create_repo"] = kwargs
def upload_folder(self, **kwargs):
calls["upload_folder"] = kwargs
return SimpleNamespace(oid="abc123")
def create_tag(self, **kwargs):
calls["create_tag"] = kwargs
monkeypatch.setattr("huggingface_hub.HfApi", FakeHfApi)
cfg = SimpleNamespace(
repo_id="source/dataset",
dest_repo_id="annotated/dataset",
push_private=True,
push_commit_message=None,
)
_push_to_hub(root, cfg)
assert calls["create_repo"] == {
"repo_id": "annotated/dataset",
"repo_type": "dataset",
"private": True,
"exist_ok": True,
}
assert calls["upload_folder"]["repo_id"] == "annotated/dataset"
assert calls["create_tag"] == {
"repo_id": "annotated/dataset",
"tag": "v3.0",
"repo_type": "dataset",
"exist_ok": True,
"revision": "abc123",
}