annotations(steerable): structured action records + 5-axis task augmentation

EgoMimic-inspired additions to the plan module, both opt-in for back-compat.

1. PHASE 1a + 1b: per-subtask structured action records
   * cfg.action_records.enabled=True triggers, after Phase 1 subtask-span
     generation, one extra VLM call per subtask to extract a typed record:
       {verb, object, arm, grasp_type, destination, mistake}
   * A deterministic Python template (_render_action_record_to_subtask_text)
     renders the record back to canonical subtask text. When replace_subtask_
     text=True (default), this REPLACES the VLM's free-form text — eliminates
     cross-episode phrasing drift.
   * When emit_record_row=True (default), the structured record is also
     emitted as a row with style='action_record' (added to PERSISTENT_STYLES)
     so downstream training can consume the typed schema directly.
   * Verb + grasp vocabularies are configurable. Out-of-vocab values are
     rejected at extraction time.

2. STRUCTURED 5-AXIS TASK AUGMENTATION
   * cfg.task_aug_axes.enabled=True replaces the free-form n_task_rephrasings
     path with a structured prompt producing variants along 5 named axes:
       synonym_paraphrase (3)
       omit_arm           (3)
       omit_orientation   (2)
       omit_grasp_method  (2)
       combined_omissions (2)
     Total ~12 variants. Axes with nothing to omit emit fewer entries.
   * Each variant is emitted as a task_aug row at t=0 (existing style).

Inspired by https://github.com/GaTech-RL2/EgoVerse/tree/main/egomimic/scripts/language_process
— they pay Scale AI annotators to fill a structured form and then generate
language via a deterministic prompt. We get the same hallucination-reducing
structure via one extra VLM call per subtask.

Files:
  src/lerobot/datasets/language.py
  src/lerobot/annotations/steerable_pipeline/config.py
  src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py
  src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt
  src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-02 10:01:47 +02:00
