annotate(plan): condense verbose comments + docstrings

Trim the long inline comment blocks (effective_task / task_aug, action
records, plan-boundary rows, plan-update span closing, windowed +
coverage-stitch sections) and the _generate_plan / run_plan_updates
docstrings to a few lines each. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-04 13:52:24 +02:00
parent 56cbb5f9ec
commit dbe02f0c4f
@@ -66,19 +66,12 @@ class PlanSubtasksMemoryModule:
def run_episode(self, record: EpisodeRecord, staging: EpisodeStaging) -> None:
rows: list[dict[str, Any]] = []
# Resolve the task that drives every other ``plan``-module prompt.
# May be the canonical ``record.episode_task`` (default), or a fresh
# description derived from the video when the canonical task is
# empty / placeholder / forced-off (see PlanConfig.derive_task_*).
# Task driving every plan-module prompt: canonical episode_task, or a
# video-derived one when it's empty/placeholder (see derive_task_*).
effective_task = self._resolve_effective_task(record)
# ``task_aug`` rows at t=0 (role=user), one per rephrasing — the
# message renderer rotates ``${task}`` deterministically through
# them so the policy sees diverse phrasings during training.
# Two paths:
# * ``task_aug_axes.enabled=True`` — structured 5-axis taxonomy
# (synonym / omit_arm / omit_orientation / omit_grasp_method
# / combined). Replaces the free-form rephrasings flow.
# * Otherwise — free-form ``n_task_rephrasings`` (original).
# task_aug rows at t=0: phrasings the renderer rotates ${task} through.
# Either the structured 5-axis taxonomy (task_aug_axes.enabled) or
# free-form n_task_rephrasings.
t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0
axes_cfg = self.config.task_aug_axes
if axes_cfg.enabled and effective_task:
@@ -101,9 +94,8 @@ class PlanSubtasksMemoryModule:
)
elif self.config.n_task_rephrasings > 0 and effective_task:
rephrasings = self._generate_task_rephrasings(effective_task, n=self.config.n_task_rephrasings)
# Always include the effective task itself as the first variant
# so the rotation is guaranteed to cover the source-of-truth
# phrasing, not just synthetic alternatives.
# Include the effective task first so the rotation always covers
# the source-of-truth phrasing, not just synthetic ones.
seen = set()
ordered = [effective_task, *rephrasings]
for phrasing in ordered:
@@ -123,16 +115,10 @@ class PlanSubtasksMemoryModule:
subtask_spans = self._generate_subtasks(record, task=effective_task)
# ----------------------------------------------------------------
# Phase 1a: structured per-subtask action records (additive)
# ----------------------------------------------------------------
# When enabled, for every subtask span we ask the VLM for a typed
# ActionRecord (verb / object / arm / grasp_type / destination /
# mistake) and emit it as a separate ``style="action_record"``
# row for downstream use. This is purely additive — it never
# touches the VLM's subtask text (reconstructing subtask text
# from these fields was too easy to hallucinate on tasks that
# don't fit the manipulation schema).
# Phase 1a: optional per-subtask action records. When enabled, emit a
# typed ActionRecord (verb/object/arm/grasp_type/destination/mistake)
# per span as a separate style="action_record" row. Purely additive —
# never touches the subtask text.
records_cfg = self.config.action_records
action_records: list[dict[str, Any] | None] = [None] * len(subtask_spans)
if records_cfg.enabled and subtask_spans:
@@ -162,14 +148,10 @@ class PlanSubtasksMemoryModule:
"tool_calls": None,
}
)
# Plan rows at every subtask boundary including t=0 (start of
# the first subtask). Because the plan is just a numbered list
# of *still-todo* subtasks, re-emitting at each boundary makes
# the active plan shrink as work progresses: at frame t the
# rendered ``${plan}`` is the most recent emission, which
# contains exactly the subtasks that started at or after the
# current span. Saves the runtime from having to derive
# "what's still left" at inference time.
# Plan rows at every subtask boundary (incl. t=0). The plan is a
# numbered list of still-todo subtasks, so re-emitting at each
# boundary makes it shrink as work progresses — ${plan} at frame t is
# exactly what's left to do.
if self.config.emit_plan:
for span in subtask_spans:
boundary_t = snap_to_frame(span["start"], record.frame_timestamps)
@@ -252,9 +234,8 @@ class PlanSubtasksMemoryModule:
return task.lower() in self._PLACEHOLDER_TASKS
# ------------------------------------------------------------------
# VLM call helpers (factored out: every ``plan``-module prompt below follows
# the same "build messages → single VLM call → pull a named field"
# shape, only differing in field name + post-processing).
# VLM call helpers — every plan-module prompt follows the same shape:
# build messages → single VLM call → pull a named field.
# ------------------------------------------------------------------
def _vlm_field(self, messages: list[dict[str, Any]], field: str) -> Any:
@@ -510,20 +491,15 @@ class PlanSubtasksMemoryModule:
) -> None:
"""Append additional ``plan`` rows at every interjection timestamp.
Plans refresh ONLY on user interjections — subtask generation
runs ~1 Hz at inference, but plan re-emission is event-driven.
Now also forwards the interjection's own text into the prompt so
the refreshed plan can actually reflect the user's correction
(the previous version told the model "an interjection happened"
without telling it what the user said).
Plans refresh ONLY on user interjections (event-driven). The
interjection text is forwarded into the prompt so the refreshed plan
reflects the user's correction.
"""
if not self.config.emit_plan:
return
existing = staging.read("plan")
# Pass the episode's last frame timestamp so the final subtask
# span is closed (otherwise its ``end`` equals its ``start``,
# zero duration, and the "current subtask at refresh_t" lookup
# in ``_generate_plan`` misses any refresh that lands inside it).
# Pass the last frame timestamp so the final span is closed (else its
# end == start, zero duration, and a refresh inside it is missed).
episode_end_t = float(record.frame_timestamps[-1]) if record.frame_timestamps else None
spans = reconstruct_subtask_spans(existing, episode_end_t=episode_end_t)
already_planned: set[float] = {float(r["timestamp"]) for r in existing if r.get("style") == "plan"}
@@ -571,12 +547,9 @@ class PlanSubtasksMemoryModule:
effective_task = task if task is not None else record.episode_task
# ---- Windowed path (constant temporal density) ---------------
# When ``subtask_window_seconds > 0`` and the episode is longer
# than one window, process the episode in fixed-length windows so
# the VLM always sees ``frames_per_second`` density (instead of a
# sparse 32-frame whole-episode view). Each window runs the full
# describe -> segment chain on its own frames; results are merged +
# stitched into a contiguous whole-episode cover.
# If subtask_window_seconds > 0 and the episode exceeds one window,
# process fixed-length windows so the VLM always sees
# frames_per_second density; results are merged + stitched.
window_s = float(getattr(self.config, "subtask_window_seconds", 0.0) or 0.0)
if window_s > 0.0 and episode_duration > window_s:
return self._generate_subtasks_windowed(record, effective_task, window_s)
@@ -609,12 +582,9 @@ class PlanSubtasksMemoryModule:
return []
# ---- Full-episode coverage stitch ----------------------------
# The VLM can leave the first subtask starting after t0 or leave
# gaps between spans, so the subtask timeline no longer tiles the
# whole episode and frames fall through with no active subtask.
# Always stitch the surviving spans into a contiguous cover of
# [t0, t_last] — there is no scenario where a sparse, gap-ridden
# subtask timeline is desirable for conditioning.
# The VLM's first subtask can start after t0, or it can leave gaps
# between spans, so frames fall through with no active subtask. Always
# stitch into a contiguous [t0, t_last] cover.
cleaned = self._stitch_full_coverage(cleaned, record)
return cleaned
@@ -841,25 +811,16 @@ class PlanSubtasksMemoryModule:
) -> str | None:
"""Deterministic plan = numbered list of *still-todo* subtasks.
Previously this called the VLM with a prompt that asked it to
compress the subtasks into a "compact hierarchical plan". That
produced longer-than-necessary plans, cost an extra VLM round-trip
per episode (plus one per interjection on refresh), and could
diverge from the actual subtask sequence the model is going to
execute. Replacing it with a plain summarisation keeps the plan
tightly aligned with the upcoming subtasks and removes the VLM
call entirely.
Layout — short imperative fragments prefixed by "N. ":
No VLM call: a plain numbered list keeps the plan aligned with the
upcoming subtasks (the old VLM "compact hierarchical plan" prompt
cost a round-trip per episode/refresh and could diverge).
1. <subtask 1>
2. <subtask 2>
...
On a refresh at ``refresh_t`` (called from ``run_plan_updates``
on interjection events, and from ``run_episode`` at every subtask
boundary), only subtasks whose start is at or after ``refresh_t``
are included — the plan shrinks as work progresses, so it always
On a refresh at ``refresh_t`` (from ``run_plan_updates`` on
interjections, and ``run_episode`` at each boundary), only subtasks
starting at or after ``refresh_t`` are included — so it always
describes what's left.
"""
if not subtask_spans: