From 1b81e4921462feb83170d312af3c02e3ee1aab34 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Thu, 30 Apr 2026 16:49:51 +0200 Subject: [PATCH] feat(annotate): task rephrasings + video-derived task fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Module 1 now produces ``task_aug`` rows (registered in PR 1) so the PR-1 ``${task}`` resolver can rotate phrasings deterministically per ``sample_idx``. Plus an opt-in video-derived task that bypasses the canonical ``meta/tasks.parquet`` task when it's empty, low-quality, or explicitly disabled — every downstream Module-1 prompt then uses the derived task as its grounding. - ``Module1Config``: adds ``n_task_rephrasings`` (default 10) and ``derive_task_from_video`` ∈ ``{off, if_short, always}`` (default ``if_short``: triggers when canonical is empty, < 3 words, or matches a placeholder string like ``debug`` / ``unnamed`` / ``tbd``). - ``plan_subtasks_memory.py``: ``run_episode`` now resolves an ``effective_task`` (canonical OR video-derived) and threads it through ``_generate_subtasks`` / ``_generate_plan`` / ``_generate_memory`` so subtasks, plans, and memory are all grounded in the same task string. Then generates ``n`` rephrasings of the effective task and writes them as ``task_aug`` rows at ``t=0`` with ``role=user``. The effective task itself is included as the first variant so the rotation is guaranteed to cover the source-of-truth phrasing. - New prompts: ``module_1_video_task.txt`` (one-shot video → task), ``module_1_task_rephrasings.txt`` (text-only paraphraser, ``n`` per call). - ``meta/tasks.parquet`` is NOT modified — derived tasks live only in ``language_persistent``. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../annotations/steerable_pipeline/config.py | 29 +++- .../modules/plan_subtasks_memory.py | 157 +++++++++++++++++- .../prompts/module_1_task_rephrasings.txt | 32 ++++ .../prompts/module_1_video_task.txt | 17 ++ 4 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt create mode 100644 src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index 50372d8f2..678930784 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -23,7 +23,7 @@ from typing import Any @dataclass class Module1Config: - """Module 1 hyperparameters: plan + subtasks + memory. + """Module 1 hyperparameters: plan + subtasks + memory + task augmentation. Subtask decomposition sees the **whole episode** as one Qwen-VL video block — no keyframe stride or count: the model handles temporal pooling @@ -33,6 +33,33 @@ class Module1Config: """ enabled: bool = True + n_task_rephrasings: int = 10 + """Number of task rephrasings to generate at ``t=0`` as ``task_aug`` + persistent rows (PR 1 ``CORE_STYLES``). The renderer's ``${task}`` + binding rotates among them deterministically per ``sample_idx``, + realizing Xiao 2022 / CAST-style task-prompt diversity without + touching ``meta/tasks.parquet``. Set to 0 to disable.""" + derive_task_from_video: str = "if_short" + """When to bypass the user-provided ``record.episode_task`` and + derive a fresh task description from the episode video alone: + + - ``off`` never; always use the canonical task as the basis. + - ``if_short`` derive when the canonical task is empty, has fewer + than ``derive_task_min_words`` words, or matches a + placeholder string (``debug``, ``unnamed``, ``tbd``, + ...). 
Default — fixes noisy / placeholder tasks + without forcing derivation everywhere. + - ``always`` ignore the canonical task entirely; always derive + from the video. Useful when the dataset's task + labels are uniformly bad. + + The video-derived task replaces the canonical task as the basis for + subtask decomposition, plan, memory, AND the ``task_aug`` rephrasings, + so every downstream annotation is grounded in what's actually visible. + ``meta/tasks.parquet`` is NOT modified — the Module-1-derived task + only lives in ``language_persistent`` rows.""" + derive_task_min_words: int = 3 + """Word-count threshold for ``derive_task_from_video=if_short``.""" frames_per_second: float = 1.0 """Sample one image-frame per ``1/fps`` seconds across the episode for Module 1's subtask-decomposition prompt. ``1.0`` = 1 fps. Capped by diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index c125e3640..c48d888fb 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -71,7 +71,40 @@ class PlanSubtasksMemoryModule: def run_episode(self, record: EpisodeRecord, staging: EpisodeStaging) -> None: rows: list[dict[str, Any]] = [] - subtask_spans = self._generate_subtasks(record) + # Resolve the task that drives every other Module-1 prompt. May be + # the canonical ``record.episode_task`` (default), or a fresh + # description derived from the video when the canonical task is + # empty / placeholder / forced-off (see Module1Config.derive_task_*). + effective_task = self._resolve_effective_task(record) + # ``task_aug`` rows at t=0 (role=user), one per rephrasing — the + # PR 1 renderer rotates ``${task}`` deterministically through them + # so the policy sees diverse phrasings during training. 
+ t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0 + if self.config.n_task_rephrasings > 0 and effective_task: + rephrasings = self._generate_task_rephrasings( + effective_task, n=self.config.n_task_rephrasings + ) + # Always include the effective task itself as the first variant + # so the rotation is guaranteed to cover the source-of-truth + # phrasing, not just synthetic alternatives. + seen: set[str] = set() + ordered = [effective_task, *rephrasings] + for phrasing in ordered: + key = phrasing.strip() + if not key or key in seen: + continue + seen.add(key) + rows.append( + { + "role": "user", + "content": key, + "style": "task_aug", + "timestamp": t0, + "tool_calls": None, + } + ) + + subtask_spans = self._generate_subtasks(record, task=effective_task) # subtask rows for span in subtask_spans: rows.append( @@ -84,9 +117,8 @@ class PlanSubtasksMemoryModule: } ) # plan row at t=0 - plan_text = self._generate_plan(record, subtask_spans) + plan_text = self._generate_plan(record, subtask_spans, task=effective_task) if plan_text is not None: - t0 = record.frame_timestamps[0] if record.frame_timestamps else 0.0 rows.append( { "role": "assistant", @@ -101,7 +133,9 @@ class PlanSubtasksMemoryModule: for i, span in enumerate(subtask_spans[1:], start=1): completed = subtask_spans[i - 1]["text"] remaining = [s["text"] for s in subtask_spans[i:]] - mem_text = self._generate_memory(record, prior_memory, completed, remaining) + mem_text = self._generate_memory( + record, prior_memory, completed, remaining, task=effective_task + ) if mem_text: ts = _snap_to_frame(span["start"], record.frame_timestamps) rows.append( @@ -116,6 +150,108 @@ class PlanSubtasksMemoryModule: prior_memory = mem_text staging.write("module_1", rows) + # ------------------------------------------------------------------ + # Task derivation + rephrasings + # ------------------------------------------------------------------ + + _PLACEHOLDER_TASKS: frozenset[str] = frozenset( + { 
+ "debug", + "test", + "tbd", + "todo", + "n/a", + "na", + "untitled", + "unnamed", + "default", + "placeholder", + } + ) + + def _resolve_effective_task(self, record: EpisodeRecord) -> str: + """Decide which task string drives Module 1 for this episode. + + Returns the user-supplied ``record.episode_task`` unless + ``derive_task_from_video`` says otherwise (see config docstring). + Falls back gracefully to the canonical task if video derivation + fails. + """ + canonical = (record.episode_task or "").strip() + mode = (self.config.derive_task_from_video or "off").strip().lower() + if mode == "always": + derived = self._derive_task_from_video(record) + return derived or canonical + if mode == "if_short" and self._task_seems_bad(canonical): + derived = self._derive_task_from_video(record) + if derived: + return derived + return canonical + + def _task_seems_bad(self, task: str) -> bool: + if not task: + return True + if len(task.split()) < int(self.config.derive_task_min_words): + return True + if task.lower() in self._PLACEHOLDER_TASKS: + return True + return False + + def _derive_task_from_video(self, record: EpisodeRecord) -> str | None: + """Ask the VLM "what is this video about" with no task hint at all.""" + prompt = load_prompt("module_1_video_task") + video_block = self._episode_video_block(record) + content = [*video_block, {"type": "text", "text": prompt}] + messages = [{"role": "user", "content": content}] + result = self.vlm.generate_json([messages])[0] + if isinstance(result, dict) and isinstance(result.get("task"), str): + text = result["task"].strip() + if text: + return text + return None + + def _generate_task_rephrasings(self, base_task: str, *, n: int) -> list[str]: + """Generate ``n`` text-only paraphrases of ``base_task``.""" + if n <= 0 or not base_task: + return [] + prompt = load_prompt("module_1_task_rephrasings").format( + base_task=base_task, n=n + ) + messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}] + result = 
self.vlm.generate_json([messages])[0] + if not isinstance(result, dict): + return [] + raw = result.get("rephrasings") + if not isinstance(raw, list): + return [] + out: list[str] = [] + for item in raw: + if isinstance(item, str): + cleaned = item.strip().strip('"').strip("'") + if cleaned: + out.append(cleaned) + return out[:n] + + def _episode_video_block(self, record: EpisodeRecord) -> list[dict[str, Any]]: + """Same video block ``_generate_subtasks`` builds — extracted helper.""" + if not record.frame_timestamps: + return [] + if self.config.use_video_url and isinstance(self.frame_provider, VideoFrameProvider): + cache_dir = Path(self.frame_provider.root) / ".annotate_staging" / ".video_clips" + clip = episode_clip_path(record, self.frame_provider, cache_dir) + return ( + to_video_url_block(f"file://{clip}", fps=self.config.use_video_url_fps) + if clip is not None + else [] + ) + episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0] + target_count = max( + 1, int(round(episode_duration * self.config.frames_per_second)) + ) + target_count = min(target_count, self.config.max_video_frames) + video_frames = self.frame_provider.video_for_episode(record, target_count) + return to_video_block(video_frames) + def run_plan_updates( self, record: EpisodeRecord, @@ -179,12 +315,14 @@ class PlanSubtasksMemoryModule: last_t = t return out - def _generate_subtasks(self, record: EpisodeRecord) -> list[dict[str, Any]]: + def _generate_subtasks( + self, record: EpisodeRecord, *, task: str | None = None + ) -> list[dict[str, Any]]: if record.row_count == 0 or not record.frame_timestamps: return [] episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0] prompt = load_prompt("module_1_subtasks").format( - episode_task=record.episode_task, + episode_task=(task if task is not None else record.episode_task), min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", 
@@ -239,12 +377,13 @@ class PlanSubtasksMemoryModule: *, refresh_t: float | None = None, interjection: str | None = None, + task: str | None = None, ) -> str | None: if not subtask_spans: return None subtasks_text = "\n".join(f"- {s['text']}" for s in subtask_spans) prompt = load_prompt("module_1_plan").format( - episode_task=record.episode_task, + episode_task=(task if task is not None else record.episode_task), subtasks_text=subtasks_text, plan_max_steps=self.config.plan_max_steps, ) @@ -288,9 +427,11 @@ class PlanSubtasksMemoryModule: prior_memory: str, completed: str, remaining: Sequence[str], + *, + task: str | None = None, ) -> str: prompt = load_prompt("module_1_memory").format( - episode_task=record.episode_task, + episode_task=(task if task is not None else record.episode_task), prior_memory=prior_memory or "(none)", completed_subtask=completed, remaining_subtasks=", ".join(remaining) if remaining else "(none)", diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt new file mode 100644 index 000000000..d03a6bf8b --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt @@ -0,0 +1,32 @@ +You are generating training data for a Hi Robot-style policy. We need +{n} alternative phrasings of the same robot task so the policy sees +diverse user prompts during training instead of the same canonical +string repeated every frame. + +Original task: +"{base_task}" + +Generate exactly {n} alternative phrasings of the same task. Vary: + +- formality (casual / polite / curt) +- verbosity (short imperative vs longer polite request) +- word choice (synonyms, different verbs) +- sentence structure (imperative / question / suggestion) + +Hard rules: +- Each phrasing MUST preserve the exact meaning of the original task. + Do not change which object is involved, the destination, or the + action. 
Do not add extra steps. Do not invent new objects. +- Each phrasing must be a single short sentence, plain prose, no + markdown, no quotes, no list numbers. +- Phrasings must be distinct — no near-duplicates. +- Output exactly {n} entries. + +Output strictly valid JSON: + {{ + "rephrasings": [ + "", + "", + ... + ] + }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt new file mode 100644 index 000000000..fcaae7046 --- /dev/null +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt @@ -0,0 +1,17 @@ +The video above shows a robot manipulation episode in full. Look at +the entire video and describe in ONE concise sentence what the robot +is doing. + +Rules: +- One sentence, in natural English, like a user instruction. +- Capture the goal of the demonstration, not low-level motions. + Example: "place the yellow cube into the red bin" — not "move the + end-effector down 5cm and close the gripper". +- 4 to 15 words. Plain prose, no markdown, no bullets, no quotes. +- Do not invent objects or actions that aren't visible. +- Do not output anything other than the JSON object below. + +Output strictly valid JSON: + { + "task": "" + }