parent 1e7c0d6aa1
commit 2bfaf44db2
5 changed files with 514 additions and 5 deletions
@@ -92,6 +92,136 @@ class PlanConfig:
use_video_url: bool = False
use_video_url_fps: float = 1.0
# Structured per-subtask action records (Phase 1a + 1b, inspired by
# EgoMimic's annotator form). For each generated subtask span, the
# VLM extracts a typed record (verb / object / arm / grasp_type /
# destination / mistake). A deterministic Python template renders
# that record back to canonical subtask text — reducing the VLM's
# "creative" surface to just the perception step. See
# ``ActionRecordsConfig`` for details. Off by default (back-compat).
action_records: "ActionRecordsConfig" = field(default_factory=lambda: ActionRecordsConfig())
# Structured 5-axis augmentation taxonomy for the t=0 task variants
# (replaces the free-form ``n_task_rephrasings`` flow when enabled).
# Mirrors EgoMimic's ``augment_prompt.txt`` taxonomy: instead of N
# free-form rephrasings, the VLM produces variants along named
# axes (synonym / omit_arm / omit_orientation / omit_grasp_method /
# combined). Off by default (back-compat).
task_aug_axes: "TaskAugAxesConfig" = field(default_factory=lambda: TaskAugAxesConfig())
@dataclass
class ActionRecordsConfig:
"""Structured per-subtask action record extraction.
When ``enabled=True``, after the existing subtask-span generation in
``plan_subtasks_memory.py``, the module makes one extra VLM call per
subtask to extract a typed record::
{
"verb": "pick" | "place" | "press" | ..., # closed vocabulary
"object": "<canonical_object_name>",
"arm": "left" | "right" | "both" | null,
"grasp_type": "pinch" | "wrap" | "hook" | ... | null,
"destination": "<canonical_destination>" | null,
"mistake": "<short text>" | null,
}
A deterministic Python template then renders the record back to
canonical subtask text (e.g. ``pick blue cube with left arm using
pinch grip``). When ``replace_subtask_text=True`` (default), the
rendered text REPLACES the VLM's free-form subtask text — eliminating
cross-episode phrasing drift. When ``emit_record_row=True``
(default), the structured record is also emitted as a row with
``style="action_record"`` so downstream consumers can train on the
typed schema directly.
Cost: one extra VLM call per subtask. For an 8-subtask episode this
means ~8x more VLM calls in the plan module — still cheap relative
to the action-expert training cost, but worth knowing.
"""
enabled: bool = False
# When True, replace the VLM-generated subtask text with the
# deterministic template's rendering of the structured record.
# Strongly recommended — it's the whole point of the structured
# intermediate. Set False to keep both representations side by side.
replace_subtask_text: bool = True
# When True, emit a separate row with ``style="action_record"`` and
# ``content=json.dumps(record)`` at the subtask's start timestamp.
# Lets downstream training consume the typed schema directly (e.g.
# auxiliary supervision on verb/arm/grasp classification heads).
emit_record_row: bool = True
# Frame sampling for the per-subtask VLM call (similar to the
# interjection module's window). Anchored to the subtask span.
frames_per_subtask: int = 4
# Closed verb vocabulary. The prompt instructs the VLM to pick
# exactly one. Override per-dataset (e.g. ``["pick", "place", "open",
# "close"]`` for door-only manipulation) for tighter constraint.
verb_vocabulary: tuple[str, ...] = (
"pick", "place", "push", "pull", "open", "close", "turn",
"press", "lift", "insert", "pour", "move", "reach", "grasp",
"release", "wipe", "dump",
)
# Closed grasp-type vocabulary. ``null`` is always allowed (no
# contact / unclear). Adjust per-hardware (e.g. drop ``hook`` /
# ``key`` for parallel-jaw grippers).
grasp_vocabulary: tuple[str, ...] = (
"pinch", "wrap", "hook", "key", "lateral",
)
@dataclass
class TaskAugAxesConfig:
"""Structured 5-axis augmentation taxonomy for t=0 task variants.
When ``enabled=True``, replaces the free-form ``n_task_rephrasings``
flow with a structured prompt that produces variants along five
named axes (mirroring EgoMimic's ``augment_prompt.txt``):
* ``synonym_paraphrase`` — different wording / verbs, all
information preserved.
* ``omit_arm`` — drop the left/right/both arm specification.
* ``omit_orientation`` — drop orientation cues (upright,
sideways, ...).
* ``omit_grasp_method`` — drop grip / grasp method specification.
* ``combined_omissions`` — combine two of the above
simultaneously.
Default counts (3+3+2+2+2 = 12 variants per task) match EgoMimic.
Axes that have nothing to omit in the source task (e.g. ``omit_arm``
when the task doesn't mention an arm) emit fewer entries rather
than pad — the prompt instructs the VLM accordingly.
Each variant is emitted as a ``task_aug`` row at ``t=0`` (same
style as the free-form variants), so the rest of the pipeline /
training recipe doesn't need to know about the taxonomy.
"""
enabled: bool = False
synonym_paraphrase: int = 3
omit_arm: int = 3
omit_orientation: int = 2
omit_grasp_method: int = 2
combined_omissions: int = 2
@property
def total(self) -> int:
"""Sum of requested variants across all axes (upper bound)."""
return (
self.synonym_paraphrase
+ self.omit_arm
+ self.omit_orientation
+ self.omit_grasp_method
+ self.combined_omissions
)
@dataclass
class InterjectionsConfig:
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
import logging
from collections.abc import Sequence
from dataclasses import dataclass, field
@@ -28,6 +29,7 @@ from ..frames import (
FrameProvider,
VideoFrameProvider,
null_provider,
to_image_blocks,
to_video_block,
to_video_url_block,
)
@@ -78,13 +80,37 @@ class PlanSubtasksMemoryModule:
# ``task_aug`` rows at t=0 (role=user), one per rephrasing — the
# message renderer rotates ``${task}`` deterministically through
# them so the policy sees diverse phrasings during training.
# Two paths:
# * ``task_aug_axes.enabled=True`` — structured 5-axis taxonomy
# (synonym / omit_arm / omit_orientation / omit_grasp_method
# / combined). Replaces the free-form rephrasings flow.
# * Otherwise — free-form ``n_task_rephrasings`` (original).
t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0
if self.config.n_task_rephrasings > 0 and effective_task:
axes_cfg = self.config.task_aug_axes
if axes_cfg.enabled and effective_task:
variants = self._generate_task_aug_by_axes(effective_task, axes_cfg)
seen: set[str] = set()
ordered = [effective_task, *variants]
for phrasing in ordered:
key = phrasing.strip()
if not key or key in seen:
continue
seen.add(key)
rows.append(
{
"role": "user",
"content": key,
"style": "task_aug",
"timestamp": t0,
"tool_calls": None,
}
)
elif self.config.n_task_rephrasings > 0 and effective_task:
rephrasings = self._generate_task_rephrasings(effective_task, n=self.config.n_task_rephrasings)
# Always include the effective task itself as the first variant
# so the rotation is guaranteed to cover the source-of-truth
# phrasing, not just synthetic alternatives.
seen: set[str] = set()
seen = set()
ordered = [effective_task, *rephrasings]
for phrasing in ordered:
key = phrasing.strip()
@@ -102,8 +128,31 @@ class PlanSubtasksMemoryModule:
)
subtask_spans = self._generate_subtasks(record, task=effective_task)
# subtask rows
for span in subtask_spans:
# ----------------------------------------------------------------
# Phase 1a + 1b: structured per-subtask action records
# ----------------------------------------------------------------
# When enabled, for every subtask span we ask the VLM for a typed
# ActionRecord (verb / object / arm / grasp_type / destination /
# mistake). A deterministic Python template renders the record
# back to canonical subtask text. The render replaces the
# free-form subtask text (cleaner conditioning) and the typed
# record is emitted as a separate row for downstream use.
records_cfg = self.config.action_records
action_records: list[dict[str, Any] | None] = [None] * len(subtask_spans)
if records_cfg.enabled and subtask_spans:
for i, span in enumerate(subtask_spans):
rec = self._extract_action_record(record, span, effective_task)
if rec is None:
continue
action_records[i] = rec
if records_cfg.replace_subtask_text:
canonical_text = self._render_action_record_to_subtask_text(rec)
if canonical_text:
span["text"] = canonical_text
# subtask rows (may now reflect canonical-rendered text)
for i, span in enumerate(subtask_spans):
rows.append(
{
"role": "assistant",
@@ -113,6 +162,16 @@ class PlanSubtasksMemoryModule:
"tool_calls": None,
}
)
if records_cfg.enabled and records_cfg.emit_record_row and action_records[i] is not None:
rows.append(
{
"role": "assistant",
"content": json.dumps(action_records[i], sort_keys=True),
"style": "action_record",
"timestamp": snap_to_frame(span["start"], record.frame_timestamps),
"tool_calls": None,
}
)
# Plan rows at every subtask boundary — including t=0 (start of
# the first subtask). Because the plan is just a numbered list
# of *still-todo* subtasks, re-emitting at each boundary makes
@@ -244,6 +303,202 @@ class PlanSubtasksMemoryModule:
out = [item.strip().strip('"').strip("'") for item in raw if isinstance(item, str)]
return [s for s in out if s][:n]
# ------------------------------------------------------------------
# Phase 1a + 1b: structured per-subtask action records
# ------------------------------------------------------------------
def _extract_action_record(
self,
record: EpisodeRecord,
span: dict[str, Any],
episode_task: str,
) -> dict[str, Any] | None:
"""Ask the VLM to extract a typed ``ActionRecord`` from a subtask span.
Sends ``frames_per_subtask`` frames uniformly sampled from
``[span.start, span.end]`` plus the canonical subtask text. The
VLM is constrained to verb + grasp vocabularies from the config
— invalid values are silently dropped at this layer (the
validator catches structural problems pre-write).
Returns ``None`` when the call fails or the VLM returns something
unrecognizable; callers fall back to the free-form subtask text.
"""
cfg = self.config.action_records
start_t = float(span.get("start", 0.0))
end_t = float(span.get("end", start_t))
duration = max(0.0, end_t - start_t)
# Uniform timestamps within the span; fall back to a single
# center frame for very short spans.
n = max(1, int(cfg.frames_per_subtask))
if n == 1 or duration <= 0.0:
timestamps = [0.5 * (start_t + end_t)]
else:
step = duration / (n - 1)
timestamps = [start_t + i * step for i in range(n)]
frames = self.frame_provider.frames_at(record, timestamps)
if not frames:
logger.debug(
"action_record: no frames at span %.2f-%.2f for ep %s; skipping",
start_t, end_t, record.episode_index,
)
return None
prompt = load_prompt("module_1_action_record").format(
episode_task=episode_task,
subtask_text=span.get("text", ""),
start_time=start_t,
end_time=end_t,
duration=duration,
n_frames=len(frames),
verb_vocabulary=", ".join(cfg.verb_vocabulary),
grasp_vocabulary=" | ".join(f'"{g}"' for g in cfg.grasp_vocabulary),
)
message = [
{
"role": "user",
"content": [*to_image_blocks(frames), {"type": "text", "text": prompt}],
}
]
result = self.vlm.generate_json([message])[0]
if not isinstance(result, dict):
return None
# Light validation + normalisation. Verb is required; everything
# else may be null. Verb / grasp_type are clamped to the
# vocabularies (out-of-vocab → reject or null).
verb = (result.get("verb") or "").strip().lower()
if not verb or verb not in {v.lower() for v in cfg.verb_vocabulary}:
return None
obj = (result.get("object") or "").strip()
if not obj:
return None
grasp = result.get("grasp_type")
if isinstance(grasp, str):
grasp = grasp.strip().lower()
if grasp not in {g.lower() for g in cfg.grasp_vocabulary}:
grasp = None
else:
grasp = None
arm = result.get("arm")
if isinstance(arm, str):
arm = arm.strip().lower()
if arm not in {"left", "right", "both"}:
arm = None
else:
arm = None
destination = result.get("destination")
destination = destination.strip() if isinstance(destination, str) and destination.strip() else None
mistake = result.get("mistake")
mistake = mistake.strip() if isinstance(mistake, str) and mistake.strip() else None
return {
"verb": verb,
"object": obj,
"arm": arm,
"grasp_type": grasp,
"destination": destination,
"mistake": mistake,
}
@staticmethod
def _render_action_record_to_subtask_text(record: dict[str, Any]) -> str:
"""Deterministic template: ``ActionRecord`` → canonical subtask text.
Mirrors the authoring guidance in ``module_1_subtasks.txt``:
imperative, drop articles / adverbs, use canonical object nouns,
append arm / grasp clauses only when present.
Examples (record → rendered text)::
{verb=pick, object=blue cube}
"pick blue cube"
{verb=pick, object=blue cube, arm=left, grasp_type=pinch}
"pick blue cube with left arm using pinch grip"
{verb=place, object=blue cube, destination=green box}
"place blue cube in green box"
{verb=move, object=mug, destination=stove}
"move mug to stove"
"""
verb = (record.get("verb") or "").strip().lower()
obj = (record.get("object") or "").strip()
arm = (record.get("arm") or "").strip().lower() if record.get("arm") else ""
grasp = (record.get("grasp_type") or "").strip().lower() if record.get("grasp_type") else ""
dest = (record.get("destination") or "").strip() if record.get("destination") else ""
if not verb:
return ""
parts: list[str] = [verb]
if obj:
parts.append(obj)
if dest:
# Pick a sensible preposition per verb family.
if verb in {"place", "put", "drop", "insert", "pour", "dump"}:
parts.append(f"in {dest}")
elif verb in {"move", "transport", "reach"}:
parts.append(f"to {dest}")
else:
parts.append(f"at {dest}")
if arm == "both":
parts.append("with both arms")
elif arm in {"left", "right"}:
parts.append(f"with {arm} arm")
if grasp:
parts.append(f"using {grasp} grip")
return " ".join(parts)
# ------------------------------------------------------------------
# Structured 5-axis task augmentation (EgoMimic-style taxonomy)
# ------------------------------------------------------------------
def _generate_task_aug_by_axes(self, base_task: str, axes_cfg: Any) -> list[str]:
"""One VLM call → variants along the 5-axis taxonomy.
Variants from all axes are flattened into a single list (the
downstream pipeline doesn't need to know about the per-axis
bucketing — every variant becomes a ``task_aug`` row). Order
is preserved for reproducibility: synonym_paraphrase first,
then omit_arm, then omit_orientation, then omit_grasp_method,
then combined_omissions.
"""
if not base_task:
return []
prompt = load_prompt("module_1_task_aug_axes").format(
base_task=base_task,
n_synonym=axes_cfg.synonym_paraphrase,
n_omit_arm=axes_cfg.omit_arm,
n_omit_orientation=axes_cfg.omit_orientation,
n_omit_grasp_method=axes_cfg.omit_grasp_method,
n_combined=axes_cfg.combined_omissions,
)
result = self.vlm.generate_json([self._text_message(prompt)])[0]
if not isinstance(result, dict):
return []
ordered_axes = (
"synonym_paraphrase",
"omit_arm",
"omit_orientation",
"omit_grasp_method",
"combined_omissions",
)
flat: list[str] = []
seen: set[str] = set()
for axis in ordered_axes:
entries = result.get(axis)
if not isinstance(entries, list):
continue
for item in entries:
if not isinstance(item, str):
continue
key = item.strip().strip('"').strip("'")
if not key or key in seen:
continue
seen.add(key)
flat.append(key)
return flat
def _episode_video_block(self, record: EpisodeRecord) -> list[dict[str, Any]]:
"""Same video block ``_generate_subtasks`` builds — extracted helper."""
if not record.frame_timestamps:
@@ -0,0 +1,64 @@
You are extracting a structured action record from a subtask span of a
teleoperated robot demonstration. This is Phase 1a of a two-step
process: you extract a typed record; a deterministic template then
renders it back to canonical subtask text. Your job is the PERCEPTION
step — not the language step.
The user originally asked: "{episode_task}"
The subtask span is: "{subtask_text}"
Span time window: [{start_time:.2f}s, {end_time:.2f}s]
({duration:.2f}s of robot activity)
You are shown {n_frames} frames sampled uniformly from the subtask
window. Fill in a structured record describing the action that takes
place between the first and last frame.
Hard rules:
- Use ONLY information visible in the frames. Do not infer details from
outside the span. Do not extrapolate from the original task wording.
- Use canonical object names from the original task VERBATIM. Never
introduce synonyms: if the task says "cube", the record says "cube",
never "block" / "object" / "item".
- For non-applicable fields, use ``null`` (not "n/a", not "none", not
an empty string).
- For ``verb`` and ``grasp_type``, pick EXACTLY one value from the
vocabulary below. Never invent a new one.
Field schema:
verb (required) — the imperative verb of the action. Vocabulary:
{verb_vocabulary}
object (required) — the manipulated object. Use the canonical noun
from the original task above.
arm — which arm performs the action. One of:
"left" | "right" | "both" | null
Use ``null`` when the source robot is single-arm or when the arm
is genuinely not visible in the frames.
grasp_type — which grip the gripper uses on contact. One of:
{grasp_vocabulary} | null
Use ``null`` when there is no contact in this span (e.g. a pure
``move`` / ``reach`` subtask) or the grip is genuinely unclear.
destination — the target location for actions like ``place``,
``move``, ``insert``, ``pour``. Use canonical names from the
original task. Use ``null`` for in-place actions (``press``,
``turn``, ``grasp``, ``release``).
mistake — a brief one-clause description of any visible failure or
recovery during the span (e.g. "dropped the cube and re-grasped",
"missed the target on first attempt"). Use ``null`` when the span
completes cleanly with no visible recovery.
Output strictly valid JSON of shape:
{{
"verb": "<one of vocabulary>",
"object": "<canonical noun>",
"arm": "left" | "right" | "both" | null,
"grasp_type": "<one of vocabulary>" | null,
"destination": "<canonical noun>" | null,
"mistake": "<short description>" | null
}}
@@ -0,0 +1,60 @@
You are generating structured augmentations of a robot task instruction
for training a language-conditioned policy. Unlike free-form rephrasing,
your variants follow a NAMED 5-axis taxonomy — each axis omits or varies
a specific element of the task while preserving its meaning.
Original task: "{base_task}"
Produce variants along five named axes. Each axis has a target count.
The whole batch should expose the policy to maximum linguistic diversity
WITHOUT changing what the robot is supposed to do.
Axes and target counts:
synonym_paraphrase ({n_synonym}):
Different wording / verbs / sentence structure. ALL information
from the original task is preserved — same object, same arm
specification if present, same orientation if present, same grasp
if present.
omit_arm ({n_omit_arm}):
Drop the left/right/both arm specification from the task. Skip
entirely (emit 0 entries) if the original task does NOT mention an
arm. Do not invent an arm specification just to omit it.
omit_orientation ({n_omit_orientation}):
Drop orientation cues (upright, sideways, facing the user,
long-edge-first, etc.). Skip entirely if no orientation cue is
present in the original task.
omit_grasp_method ({n_omit_grasp_method}):
Drop the grip / grasp method specification (pinch, wrap, hold by
the rim, etc.). Skip entirely if no grasp method is mentioned.
combined_omissions ({n_combined}):
Combine TWO of the above omissions simultaneously (e.g. drop both
arm and orientation). Skip entirely if fewer than two of (arm,
orientation, grasp_method) appear in the original task.
Hard rules:
- Each variant MUST preserve the core action and the target object.
Do not change which object is involved, the destination, or the
high-level action.
- Each variant is plain prose, no markdown, no quotes, no list numbers.
- Each variant must be DISTINCT from every other variant in the entire
output, both within and across axes. Near-duplicates are not allowed.
- If an axis cannot reach its target count because the original task
lacks the omittable element, emit fewer entries — do NOT pad the
axis with paraphrases that belong to a different axis.
- Variants should not all start with verbs — vary sentence structure
(some imperative, some polite request, some question).
Output strictly valid JSON of shape:
{{
"synonym_paraphrase": ["<v1>", "<v2>", ...],
"omit_arm": ["<v1>", "<v2>", ...],
"omit_orientation": ["<v1>", ...],
"omit_grasp_method": ["<v1>", ...],
"combined_omissions": ["<v1>", ...]
}}
+1 -1
View File
@@ -46,7 +46,7 @@ CORE_STYLES = {
EXTENDED_STYLES: set[str] = set()
STYLE_REGISTRY = CORE_STYLES | EXTENDED_STYLES
PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug"}
PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug", "action_record"}
EVENT_ONLY_STYLES = {"interjection", "vqa", "trace"}
# Styles whose ``content`` is grounded in a specific camera view. Rows of these