From 51b3b31927e7fefa995c26a80e28dde5484169aa Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Thu, 12 Feb 2026 12:46:45 +0000 Subject: [PATCH] more annotation changes --- .../data_processing/annotations/run_pgen.sh | 38 +- .../annotations/subtask_annotate.py | 226 +++++-- .../annotations/subtask_annotate_image.py | 561 ++++++++++++++++++ src/lerobot/utils/constants.py | 90 ++- 4 files changed, 847 insertions(+), 68 deletions(-) create mode 100644 src/lerobot/data_processing/annotations/subtask_annotate_image.py diff --git a/src/lerobot/data_processing/annotations/run_pgen.sh b/src/lerobot/data_processing/annotations/run_pgen.sh index 7250d8bf3..4f2d676ca 100644 --- a/src/lerobot/data_processing/annotations/run_pgen.sh +++ b/src/lerobot/data_processing/annotations/run_pgen.sh @@ -4,23 +4,47 @@ # This generates user prompts and robot utterances for hierarchical policy training # Configuration -REPO_ID="jadechoghari/collect-data" -MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct" +REPO_ID="jadechoghari/piper-demo-20260205_103303" +# MODEL="Qwen/Qwen3-VL-30B-A3B-Thinking" +MODEL="Qwen/Qwen2-VL-7B-Instruct" # or: MODEL="Qwen/Qwen2-VL-7B-Instruct" OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new" -BATCH_SIZE=32 +BATCH_SIZE=2 TEMPERATURE=0.9 SAMPLE_INTERVAL=5.0 # generate dialogue every 1 second (all episodes processed) -# Run subtask annotation -python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \ +# Run subtask annotation. +# To use closed-vocabulary labels, add a line: --subtask-labels "label1" "label2" ... +# Example (add backslash after "$MODEL" and uncomment the next line): +# --model "$MODEL" \ +# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" +python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate.py \ --repo-id "$REPO_ID" \ - --video-key observation.images.base \ + --video-key observation.images.top \ --output-dir "$OUTPUT_DIR" \ - --output-repo-id "jadechoghari/collect-data-with-subtasks" + --output-repo-id "jadechoghari/piper-demo-annotated1" \ + --push-to-hub \ + --model "$MODEL" \ + --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \ + --batch-size 2 + +# Run subtask annotation (image-window: frames as images for better accuracy) +# python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate_image.py \ +# --repo-id "$REPO_ID" \ +# --camera-key observation.images.wrist \ +# --output-dir "$OUTPUT_DIR" \ +# --output-repo-id "jadechoghari/piper-demo-annotated1-image" \ +# --push-to-hub \ +# --model "$MODEL" \ +# --window-size 184 \ +# --max-frames-per-window 16 \ +# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \ +# --batch-size 2 + + # run synthetic data generation (all episodes processed) # python examples/dataset/annotate_pgen.py \ # --repo-id "$REPO_ID" \ diff --git a/src/lerobot/data_processing/annotations/subtask_annotate.py b/src/lerobot/data_processing/annotations/subtask_annotate.py index f296cfea1..71ec49941 100644 --- a/src/lerobot/data_processing/annotations/subtask_annotate.py +++ b/src/lerobot/data_processing/annotations/subtask_annotate.py @@ -81,7 +81,10 @@ from rich.progress import Progress, SpinnerColumn, TextColumn from lerobot.datasets.dataset_tools import add_features from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.utils.constants import SKILL_SEGMENTATION_PROMPT_TEMPLATE +from lerobot.utils.constants import ( + SKILL_SEGMENTATION_PROMPT_TEMPLATE, + format_subtask_labels_section, +) # Skill Annotation Data Structures @@ -155,7 +158,7 @@ class BaseVLM(ABC): video_path: Path to the video file episode_duration: Total duration of the episode in seconds coarse_goal: Optional high-level task description - subtask_labels: Optional list of allowed skill labels to use + subtask_labels: If provided, model must choose only from these labels (closed vocabulary) Returns: List of Skill objects representing atomic manipulation skills @@ -177,7 +180,6 @@ class BaseVLM(ABC): video_paths: List of paths to video files episode_durations: List of episode durations in seconds coarse_goal: Optional high-level task description - subtask_labels: Optional list of allowed skill labels to use Returns: List of skill lists, one for each video @@ -188,33 +190,65 @@ class BaseVLM(ABC): def create_skill_segmentation_prompt( coarse_goal: str | None = None, subtask_labels: list[str] | None = None, + duration_seconds: float | None = None, ) -> str: """Create the prompt for skill segmentation. - - Args: - coarse_goal: Optional high-level task description. - subtask_labels: Optional list of allowed skill/subtask labels. When provided, - the model is instructed to use only these labels (choosing the best match - for each segment). - - Returns: - The formatted prompt string. + If subtask_labels is provided, uses closed-vocabulary template from constants. + duration_seconds is required when using subtask_labels so the model knows the full video length. """ goal_context = f'The overall goal is: "{coarse_goal}"\n\n' if coarse_goal else "" + if subtask_labels: - labels_str = ", ".join(f'"{label}"' for label in subtask_labels) - subtask_labels_section = ( - f'6. **Allowed labels**: Use ONLY the following skill names ' - f"(choose the best match for each segment): {labels_str}\n\n" - ) - else: - subtask_labels_section = "" - return textwrap.dedent( - SKILL_SEGMENTATION_PROMPT_TEMPLATE.format( + if duration_seconds is None: + raise ValueError("duration_seconds is required when using subtask_labels (closed vocabulary)") + subtask_labels_section = format_subtask_labels_section(subtask_labels) + video_duration_mm_ss = f"{int(duration_seconds // 60):02d}:{int(duration_seconds % 60):02d}" + return SKILL_SEGMENTATION_PROMPT_TEMPLATE.format( goal_context=goal_context, subtask_labels_section=subtask_labels_section, + video_duration_seconds=duration_seconds, + video_duration_mm_ss=video_duration_mm_ss, ) + + duration_note = ( + f"\nThe total video duration is **{duration_seconds} seconds**. " + "All start/end values must be in seconds (float). The last skill's \"end\" must be exactly this duration.\n\n" + if duration_seconds is not None + else "" ) + return textwrap.dedent(f"""\ + # Role + You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations. + + # Task + {duration_note}{goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should: + - Last approximately 1-3 seconds + - Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper") + - Have precise start and end timestamps + + # Requirements + 1. **Atomic Actions**: Each skill should be a single, indivisible action + 2. **Complete Coverage**: Skills must cover the entire video duration with no gaps + 3. **Boundary Consistency**: The end of one skill equals the start of the next + 4. **Natural Language**: Use clear, descriptive names for each skill + 5. **Timestamps**: Use seconds (float) for all timestamps. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill. + + + + # Output Format + After your analysis, output ONLY valid JSON with this exact structure: + + ```json + {{ + "skills": [ + {{"name": "skill description", "start": 0.0, "end": 1.5}}, + {{"name": "another skill", "start": 1.5, "end": 3.2}} + ] + }} + ``` + + The first skill must start at 0.0 and the last skill must end at the video duration. + """) # Qwen2-VL Implementation @@ -249,7 +283,9 @@ class Qwen2VL(BaseVLM): subtask_labels: list[str] | None = None, ) -> list[Skill]: """Segment video into skills using Qwen2-VL.""" - prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels) + prompt = create_skill_segmentation_prompt( + coarse_goal, subtask_labels, duration_seconds=episode_duration + ) duration_str = f"{int(episode_duration // 60):02d}:{int(episode_duration % 60):02d}" messages = [ @@ -260,7 +296,7 @@ class Qwen2VL(BaseVLM): {"type": "video", "video": str(video_path), "fps": 1.0}, { "type": "text", - "text": f"Video duration: {duration_str} (~{episode_duration:.1f}s). Segment into atomic skills.", + "text": f"Video duration: {duration_str} (exactly {episode_duration:.1f} seconds). Segment into atomic skills. Last skill must end at {episode_duration:.1f}.", }, ], }, @@ -294,11 +330,12 @@ class Qwen2VL(BaseVLM): subtask_labels: list[str] | None = None, ) -> list[list[Skill]]: """Segment multiple videos into skills using Qwen2-VL in a batch.""" - prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels) - - # Create messages for each video + # Create messages for each video (prompt includes duration so each gets correct length) all_messages = [] for video_path, duration in zip(video_paths, episode_durations): + prompt = create_skill_segmentation_prompt( + coarse_goal, subtask_labels, duration_seconds=duration + ) duration_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}" messages = [ {"role": "system", "content": [{"type": "text", "text": prompt}]}, @@ -308,7 +345,7 @@ class Qwen2VL(BaseVLM): {"type": "video", "video": str(video_path), "fps": 1.0}, { "type": "text", - "text": f"Video duration: {duration_str} (~{duration:.1f}s). Segment into atomic skills.", + "text": f"Video duration: {duration_str} (exactly {duration:.1f} seconds). Segment into atomic skills. Last skill must end at {duration:.1f}.", }, ], }, @@ -413,9 +450,10 @@ class Qwen3VL(BaseVLM): subtask_labels: list[str] | None = None, ) -> list[Skill]: """Segment video into skills using Qwen3-VL.""" - prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels) + prompt = create_skill_segmentation_prompt( + coarse_goal, subtask_labels, duration_seconds=episode_duration + ) duration_str = f"{int(episode_duration // 60):02d}:{int(episode_duration % 60):02d}" - messages = [ {"role": "system", "content": [{"type": "text", "text": prompt}]}, { @@ -424,7 +462,7 @@ class Qwen3VL(BaseVLM): {"type": "video", "video": str(video_path), "fps": 1.0}, { "type": "text", - "text": f"Video duration: {duration_str} (~{episode_duration:.1f}s). Segment into atomic skills.", + "text": f"Video duration: {duration_str} (exactly {episode_duration:.1f} seconds). Segment into atomic skills. Last skill must end at {episode_duration:.1f}.", }, ], }, @@ -439,7 +477,6 @@ class Qwen3VL(BaseVLM): padding=True, return_tensors="pt", ).to(self.device) - with torch.no_grad(): generated_ids = self.model.generate(**inputs, max_new_tokens=1024, do_sample=True, temperature=0.7) @@ -458,11 +495,12 @@ class Qwen3VL(BaseVLM): subtask_labels: list[str] | None = None, ) -> list[list[Skill]]: """Segment multiple videos into skills using Qwen3-VL in a batch.""" - prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels) - - # Create messages for each video + # Create messages for each video (prompt includes duration so each gets correct length) all_messages = [] for video_path, duration in zip(video_paths, episode_durations): + prompt = create_skill_segmentation_prompt( + coarse_goal, subtask_labels, duration_seconds=duration + ) duration_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}" messages = [ {"role": "system", "content": [{"type": "text", "text": prompt}]}, @@ -472,7 +510,7 @@ class Qwen3VL(BaseVLM): {"type": "video", "video": str(video_path), "fps": 1.0}, { "type": "text", - "text": f"Video duration: {duration_str} (~{duration:.1f}s). Segment into atomic skills.", + "text": f"Video duration: {duration_str} (exactly {duration:.1f} seconds). Segment into atomic skills. Last skill must end at {duration:.1f}.", }, ], }, @@ -660,6 +698,58 @@ class VideoExtractor: return tmp_path + def add_timer_overlay(self, video_path: Path) -> Path: + """ + Add a visible timer overlay to each frame (elapsed time in seconds) in one corner. + Used so the VLM can read the timestamp from the image instead of relying on file metadata. + Writes to a new temporary file and returns its path. + """ + out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) + out_path = Path(out_file.name) + out_file.close() + + cap = cv2.VideoCapture(str(video_path)) + fps = cap.get(cv2.CAP_PROP_FPS) or 1.0 + w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(out_path), fourcc, fps, (w, h)) + + # Font scale so timer is big and readable (proportional to frame size) + font_scale = max(1.2, min(h, w) / 350.0) + thickness = max(2, int(font_scale)) + font = cv2.FONT_HERSHEY_SIMPLEX + + frame_idx = 0 + while True: + ret, frame = cap.read() + if not ret: + break + t_sec = frame_idx / fps + text = f"{t_sec:.1f} s" + # Position: top-right corner with margin + (tw, th), _ = cv2.getTextSize(text, font, font_scale, thickness) + x = w - tw - 30 + y = 25 + th + # Black outline for readability on any background (draw in 8 directions then center) + for dx in (-2, -1, 0, 1, 2): + for dy in (-2, -1, 0, 1, 2): + if dx != 0 or dy != 0: + cv2.putText( + frame, text, (x + dx, y + dy), font, font_scale, (0, 0, 0), thickness + ) + cv2.putText(frame, text, (x, y), font, font_scale, (255, 255, 255), thickness) + writer.write(frame) + frame_idx += 1 + + cap.release() + writer.release() + if not out_path.exists() or out_path.stat().st_size < 1024: + if out_path.exists(): + out_path.unlink() + raise RuntimeError("Timer overlay produced invalid file") + return out_path + def get_video_duration(self, video_path: Path) -> float: """Get duration of a video file in seconds.""" cap = cv2.VideoCapture(str(video_path)) @@ -687,11 +777,13 @@ class SkillAnnotator: video_extractor: VideoExtractor | None = None, console: Console | None = None, batch_size: int = 8, + add_timer_overlay: bool = True, ): self.vlm = vlm self.console = console or Console() self.video_extractor = video_extractor or VideoExtractor(self.console) self.batch_size = batch_size + self.add_timer_overlay = add_timer_overlay def annotate_dataset( self, @@ -709,7 +801,7 @@ class SkillAnnotator: video_key: Key for video observations (e.g., "observation.images.base") episodes: Specific episode indices to annotate (None = all) skip_existing: Skip episodes that already have skill annotations - subtask_labels: Optional list of allowed skill labels (VLM will use only these) + subtask_labels: If provided, model must choose only from these labels (closed vocabulary) Returns: Dictionary mapping episode index to EpisodeSkills @@ -854,6 +946,8 @@ class SkillAnnotator: """Annotate multiple episodes with skill labels in a batch.""" # Extract all videos for this batch extracted_paths = [] + timer_paths = [] + paths_for_vlm = [] durations = [] valid_episode_indices = [] @@ -876,8 +970,16 @@ class SkillAnnotator: extracted_path = self.video_extractor.extract_episode_video( video_path, start_ts, end_ts, target_fps=1 ) - - extracted_paths.append(extracted_path) + if self.add_timer_overlay: + video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path) + extracted_paths.append(extracted_path) + timer_paths.append(video_for_vlm) + else: + video_for_vlm = extracted_path + extracted_paths.append(extracted_path) + timer_paths.append(None) + + paths_for_vlm.append(video_for_vlm) durations.append(duration) valid_episode_indices.append(ep_idx) @@ -885,13 +987,13 @@ class SkillAnnotator: self.console.print(f"[yellow]Warning: Failed to extract video for episode {ep_idx}: {e}[/yellow]") continue - if not extracted_paths: + if not paths_for_vlm: return {} try: # Run VLM skill segmentation in batch all_skills = self.vlm.segment_skills_batch( - extracted_paths, durations, coarse_goal, subtask_labels + paths_for_vlm, durations, coarse_goal, subtask_labels ) # Map results back to episode indices @@ -902,10 +1004,13 @@ class SkillAnnotator: return results finally: - # Clean up all temporary files + # Clean up all temporary files (extracted and timer-overlay) for path in extracted_paths: if path.exists(): path.unlink() + for path in timer_paths: + if path is not None and path.exists(): + path.unlink() def _annotate_episode( self, @@ -932,17 +1037,23 @@ class SkillAnnotator: extracted_path = self.video_extractor.extract_episode_video( video_path, start_ts, end_ts, target_fps=1 ) + if self.add_timer_overlay: + video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path) + else: + video_for_vlm = extracted_path try: # Run VLM skill segmentation skills = self.vlm.segment_skills( - extracted_path, duration, coarse_goal, subtask_labels + video_for_vlm, duration, coarse_goal, subtask_labels ) return skills finally: - # Clean up temporary file + # Clean up temporary files (extracted and optionally timer-overlay) if extracted_path.exists(): extracted_path.unlink() + if self.add_timer_overlay and video_for_vlm != extracted_path and video_for_vlm.exists(): + video_for_vlm.unlink() # Metadata Writer - Updates per-frame task_index based on skills @@ -1301,13 +1412,6 @@ def main(): action="store_true", help="Skip episodes that already have annotations", ) - parser.add_argument( - "--subtask-labels", - type=str, - nargs="+", - default=None, - help="Optional list of allowed skill labels (VLM will use only these; space-separated)", - ) # Output options parser.add_argument( @@ -1325,6 +1429,18 @@ def main(): type=str, help="Repository ID for the new dataset (default: original_repo_id_with_subtasks)", ) + parser.add_argument( + "--subtask-labels", + type=str, + nargs="*", + default=None, + help="Closed vocabulary: model must choose only from these labels", + ) + parser.add_argument( + "--no-timer-overlay", + action="store_true", + help="Disable timer overlay on video (by default a timer is drawn in the corner so the VLM can read timestamps from the image)", + ) args = parser.parse_args() console = Console() @@ -1357,7 +1473,13 @@ def main(): vlm = get_vlm(args.model, args.device, torch_dtype) # Create annotator and run annotation - annotator = SkillAnnotator(vlm=vlm, console=console, batch_size=args.batch_size) + add_timer_overlay = not args.no_timer_overlay + annotator = SkillAnnotator( + vlm=vlm, + console=console, + batch_size=args.batch_size, + add_timer_overlay=add_timer_overlay, + ) console.print(f"[cyan]Processing with batch size: {args.batch_size}[/cyan]") annotations = annotator.annotate_dataset( dataset=dataset, @@ -1393,4 +1515,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/src/lerobot/data_processing/annotations/subtask_annotate_image.py b/src/lerobot/data_processing/annotations/subtask_annotate_image.py new file mode 100644 index 000000000..d6f871764 --- /dev/null +++ b/src/lerobot/data_processing/annotations/subtask_annotate_image.py @@ -0,0 +1,561 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Image-window subtask annotation for LeRobot datasets using Qwen VLMs. + +This script assigns a subtask to each window of consecutive frames by sending +those frames as images to the VLM (instead of a video) for better accuracy. +Supports Qwen2-VL and Qwen3-VL (same models as subtask_annotate.py). + +Pipeline: +1. Load a LeRobot dataset (local or Hub). +2. For each episode, slide a window over frame indices. +3. For each window, load the corresponding images (from image_key or decoded video_key). +4. Send the window of images to Qwen2-VL with the same skill prompt; get one subtask name. +5. Assign that subtask to all frames in the window. +6. Write subtasks.parquet and add subtask_index via add_features (same as subtask_annotate). + +Usage: + python -m lerobot.data_processing.annotations.subtask_annotate_image \\ + --data-dir /path/to/dataset --camera-key observation.images.base \\ + --window-size 8 --stride 8 --output-dir ./output +""" + +from __future__ import annotations + +import argparse +import random +import textwrap +from pathlib import Path + +import numpy as np +import PIL.Image +import torch +from rich.console import Console + +from lerobot.datasets.lerobot_dataset import LeRobotDataset + +# Reuse data structures and save/load from the video-based annotator +from lerobot.data_processing.annotations.subtask_annotate import ( + EpisodeSkills, + Skill, + load_skill_annotations, + save_skill_annotations, +) + + +def create_window_skill_prompt( + coarse_goal: str | None = None, + subtask_labels: list[str] | None = None, +) -> str: + """Prompt for labeling a single window of frames with one atomic skill. + If subtask_labels are provided, the model must choose exactly one from that list. + """ + goal_context = f'The overall goal is: "{coarse_goal}".\n\n' if coarse_goal else "" + if subtask_labels: + labels_list = ", ".join(f'"{l}"' for l in subtask_labels) + label_instruction = ( + f"You must choose exactly ONE skill from this list: [{labels_list}]. " + "Do not create new labels. Reply with only that label.\n\n" + ) + else: + label_instruction = "" + return textwrap.dedent(f"""\ + # Role + You are a Robotics Vision System that labels short clips from robot manipulation demonstrations. + + # Task + {goal_context}{label_instruction}The following images are consecutive frames from a single short clip of a robot demonstration. + What single atomic manipulation skill is being performed in this clip? + + # Requirements + - Reply with ONLY one short skill name (e.g. "pick up object", "move arm left", "release gripper"). + - No explanation, no timestamps, no JSON. Just the skill name. + """).strip() + + +def _run_image_segmenter( + self, + images: list[PIL.Image.Image], + coarse_goal: str | None, + subtask_labels: list[str] | None = None, +) -> str: + """Shared inference for Qwen2-VL and Qwen3-VL image window labeling.""" + prompt = create_window_skill_prompt(coarse_goal, subtask_labels) + content = [] + for img in images: + content.append({"type": "image", "image": img}) + content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."}) + + messages = [ + {"role": "system", "content": [{"type": "text", "text": prompt}]}, + {"role": "user", "content": content}, + ] + text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) + image_inputs, video_inputs = self.process_vision_info(messages) + inputs = self.processor( + text=[text], + images=image_inputs, + videos=video_inputs, + padding=True, + return_tensors="pt", + ).to(self.device) + + with torch.no_grad(): + generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False) + + response = self.processor.batch_decode( + [out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)], + skip_special_tokens=True, + )[0].strip() + skill_name = response.split("\n")[0].strip().strip('."') + return skill_name if skill_name else "unknown" + + +def _run_image_segmenter_batch( + self, + batch_images: list[list[PIL.Image.Image]], + coarse_goal: str | None, + subtask_labels: list[str] | None = None, +) -> list[str]: + """Run VLM on multiple windows at once; returns one skill name per window.""" + if not batch_images: + return [] + prompt = create_window_skill_prompt(coarse_goal, subtask_labels) + all_texts = [] + all_image_inputs = [] + all_video_inputs = [] + for images in batch_images: + content = [] + for img in images: + content.append({"type": "image", "image": img}) + content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."}) + messages = [ + {"role": "system", "content": [{"type": "text", "text": prompt}]}, + {"role": "user", "content": content}, + ] + text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) + image_inputs, video_inputs = self.process_vision_info(messages) + all_texts.append(text) + if image_inputs is not None: + all_image_inputs.extend(image_inputs if isinstance(image_inputs, list) else [image_inputs]) + if video_inputs is not None: + all_video_inputs.extend(video_inputs if isinstance(video_inputs, list) else [video_inputs]) + inputs = self.processor( + text=all_texts, + images=all_image_inputs if all_image_inputs else None, + videos=all_video_inputs if all_video_inputs else None, + padding=True, + return_tensors="pt", + ).to(self.device) + with torch.no_grad(): + generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False) + responses = self.processor.batch_decode( + [out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)], + skip_special_tokens=True, + ) + return [ + (r.split("\n")[0].strip().strip('."') or "unknown") + for r in responses + ] + + +class Qwen2VLImageSegmenter: + """Uses Qwen2-VL to assign one skill name to a window of images (same model as subtask_annotate).""" + + def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16): + from qwen_vl_utils import process_vision_info + from transformers import AutoProcessor, Qwen2VLForConditionalGeneration + + self.console = Console() + self.device = device + self.process_vision_info = process_vision_info + self.console.print(f"[cyan]Loading Qwen2-VL for image-window labeling: {model_name}...[/cyan]") + self.model = Qwen2VLForConditionalGeneration.from_pretrained( + model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True + ) + self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True) + self.console.print(f"[green]✓ Model loaded on {device}[/green]") + + def segment_skill_from_images( + self, + images: list[PIL.Image.Image], + coarse_goal: str | None = None, + subtask_labels: list[str] | None = None, + ) -> str: + """Return a single skill name for the given window of images.""" + return _run_image_segmenter(self, images, coarse_goal, subtask_labels) + + def segment_skill_from_images_batch( + self, + batch_images: list[list[PIL.Image.Image]], + coarse_goal: str | None = None, + subtask_labels: list[str] | None = None, + ) -> list[str]: + """Return one skill name per window; processes multiple windows in one forward pass.""" + return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels) + + +class Qwen3VLImageSegmenter: + """Uses Qwen3-VL (MoE) to assign one skill name to a window of images.""" + + def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16): + from qwen_vl_utils import process_vision_info + from transformers import AutoProcessor, Qwen3VLMoeForConditionalGeneration + + self.console = Console() + self.device = device + self.process_vision_info = process_vision_info + self.console.print(f"[cyan]Loading Qwen3-VL for image-window labeling: {model_name}...[/cyan]") + self.model = Qwen3VLMoeForConditionalGeneration.from_pretrained( + model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True + ) + self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True) + self.console.print(f"[green]✓ Model loaded on {device}[/green]") + + def segment_skill_from_images( + self, + images: list[PIL.Image.Image], + coarse_goal: str | None = None, + subtask_labels: list[str] | None = None, + ) -> str: + """Return a single skill name for the given window of images.""" + return _run_image_segmenter(self, images, coarse_goal, subtask_labels) + + def segment_skill_from_images_batch( + self, + batch_images: list[list[PIL.Image.Image]], + coarse_goal: str | None = None, + subtask_labels: list[str] | None = None, + ) -> list[str]: + """Return one skill name per window; processes multiple windows in one forward pass.""" + return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels) + + +def get_image_segmenter( + model_name: str, + device: str = "cuda", + torch_dtype: torch.dtype = torch.bfloat16, +): + """Return the appropriate image-window segmenter for the model (Qwen2-VL or Qwen3-VL).""" + model_lower = model_name.lower() + if "qwen3" in model_lower: + return Qwen3VLImageSegmenter(model_name, device, torch_dtype) + return Qwen2VLImageSegmenter(model_name, device, torch_dtype) + + +def frame_to_pil(frame_value) -> PIL.Image.Image: + """Convert a single frame from dataset (tensor or PIL or path) to PIL.Image.""" + if isinstance(frame_value, PIL.Image.Image): + return frame_value + if isinstance(frame_value, (str, Path)): + return PIL.Image.open(frame_value).convert("RGB") + if hasattr(frame_value, "numpy"): + arr = frame_value.numpy() + else: + arr = np.asarray(frame_value) + if arr.ndim == 3 and arr.shape[0] in (1, 3, 4): + arr = np.transpose(arr, (1, 2, 0)) + if arr.dtype == np.float32 or arr.dtype == np.float64: + arr = (np.clip(arr, 0, 1) * 255).astype(np.uint8) + elif arr.dtype != np.uint8: + arr = np.clip(arr, 0, 255).astype(np.uint8) + if arr.shape[-1] == 1: + arr = np.repeat(arr, 3, axis=-1) + return PIL.Image.fromarray(arr) + + +def _sample_window_indices(window_length: int, max_frames: int) -> list[int]: + """Return indices into a window of length window_length, at most max_frames, in order. + If window_length <= max_frames, returns range(window_length). + Otherwise returns sorted random sample of max_frames indices (temporal order preserved). + """ + if max_frames <= 0 or window_length <= max_frames: + return list(range(window_length)) + return sorted(random.sample(range(window_length), max_frames)) + + +class SkillAnnotatorImage: + """Annotates episodes by sliding a window over frames and labeling each window with the VLM.""" + + def __init__( + self, + segmenter: Qwen2VLImageSegmenter | Qwen3VLImageSegmenter, + window_size: int = 8, + stride: int | None = None, + batch_size: int = 1, + max_frames_per_window: int | None = None, + console: Console | None = None, + ): + self.segmenter = segmenter + self.window_size = window_size + self.stride = stride if stride is not None else window_size + self.batch_size = max(1, batch_size) + self.max_frames_per_window = max_frames_per_window + self.console = console or Console() + + def annotate_dataset( + self, + dataset: LeRobotDataset, + camera_key: str, + episodes: list[int] | None = None, + skip_existing: bool = False, + subtask_labels: list[str] | None = None, + ) -> dict[int, EpisodeSkills]: + """Annotate episodes using image windows. camera_key can be an image_key or video_key.""" + episode_indices = episodes or list(range(dataset.meta.total_episodes)) + coarse_goal = self._get_coarse_goal(dataset) + annotations: dict[int, EpisodeSkills] = {} + + if skip_existing: + existing = load_skill_annotations(dataset.root) + if existing and existing.get("episodes"): + existing_eps = {int(k) for k in existing["episodes"] if existing["episodes"][k].get("skills")} + episode_indices = [i for i in episode_indices if i not in existing_eps] + + for ep_idx in episode_indices: + try: + skills = self._annotate_episode( + dataset, ep_idx, camera_key, coarse_goal, subtask_labels + ) + if skills: + annotations[ep_idx] = EpisodeSkills( + episode_index=ep_idx, + description=coarse_goal, + skills=skills, + ) + self.console.print(f"[green]✓ Episode {ep_idx}: {len(skills)} window skills[/green]") + else: + self.console.print(f"[yellow]⚠ Episode {ep_idx}: no skills[/yellow]") + except Exception as e: + self.console.print(f"[red]Episode {ep_idx} failed: {e}[/red]") + + return annotations + + def _get_coarse_goal(self, dataset: LeRobotDataset) -> str: + if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0: + return str(dataset.meta.tasks.index[0]) + return "Perform the demonstrated manipulation task." + + def _annotate_episode( + self, + dataset: LeRobotDataset, + episode_index: int, + camera_key: str, + coarse_goal: str, + subtask_labels: list[str] | None = None, + ) -> list[Skill]: + ep = dataset.meta.episodes[episode_index] + ep_from = int(ep["dataset_from_index"]) + ep_to = int(ep["dataset_to_index"]) + length = ep_to - ep_from + fps = dataset.meta.fps + if length == 0: + return [] + + # Collect full windows: (images, t_start, t_end) using frame timestamps. + # If max_frames_per_window is set and window is larger, sample that many frames (order preserved). + window_specs: list[tuple[list[PIL.Image.Image], float, float]] = [] + start = 0 + while start + self.window_size <= length: + offsets = _sample_window_indices( + self.window_size, + self.max_frames_per_window or self.window_size, + ) + frame_indices = [ep_from + start + i for i in offsets] + images = [] + t_start = float(dataset[frame_indices[0]]["timestamp"].item()) + for idx in frame_indices: + item = dataset[idx] + images.append(frame_to_pil(item[camera_key])) + t_end = t_start + self.window_size / fps + window_specs.append((images, t_start, t_end)) + start += self.stride + + # Last partial window + if start < length: + partial_len = ep_to - (ep_from + start) + offsets = _sample_window_indices( + partial_len, + self.max_frames_per_window or partial_len, + ) + frame_indices = [ep_from + start + i for i in offsets] + images = [] + t_start = float(dataset[frame_indices[0]]["timestamp"].item()) + for idx in frame_indices: + item = dataset[idx] + images.append(frame_to_pil(item[camera_key])) + t_end = float(dataset[frame_indices[-1]]["timestamp"].item()) + 1.0 / fps + window_specs.append((images, t_start, t_end)) + + # Run in batches + skills: list[Skill] = [] + for i in range(0, len(window_specs), self.batch_size): + chunk = window_specs[i : i + self.batch_size] + batch_images = [spec[0] for spec in chunk] + if len(batch_images) > 1: + skill_names = self.segmenter.segment_skill_from_images_batch( + batch_images, coarse_goal, subtask_labels + ) + else: + skill_names = [ + self.segmenter.segment_skill_from_images( + batch_images[0], coarse_goal, subtask_labels + ) + ] + for (_, t_start, t_end), name in zip(chunk, skill_names, strict=True): + skills.append(Skill(name=name, start=t_start, end=t_end)) + + return skills + + +def main(): + parser = argparse.ArgumentParser( + description="Image-window subtask annotation using Qwen VLM (frames as images for better accuracy)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent("""\ + Examples: + python -m lerobot.data_processing.annotations.subtask_annotate_image \\ + --data-dir /path/to/dataset --camera-key observation.images.base \\ + --window-size 8 --output-dir ./output + + python -m lerobot.data_processing.annotations.subtask_annotate_image \\ + --repo-id user/dataset --camera-key observation.images.base \\ + --window-size 6 --stride 3 --model Qwen/Qwen2-VL-7B-Instruct + + # Use Qwen3-VL (MoE) + python -m lerobot.data_processing.annotations.subtask_annotate_image \\ + --data-dir /path/to/dataset --camera-key observation.images.base \\ + --model Qwen/Qwen3-VL-30B-A3B-Instruct + """), + ) + data_group = parser.add_mutually_exclusive_group(required=True) + data_group.add_argument("--data-dir", type=str, help="Path to local LeRobot dataset") + data_group.add_argument("--repo-id", type=str, help="HuggingFace Hub dataset repository ID") + + parser.add_argument( + "--camera-key", + type=str, + required=True, + help="Image or video observation key (e.g. observation.images.base)", + ) + parser.add_argument( + "--model", + type=str, + default="Qwen/Qwen2-VL-7B-Instruct", + help="VLM model: Qwen2-VL or Qwen3-VL (default: Qwen/Qwen2-VL-7B-Instruct)", + ) + parser.add_argument( + "--device", + type=str, + default="cuda", + ) + parser.add_argument( + "--window-size", + type=int, + default=8, + help="Number of frames per window (default: 8)", + ) + parser.add_argument( + "--stride", + type=int, + default=None, + help="Stride for sliding window (default: window_size = non-overlapping)", + ) + parser.add_argument( + "--batch-size", + type=int, + default=1, + help="Number of windows to process in one VLM call (default: 1; increase for speed)", + ) + parser.add_argument( + "--max-frames-per-window", + type=int, + default=None, + metavar="N", + help="If window has more than N frames, randomly sample N frames (order kept) to avoid OOM (e.g. 16)", + ) + parser.add_argument("--episodes", type=int, nargs="+", help="Episode indices to annotate (default: all)") + parser.add_argument("--skip-existing", action="store_true", help="Skip episodes that already have annotations") + parser.add_argument( + "--subtask-labels", + type=str, + nargs="*", + default=None, + help="Closed vocabulary: model must choose only from these labels", + ) + parser.add_argument("--output-dir", type=str, help="Output directory for dataset with subtask_index") + parser.add_argument("--output-repo-id", type=str, help="Output repo id (default: _with_subtasks)") + parser.add_argument("--push-to-hub", action="store_true") + + args = parser.parse_args() + console = Console() + + # Load dataset + console.print("[cyan]Loading dataset...[/cyan]") + if args.data_dir: + dataset = LeRobotDataset(repo_id="local/dataset", root=args.data_dir, download_videos=False) + else: + dataset = LeRobotDataset(repo_id=args.repo_id, download_videos=True) + camera_keys = dataset.meta.camera_keys + if args.camera_key not in camera_keys: + console.print(f"[red]Error: camera key '{args.camera_key}' not in {camera_keys}[/red]") + return + console.print(f"[green]✓ Loaded dataset, {dataset.meta.total_episodes} episodes[/green]") + + # Same Qwen VLM as subtask_annotate (Qwen2-VL or Qwen3-VL), image windows instead of video + segmenter = get_image_segmenter(args.model, args.device, torch.bfloat16) + + annotator = SkillAnnotatorImage( + segmenter=segmenter, + window_size=args.window_size, + stride=args.stride, + batch_size=args.batch_size, + max_frames_per_window=args.max_frames_per_window, + console=console, + ) + annotations = annotator.annotate_dataset( + dataset=dataset, + camera_key=args.camera_key, + episodes=args.episodes, + skip_existing=args.skip_existing, + subtask_labels=args.subtask_labels, + ) + + if not annotations: + console.print("[yellow]No annotations to save.[/yellow]") + return + + output_dir = Path(args.output_dir) if args.output_dir else None + output_repo_id = args.output_repo_id + new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id) + + total_skills = sum(len(a.skills) for a in annotations.values()) + console.print(f"[bold green]✓ Done.[/bold green] Episodes: {len(annotations)}, total window skills: {total_skills}") + console.print(f" Dataset with subtask_index: {new_dataset.root}") + + if args.push_to_hub and not args.data_dir: + console.print("[cyan]Pushing to Hub...[/cyan]") + try: + new_dataset.push_to_hub(push_videos=False) + console.print("[green]✓ Pushed.[/green]") + except Exception as e: + console.print(f"[red]Push failed: {e}[/red]") + + +if __name__ == "__main__": + main() diff --git a/src/lerobot/utils/constants.py b/src/lerobot/utils/constants.py index edc1762f5..127ee7937 100644 --- a/src/lerobot/utils/constants.py +++ b/src/lerobot/utils/constants.py @@ -95,34 +95,106 @@ LIBERO_KEY_PIXELS_EYE_IN_HAND = "pixels/robot0_eye_in_hand_image" # Skill segmentation prompt template for VLM-based subtask annotation # Placeholders: {goal_context}, {subtask_labels_section} +# When subtask_labels are provided, use format_subtask_labels_section() to fill {subtask_labels_section}. + + +def format_subtask_labels_section(subtask_labels: list[str]) -> str: + """Format a list of subtask labels for insertion into SKILL_SEGMENTATION_PROMPT_TEMPLATE. + The model will be instructed to choose only from these labels. + """ + if not subtask_labels: + return "" + return "\n".join(f' "{label}",' for label in subtask_labels).rstrip(",") + + SKILL_SEGMENTATION_PROMPT_TEMPLATE = """# Role You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations. +# Video duration (critical) +The total video length is **{video_duration_seconds} seconds** ({video_duration_mm_ss}). All "start" and "end" values in your JSON must be numeric seconds in the range [0.0, {video_duration_seconds}]. The last skill's "end" must be exactly **{video_duration_seconds}**. Do not stop earlier. + # Task {goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should: -- Last approximately 1-3 seconds +- Last approximately 1-3 seconds (or longer if the action takes longer) - Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper") -- Have precise start and end timestamps +- Have precise start and end timestamps in seconds (float) # Requirements 1. **Atomic Actions**: Each skill should be a single, indivisible action -2. **Complete Coverage**: Skills must cover the entire video duration with no gaps +2. **Complete Coverage**: Skills must cover the entire video from 0.0 to {video_duration_seconds} seconds with no gaps 3. **Boundary Consistency**: The end of one skill equals the start of the next 4. **Natural Language**: Use clear, descriptive names for each skill -5. **Timestamps**: Use seconds (float) for all timestamps -{subtask_labels_section} +5. **Timestamps**: Use seconds as floats (e.g. 12.5) for all timestamps; the last "end" must be {video_duration_seconds}. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill. +# Subtask Label Set (Closed Vocabulary) + You MUST strictly identify the video segments using ONLY the following labels. Do not create new labels or modify existing ones: + + [ + {subtask_labels_section} + ] + + The video shows one successful execution of all subtasks in a logical order. + + # Ground-Truth Semantics (Very Important) + Use **visual state changes** to define when a subtask starts and ends. Do NOT assume equal durations for the subtasks. + + - A subtask **starts** at the first frame where the robot's motion clearly initiates that subtask. + - A subtask **ends** at the first frame where that specific action is visually completed and the manipulated object reaches a temporary, stable configuration. + + If there are short pauses or micro-motions that don't clearly correspond to a new subtask, they belong to the **current** subtask. + + # Hard Constraints & Logic + 1. **Continuous Coverage (No Gaps):** + - The entire video from 0.0 to {video_duration_seconds} seconds must be covered by subtasks. + - There can be no gaps between subtasks. + - If there is any idle or ambiguous time between clear actions, extend the *preceding* subtask to cover it. + + 2. **Boundary Consistency:** + - The `"end"` timestamp of one subtask must be exactly equal to the `"start"` timestamp of the next subtask. + - Boundaries must coincide with a real visual state transition, not just a convenient time split. + + 3. **Chronological Order, One Occurrence Each:** + - This is a single successful demonstration. + - Each subtask from the vocabulary appears **exactly once**, in the correct logical order. + - **Durations may be very different** between subtasks. Never assume they are similar lengths. Base all boundaries only on the video. + + 4. **Reject Uniform Segmentation (Important):** + - Do NOT simply divide the video into equal or nearly equal time chunks. + - If your boundaries would result in subtasks with similar durations (e.g. all around 5 seconds), treat this as evidence that your segmentation is wrong and refine the boundaries. + - Only use nearly equal durations if the video truly shows each subtask taking the same amount of time (this is very rare). + + 5. **Timestamps (critical):** + - Use numeric seconds (float) in the JSON, e.g. 0.0, 5.2, 12.8. + - The first subtask always starts at 0.0. + - The last subtask must end at exactly {video_duration_seconds} (the full video length). + - **Time is displayed inside the video**: a visible timer in one corner shows the elapsed time in seconds (from 0.0 to the end). Use this on-screen timer to set accurate start and end times for each skill. + + # Step 1 — Textual Timeline (must do this first) + First, write a extensive and detailed textual timeline describing what happens in the video with approximate timestamps. **Read the time from the visible timer shown in the video** to get accurate timestamps. + For each subtask, include: + - its name + - an approximate start and end time (from the on-screen timer), + - an description of the visual event at the boundary (e.g. "shirt fully folded to the left", "robot rotates folded shirt 90 degrees"). + + Format this as a bullet list. # Output Format -After your analysis, output ONLY valid JSON with this exact structure: +After your analysis, output ONLY valid JSON with this exact structure. The last skill's "end" MUST be exactly {video_duration_seconds}. Use the timestamps you read from the visible timer in the video: ```json {{ "skills": [ - {{"name": "skill description", "start": 0.0, "end": 1.5}}, - {{"name": "another skill", "start": 1.5, "end": 3.2}} + {{"name": "first skill", "start": 0.0, "end": 5.0}}, + {{"name": "second skill", "start": 5.0, "end": 12.0}}, + {{"name": "last skill", "start": 12.0, "end": {video_duration_seconds}}} ] }} ``` -The first skill must start at 0.0 and the last skill must end at the video duration. +The first skill must start at 0.0 and the last skill must end at **{video_duration_seconds}** (the total video duration in seconds). +# Strict Structural Rule +This video contains exactly 4 subtasks. +You MUST output exactly 4 segments. +Each segment must use a unique label from the vocabulary. +No label may be repeated. + """