mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-28 15:09:51 +00:00
add high/low/normal level annotation
This commit is contained in:
@@ -57,6 +57,7 @@ from lerobot.datasets.utils import (
|
|||||||
load_info,
|
load_info,
|
||||||
load_nested_dataset,
|
load_nested_dataset,
|
||||||
load_stats,
|
load_stats,
|
||||||
|
load_subtasks,
|
||||||
load_tasks,
|
load_tasks,
|
||||||
load_tasks_high_level,
|
load_tasks_high_level,
|
||||||
update_chunk_file_indices,
|
update_chunk_file_indices,
|
||||||
@@ -164,6 +165,7 @@ class LeRobotDatasetMetadata:
|
|||||||
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
|
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
|
||||||
self.tasks = load_tasks(self.root)
|
self.tasks = load_tasks(self.root)
|
||||||
self.tasks_high_level = load_tasks_high_level(self.root)
|
self.tasks_high_level = load_tasks_high_level(self.root)
|
||||||
|
self.subtasks = load_subtasks(self.root)
|
||||||
self.episodes = load_episodes(self.root)
|
self.episodes = load_episodes(self.root)
|
||||||
self.stats = load_stats(self.root)
|
self.stats = load_stats(self.root)
|
||||||
|
|
||||||
@@ -520,6 +522,8 @@ class LeRobotDatasetMetadata:
|
|||||||
_validate_feature_names(features)
|
_validate_feature_names(features)
|
||||||
|
|
||||||
obj.tasks = None
|
obj.tasks = None
|
||||||
|
obj.tasks_high_level = None
|
||||||
|
obj.subtasks = None
|
||||||
obj.episodes = None
|
obj.episodes = None
|
||||||
obj.stats = None
|
obj.stats = None
|
||||||
obj.info = create_empty_dataset_info(
|
obj.info = create_empty_dataset_info(
|
||||||
@@ -1068,6 +1072,12 @@ class LeRobotDataset(torch.utils.data.Dataset):
|
|||||||
high_level_task_idx = item["task_index_high_level"].item()
|
high_level_task_idx = item["task_index_high_level"].item()
|
||||||
item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"]
|
item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"]
|
||||||
item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"]
|
item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"]
|
||||||
|
|
||||||
|
# optionally add subtask information
|
||||||
|
if "subtask_index" in self.features and self.meta.subtasks is not None:
|
||||||
|
subtask_idx = item["subtask_index"].item()
|
||||||
|
item["subtask"] = self.meta.subtasks.iloc[subtask_idx].name
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
|
|||||||
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
|
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
|
||||||
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
|
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
|
||||||
DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet"
|
DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet"
|
||||||
|
DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet"
|
||||||
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
|
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
|
||||||
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
|
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
|
||||||
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
|
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
|
||||||
@@ -353,9 +354,20 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame:
|
|||||||
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
|
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
|
||||||
return tasks
|
return tasks
|
||||||
|
|
||||||
def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame:
|
def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame | None:
|
||||||
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH)
|
"""Load high-level tasks from tasks_high_level.parquet if it exists."""
|
||||||
return tasks
|
tasks_high_level_path = local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH
|
||||||
|
if tasks_high_level_path.exists():
|
||||||
|
return pd.read_parquet(tasks_high_level_path)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def load_subtasks(local_dir: Path) -> pandas.DataFrame | None:
|
||||||
|
"""Load subtasks from subtasks.parquet if it exists."""
|
||||||
|
subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH
|
||||||
|
if subtasks_path.exists():
|
||||||
|
return pd.read_parquet(subtasks_path)
|
||||||
|
return None
|
||||||
|
|
||||||
def write_episodes(episodes: Dataset, local_dir: Path) -> None:
|
def write_episodes(episodes: Dataset, local_dir: Path) -> None:
|
||||||
"""Write episode metadata to a parquet file in the LeRobot v3.0 format.
|
"""Write episode metadata to a parquet file in the LeRobot v3.0 format.
|
||||||
|
|||||||
@@ -9,21 +9,28 @@ MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
|
|||||||
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
|
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
|
||||||
|
|
||||||
|
|
||||||
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen"
|
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
|
||||||
|
|
||||||
BATCH_SIZE=32
|
BATCH_SIZE=32
|
||||||
TEMPERATURE=0.9
|
TEMPERATURE=0.9
|
||||||
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
|
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
|
||||||
|
|
||||||
# run synthetic data generation (all episodes processed)
|
# Run subtask annotation
|
||||||
python examples/dataset/annotate_pgen.py \
|
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
|
||||||
--repo-id "$REPO_ID" \
|
--repo-id "$REPO_ID" \
|
||||||
--model "$MODEL" \
|
--video-key observation.images.base \
|
||||||
--output-dir "$OUTPUT_DIR" \
|
--output-dir "$OUTPUT_DIR" \
|
||||||
--temperature "$TEMPERATURE" \
|
--output-repo-id "jadechoghari/collect-data-with-subtasks"
|
||||||
--batch-size "$BATCH_SIZE" \
|
# run synthetic data generation (all episodes processed)
|
||||||
--sample-interval "$SAMPLE_INTERVAL" \
|
# python examples/dataset/annotate_pgen.py \
|
||||||
--image-key observation.images.base \
|
# --repo-id "$REPO_ID" \
|
||||||
--num-image-views-per-sample 1
|
# --model "$MODEL" \
|
||||||
|
# --output-dir "$OUTPUT_DIR" \
|
||||||
|
# --temperature "$TEMPERATURE" \
|
||||||
|
# --batch-size "$BATCH_SIZE" \
|
||||||
|
# --sample-interval "$SAMPLE_INTERVAL" \
|
||||||
|
# --image-key observation.images.base \
|
||||||
|
# --num-image-views-per-sample 1
|
||||||
|
|
||||||
# for faster testing, increase sample interval:
|
# for faster testing, increase sample interval:
|
||||||
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
|
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
|
||||||
|
|||||||
@@ -19,13 +19,17 @@ Automatic Skill Annotation for LeRobot Datasets.
|
|||||||
|
|
||||||
This script performs automatic subtask/skill labeling for ANY LeRobot dataset using
|
This script performs automatic subtask/skill labeling for ANY LeRobot dataset using
|
||||||
Vision-Language Models (VLMs). It segments each robot demonstration into short atomic
|
Vision-Language Models (VLMs). It segments each robot demonstration into short atomic
|
||||||
skills (1-3 seconds each) and updates the dataset's task field.
|
skills (1-3 seconds each) and creates a new dataset with subtask annotations.
|
||||||
|
|
||||||
The pipeline:
|
The pipeline:
|
||||||
1. Loads a LeRobot dataset (local or from HuggingFace Hub)
|
1. Loads a LeRobot dataset (local or from HuggingFace Hub)
|
||||||
2. For each episode, extracts video frames
|
2. For each episode, extracts video frames
|
||||||
3. Uses a VLM to identify skill boundaries and labels
|
3. Uses a VLM to identify skill boundaries and labels
|
||||||
4. Updates the dataset's task metadata with skill annotations
|
4. Creates a subtasks.parquet file with unique subtasks
|
||||||
|
5. Adds a subtask_index feature to the dataset
|
||||||
|
|
||||||
|
NOTE: This script does NOT modify the original tasks.parquet file. It creates a
|
||||||
|
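separate subtask hierarchy while preserving the original task annotations. It does, however, write meta/subtasks.parquet and meta/skills.json into the source dataset's root before copying them to the output directory.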
|
||||||
|
|
||||||
Supported VLMs (modular design allows easy extension):
|
Supported VLMs (modular design allows easy extension):
|
||||||
- Qwen2-VL (default): "Qwen/Qwen2-VL-7B-Instruct"
|
- Qwen2-VL (default): "Qwen/Qwen2-VL-7B-Instruct"
|
||||||
@@ -37,6 +41,7 @@ python examples/dataset/annotate.py \
|
|||||||
--repo-id your-username/your-dataset \
|
--repo-id your-username/your-dataset \
|
||||||
--video-key observation.images.base \
|
--video-key observation.images.base \
|
||||||
--model Qwen/Qwen2-VL-7B-Instruct \
|
--model Qwen/Qwen2-VL-7B-Instruct \
|
||||||
|
--output-dir /path/to/output \
|
||||||
--push-to-hub
|
--push-to-hub
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -44,14 +49,16 @@ Or with a local dataset:
|
|||||||
```bash
|
```bash
|
||||||
python examples/dataset/annotate.py \
|
python examples/dataset/annotate.py \
|
||||||
--data-dir /path/to/local/dataset \
|
--data-dir /path/to/local/dataset \
|
||||||
--video-key observation.images.base
|
--video-key observation.images.base \
|
||||||
|
--output-dir /path/to/output
|
||||||
```
|
```
|
||||||
After running, you can access the skill for any frame via:
|
|
||||||
|
After running, you can access the subtask for any frame via:
|
||||||
```python
|
```python
|
||||||
dataset = LeRobotDataset(repo_id="your/dataset")
|
dataset = LeRobotDataset(repo_id="your/dataset_with_subtasks")
|
||||||
item = dataset[100]
|
item = dataset[100]
|
||||||
task_idx = item["task_index"]
|
subtask_idx = item["subtask_index"]
|
||||||
skill_name = dataset.meta.tasks.iloc[task_idx.item()].name
|
subtask_name = dataset.meta.subtasks.iloc[subtask_idx.item()].name
|
||||||
```
|
```
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -66,10 +73,13 @@ from pathlib import Path
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
import torch
|
import torch
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.progress import Progress, SpinnerColumn, TextColumn
|
from rich.progress import Progress, SpinnerColumn, TextColumn
|
||||||
|
|
||||||
|
from lerobot.datasets.dataset_tools import add_features
|
||||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||||
|
|
||||||
|
|
||||||
@@ -856,24 +866,18 @@ def get_skill_for_timestamp(skills: list[Skill], timestamp: float) -> Skill | No
|
|||||||
return skills[-1] if skills else None # Fallback to last skill
|
return skills[-1] if skills else None # Fallback to last skill
|
||||||
|
|
||||||
|
|
||||||
def update_dataset_tasks(
|
def create_subtasks_dataframe(
|
||||||
dataset: LeRobotDataset,
|
|
||||||
annotations: dict[int, EpisodeSkills],
|
annotations: dict[int, EpisodeSkills],
|
||||||
) -> dict[str, int]:
|
) -> tuple[pd.DataFrame, dict[str, int]]:
|
||||||
"""
|
"""
|
||||||
Register all unique skill names as new tasks in the dataset.
|
Create a subtasks DataFrame from skill annotations.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
dataset: The LeRobot dataset to update
|
|
||||||
annotations: Dictionary of episode skills
|
annotations: Dictionary of episode skills
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary mapping skill name to task_index
|
Tuple of (subtasks_df, skill_to_subtask_idx mapping)
|
||||||
"""
|
"""
|
||||||
import pandas as pd
|
|
||||||
|
|
||||||
from lerobot.datasets.utils import write_tasks
|
|
||||||
|
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
# Collect all unique skill names
|
# Collect all unique skill names
|
||||||
@@ -882,99 +886,69 @@ def update_dataset_tasks(
|
|||||||
for skill in episode_skills.skills:
|
for skill in episode_skills.skills:
|
||||||
all_skill_names.add(skill.name)
|
all_skill_names.add(skill.name)
|
||||||
|
|
||||||
console.print(f"[cyan]Found {len(all_skill_names)} unique skills[/cyan]")
|
console.print(f"[cyan]Found {len(all_skill_names)} unique subtasks[/cyan]")
|
||||||
|
|
||||||
# Build new tasks DataFrame
|
# Build subtasks DataFrame
|
||||||
# Start with existing tasks if any
|
subtask_data = []
|
||||||
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
|
for i, skill_name in enumerate(sorted(all_skill_names)):
|
||||||
existing_tasks = set(dataset.meta.tasks.index.tolist())
|
subtask_data.append({
|
||||||
max_task_idx = dataset.meta.tasks["task_index"].max()
|
"subtask": skill_name,
|
||||||
else:
|
"subtask_index": i,
|
||||||
existing_tasks = set()
|
|
||||||
max_task_idx = -1
|
|
||||||
|
|
||||||
# Add new skills as tasks
|
|
||||||
new_tasks = all_skill_names - existing_tasks
|
|
||||||
if new_tasks:
|
|
||||||
new_task_data = []
|
|
||||||
for i, skill_name in enumerate(sorted(new_tasks)):
|
|
||||||
new_task_data.append({
|
|
||||||
"task": skill_name,
|
|
||||||
"task_index": max_task_idx + 1 + i,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
new_tasks_df = pd.DataFrame(new_task_data).set_index("task")
|
subtasks_df = pd.DataFrame(subtask_data).set_index("subtask")
|
||||||
|
|
||||||
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
|
# Build skill name to subtask_index mapping
|
||||||
dataset.meta.tasks = pd.concat([dataset.meta.tasks, new_tasks_df])
|
skill_to_subtask_idx = {
|
||||||
else:
|
skill_name: int(subtasks_df.loc[skill_name, "subtask_index"])
|
||||||
dataset.meta.tasks = new_tasks_df
|
for skill_name in all_skill_names
|
||||||
|
|
||||||
# Write updated tasks to disk
|
|
||||||
write_tasks(dataset.meta.tasks, dataset.root)
|
|
||||||
console.print(f"[green]✓ Added {len(new_tasks)} new tasks to tasks.parquet[/green]")
|
|
||||||
|
|
||||||
# Build skill name to task_index mapping
|
|
||||||
skill_to_task_idx = {
|
|
||||||
task_name: int(dataset.meta.tasks.loc[task_name, "task_index"])
|
|
||||||
for task_name in all_skill_names
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return skill_to_task_idx
|
return subtasks_df, skill_to_subtask_idx
|
||||||
|
|
||||||
|
|
||||||
def update_frame_task_indices(
|
def save_subtasks(
|
||||||
dataset: LeRobotDataset,
|
subtasks_df: pd.DataFrame,
|
||||||
annotations: dict[int, EpisodeSkills],
|
dataset_root: Path,
|
||||||
skill_to_task_idx: dict[str, int],
|
console: Console | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""Save subtasks to subtasks.parquet."""
|
||||||
Update the task_index for each frame based on skill annotations.
|
if console is None:
|
||||||
|
|
||||||
This reads the data parquet files, updates task_index based on which
|
|
||||||
skill covers each frame's timestamp, and writes back to disk.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
dataset: The LeRobot dataset to update
|
|
||||||
annotations: Dictionary of episode skills
|
|
||||||
skill_to_task_idx: Mapping from skill name to task_index
|
|
||||||
"""
|
|
||||||
import pandas as pd
|
|
||||||
|
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
# Group episodes by their data file (chunk_index, file_index)
|
output_path = dataset_root / "meta" / "subtasks.parquet"
|
||||||
episodes_by_file: dict[tuple[int, int], list[int]] = {}
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
for ep_idx in annotations.keys():
|
|
||||||
ep = dataset.meta.episodes[ep_idx]
|
|
||||||
chunk_idx = ep["data/chunk_index"]
|
|
||||||
file_idx = ep["data/file_index"]
|
|
||||||
key = (chunk_idx, file_idx)
|
|
||||||
if key not in episodes_by_file:
|
|
||||||
episodes_by_file[key] = []
|
|
||||||
episodes_by_file[key].append(ep_idx)
|
|
||||||
|
|
||||||
# Process each data file
|
subtasks_df.to_parquet(output_path, engine="pyarrow", compression="snappy")
|
||||||
for (chunk_idx, file_idx), episode_indices in episodes_by_file.items():
|
console.print(f"[green]✓ Saved subtasks to {output_path}[/green]")
|
||||||
data_path = dataset.root / dataset.meta.data_path.format(
|
|
||||||
chunk_index=chunk_idx, file_index=file_idx
|
|
||||||
)
|
|
||||||
|
|
||||||
if not data_path.exists():
|
|
||||||
console.print(f"[yellow]Warning: Data file not found: {data_path}[/yellow]")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Read the parquet file
|
def create_subtask_index_array(
|
||||||
df = pd.read_parquet(data_path)
|
dataset: LeRobotDataset,
|
||||||
original_task_indices = df["task_index"].copy()
|
annotations: dict[int, EpisodeSkills],
|
||||||
updated_count = 0
|
skill_to_subtask_idx: dict[str, int],
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""
|
||||||
|
Create a subtask_index array for each frame based on skill annotations.
|
||||||
|
|
||||||
# Update task_index for each episode in this file
|
Args:
|
||||||
for ep_idx in episode_indices:
|
dataset: The LeRobot dataset
|
||||||
if ep_idx not in annotations:
|
annotations: Dictionary of episode skills
|
||||||
continue
|
skill_to_subtask_idx: Mapping from skill name to subtask_index
|
||||||
|
|
||||||
episode_skills = annotations[ep_idx]
|
Returns:
|
||||||
|
Array of subtask indices for each frame in the dataset
|
||||||
|
"""
|
||||||
|
console = Console()
|
||||||
|
|
||||||
|
# Array to store subtask index for each frame
|
||||||
|
full_dataset_length = len(dataset)
|
||||||
|
subtask_indices = np.zeros(full_dataset_length, dtype=np.int64)
|
||||||
|
|
||||||
|
console.print(f"[cyan]Creating subtask_index array for {full_dataset_length} frames...[/cyan]")
|
||||||
|
|
||||||
|
# Assign subtask_index for each annotated episode
|
||||||
|
for ep_idx, episode_skills in annotations.items():
|
||||||
skills = episode_skills.skills
|
skills = episode_skills.skills
|
||||||
|
|
||||||
# Get episode frame range
|
# Get episode frame range
|
||||||
@@ -982,63 +956,65 @@ def update_frame_task_indices(
|
|||||||
ep_from = ep["dataset_from_index"]
|
ep_from = ep["dataset_from_index"]
|
||||||
ep_to = ep["dataset_to_index"]
|
ep_to = ep["dataset_to_index"]
|
||||||
|
|
||||||
# Filter to rows for this episode
|
# Process each frame in the episode
|
||||||
episode_mask = (df["index"] >= ep_from) & (df["index"] < ep_to)
|
for frame_idx in range(ep_from, ep_to):
|
||||||
episode_rows = df.loc[episode_mask]
|
frame = dataset[frame_idx]
|
||||||
|
timestamp = frame["timestamp"].item()
|
||||||
|
|
||||||
# Update task_index for each frame based on its timestamp
|
# Find which skill covers this timestamp
|
||||||
for idx, row in episode_rows.iterrows():
|
|
||||||
timestamp = row["timestamp"]
|
|
||||||
skill = get_skill_for_timestamp(skills, timestamp)
|
skill = get_skill_for_timestamp(skills, timestamp)
|
||||||
|
|
||||||
if skill and skill.name in skill_to_task_idx:
|
if skill and skill.name in skill_to_subtask_idx:
|
||||||
new_task_idx = skill_to_task_idx[skill.name]
|
subtask_idx = skill_to_subtask_idx[skill.name]
|
||||||
if df.at[idx, "task_index"] != new_task_idx:
|
subtask_indices[frame_idx] = subtask_idx
|
||||||
df.at[idx, "task_index"] = new_task_idx
|
|
||||||
updated_count += 1
|
|
||||||
|
|
||||||
# Write back if any changes were made
|
console.print(f"[green]✓ Created subtask_index array[/green]")
|
||||||
if updated_count > 0:
|
return subtask_indices
|
||||||
df.to_parquet(data_path, engine="pyarrow", compression="snappy", index=False)
|
|
||||||
console.print(
|
|
||||||
f"[green]✓ Updated {updated_count} frame task_indices in {data_path.name}[/green]"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def save_skill_annotations(
|
def save_skill_annotations(
|
||||||
dataset: LeRobotDataset,
|
dataset: LeRobotDataset,
|
||||||
annotations: dict[int, EpisodeSkills],
|
annotations: dict[int, EpisodeSkills],
|
||||||
output_path: Path | None = None,
|
output_dir: Path | None = None,
|
||||||
) -> None:
|
repo_id: str | None = None,
|
||||||
|
) -> LeRobotDataset:
|
||||||
"""
|
"""
|
||||||
Save skill annotations to the dataset, updating both:
|
Save skill annotations to the dataset by:
|
||||||
1. The tasks.parquet with new skill names
|
1. Creating a subtasks.parquet file with unique subtasks
|
||||||
2. The per-frame task_index in data parquet files
|
2. Adding a subtask_index feature to the dataset
|
||||||
|
3. Saving raw skill annotations as JSON for reference
|
||||||
|
|
||||||
This function updates the task field for each frame based on
|
This function does NOT modify tasks.parquet - it keeps the original tasks intact
|
||||||
which skill covers that frame's timestamp.
|
and creates a separate subtask hierarchy.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
dataset: The LeRobot dataset to update
|
dataset: The LeRobot dataset to annotate
|
||||||
annotations: Dictionary of episode skills
|
annotations: Dictionary of episode skills
|
||||||
output_path: Optional custom output path for the annotations JSON
|
output_dir: Optional directory to save the modified dataset
|
||||||
|
repo_id: Optional repository ID for the new dataset
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
New dataset with subtask_index feature added
|
||||||
"""
|
"""
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
if not annotations:
|
if not annotations:
|
||||||
console.print("[yellow]No annotations to save[/yellow]")
|
console.print("[yellow]No annotations to save[/yellow]")
|
||||||
return
|
return dataset
|
||||||
|
|
||||||
# Step 1: Register all unique skills as tasks
|
# Step 1: Create subtasks DataFrame
|
||||||
console.print("[cyan]Registering skills as tasks...[/cyan]")
|
console.print("[cyan]Creating subtasks DataFrame...[/cyan]")
|
||||||
skill_to_task_idx = update_dataset_tasks(dataset, annotations)
|
subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe(annotations)
|
||||||
|
|
||||||
# Step 2: Update per-frame task_index in data parquet files
|
# Step 2: Create subtask_index array for all frames
|
||||||
console.print("[cyan]Updating per-frame task indices...[/cyan]")
|
console.print("[cyan]Creating subtask_index array...[/cyan]")
|
||||||
update_frame_task_indices(dataset, annotations, skill_to_task_idx)
|
subtask_indices = create_subtask_index_array(dataset, annotations, skill_to_subtask_idx)
|
||||||
|
|
||||||
# Step 3: Also save the raw skill annotations as JSON for reference
|
# Step 3: Save subtasks.parquet to the original dataset root
|
||||||
skills_path = output_path or (dataset.root / "meta" / "skills.json")
|
save_subtasks(subtasks_df, dataset.root, console)
|
||||||
|
|
||||||
|
# Step 4: Save the raw skill annotations as JSON for reference
|
||||||
|
skills_path = dataset.root / "meta" / "skills.json"
|
||||||
skills_path.parent.mkdir(parents=True, exist_ok=True)
|
skills_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
# Load existing skills data if it exists and is not empty
|
# Load existing skills data if it exists and is not empty
|
||||||
@@ -1062,16 +1038,16 @@ def save_skill_annotations(
|
|||||||
merged_episodes = existing_skills_data.get("episodes", {}).copy()
|
merged_episodes = existing_skills_data.get("episodes", {}).copy()
|
||||||
merged_episodes.update(new_episodes)
|
merged_episodes.update(new_episodes)
|
||||||
|
|
||||||
# Merge skill_to_task_index mappings
|
# Merge skill_to_subtask_index mappings
|
||||||
merged_skill_to_task = existing_skills_data.get("skill_to_task_index", {}).copy()
|
merged_skill_to_subtask = existing_skills_data.get("skill_to_subtask_index", {}).copy()
|
||||||
merged_skill_to_task.update(skill_to_task_idx)
|
merged_skill_to_subtask.update(skill_to_subtask_idx)
|
||||||
|
|
||||||
# Use existing coarse_description if available, otherwise use new one
|
# Use existing coarse_description if available, otherwise use new one
|
||||||
coarse_desc = existing_skills_data.get("coarse_description", annotations[next(iter(annotations))].description)
|
coarse_desc = existing_skills_data.get("coarse_description", annotations[next(iter(annotations))].description)
|
||||||
|
|
||||||
skills_data = {
|
skills_data = {
|
||||||
"coarse_description": coarse_desc,
|
"coarse_description": coarse_desc,
|
||||||
"skill_to_task_index": merged_skill_to_task,
|
"skill_to_subtask_index": merged_skill_to_subtask,
|
||||||
"episodes": merged_episodes,
|
"episodes": merged_episodes,
|
||||||
}
|
}
|
||||||
console.print(f"[cyan]Updated {len(new_episodes)} episode(s), total episodes in skills.json: {len(merged_episodes)}[/cyan]")
|
console.print(f"[cyan]Updated {len(new_episodes)} episode(s), total episodes in skills.json: {len(merged_episodes)}[/cyan]")
|
||||||
@@ -1079,7 +1055,7 @@ def save_skill_annotations(
|
|||||||
# No existing data, create new
|
# No existing data, create new
|
||||||
skills_data = {
|
skills_data = {
|
||||||
"coarse_description": annotations[next(iter(annotations))].description,
|
"coarse_description": annotations[next(iter(annotations))].description,
|
||||||
"skill_to_task_index": skill_to_task_idx,
|
"skill_to_subtask_index": skill_to_subtask_idx,
|
||||||
"episodes": new_episodes,
|
"episodes": new_episodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1088,8 +1064,49 @@ def save_skill_annotations(
|
|||||||
|
|
||||||
console.print(f"[green]✓ Saved skill annotations to {skills_path}[/green]")
|
console.print(f"[green]✓ Saved skill annotations to {skills_path}[/green]")
|
||||||
|
|
||||||
# Reload the dataset's hf_dataset to reflect changes
|
# Step 5: Add subtask_index feature to dataset using add_features
|
||||||
dataset._lazy_loading = True
|
console.print("[cyan]Adding subtask_index feature to dataset...[/cyan]")
|
||||||
|
|
||||||
|
# Determine output directory and repo_id
|
||||||
|
if output_dir is None:
|
||||||
|
output_dir = dataset.root.parent / f"{dataset.root.name}_with_subtasks"
|
||||||
|
else:
|
||||||
|
output_dir = Path(output_dir)
|
||||||
|
|
||||||
|
if repo_id is None:
|
||||||
|
repo_id = f"{dataset.repo_id}_with_subtasks"
|
||||||
|
|
||||||
|
# Add feature using dataset_tools
|
||||||
|
feature_info = {
|
||||||
|
"dtype": "int64",
|
||||||
|
"shape": (1,),
|
||||||
|
"names": None,
|
||||||
|
}
|
||||||
|
new_dataset = add_features(
|
||||||
|
dataset=dataset,
|
||||||
|
features={
|
||||||
|
"subtask_index": (subtask_indices, feature_info),
|
||||||
|
},
|
||||||
|
output_dir=output_dir,
|
||||||
|
repo_id=repo_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Copy subtasks.parquet to new output directory
|
||||||
|
import shutil
|
||||||
|
shutil.copy(
|
||||||
|
dataset.root / "meta" / "subtasks.parquet",
|
||||||
|
output_dir / "meta" / "subtasks.parquet"
|
||||||
|
)
|
||||||
|
shutil.copy(
|
||||||
|
dataset.root / "meta" / "skills.json",
|
||||||
|
output_dir / "meta" / "skills.json"
|
||||||
|
)
|
||||||
|
|
||||||
|
console.print(f"[bold green]✓ Successfully added subtask_index feature![/bold green]")
|
||||||
|
console.print(f" New dataset saved to: {new_dataset.root}")
|
||||||
|
console.print(f" Total subtasks: {len(subtasks_df)}")
|
||||||
|
|
||||||
|
return new_dataset
|
||||||
|
|
||||||
|
|
||||||
def load_skill_annotations(dataset_root: Path) -> dict | None:
|
def load_skill_annotations(dataset_root: Path) -> dict | None:
|
||||||
@@ -1112,17 +1129,20 @@ def main():
|
|||||||
epilog=textwrap.dedent("""\
|
epilog=textwrap.dedent("""\
|
||||||
Examples:
|
Examples:
|
||||||
# Annotate a HuggingFace Hub dataset
|
# Annotate a HuggingFace Hub dataset
|
||||||
python annotate.py --repo-id user/dataset --video-key observation.images.base
|
python annotate.py --repo-id user/dataset --video-key observation.images.base \\
|
||||||
|
--output-dir ./output
|
||||||
|
|
||||||
# Annotate a local dataset with custom batch size
|
# Annotate a local dataset with custom batch size
|
||||||
python annotate.py --data-dir /path/to/dataset --video-key observation.images.base --batch-size 16
|
python annotate.py --data-dir /path/to/dataset --video-key observation.images.base \\
|
||||||
|
--batch-size 16 --output-dir ./output
|
||||||
|
|
||||||
# Use a specific model
|
# Use a specific model
|
||||||
python annotate.py --repo-id user/dataset --video-key observation.images.base \\
|
python annotate.py --repo-id user/dataset --video-key observation.images.base \\
|
||||||
--model Qwen/Qwen2-VL-7B-Instruct
|
--model Qwen/Qwen2-VL-7B-Instruct --output-dir ./output
|
||||||
|
|
||||||
# Push annotated dataset to Hub
|
# Push annotated dataset to Hub
|
||||||
python annotate.py --repo-id user/dataset --video-key observation.images.base --push-to-hub
|
python annotate.py --repo-id user/dataset --video-key observation.images.base \\
|
||||||
|
--output-dir ./output --push-to-hub
|
||||||
"""),
|
"""),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1186,9 +1206,14 @@ def main():
|
|||||||
help="Push annotated dataset to HuggingFace Hub",
|
help="Push annotated dataset to HuggingFace Hub",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--output-path",
|
"--output-dir",
|
||||||
type=str,
|
type=str,
|
||||||
help="Custom output path for annotations JSON",
|
help="Output directory for modified dataset with subtask_index feature",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--output-repo-id",
|
||||||
|
type=str,
|
||||||
|
help="Repository ID for the new dataset (default: original_repo_id_with_subtasks)",
|
||||||
)
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
@@ -1232,14 +1257,16 @@ def main():
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Save annotations
|
# Save annotations
|
||||||
output_path = Path(args.output_path) if args.output_path else None
|
output_dir = Path(args.output_dir) if args.output_dir else None
|
||||||
save_skill_annotations(dataset, annotations, output_path)
|
output_repo_id = args.output_repo_id if args.output_repo_id else None
|
||||||
|
new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id)
|
||||||
|
|
||||||
# Summary
|
# Summary
|
||||||
total_skills = sum(len(ann.skills) for ann in annotations.values())
|
total_skills = sum(len(ann.skills) for ann in annotations.values())
|
||||||
console.print(f"\n[bold green]✓ Annotation complete![/bold green]")
|
console.print(f"\n[bold green]✓ Annotation complete![/bold green]")
|
||||||
console.print(f" Episodes annotated: {len(annotations)}")
|
console.print(f" Episodes annotated: {len(annotations)}")
|
||||||
console.print(f" Total skills identified: {total_skills}")
|
console.print(f" Total subtasks identified: {total_skills}")
|
||||||
|
console.print(f" Dataset with subtask_index saved to: {new_dataset.root}")
|
||||||
|
|
||||||
# Push to hub if requested
|
# Push to hub if requested
|
||||||
if args.push_to_hub:
|
if args.push_to_hub:
|
||||||
@@ -1248,8 +1275,8 @@ def main():
|
|||||||
else:
|
else:
|
||||||
console.print("[cyan]Pushing to HuggingFace Hub...[/cyan]")
|
console.print("[cyan]Pushing to HuggingFace Hub...[/cyan]")
|
||||||
try:
|
try:
|
||||||
dataset.push_to_hub(push_videos=False)
|
new_dataset.push_to_hub(push_videos=False)
|
||||||
console.print(f"[green]✓ Pushed to {args.repo_id}[/green]")
|
console.print(f"[green]✓ Pushed to {output_repo_id or f'{args.repo_id}_with_subtasks'}[/green]")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
console.print(f"[red]Push failed: {e}[/red]")
|
console.print(f"[red]Push failed: {e}[/red]")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user