Compare commits

...

15 Commits

Author SHA1 Message Date
Pepijn 63fad12e8d refactor: consolidate VLM classes into single QwenVL implementation
Remove Qwen2VL, Qwen3VL, Qwen35VL in favor of one QwenVL class that
uses AutoModelForImageTextToText and works with the whole Qwen VL
family. Moves shared _parse_skills_response to BaseVLM and extracts
_build_messages/_prepare_inputs/_decode helpers to reduce duplication.

Made-with: Cursor
2026-03-30 20:37:09 +02:00
Pepijn 2545f1a8ed fix: route video_metadata through videos_kwargs for Qwen3/3.5 processors
The Qwen3VLProcessor distributes kwargs to sub-processors via
_merge_kwargs. Flat kwargs like video_metadata and do_sample_frames
were not reaching the video processor, causing fps to default to 24
and producing shape mismatches.

Pass these kwargs explicitly under videos_kwargs so they reach
Qwen3VLVideoProcessor directly. Revert Qwen2VL to its simpler
original approach since its processor doesn't use videos_kwargs.

Made-with: Cursor
2026-03-30 19:09:46 +02:00
Pepijn 5f85b572d7 fix: unpack video_metadata from tuples and pass separately to processor
The Qwen3.5 processor requires video_metadata as a separate parameter,
not embedded in the video tensors. Use return_video_metadata=True from
process_vision_info, then unpack the (tensor, metadata) tuples into
separate videos and video_metadata lists for the processor call.

Made-with: Cursor
2026-03-30 17:37:59 +02:00
Pepijn 72692525da fix: pass fps=1.0 scalar to processor instead of video_metadata tuples
The return_video_metadata=True approach causes 'list index out of range'
due to (tensor, metadata) tuple format issues. Since all extracted
videos are at 1fps (ffmpeg -r 1), directly pass fps=1.0 as a scalar
alongside do_sample_frames=False — this gives the processor the exact
fps for position embedding computation without format compatibility
issues across Qwen processor versions.

Made-with: Cursor
2026-03-30 17:32:30 +02:00
Pepijn 9a298524ca fix: pass video_metadata via process_vision_info for correct position embeddings
The Qwen3.5 processor needs video_metadata (fps, frame indices) to
compute temporal position embeddings. Use return_video_metadata=True
which embeds metadata inside the video tensors as (tensor, metadata)
tuples, and return_video_kwargs=True which returns {'do_sample_frames':
False} without the problematic fps list.

Made-with: Cursor
2026-03-30 17:23:44 +02:00
Pepijn 002a9dd0b9 fix: use do_sample_frames=False instead of video_kwargs fps list
The Qwen3.5 processor expects fps as a scalar, not a list, so passing
video_kwargs with fps=[...] fails validation. Since process_vision_info
already handles frame sampling, we only need do_sample_frames=False to
tell the processor to use the pre-sampled frames as-is.

Made-with: Cursor
2026-03-30 16:55:46 +02:00
Pepijn e40985b013 fix: pass video_kwargs from process_vision_info to Qwen processor
The Qwen processor needs fps metadata (via return_video_kwargs=True)
to compute correct temporal position embeddings. Without it, the
processor defaults to fps=24 regardless of the actual video fps,
causing shape mismatches between expected and actual video tokens.

Made-with: Cursor
2026-03-30 16:50:34 +02:00
Pepijn d03200bdb3 fix: force torchvision video backend instead of cv2 bypass
Replace manual cv2 frame reading with FORCE_QWENVL_VIDEO_READER=torchvision
env var. The torchvision backend (PyAV) properly reads video metadata and
respects the fps parameter, avoiding the torchcodec fps=24 default issue.

Made-with: Cursor
2026-03-30 16:42:52 +02:00
Pepijn ac41cd6672 fix: bypass torchcodec video decoding by pre-reading frames via cv2
When torchcodec is installed, qwen-vl-utils ignores the fps parameter
and defaults to 24fps if video metadata is missing, causing shape
mismatches. Fix by reading video frames directly as PIL images and
passing them to the processor, bypassing torchcodec entirely.

Made-with: Cursor
2026-03-30 16:03:26 +02:00
Pepijn 9b211a45d6 fix: disable thinking mode in Qwen35VL single-episode fallback path
The single-episode `segment_skills` method was missing
`enable_thinking=False` in `apply_chat_template`, causing the model to
output reasoning traces instead of JSON when the batch path fails and
falls back to per-episode processing.

