diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index 93f91cb88..7231bc78d 100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -57,6 +57,7 @@ from lerobot.datasets.utils import ( load_info, load_nested_dataset, load_stats, + load_subtasks, load_tasks, load_tasks_high_level, update_chunk_file_indices, @@ -164,6 +165,7 @@ class LeRobotDatasetMetadata: check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks = load_tasks(self.root) self.tasks_high_level = load_tasks_high_level(self.root) + self.subtasks = load_subtasks(self.root) self.episodes = load_episodes(self.root) self.stats = load_stats(self.root) @@ -520,6 +522,8 @@ class LeRobotDatasetMetadata: _validate_feature_names(features) obj.tasks = None + obj.tasks_high_level = None + obj.subtasks = None obj.episodes = None obj.stats = None obj.info = create_empty_dataset_info( @@ -1068,6 +1072,12 @@ class LeRobotDataset(torch.utils.data.Dataset): high_level_task_idx = item["task_index_high_level"].item() item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"] item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"] + + # optionally add subtask information + if "subtask_index" in self.features and self.meta.subtasks is not None: + subtask_idx = item["subtask_index"].item() + item["subtask"] = self.meta.subtasks.iloc[subtask_idx].name + return item def __repr__(self): diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index d3a146fc1..91203bb22 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -62,6 +62,7 @@ CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}" DEFAULT_TASKS_PATH = "meta/tasks.parquet" DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet" DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet" +DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet" DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet" DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4" DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png" @@ -353,9 +354,20 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame: tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH) return tasks -def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame: - tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH) - return tasks +def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame | None: + """Load high-level tasks from tasks_high_level.parquet if it exists.""" + tasks_high_level_path = local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH + if tasks_high_level_path.exists(): + return pd.read_parquet(tasks_high_level_path) + return None + + +def load_subtasks(local_dir: Path) -> pandas.DataFrame | None: + """Load subtasks from subtasks.parquet if it exists.""" + subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH + if subtasks_path.exists(): + return pd.read_parquet(subtasks_path) + return None def write_episodes(episodes: Dataset, local_dir: Path) -> None: """Write episode metadata to a parquet file in the LeRobot v3.0 format. diff --git a/src/lerobot/policies/pi05_full/annotate/run_pgen.sh b/src/lerobot/policies/pi05_full/annotate/run_pgen.sh index 570a67799..7250d8bf3 100644 --- a/src/lerobot/policies/pi05_full/annotate/run_pgen.sh +++ b/src/lerobot/policies/pi05_full/annotate/run_pgen.sh @@ -9,21 +9,28 @@ MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct" # or: MODEL="Qwen/Qwen2-VL-7B-Instruct" -OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen" +OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new" + BATCH_SIZE=32 TEMPERATURE=0.9 SAMPLE_INTERVAL=5.0 # generate dialogue every 1 second (all episodes processed) -# run synthetic data generation (all episodes processed) -python examples/dataset/annotate_pgen.py \ +# Run subtask annotation +python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \ --repo-id "$REPO_ID" \ - --model "$MODEL" \ + --video-key observation.images.base \ --output-dir "$OUTPUT_DIR" \ - --temperature "$TEMPERATURE" \ - --batch-size "$BATCH_SIZE" \ - --sample-interval "$SAMPLE_INTERVAL" \ - --image-key observation.images.base \ - --num-image-views-per-sample 1 + --output-repo-id "jadechoghari/collect-data-with-subtasks" +# run synthetic data generation (all episodes processed) +# python examples/dataset/annotate_pgen.py \ +# --repo-id "$REPO_ID" \ +# --model "$MODEL" \ +# --output-dir "$OUTPUT_DIR" \ +# --temperature "$TEMPERATURE" \ +# --batch-size "$BATCH_SIZE" \ +# --sample-interval "$SAMPLE_INTERVAL" \ +# --image-key observation.images.base \ +# --num-image-views-per-sample 1 # for faster testing, increase sample interval: # --sample-interval 5.0 # Samples every 5 seconds (much faster) diff --git a/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py b/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py index da296bd2d..74f1a7d54 100644 --- a/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py +++ b/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py @@ -19,13 +19,17 @@ Automatic Skill Annotation for LeRobot Datasets. This script performs automatic subtask/skill labeling for ANY LeRobot dataset using Vision-Language Models (VLMs). It segments each robot demonstration into short atomic -skills (1-3 seconds each) and updates the dataset's task field. +skills (1-3 seconds each) and creates a new dataset with subtask annotations. The pipeline: 1. Loads a LeRobot dataset (local or from HuggingFace Hub) 2. For each episode, extracts video frames 3. Uses a VLM to identify skill boundaries and labels -4. Updates the dataset's task metadata with skill annotations +4. Creates a subtasks.parquet file with unique subtasks +5. Adds a subtask_index feature to the dataset + +NOTE: This script does NOT modify the original tasks.parquet file. It creates a +separate subtask hierarchy while preserving the original task annotations. Supported VLMs (modular design allows easy extension): - Qwen2-VL (default): "Qwen/Qwen2-VL-7B-Instruct" @@ -37,6 +41,7 @@ python examples/dataset/annotate.py \ --repo-id your-username/your-dataset \ --video-key observation.images.base \ --model Qwen/Qwen2-VL-7B-Instruct \ + --output-dir /path/to/output \ --push-to-hub ``` @@ -44,14 +49,16 @@ Or with a local dataset: ```bash python examples/dataset/annotate.py \ --data-dir /path/to/local/dataset \ - --video-key observation.images.base + --video-key observation.images.base \ + --output-dir /path/to/output ``` -After running, you can access the skill for any frame via: + +After running, you can access the subtask for any frame via: ```python -dataset = LeRobotDataset(repo_id="your/dataset") +dataset = LeRobotDataset(repo_id="your/dataset_with_subtasks") item = dataset[100] -task_idx = item["task_index"] -skill_name = dataset.meta.tasks.iloc[task_idx.item()].name +subtask_idx = item["subtask_index"] +subtask_name = dataset.meta.subtasks.iloc[subtask_idx.item()].name ``` """ @@ -66,10 +73,13 @@ from pathlib import Path from typing import Any import cv2 +import numpy as np +import pandas as pd import torch from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn +from lerobot.datasets.dataset_tools import add_features from lerobot.datasets.lerobot_dataset import LeRobotDataset @@ -856,24 +866,18 @@ def get_skill_for_timestamp(skills: list[Skill], timestamp: float) -> Skill | No return skills[-1] if skills else None # Fallback to last skill -def update_dataset_tasks( - dataset: LeRobotDataset, +def create_subtasks_dataframe( annotations: dict[int, EpisodeSkills], -) -> dict[str, int]: +) -> tuple[pd.DataFrame, dict[str, int]]: """ - Register all unique skill names as new tasks in the dataset. + Create a subtasks DataFrame from skill annotations. Args: - dataset: The LeRobot dataset to update annotations: Dictionary of episode skills Returns: - Dictionary mapping skill name to task_index + Tuple of (subtasks_df, skill_to_subtask_idx mapping) """ - import pandas as pd - - from lerobot.datasets.utils import write_tasks - console = Console() # Collect all unique skill names @@ -882,163 +886,135 @@ def update_dataset_tasks( for skill in episode_skills.skills: all_skill_names.add(skill.name) - console.print(f"[cyan]Found {len(all_skill_names)} unique skills[/cyan]") + console.print(f"[cyan]Found {len(all_skill_names)} unique subtasks[/cyan]") - # Build new tasks DataFrame - # Start with existing tasks if any - if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0: - existing_tasks = set(dataset.meta.tasks.index.tolist()) - max_task_idx = dataset.meta.tasks["task_index"].max() - else: - existing_tasks = set() - max_task_idx = -1 + # Build subtasks DataFrame + subtask_data = [] + for i, skill_name in enumerate(sorted(all_skill_names)): + subtask_data.append({ + "subtask": skill_name, + "subtask_index": i, + }) - # Add new skills as tasks - new_tasks = all_skill_names - existing_tasks - if new_tasks: - new_task_data = [] - for i, skill_name in enumerate(sorted(new_tasks)): - new_task_data.append({ - "task": skill_name, - "task_index": max_task_idx + 1 + i, - }) + subtasks_df = pd.DataFrame(subtask_data).set_index("subtask") - new_tasks_df = pd.DataFrame(new_task_data).set_index("task") - - if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0: - dataset.meta.tasks = pd.concat([dataset.meta.tasks, new_tasks_df]) - else: - dataset.meta.tasks = new_tasks_df - - # Write updated tasks to disk - write_tasks(dataset.meta.tasks, dataset.root) - console.print(f"[green]✓ Added {len(new_tasks)} new tasks to tasks.parquet[/green]") - - # Build skill name to task_index mapping - skill_to_task_idx = { - task_name: int(dataset.meta.tasks.loc[task_name, "task_index"]) - for task_name in all_skill_names + # Build skill name to subtask_index mapping + skill_to_subtask_idx = { + skill_name: int(subtasks_df.loc[skill_name, "subtask_index"]) + for skill_name in all_skill_names } - return skill_to_task_idx + return subtasks_df, skill_to_subtask_idx -def update_frame_task_indices( +def save_subtasks( + subtasks_df: pd.DataFrame, + dataset_root: Path, + console: Console | None = None, +) -> None: + """Save subtasks to subtasks.parquet.""" + if console is None: + console = Console() + + output_path = dataset_root / "meta" / "subtasks.parquet" + output_path.parent.mkdir(parents=True, exist_ok=True) + + subtasks_df.to_parquet(output_path, engine="pyarrow", compression="snappy") + console.print(f"[green]✓ Saved subtasks to {output_path}[/green]") + + +def create_subtask_index_array( dataset: LeRobotDataset, annotations: dict[int, EpisodeSkills], - skill_to_task_idx: dict[str, int], -) -> None: + skill_to_subtask_idx: dict[str, int], +) -> np.ndarray: """ - Update the task_index for each frame based on skill annotations. - - This reads the data parquet files, updates task_index based on which - skill covers each frame's timestamp, and writes back to disk. + Create a subtask_index array for each frame based on skill annotations. Args: - dataset: The LeRobot dataset to update + dataset: The LeRobot dataset annotations: Dictionary of episode skills - skill_to_task_idx: Mapping from skill name to task_index - """ - import pandas as pd + skill_to_subtask_idx: Mapping from skill name to subtask_index + Returns: + Array of subtask indices for each frame in the dataset + """ console = Console() - # Group episodes by their data file (chunk_index, file_index) - episodes_by_file: dict[tuple[int, int], list[int]] = {} - for ep_idx in annotations.keys(): + # Array to store subtask index for each frame + full_dataset_length = len(dataset) + subtask_indices = np.zeros(full_dataset_length, dtype=np.int64) + + console.print(f"[cyan]Creating subtask_index array for {full_dataset_length} frames...[/cyan]") + + # Assign subtask_index for each annotated episode + for ep_idx, episode_skills in annotations.items(): + skills = episode_skills.skills + + # Get episode frame range ep = dataset.meta.episodes[ep_idx] - chunk_idx = ep["data/chunk_index"] - file_idx = ep["data/file_index"] - key = (chunk_idx, file_idx) - if key not in episodes_by_file: - episodes_by_file[key] = [] - episodes_by_file[key].append(ep_idx) + ep_from = ep["dataset_from_index"] + ep_to = ep["dataset_to_index"] - # Process each data file - for (chunk_idx, file_idx), episode_indices in episodes_by_file.items(): - data_path = dataset.root / dataset.meta.data_path.format( - chunk_index=chunk_idx, file_index=file_idx - ) + # Process each frame in the episode + for frame_idx in range(ep_from, ep_to): + frame = dataset[frame_idx] + timestamp = frame["timestamp"].item() + + # Find which skill covers this timestamp + skill = get_skill_for_timestamp(skills, timestamp) - if not data_path.exists(): - console.print(f"[yellow]Warning: Data file not found: {data_path}[/yellow]") - continue + if skill and skill.name in skill_to_subtask_idx: + subtask_idx = skill_to_subtask_idx[skill.name] + subtask_indices[frame_idx] = subtask_idx - # Read the parquet file - df = pd.read_parquet(data_path) - original_task_indices = df["task_index"].copy() - updated_count = 0 - - # Update task_index for each episode in this file - for ep_idx in episode_indices: - if ep_idx not in annotations: - continue - - episode_skills = annotations[ep_idx] - skills = episode_skills.skills - - # Get episode frame range - ep = dataset.meta.episodes[ep_idx] - ep_from = ep["dataset_from_index"] - ep_to = ep["dataset_to_index"] - - # Filter to rows for this episode - episode_mask = (df["index"] >= ep_from) & (df["index"] < ep_to) - episode_rows = df.loc[episode_mask] - - # Update task_index for each frame based on its timestamp - for idx, row in episode_rows.iterrows(): - timestamp = row["timestamp"] - skill = get_skill_for_timestamp(skills, timestamp) - - if skill and skill.name in skill_to_task_idx: - new_task_idx = skill_to_task_idx[skill.name] - if df.at[idx, "task_index"] != new_task_idx: - df.at[idx, "task_index"] = new_task_idx - updated_count += 1 - - # Write back if any changes were made - if updated_count > 0: - df.to_parquet(data_path, engine="pyarrow", compression="snappy", index=False) - console.print( - f"[green]✓ Updated {updated_count} frame task_indices in {data_path.name}[/green]" - ) + console.print(f"[green]✓ Created subtask_index array[/green]") + return subtask_indices def save_skill_annotations( dataset: LeRobotDataset, annotations: dict[int, EpisodeSkills], - output_path: Path | None = None, -) -> None: + output_dir: Path | None = None, + repo_id: str | None = None, +) -> LeRobotDataset: """ - Save skill annotations to the dataset, updating both: - 1. The tasks.parquet with new skill names - 2. The per-frame task_index in data parquet files + Save skill annotations to the dataset by: + 1. Creating a subtasks.parquet file with unique subtasks + 2. Adding a subtask_index feature to the dataset + 3. Saving raw skill annotations as JSON for reference - This function updates the task field for each frame based on - which skill covers that frame's timestamp. + This function does NOT modify tasks.parquet - it keeps the original tasks intact + and creates a separate subtask hierarchy. Args: - dataset: The LeRobot dataset to update + dataset: The LeRobot dataset to annotate annotations: Dictionary of episode skills - output_path: Optional custom output path for the annotations JSON + output_dir: Optional directory to save the modified dataset + repo_id: Optional repository ID for the new dataset + + Returns: + New dataset with subtask_index feature added """ console = Console() if not annotations: console.print("[yellow]No annotations to save[/yellow]") - return + return dataset - # Step 1: Register all unique skills as tasks - console.print("[cyan]Registering skills as tasks...[/cyan]") - skill_to_task_idx = update_dataset_tasks(dataset, annotations) + # Step 1: Create subtasks DataFrame + console.print("[cyan]Creating subtasks DataFrame...[/cyan]") + subtasks_df, skill_to_subtask_idx = create_subtasks_dataframe(annotations) + + # Step 2: Create subtask_index array for all frames + console.print("[cyan]Creating subtask_index array...[/cyan]") + subtask_indices = create_subtask_index_array(dataset, annotations, skill_to_subtask_idx) - # Step 2: Update per-frame task_index in data parquet files - console.print("[cyan]Updating per-frame task indices...[/cyan]") - update_frame_task_indices(dataset, annotations, skill_to_task_idx) + # Step 3: Save subtasks.parquet to the original dataset root + save_subtasks(subtasks_df, dataset.root, console) - # Step 3: Also save the raw skill annotations as JSON for reference - skills_path = output_path or (dataset.root / "meta" / "skills.json") + # Step 4: Save the raw skill annotations as JSON for reference + skills_path = dataset.root / "meta" / "skills.json" skills_path.parent.mkdir(parents=True, exist_ok=True) # Load existing skills data if it exists and is not empty @@ -1062,16 +1038,16 @@ def save_skill_annotations( merged_episodes = existing_skills_data.get("episodes", {}).copy() merged_episodes.update(new_episodes) - # Merge skill_to_task_index mappings - merged_skill_to_task = existing_skills_data.get("skill_to_task_index", {}).copy() - merged_skill_to_task.update(skill_to_task_idx) + # Merge skill_to_subtask_index mappings + merged_skill_to_subtask = existing_skills_data.get("skill_to_subtask_index", {}).copy() + merged_skill_to_subtask.update(skill_to_subtask_idx) # Use existing coarse_description if available, otherwise use new one coarse_desc = existing_skills_data.get("coarse_description", annotations[next(iter(annotations))].description) skills_data = { "coarse_description": coarse_desc, - "skill_to_task_index": merged_skill_to_task, + "skill_to_subtask_index": merged_skill_to_subtask, "episodes": merged_episodes, } console.print(f"[cyan]Updated {len(new_episodes)} episode(s), total episodes in skills.json: {len(merged_episodes)}[/cyan]") @@ -1079,7 +1055,7 @@ def save_skill_annotations( # No existing data, create new skills_data = { "coarse_description": annotations[next(iter(annotations))].description, - "skill_to_task_index": skill_to_task_idx, + "skill_to_subtask_index": skill_to_subtask_idx, "episodes": new_episodes, } @@ -1088,8 +1064,49 @@ def save_skill_annotations( console.print(f"[green]✓ Saved skill annotations to {skills_path}[/green]") - # Reload the dataset's hf_dataset to reflect changes - dataset._lazy_loading = True + # Step 5: Add subtask_index feature to dataset using add_features + console.print("[cyan]Adding subtask_index feature to dataset...[/cyan]") + + # Determine output directory and repo_id + if output_dir is None: + output_dir = dataset.root.parent / f"{dataset.root.name}_with_subtasks" + else: + output_dir = Path(output_dir) + + if repo_id is None: + repo_id = f"{dataset.repo_id}_with_subtasks" + + # Add feature using dataset_tools + feature_info = { + "dtype": "int64", + "shape": (1,), + "names": None, + } + new_dataset = add_features( + dataset=dataset, + features={ + "subtask_index": (subtask_indices, feature_info), + }, + output_dir=output_dir, + repo_id=repo_id, + ) + + # Copy subtasks.parquet to new output directory + import shutil + shutil.copy( + dataset.root / "meta" / "subtasks.parquet", + output_dir / "meta" / "subtasks.parquet" + ) + shutil.copy( + dataset.root / "meta" / "skills.json", + output_dir / "meta" / "skills.json" + ) + + console.print(f"[bold green]✓ Successfully added subtask_index feature![/bold green]") + console.print(f" New dataset saved to: {new_dataset.root}") + console.print(f" Total subtasks: {len(subtasks_df)}") + + return new_dataset def load_skill_annotations(dataset_root: Path) -> dict | None: @@ -1112,17 +1129,20 @@ def main(): epilog=textwrap.dedent("""\ Examples: # Annotate a HuggingFace Hub dataset - python annotate.py --repo-id user/dataset --video-key observation.images.base + python annotate.py --repo-id user/dataset --video-key observation.images.base \\ + --output-dir ./output # Annotate a local dataset with custom batch size - python annotate.py --data-dir /path/to/dataset --video-key observation.images.base --batch-size 16 + python annotate.py --data-dir /path/to/dataset --video-key observation.images.base \\ + --batch-size 16 --output-dir ./output # Use a specific model python annotate.py --repo-id user/dataset --video-key observation.images.base \\ - --model Qwen/Qwen2-VL-7B-Instruct + --model Qwen/Qwen2-VL-7B-Instruct --output-dir ./output # Push annotated dataset to Hub - python annotate.py --repo-id user/dataset --video-key observation.images.base --push-to-hub + python annotate.py --repo-id user/dataset --video-key observation.images.base \\ + --output-dir ./output --push-to-hub """), ) @@ -1186,9 +1206,14 @@ def main(): help="Push annotated dataset to HuggingFace Hub", ) parser.add_argument( - "--output-path", + "--output-dir", type=str, - help="Custom output path for annotations JSON", + help="Output directory for modified dataset with subtask_index feature", + ) + parser.add_argument( + "--output-repo-id", + type=str, + help="Repository ID for the new dataset (default: original_repo_id_with_subtasks)", ) args = parser.parse_args() @@ -1232,14 +1257,16 @@ def main(): ) # Save annotations - output_path = Path(args.output_path) if args.output_path else None - save_skill_annotations(dataset, annotations, output_path) + output_dir = Path(args.output_dir) if args.output_dir else None + output_repo_id = args.output_repo_id if args.output_repo_id else None + new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id) # Summary total_skills = sum(len(ann.skills) for ann in annotations.values()) console.print(f"\n[bold green]✓ Annotation complete![/bold green]") console.print(f" Episodes annotated: {len(annotations)}") - console.print(f" Total skills identified: {total_skills}") + console.print(f" Total subtasks identified: {total_skills}") + console.print(f" Dataset with subtask_index saved to: {new_dataset.root}") # Push to hub if requested if args.push_to_hub: @@ -1248,8 +1275,8 @@ def main(): else: console.print("[cyan]Pushing to HuggingFace Hub...[/cyan]") try: - dataset.push_to_hub(push_videos=False) - console.print(f"[green]✓ Pushed to {args.repo_id}[/green]") + new_dataset.push_to_hub(push_videos=False) + console.print(f"[green]✓ Pushed to {output_repo_id or f'{args.repo_id}_with_subtasks'}[/green]") except Exception as e: console.print(f"[red]Push failed: {e}[/red]")