mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-23 20:50:02 +00:00
336af85c09
When the canonical vocabulary is enabled and the VLM produces spans
that don't overlap any canonical label, the previous Jaccard-floor
(0.5) dropped them and the episode came out with no subtasks at all
— invisible to the downstream policy. Observed on
``pepijn223/super_poulain_vocab``: some episodes had empty subtask
columns because every VLM-emitted phrase scored below 0.5 against
the discovered vocabulary.
Two-pass canonicalisation:
- First pass keeps the Jaccard floor (lowered from 0.5 → 0.25, to
let mild paraphrases through) and drops everything below.
- If that first pass leaves the episode with **zero** subtasks,
fall back to a second pass that always snaps each VLM span to
its nearest canonical label by Jaccard (no floor). The episode
ends up with subtasks even when the vocabulary missed a phase
— a slightly-wrong canonical label is still closer to the right
motion than nothing at all.
- Log loudly when the fallback fires so the operator can spot
coverage gaps in ``meta/canonical_vocabulary.json``.
- Log a per-episode count at INFO when some (but not all) spans
were dropped so it's visible without spamming the run output.
Promote the Jaccard floor + ignore-tokens to class constants so
they're a single edit point. Add ``force=True`` parameter to
``_canonicalize_subtask`` for the no-floor fallback path.
New test ``test_plan_module_snaps_when_all_off_vocab`` covers the
fallback; existing ``test_plan_module_drops_off_vocab_subtask`` is
adjusted to keep at least one in-vocab span so the floor path can
still fire and is exercised. All 12 vocabulary tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
316 lines
12 KiB
Python
316 lines
12 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Vocabulary-discovery phase (phase 0) tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from lerobot.annotations.steerable_pipeline.config import (
|
|
PlanConfig,
|
|
VocabularyConfig,
|
|
)
|
|
from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule
|
|
from lerobot.annotations.steerable_pipeline.reader import iter_episodes
|
|
from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging
|
|
from lerobot.annotations.steerable_pipeline.vocabulary import (
|
|
Vocabulary,
|
|
VocabularyDiscoveryModule,
|
|
load_vocabulary,
|
|
save_vocabulary,
|
|
vocabulary_path,
|
|
)
|
|
|
|
from ._helpers import make_canned_responder
|
|
|
|
|
|
_CANONICAL_SUBTASKS = (
|
|
"grasp blue cube",
|
|
"place blue cube in box",
|
|
"retract arm",
|
|
)
|
|
_CANONICAL_MEMORY = (
|
|
"I picked up the blue cube.",
|
|
"I placed the blue cube in the box.",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Vocabulary dataclass + on-disk round-trip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_vocabulary_roundtrip(tmp_path: Path) -> None:
|
|
vocab = Vocabulary(
|
|
subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY
|
|
)
|
|
save_path = save_vocabulary(tmp_path, vocab)
|
|
assert save_path == vocabulary_path(tmp_path)
|
|
assert save_path.exists()
|
|
|
|
loaded = load_vocabulary(tmp_path)
|
|
assert loaded is not None
|
|
assert loaded.subtasks == _CANONICAL_SUBTASKS
|
|
assert loaded.memory_milestones == _CANONICAL_MEMORY
|
|
|
|
|
|
def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None:
|
|
assert load_vocabulary(tmp_path) is None
|
|
|
|
|
|
def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None:
|
|
path = vocabulary_path(tmp_path)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text("{ not valid json", encoding="utf-8")
|
|
assert load_vocabulary(tmp_path) is None
|
|
|
|
|
|
def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None:
|
|
path = vocabulary_path(tmp_path)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8")
|
|
assert load_vocabulary(tmp_path) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Discovery module
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_vocabulary_discovery_calls_vlm_and_returns_vocab(
|
|
fixture_dataset_root: Path,
|
|
) -> None:
|
|
vlm = make_canned_responder(
|
|
{
|
|
"canonical vocabulary": {
|
|
"subtasks": list(_CANONICAL_SUBTASKS),
|
|
"memory_milestones": list(_CANONICAL_MEMORY),
|
|
}
|
|
}
|
|
)
|
|
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2))
|
|
records = list(iter_episodes(fixture_dataset_root))
|
|
vocab = module.discover(records)
|
|
assert vocab is not None
|
|
assert vocab.subtasks == _CANONICAL_SUBTASKS
|
|
assert vocab.memory_milestones == _CANONICAL_MEMORY
|
|
|
|
|
|
def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None:
|
|
"""``reuse_existing=True`` short-circuits the VLM call entirely."""
|
|
|
|
def _explode(_messages): # pragma: no cover - must not be called
|
|
raise AssertionError("VLM should not be invoked when reusing existing vocabulary")
|
|
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
vlm = StubVlmClient(responder=_explode)
|
|
module = VocabularyDiscoveryModule(
|
|
vlm=vlm, config=VocabularyConfig(reuse_existing=True)
|
|
)
|
|
records = list(iter_episodes(fixture_dataset_root))
|
|
existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",))
|
|
vocab = module.discover(records, existing=existing)
|
|
assert vocab is existing
|
|
|
|
|
|
def test_vocabulary_discovery_empty_payload_returns_none(
|
|
fixture_dataset_root: Path,
|
|
) -> None:
|
|
vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}})
|
|
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig())
|
|
records = list(iter_episodes(fixture_dataset_root))
|
|
assert module.discover(records) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PlanSubtasksMemoryModule consumes the vocabulary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_plan_module_inlines_vocab_into_subtask_prompt(
|
|
fixture_dataset_root: Path, tmp_path: Path
|
|
) -> None:
|
|
captured: list[str] = []
|
|
|
|
def responder(messages):
|
|
# Find the last user text block and stash it for inspection.
|
|
for message in messages:
|
|
content = message.get("content")
|
|
if isinstance(content, list):
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "text":
|
|
captured.append(block.get("text", ""))
|
|
# Return canned subtasks; pick the first two canonical strings so
|
|
# the validator accepts them.
|
|
return {
|
|
"subtasks": [
|
|
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
|
|
{"text": "place blue cube in box", "start": 0.4, "end": 0.9},
|
|
]
|
|
}
|
|
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
vlm = StubVlmClient(responder=responder)
|
|
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
|
module = PlanSubtasksMemoryModule(
|
|
vlm=vlm,
|
|
config=PlanConfig(n_task_rephrasings=0),
|
|
vocabulary=vocab,
|
|
)
|
|
record = next(iter_episodes(fixture_dataset_root))
|
|
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
|
module.run_episode(record, staging)
|
|
# The subtask prompt (and the memory prompt) carries the canonical
|
|
# bullet list so the VLM can't paraphrase them away.
|
|
assert any("Canonical subtask labels:" in t for t in captured)
|
|
assert any("grasp blue cube" in t for t in captured)
|
|
|
|
|
|
def test_plan_module_canonicalizes_paraphrased_subtask(
|
|
fixture_dataset_root: Path, tmp_path: Path
|
|
) -> None:
|
|
"""Off-vocab paraphrase with high token overlap snaps to canonical form."""
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
def responder(_messages):
|
|
return {
|
|
"subtasks": [
|
|
# paraphrase of "grasp blue cube" — overlapping tokens
|
|
{"text": "grasp the blue cube", "start": 0.0, "end": 0.4},
|
|
# paraphrase of "place blue cube in box" — high overlap
|
|
{"text": "place the blue cube into the box", "start": 0.4, "end": 0.9},
|
|
]
|
|
}
|
|
|
|
vlm = StubVlmClient(responder=responder)
|
|
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
|
module = PlanSubtasksMemoryModule(
|
|
vlm=vlm,
|
|
config=PlanConfig(n_task_rephrasings=0),
|
|
vocabulary=vocab,
|
|
)
|
|
record = next(iter_episodes(fixture_dataset_root))
|
|
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
|
module.run_episode(record, staging)
|
|
rows = staging.read("plan")
|
|
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
|
# Both paraphrases snapped onto canonical strings.
|
|
assert subtask_texts == ["grasp blue cube", "place blue cube in box"]
|
|
|
|
|
|
def test_plan_module_drops_off_vocab_subtask(
|
|
fixture_dataset_root: Path, tmp_path: Path
|
|
) -> None:
|
|
"""A subtask with low overlap to every canonical label is dropped.
|
|
|
|
Drop only kicks in when *at least one* other subtask survives — if
|
|
every span would be dropped the episode would come out empty, so
|
|
``_generate_subtasks`` falls back to snap-without-floor; that path
|
|
is exercised by ``test_plan_module_snaps_when_all_off_vocab``.
|
|
"""
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
def responder(_messages):
|
|
return {
|
|
"subtasks": [
|
|
# in-vocab — keeps the episode non-empty so the floor
|
|
# is allowed to drop the next span.
|
|
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
|
|
# off-vocab hallucination — no token overlap above the
|
|
# Jaccard floor; should be dropped.
|
|
{"text": "perform a fancy macarena dance", "start": 0.4, "end": 0.9},
|
|
]
|
|
}
|
|
|
|
vlm = StubVlmClient(responder=responder)
|
|
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
|
module = PlanSubtasksMemoryModule(
|
|
vlm=vlm,
|
|
config=PlanConfig(n_task_rephrasings=0),
|
|
vocabulary=vocab,
|
|
)
|
|
record = next(iter_episodes(fixture_dataset_root))
|
|
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
|
module.run_episode(record, staging)
|
|
rows = staging.read("plan")
|
|
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
|
assert subtask_texts == ["grasp blue cube"]
|
|
|
|
|
|
def test_plan_module_snaps_when_all_off_vocab(
|
|
fixture_dataset_root: Path, tmp_path: Path
|
|
) -> None:
|
|
"""All-off-vocab spans snap to nearest canonical instead of emptying the episode."""
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
def responder(_messages):
|
|
return {
|
|
"subtasks": [
|
|
# Both off-vocab — would normally be dropped. The
|
|
# fallback should snap each to its best canonical match
|
|
# rather than leave the episode with no subtasks at all.
|
|
{"text": "make a smoothie", "start": 0.0, "end": 0.4},
|
|
{"text": "consult the wizard", "start": 0.4, "end": 0.9},
|
|
]
|
|
}
|
|
|
|
vlm = StubVlmClient(responder=responder)
|
|
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
|
module = PlanSubtasksMemoryModule(
|
|
vlm=vlm,
|
|
config=PlanConfig(n_task_rephrasings=0),
|
|
vocabulary=vocab,
|
|
)
|
|
record = next(iter_episodes(fixture_dataset_root))
|
|
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
|
module.run_episode(record, staging)
|
|
rows = staging.read("plan")
|
|
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
|
# Two off-vocab spans → two canonical subtasks (snapped to nearest
|
|
# by Jaccard with no floor). The exact canonical choice doesn't
|
|
# matter — only that the episode came out with subtasks rather
|
|
# than empty.
|
|
assert len(subtask_texts) == 2
|
|
assert all(s in _CANONICAL_SUBTASKS for s in subtask_texts)
|
|
|
|
|
|
def test_plan_module_without_vocab_passes_through(
|
|
fixture_dataset_root: Path, tmp_path: Path
|
|
) -> None:
|
|
"""No vocabulary configured → original free-form behavior is preserved."""
|
|
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
|
|
|
def responder(_messages):
|
|
return {
|
|
"subtasks": [
|
|
{"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0},
|
|
]
|
|
}
|
|
|
|
vlm = StubVlmClient(responder=responder)
|
|
module = PlanSubtasksMemoryModule(
|
|
vlm=vlm, config=PlanConfig(n_task_rephrasings=0)
|
|
)
|
|
record = next(iter_episodes(fixture_dataset_root))
|
|
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
|
module.run_episode(record, staging)
|
|
rows = staging.read("plan")
|
|
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
|
assert subtask_texts == ["any free-form text the VLM wants"]
|