Files
lerobot/tests/annotations/test_pipeline_recipe_render.py
T
Pepijn 1ff10b935c Merge branch 'feat/language-annotation-pipeline' into feat/smolvla-on-steerable
Resolves conflicts from 66 commits on the base branch:

* pyproject.toml — keep base's transformers>=5.4.0,<5.6.0; add the
  sentencepiece-dep entry pi052 (FAST action tokenizer) needs.
* policies/__init__.py — keep pi052 export; drop the
  RewardClassifierConfig export that base removed.
* policies/factory.py — docstring list resolution (keep pi052; drop
  reward_classifier, removed by base).
* annotations/steerable_pipeline/executor.py — adopt base's renamed
  _ensure_annotation_metadata_in_info (it already advertises the say
  tool); drop pi052's older _ensure_tools_in_info call.
* configs/train.py — keep pi052's vqa_target_fraction; adopt base's
  SampleWeightingConfig (legacy RA-BC inline params already covered
  by the migration shim base added).
* scripts/lerobot_train.py — merge pi052's per-policy processor
  rebuild + dataset_repo_id pass-through with base's active_cfg /
  is_reward_model_training tightening, and re-route vqa-weighted
  sampler to active_cfg.drop_n_last_frames.
* datasets/language_render.py — adopt base's _select_one + timestamp
  tolerance (drops pi052's stale _select_latest / per-style sort_key).
* tests — adopt base's parametrized per-camera blend + tolerance
  test; drop pi052 tests that overlap with base's tighter rewrites;
  keep pi052's flow-only / VQA-blend coverage; add a
  test_canonical_recipe_loads check on subtask_mem_vqa_speech.yaml.
* policies/pi052/processor_pi052.py — import RenderMessagesStep
  directly from render_messages_processor (base intentionally
  dropped it from lerobot.processor's re-exports).
* uv.lock — regenerated cleanly from base + pi052's pocket-tts /
  beartype.

All 67 touched tests pass (30 pi052 + 37 recipe / language-render /
pipeline / render-messages).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 14:47:09 +02:00

182 lines
7.1 KiB
Python

#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""End-to-end smoke: pipeline output → PR 1 canonical recipe rendering."""
from __future__ import annotations
from pathlib import Path
import pyarrow.parquet as pq
from lerobot.annotations.steerable_pipeline.config import (
AnnotationPipelineConfig,
InterjectionsConfig,
PlanConfig,
VqaConfig,
)
from lerobot.annotations.steerable_pipeline.executor import Executor
from lerobot.annotations.steerable_pipeline.modules import (
GeneralVqaModule,
InterjectionsAndSpeechModule,
PlanSubtasksMemoryModule,
)
from lerobot.annotations.steerable_pipeline.validator import StagingValidator
from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter
from lerobot.configs.recipe import MessageTurn, TrainingRecipe
from lerobot.datasets.language_render import render_sample
from ._helpers import make_canned_responder
def _build_pr1_style_blend_recipe() -> TrainingRecipe:
"""Inline blend recipe that consumes every style this pipeline produces.
PR 1 used to ship ``src/lerobot/configs/recipes/pi05_hirobot.yaml`` as
a canonical example, but that file was dropped during PR 1 review. The
cross-PR contract this test guards is "the recipe DSL can render
non-empty messages from pipeline output", which doesn't require a
specific YAML — so we build the equivalent blend in code.
"""
return TrainingRecipe(
blend={
"low_level_execution": TrainingRecipe(
weight=0.35,
messages=[
MessageTurn(
role="user",
content="${task}\nPlan: ${plan}\nMemory: ${memory}",
stream="high_level",
),
MessageTurn(role="assistant", content="${subtask}", stream="low_level", target=True),
],
),
"user_interjection_response": TrainingRecipe(
weight=0.16,
bindings={
"speech": "emitted_at(t, role=assistant, tool_name=say)",
"interjection": "emitted_at(t, style=interjection)",
},
messages=[
MessageTurn(role="user", content="${task}", stream="high_level"),
MessageTurn(
role="user",
content="${interjection}",
stream="high_level",
if_present="interjection",
),
MessageTurn(
role="assistant",
content="${plan}",
stream="high_level",
target=True,
if_present="plan",
tool_calls_from="speech",
),
],
),
}
)
def _build_executor() -> Executor:
vlm = make_canned_responder(
{
"atomic subtasks": {
"subtasks": [
{"text": "grasp the bottle", "start": 0.0, "end": 0.5},
{"text": "pour into the cup", "start": 0.5, "end": 1.0},
{"text": "place the bottle down", "start": 1.0, "end": 1.5},
]
},
"concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"},
"Update the memory": {"memory": "poured once"},
"acknowledgement the robot": {"text": "Sure."},
"ONE realistic interruption": {
"interjection": "use less water",
"speech": "Using less water.",
},
"frame-grounded visual question": {
"question": "How many cups?",
"answer": {"label": "cup", "count": 1},
},
},
)
config = AnnotationPipelineConfig(
plan=PlanConfig(),
interjections=InterjectionsConfig(max_interjections_per_episode=1, interjection_min_t=0.5),
vqa=VqaConfig(vqa_emission_hz=1.0, K=2),
)
return Executor(
config=config,
plan=PlanSubtasksMemoryModule(vlm=vlm, config=config.plan),
interjections=InterjectionsAndSpeechModule(vlm=vlm, config=config.interjections, seed=config.seed),
vqa=GeneralVqaModule(vlm=vlm, config=config.vqa, seed=config.seed),
writer=LanguageColumnsWriter(),
validator=StagingValidator(),
)
def test_pr1_canonical_recipe_renders_nonempty_from_pipeline_output(
single_episode_root: Path,
) -> None:
executor = _build_executor()
summary = executor.run(single_episode_root)
# validator may emit warnings but no errors for the synthetic fixture
assert summary.validation_report.ok, summary.validation_report.summary()
table = pq.read_table(single_episode_root / "data" / "chunk-000" / "file-000.parquet")
persistent_lists = table.column("language_persistent").to_pylist()
events_lists = table.column("language_events").to_pylist()
timestamps = table.column("timestamp").to_pylist()
recipe = _build_pr1_style_blend_recipe()
rendered_any = False
for sample_idx, (ts, persistent, events) in enumerate(
zip(timestamps, persistent_lists, events_lists, strict=True)
):
result = render_sample(
recipe=recipe,
persistent=persistent,
events=events,
t=float(ts),
sample_idx=sample_idx,
dataset_ctx={"task": "Pour water from the bottle into the cup."},
)
if result is None:
continue
if result["messages"]:
rendered_any = True
# A valid render supervises something: a text-CE target turn
# OR a flow-only ``low_level``-stream turn (action loss).
assert (
result["target_message_indices"]
or "low_level" in result["message_streams"]
)
break
assert rendered_any, "recipe rendered no messages from pipeline output"
# Sanity: speech atom appears in events column intact
flat_events = [r for ev in events_lists for r in ev]
speech_rows = [r for r in flat_events if r.get("style") is None and r.get("role") == "assistant"]
assert speech_rows
say = speech_rows[0]["tool_calls"][0]
assert say["function"]["name"] == "say"
assert isinstance(say["function"]["arguments"]["text"], str)
# PR 2 no longer writes a ``tools`` column — the say schema lives as a
# constant (``SAY_TOOL_SCHEMA``) so PR 1's row struct is the single
# source of truth for the v3.1 schema.
assert "tools" not in table.column_names