feat(annotate): video_url block for openai backend

Module 1 can now send the episode's actual mp4 file as a video_url
content block instead of pre-decoded frames. The server (transformers
serve / vllm serve / ktransformers serve) handles frame sampling at
the configured fps. Default fps=1 (one frame per second is enough for
subtask-boundary detection on manipulation episodes).

A per-episode subclip is extracted to <root>/.annotate_staging/.video_clips/
via ffmpeg stream-copy (no re-encode) so the model sees only this
episode's frames, not the whole shard.

Enable with --module_1.use_video_url=true (and --vlm.backend=openai).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-28 16:26:19 +02:00
parent 91dedcad1e
commit cd12665e85
4 changed files with 98 additions and 3 deletions
@@ -35,6 +35,18 @@ class Module1Config:
max_video_frames: int = 32 max_video_frames: int = 32
min_subtask_seconds: float = 1.5 min_subtask_seconds: float = 1.5
plan_max_steps: int = 8 plan_max_steps: int = 8
use_video_url: bool = False
"""When True (and backend supports it, e.g. ``openai``), Module 1
sends a ``video_url`` content block pointing at the episode's mp4
file instead of pre-decoded frames. Lets the server sample frames at
its own ``fps`` — no in-process conv3d cost. The video file is
extracted as a per-episode subclip to ``staging/.video_clips/`` so
the model sees only this episode's frames."""
use_video_url_fps: float = 1.0
"""Frame-rate hint to send to the server (mm_processor_kwargs.fps).
Only used when ``use_video_url=True``. ``1.0`` = sample 1 frame per
second, which is plenty for subtask-boundary detection on most
manipulation episodes."""
@dataclass @dataclass
@@ -203,3 +203,62 @@ def to_video_block(images: list[Any]) -> list[dict[str, Any]]:
if not images: if not images:
return [] return []
return [{"type": "video", "video": list(images)}] return [{"type": "video", "video": list(images)}]
def to_video_url_block(url: str | None, fps: float = 2.0) -> list[dict[str, Any]]:
"""Wrap a video file URL as one ``video_url`` block.
Used by the ``openai`` backend (transformers serve / vllm serve /
ktransformers serve), where the server handles frame sampling.
Returns ``[]`` when ``url`` is ``None`` so the caller can splat.
"""
if not url:
return []
return [{"type": "video_url", "video_url": {"url": url}, "fps": fps}]
def episode_clip_path(
record: EpisodeRecord,
provider: "VideoFrameProvider",
cache_dir: Path,
) -> Path | None:
"""Extract the episode's subclip to ``cache_dir/ep_{idx:06d}.mp4``.
Returns ``None`` if the dataset has no video tracks. Skips re-extract
when the cached clip already exists. Uses ``ffmpeg`` via subprocess
with stream-copy where possible (no re-encode) for speed.
"""
import subprocess # noqa: PLC0415
if provider.camera_key is None:
return None
cache_dir.mkdir(parents=True, exist_ok=True)
out_path = cache_dir / f"ep_{record.episode_index:06d}.mp4"
if out_path.exists() and out_path.stat().st_size > 0:
return out_path
ep = provider._meta.episodes[record.episode_index]
from_timestamp = float(ep[f"videos/{provider.camera_key}/from_timestamp"])
to_timestamp = float(ep[f"videos/{provider.camera_key}/to_timestamp"])
src = provider.root / provider._meta.get_video_file_path(
record.episode_index, provider.camera_key
)
cmd = [
"ffmpeg",
"-y",
"-loglevel",
"error",
"-ss",
f"{from_timestamp:.3f}",
"-to",
f"{to_timestamp:.3f}",
"-i",
str(src),
"-c",
"copy",
str(out_path),
]
try:
subprocess.run(cmd, check=True, timeout=120)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
return None
return out_path if out_path.exists() and out_path.stat().st_size > 0 else None
@@ -21,8 +21,17 @@ from collections.abc import Sequence
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from typing import Any
from pathlib import Path
from ..config import Module1Config from ..config import Module1Config
from ..frames import FrameProvider, null_provider, to_video_block from ..frames import (
FrameProvider,
VideoFrameProvider,
episode_clip_path,
null_provider,
to_video_block,
to_video_url_block,
)
from ..prompts import load as load_prompt from ..prompts import load as load_prompt
from ..reader import EpisodeRecord from ..reader import EpisodeRecord
from ..staging import EpisodeStaging from ..staging import EpisodeStaging
@@ -151,14 +160,26 @@ class PlanSubtasksMemoryModule:
if record.row_count == 0 or not record.frame_timestamps: if record.row_count == 0 or not record.frame_timestamps:
return [] return []
episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0] episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0]
video_frames = self.frame_provider.video_for_episode(record, self.config.max_video_frames)
prompt = load_prompt("module_1_subtasks").format( prompt = load_prompt("module_1_subtasks").format(
episode_task=record.episode_task, episode_task=record.episode_task,
min_subtask_seconds=self.config.min_subtask_seconds, min_subtask_seconds=self.config.min_subtask_seconds,
max_steps=self.config.plan_max_steps, max_steps=self.config.plan_max_steps,
episode_duration=f"{episode_duration:.3f}", episode_duration=f"{episode_duration:.3f}",
) )
content = [*to_video_block(video_frames), {"type": "text", "text": prompt}] if self.config.use_video_url and isinstance(self.frame_provider, VideoFrameProvider):
cache_dir = Path(self.frame_provider.root) / ".annotate_staging" / ".video_clips"
clip = episode_clip_path(record, self.frame_provider, cache_dir)
video_block = (
to_video_url_block(f"file://{clip}", fps=self.config.use_video_url_fps)
if clip is not None
else []
)
else:
video_frames = self.frame_provider.video_for_episode(
record, self.config.max_video_frames
)
video_block = to_video_block(video_frames)
content = [*video_block, {"type": "text", "text": prompt}]
messages = [{"role": "user", "content": content}] messages = [{"role": "user", "content": content}]
result = self.vlm.generate_json([messages])[0] result = self.vlm.generate_json([messages])[0]
spans = result.get("subtasks") if isinstance(result, dict) else None spans = result.get("subtasks") if isinstance(result, dict) else None
@@ -317,6 +317,9 @@ def _to_openai_message(message: dict[str, Any]) -> dict[str, Any]:
out_blocks.append( out_blocks.append(
{"type": "image_url", "image_url": {"url": _pil_to_data_url(img)}} {"type": "image_url", "image_url": {"url": _pil_to_data_url(img)}}
) )
elif block_type == "video_url":
# Pass through to the OpenAI-compatible server unchanged.
out_blocks.append({"type": "video_url", "video_url": block["video_url"]})
else: else:
out_blocks.append(block) out_blocks.append(block)
return {"role": message["role"], "content": out_blocks} return {"role": message["role"], "content": out_blocks}