From dbe02f0c4fdd73522cafaa9204f780dc551ac422 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Thu, 4 Jun 2026 13:52:24 +0200 Subject: [PATCH] annotate(plan): condense verbose comments + docstrings Trim the long inline comment blocks (effective_task / task_aug, action records, plan-boundary rows, plan-update span closing, windowed + coverage-stitch sections) and the _generate_plan / run_plan_updates docstrings to a few lines each. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../modules/plan_subtasks_memory.py | 107 ++++++------------ 1 file changed, 34 insertions(+), 73 deletions(-) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index ac5c76453..d054f9eb5 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -66,19 +66,12 @@ class PlanSubtasksMemoryModule: def run_episode(self, record: EpisodeRecord, staging: EpisodeStaging) -> None: rows: list[dict[str, Any]] = [] - # Resolve the task that drives every other ``plan``-module prompt. - # May be the canonical ``record.episode_task`` (default), or a fresh - # description derived from the video when the canonical task is - # empty / placeholder / forced-off (see PlanConfig.derive_task_*). + # Task driving every plan-module prompt: canonical episode_task, or a + # video-derived one when it's empty/placeholder (see derive_task_*). effective_task = self._resolve_effective_task(record) - # ``task_aug`` rows at t=0 (role=user), one per rephrasing — the - # message renderer rotates ``${task}`` deterministically through - # them so the policy sees diverse phrasings during training. - # Two paths: - # * ``task_aug_axes.enabled=True`` — structured 5-axis taxonomy - # (synonym / omit_arm / omit_orientation / omit_grasp_method - # / combined). Replaces the free-form rephrasings flow. - # * Otherwise — free-form ``n_task_rephrasings`` (original). + # task_aug rows at t=0: phrasings the renderer rotates ${task} through. + # Either the structured 5-axis taxonomy (task_aug_axes.enabled) or + # free-form n_task_rephrasings. t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0 axes_cfg = self.config.task_aug_axes if axes_cfg.enabled and effective_task: @@ -101,9 +94,8 @@ class PlanSubtasksMemoryModule: ) elif self.config.n_task_rephrasings > 0 and effective_task: rephrasings = self._generate_task_rephrasings(effective_task, n=self.config.n_task_rephrasings) - # Always include the effective task itself as the first variant - # so the rotation is guaranteed to cover the source-of-truth - # phrasing, not just synthetic alternatives. + # Include the effective task first so the rotation always covers + # the source-of-truth phrasing, not just synthetic ones. seen = set() ordered = [effective_task, *rephrasings] for phrasing in ordered: @@ -123,16 +115,10 @@ class PlanSubtasksMemoryModule: subtask_spans = self._generate_subtasks(record, task=effective_task) - # ---------------------------------------------------------------- - # Phase 1a: structured per-subtask action records (additive) - # ---------------------------------------------------------------- - # When enabled, for every subtask span we ask the VLM for a typed - # ActionRecord (verb / object / arm / grasp_type / destination / - # mistake) and emit it as a separate ``style="action_record"`` - # row for downstream use. This is purely additive — it never - # touches the VLM's subtask text (reconstructing subtask text - # from these fields was too easy to hallucinate on tasks that - # don't fit the manipulation schema). + # Phase 1a: optional per-subtask action records. When enabled, emit a + # typed ActionRecord (verb/object/arm/grasp_type/destination/mistake) + # per span as a separate style="action_record" row. Purely additive — + # never touches the subtask text. records_cfg = self.config.action_records action_records: list[dict[str, Any] | None] = [None] * len(subtask_spans) if records_cfg.enabled and subtask_spans: @@ -162,14 +148,10 @@ class PlanSubtasksMemoryModule: "tool_calls": None, } ) - # Plan rows at every subtask boundary — including t=0 (start of - # the first subtask). Because the plan is just a numbered list - # of *still-todo* subtasks, re-emitting at each boundary makes - # the active plan shrink as work progresses: at frame t the - # rendered ``${plan}`` is the most recent emission, which - # contains exactly the subtasks that started at or after the - # current span. Saves the runtime from having to derive - # "what's still left" at inference time. + # Plan rows at every subtask boundary (incl. t=0). The plan is a + # numbered list of still-todo subtasks, so re-emitting at each + # boundary makes it shrink as work progresses — ${plan} at frame t is + # exactly what's left to do. if self.config.emit_plan: for span in subtask_spans: boundary_t = snap_to_frame(span["start"], record.frame_timestamps) @@ -252,9 +234,8 @@ class PlanSubtasksMemoryModule: return task.lower() in self._PLACEHOLDER_TASKS # ------------------------------------------------------------------ - # VLM call helpers (factored out: every ``plan``-module prompt below follows - # the same "build messages → single VLM call → pull a named field" - # shape, only differing in field name + post-processing). + # VLM call helpers — every plan-module prompt follows the same shape: + # build messages → single VLM call → pull a named field. # ------------------------------------------------------------------ def _vlm_field(self, messages: list[dict[str, Any]], field: str) -> Any: @@ -510,20 +491,15 @@ class PlanSubtasksMemoryModule: ) -> None: """Append additional ``plan`` rows at every interjection timestamp. - Plans refresh ONLY on user interjections — subtask generation - runs ~1 Hz at inference, but plan re-emission is event-driven. - Now also forwards the interjection's own text into the prompt so - the refreshed plan can actually reflect the user's correction - (the previous version told the model "an interjection happened" - without telling it what the user said). + Plans refresh ONLY on user interjections (event-driven). The + interjection text is forwarded into the prompt so the refreshed plan + reflects the user's correction. """ if not self.config.emit_plan: return existing = staging.read("plan") - # Pass the episode's last frame timestamp so the final subtask - # span is closed (otherwise its ``end`` equals its ``start``, - # zero duration, and the "current subtask at refresh_t" lookup - # in ``_generate_plan`` misses any refresh that lands inside it). + # Pass the last frame timestamp so the final span is closed (else its + # end == start, zero duration, and a refresh inside it is missed). episode_end_t = float(record.frame_timestamps[-1]) if record.frame_timestamps else None spans = reconstruct_subtask_spans(existing, episode_end_t=episode_end_t) already_planned: set[float] = {float(r["timestamp"]) for r in existing if r.get("style") == "plan"} @@ -571,12 +547,9 @@ class PlanSubtasksMemoryModule: effective_task = task if task is not None else record.episode_task # ---- Windowed path (constant temporal density) --------------- - # When ``subtask_window_seconds > 0`` and the episode is longer - # than one window, process the episode in fixed-length windows so - # the VLM always sees ``frames_per_second`` density (instead of a - # sparse 32-frame whole-episode view). Each window runs the full - # describe -> segment chain on its own frames; results are merged + - # stitched into a contiguous whole-episode cover. + # If subtask_window_seconds > 0 and the episode exceeds one window, + # process fixed-length windows so the VLM always sees + # frames_per_second density; results are merged + stitched. window_s = float(getattr(self.config, "subtask_window_seconds", 0.0) or 0.0) if window_s > 0.0 and episode_duration > window_s: return self._generate_subtasks_windowed(record, effective_task, window_s) @@ -609,12 +582,9 @@ class PlanSubtasksMemoryModule: return [] # ---- Full-episode coverage stitch ---------------------------- - # The VLM can leave the first subtask starting after t0 or leave - # gaps between spans, so the subtask timeline no longer tiles the - # whole episode and frames fall through with no active subtask. - # Always stitch the surviving spans into a contiguous cover of - # [t0, t_last] — there is no scenario where a sparse, gap-ridden - # subtask timeline is desirable for conditioning. + # The VLM can start after t0 or leave gaps, so frames fall through + # with no active subtask. Always stitch into a contiguous + # [t0, t_last] cover. cleaned = self._stitch_full_coverage(cleaned, record) return cleaned @@ -841,25 +811,16 @@ class PlanSubtasksMemoryModule: ) -> str | None: """Deterministic plan = numbered list of *still-todo* subtasks. - Previously this called the VLM with a prompt that asked it to - compress the subtasks into a "compact hierarchical plan". That - produced longer-than-necessary plans, cost an extra VLM round-trip - per episode (plus one per interjection on refresh), and could - diverge from the actual subtask sequence the model is going to - execute. Replacing it with a plain summarisation keeps the plan - tightly aligned with the upcoming subtasks and removes the VLM - call entirely. - - Layout — short imperative fragments prefixed by "N. ": + No VLM call: a plain numbered list keeps the plan aligned with the + upcoming subtasks (the old VLM "compact hierarchical plan" prompt + cost a round-trip per episode/refresh and could diverge). 1. 2. - ... - On a refresh at ``refresh_t`` (called from ``run_plan_updates`` - on interjection events, and from ``run_episode`` at every subtask - boundary), only subtasks whose start is at or after ``refresh_t`` - are included — the plan shrinks as work progresses, so it always + On a refresh at ``refresh_t`` (from ``run_plan_updates`` on + interjections, and ``run_episode`` at each boundary), only subtasks + starting at or after ``refresh_t`` are included — so it always describes what's left. """ if not subtask_spans: