diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index da07d7998..cc6402f08 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -92,6 +92,136 @@ class PlanConfig: use_video_url: bool = False use_video_url_fps: float = 1.0 + # Structured per-subtask action records (Phase 1a + 1b, inspired by + # EgoMimic's annotator form). For each generated subtask span, the + # VLM extracts a typed record (verb / object / arm / grasp_type / + # destination / mistake). A deterministic Python template renders + # that record back to canonical subtask text — reducing the VLM's + # "creative" surface to just the perception step. See + # ``ActionRecordsConfig`` for details. Off by default (back-compat). + action_records: "ActionRecordsConfig" = field(default_factory=lambda: ActionRecordsConfig()) + + # Structured 5-axis augmentation taxonomy for the t=0 task variants + # (replaces the free-form ``n_task_rephrasings`` flow when enabled). + # Mirrors EgoMimic's ``augment_prompt.txt`` taxonomy: instead of N + # free-form rephrasings, the VLM produces variants along named + # axes (synonym / omit_arm / omit_orientation / omit_grasp_method / + # combined). Off by default (back-compat). + task_aug_axes: "TaskAugAxesConfig" = field(default_factory=lambda: TaskAugAxesConfig()) + + +@dataclass +class ActionRecordsConfig: + """Structured per-subtask action record extraction. + + When ``enabled=True``, after the existing subtask-span generation in + ``plan_subtasks_memory.py``, the module makes one extra VLM call per + subtask to extract a typed record:: + + { + "verb": "pick" | "place" | "press" | ..., # closed vocabulary + "object": "", + "arm": "left" | "right" | "both" | null, + "grasp_type": "pinch" | "wrap" | "hook" | ... | null, + "destination": "" | null, + "mistake": "" | null, + } + + A deterministic Python template then renders the record back to + canonical subtask text (e.g. ``pick blue cube with left arm using + pinch grip``). When ``replace_subtask_text=True`` (default), the + rendered text REPLACES the VLM's free-form subtask text — eliminating + cross-episode phrasing drift. When ``emit_record_row=True`` + (default), the structured record is also emitted as a row with + ``style="action_record"`` so downstream consumers can train on the + typed schema directly. + + Cost: one extra VLM call per subtask. For an 8-subtask episode this + means ~8x more VLM calls in the plan module — still cheap relative + to the action-expert training cost, but worth knowing. + """ + + enabled: bool = False + + # When True, replace the VLM-generated subtask text with the + # deterministic template's rendering of the structured record. + # Strongly recommended — it's the whole point of the structured + # intermediate. Set False to keep both representations side by side. + replace_subtask_text: bool = True + + # When True, emit a separate row with ``style="action_record"`` and + # ``content=json.dumps(record)`` at the subtask's start timestamp. + # Lets downstream training consume the typed schema directly (e.g. + # auxiliary supervision on verb/arm/grasp classification heads). + emit_record_row: bool = True + + # Frame sampling for the per-subtask VLM call (similar to the + # interjection module's window). Anchored to the subtask span. + frames_per_subtask: int = 4 + + # Closed verb vocabulary. The prompt instructs the VLM to pick + # exactly one. Override per-dataset (e.g. ``["pick", "place", "open", + # "close"]`` for door-only manipulation) for tighter constraint. + verb_vocabulary: tuple[str, ...] = ( + "pick", "place", "push", "pull", "open", "close", "turn", + "press", "lift", "insert", "pour", "move", "reach", "grasp", + "release", "wipe", "dump", + ) + + # Closed grasp-type vocabulary. ``null`` is always allowed (no + # contact / unclear). Adjust per-hardware (e.g. drop ``hook`` / + # ``key`` for parallel-jaw grippers). + grasp_vocabulary: tuple[str, ...] = ( + "pinch", "wrap", "hook", "key", "lateral", + ) + + +@dataclass +class TaskAugAxesConfig: + """Structured 5-axis augmentation taxonomy for t=0 task variants. + + When ``enabled=True``, replaces the free-form ``n_task_rephrasings`` + flow with a structured prompt that produces variants along five + named axes (mirroring EgoMimic's ``augment_prompt.txt``): + + * ``synonym_paraphrase`` — different wording / verbs, all + information preserved. + * ``omit_arm`` — drop the left/right/both arm specification. + * ``omit_orientation`` — drop orientation cues (upright, + sideways, ...). + * ``omit_grasp_method`` — drop grip / grasp method specification. + * ``combined_omissions`` — combine two of the above + simultaneously. + + Default counts (3+3+2+2+2 = 12 variants per task) match EgoMimic. + Axes that have nothing to omit in the source task (e.g. ``omit_arm`` + when the task doesn't mention an arm) emit fewer entries rather + than pad — the prompt instructs the VLM accordingly. + + Each variant is emitted as a ``task_aug`` row at ``t=0`` (same + style as the free-form variants), so the rest of the pipeline / + training recipe doesn't need to know about the taxonomy. + """ + + enabled: bool = False + + synonym_paraphrase: int = 3 + omit_arm: int = 3 + omit_orientation: int = 2 + omit_grasp_method: int = 2 + combined_omissions: int = 2 + + @property + def total(self) -> int: + """Sum of requested variants across all axes (upper bound).""" + return ( + self.synonym_paraphrase + + self.omit_arm + + self.omit_orientation + + self.omit_grasp_method + + self.combined_omissions + ) + @dataclass class InterjectionsConfig: diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index b9bae607e..46d678fd6 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -17,6 +17,7 @@ from __future__ import annotations +import json import logging from collections.abc import Sequence from dataclasses import dataclass, field @@ -28,6 +29,7 @@ from ..frames import ( FrameProvider, VideoFrameProvider, null_provider, + to_image_blocks, to_video_block, to_video_url_block, ) @@ -78,13 +80,37 @@ class PlanSubtasksMemoryModule: # ``task_aug`` rows at t=0 (role=user), one per rephrasing — the # message renderer rotates ``${task}`` deterministically through # them so the policy sees diverse phrasings during training. + # Two paths: + # * ``task_aug_axes.enabled=True`` — structured 5-axis taxonomy + # (synonym / omit_arm / omit_orientation / omit_grasp_method + # / combined). Replaces the free-form rephrasings flow. + # * Otherwise — free-form ``n_task_rephrasings`` (original). t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0 - if self.config.n_task_rephrasings > 0 and effective_task: + axes_cfg = self.config.task_aug_axes + if axes_cfg.enabled and effective_task: + variants = self._generate_task_aug_by_axes(effective_task, axes_cfg) + seen: set[str] = set() + ordered = [effective_task, *variants] + for phrasing in ordered: + key = phrasing.strip() + if not key or key in seen: + continue + seen.add(key) + rows.append( + { + "role": "user", + "content": key, + "style": "task_aug", + "timestamp": t0, + "tool_calls": None, + } + ) + elif self.config.n_task_rephrasings > 0 and effective_task: rephrasings = self._generate_task_rephrasings(effective_task, n=self.config.n_task_rephrasings) # Always include the effective task itself as the first variant # so the rotation is guaranteed to cover the source-of-truth # phrasing, not just synthetic alternatives. - seen: set[str] = set() + seen = set() ordered = [effective_task, *rephrasings] for phrasing in ordered: key = phrasing.strip() @@ -102,8 +128,31 @@ class PlanSubtasksMemoryModule: ) subtask_spans = self._generate_subtasks(record, task=effective_task) - # subtask rows - for span in subtask_spans: + + # ---------------------------------------------------------------- + # Phase 1a + 1b: structured per-subtask action records + # ---------------------------------------------------------------- + # When enabled, for every subtask span we ask the VLM for a typed + # ActionRecord (verb / object / arm / grasp_type / destination / + # mistake). A deterministic Python template renders the record + # back to canonical subtask text. The render replaces the + # free-form subtask text (cleaner conditioning) and the typed + # record is emitted as a separate row for downstream use. + records_cfg = self.config.action_records + action_records: list[dict[str, Any] | None] = [None] * len(subtask_spans) + if records_cfg.enabled and subtask_spans: + for i, span in enumerate(subtask_spans): + rec = self._extract_action_record(record, span, effective_task) + if rec is None: + continue + action_records[i] = rec + if records_cfg.replace_subtask_text: + canonical_text = self._render_action_record_to_subtask_text(rec) + if canonical_text: + span["text"] = canonical_text + + # subtask rows (may now reflect canonical-rendered text) + for i, span in enumerate(subtask_spans): rows.append( { "role": "assistant", @@ -113,6 +162,16 @@ class PlanSubtasksMemoryModule: "tool_calls": None, } ) + if records_cfg.enabled and records_cfg.emit_record_row and action_records[i] is not None: + rows.append( + { + "role": "assistant", + "content": json.dumps(action_records[i], sort_keys=True), + "style": "action_record", + "timestamp": snap_to_frame(span["start"], record.frame_timestamps), + "tool_calls": None, + } + ) # Plan rows at every subtask boundary — including t=0 (start of # the first subtask). Because the plan is just a numbered list # of *still-todo* subtasks, re-emitting at each boundary makes @@ -244,6 +303,202 @@ class PlanSubtasksMemoryModule: out = [item.strip().strip('"').strip("'") for item in raw if isinstance(item, str)] return [s for s in out if s][:n] + # ------------------------------------------------------------------ + # Phase 1a + 1b: structured per-subtask action records + # ------------------------------------------------------------------ + + def _extract_action_record( + self, + record: EpisodeRecord, + span: dict[str, Any], + episode_task: str, + ) -> dict[str, Any] | None: + """Ask the VLM to extract a typed ``ActionRecord`` from a subtask span. + + Sends ``frames_per_subtask`` frames uniformly sampled from + ``[span.start, span.end]`` plus the canonical subtask text. The + VLM is constrained to verb + grasp vocabularies from the config + — invalid values are silently dropped at this layer (the + validator catches structural problems pre-write). + + Returns ``None`` when the call fails or the VLM returns something + unrecognizable; callers fall back to the free-form subtask text. + """ + cfg = self.config.action_records + start_t = float(span.get("start", 0.0)) + end_t = float(span.get("end", start_t)) + duration = max(0.0, end_t - start_t) + + # Uniform timestamps within the span; fall back to a single + # center frame for very short spans. + n = max(1, int(cfg.frames_per_subtask)) + if n == 1 or duration <= 0.0: + timestamps = [0.5 * (start_t + end_t)] + else: + step = duration / (n - 1) + timestamps = [start_t + i * step for i in range(n)] + frames = self.frame_provider.frames_at(record, timestamps) + if not frames: + logger.debug( + "action_record: no frames at span %.2f-%.2f for ep %s; skipping", + start_t, end_t, record.episode_index, + ) + return None + + prompt = load_prompt("module_1_action_record").format( + episode_task=episode_task, + subtask_text=span.get("text", ""), + start_time=start_t, + end_time=end_t, + duration=duration, + n_frames=len(frames), + verb_vocabulary=", ".join(cfg.verb_vocabulary), + grasp_vocabulary=" | ".join(f'"{g}"' for g in cfg.grasp_vocabulary), + ) + message = [ + { + "role": "user", + "content": [*to_image_blocks(frames), {"type": "text", "text": prompt}], + } + ] + result = self.vlm.generate_json([message])[0] + if not isinstance(result, dict): + return None + + # Light validation + normalisation. Verb is required; everything + # else may be null. Verb / grasp_type are clamped to the + # vocabularies (out-of-vocab → reject or null). + verb = (result.get("verb") or "").strip().lower() + if not verb or verb not in {v.lower() for v in cfg.verb_vocabulary}: + return None + obj = (result.get("object") or "").strip() + if not obj: + return None + grasp = result.get("grasp_type") + if isinstance(grasp, str): + grasp = grasp.strip().lower() + if grasp not in {g.lower() for g in cfg.grasp_vocabulary}: + grasp = None + else: + grasp = None + arm = result.get("arm") + if isinstance(arm, str): + arm = arm.strip().lower() + if arm not in {"left", "right", "both"}: + arm = None + else: + arm = None + destination = result.get("destination") + destination = destination.strip() if isinstance(destination, str) and destination.strip() else None + mistake = result.get("mistake") + mistake = mistake.strip() if isinstance(mistake, str) and mistake.strip() else None + + return { + "verb": verb, + "object": obj, + "arm": arm, + "grasp_type": grasp, + "destination": destination, + "mistake": mistake, + } + + @staticmethod + def _render_action_record_to_subtask_text(record: dict[str, Any]) -> str: + """Deterministic template: ``ActionRecord`` → canonical subtask text. + + Mirrors the authoring guidance in ``module_1_subtasks.txt``: + imperative, drop articles / adverbs, use canonical object nouns, + append arm / grasp clauses only when present. + + Examples (record → rendered text):: + + {verb=pick, object=blue cube} + → "pick blue cube" + {verb=pick, object=blue cube, arm=left, grasp_type=pinch} + → "pick blue cube with left arm using pinch grip" + {verb=place, object=blue cube, destination=green box} + → "place blue cube in green box" + {verb=move, object=mug, destination=stove} + → "move mug to stove" + """ + verb = (record.get("verb") or "").strip().lower() + obj = (record.get("object") or "").strip() + arm = (record.get("arm") or "").strip().lower() if record.get("arm") else "" + grasp = (record.get("grasp_type") or "").strip().lower() if record.get("grasp_type") else "" + dest = (record.get("destination") or "").strip() if record.get("destination") else "" + + if not verb: + return "" + + parts: list[str] = [verb] + if obj: + parts.append(obj) + if dest: + # Pick a sensible preposition per verb family. + if verb in {"place", "put", "drop", "insert", "pour", "dump"}: + parts.append(f"in {dest}") + elif verb in {"move", "transport", "reach"}: + parts.append(f"to {dest}") + else: + parts.append(f"at {dest}") + if arm == "both": + parts.append("with both arms") + elif arm in {"left", "right"}: + parts.append(f"with {arm} arm") + if grasp: + parts.append(f"using {grasp} grip") + return " ".join(parts) + + # ------------------------------------------------------------------ + # Structured 5-axis task augmentation (EgoMimic-style taxonomy) + # ------------------------------------------------------------------ + + def _generate_task_aug_by_axes(self, base_task: str, axes_cfg: Any) -> list[str]: + """One VLM call → variants along the 5-axis taxonomy. + + Variants from all axes are flattened into a single list (the + downstream pipeline doesn't need to know about the per-axis + bucketing — every variant becomes a ``task_aug`` row). Order + is preserved for reproducibility: synonym_paraphrase first, + then omit_arm, then omit_orientation, then omit_grasp_method, + then combined_omissions. + """ + if not base_task: + return [] + prompt = load_prompt("module_1_task_aug_axes").format( + base_task=base_task, + n_synonym=axes_cfg.synonym_paraphrase, + n_omit_arm=axes_cfg.omit_arm, + n_omit_orientation=axes_cfg.omit_orientation, + n_omit_grasp_method=axes_cfg.omit_grasp_method, + n_combined=axes_cfg.combined_omissions, + ) + result = self.vlm.generate_json([self._text_message(prompt)])[0] + if not isinstance(result, dict): + return [] + ordered_axes = ( + "synonym_paraphrase", + "omit_arm", + "omit_orientation", + "omit_grasp_method", + "combined_omissions", + ) + flat: list[str] = [] + seen: set[str] = set() + for axis in ordered_axes: + entries = result.get(axis) + if not isinstance(entries, list): + continue + for item in entries: + if not isinstance(item, str): + continue + key = item.strip().strip('"').strip("'") + if not key or key in seen: + continue + seen.add(key) + flat.append(key) + return flat + def _episode_video_block(self, record: EpisodeRecord) -> list[dict[str, Any]]: """Same video block ``_generate_subtasks`` builds — extracted helper.""" if not record.frame_timestamps: diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt new file mode 100644 index 000000000..1bd127048 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt @@ -0,0 +1,64 @@ +You are extracting a structured action record from a subtask span of a +teleoperated robot demonstration. This is Phase 1a of a two-step +process: you extract a typed record; a deterministic template then +renders it back to canonical subtask text. Your job is the PERCEPTION +step — not the language step. + +The user originally asked: "{episode_task}" +The subtask span is: "{subtask_text}" +Span time window: [{start_time:.2f}s, {end_time:.2f}s] + ({duration:.2f}s of robot activity) + +You are shown {n_frames} frames sampled uniformly from the subtask +window. Fill in a structured record describing the action that takes +place between the first and last frame. + +Hard rules: +- Use ONLY information visible in the frames. Do not infer details from + outside the span. Do not extrapolate from the original task wording. +- Use canonical object names from the original task VERBATIM. Never + introduce synonyms: if the task says "cube", the record says "cube", + never "block" / "object" / "item". +- For non-applicable fields, use ``null`` (not "n/a", not "none", not + an empty string). +- For ``verb`` and ``grasp_type``, pick EXACTLY one value from the + vocabulary below. Never invent a new one. + +Field schema: + + verb (required) — the imperative verb of the action. Vocabulary: + {verb_vocabulary} + + object (required) — the manipulated object. Use the canonical noun + from the original task above. + + arm — which arm performs the action. One of: + "left" | "right" | "both" | null + Use ``null`` when the source robot is single-arm or when the arm + is genuinely not visible in the frames. + + grasp_type — which grip the gripper uses on contact. One of: + {grasp_vocabulary} | null + Use ``null`` when there is no contact in this span (e.g. a pure + ``move`` / ``reach`` subtask) or the grip is genuinely unclear. + + destination — the target location for actions like ``place``, + ``move``, ``insert``, ``pour``. Use canonical names from the + original task. Use ``null`` for in-place actions (``press``, + ``turn``, ``grasp``, ``release``). + + mistake — a brief one-clause description of any visible failure or + recovery during the span (e.g. "dropped the cube and re-grasped", + "missed the target on first attempt"). Use ``null`` when the span + completes cleanly with no visible recovery. + +Output strictly valid JSON of shape: + + {{ + "verb": "", + "object": "", + "arm": "left" | "right" | "both" | null, + "grasp_type": "" | null, + "destination": "" | null, + "mistake": "" | null + }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt new file mode 100644 index 000000000..d8cd13104 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt @@ -0,0 +1,60 @@ +You are generating structured augmentations of a robot task instruction +for training a language-conditioned policy. Unlike free-form rephrasing, +your variants follow a NAMED 5-axis taxonomy — each axis omits or varies +a specific element of the task while preserving its meaning. + +Original task: "{base_task}" + +Produce variants along five named axes. Each axis has a target count. +The whole batch should expose the policy to maximum linguistic diversity +WITHOUT changing what the robot is supposed to do. + +Axes and target counts: + + synonym_paraphrase ({n_synonym}): + Different wording / verbs / sentence structure. ALL information + from the original task is preserved — same object, same arm + specification if present, same orientation if present, same grasp + if present. + + omit_arm ({n_omit_arm}): + Drop the left/right/both arm specification from the task. Skip + entirely (emit 0 entries) if the original task does NOT mention an + arm. Do not invent an arm specification just to omit it. + + omit_orientation ({n_omit_orientation}): + Drop orientation cues (upright, sideways, facing the user, + long-edge-first, etc.). Skip entirely if no orientation cue is + present in the original task. + + omit_grasp_method ({n_omit_grasp_method}): + Drop the grip / grasp method specification (pinch, wrap, hold by + the rim, etc.). Skip entirely if no grasp method is mentioned. + + combined_omissions ({n_combined}): + Combine TWO of the above omissions simultaneously (e.g. drop both + arm and orientation). Skip entirely if fewer than two of (arm, + orientation, grasp_method) appear in the original task. + +Hard rules: +- Each variant MUST preserve the core action and the target object. + Do not change which object is involved, the destination, or the + high-level action. +- Each variant is plain prose, no markdown, no quotes, no list numbers. +- Each variant must be DISTINCT from every other variant in the entire + output, both within and across axes. Near-duplicates are not allowed. +- If an axis cannot reach its target count because the original task + lacks the omittable element, emit fewer entries — do NOT pad the + axis with paraphrases that belong to a different axis. +- Variants should not all start with verbs — vary sentence structure + (some imperative, some polite request, some question). + +Output strictly valid JSON of shape: + + {{ + "synonym_paraphrase": ["", "", ...], + "omit_arm": ["", "", ...], + "omit_orientation": ["", ...], + "omit_grasp_method": ["", ...], + "combined_omissions": ["", ...] + }} diff --git a/src/lerobot/datasets/language.py b/src/lerobot/datasets/language.py index 124c25221..aaca34e23 100644 --- a/src/lerobot/datasets/language.py +++ b/src/lerobot/datasets/language.py @@ -46,7 +46,7 @@ CORE_STYLES = { EXTENDED_STYLES: set[str] = set() STYLE_REGISTRY = CORE_STYLES | EXTENDED_STYLES -PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug"} +PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug", "action_record"} EVENT_ONLY_STYLES = {"interjection", "vqa", "trace"} # Styles whose ``content`` is grounded in a specific camera view. Rows of these