Compare commits

...

4 Commits

Author SHA1 Message Date
Pepijn 1927077fea fix(annotate): transcode subclips to H.264 instead of stream-copy
Modern LeRobot datasets store videos in AV1, which vllm's libav build
cannot decode (the video processor returns 0 frames and downstream
chokes with ZeroDivisionError). Re-encode each per-episode subclip
with libx264 (preset ultrafast, crf 23) so the resulting mp4 is
universally decodable. Strip audio with -an for a smaller payload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 17:00:39 +02:00
Pepijn 327ce89423 feat(annotate): pack multiple vllm replicas per GPU via num_gpus
Adds VlmConfig.num_gpus so parallel_servers can exceed the physical
GPU count. Replicas are round-robin-assigned to GPUs (e.g.
parallel_servers=4 + num_gpus=2 → replicas pinned to GPUs 0,1,0,1).
Backward-compatible: num_gpus=0 keeps the existing 1-replica-per-GPU
behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 16:11:51 +02:00
Pepijn b56cfe7eb9 feat(annotate): forward chat_template_kwargs to OpenAI extra_body
Lets callers pass per-request template flags such as
{"enable_thinking": false} for Qwen3.5/Qwen3.6 models, where the
default thinking preamble otherwise consumes the entire max_new_tokens
budget before any JSON is emitted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 15:00:23 +02:00
Pepijn e06a7c79f7 fix(annotate): include prompt .txt files in wheel
The setuptools package-data declaration only listed envs/*.json, so
pip-installed wheels (including HF Jobs runs) were missing the
module_1_subtasks/plan/memory and module_2/3 prompt templates,
causing FileNotFoundError at runtime.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 11:47:03 +02:00
4 changed files with 42 additions and 13 deletions
@@ -302,7 +302,7 @@ lerobot-annotate="lerobot.scripts.lerobot_annotate:main"
 # ---------------- Tool Configurations ----------------
 [tool.setuptools.package-data]
-lerobot = ["envs/*.json"]
+lerobot = ["envs/*.json", "annotations/steerable_pipeline/prompts/*.txt"]
 
 [tool.setuptools.packages.find]
 where = ["src"]
@@ -18,6 +18,7 @@ from __future__ import annotations
 from dataclasses import dataclass, field
 from pathlib import Path
+from typing import Any
 
 @dataclass
@@ -107,10 +108,17 @@ class VlmConfig:
     this command (if present) is substituted per-replica."""
     parallel_servers: int = 1
     """When >1, spawn this many independent inference servers (each pinned
-    to one GPU via ``CUDA_VISIBLE_DEVICES`` and listening on
+    to a GPU via ``CUDA_VISIBLE_DEVICES`` and listening on
     ``serve_port + i``) and round-robin client requests across them.
     Useful when DP/TP NCCL setup is broken on the node — single-GPU
-    replicas don't need cross-GPU communication."""
+    replicas don't need cross-GPU communication. When
+    ``parallel_servers > num_gpus``, replicas are round-robin-assigned
+    to GPUs (e.g. 4 replicas on 2 GPUs → 0,1,0,1)."""
+    num_gpus: int = 0
+    """How many physical GPUs are available for round-robin replica
+    placement. ``0`` means ``parallel_servers`` (one GPU per replica,
+    backward-compatible default). Set this to ``2`` with
+    ``parallel_servers=4`` to pack 2 replicas per GPU."""
     client_concurrency: int = 16
     """Maximum number of in-flight chat requests the client issues in
     parallel. vllm batches them internally for free, so bumping this
@@ -140,6 +148,12 @@ class VlmConfig:
     camera_key: str | None = None
     """Override the camera stream used for keyframe attachment. ``None`` picks
     the first ``observation.images.*`` key the dataset declares."""
+    chat_template_kwargs: dict[str, Any] | None = None
+    """Forwarded as ``extra_body.chat_template_kwargs`` on every chat call.
+    Use this to pass model-specific template flags such as
+    ``{"enable_thinking": false}`` for Qwen3.5/Qwen3.6 to suppress the
+    reasoning preamble that otherwise eats the entire ``max_new_tokens``
+    budget before any JSON is emitted."""
 
 
 @dataclass
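Taken together, the new fields compose like this (a sketch; assumes a VlmConfig instance is being built in scope, with field names taken from the diff above):

# Sketch: the new knobs combined; semantics are from the docstrings above.
config = VlmConfig(
    parallel_servers=4,  # four independent replicas
    num_gpus=2,          # packed two per GPU: pinned to 0,1,0,1
    chat_template_kwargs={"enable_thinking": False},  # Qwen3.5/3.6: suppress thinking preamble
)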
@@ -225,8 +225,11 @@ def episode_clip_path(
     """Extract the episode's subclip to ``cache_dir/ep_{idx:06d}.mp4``.
 
     Returns ``None`` if the dataset has no video tracks. Skips re-extract
-    when the cached clip already exists. Uses ``ffmpeg`` via subprocess
-    with stream-copy where possible (no re-encode) for speed.
+    when the cached clip already exists. Re-encodes to H.264
+    (libx264) so the resulting mp4 is decodable by every downstream
+    video processor — stream-copy would inherit the source codec
+    (often AV1 in modern LeRobot datasets), which vllm's libav build
+    cannot decode.
     """
 
     import subprocess  # noqa: PLC0415
@@ -253,12 +256,19 @@ def episode_clip_path(
         f"{to_timestamp:.3f}",
         "-i",
         str(src),
-        "-c",
-        "copy",
+        "-c:v",
+        "libx264",
+        "-preset",
+        "ultrafast",
+        "-crf",
+        "23",
+        "-pix_fmt",
+        "yuv420p",
+        "-an",
         str(out_path),
     ]
     try:
-        subprocess.run(cmd, check=True, timeout=120)
+        subprocess.run(cmd, check=True, timeout=300)
     except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
         return None
     return out_path if out_path.exists() and out_path.stat().st_size > 0 else None
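Assembled, the command this hunk builds looks roughly like the sketch below. Everything ahead of "-i" is an assumption (the diff only shows the tail of the argument list); the output flags and timeout come from the hunk itself:

import subprocess
from pathlib import Path

src, out_path = Path("episode.mp4"), Path("ep_000000.mp4")  # placeholder paths
from_timestamp, to_timestamp = 0.0, 5.0                     # placeholder clip bounds

cmd = [
    "ffmpeg", "-y",                   # assumed prefix
    "-ss", f"{from_timestamp:.3f}",   # assumed seek flags
    "-to", f"{to_timestamp:.3f}",
    "-i", str(src),
    "-c:v", "libx264",                # re-encode to universally decodable H.264
    "-preset", "ultrafast",           # favor encode speed over compression
    "-crf", "23",                     # near-default quality target
    "-pix_fmt", "yuv420p",            # widest decoder compatibility
    "-an",                            # strip audio for a smaller payload
    str(out_path),
]
subprocess.run(cmd, check=True, timeout=300)  # transcoding is slower than stream-copy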
@@ -380,10 +380,13 @@ def _make_openai_client(config: VlmConfig) -> VlmClient:
         "max_tokens": max_tok,
         "temperature": temp,
     }
+    extra_body: dict[str, Any] = {}
     if send_mm_kwargs and mm_kwargs:
-        kwargs["extra_body"] = {
-            "mm_processor_kwargs": {**mm_kwargs, "do_sample_frames": True}
-        }
+        extra_body["mm_processor_kwargs"] = {**mm_kwargs, "do_sample_frames": True}
+    if config.chat_template_kwargs:
+        extra_body["chat_template_kwargs"] = config.chat_template_kwargs
+    if extra_body:
+        kwargs["extra_body"] = extra_body
     with rr_lock:
         chosen = clients[rr_counter["i"] % len(clients)]
         rr_counter["i"] += 1
@@ -453,10 +456,12 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]:
         f"--uvicorn-log-level warning"
     )
+    num_gpus = config.num_gpus if config.num_gpus > 0 else n
     for i in range(n):
         port = config.serve_port + i
+        gpu = i % num_gpus
         env = _os.environ.copy()
-        env["CUDA_VISIBLE_DEVICES"] = str(i)
+        env["CUDA_VISIBLE_DEVICES"] = str(gpu)
         cmd = base_cmd
         if "{port}" in cmd:
             cmd = cmd.replace("{port}", str(port))
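The resulting placement is easy to eyeball in isolation (standalone illustration; the port base is a placeholder):

n, num_gpus, serve_port = 4, 2, 8000  # placeholder values
for i in range(n):
    print(f"replica {i}: GPU {i % num_gpus}, port {serve_port + i}")
# replica 0: GPU 0, port 8000
# replica 1: GPU 1, port 8001
# replica 2: GPU 0, port 8002
# replica 3: GPU 1, port 8003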
@@ -464,7 +469,7 @@
             cmd = f"{cmd} --port {port}"
         api_base = f"http://localhost:{port}/v1"
         api_bases.append(api_base)
-        print(f"[server-{i}] launching on GPU {i} port {port}: {cmd}", flush=True)
+        print(f"[server-{i}] launching on GPU {gpu} port {port}: {cmd}", flush=True)
         proc = subprocess.Popen(
             shlex.split(cmd),
             stdout=subprocess.PIPE,