Made-with: Cursor
2026-03-30 15:31:18 +02:00
root a6387da464 add license 2026-03-11 23:14:22 +00:00
Jade Choghari 0328b3f4aa Update src/lerobot/data_processing/data_annotations/vlm_annotations.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2026-03-11 16:10:37 -07:00
root cc8e4c0d86 remove folder 2026-03-11 22:49:38 +00:00
root 819c1b9710 add tests/fixes 2026-03-11 22:49:06 +00:00
root f0848c6887 add subtasl 2026-03-11 19:51:48 +00:00
8 changed files with 1474 additions and 2 deletions
+1
View File
@@ -222,6 +222,7 @@ lerobot-eval="lerobot.scripts.lerobot_eval:main"
lerobot-train="lerobot.scripts.lerobot_train:main"
lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
lerobot-dataset-subtask-annotate="lerobot.scripts.lerobot_subtask_annotate:main"
lerobot-info="lerobot.scripts.lerobot_info:main"
lerobot-find-joint-limits="lerobot.scripts.lerobot_find_joint_limits:main"
lerobot-imgtransform-viz="lerobot.scripts.lerobot_imgtransform_viz:main"
@@ -0,0 +1,2 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
# Data annotations for subtasks and VLM-based labeling.
@@ -0,0 +1,671 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
import cv2
from lerobot.datasets.dataset_tools import add_features
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import (
create_subtask_index_array,
create_subtasks_dataframe,
save_subtasks,
)
if TYPE_CHECKING:
from lerobot.data_processing.data_annotations.vlm_annotations import BaseVLM
# Skill Annotation Data Structures
class Skill:
"""Represents a single atomic skill/subtask in a demonstration."""
def __init__(self, name: str, start: float, end: float):
self.name = name
self.start = start # Start timestamp in seconds
self.end = end # End timestamp in seconds
def to_dict(self) -> dict:
return {"name": self.name, "start": self.start, "end": self.end}
@classmethod
def from_dict(cls, data: dict) -> "Skill":
return cls(name=data["name"], start=data["start"], end=data["end"])
def __repr__(self) -> str:
return f"Skill(name='{self.name}', start={self.start:.2f}, end={self.end:.2f})"
class EpisodeSkills:
"""Container for all skills in an episode."""
def __init__(self, episode_index: int, description: str, skills: list[Skill]):
self.episode_index = episode_index
self.description = description
self.skills = skills
def to_dict(self) -> dict:
return {
"episode_index": self.episode_index,
"description": self.description,
"skills": [s.to_dict() for s in self.skills],
}
# Video Extraction Utilities
class VideoExtractor:
"""Utilities for extracting and processing video segments from LeRobot datasets."""
def __init__(self) -> None:
pass
def extract_episode_video(
self,
video_path: Path,
start_timestamp: float,
end_timestamp: float,
target_fps: int = 1,
) -> Path:
"""
Extract a specific episode segment from a concatenated video file.
Args:
video_path: Path to the source video file
start_timestamp: Start time in seconds
end_timestamp: End time in seconds
target_fps: Target frames per second for output
Returns:
Path to the extracted temporary video file
"""
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
tmp_path = Path(tmp_file.name)
duration = end_timestamp - start_timestamp
print(f"Extracting: {start_timestamp:.1f}s - {end_timestamp:.1f}s ({duration:.1f}s)")
cmd = [
"ffmpeg",
"-i",
str(video_path),
"-ss",
str(start_timestamp),
"-t",
str(duration),
"-r",
str(target_fps),
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"23",
"-an",
"-y",
str(tmp_path),
]
try:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"FFmpeg failed: {e}") from e
except FileNotFoundError as e:
raise RuntimeError("FFmpeg not found. Please install ffmpeg.") from e
if not tmp_path.exists() or tmp_path.stat().st_size < 1024:
if tmp_path.exists():
tmp_path.unlink()
raise RuntimeError("Video extraction produced invalid file")
return tmp_path
def add_timer_overlay(self, video_path: Path) -> Path:
"""
Add a visible timer overlay to each frame (elapsed time in seconds) in one corner.
Used so the VLM can read the timestamp from the image instead of relying on file metadata.
Draws a black box with white text at top-right. Writes to a new temporary file and returns its path.
"""
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as out_file:
out_path = Path(out_file.name)
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
raise RuntimeError("Failed to open video")
fps = cap.get(cv2.CAP_PROP_FPS) or 1.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(str(out_path), fourcc, fps, (w, h))
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = max(1.2, min(h, w) / 350.0)
thickness = max(2, int(font_scale))
padding = 15
margin = 30
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
t_sec = frame_idx / fps
text = f"{t_sec:.2f} s"
(tw, th), baseline = cv2.getTextSize(text, font, font_scale, thickness)
# Top-right placement
x_text = w - tw - margin - padding
y_text = margin + th + padding
# Rectangle coordinates (black box behind text)
x1 = x_text - padding
y1 = y_text - th - padding
x2 = x_text + tw + padding
y2 = y_text + baseline + padding
# Draw black filled rectangle
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 0), -1)
# Draw white text
cv2.putText(
frame,
text,
(x_text, y_text),
font,
font_scale,
(255, 255, 255),
thickness,
lineType=cv2.LINE_AA,
)
writer.write(frame)
frame_idx += 1
cap.release()
writer.release()
if not out_path.exists() or out_path.stat().st_size < 1024:
if out_path.exists():
out_path.unlink()
raise RuntimeError("Timer overlay produced invalid file")
return out_path
def get_video_duration(self, video_path: Path) -> float:
"""Get duration of a video file in seconds."""
cap = cv2.VideoCapture(str(video_path))
fps = cap.get(cv2.CAP_PROP_FPS) or 30
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
return frame_count / fps
# Skill Annotation Pipeline
class SkillAnnotator:
"""
Main class for annotating LeRobot datasets with skill labels.
This class orchestrates the full annotation pipeline:
1. Load dataset
2. Extract video segments for each episode
3. Run VLM-based skill segmentation
4. Update dataset task metadata
"""
def __init__(
self,
vlm: "BaseVLM",
video_extractor: VideoExtractor | None = None,
batch_size: int = 8,
add_timer_overlay: bool = True,
):
self.vlm = vlm
self.video_extractor = video_extractor or VideoExtractor()
self.batch_size = batch_size
self.add_timer_overlay = add_timer_overlay
def annotate_dataset(
self,
dataset: LeRobotDataset,
video_key: str,
episodes: list[int] | None = None,
skip_existing: bool = False,
subtask_labels: list[str] | None = None,
) -> dict[int, EpisodeSkills]:
"""
Annotate all episodes in a dataset with skill labels using batched processing.
Args:
dataset: LeRobot dataset to annotate
video_key: Key for video observations (e.g., "observation.images.base")
episodes: Specific episode indices to annotate (None = all)
skip_existing: Skip episodes that already have skill annotations
subtask_labels: If provided, model must choose only from these labels (closed vocabulary)
Returns:
Dictionary mapping episode index to EpisodeSkills
"""
episode_indices = episodes or list(range(dataset.meta.total_episodes))
annotations: dict[int, EpisodeSkills] = {}
failed_episodes: dict[int, str] = {} # Track failed episodes with error messages
# Get coarse task description if available
coarse_goal = self._get_coarse_goal(dataset)
# Filter out episodes that already have annotations if skip_existing is True
if skip_existing:
existing_annotations = load_skill_annotations(dataset.root)
if existing_annotations and "episodes" in existing_annotations:
# Only skip episodes that exist AND have non-empty skills
existing_episode_indices = set()
for idx_str, episode_data in existing_annotations["episodes"].items():
idx = int(idx_str)
# Check if skills list exists and is not empty
if "skills" in episode_data and episode_data["skills"]:
existing_episode_indices.add(idx)
original_count = len(episode_indices)
episode_indices = [ep for ep in episode_indices if ep not in existing_episode_indices]
skipped_count = original_count - len(episode_indices)
if skipped_count > 0:
print(f"Skipping {skipped_count} episodes with existing non-empty annotations")
if not episode_indices:
print("No episodes to annotate (all already annotated)")
return annotations
print(f"Annotating {len(episode_indices)} episodes in batches of {self.batch_size}...")
# Process episodes in batches
for batch_start in range(0, len(episode_indices), self.batch_size):
batch_end = min(batch_start + self.batch_size, len(episode_indices))
batch_episodes = episode_indices[batch_start:batch_end]
print(
f"Processing batch {batch_start // self.batch_size + 1}/{(len(episode_indices) + self.batch_size - 1) // self.batch_size} (episodes {batch_episodes[0]} to {batch_episodes[-1]})..."
)
try:
batch_annotations = self._annotate_episodes_batch(
dataset, batch_episodes, video_key, coarse_goal, subtask_labels
)
for ep_idx in batch_episodes:
if ep_idx in batch_annotations and batch_annotations[ep_idx]:
skills = batch_annotations[ep_idx]
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
print(f" Episode {ep_idx}: {len(skills)} skills identified")
else:
failed_episodes[ep_idx] = "Empty or missing skills from batch processing"
print(f"⚠ Episode {ep_idx}: No skills extracted, will retry")
except Exception as e:
print(f"✗ Batch failed: {e}. Falling back to single-episode processing...")
# Fallback: process episodes one by one
for ep_idx in batch_episodes:
try:
skills = self._annotate_episode(
dataset, ep_idx, video_key, coarse_goal, subtask_labels
)
if skills:
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
print(f" Episode {ep_idx}: {len(skills)} skills identified")
else:
failed_episodes[ep_idx] = "Empty skills list from single-episode processing"
print(f"⚠ Episode {ep_idx}: No skills extracted, will retry")
except Exception as ep_error:
failed_episodes[ep_idx] = str(ep_error)
print(f"⚠ Episode {ep_idx} failed: {ep_error}, will retry")
# Retry failed episodes one more time
if failed_episodes:
print(f"\nRetrying {len(failed_episodes)} failed episodes...")
retry_count = 0
for ep_idx, error_msg in list(failed_episodes.items()):
print(f"Retry attempt for episode {ep_idx} (previous error: {error_msg})")
try:
skills = self._annotate_episode(dataset, ep_idx, video_key, coarse_goal, subtask_labels)
if skills:
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
print(f" Episode {ep_idx} (retry): {len(skills)} skills identified")
del failed_episodes[ep_idx]
retry_count += 1
else:
print(f"✗ Episode {ep_idx} (retry): Still no skills extracted")
except Exception as retry_error:
failed_episodes[ep_idx] = str(retry_error)
print(f"✗ Episode {ep_idx} (retry) failed: {retry_error}")
if retry_count > 0:
print(f"Successfully recovered {retry_count} episodes on retry")
if failed_episodes:
print(f"\n⚠ Warning: {len(failed_episodes)} episodes still failed after retry:")
for ep_idx, error_msg in failed_episodes.items():
print(f" Episode {ep_idx}: {error_msg}")
return annotations
def _get_coarse_goal(self, dataset: LeRobotDataset) -> str:
"""Extract or generate the coarse task description."""
# Try to get from existing task metadata
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
# Get the first task description
first_task = dataset.meta.tasks.index[0]
if first_task:
return str(first_task)
return "Perform the demonstrated manipulation task."
def _annotate_episodes_batch(
self,
dataset: LeRobotDataset,
episode_indices: list[int],
video_key: str,
coarse_goal: str,
subtask_labels: list[str] | None = None,
) -> dict[int, list[Skill]]:
"""Annotate multiple episodes with skill labels in a batch."""
# Extract all videos for this batch
extracted_paths = []
timer_paths = []
paths_for_vlm = []
durations = []
valid_episode_indices = []
for ep_idx in episode_indices:
try:
# Get video path and timestamps
video_path = dataset.root / dataset.meta.get_video_file_path(ep_idx, video_key)
if not video_path.exists():
print(f"Warning: Video not found for episode {ep_idx}")
continue
# Get episode timestamps from metadata
ep = dataset.meta.episodes[ep_idx]
start_ts = float(ep[f"videos/{video_key}/from_timestamp"])
end_ts = float(ep[f"videos/{video_key}/to_timestamp"])
duration = end_ts - start_ts
# Extract episode segment to temporary file
extracted_path = self.video_extractor.extract_episode_video(
video_path, start_ts, end_ts, target_fps=dataset.meta.fps
)
if self.add_timer_overlay:
video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path)
extracted_paths.append(extracted_path)
timer_paths.append(video_for_vlm)
else:
video_for_vlm = extracted_path
extracted_paths.append(extracted_path)
timer_paths.append(None)
paths_for_vlm.append(video_for_vlm)
durations.append(duration)
valid_episode_indices.append(ep_idx)
except Exception as e:
print(f"Warning: Failed to extract video for episode {ep_idx}: {e}")
continue
if not paths_for_vlm:
return {}
try:
# Run VLM skill segmentation in batch
all_skills = self.vlm.segment_skills_batch(paths_for_vlm, durations, coarse_goal, subtask_labels)
# Map results back to episode indices
results = {}
for ep_idx, skills in zip(valid_episode_indices, all_skills, strict=True):
results[ep_idx] = skills
return results
finally:
# Clean up all temporary files (extracted and timer-overlay)
for path in extracted_paths:
if path.exists():
path.unlink()
for path in timer_paths:
if path is not None and path.exists():
path.unlink()
def _annotate_episode(
self,
dataset: LeRobotDataset,
episode_index: int,
video_key: str,
coarse_goal: str,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
"""Annotate a single episode with skill labels."""
# Get video path and timestamps for this episode
video_path = dataset.root / dataset.meta.get_video_file_path(episode_index, video_key)
if not video_path.exists():
raise FileNotFoundError(f"Video not found: {video_path}")
# Get episode timestamps from metadata
ep = dataset.meta.episodes[episode_index]
start_ts = float(ep[f"videos/{video_key}/from_timestamp"])
end_ts = float(ep[f"videos/{video_key}/to_timestamp"])
duration = end_ts - start_ts
# Extract episode segment to temporary file
extracted_path = self.video_extractor.extract_episode_video(
video_path, start_ts, end_ts, target_fps=1
)
if self.add_timer_overlay:
video_for_vlm = self.video_extractor.add_timer_overlay(extracted_path)
else:
video_for_vlm = extracted_path
try:
# Run VLM skill segmentation
skills = self.vlm.segment_skills(video_for_vlm, duration, coarse_goal, subtask_labels)
return skills
finally:
# Clean up temporary files (extracted and optionally timer-overlay)
if extracted_path.exists():
extracted_path.unlink()
if self.add_timer_overlay and video_for_vlm != extracted_path and video_for_vlm.exists():
video_for_vlm.unlink()
# Metadata Writer - Updates per-frame task_index based on skills
def get_skill_for_timestamp(skills: list[Skill], timestamp: float) -> Skill | None:
"""
Find which skill covers a given timestamp.
Args:
skills: List of skills with start/end times
timestamp: Frame timestamp in seconds
Returns:
The Skill that covers this timestamp, or None if not found
"""
for skill in skills:
if skill.start <= timestamp < skill.end:
return skill
# Handle the last frame (end boundary)
if timestamp >= skill.end and skill == skills[-1]:
return skill
return skills[-1] if skills else None # Fallback to last skill
def save_skill_annotations(
dataset: LeRobotDataset,
annotations: dict[int, EpisodeSkills],
output_dir: Path | None = None,
repo_id: str | None = None,
) -> LeRobotDataset:
"""
Save skill annotations to the dataset by:
1. Creating a subtasks.parquet file with unique subtasks
2. Adding a subtask_index feature to the dataset
3. Saving raw skill annotations as JSON for reference
This function does NOT modify tasks.parquet - it keeps the original tasks intact
and creates a separate subtask hierarchy.
Args:
dataset: The LeRobot dataset to annotate
annotations: Dictionary of episode skills
output_dir: Optional directory to save the modified dataset
repo_id: Optional repository ID for the new dataset
Returns:
New dataset with subtask_index feature added
"""
if not annotations:
print("No annotations to save")
return dataset
# Step 1: Create subtasks DataFrame
print("Creating subtasks DataFrame...")
subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe(annotations)
# Step 2: Create subtask_index array for all frames
print("Creating subtask_index array...")
subtask_indices = create_subtask_index_array(dataset, annotations, skill_to_subtask_idx)
# Step 3: Save subtasks.parquet to the original dataset root
save_subtasks(subtasks_df, dataset.root)
# Step 4: Save the raw skill annotations as JSON for reference
skills_path = dataset.root / "meta" / "skills.json"
skills_path.parent.mkdir(parents=True, exist_ok=True)
# Load existing skills data if it exists and is not empty
existing_skills_data = None
if skills_path.exists():
try:
with open(skills_path) as f:
existing_skills_data = json.load(f)
if existing_skills_data and len(existing_skills_data.get("episodes", {})) > 0:
print(
f"Found existing skills.json with {len(existing_skills_data.get('episodes', {}))} episodes, merging..."
)
except (OSError, json.JSONDecodeError):
print("Warning: Could not load existing skills.json, will create new file")
existing_skills_data = None
# Prepare new annotations
new_episodes = {str(ep_idx): ann.to_dict() for ep_idx, ann in annotations.items()}
# Merge with existing data if available
if existing_skills_data:
# Preserve existing episodes that are not being updated
merged_episodes = existing_skills_data.get("episodes", {}).copy()
merged_episodes.update(new_episodes)
# Merge skill_to_subtask_index mappings
merged_skill_to_subtask = existing_skills_data.get("skill_to_subtask_index", {}).copy()
merged_skill_to_subtask.update(skill_to_subtask_idx)
# Use existing coarse_description if available, otherwise use new one
coarse_desc = existing_skills_data.get(
"coarse_description", annotations[next(iter(annotations))].description
)
skills_data = {
"coarse_description": coarse_desc,
"skill_to_subtask_index": merged_skill_to_subtask,
"episodes": merged_episodes,
}
print(
f"Updated {len(new_episodes)} episode(s), total episodes in skills.json: {len(merged_episodes)}"
)
else:
# No existing data, create new
skills_data = {
"coarse_description": annotations[next(iter(annotations))].description,
"skill_to_subtask_index": skill_to_subtask_idx,
"episodes": new_episodes,
}
with open(skills_path, "w") as f:
json.dump(skills_data, f, indent=2)
print(f" Saved skill annotations to {skills_path}")
# Step 5: Add subtask_index feature to dataset using add_features
print("Adding subtask_index feature to dataset...")
# Determine output directory and repo_id
output_dir = dataset.root.parent / f"{dataset.root.name}" if output_dir is None else Path(output_dir)
if repo_id is None:
repo_id = f"{dataset.repo_id}"
# Add feature using dataset_tools
feature_info = {
"dtype": "int64",
"shape": (1,),
"names": None,
}
new_dataset = add_features(
dataset=dataset,
features={
"subtask_index": (subtask_indices, feature_info),
},
output_dir=output_dir,
repo_id=repo_id,
)
# Copy subtasks.parquet to new output directory
import shutil
shutil.copy(dataset.root / "meta" / "subtasks.parquet", output_dir / "meta" / "subtasks.parquet")
shutil.copy(dataset.root / "meta" / "skills.json", output_dir / "meta" / "skills.json")
print(" Successfully added subtask_index feature!")
print(f" New dataset saved to: {new_dataset.root}")
print(f" Total subtasks: {len(subtasks_df)}")
return new_dataset
def load_skill_annotations(dataset_root: Path) -> dict | None:
"""Load existing skill annotations from a dataset."""
skills_path = dataset_root / "meta" / "skills.json"
if skills_path.exists():
with open(skills_path) as f:
return json.load(f)
return None
@@ -0,0 +1,271 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import re
from abc import ABC, abstractmethod
from pathlib import Path
import torch
from lerobot.data_processing.data_annotations.subtask_annotations import Skill
from lerobot.utils.constants import (
SKILL_SEGMENTATION_PROMPT_TEMPLATE,
format_subtask_labels_section,
)
logger = logging.getLogger(__name__)
DEFAULT_MODEL = "Qwen/Qwen3.5-27B"
def create_skill_segmentation_prompt(
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
duration_seconds: float | None = None,
) -> str:
"""Create the prompt for skill segmentation using the template from constants."""
if duration_seconds is None:
raise ValueError("duration_seconds is required for skill segmentation prompt")
goal_context = f'The overall goal is: "{coarse_goal}"\n\n' if coarse_goal else ""
subtask_labels_section = format_subtask_labels_section(subtask_labels) if subtask_labels else ""
video_duration_mm_ss = f"{int(duration_seconds // 60):02d}:{int(duration_seconds % 60):02d}"
return SKILL_SEGMENTATION_PROMPT_TEMPLATE.format(
goal_context=goal_context,
subtask_labels_section=subtask_labels_section,
video_duration_seconds=duration_seconds,
video_duration_mm_ss=video_duration_mm_ss,
)
class BaseVLM(ABC):
"""
Abstract base class for Vision-Language Models used in skill segmentation.
To add a new VLM family:
1. Subclass BaseVLM
2. Implement __init__, segment_skills, and segment_skills_batch
3. Register it in get_vlm()
"""
@abstractmethod
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
pass
@abstractmethod
def segment_skills(
self,
video_path: Path,
episode_duration: float,
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
"""Segment a single video into atomic skills."""
pass
@abstractmethod
def segment_skills_batch(
self,
video_paths: list[Path],
episode_durations: list[float],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[list[Skill]]:
"""Segment multiple videos into atomic skills in a single batch."""
pass
def _parse_skills_response(self, response: str) -> list[Skill]:
"""Parse JSON skill list from VLM response text."""
if "```json" in response:
response = response.split("```json")[1].split("```")[0]
elif "```" in response:
response = response.split("```")[1].split("```")[0]
try:
data = json.loads(response)
skills_data = data.get("skills", data)
if isinstance(skills_data, list):
return [Skill.from_dict(s) for s in skills_data]
except json.JSONDecodeError:
match = re.search(r"\{.*\}", response, re.DOTALL)
if match:
try:
data = json.loads(match.group())
skills_data = data.get("skills", [])
return [Skill.from_dict(s) for s in skills_data]
except json.JSONDecodeError as e:
raise ValueError(f"Could not parse JSON from VLM response: {response[:200]}...") from e
raise ValueError(f"Could not parse skills from response: {response[:200]}...")
class QwenVL(BaseVLM):
"""Qwen VL model for skill segmentation (default: Qwen3.5 series).
Uses qwen-vl-utils for video processing and the HuggingFace transformers
Qwen3VLProcessor pipeline. Requires transformers >= 5.4.0 for correct
video position embeddings.
"""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoModelForImageTextToText, AutoProcessor
self.device = device
self.model_name = model_name
self.process_vision_info = process_vision_info
logger.info(f"Loading model: {model_name}...")
self.model = AutoModelForImageTextToText.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.processor.tokenizer.padding_side = "left"
logger.info(f"Model loaded on {device}")
def _build_messages(self, video_path: Path, episode_duration: float, prompt: str) -> list[dict]:
duration_str = f"{int(episode_duration // 60):02d}:{int(episode_duration % 60):02d}"
return [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{
"role": "user",
"content": [
{"type": "video", "video": str(video_path), "fps": 1.0},
{
"type": "text",
"text": (
f"Video duration: {duration_str} (exactly {episode_duration:.1f} seconds). "
f"Segment into atomic skills. Last skill must end at {episode_duration:.1f}."
),
},
],
},
]
def _prepare_inputs(self, messages: list[dict]) -> dict:
"""Tokenize a single message and return processor inputs on device."""
text = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True, enable_thinking=False
)
image_inputs, video_inputs = self.process_vision_info(messages, return_video_metadata=True)
videos, video_metadata = None, None
if video_inputs:
videos = [v[0] for v in video_inputs]
video_metadata = [v[1] for v in video_inputs]
return self.processor(
text=[text],
images=image_inputs,
videos=videos,
videos_kwargs={
"video_metadata": video_metadata,
"do_sample_frames": False,
},
padding=True,
return_tensors="pt",
).to(self.device)
def _decode(self, inputs, generated_ids) -> list[str]:
return self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids, strict=True)],
skip_special_tokens=True,
clean_up_tokenization_spaces=False,
)
def segment_skills(
self,
video_path: Path,
episode_duration: float,
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
prompt = create_skill_segmentation_prompt(
coarse_goal, subtask_labels, duration_seconds=episode_duration
)
messages = self._build_messages(video_path, episode_duration, prompt)
inputs = self._prepare_inputs(messages)
with torch.no_grad():
generated_ids = self.model.generate(
**inputs, max_new_tokens=1024, do_sample=True, temperature=0.7
)
response = self._decode(inputs, generated_ids)[0].strip()
return self._parse_skills_response(response)
def segment_skills_batch(
self,
video_paths: list[Path],
episode_durations: list[float],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[list[Skill]]:
all_texts = []
all_video_tuples: list[tuple] = []
for video_path, duration in zip(video_paths, episode_durations, strict=True):
prompt = create_skill_segmentation_prompt(coarse_goal, subtask_labels, duration_seconds=duration)
messages = self._build_messages(video_path, duration, prompt)
text = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True, enable_thinking=False
)
_image_inputs, video_inputs = self.process_vision_info(messages, return_video_metadata=True)
all_texts.append(text)
all_video_tuples.extend(video_inputs or [])
videos, video_metadata = None, None
if all_video_tuples:
videos = [v[0] for v in all_video_tuples]
video_metadata = [v[1] for v in all_video_tuples]
inputs = self.processor(
text=all_texts,
videos=videos,
videos_kwargs={
"video_metadata": video_metadata,
"do_sample_frames": False,
},
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(
**inputs, max_new_tokens=1024, do_sample=True, temperature=0.7
)
responses = self._decode(inputs, generated_ids)
all_skills = []
for idx, response in enumerate(responses):
try:
skills = self._parse_skills_response(response.strip())
if not skills:
logger.warning(f"No skills parsed for video {idx}")
all_skills.append(skills)
except Exception as e:
logger.warning(f"Failed to parse response for video {idx}: {e}")
all_skills.append([])
return all_skills
def get_vlm(model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16) -> BaseVLM:
"""Create a VLM instance. Defaults to QwenVL which supports the Qwen3.5 series."""
return QwenVL(model_name, device, torch_dtype)
+112 -2
View File
@@ -1,4 +1,5 @@
#!/usr/bin/env python
from __future__ import annotations
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
@@ -21,7 +22,11 @@ from collections import deque
from collections.abc import Iterable, Iterator
from pathlib import Path
from pprint import pformat
from typing import Any
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from lerobot.data_processing.data_annotations.subtask_annotations import EpisodeSkills
from lerobot.datasets.lerobot_dataset import LeRobotDataset
import datasets
import numpy as np
@@ -1216,6 +1221,111 @@ def find_float_index(target, float_list, threshold=1e-6):
return -1
def create_subtasks_dataframe(
annotations: dict[int, EpisodeSkills],
) -> tuple[pd.DataFrame, dict[str, int]]:
"""
Create a subtasks DataFrame from skill annotations.
Args:
annotations: Dictionary of episode skills
Returns:
Tuple of (subtasks_df, skill_to_subtask_idx mapping)
"""
# Collect all unique skill names
all_skill_names: set[str] = set()
for episode_skills in annotations.values():
for skill in episode_skills.skills:
all_skill_names.add(skill.name)
# Build subtasks DataFrame
subtask_data = []
for i, skill_name in enumerate(sorted(all_skill_names)):
subtask_data.append(
{
"subtask": skill_name,
"subtask_index": i,
}
)
if not subtask_data:
subtasks_df = pd.DataFrame(columns=["subtask", "subtask_index"]).set_index("subtask")
else:
subtasks_df = pd.DataFrame(subtask_data).set_index("subtask")
# Build skill name to subtask_index mapping
skill_to_subtask_idx = {
skill_name: int(subtasks_df.loc[skill_name, "subtask_index"]) for skill_name in all_skill_names
}
return subtasks_df, skill_to_subtask_idx
def save_subtasks(
subtasks_df: pd.DataFrame,
dataset_root: Path,
) -> None:
"""Save subtasks to subtasks.parquet."""
output_path = dataset_root / "meta" / "subtasks.parquet"
output_path.parent.mkdir(parents=True, exist_ok=True)
subtasks_df.to_parquet(output_path, engine="pyarrow", compression="snappy")
def create_subtask_index_array(
dataset: LeRobotDataset,
annotations: dict[int, EpisodeSkills],
skill_to_subtask_idx: dict[str, int],
) -> np.ndarray:
"""
Create a subtask_index array for each frame based on skill annotations.
Args:
dataset: The LeRobot dataset
annotations: Dictionary of episode skills
skill_to_subtask_idx: Mapping from skill name to subtask_index
Returns:
Array of subtask indices for each frame in the dataset
"""
# Array to store subtask index for each frame
# Initialize with -1 to indicate unannotated frames
full_dataset_length = len(dataset)
subtask_indices = np.full(full_dataset_length, -1, dtype=np.int64)
# Assign subtask_index for each annotated episode
fps = float(dataset.meta.fps)
for ep_idx, episode_skills in annotations.items():
skills = episode_skills.skills
# Get episode frame range
ep = dataset.meta.episodes[ep_idx]
ep_from = int(ep["dataset_from_index"])
ep_to = int(ep["dataset_to_index"])
# Process each frame in the episode (compute timestamp from index to avoid loading video)
for frame_idx in range(ep_from, ep_to):
timestamp = (frame_idx - ep_from) / fps
# Find which skill covers this timestamp (inline to avoid circular import)
skill = None
for s in skills:
if s.start <= timestamp < s.end:
skill = s
break
if timestamp >= s.end and s == skills[-1]:
skill = s
break
if not skill and skills:
skill = skills[-1]
if skill and skill.name in skill_to_subtask_idx:
subtask_idx = skill_to_subtask_idx[skill.name]
subtask_indices[frame_idx] = subtask_idx
return subtask_indices
class LookBackError(Exception):
"""
Exception raised when trying to look back in the history of a Backtrackable object.
@@ -1279,7 +1389,7 @@ class Backtrackable[T]:
self._history = history
self._lookahead = lookahead
def __iter__(self) -> "Backtrackable[T]":
def __iter__(self) -> Backtrackable[T]:
return self
def __next__(self) -> T:
@@ -0,0 +1,160 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Automatic Skill Annotation for LeRobot Datasets.
This script performs automatic subtask/skill labeling for ANY LeRobot dataset using
Vision-Language Models (VLMs). It segments each robot demonstration into short atomic
skills (1-3 seconds each) and creates a new dataset with subtask annotations.
The pipeline:
1. Loads a LeRobot dataset (local or from HuggingFace Hub)
2. For each episode, extracts video frames
3. Uses a VLM to identify skill boundaries and labels
4. Creates a subtasks.parquet file with unique subtasks
5. Adds a subtask_index feature to the dataset
Supported VLMs (modular design): Qwen2-VL, Qwen3-VL, Qwen3.5-VL (see vlm_annotations.py).
Usage:
lerobot-dataset-subtask-annotate --repo_id=user/dataset --video_key=observation.images.base ...
lerobot-dataset-subtask-annotate --data_dir=/path/to/dataset --video_key=observation.images.base ...
"""
from dataclasses import dataclass
from pathlib import Path
import torch
from lerobot.configs import parser
from lerobot.data_processing.data_annotations.subtask_annotations import (
SkillAnnotator,
save_skill_annotations,
)
from lerobot.data_processing.data_annotations.vlm_annotations import get_vlm
from lerobot.datasets.lerobot_dataset import LeRobotDataset
@dataclass
class SubtaskAnnotateConfig:
"""Configuration for automatic subtask/skill annotation with VLMs."""
# Data source: provide exactly one of data_dir (local) or repo_id (Hub)
data_dir: str | None = None
repo_id: str | None = None
# Video observation key (e.g. observation.images.base)
video_key: str = "observation.images.base"
# VLM model name (default: Qwen/Qwen2-VL-7B-Instruct)
model: str = "Qwen/Qwen2-VL-7B-Instruct"
device: str = "cuda"
dtype: str = "bfloat16"
batch_size: int = 8
# Episode selection (default: all)
episodes: list[int] | None = None
skip_existing: bool = False
# Output
output_dir: str | None = None
output_repo_id: str | None = None
push_to_hub: bool = False
# Closed vocabulary: comma-separated labels (e.g. "label1,label2,label3")
subtask_labels: str | None = None
# Disable timer overlay on video (by default a timer is drawn for the VLM)
no_timer_overlay: bool = False
@parser.wrap()
def subtask_annotate(cfg: SubtaskAnnotateConfig):
"""
Run automatic skill annotation on a LeRobot dataset using a VLM.
Args:
cfg: SubtaskAnnotateConfig with data source, model, and output options.
"""
if (cfg.data_dir is None) == (cfg.repo_id is None):
raise ValueError("Provide exactly one of --data_dir or --repo_id")
# Parse comma-separated subtask labels into a list (or None)
subtask_labels_list: list[str] | None = None
if cfg.subtask_labels and cfg.subtask_labels.strip():
subtask_labels_list = [s.strip() for s in cfg.subtask_labels.split(",") if s.strip()]
dtype_map = {
"bfloat16": torch.bfloat16,
"float16": torch.float16,
"float32": torch.float32,
}
torch_dtype = dtype_map[cfg.dtype]
print("Loading dataset...")
if cfg.data_dir:
dataset = LeRobotDataset(repo_id="local/dataset", root=cfg.data_dir, download_videos=False)
else:
dataset = LeRobotDataset(repo_id=cfg.repo_id, download_videos=True)
print(f" Loaded dataset with {dataset.meta.total_episodes} episodes")
if cfg.video_key not in dataset.meta.video_keys:
available = ", ".join(dataset.meta.video_keys)
raise ValueError(f"Video key '{cfg.video_key}' not found. Available: {available}")
print(f"Initializing VLM: {cfg.model}...")
vlm = get_vlm(cfg.model, cfg.device, torch_dtype)
add_timer_overlay = not cfg.no_timer_overlay
annotator = SkillAnnotator(
vlm=vlm,
batch_size=cfg.batch_size,
add_timer_overlay=add_timer_overlay,
)
print(f"Processing with batch size: {cfg.batch_size}")
annotations = annotator.annotate_dataset(
dataset=dataset,
video_key=cfg.video_key,
episodes=cfg.episodes,
skip_existing=cfg.skip_existing,
subtask_labels=subtask_labels_list,
)
output_dir = Path(cfg.output_dir) if cfg.output_dir else None
output_repo_id = cfg.output_repo_id
new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id)
total_skills = sum(len(ann.skills) for ann in annotations.values())
print("\nAnnotation complete!")
print(f"Episodes annotated: {len(annotations)}")
print(f"Total subtasks identified: {total_skills}")
print(f"Dataset with subtask_index saved to: {new_dataset.root}")
if cfg.push_to_hub:
if cfg.data_dir:
print("Warning: --push_to_hub requires --repo_id, skipping...")
else:
print("Pushing to HuggingFace Hub...")
try:
new_dataset.push_to_hub(branch="subtasks")
print(f" Pushed to {output_repo_id or cfg.repo_id}")
except Exception as e:
print(f"Push failed: {e}")
def main():
"""CLI entry point that parses config and runs subtask annotation."""
subtask_annotate()
if __name__ == "__main__":
main()
+89
View File
@@ -89,3 +89,92 @@ LIBERO_KEY_JOINTS_POS = "robot_state/joints/pos"
LIBERO_KEY_JOINTS_VEL = "robot_state/joints/vel"
LIBERO_KEY_PIXELS_AGENTVIEW = "pixels/agentview_image"
LIBERO_KEY_PIXELS_EYE_IN_HAND = "pixels/robot0_eye_in_hand_image"
def format_subtask_labels_section(subtask_labels: list[str]) -> str:
"""Format a list of subtask labels for the closed-vocabulary section of the prompt."""
return "\n ".join(f'"{label}"' for label in subtask_labels)
SKILL_SEGMENTATION_PROMPT_TEMPLATE = """# Role
You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations.
# Video duration (critical)
The total video length is **{video_duration_seconds} seconds** ({video_duration_mm_ss}). All "start" and "end" values in your JSON must be numeric seconds in the range [0.0, {video_duration_seconds}]. The last skill's "end" must be exactly **{video_duration_seconds}**. Do not stop earlier.
# Task
{goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should:
- Last approximately 1-3 seconds (or longer if the action takes longer)
- Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper")
- Have precise start and end timestamps in seconds (float)
# Requirements
1. **Atomic Actions**: Each skill should be a single, indivisible action
2. **Complete Coverage**: Skills must cover the entire video from 0.0 to {video_duration_seconds} seconds with no gaps
3. **Boundary Consistency**: The end of one skill equals the start of the next
4. **Natural Language**: Use clear, descriptive names for each skill
5. **Timestamps**: Use seconds as floats (e.g. 12.5) for all timestamps; the last "end" must be {video_duration_seconds}. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill.
# Subtask Label Set (Closed Vocabulary)
You MUST strictly identify the video segments using ONLY the following labels. Do not create new labels or modify existing ones:
[
{subtask_labels_section}
]
The video shows one successful execution of all subtasks in a logical order.
# Ground-Truth Semantics (Very Important)
Use **visual state changes** to define when a subtask starts and ends. Do NOT assume equal durations for the subtasks.
- A subtask **starts** at the first frame where the robot's motion clearly initiates that subtask.
- A subtask **ends** at the first frame where that specific action is visually completed and the manipulated object reaches a temporary, stable configuration.
If there are short pauses or micro-motions that don't clearly correspond to a new subtask, they belong to the **current** subtask.
# Hard Constraints & Logic
1. **Continuous Coverage (No Gaps):**
- The entire video from 0.0 to {video_duration_seconds} seconds must be covered by subtasks.
- There can be no gaps between subtasks.
- If there is any idle or ambiguous time between clear actions, extend the *preceding* subtask to cover it.
2. **Boundary Consistency:**
- The `"end"` timestamp of one subtask must be exactly equal to the `"start"` timestamp of the next subtask.
- Boundaries must coincide with a real visual state transition, not just a convenient time split.
3. **Chronological Order, One Occurrence Each:**
- This is a single successful demonstration.
- Each subtask from the vocabulary appears **exactly once**, in the correct logical order.
- **Durations may be very different** between subtasks. Never assume they are similar lengths. Base all boundaries only on the video.
4. **Reject Uniform Segmentation (Important):**
- Do NOT simply divide the video into equal or nearly equal time chunks.
- If your boundaries would result in subtasks with similar durations (e.g. all around 5 seconds), treat this as evidence that your segmentation is wrong and refine the boundaries.
- Only use nearly equal durations if the video truly shows each subtask taking the same amount of time (this is very rare).
5. **Timestamps (critical):**
- Use numeric seconds (float) in the JSON, e.g. 0.0, 5.2, 12.8.
- The first subtask always starts at 0.0.
- The last subtask must end at exactly {video_duration_seconds} (the full video length).
- **Time is displayed inside the video**: a visible timer in one corner shows the elapsed time in seconds (from 0.0 to the end). Use this on-screen timer to set accurate start and end times for each skill.
Format this as a bullet list.
# Output Format
output ONLY valid JSON with this exact structure. The last skill's "end" MUST be exactly {video_duration_seconds}. Use the timestamps you read from the visible timer in the video:
```json
{{
"skills": [
{{"name": "first skill", "start": 0.0, "end": 5.0}},
{{"name": "second skill", "start": 5.0, "end": 12.0}},
{{"name": "last skill", "start": 12.0, "end": {video_duration_seconds}}}
]
}}
```
The first skill must start at 0.0 and the last skill must end at **{video_duration_seconds}** (the total video duration in seconds).
# Strict Structural Rule
This video contains exactly ALL subtasks given to you.
Each segment must use a unique label from the vocabulary.
No label may be repeated.
"""
+168
View File
@@ -23,11 +23,18 @@ These tests verify that:
- Subtask handling gracefully handles missing data
"""
import numpy as np
import pandas as pd
import pytest
import torch
from lerobot.data_processing.data_annotations.subtask_annotations import EpisodeSkills, Skill
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import (
create_subtask_index_array,
create_subtasks_dataframe,
save_subtasks,
)
class TestSubtaskDataset:
@@ -188,3 +195,164 @@ class TestSubtaskEdgeCases:
)
else:
subtask_map[idx] = subtask
class TestCreateSubtasksDataframe:
"""Tests for create_subtasks_dataframe in utils."""
def test_empty_annotations(self):
"""Empty annotations produce empty DataFrame and empty mapping."""
subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe({})
assert len(subtasks_df) == 0
assert list(subtasks_df.columns) == ["subtask_index"]
assert skill_to_subtask_idx == {}
def test_single_episode_single_skill(self):
"""Single episode with one skill produces one row and correct mapping."""
annotations = {
0: EpisodeSkills(
episode_index=0,
description="Pick",
skills=[Skill("pick", 0.0, 1.0)],
),
}
subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe(annotations)
assert len(subtasks_df) == 1
assert subtasks_df.index.tolist() == ["pick"]
assert subtasks_df.loc["pick", "subtask_index"] == 0
assert skill_to_subtask_idx == {"pick": 0}
def test_multiple_episodes_overlapping_skills(self):
"""Multiple episodes with overlapping skill names yield unique sorted skills."""
annotations = {
0: EpisodeSkills(
episode_index=0,
description="Ep0",
skills=[
Skill("place", 0.0, 0.5),
Skill("pick", 0.5, 1.0),
],
),
1: EpisodeSkills(
episode_index=1,
description="Ep1",
skills=[Skill("pick", 0.0, 1.0)],
),
}
subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe(annotations)
# Sorted order: pick, place
assert subtasks_df.index.tolist() == ["pick", "place"]
assert int(subtasks_df.loc["pick", "subtask_index"]) == 0
assert int(subtasks_df.loc["place", "subtask_index"]) == 1
assert skill_to_subtask_idx["pick"] == 0
assert skill_to_subtask_idx["place"] == 1
def test_skills_sorted_alphabetically(self):
"""Subtask rows are in alphabetical order by skill name."""
annotations = {
0: EpisodeSkills(
episode_index=0,
description="Ep",
skills=[
Skill("z_final", 0.0, 0.33),
Skill("a_first", 0.33, 0.66),
Skill("m_mid", 0.66, 1.0),
],
),
}
subtasks_df, _ = create_subtasks_dataframe(annotations)
assert subtasks_df.index.tolist() == ["a_first", "m_mid", "z_final"]
assert list(subtasks_df["subtask_index"]) == [0, 1, 2]
class TestSaveSubtasks:
"""Tests for save_subtasks in utils."""
def test_save_subtasks_creates_file(self, tmp_path):
"""save_subtasks writes meta/subtasks.parquet and creates parent dir."""
subtasks_df = pd.DataFrame(
[{"subtask": "pick", "subtask_index": 0}, {"subtask": "place", "subtask_index": 1}]
).set_index("subtask")
save_subtasks(subtasks_df, tmp_path)
out = tmp_path / "meta" / "subtasks.parquet"
assert out.exists()
read_df = pd.read_parquet(out)
pd.testing.assert_frame_equal(read_df.reset_index(), subtasks_df.reset_index())
def test_save_subtasks_content_matches(self, tmp_path):
"""Saved parquet round-trips with same content."""
subtasks_df = pd.DataFrame(
[{"subtask": "a", "subtask_index": 0}, {"subtask": "b", "subtask_index": 1}]
).set_index("subtask")
save_subtasks(subtasks_df, tmp_path)
read_df = pd.read_parquet(tmp_path / "meta" / "subtasks.parquet")
assert read_df.index.tolist() == subtasks_df.index.tolist()
assert list(read_df["subtask_index"]) == list(subtasks_df["subtask_index"])
class TestCreateSubtaskIndexArray:
"""Tests for create_subtask_index_array in utils."""
@pytest.fixture
def dataset_with_episodes(self, tmp_path, empty_lerobot_dataset_factory):
"""Dataset with two episodes (10 frames each) for index-array tests."""
features = {"state": {"dtype": "float32", "shape": (2,), "names": None}}
dataset = empty_lerobot_dataset_factory(root=tmp_path / "subtask_idx", features=features)
for _ in range(10):
dataset.add_frame({"state": torch.randn(2), "task": "Task A"})
dataset.save_episode()
for _ in range(10):
dataset.add_frame({"state": torch.randn(2), "task": "Task B"})
dataset.save_episode()
dataset.finalize()
return LeRobotDataset(dataset.repo_id, root=dataset.root)
def test_unannotated_all_minus_one(self, dataset_with_episodes):
"""With no annotations, all frame indices are -1."""
skill_to_subtask_idx = {"pick": 0, "place": 1}
arr = create_subtask_index_array(dataset_with_episodes, {}, skill_to_subtask_idx)
assert len(arr) == len(dataset_with_episodes)
assert arr.dtype == np.int64
assert np.all(arr == -1)
def test_annotated_episode_assigns_by_timestamp(self, dataset_with_episodes):
"""Frames in an annotated episode get subtask index from skill time ranges."""
# Dataset uses DEFAULT_FPS=30. Episode 0: 10 frames -> timestamps 0, 1/30, ..., 9/30 (~0.3s).
# Skills: "pick" [0, 0.2), "place" [0.2, 0.5). At 30 fps: 0.2s = 6 frames, so frames 0-5 = pick, 6-9 = place.
annotations = {
0: EpisodeSkills(
episode_index=0,
description="Pick and place",
skills=[
Skill("pick", 0.0, 0.2), # frames 0-5 at 30 fps
Skill("place", 0.2, 0.5), # frames 6-9 at 30 fps
],
),
}
skill_to_subtask_idx = {"pick": 0, "place": 1}
arr = create_subtask_index_array(dataset_with_episodes, annotations, skill_to_subtask_idx)
assert len(arr) == 20
# Episode 0: from_index=0, to_index=10 at 30 fps
for i in range(6):
assert arr[i] == 0, f"frame {i} should be pick"
for i in range(6, 10):
assert arr[i] == 1, f"frame {i} should be place"
# Episode 1 not annotated
for i in range(10, 20):
assert arr[i] == -1
def test_partial_annotations_leave_others_minus_one(self, dataset_with_episodes):
"""Only annotated episodes get non -1 indices; others stay -1."""
annotations = {
1: EpisodeSkills(
episode_index=1,
description="Place only",
skills=[Skill("place", 0.0, 1.0)],
),
}
skill_to_subtask_idx = {"place": 0}
arr = create_subtask_index_array(dataset_with_episodes, annotations, skill_to_subtask_idx)
for i in range(10):
assert arr[i] == -1
for i in range(10, 20):
assert arr[i] == 0