annotate: remove dead code, document CLI options, compact config

Dead code (defined but never referenced anywhere in src/tests/examples):
  * reader.py: keyframe_indices, episode_frame_timestamps, lookup_data_path,
    and the now-orphaned gather_data_paths + episode_offsets_per_path
    (lookup_data_path was their only caller).
  * staging.py: iter_staged_episodes.
  * writer.py: normalize_rows_for_writer.
  * config.py VlmConfig: json_mode, batch_size, tensor_parallel_size,
    gpu_memory_utilization, trust_remote_code — consumed only by the
    in-process vllm/transformers backends that were removed; the openai
    auto-serve path carries those vLLM flags via serve_command instead.
    Kept max_model_len (still used as the serve-command default).
  * config.py TaskAugAxesConfig.total property.

Docs: new 'Key options' section in annotation_pipeline.mdx — grouped
tables (dataset in/out, module toggles, --vlm.*, --plan.*, interjections
+ vqa) describing the flags users actually reach for, with defaults.

config.py: compact the verbose field comments + ActionRecordsConfig /
TaskAugAxesConfig docstrings; fix two stale 'verify' references (the
verify pass was removed — it's describe -> segment now) and the stale
'renders record back to subtask text' note (that path was removed).
vlm_client docstring no longer mentions the removed json_mode field.

Verified: tests/annotations + tests/datasets/test_language +
tests/scripts/test_lerobot_annotate (40 passed); pre-commit clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-04 14:05:46 +02:00
parent dbe02f0c4f
commit 20c7a12dd5
6 changed files with 111 additions and 197 deletions
+59
View File
@@ -117,6 +117,65 @@ To use a different dataset, model, or hub repo, edit the `CMD` block in
the script. Every flag there maps directly to a `lerobot-annotate` flag
(run `lerobot-annotate --help` for the full list).
## Key options
These are the flags you'll reach for most often. Run
`lerobot-annotate --help` for everything else; the defaults are tuned for
short manipulation episodes.
### Dataset in / out
| Flag | Default | What it does |
| ----------------- | ------- | ----------------------------------------------------------------------- |
| `--repo_id` | — | Hub dataset to annotate (downloaded if `--root` unset). |
| `--root` | — | Annotate a local dataset directory instead. |
| `--new_repo_id` | — | Push the result to a new repo (leaves the source repo untouched). |
| `--push_to_hub` | `false` | Upload after annotating (to `--new_repo_id`, else back to `--repo_id`). |
| `--only_episodes` | all | Annotate just these episode indices (handy for a test run). |
| `--seed` | `1729` | Seeds the RNGs that pick interjection timestamps + VQA question types. |
### Which modules run
Each module can be turned off independently to iterate on one at a time:
`--plan.enabled`, `--interjections.enabled`, `--vqa.enabled` (all
`true` by default).
### The VLM (`--vlm.*`)
| Flag | Default | What it does |
| -------------------------- | ------------------ | ----------------------------------------------------------------------------------- |
| `--vlm.model_id` | `Qwen/Qwen3.6-27B` | The model to serve and prompt. |
| `--vlm.camera_key` | first `images.*` | Which camera every prompt is grounded on. |
| `--vlm.serve_command` | auto | The exact `vllm serve …` command (set TP size, GPU memory, `--max-model-len` here). |
| `--vlm.parallel_servers` | `1` | Independent servers for round-robin routing (one per GPU). |
| `--vlm.num_gpus` | `0` | GPUs per server (`0` = one each). |
| `--vlm.client_concurrency` | `16` | In-flight requests across all servers. |
| `--vlm.max_new_tokens` | `512` | Generation cap per call. |
| `--vlm.temperature` | `0.2` | Sampling temperature. |
### Subtasks / plan / memory (`--plan.*`)
| Flag | Default | What it does |
| ------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------- |
| `--plan.frames_per_second` | `1.0` | How densely the episode video is sampled. |
| `--plan.max_video_frames` | `32` | Hard cap on frames per call (context-budget guard — don't exceed ~32 for a 32k context). |
| `--plan.subtask_window_seconds` | `0` | Split long episodes into fixed windows for constant frame density (`0` = whole episode). |
| `--plan.plan_max_steps` | `8` | Upper bound on subtasks per episode. |
| `--plan.subtask_describe_first` | `true` | Run the describe→segment grounding pass (best subtask quality; +1 call/episode). |
| `--plan.emit_plan` | `true` | Emit the numbered `plan` rows (`false` = subtasks + memory only). |
| `--plan.n_task_rephrasings` | `10` | How many `task_aug` rephrasings to emit (`0` disables). |
| `--plan.derive_task_from_video` | `if_short` | Use the dataset task as-is (`off`), only when it's missing/short (`if_short`), or always re-derive from video (`always`). |
| `--plan.use_video_url` | `false` | Send a server-side video clip instead of embedded frames. |
### Interjections + VQA
| Flag | Default | What it does |
| ----------------------------------------------- | ------- | ---------------------------------------------------------- |
| `--interjections.max_interjections_per_episode` | `3` | Cap on interjection/speech pairs per episode. |
| `--vqa.vqa_emission_hz` | `1.0` | How often VQA pairs are emitted. |
| `--vqa.restrict_to_default_camera` | `false` | Ground VQA only on `--vlm.camera_key` (else every camera). |
| `--executor.episode_parallelism` | `16` | Episodes processed concurrently within each phase. |
## Contributing new modules
The pipeline is built to grow, and **contributions are very welcome** —
@@ -44,78 +44,56 @@ class PlanConfig:
derive_task_from_video: str = "if_short"
derive_task_min_words: int = 3
# Frame sampling for the subtask-decomposition prompt. Frames are
# sampled uniformly across the whole episode up to ``max_video_frames``
# (so longer episodes are subsampled, not truncated).
#
# ``max_video_frames`` is a HARD context-budget cap. With the embedded-
# frame path (use_video_url=false), every frame becomes ~250-320 vision
# tokens, so 128 frames ≈ 33-39k tokens — over a 32k-context VLM. 32
# frames (~8-10k tokens) leaves ample room for the prompt + the
# describe / verify passes. Raise only if your serving context is
# larger AND your episodes need finer temporal resolution; if you hit
# "Input length exceeds maximum context length", lower this.
# Frames are sampled uniformly across the episode, capped at
# ``max_video_frames`` (a HARD context-budget cap, not an annotation
# knob). Each embedded frame is ~250-320 vision tokens, so 32 frames
# (~8-10k tokens) fit a 32k-context VLM; 128 would overflow it. Lower
# this if you hit "Input length exceeds maximum context length".
frames_per_second: float = 1.0
max_video_frames: int = 32
# Windowed subtask generation for CONSTANT temporal density. When > 0
# and an episode is longer than this many seconds, the plan module
# processes the episode in consecutive windows of this length, each
# sampled at ``frames_per_second``, instead of subsampling the whole
# episode to ``max_video_frames`` (which makes long episodes sparse).
# The describe -> segment -> verify chain runs per window; results are
# offset to absolute time, merged, and stitched into a contiguous
# whole-episode cover. Cost scales with episode length (≈ chain calls
# × ceil(duration / window)). Set to ~max_video_frames / frames_per_
# second (e.g. 32s at 1 fps) so each window fills — but never exceeds —
# the per-call frame budget. 0 disables (single whole-episode call).
# and the episode is longer than this, the plan module processes it in
# consecutive windows of this length (each sampled at
# ``frames_per_second``) instead of subsampling the whole episode to a
# sparse ``max_video_frames``. The describe -> segment chain runs per
# window; spans are merged + stitched. Set to ~max_video_frames /
# frames_per_second (e.g. 32s at 1 fps). 0 disables.
subtask_window_seconds: float = 0.0
min_subtask_seconds: float = 1.5
plan_max_steps: int = 8
# ``subtask_describe_first``: run a grounding pass that narrates ONLY
# what is visible in the video (no subtask JSON yet), then inject that
# description into the segmentation prompt. Forces the model to observe
# before committing to structured output — the strongest lever against
# subtasks invented from the task text. ON by default; +1 VLM call/ep.
# Set False to trade quality for fewer calls on easy datasets.
# Run a grounding pass that narrates ONLY what's visible (no subtask
# JSON yet), then feed that into the segmentation prompt — the strongest
# lever against subtasks invented from the task text. ON by default;
# +1 VLM call/episode. False trades quality for fewer calls.
subtask_describe_first: bool = True
# Emit ``style="plan"`` rows (the numbered still-todo list re-emitted at
# every subtask boundary). Set False to keep only subtasks + memory and
# skip the plan rows entirely — saves one ``_generate_plan`` VLM call per
# subtask boundary. Subtask and memory generation are unaffected.
# Emit ``style="plan"`` rows (the numbered still-todo list, re-emitted at
# every subtask boundary). False keeps only subtasks + memory and skips
# the per-boundary ``_generate_plan`` call.
emit_plan: bool = True
# NOTE: subtask spans are ALWAYS stitched into a contiguous
# full-episode cover (first subtask pulled back to t0, gaps closed,
# last span extended to t_last) as a deterministic post-step in
# ``_generate_subtasks._stitch_full_coverage``. This is not
# configurable — a sparse / gap-ridden subtask timeline is never
# desirable for conditioning, so it is unconditional.
# NOTE: subtask spans are ALWAYS stitched into a contiguous full-episode
# cover (see ``_stitch_full_coverage``) — not configurable, since a
# sparse / gap-ridden timeline is never useful for conditioning.
# When True (and backend supports it, e.g. ``openai``), the ``plan``
# module sends a ``video_url`` block pointing at a per-episode mp4
# subclip and lets the server sample frames at ``use_video_url_fps``.
# When True (with a backend that supports it, e.g. ``openai``), send a
# ``video_url`` block pointing at a per-episode mp4 subclip and let the
# server sample frames at ``use_video_url_fps``.
use_video_url: bool = False
use_video_url_fps: float = 1.0
# Structured per-subtask action records (Phase 1a + 1b, inspired by
# EgoMimic's annotator form). For each generated subtask span, the
# VLM extracts a typed record (verb / object / arm / grasp_type /
# destination / mistake). A deterministic Python template renders
# that record back to canonical subtask text — reducing the VLM's
# "creative" surface to just the perception step. See
# ``ActionRecordsConfig`` for details. Off by default (back-compat).
# Optional structured per-subtask action records (EgoMimic-style). When
# enabled, the VLM extracts a typed record per subtask span; see
# ``ActionRecordsConfig``. Purely additive — off by default.
action_records: ActionRecordsConfig = field(default_factory=lambda: ActionRecordsConfig())
# Structured 5-axis augmentation taxonomy for the t=0 task variants
# (replaces the free-form ``n_task_rephrasings`` flow when enabled).
# Mirrors EgoMimic's ``augment_prompt.txt`` taxonomy: instead of N
# free-form rephrasings, the VLM produces variants along named
# axes (synonym / omit_arm / omit_orientation / omit_grasp_method /
# combined). Off by default (back-compat).
# Optional 5-axis task-augmentation taxonomy for the t=0 variants
# (EgoMimic-style: synonym / omit_arm / omit_orientation /
# omit_grasp_method / combined). Replaces the free-form
# ``n_task_rephrasings`` flow when enabled; see ``TaskAugAxesConfig``.
task_aug_axes: TaskAugAxesConfig = field(default_factory=lambda: TaskAugAxesConfig())
@@ -123,9 +101,8 @@ class PlanConfig:
class ActionRecordsConfig:
"""Structured per-subtask action record extraction.
When ``enabled=True``, after the existing subtask-span generation in
``plan_subtasks_memory.py``, the module makes one extra VLM call per
subtask to extract a typed record::
When ``enabled=True``, after subtask-span generation the module makes
one extra VLM call per subtask to extract a typed record::
{
"verb": "pick" | "place" | "press" | ..., # closed vocabulary
@@ -136,20 +113,13 @@ class ActionRecordsConfig:
"mistake": "<short text>" | null,
}
The record is emitted as a separate row with ``style="action_record"``
(``content=json.dumps(record)``) at the subtask's start timestamp.
It is PURELY ADDITIVE — it never touches the VLM's subtask text.
Downstream training can consume the typed schema directly (e.g.
auxiliary supervision on verb / arm / grasp classification heads)
while the subtask string the policy conditions on stays exactly what
the subtask module produced. (Reconstructing subtask text from these
fields was too easy for the VLM to hallucinate on tasks that don't
fit the manipulation schema — navigation tasks yielded nonsense like
``move stove to stove`` — so that path was removed.)
Emitted as a separate ``style="action_record"`` row at the subtask's
start timestamp. PURELY ADDITIVE — it never touches the subtask text,
so downstream training can use the typed schema (e.g. auxiliary
verb/arm/grasp heads) while the conditioning string stays unchanged.
Cost: one extra VLM call per subtask. For an 8-subtask episode this
means ~8x more VLM calls in the plan module — still cheap relative
to the action-expert training cost, but worth knowing.
Cost: one extra VLM call per subtask (~8x plan-module calls on an
8-subtask episode).
"""
enabled: bool = False
@@ -204,26 +174,14 @@ class TaskAugAxesConfig:
"""Structured 5-axis augmentation taxonomy for t=0 task variants.
When ``enabled=True``, replaces the free-form ``n_task_rephrasings``
flow with a structured prompt that produces variants along five
named axes (mirroring EgoMimic's ``augment_prompt.txt``):
flow with variants along five named axes (EgoMimic-style):
``synonym_paraphrase`` (reword, keep all info), ``omit_arm``,
``omit_orientation``, ``omit_grasp_method``, and ``combined_omissions``
(drop two at once).
* ``synonym_paraphrase`` — different wording / verbs, all
information preserved.
* ``omit_arm`` — drop the left/right/both arm specification.
* ``omit_orientation`` — drop orientation cues (upright,
sideways, ...).
* ``omit_grasp_method`` — drop grip / grasp method specification.
* ``combined_omissions`` — combine two of the above
simultaneously.
Default counts (3+3+2+2+2 = 12 variants per task) match EgoMimic.
Axes that have nothing to omit in the source task (e.g. ``omit_arm``
when the task doesn't mention an arm) emit fewer entries rather
than pad — the prompt instructs the VLM accordingly.
Each variant is emitted as a ``task_aug`` row at ``t=0`` (same
style as the free-form variants), so the rest of the pipeline /
training recipe doesn't need to know about the taxonomy.
Default counts (3+3+2+2+2 = 12) match EgoMimic. Axes with nothing to
omit emit fewer entries rather than pad. Each variant becomes a
``task_aug`` row at ``t=0``, identical in style to the free-form ones.
"""
enabled: bool = False
@@ -234,17 +192,6 @@ class TaskAugAxesConfig:
omit_grasp_method: int = 2
combined_omissions: int = 2
@property
def total(self) -> int:
"""Sum of requested variants across all axes (upper bound)."""
return (
self.synonym_paraphrase
+ self.omit_arm
+ self.omit_orientation
+ self.omit_grasp_method
+ self.combined_omissions
)
@dataclass
class InterjectionsConfig:
@@ -326,16 +273,11 @@ class VlmConfig:
max_new_tokens: int = 512
temperature: float = 0.2
json_mode: bool = True
batch_size: int = 4
tensor_parallel_size: int = 1
# Fraction of GPU memory vllm allocates for weights + KV cache.
gpu_memory_utilization: float = 0.9
# Cap context length (None = model default). On 80 GB H100 a 30B BF16
# model often needs <= 8192 to leave KV-cache headroom.
# Context length for the auto-spawned vLLM server (None → 32768). vLLM
# tuning flags (tensor-parallel size, GPU memory fraction, ...) go in
# ``serve_command`` directly, not here.
max_model_len: int | None = None
trust_remote_code: bool = False
# Override the camera stream used for keyframe attachment. None picks
# the first ``observation.images.*`` key the dataset declares.
@@ -214,61 +214,3 @@ def _iter_one_path(path: Path, tasks: dict[int, str], only_set: set[int] | None)
rec = _build(cur_ep, start_offset, len(episode_col), cur_task_idx, ts_buf, fi_buf)
if rec is not None:
yield rec
def gather_data_paths(root: Path) -> list[Path]:
"""Return every ``data/chunk-*/file-*.parquet`` path under ``root``."""
return sorted((root / "data").rglob("*.parquet"))
def episode_offsets_per_path(path: Path) -> dict[int, tuple[int, int]]:
"""Return ``{episode_index: (row_offset, row_count)}`` for one parquet."""
table = pq.read_table(path, columns=["episode_index"])
episode_col = table.column("episode_index").to_pylist()
out: dict[int, tuple[int, int]] = {}
cur_ep: int | None = None
start = 0
for i, ep in enumerate(episode_col):
if cur_ep is None:
cur_ep = ep
start = i
continue
if ep != cur_ep:
out[cur_ep] = (start, i - start)
cur_ep = ep
start = i
if cur_ep is not None:
out[cur_ep] = (start, len(episode_col) - start)
return out
def keyframe_indices(record: EpisodeRecord, k: int) -> list[int]:
"""Return ``k`` evenly spaced row indices into the episode (relative)."""
n = record.row_count
if k <= 0 or n == 0:
return []
if k >= n:
return list(range(n))
step = (n - 1) / (k - 1) if k > 1 else 0.0
return [int(round(i * step)) for i in range(k)] if k > 1 else [n // 2]
def lookup_data_path(root: Path, episode_index: int) -> tuple[Path, int, int] | None:
"""Find the parquet file containing ``episode_index`` and its slice bounds."""
for path in gather_data_paths(root):
offsets = episode_offsets_per_path(path)
if episode_index in offsets:
start, count = offsets[episode_index]
return path, start, count
return None
def episode_frame_timestamps(root: Path, episode_index: int) -> tuple[Any, list[float]]:
"""Return the parquet path and per-frame timestamps for ``episode_index``."""
found = lookup_data_path(root, episode_index)
if found is None:
raise ValueError(f"Episode {episode_index} not found under {root}/data/")
path, start, count = found
table = pq.read_table(path, columns=["timestamp"])
timestamps = table.column("timestamp").to_pylist()[start : start + count]
return path, [float(t) for t in timestamps]
@@ -28,7 +28,7 @@ intermediate.
from __future__ import annotations
import json
from collections.abc import Iterable, Iterator
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@@ -90,15 +90,3 @@ class EpisodeStaging:
def has(self, module: ModuleName) -> bool:
return self.path_for(module).exists()
def iter_staged_episodes(root: Path) -> Iterator[int]:
"""Yield episode indices for which any staging artifact exists."""
if not root.exists():
return
for child in sorted(root.iterdir()):
if child.is_dir() and child.name.startswith("episode_"):
try:
yield int(child.name.removeprefix("episode_"))
except ValueError:
continue
@@ -23,8 +23,7 @@ into a real model.
The client speaks one method, :meth:`VlmClient.generate_json`, which:
- accepts a list of OpenAI/HF-style multimodal messages,
- requests JSON output (``json_mode=True`` enables guided decoding when the
backend supports it),
- requests JSON output from the server,
- batches requests transparently,
- and reprompts once on a JSON parse failure with an inline correction
message before raising.
@@ -46,7 +46,7 @@ from __future__ import annotations
import logging
from collections import defaultdict
from collections.abc import Iterable, Sequence
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@@ -338,19 +338,3 @@ def speech_atom(timestamp: float, text: str) -> dict[str, Any]:
}
],
}
def normalize_rows_for_writer(
rows: Iterable[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Helper used by tests/validators to partition a flat row list into
(persistent_rows, event_rows) using ``column_for_style``.
"""
persistent: list[dict[str, Any]] = []
events: list[dict[str, Any]] = []
for row in rows:
if column_for_style(row.get("style")) == LANGUAGE_PERSISTENT:
persistent.append(row)
else:
events.append(row)
return persistent, events