From cd12665e8575b510fa543db69ec14d22017be745 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Tue, 28 Apr 2026 16:26:19 +0200 Subject: [PATCH] feat(annotate): video_url block for openai backend Module 1 can now send the episode's actual mp4 file as a video_url content block instead of pre-decoded frames. The server (transformers serve / vllm serve / ktransformers serve) handles frame sampling at the configured fps. Default fps=1 (one frame per second is enough for subtask-boundary detection on manipulation episodes). A per-episode subclip is extracted to <dataset_root>/.annotate_staging/.video_clips/ via ffmpeg stream-copy (no re-encode) so the model sees only this episode's frames, not the whole shard. Enable with --module_1.use_video_url=true (and --vlm.backend=openai). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../annotations/steerable_pipeline/config.py | 12 ++++ .../annotations/steerable_pipeline/frames.py | 59 +++++++++++++++++++ .../modules/plan_subtasks_memory.py | 27 ++++++++- .../steerable_pipeline/vlm_client.py | 3 + 4 files changed, 98 insertions(+), 3 deletions(-) diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index 1d6786d3f..5dad83629 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -35,6 +35,18 @@ class Module1Config: max_video_frames: int = 32 min_subtask_seconds: float = 1.5 plan_max_steps: int = 8 + use_video_url: bool = False + """When True (and backend supports it, e.g. ``openai``), Module 1 + sends a ``video_url`` content block pointing at the episode's mp4 + file instead of pre-decoded frames. Lets the server sample frames at + its own ``fps`` — no in-process conv3d cost.
The video file is + extracted as a per-episode subclip to ``staging/.video_clips/`` so + the model sees only this episode's frames.""" + use_video_url_fps: float = 1.0 + """Frame-rate hint to send to the server (mm_processor_kwargs.fps). + Only used when ``use_video_url=True``. ``1.0`` = sample 1 frame per + second, which is plenty for subtask-boundary detection on most + manipulation episodes.""" @dataclass diff --git a/src/lerobot/annotations/steerable_pipeline/frames.py b/src/lerobot/annotations/steerable_pipeline/frames.py index 9540635bd..53d45f9eb 100644 --- a/src/lerobot/annotations/steerable_pipeline/frames.py +++ b/src/lerobot/annotations/steerable_pipeline/frames.py @@ -203,3 +203,62 @@ def to_video_block(images: list[Any]) -> list[dict[str, Any]]: if not images: return [] return [{"type": "video", "video": list(images)}] + + +def to_video_url_block(url: str | None, fps: float = 2.0) -> list[dict[str, Any]]: + """Wrap a video file URL as one ``video_url`` block. + + Used by the ``openai`` backend (transformers serve / vllm serve / + ktransformers serve), where the server handles frame sampling. + Returns ``[]`` when ``url`` is ``None`` so the caller can splat. + """ + if not url: + return [] + return [{"type": "video_url", "video_url": {"url": url}, "fps": fps}] + + +def episode_clip_path( + record: EpisodeRecord, + provider: "VideoFrameProvider", + cache_dir: Path, +) -> Path | None: + """Extract the episode's subclip to ``cache_dir/ep_{idx:06d}.mp4``. + + Returns ``None`` if the dataset has no video tracks. Skips re-extract + when the cached clip already exists. Uses ``ffmpeg`` via subprocess + with stream-copy where possible (no re-encode) for speed.
+ """ + import subprocess # noqa: PLC0415 + + if provider.camera_key is None: + return None + cache_dir.mkdir(parents=True, exist_ok=True) + out_path = cache_dir / f"ep_{record.episode_index:06d}.mp4" + if out_path.exists() and out_path.stat().st_size > 0: + return out_path + ep = provider._meta.episodes[record.episode_index] + from_timestamp = float(ep[f"videos/{provider.camera_key}/from_timestamp"]) + to_timestamp = float(ep[f"videos/{provider.camera_key}/to_timestamp"]) + src = provider.root / provider._meta.get_video_file_path( + record.episode_index, provider.camera_key + ) + cmd = [ + "ffmpeg", + "-y", + "-loglevel", + "error", + "-ss", + f"{from_timestamp:.3f}", + "-to", + f"{to_timestamp:.3f}", + "-i", + str(src), + "-c", + "copy", + str(out_path), + ] + try: + subprocess.run(cmd, check=True, timeout=120) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): + return None + return out_path if out_path.exists() and out_path.stat().st_size > 0 else None diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index fff7b308b..dafddb70a 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -21,8 +21,17 @@ from collections.abc import Sequence from dataclasses import dataclass, field from typing import Any +from pathlib import Path + from ..config import Module1Config -from ..frames import FrameProvider, null_provider, to_video_block +from ..frames import ( + FrameProvider, + VideoFrameProvider, + episode_clip_path, + null_provider, + to_video_block, + to_video_url_block, +) from ..prompts import load as load_prompt from ..reader import EpisodeRecord from ..staging import EpisodeStaging @@ -151,14 +160,26 @@ class PlanSubtasksMemoryModule: if record.row_count == 0 or not record.frame_timestamps: return []
episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0] - video_frames = self.frame_provider.video_for_episode(record, self.config.max_video_frames) prompt = load_prompt("module_1_subtasks").format( episode_task=record.episode_task, min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", ) - content = [*to_video_block(video_frames), {"type": "text", "text": prompt}] + if self.config.use_video_url and isinstance(self.frame_provider, VideoFrameProvider): + cache_dir = Path(self.frame_provider.root) / ".annotate_staging" / ".video_clips" + clip = episode_clip_path(record, self.frame_provider, cache_dir) + video_block = ( + to_video_url_block(f"file://{clip}", fps=self.config.use_video_url_fps) + if clip is not None + else [] + ) + else: + video_frames = self.frame_provider.video_for_episode( + record, self.config.max_video_frames + ) + video_block = to_video_block(video_frames) + content = [*video_block, {"type": "text", "text": prompt}] messages = [{"role": "user", "content": content}] result = self.vlm.generate_json([messages])[0] spans = result.get("subtasks") if isinstance(result, dict) else None diff --git a/src/lerobot/annotations/steerable_pipeline/vlm_client.py b/src/lerobot/annotations/steerable_pipeline/vlm_client.py index e3f443c60..2c12d4780 100644 --- a/src/lerobot/annotations/steerable_pipeline/vlm_client.py +++ b/src/lerobot/annotations/steerable_pipeline/vlm_client.py @@ -317,6 +317,9 @@ def _to_openai_message(message: dict[str, Any]) -> dict[str, Any]: out_blocks.append( {"type": "image_url", "image_url": {"url": _pil_to_data_url(img)}} ) + elif block_type == "video_url": + # Pass through to the OpenAI-compatible server unchanged. + out_blocks.append({"type": "video_url", "video_url": block["video_url"]}) else: out_blocks.append(block) return {"role": message["role"], "content": out_blocks}