This commit is contained in:
Pepijn
2025-11-18 15:28:40 +01:00
parent f688eb160b
commit 1ffdc6f49e
5 changed files with 624 additions and 56 deletions
+258 -41
View File
@@ -42,13 +42,30 @@ Usage:
# Install dependencies
pip install transformers torch qwen-vl-utils accelerate
# Annotate and push to hub:
# Sequential processing (single GPU):
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.base \\
--push-to-hub
# Parallel processing (4 GPUs):
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.base \\
--num-workers 4 \\
--push-to-hub
# Parallel with specific GPU IDs:
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.base \\
--num-workers 2 \\
--gpu-ids 0 2 \\
--push-to-hub
"""
import argparse
@@ -56,6 +73,8 @@ import json
import time
from pathlib import Path
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import multiprocessing as mp
import pandas as pd
import torch
@@ -63,7 +82,7 @@ from pydantic import BaseModel, Field
from qwen_vl_utils import process_vision_info
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
from rich.tree import Tree
from transformers import Qwen3VLMoeForConditionalGeneration, AutoProcessor
@@ -177,7 +196,7 @@ class VideoAnnotator:
file_path: Path,
start_timestamp: float,
end_timestamp: float,
target_fps: int = 2
target_fps: int = 1
) -> Path:
"""
Extract a specific episode segment from concatenated video.
@@ -187,7 +206,7 @@ class VideoAnnotator:
file_path: Path to the concatenated video file
start_timestamp: Starting timestamp in seconds (within this video file)
end_timestamp: Ending timestamp in seconds (within this video file)
target_fps: Target FPS (default: 2 for faster processing)
target_fps: Target FPS (default: 1 for faster processing)
Returns:
Path to extracted video file
@@ -683,6 +702,77 @@ def process_single_episode(
return ep_idx, None, str(e)
def worker_process_episodes(
worker_id: int,
gpu_id: int,
episode_indices: list[int],
repo_id: str,
video_key: str,
subtask_list: list[str],
model_name: str,
torch_dtype: torch.dtype,
) -> dict[int, SubtaskAnnotation]:
"""
Worker function for parallel processing across GPUs.
Args:
worker_id: Worker ID for logging
gpu_id: GPU device ID to use
episode_indices: List of episode indices to process
repo_id: Dataset repo ID
video_key: Video key to use
subtask_list: List of subtask names
model_name: Model name to load
torch_dtype: Model dtype
Returns:
Dictionary of episode_idx -> SubtaskAnnotation
"""
# Set GPU device
device = f"cuda:{gpu_id}"
# Initialize console for this worker
console = Console()
console.print(f"[cyan]Worker {worker_id} starting on GPU {gpu_id} with {len(episode_indices)} episodes[/cyan]")
# Load dataset (this is lightweight, just metadata)
dataset = LeRobotDataset(repo_id, download_videos=False)
fps = dataset.fps
# Initialize annotator for this worker
annotator = VideoAnnotator(
subtask_list=subtask_list,
model_name=model_name,
device=device,
torch_dtype=torch_dtype
)
# Process assigned episodes
annotations = {}
for i, ep_idx in enumerate(episode_indices):
console.print(f"[cyan]Worker {worker_id} | Episode {ep_idx} ({i+1}/{len(episode_indices)})[/cyan]")
result_ep_idx, annotation, error = process_single_episode(
ep_idx,
dataset.root,
dataset.meta,
video_key,
fps,
annotator,
console
)
if error:
console.print(f"[red]Worker {worker_id} | ✗ Failed episode {result_ep_idx}: {error}[/red]")
elif annotation:
annotations[result_ep_idx] = annotation
console.print(f"[green]Worker {worker_id} | ✓ Completed episode {result_ep_idx}[/green]")
console.print(f"[bold green]Worker {worker_id} completed {len(annotations)}/{len(episode_indices)} episodes[/bold green]")
return annotations
def compute_temporal_proportions(annotations: dict[int, SubtaskAnnotation], fps: int = 30) -> dict[str, float]:
"""
Compute average temporal proportion for each subtask across all episodes.
@@ -742,16 +832,41 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Sequential processing (single GPU):
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.top \\
--push-to-hub
# Parallel processing with 4 GPUs (4x speedup):
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.top \\
--num-workers 4 \\
--push-to-hub
# Parallel with specific GPU IDs (e.g., GPUs 0, 2, 3):
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp,lift,place" \\
--video-key observation.images.top \\
--num-workers 3 \\
--gpu-ids 0 2 3 \\
--push-to-hub
# List available cameras:
python subtask_annotation.py --repo-id pepijn223/mydataset --subtasks "reach,grasp" --max-episodes 0
# Annotate with specific camera:
python subtask_annotation.py --repo-id pepijn223/mydataset --subtasks "reach,grasp" --video-key observation.images.top --push-to-hub
# Use custom model:
python subtask_annotation.py --repo-id pepijn223/mydataset --subtasks "reach,grasp" --video-key observation.images.top --model Qwen/Qwen3-VL-30B-A3B-Instruct --push-to-hub
python subtask_annotation.py \\
--repo-id pepijn223/mydataset \\
--subtasks "reach,grasp" \\
--max-episodes 0
Note: The 30B model requires ~60GB VRAM. Make sure you have sufficient GPU memory.
Performance Tips:
- Each worker loads one model instance on its assigned GPU
- The 30B model requires ~60GB VRAM per GPU
- Use --num-workers N for N GPUs to get N× speedup
- Episodes are distributed round-robin across workers
"""
)
parser.add_argument(
@@ -821,6 +936,27 @@ Note: The 30B model requires ~60GB VRAM. Make sure you have sufficient GPU memor
choices=["bfloat16", "float16", "float32"],
help="Model dtype (default: bfloat16)",
)
parser.add_argument(
"--num-workers",
type=int,
default=1,
help="Number of parallel workers for multi-GPU processing (default: 1 for sequential). "
"Set to number of GPUs available for parallel processing.",
)
parser.add_argument(
"--gpu-ids",
type=int,
nargs="+",
default=None,
help="Specific GPU IDs to use (e.g., --gpu-ids 0 1 2). "
"If not specified, uses GPUs 0 to num-workers-1.",
)
parser.add_argument(
"--batch-size",
type=int,
default=1,
help="Batch size for processing multiple episodes per inference (experimental, default: 1)",
)
args = parser.parse_args()
@@ -898,39 +1034,120 @@ Note: The 30B model requires ~60GB VRAM. Make sure you have sufficient GPU memor
console.print("[green]All episodes already annotated![/green]")
return
# Initialize annotator with subtask list
annotator = VideoAnnotator(
subtask_list=subtask_list,
model_name=args.model,
device=args.device,
torch_dtype=torch_dtype
)
# Determine GPU IDs to use
if args.gpu_ids:
gpu_ids = args.gpu_ids
if len(gpu_ids) < args.num_workers:
console.print(f"[yellow]Warning: {args.num_workers} workers requested but only {len(gpu_ids)} GPU IDs provided[/yellow]")
args.num_workers = len(gpu_ids)
else:
# Check available GPUs
if torch.cuda.is_available():
num_gpus = torch.cuda.device_count()
if args.num_workers > num_gpus:
console.print(f"[yellow]Warning: {args.num_workers} workers requested but only {num_gpus} GPUs available[/yellow]")
args.num_workers = min(args.num_workers, num_gpus)
gpu_ids = list(range(args.num_workers))
else:
console.print("[yellow]Warning: CUDA not available, using CPU (num_workers will be ignored)[/yellow]")
args.num_workers = 1
gpu_ids = [0] # Dummy value for CPU
# Annotate episodes (sequential processing)
# Annotate episodes - choose sequential or parallel mode
annotations = existing_annotations.copy()
for i, ep_idx in enumerate(episode_indices):
console.print(f"\n[bold cyan]{'=' * 60}[/bold cyan]")
console.print(f"[bold cyan]Episode {ep_idx} ({i + 1}/{len(episode_indices)})[/bold cyan]")
console.print(f"[bold cyan]{'=' * 60}[/bold cyan]")
result_ep_idx, annotation, error = process_single_episode(
ep_idx,
dataset.root,
dataset.meta,
video_key,
fps,
annotator,
console
)
if args.num_workers > 1:
# ===== PARALLEL PROCESSING MODE =====
console.print(f"\n[bold cyan]Using {args.num_workers} parallel workers on GPUs: {gpu_ids}[/bold cyan]")
if error:
console.print(f"[red]✗ Failed to annotate episode {result_ep_idx}: {error}[/red]")
continue
elif annotation:
annotations[result_ep_idx] = annotation
display_annotation(annotation, console, result_ep_idx, fps)
save_annotations_to_dataset(dataset.root, annotations, fps)
# Split episodes across workers
episodes_per_worker = [[] for _ in range(args.num_workers)]
for i, ep_idx in enumerate(episode_indices):
worker_idx = i % args.num_workers
episodes_per_worker[worker_idx].append(ep_idx)
# Show distribution
for worker_id, episodes in enumerate(episodes_per_worker):
console.print(f"[cyan]Worker {worker_id} (GPU {gpu_ids[worker_id]}): {len(episodes)} episodes[/cyan]")
# Start parallel processing using ProcessPoolExecutor
console.print(f"\n[bold cyan]Starting parallel annotation...[/bold cyan]")
with ProcessPoolExecutor(max_workers=args.num_workers) as executor:
# Submit all worker jobs
futures = []
for worker_id in range(args.num_workers):
if not episodes_per_worker[worker_id]:
continue # Skip workers with no episodes
future = executor.submit(
worker_process_episodes,
worker_id,
gpu_ids[worker_id],
episodes_per_worker[worker_id],
args.repo_id,
video_key,
subtask_list,
args.model,
torch_dtype,
)
futures.append(future)
# Collect results as they complete
for future in as_completed(futures):
try:
worker_annotations = future.result()
annotations.update(worker_annotations)
# Save after each worker completes
save_annotations_to_dataset(dataset.root, annotations, fps)
console.print(f"[green]✓ Worker completed, saved {len(worker_annotations)} annotations[/green]")
except Exception as e:
console.print(f"[red]✗ Worker failed: {e}[/red]")
console.print(f"\n[bold green]Parallel processing complete! Annotated {len(annotations)} episodes[/bold green]")
# Display all annotations
for ep_idx in sorted(annotations.keys()):
if ep_idx not in existing_annotations: # Only show newly annotated
display_annotation(annotations[ep_idx], console, ep_idx, fps)
else:
# ===== SEQUENTIAL PROCESSING MODE =====
console.print(f"\n[bold cyan]Using sequential processing (single GPU/CPU)[/bold cyan]")
# Initialize annotator with subtask list
annotator = VideoAnnotator(
subtask_list=subtask_list,
model_name=args.model,
device=args.device,
torch_dtype=torch_dtype
)
# Process episodes sequentially
for i, ep_idx in enumerate(episode_indices):
console.print(f"\n[bold cyan]{'=' * 60}[/bold cyan]")
console.print(f"[bold cyan]Episode {ep_idx} ({i + 1}/{len(episode_indices)})[/bold cyan]")
console.print(f"[bold cyan]{'=' * 60}[/bold cyan]")
result_ep_idx, annotation, error = process_single_episode(
ep_idx,
dataset.root,
dataset.meta,
video_key,
fps,
annotator,
console
)
if error:
console.print(f"[red]✗ Failed to annotate episode {result_ep_idx}: {error}[/red]")
continue
elif annotation:
annotations[result_ep_idx] = annotation
display_annotation(annotation, console, result_ep_idx, fps)
save_annotations_to_dataset(dataset.root, annotations, fps)
# Compute temporal proportions (key SARM insight)
console.print(f"\n[bold cyan]Computing Temporal Proportions[/bold cyan]")