more annotation changes

This commit is contained in:
Jade Choghari
2026-02-12 12:46:45 +00:00
parent 4503019d18
commit 51b3b31927
4 changed files with 847 additions and 68 deletions
@@ -4,23 +4,47 @@
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/collect-data"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
REPO_ID="jadechoghari/piper-demo-20260205_103303"
# MODEL="Qwen/Qwen3-VL-30B-A3B-Thinking"
MODEL="Qwen/Qwen2-VL-7B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
BATCH_SIZE=32
BATCH_SIZE=2
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # generate dialogue every 1 second (all episodes processed)
# Run subtask annotation
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
# Run subtask annotation.
# To use closed-vocabulary labels, add a line: --subtask-labels "label1" "label2" ...
# Example (add backslash after "$MODEL" and uncomment the next line):
# --model "$MODEL" \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can"
python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.base \
--video-key observation.images.top \
--output-dir "$OUTPUT_DIR" \
--output-repo-id "jadechoghari/collect-data-with-subtasks"
--output-repo-id "jadechoghari/piper-demo-annotated1" \
--push-to-hub \
--model "$MODEL" \
--subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
--batch-size 2
# Run subtask annotation (image-window: frames as images for better accuracy)
# python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate_image.py \
# --repo-id "$REPO_ID" \
# --camera-key observation.images.wrist \
# --output-dir "$OUTPUT_DIR" \
# --output-repo-id "jadechoghari/piper-demo-annotated1-image" \
# --push-to-hub \
# --model "$MODEL" \
# --window-size 184 \
# --max-frames-per-window 16 \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
# --batch-size 2
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
@@ -81,7 +81,10 @@ from rich.progress import Progress, SpinnerColumn, TextColumn
from lerobot.datasets.dataset_tools import add_features
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import SKILL_SEGMENTATION_PROMPT_TEMPLATE
from lerobot.utils.constants import (
SKILL_SEGMENTATION_PROMPT_TEMPLATE,
format_subtask_labels_section,
)
# Skill Annotation Data Structures
@@ -155,7 +158,7 @@ class BaseVLM(ABC):
video_path: Path to the video file
episode_duration: Total duration of the episode in seconds
coarse_goal: Optional high-level task description
subtask_labels: Optional list of allowed skill labels to use
subtask_labels: If provided, model must choose only from these labels (closed vocabulary)
Returns:
List of Skill objects representing atomic manipulation skills
@@ -177,7 +180,6 @@ class BaseVLM(ABC):
video_paths: List of paths to video files
episode_durations: List of episode durations in seconds
coarse_goal: Optional high-level task description
subtask_labels: Optional list of allowed skill labels to use
Returns:
List of skill lists, one for each video
@@ -188,33 +190,65 @@ class BaseVLM(ABC):
def create_skill_segmentation_prompt(
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
duration_seconds: float | None = None,
) -> str:
"""Create the prompt for skill segmentation.
Args:
coarse_goal: Optional high-level task description.
subtask_labels: Optional list of allowed skill/subtask labels. When provided,
the model is instructed to use only these labels (choosing the best match
for each segment).
Returns:
The formatted prompt string.
If subtask_labels is provided, uses closed-vocabulary template from constants.
duration_seconds is required when using subtask_labels so the model knows the full video length.
"""
goal_context = f'The overall goal is: "{coarse_goal}"\n\n' if coarse_goal else ""
if subtask_labels:
labels_str = ", ".join(f'"{label}"' for label in subtask_labels)
subtask_labels_section = (
f'6. **Allowed labels**: Use ONLY the following skill names '
f"(choose the best match for each segment): {labels_str}\n\n"
)
else:
subtask_labels_section = ""
return textwrap.dedent(
SKILL_SEGMENTATION_PROMPT_TEMPLATE.format(
if duration_seconds is None:
raise ValueError("duration_seconds is required when using subtask_labels (closed vocabulary)")
subtask_labels_section = format_subtask_labels_section(subtask_labels)
video_duration_mm_ss = f"{int(duration_seconds // 60):02d}:{int(duration_seconds % 60):02d}"
return SKILL_SEGMENTATION_PROMPT_TEMPLATE.format(
goal_context=goal_context,
subtask_labels_section=subtask_labels_section,
video_duration_seconds=duration_seconds,
video_duration_mm_ss=video_duration_mm_ss,
)
duration_note = (
f"\nThe total video duration is **{duration_seconds} seconds**. "
"All start/end values must be in seconds (float). The last skill's \"end\" must be exactly this duration.\n\n"
if duration_seconds is not None
else ""
)
return textwrap.dedent(f"""\
# Role
You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations.
# Task
{duration_note}{goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should:
- Last approximately 1-3 seconds
- Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper")
- Have precise start and end timestamps
# Requirements
1. **Atomic Actions**: Each skill should be a single, indivisible action
2. **Complete Coverage**: Skills must cover the entire video duration with no gaps
3. **Boundary Consistency**: The end of one skill equals the start of the next
4. **Natural Language**: Use clear, descriptive names for each skill
5. **Timestamps**: Use seconds (float) for all timestamps. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill.
# Output Format
After your analysis, output ONLY valid JSON with this exact structure:
```json
{{
"skills": [
{{"name": "skill description", "start": 0.0, "end": 1.5}},
{{"name": "another skill", "start": 1.5, "end": 3.2}}
]
}}
```
The first skill must start at 0.0 and the last skill must end at the video duration.
""")
# Qwen2-VL Implementation
@@ -249,7 +283,9 @@ class Qwen2VL(BaseVLM):
subtask_labels: list[str] | None = None,
) -> list[Skill]:
"""Segment video into skills using Qwen2-VL."""
prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels)
prompt = create_skill_segmentation_prompt(
coarse_goal, subtask_labels, duration_seconds=episode_duration
)
duration_str = f"{int(episode_duration // 60):02d}:{int(episode_duration % 60):02d}"
messages = [
@@ -260,7 +296,7 @@ class Qwen2VL(BaseVLM):
{"type": "video", "video": str(video_path), "fps": 1.0},
{
"type": "text",
"text": f"Video duration: {duration_str} (~{episode_duration:.1f}s). Segment into atomic skills.",
"text": f"Video duration: {duration_str} (exactly {episode_duration:.1f} seconds). Segment into atomic skills. Last skill must end at {episode_duration:.1f}.",
},
],
},
@@ -294,11 +330,12 @@ class Qwen2VL(BaseVLM):
subtask_labels: list[str] | None = None,
) -> list[list[Skill]]:
"""Segment multiple videos into skills using Qwen2-VL in a batch."""
prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels)
# Create messages for each video
# Create messages for each video (prompt includes duration so each gets correct length)
all_messages = []
for video_path, duration in zip(video_paths, episode_durations):
prompt = create_skill_segmentation_prompt(
coarse_goal, subtask_labels, duration_seconds=duration
)
duration_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}"
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
@@ -308,7 +345,7 @@ class Qwen2VL(BaseVLM):
{"type": "video", "video": str(video_path), "fps": 1.0},
{
"type": "text",
"text": f"Video duration: {duration_str} (~{duration:.1f}s). Segment into atomic skills.",
"text": f"Video duration: {duration_str} (exactly {duration:.1f} seconds). Segment into atomic skills. Last skill must end at {duration:.1f}.",
},
],
},
@@ -413,9 +450,10 @@ class Qwen3VL(BaseVLM):
subtask_labels: list[str] | None = None,
) -> list[Skill]:
"""Segment video into skills using Qwen3-VL."""
prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels)
prompt = create_skill_segmentation_prompt(
coarse_goal, subtask_labels, duration_seconds=episode_duration
)
duration_str = f"{int(episode_duration // 60):02d}:{int(episode_duration % 60):02d}"
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{
@@ -424,7 +462,7 @@ class Qwen3VL(BaseVLM):
{"type": "video", "video": str(video_path), "fps": 1.0},
{
"type": "text",
"text": f"Video duration: {duration_str} (~{episode_duration:.1f}s). Segment into atomic skills.",
"text": f"Video duration: {duration_str} (exactly {episode_duration:.1f} seconds). Segment into atomic skills. Last skill must end at {episode_duration:.1f}.",
},
],
},
@@ -439,7 +477,6 @@ class Qwen3VL(BaseVLM):
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=1024, do_sample=True, temperature=0.7)
@@ -458,11 +495,12 @@ class Qwen3VL(BaseVLM):
subtask_labels: list[str] | None = None,
) -> list[list[Skill]]:
"""Segment multiple videos into skills using Qwen3-VL in a batch."""
prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels)
# Create messages for each video
# Create messages for each video (prompt includes duration so each gets correct length)
all_messages = []
for video_path, duration in zip(video_paths, episode_durations):
prompt = create_skill_segmentation_prompt(
coarse_goal, subtask_labels, duration_seconds=duration
)
duration_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}"
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
@@ -472,7 +510,7 @@ class Qwen3VL(BaseVLM):
{"type": "video", "video": str(video_path), "fps": 1.0},
{
"type": "text",
"text": f"Video duration: {duration_str} (~{duration:.1f}s). Segment into atomic skills.",
"text": f"Video duration: {duration_str} (exactly {duration:.1f} seconds). Segment into atomic skills. Last skill must end at {duration:.1f}.",
},
],
},
@@ -660,6 +698,58 @@ class VideoExtractor:
return tmp_path
def add_timer_overlay(self, video_path: Path) -> Path:
"""
Add a visible timer overlay to each frame (elapsed time in seconds) in one corner.
Used so the VLM can read the timestamp from the image instead of relying on file metadata.
Writes to a new temporary file and returns its path.
"""
out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
out_path = Path(out_file.name)
out_file.close()
cap = cv2.VideoCapture(str(video_path))
fps = cap.get(cv2.CAP_PROP_FPS) or 1.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(str(out_path), fourcc, fps, (w, h))
# Font scale so timer is big and readable (proportional to frame size)
font_scale = max(1.2, min(h, w) / 350.0)
thickness = max(2, int(font_scale))
font = cv2.FONT_HERSHEY_SIMPLEX
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
t_sec = frame_idx / fps
text = f"{t_sec:.1f} s"
# Position: top-right corner with margin
(tw, th), _ = cv2.getTextSize(text, font, font_scale, thickness)
x = w - tw - 30
y = 25 + th
# Black outline for readability on any background (draw in 8 directions then center)
for dx in (-2, -1, 0, 1, 2):
for dy in (-2, -1, 0, 1, 2):
if dx != 0 or dy != 0:
cv2.putText(
frame, text, (x + dx, y + dy), font, font_scale, (0, 0, 0), thickness
)
cv2.putText(frame, text, (x, y), font, font_scale, (255, 255, 255), thickness)
writer.write(frame)
frame_idx += 1
cap.release()
writer.release()
if not out_path.exists() or out_path.stat().st_size < 1024:
if out_path.exists():
out_path.unlink()
raise RuntimeError("Timer overlay produced invalid file")
return out_path
def get_video_duration(self, video_path: Path) -> float:
"""Get duration of a video file in seconds."""
cap = cv2.VideoCapture(str(video_path))
@@ -687,11 +777,13 @@ class SkillAnnotator:
video_extractor: VideoExtractor | None = None,
console: Console | None = None,
batch_size: int = 8,
add_timer_overlay: bool = True,
):
self.vlm = vlm
self.console = console or Console()
self.video_extractor = video_extractor or VideoExtractor(self.console)
self.batch_size = batch_size
self.add_timer_overlay = add_timer_overlay
def annotate_dataset(
self,
@@ -709,7 +801,7 @@ class SkillAnnotator:
video_key: Key for video observations (e.g., "observation.images.base")
episodes: Specific episode indices to annotate (None = all)
skip_existing: Skip episodes that already have skill annotations
subtask_labels: Optional list of allowed skill labels (VLM will use only these)
subtask_labels: If provided, model must choose only from these labels (closed vocabulary)
Returns:
Dictionary mapping episode index to EpisodeSkills
@@ -854,6 +946,8 @@ class SkillAnnotator:
"""Annotate multiple episodes with skill labels in a batch."""
# Extract all videos for this batch
extracted_paths = []
timer_paths = []
paths_for_vlm = []
durations = []
valid_episode_indices = []
@@ -876,8 +970,16 @@ class SkillAnnotator:
extracted_path = self.video_extractor.extract_episode_video(
video_path, start_ts, end_ts, target_fps=1
)
extracted_paths.append(extracted_path)
if self.add_timer_overlay:
video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path)
extracted_paths.append(extracted_path)
timer_paths.append(video_for_vlm)
else:
video_for_vlm = extracted_path
extracted_paths.append(extracted_path)
timer_paths.append(None)
paths_for_vlm.append(video_for_vlm)
durations.append(duration)
valid_episode_indices.append(ep_idx)
@@ -885,13 +987,13 @@ class SkillAnnotator:
self.console.print(f"[yellow]Warning: Failed to extract video for episode {ep_idx}: {e}[/yellow]")
continue
if not extracted_paths:
if not paths_for_vlm:
return {}
try:
# Run VLM skill segmentation in batch
all_skills = self.vlm.segment_skills_batch(
extracted_paths, durations, coarse_goal, subtask_labels
paths_for_vlm, durations, coarse_goal, subtask_labels
)
# Map results back to episode indices
@@ -902,10 +1004,13 @@ class SkillAnnotator:
return results
finally:
# Clean up all temporary files
# Clean up all temporary files (extracted and timer-overlay)
for path in extracted_paths:
if path.exists():
path.unlink()
for path in timer_paths:
if path is not None and path.exists():
path.unlink()
def _annotate_episode(
self,
@@ -932,17 +1037,23 @@ class SkillAnnotator:
extracted_path = self.video_extractor.extract_episode_video(
video_path, start_ts, end_ts, target_fps=1
)
if self.add_timer_overlay:
video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path)
else:
video_for_vlm = extracted_path
try:
# Run VLM skill segmentation
skills = self.vlm.segment_skills(
extracted_path, duration, coarse_goal, subtask_labels
video_for_vlm, duration, coarse_goal, subtask_labels
)
return skills
finally:
# Clean up temporary file
# Clean up temporary files (extracted and optionally timer-overlay)
if extracted_path.exists():
extracted_path.unlink()
if self.add_timer_overlay and video_for_vlm != extracted_path and video_for_vlm.exists():
video_for_vlm.unlink()
# Metadata Writer - Updates per-frame task_index based on skills
@@ -1301,13 +1412,6 @@ def main():
action="store_true",
help="Skip episodes that already have annotations",
)
parser.add_argument(
"--subtask-labels",
type=str,
nargs="+",
default=None,
help="Optional list of allowed skill labels (VLM will use only these; space-separated)",
)
# Output options
parser.add_argument(
@@ -1325,6 +1429,18 @@ def main():
type=str,
help="Repository ID for the new dataset (default: original_repo_id_with_subtasks)",
)
parser.add_argument(
"--subtask-labels",
type=str,
nargs="*",
default=None,
help="Closed vocabulary: model must choose only from these labels",
)
parser.add_argument(
"--no-timer-overlay",
action="store_true",
help="Disable timer overlay on video (by default a timer is drawn in the corner so the VLM can read timestamps from the image)",
)
args = parser.parse_args()
console = Console()
@@ -1357,7 +1473,13 @@ def main():
vlm = get_vlm(args.model, args.device, torch_dtype)
# Create annotator and run annotation
annotator = SkillAnnotator(vlm=vlm, console=console, batch_size=args.batch_size)
add_timer_overlay = not args.no_timer_overlay
annotator = SkillAnnotator(
vlm=vlm,
console=console,
batch_size=args.batch_size,
add_timer_overlay=add_timer_overlay,
)
console.print(f"[cyan]Processing with batch size: {args.batch_size}[/cyan]")
annotations = annotator.annotate_dataset(
dataset=dataset,
@@ -1393,4 +1515,4 @@ def main():
if __name__ == "__main__":
main()
main()
@@ -0,0 +1,561 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image-window subtask annotation for LeRobot datasets using Qwen VLMs.
This script assigns a subtask to each window of consecutive frames by sending
those frames as images to the VLM (instead of a video) for better accuracy.
Supports Qwen2-VL and Qwen3-VL (same models as subtask_annotate.py).
Pipeline:
1. Load a LeRobot dataset (local or Hub).
2. For each episode, slide a window over frame indices.
3. For each window, load the corresponding images (from image_key or decoded video_key).
4. Send the window of images to Qwen2-VL with the same skill prompt; get one subtask name.
5. Assign that subtask to all frames in the window.
6. Write subtasks.parquet and add subtask_index via add_features (same as subtask_annotate).
Usage:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --stride 8 --output-dir ./output
"""
from __future__ import annotations
import argparse
import random
import textwrap
from pathlib import Path
import numpy as np
import PIL.Image
import torch
from rich.console import Console
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Reuse data structures and save/load from the video-based annotator
from lerobot.data_processing.annotations.subtask_annotate import (
EpisodeSkills,
Skill,
load_skill_annotations,
save_skill_annotations,
)
def create_window_skill_prompt(
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Prompt for labeling a single window of frames with one atomic skill.
If subtask_labels are provided, the model must choose exactly one from that list.
"""
goal_context = f'The overall goal is: "{coarse_goal}".\n\n' if coarse_goal else ""
if subtask_labels:
labels_list = ", ".join(f'"{l}"' for l in subtask_labels)
label_instruction = (
f"You must choose exactly ONE skill from this list: [{labels_list}]. "
"Do not create new labels. Reply with only that label.\n\n"
)
else:
label_instruction = ""
return textwrap.dedent(f"""\
# Role
You are a Robotics Vision System that labels short clips from robot manipulation demonstrations.
# Task
{goal_context}{label_instruction}The following images are consecutive frames from a single short clip of a robot demonstration.
What single atomic manipulation skill is being performed in this clip?
# Requirements
- Reply with ONLY one short skill name (e.g. "pick up object", "move arm left", "release gripper").
- No explanation, no timestamps, no JSON. Just the skill name.
""").strip()
def _run_image_segmenter(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> str:
"""Shared inference for Qwen2-VL and Qwen3-VL image window labeling."""
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
inputs = self.processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
response = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)[0].strip()
skill_name = response.split("\n")[0].strip().strip('."')
return skill_name if skill_name else "unknown"
def _run_image_segmenter_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Run VLM on multiple windows at once; returns one skill name per window."""
if not batch_images:
return []
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
all_texts = []
all_image_inputs = []
all_video_inputs = []
for images in batch_images:
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
all_texts.append(text)
if image_inputs is not None:
all_image_inputs.extend(image_inputs if isinstance(image_inputs, list) else [image_inputs])
if video_inputs is not None:
all_video_inputs.extend(video_inputs if isinstance(video_inputs, list) else [video_inputs])
inputs = self.processor(
text=all_texts,
images=all_image_inputs if all_image_inputs else None,
videos=all_video_inputs if all_video_inputs else None,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
responses = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)
return [
(r.split("\n")[0].strip().strip('."') or "unknown")
for r in responses
]
class Qwen2VLImageSegmenter:
"""Uses Qwen2-VL to assign one skill name to a window of images (same model as subtask_annotate)."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen2-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen2VLForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
class Qwen3VLImageSegmenter:
"""Uses Qwen3-VL (MoE) to assign one skill name to a window of images."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen3VLMoeForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen3-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen3VLMoeForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
def get_image_segmenter(
model_name: str,
device: str = "cuda",
torch_dtype: torch.dtype = torch.bfloat16,
):
"""Return the appropriate image-window segmenter for the model (Qwen2-VL or Qwen3-VL)."""
model_lower = model_name.lower()
if "qwen3" in model_lower:
return Qwen3VLImageSegmenter(model_name, device, torch_dtype)
return Qwen2VLImageSegmenter(model_name, device, torch_dtype)
def frame_to_pil(frame_value) -> PIL.Image.Image:
"""Convert a single frame from dataset (tensor or PIL or path) to PIL.Image."""
if isinstance(frame_value, PIL.Image.Image):
return frame_value
if isinstance(frame_value, (str, Path)):
return PIL.Image.open(frame_value).convert("RGB")
if hasattr(frame_value, "numpy"):
arr = frame_value.numpy()
else:
arr = np.asarray(frame_value)
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if arr.dtype == np.float32 or arr.dtype == np.float64:
arr = (np.clip(arr, 0, 1) * 255).astype(np.uint8)
elif arr.dtype != np.uint8:
arr = np.clip(arr, 0, 255).astype(np.uint8)
if arr.shape[-1] == 1:
arr = np.repeat(arr, 3, axis=-1)
return PIL.Image.fromarray(arr)
def _sample_window_indices(window_length: int, max_frames: int) -> list[int]:
"""Return indices into a window of length window_length, at most max_frames, in order.
If window_length <= max_frames, returns range(window_length).
Otherwise returns sorted random sample of max_frames indices (temporal order preserved).
"""
if max_frames <= 0 or window_length <= max_frames:
return list(range(window_length))
return sorted(random.sample(range(window_length), max_frames))
class SkillAnnotatorImage:
"""Annotates episodes by sliding a window over frames and labeling each window with the VLM."""
def __init__(
self,
segmenter: Qwen2VLImageSegmenter | Qwen3VLImageSegmenter,
window_size: int = 8,
stride: int | None = None,
batch_size: int = 1,
max_frames_per_window: int | None = None,
console: Console | None = None,
):
self.segmenter = segmenter
self.window_size = window_size
self.stride = stride if stride is not None else window_size
self.batch_size = max(1, batch_size)
self.max_frames_per_window = max_frames_per_window
self.console = console or Console()
def annotate_dataset(
self,
dataset: LeRobotDataset,
camera_key: str,
episodes: list[int] | None = None,
skip_existing: bool = False,
subtask_labels: list[str] | None = None,
) -> dict[int, EpisodeSkills]:
"""Annotate episodes using image windows. camera_key can be an image_key or video_key."""
episode_indices = episodes or list(range(dataset.meta.total_episodes))
coarse_goal = self._get_coarse_goal(dataset)
annotations: dict[int, EpisodeSkills] = {}
if skip_existing:
existing = load_skill_annotations(dataset.root)
if existing and existing.get("episodes"):
existing_eps = {int(k) for k in existing["episodes"] if existing["episodes"][k].get("skills")}
episode_indices = [i for i in episode_indices if i not in existing_eps]
for ep_idx in episode_indices:
try:
skills = self._annotate_episode(
dataset, ep_idx, camera_key, coarse_goal, subtask_labels
)
if skills:
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
self.console.print(f"[green]✓ Episode {ep_idx}: {len(skills)} window skills[/green]")
else:
self.console.print(f"[yellow]⚠ Episode {ep_idx}: no skills[/yellow]")
except Exception as e:
self.console.print(f"[red]Episode {ep_idx} failed: {e}[/red]")
return annotations
def _get_coarse_goal(self, dataset: LeRobotDataset) -> str:
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
return str(dataset.meta.tasks.index[0])
return "Perform the demonstrated manipulation task."
def _annotate_episode(
self,
dataset: LeRobotDataset,
episode_index: int,
camera_key: str,
coarse_goal: str,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
ep = dataset.meta.episodes[episode_index]
ep_from = int(ep["dataset_from_index"])
ep_to = int(ep["dataset_to_index"])
length = ep_to - ep_from
fps = dataset.meta.fps
if length == 0:
return []
# Collect full windows: (images, t_start, t_end) using frame timestamps.
# If max_frames_per_window is set and window is larger, sample that many frames (order preserved).
window_specs: list[tuple[list[PIL.Image.Image], float, float]] = []
start = 0
while start + self.window_size <= length:
offsets = _sample_window_indices(
self.window_size,
self.max_frames_per_window or self.window_size,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
t_start = float(dataset[frame_indices[0]]["timestamp"].item())
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
t_end = t_start + self.window_size / fps
window_specs.append((images, t_start, t_end))
start += self.stride
# Last partial window
if start < length:
partial_len = ep_to - (ep_from + start)
offsets = _sample_window_indices(
partial_len,
self.max_frames_per_window or partial_len,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
t_start = float(dataset[frame_indices[0]]["timestamp"].item())
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
t_end = float(dataset[frame_indices[-1]]["timestamp"].item()) + 1.0 / fps
window_specs.append((images, t_start, t_end))
# Run in batches
skills: list[Skill] = []
for i in range(0, len(window_specs), self.batch_size):
chunk = window_specs[i : i + self.batch_size]
batch_images = [spec[0] for spec in chunk]
if len(batch_images) > 1:
skill_names = self.segmenter.segment_skill_from_images_batch(
batch_images, coarse_goal, subtask_labels
)
else:
skill_names = [
self.segmenter.segment_skill_from_images(
batch_images[0], coarse_goal, subtask_labels
)
]
for (_, t_start, t_end), name in zip(chunk, skill_names, strict=True):
skills.append(Skill(name=name, start=t_start, end=t_end))
return skills
def main():
parser = argparse.ArgumentParser(
description="Image-window subtask annotation using Qwen VLM (frames as images for better accuracy)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""\
Examples:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --output-dir ./output
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--repo-id user/dataset --camera-key observation.images.base \\
--window-size 6 --stride 3 --model Qwen/Qwen2-VL-7B-Instruct
# Use Qwen3-VL (MoE)
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--model Qwen/Qwen3-VL-30B-A3B-Instruct
"""),
)
data_group = parser.add_mutually_exclusive_group(required=True)
data_group.add_argument("--data-dir", type=str, help="Path to local LeRobot dataset")
data_group.add_argument("--repo-id", type=str, help="HuggingFace Hub dataset repository ID")
parser.add_argument(
"--camera-key",
type=str,
required=True,
help="Image or video observation key (e.g. observation.images.base)",
)
parser.add_argument(
"--model",
type=str,
default="Qwen/Qwen2-VL-7B-Instruct",
help="VLM model: Qwen2-VL or Qwen3-VL (default: Qwen/Qwen2-VL-7B-Instruct)",
)
parser.add_argument(
"--device",
type=str,
default="cuda",
)
parser.add_argument(
"--window-size",
type=int,
default=8,
help="Number of frames per window (default: 8)",
)
parser.add_argument(
"--stride",
type=int,
default=None,
help="Stride for sliding window (default: window_size = non-overlapping)",
)
parser.add_argument(
"--batch-size",
type=int,
default=1,
help="Number of windows to process in one VLM call (default: 1; increase for speed)",
)
parser.add_argument(
"--max-frames-per-window",
type=int,
default=None,
metavar="N",
help="If window has more than N frames, randomly sample N frames (order kept) to avoid OOM (e.g. 16)",
)
parser.add_argument("--episodes", type=int, nargs="+", help="Episode indices to annotate (default: all)")
parser.add_argument("--skip-existing", action="store_true", help="Skip episodes that already have annotations")
parser.add_argument(
"--subtask-labels",
type=str,
nargs="*",
default=None,
help="Closed vocabulary: model must choose only from these labels",
)
parser.add_argument("--output-dir", type=str, help="Output directory for dataset with subtask_index")
parser.add_argument("--output-repo-id", type=str, help="Output repo id (default: <repo_id>_with_subtasks)")
parser.add_argument("--push-to-hub", action="store_true")
args = parser.parse_args()
console = Console()
# Load dataset
console.print("[cyan]Loading dataset...[/cyan]")
if args.data_dir:
dataset = LeRobotDataset(repo_id="local/dataset", root=args.data_dir, download_videos=False)
else:
dataset = LeRobotDataset(repo_id=args.repo_id, download_videos=True)
camera_keys = dataset.meta.camera_keys
if args.camera_key not in camera_keys:
console.print(f"[red]Error: camera key '{args.camera_key}' not in {camera_keys}[/red]")
return
console.print(f"[green]✓ Loaded dataset, {dataset.meta.total_episodes} episodes[/green]")
# Same Qwen VLM as subtask_annotate (Qwen2-VL or Qwen3-VL), image windows instead of video
segmenter = get_image_segmenter(args.model, args.device, torch.bfloat16)
annotator = SkillAnnotatorImage(
segmenter=segmenter,
window_size=args.window_size,
stride=args.stride,
batch_size=args.batch_size,
max_frames_per_window=args.max_frames_per_window,
console=console,
)
annotations = annotator.annotate_dataset(
dataset=dataset,
camera_key=args.camera_key,
episodes=args.episodes,
skip_existing=args.skip_existing,
subtask_labels=args.subtask_labels,
)
if not annotations:
console.print("[yellow]No annotations to save.[/yellow]")
return
output_dir = Path(args.output_dir) if args.output_dir else None
output_repo_id = args.output_repo_id
new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id)
total_skills = sum(len(a.skills) for a in annotations.values())
console.print(f"[bold green]✓ Done.[/bold green] Episodes: {len(annotations)}, total window skills: {total_skills}")
console.print(f" Dataset with subtask_index: {new_dataset.root}")
if args.push_to_hub and not args.data_dir:
console.print("[cyan]Pushing to Hub...[/cyan]")
try:
new_dataset.push_to_hub(push_videos=False)
console.print("[green]✓ Pushed.[/green]")
except Exception as e:
console.print(f"[red]Push failed: {e}[/red]")
if __name__ == "__main__":
main()
+81 -9
View File
@@ -95,34 +95,106 @@ LIBERO_KEY_PIXELS_EYE_IN_HAND = "pixels/robot0_eye_in_hand_image"
# Skill segmentation prompt template for VLM-based subtask annotation
# Placeholders: {goal_context}, {subtask_labels_section}
# When subtask_labels are provided, use format_subtask_labels_section() to fill {subtask_labels_section}.
def format_subtask_labels_section(subtask_labels: list[str]) -> str:
"""Format a list of subtask labels for insertion into SKILL_SEGMENTATION_PROMPT_TEMPLATE.
The model will be instructed to choose only from these labels.
"""
if not subtask_labels:
return ""
return "\n".join(f' "{label}",' for label in subtask_labels).rstrip(",")
SKILL_SEGMENTATION_PROMPT_TEMPLATE = """# Role
You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations.
# Video duration (critical)
The total video length is **{video_duration_seconds} seconds** ({video_duration_mm_ss}). All "start" and "end" values in your JSON must be numeric seconds in the range [0.0, {video_duration_seconds}]. The last skill's "end" must be exactly **{video_duration_seconds}**. Do not stop earlier.
# Task
{goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should:
- Last approximately 1-3 seconds
- Last approximately 1-3 seconds (or longer if the action takes longer)
- Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper")
- Have precise start and end timestamps
- Have precise start and end timestamps in seconds (float)
# Requirements
1. **Atomic Actions**: Each skill should be a single, indivisible action
2. **Complete Coverage**: Skills must cover the entire video duration with no gaps
2. **Complete Coverage**: Skills must cover the entire video from 0.0 to {video_duration_seconds} seconds with no gaps
3. **Boundary Consistency**: The end of one skill equals the start of the next
4. **Natural Language**: Use clear, descriptive names for each skill
5. **Timestamps**: Use seconds (float) for all timestamps
{subtask_labels_section}
5. **Timestamps**: Use seconds as floats (e.g. 12.5) for all timestamps; the last "end" must be {video_duration_seconds}. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill.
# Subtask Label Set (Closed Vocabulary)
You MUST strictly identify the video segments using ONLY the following labels. Do not create new labels or modify existing ones:
[
{subtask_labels_section}
]
The video shows one successful execution of all subtasks in a logical order.
# Ground-Truth Semantics (Very Important)
Use **visual state changes** to define when a subtask starts and ends. Do NOT assume equal durations for the subtasks.
- A subtask **starts** at the first frame where the robot's motion clearly initiates that subtask.
- A subtask **ends** at the first frame where that specific action is visually completed and the manipulated object reaches a temporary, stable configuration.
If there are short pauses or micro-motions that don't clearly correspond to a new subtask, they belong to the **current** subtask.
# Hard Constraints & Logic
1. **Continuous Coverage (No Gaps):**
- The entire video from 0.0 to {video_duration_seconds} seconds must be covered by subtasks.
- There can be no gaps between subtasks.
- If there is any idle or ambiguous time between clear actions, extend the *preceding* subtask to cover it.
2. **Boundary Consistency:**
- The `"end"` timestamp of one subtask must be exactly equal to the `"start"` timestamp of the next subtask.
- Boundaries must coincide with a real visual state transition, not just a convenient time split.
3. **Chronological Order, One Occurrence Each:**
- This is a single successful demonstration.
- Each subtask from the vocabulary appears **exactly once**, in the correct logical order.
- **Durations may be very different** between subtasks. Never assume they are similar lengths. Base all boundaries only on the video.
4. **Reject Uniform Segmentation (Important):**
- Do NOT simply divide the video into equal or nearly equal time chunks.
- If your boundaries would result in subtasks with similar durations (e.g. all around 5 seconds), treat this as evidence that your segmentation is wrong and refine the boundaries.
- Only use nearly equal durations if the video truly shows each subtask taking the same amount of time (this is very rare).
5. **Timestamps (critical):**
- Use numeric seconds (float) in the JSON, e.g. 0.0, 5.2, 12.8.
- The first subtask always starts at 0.0.
- The last subtask must end at exactly {video_duration_seconds} (the full video length).
- **Time is displayed inside the video**: a visible timer in one corner shows the elapsed time in seconds (from 0.0 to the end). Use this on-screen timer to set accurate start and end times for each skill.
# Step 1 — Textual Timeline (must do this first)
First, write a extensive and detailed textual timeline describing what happens in the video with approximate timestamps. **Read the time from the visible timer shown in the video** to get accurate timestamps.
For each subtask, include:
- its name
- an approximate start and end time (from the on-screen timer),
- an description of the visual event at the boundary (e.g. "shirt fully folded to the left", "robot rotates folded shirt 90 degrees").
Format this as a bullet list.
# Output Format
After your analysis, output ONLY valid JSON with this exact structure:
After your analysis, output ONLY valid JSON with this exact structure. The last skill's "end" MUST be exactly {video_duration_seconds}. Use the timestamps you read from the visible timer in the video:
```json
{{
"skills": [
{{"name": "skill description", "start": 0.0, "end": 1.5}},
{{"name": "another skill", "start": 1.5, "end": 3.2}}
{{"name": "first skill", "start": 0.0, "end": 5.0}},
{{"name": "second skill", "start": 5.0, "end": 12.0}},
{{"name": "last skill", "start": 12.0, "end": {video_duration_seconds}}}
]
}}
```
The first skill must start at 0.0 and the last skill must end at the video duration.
The first skill must start at 0.0 and the last skill must end at **{video_duration_seconds}** (the total video duration in seconds).
# Strict Structural Rule
This video contains exactly 4 subtasks.
You MUST output exactly 4 segments.
Each segment must use a unique label from the vocabulary.
No label may be repeated.
"""