refactor(annotate): delegate distribution to HF Jobs; drop SLURM/local switch

The executor previously claimed it would "optionally hand off" to
datatrove's LocalPipelineExecutor or SlurmPipelineExecutor — but it
already runs phases inline in every code path, and HF Jobs (see
``examples/annotation/run_hf_job.py``) is the actual distribution
strategy. Stop pretending we have an executor selector.

* `executor.py`: drop `select_executor_class`, the "kind" log line, and
  the references to LocalPipelineExecutor / SlurmPipelineExecutor.
  Module docstring now says distribution is delegated to HF Jobs.
* `config.py`: drop `auto_threshold`, `force_local`, `slurm_partition`,
  `slurm_gpus`, `slurm_time`, `workers`. `ExecutorConfig` keeps only
  `episode_parallelism`. While here, prune the longer "why" docstrings
  on every field down to the load-bearing bits — full story moves to
  `docs/source/annotation_pipeline.mdx`.
* `pyproject.toml`: drop `datatrove>=0.4.0,<2.0.0` from the
  `[annotations]` extra; the dep was only there for the (never used)
  cluster executors. Comment block notes the new HF-Jobs delegation.
* `reader.py`, `lerobot_annotate.py`: drop their own datatrove /
  flavor-namespace mentions.
* `docs/source/annotation_pipeline.mdx`:
  - remove the flavor-namespace / sidecar paragraph (out of scope —
    "multiple revisions = multiple copies" is dataset-level policy);
  - remove the "writer drops the legacy `subtask_index` column" note
    (already covered by PR 1's intentional-break call-out);
  - remove the chat-template + `apply_chat_template(messages, tools=...)`
    line (covered by Tools doc);
  - replace the "executor picks Local vs Slurm" paragraph with
    `--executor.episode_parallelism` and a pointer to HF Jobs;
  - rewrite the style→recipe section to talk about "recipes" generically
    instead of pinning a specific YAML;
  - add a "Running on Hugging Face Jobs" section pointing at
    `examples/annotation/run_hf_job.py`;
  - add a "Running locally" example matching the CLI's docstring
    (`uv run lerobot-annotate --root=... --vlm.model_id=...`);
  - extend the paper-inspirations list with Pi0.7 and Steerable VLA
    Policies (Zhao 2025) for Module 3.

Tests: same 3 pre-existing failures as before this commit (2 module
assertions still in flight; 1 carryover from PR 1). 41/44 pass.
Pre-commit clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-05-08 11:09:22 +02:00
parent 8fa8323c91
commit dad2cf1178
7 changed files with 1551 additions and 369 deletions
@@ -23,94 +23,62 @@ from typing import Any
@dataclass
class Module1Config:
"""Module 1 hyperparameters: plan + subtasks + memory + task augmentation.
"""Module 1: plan + subtasks + memory + task augmentation.
Subtask decomposition sees the **whole episode** as one Qwen-VL video
block — no keyframe stride or count: the model handles temporal pooling
itself and decides where to cut. ``max_video_frames`` only caps the
number of frames packed into the video block (a model-capacity bound,
not an annotation-logic knob).
Module 1 attaches the whole episode as one Qwen-VL video block;
``max_video_frames`` only caps the frames packed in (a model-capacity
bound, not an annotation-logic knob).
"""
enabled: bool = True
# Number of ``task_aug`` rephrasings emitted at ``t=0``. The renderer's
# ``${task}`` binding rotates among them per ``sample_idx``. ``0`` disables.
n_task_rephrasings: int = 10
"""Number of task rephrasings to generate at ``t=0`` as ``task_aug``
persistent rows (PR 1 ``CORE_STYLES``). The renderer's ``${task}``
binding rotates among them deterministically per ``sample_idx``,
realizing Xiao 2022 / CAST-style task-prompt diversity without
touching ``meta/tasks.parquet``. Set to 0 to disable."""
# When to derive the task from the video instead of using
# ``record.episode_task``: ``off``, ``if_short`` (short / placeholder /
# missing canonical task), or ``always``. The derived task replaces the
# canonical one for every Module-1 prompt; ``meta/tasks.parquet`` is
# never modified.
derive_task_from_video: str = "if_short"
"""When to bypass the user-provided ``record.episode_task`` and
derive a fresh task description from the episode video alone:
- ``off`` never; always use the canonical task as the basis.
- ``if_short`` derive when the canonical task is empty, has fewer
than ``derive_task_min_words`` words, or matches a
placeholder string (``debug``, ``unnamed``, ``tbd``,
...). Default — fixes noisy / placeholder tasks
without forcing derivation everywhere.
- ``always`` ignore the canonical task entirely; always derive
from the video. Useful when the dataset's task
labels are uniformly bad.
The video-derived task replaces the canonical task as the basis for
subtask decomposition, plan, memory, AND the ``task_aug`` rephrasings,
so every downstream annotation is grounded in what's actually visible.
``meta/tasks.parquet`` is NOT modified — the Module-1-derived task
only lives in ``language_persistent`` rows."""
derive_task_min_words: int = 3
"""Word-count threshold for ``derive_task_from_video=if_short``."""
# Frame sampling for the subtask-decomposition prompt.
frames_per_second: float = 1.0
"""Sample one image-frame per ``1/fps`` seconds across the episode for
Module 1's subtask-decomposition prompt. ``1.0`` = 1 fps. Capped by
``max_video_frames`` to avoid blowing up the request payload."""
max_video_frames: int = 128
"""Hard cap on the number of frames Module 1 sends. With ``fps=1`` and
a 30 s episode this yields 30 frames. Bumped from 32 since each frame
is small (~30-100 KB PNG when base64'd)."""
min_subtask_seconds: float = 1.5
plan_max_steps: int = 8
# When True (and backend supports it, e.g. ``openai``), Module 1 sends a
# ``video_url`` block pointing at a per-episode mp4 subclip and lets the
# server sample frames at ``use_video_url_fps``.
use_video_url: bool = False
"""When True (and backend supports it, e.g. ``openai``), Module 1
sends a ``video_url`` content block pointing at the episode's mp4
file instead of pre-decoded frames. Lets the server sample frames at
its own ``fps`` — no in-process conv3d cost. The video file is
extracted as a per-episode subclip to ``staging/.video_clips/`` so
the model sees only this episode's frames."""
use_video_url_fps: float = 1.0
"""Frame-rate hint to send to the server (mm_processor_kwargs.fps).
Only used when ``use_video_url=True``. ``1.0`` = sample 1 frame per
second, which is plenty for subtask-boundary detection on most
manipulation episodes."""
@dataclass
class Module2Config:
"""Module 2 hyperparameters: interjections + paired speech."""
"""Module 2: interjections + paired speech."""
enabled: bool = True
# Each interjection emits a paired ``(interjection, speech)`` event row
# and triggers a ``plan`` refresh at the same timestamp via Module 1.
max_interjections_per_episode: int = 3
"""Number of mid-episode interjections to generate per episode. Each
creates a paired ``(interjection, speech)`` event row plus triggers a
``plan`` refresh at the same timestamp via Module 1. Bumped from the
original ``1`` after qwen36moe-10 showed plan/interjection coverage
was too sparse for Hi Robot-style training."""
interjection_min_t: float = 2.0
# Visual context attached to the interjection prompt: a short window
# of frames centered on the chosen timestamp so the VLM sees the
# ongoing motion rather than a single frozen frame.
interjection_window_seconds: float = 2.0
"""How many seconds of video to attach to the interjection prompt as
visual context. Without this the VLM only sees a single frozen frame
and writes generic interjections that aren't grounded in the actual
motion happening at the chosen timestamp."""
interjection_window_frames: int = 4
"""How many frames to sample over ``interjection_window_seconds``.
Default 4 ⇒ ~0.5 fps over the leading 2 seconds — enough for the
model to read the ongoing motion, cheap enough to keep prompt size
bounded for the 32k context."""
@dataclass
class Module3Config:
"""Module 3 hyperparameters: general VQA."""
"""Module 3: general VQA."""
enabled: bool = True
vqa_emission_hz: float = 1.0
@@ -122,118 +90,82 @@ class Module3Config:
class VlmConfig:
"""Shared Qwen-VL client configuration."""
# One of ``vllm``, ``transformers``, ``openai``, or ``stub`` (tests).
# ``openai`` talks to a local OpenAI-compatible server; the CLI
# auto-spawns one when ``auto_serve=True``.
backend: str = "openai"
"""One of ``vllm``, ``transformers``, ``openai``, or ``stub`` (tests only).
Default ``openai`` talks to a local OpenAI-compatible server (vllm /
transformers) which the CLI auto-spawns when ``auto_serve=True``."""
model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct"
api_base: str = "http://localhost:8000/v1"
"""Base URL for the ``openai`` backend."""
api_key: str = "EMPTY"
"""API key for the ``openai`` backend; ``EMPTY`` works for local servers."""
auto_serve: bool = True
"""When True with ``backend=openai``, the CLI probes ``api_base``
first; if no server answers, it spawns one (default:
``transformers serve``), waits for it to be ready, runs the
pipeline, and tears it down on exit. Default ``True`` so a single
``lerobot-annotate`` call can drive the whole flow. Set to ``False``
if you want to fail fast when no server is reachable (e.g. you're
pointing at a remote endpoint that should already be up)."""
serve_port: int = 8000
"""Port the auto-spawned server binds to. Sets ``api_base`` automatically."""
serve_command: str | None = None
"""Override the auto-serve command (full shell command). When ``None``,
we run ``transformers serve <model_id> --port <serve_port> --continuous-batching``.
When ``parallel_servers > 1``, the literal ``{port}`` placeholder in
this command (if present) is substituted per-replica."""
# OpenAI-compatible server endpoint; ``EMPTY`` works for local servers.
api_base: str = "http://localhost:8000/v1"
api_key: str = "EMPTY"
# When True with ``backend=openai``, the CLI probes ``api_base`` and
# spawns a server if none answers (default: ``transformers serve``).
# Set to False to fail fast when pointing at a remote endpoint.
auto_serve: bool = True
serve_port: int = 8000
# Override the auto-serve command. ``{port}`` is substituted per replica
# when ``parallel_servers > 1``.
serve_command: str | None = None
# Run multiple independent inference servers for round-robin client
# routing (each pinned to a GPU via ``CUDA_VISIBLE_DEVICES`` and bound
# to ``serve_port + i``). ``num_gpus=0`` means one GPU per replica.
parallel_servers: int = 1
"""When >1, spawn this many independent inference servers (each pinned
to a GPU via ``CUDA_VISIBLE_DEVICES`` and listening on
``serve_port + i``) and round-robin client requests across them.
Useful when DP/TP NCCL setup is broken on the node — single-GPU
replicas don't need cross-GPU communication. When
``parallel_servers > num_gpus``, replicas are round-robin-assigned
to GPUs (e.g. 4 replicas on 2 GPUs → 0,1,0,1)."""
num_gpus: int = 0
"""How many physical GPUs are available for round-robin replica
placement. ``0`` means ``parallel_servers`` (one GPU per replica,
backward-compatible default). Set this to ``2`` with
``parallel_servers=4`` to pack 2 replicas per GPU."""
client_concurrency: int = 16
"""Maximum number of in-flight chat requests the client issues in
parallel. vllm batches them internally for free, so bumping this
typically gives big throughput wins on a single TP=1 server. Set to
``1`` for strict serial calls."""
serve_ready_timeout_s: float = 600.0
"""Max seconds to wait for the server to start serving requests."""
max_new_tokens: int = 512
temperature: float = 0.2
json_mode: bool = True
batch_size: int = 4
tensor_parallel_size: int = 1
# Fraction of GPU memory vllm allocates for weights + KV cache.
gpu_memory_utilization: float = 0.9
"""Fraction of GPU memory vllm allocates for weights + KV cache.
Lower (e.g. 0.7) when the vision encoder needs cuDNN workspace, or to
avoid CUDNN_STATUS_NOT_INITIALIZED on tight VRAM (30B BF16 on 80 GB)."""
# Cap context length (None = model default). On 80 GB H100 a 30B BF16
# model often needs <= 8192 to leave KV-cache headroom.
max_model_len: int | None = None
"""Cap context length. ``None`` keeps the model's default; on H100 80 GB
a 30B BF16 model often needs ``max_model_len=8192`` or smaller to leave
room for KV cache."""
trust_remote_code: bool = False
"""Pass ``trust_remote_code`` to HF auto-classes. Default ``False`` —
only enable for models that actually ship custom code in their repo
(rare for first-class VL releases). On Qwen3-VL it triggers an
std::bad_alloc post-load even though the official transformers class
is sufficient, so leaving this off is safest."""
# Override the camera stream used for keyframe attachment. None picks
# the first ``observation.images.*`` key the dataset declares.
camera_key: str | None = None
"""Override the camera stream used for keyframe attachment. ``None`` picks
the first ``observation.images.*`` key the dataset declares."""
# Forwarded as ``extra_body.chat_template_kwargs`` on every chat call;
# use to pass model-specific flags such as ``{"enable_thinking": false}``.
chat_template_kwargs: dict[str, Any] | None = None
"""Forwarded as ``extra_body.chat_template_kwargs`` on every chat call.
Use this to pass model-specific template flags such as
``{"enable_thinking": false}`` for Qwen3.5/Qwen3.6 to suppress the
reasoning preamble that otherwise eats the entire ``max_new_tokens``
budget before any JSON is emitted."""
@dataclass
class ExecutorConfig:
"""Executor selection and SLURM hyperparameters."""
"""Executor settings.
auto_threshold: int = 32
force_local: bool = False
slurm_partition: str | None = None
slurm_gpus: int = 1
slurm_time: str = "06:00:00"
workers: int = 1
Distributed execution is provided by Hugging Face Jobs (see
``examples/annotation/run_hf_job.py``); this config only controls
intra-process episode concurrency.
"""
# Episodes processed concurrently within each module phase. Each
# in-flight episode dispatches 3-5 dependent VLM calls, so this is the
# main knob for saturating ``parallel_servers`` and ``client_concurrency``.
episode_parallelism: int = 16
"""Number of episodes processed concurrently within each module phase.
Each in-flight episode sends 35 dependent VLM calls; bumping this is
how you actually saturate ``parallel_servers`` and ``client_concurrency``
— without it, the executor loops one episode at a time and the
inference servers sit ~90% idle. Set to ``1`` for strict serial
execution."""
@dataclass
class AnnotationPipelineConfig:
"""Top-level config for ``lerobot-annotate``.
Mirrors the structure of :class:`lerobot.configs.train.TrainPipelineConfig`:
a draccus-parsed dataclass that contains nested per-module sub-configs and
leaves the dataset, executor, and VLM choices independently knobbable.
Output is always in-place: the writer rewrites ``data/chunk-*/file-*.parquet``
in place. Multiple revisions of the same dataset live in separate copies.
The writer rewrites ``data/chunk-*/file-*.parquet`` in place. Multiple
revisions of the same dataset live in separate copies.
"""
repo_id: str | None = None
root: Path | None = None
# Defaults to ``<root>/.annotate_staging/`` when unset.
staging_dir: Path | None = None
"""If unset, defaults to ``<root>/.annotate_staging/``."""
seed: int = 1729
@@ -247,14 +179,10 @@ class AnnotationPipelineConfig:
skip_validation: bool = False
only_episodes: tuple[int, ...] | None = None
# Upload the annotated dataset to the Hugging Face Hub when set.
push_to_hub: str | None = None
"""If set, after the pipeline completes, upload the annotated dataset
root to the Hugging Face Hub as a dataset repo with this id (e.g.
``pepijn/super_poulain_steerable``). Creates the repo if missing."""
push_private: bool = False
"""When ``push_to_hub`` is set, create the repo as private."""
push_commit_message: str | None = None
"""Override the commit message used for the hub upload."""
def resolved_staging_dir(self, root: Path) -> Path:
return self.staging_dir if self.staging_dir is not None else root / ".annotate_staging"
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Executor selection: local vs SLURM via datatrove.
"""In-process executor that runs the four annotation phases.
The executor plans **four phases** with the dependency order from the plan:
@@ -25,8 +25,14 @@ The executor plans **four phases** with the dependency order from the plan:
phase 5: validator
phase 6: writer
Phase 3 is why ``executor.py`` documents the dependency: Module 1 must be
re-entered after Module 2 to refresh ``plan`` rows at interjection times.
Phase 3 is why Module 1 must be re-entered after Module 2 — to refresh
``plan`` rows at interjection timestamps.
Distributed execution is provided by Hugging Face Jobs (see
``examples/annotation/run_hf_job.py``); the runner inside the job
invokes ``lerobot-annotate`` which uses this in-process executor.
Episode-level concurrency is controlled by
``ExecutorConfig.episode_parallelism``.
"""
from __future__ import annotations
@@ -36,7 +42,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .config import AnnotationPipelineConfig, ExecutorConfig
from .config import AnnotationPipelineConfig
from .reader import EpisodeRecord, iter_episodes
from .staging import EpisodeStaging
from .validator import StagingValidator
@@ -63,28 +69,14 @@ class PipelineRunSummary:
validation_report: Any # ValidationReport, kept Any to avoid import cycle
def select_executor_class(num_episodes: int, config: ExecutorConfig) -> str:
"""Return ``"local"`` or ``"slurm"`` based on the threshold.
The plan's "executor selection threshold" lives in
:class:`ExecutorConfig.auto_threshold`. ``force_local`` always wins.
"""
if config.force_local:
return "local"
return "local" if num_episodes <= config.auto_threshold else "slurm"
@dataclass
class Executor:
"""Run all four phases over a dataset root.
"""Run all four phases over a dataset root in-process.
The executor is intentionally framework-agnostic: by default it runs the
phases inline (suitable for tests, small datasets, and the CLI's
``--force-local`` mode). It will optionally hand off to datatrove's
:class:`LocalPipelineExecutor` or :class:`SlurmPipelineExecutor` when those
are installed and the dataset is large enough to benefit from them.
Tests construct the executor directly with stub modules.
Episode-level concurrency comes from ``ExecutorConfig.episode_parallelism``
(a thread pool); cluster-level concurrency comes from running this
executor inside a Hugging Face Job. Tests construct the executor
directly with stub modules.
"""
config: AnnotationPipelineConfig
@@ -100,8 +92,7 @@ class Executor:
if n == 0:
raise ValueError(f"No episodes found under {root}/data/")
executor_kind = select_executor_class(n, self.config.executor)
print(f"[annotate] {n} episodes total; executor={executor_kind}", flush=True)
print(f"[annotate] {n} episodes total", flush=True)
staging_dir = self.config.resolved_staging_dir(root)
staging_dir.mkdir(parents=True, exist_ok=True)
@@ -170,11 +161,7 @@ class Executor:
existing = info.get("tools")
if not isinstance(existing, list):
existing = []
names = {
(t.get("function") or {}).get("name")
for t in existing
if isinstance(t, dict)
}
names = {(t.get("function") or {}).get("name") for t in existing if isinstance(t, dict)}
merged = list(existing)
if SAY_TOOL_SCHEMA["function"]["name"] not in names:
merged.append(SAY_TOOL_SCHEMA)
@@ -207,8 +194,7 @@ class Executor:
n = len(records)
parallelism = max(1, min(self.config.executor.episode_parallelism, n))
print(
f"[annotate] phase={name} starting on {n} episode(s) "
f"(parallelism={parallelism})",
f"[annotate] phase={name} starting on {n} episode(s) (parallelism={parallelism})",
flush=True,
)
t0 = _time.time()
@@ -226,8 +212,7 @@ class Executor:
_, ep_idx, elapsed = _do((i, record))
processed += 1
print(
f"[annotate] {name} episode {i}/{n} "
f"(idx={ep_idx}) done in {elapsed:.1f}s",
f"[annotate] {name} episode {i}/{n} (idx={ep_idx}) done in {elapsed:.1f}s",
flush=True,
)
else:
@@ -262,15 +247,11 @@ class Executor:
for record in records:
staging = EpisodeStaging(staging_dir, record.episode_index)
interjection_rows = [
row
for row in staging.read("module_2")
if row.get("style") == "interjection"
row for row in staging.read("module_2") if row.get("style") == "interjection"
]
interjection_times = [float(row["timestamp"]) for row in interjection_rows]
interjection_texts = [str(row.get("content") or "") for row in interjection_rows]
if interjection_times:
self.module_1.run_plan_updates(
record, staging, interjection_times, interjection_texts
)
self.module_1.run_plan_updates(record, staging, interjection_times, interjection_texts)
processed += 1
return PhaseResult(name="module_1_plan_update", episodes_processed=processed, episodes_skipped=0)
@@ -26,9 +26,7 @@ episode containing:
- ``frames_df``: pandas.DataFrame slice for the episode (only loaded on demand)
This shape lets each module operate per-episode without loading all parquet
rows into memory at once. It deliberately does not depend on datatrove
datatrove integration wraps this generator inside a ``PipelineStep`` in
:mod:`.executor`.
rows into memory at once.
"""
from __future__ import annotations
+3 -4
View File
@@ -16,16 +16,15 @@
"""``lerobot-annotate`` — populate ``language_persistent`` and
``language_events`` columns on a LeRobot dataset.
Annotations live directly in ``data/chunk-*/file-*.parquet``: there is no
flavor namespace and no sidecar tree. Multiple revisions of the same dataset
mean multiple dataset copies.
Annotations live directly in ``data/chunk-*/file-*.parquet``.
Example:
uv run lerobot-annotate \\
--root=/path/to/dataset \\
--vlm.backend=transformers \\
--vlm.model_id=Qwen/Qwen2.5-VL-7B-Instruct
For distributed runs, see ``examples/annotation/run_hf_job.py``.
"""
import logging