Files
lerobot/examples/dataset/visualization_tools/create_progress_videos.py
T
2026-03-23 20:17:59 -07:00

527 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Create MP4 videos with sarm_progress overlay for specified episodes.
Downloads datasets from HuggingFace, extracts episode video + progress data,
and draws the progress line directly on each frame (no panel, no axes).
"""
import json
import subprocess
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
from huggingface_hub import snapshot_download
DATASETS = [
{"repo_id": "lerobot-data-collection/level2_final_quality3", "episode": 250},
]
CAMERA_KEY = (
"observation.images.base" # None = auto-select first camera, or set e.g. "observation.images.top"
)
OUTPUT_DIR = Path(__file__).resolve().parent / "outputs"
OUTPUT_DIR.mkdir(exist_ok=True)
# Progress line spans the full video height
GRAPH_Y_TOP_FRAC = 0.01
GRAPH_Y_BOT_FRAC = 0.99
LINE_THICKNESS = 3
SHADOW_THICKNESS = 6 # white edge thickness
REF_ALPHA = 0.45 # opacity of the 1.0 reference line
FILL_ALPHA = 0.55 # opacity of the grey fill under the line
SCORE_FONT_SCALE = 0.8
TASK_FONT_SCALE = 0.55
def download_episode(repo_id: str, episode: int) -> Path:
"""Download only the files needed for this episode."""
# We need: meta/, sarm_progress.parquet, and the relevant video/data chunks.
# We'll download meta + sarm first, then figure out chunks.
print(f"\n[1/5] Downloading metadata for {repo_id}")
local = Path(
snapshot_download(
repo_id=repo_id,
repo_type="dataset",
allow_patterns=["meta/**", "sarm_progress.parquet"],
ignore_patterns=["*.mp4"],
)
)
return local
def load_episode_meta(local: Path, episode: int) -> dict:
"""Read info.json + episode-level parquet to get fps, video paths, timestamps."""
info = json.loads((local / "meta" / "info.json").read_text())
fps = info["fps"]
features = info["features"]
# Find video keys (keys whose dtype=="video")
video_keys = [k for k, v in features.items() if v.get("dtype") == "video"]
if not video_keys:
raise RuntimeError("No video keys found in dataset features")
if CAMERA_KEY is not None:
if CAMERA_KEY not in video_keys:
raise RuntimeError(f"CAMERA_KEY='{CAMERA_KEY}' not found. Available: {video_keys}")
first_cam = CAMERA_KEY
else:
first_cam = video_keys[0]
print(f" fps={fps} camera='{first_cam}' all_cams={video_keys}")
# Load all episode-meta parquet files and find our episode
ep_rows = []
for pq in sorted((local / "meta" / "episodes").glob("**/*.parquet")):
df = pd.read_parquet(pq)
ep_rows.append(df)
ep_df = pd.concat(ep_rows, ignore_index=True)
row = ep_df[ep_df["episode_index"] == episode]
if row.empty:
raise RuntimeError(f"Episode {episode} not found in episode metadata")
row = row.iloc[0]
# Extract video chunk/file index for first camera
# Try both dot and slash variants of the key
chunk_col = f"videos/{first_cam}/chunk_index"
file_col = f"videos/{first_cam}/file_index"
ts_col = f"videos/{first_cam}/from_timestamp"
to_col = f"videos/{first_cam}/to_timestamp"
# Some datasets use different column naming
if chunk_col not in row.index:
# Try without the 'videos/' prefix
chunk_col = f"{first_cam}/chunk_index"
file_col = f"{first_cam}/file_index"
ts_col = f"{first_cam}/from_timestamp"
to_col = f"{first_cam}/to_timestamp"
if chunk_col not in row.index:
raise RuntimeError(
f"Cannot find video metadata columns for {first_cam}.\nAvailable: {list(row.index)}"
)
chunk_idx = int(row[chunk_col])
file_idx = int(row[file_col])
from_ts = float(row[ts_col])
to_ts = float(row[to_col])
video_template = info.get(
"video_path", "videos/{video_key}/chunk-{chunk_index:03d}/file-{file_index:03d}.mp4"
)
video_rel = video_template.format(
video_key=first_cam,
chunk_index=chunk_idx,
file_index=file_idx,
)
# Load task name for this episode
# tasks.parquet uses the task string as the row index; task_index column holds the int id
task_name = ""
try:
# Prefer the 'tasks' list directly on the episode row
if "tasks" in row.index and row["tasks"] is not None:
tasks_val = row["tasks"]
if isinstance(tasks_val, (list, tuple, np.ndarray)) and len(tasks_val) > 0:
task_name = str(tasks_val[0])
else:
task_name = str(tasks_val).strip("[]'")
else:
tasks_pq = local / "meta" / "tasks.parquet"
if tasks_pq.exists():
tasks_df = pd.read_parquet(tasks_pq)
# Row index is the task string; task_index column is the int
task_idx = int(row.get("task_index", 0)) if "task_index" in row.index else 0
match = tasks_df[tasks_df["task_index"] == task_idx]
if not match.empty:
task_name = str(match.index[0])
print(f" Task name: '{task_name}'")
except Exception as e:
print(f" WARNING: could not load task name: {e}")
return {
"fps": fps,
"first_cam": first_cam,
"video_rel": video_rel,
"chunk_index": chunk_idx,
"file_index": file_idx,
"from_ts": from_ts,
"to_ts": to_ts,
"task_name": task_name,
}
def download_video(repo_id: str, local: Path, video_rel: str) -> Path:
"""Download the specific video file if not already present."""
video_path = local / video_rel
if video_path.exists():
print(f" Video already cached: {video_path}")
return video_path
print(f"[2/5] Downloading video file {video_rel}")
snapshot_download(
repo_id=repo_id,
repo_type="dataset",
local_dir=str(local),
allow_patterns=[video_rel],
)
if not video_path.exists():
raise RuntimeError(f"Video not found after download: {video_path}")
return video_path
def load_progress(local: Path, episode: int) -> np.ndarray | None:
"""Load sarm_progress values for this episode. Returns sorted array of (frame_index, progress)."""
pq_path = local / "sarm_progress.parquet"
if not pq_path.exists():
print(" WARNING: sarm_progress.parquet not found, trying data parquet …")
return None
df = pd.read_parquet(pq_path)
print(f" sarm_progress.parquet columns: {list(df.columns)}")
ep_df = df[df["episode_index"] == episode].copy()
if ep_df.empty:
print(f" WARNING: No sarm_progress rows for episode {episode}")
return None
ep_df = ep_df.sort_values("frame_index")
# Prefer dense, fall back to sparse
if "progress_dense" in ep_df.columns and ep_df["progress_dense"].notna().any():
prog_col = "progress_dense"
elif "progress_sparse" in ep_df.columns:
prog_col = "progress_sparse"
else:
# Last resort: any column with 'progress' in the name
prog_cols = [c for c in ep_df.columns if "progress" in c.lower()]
if not prog_cols:
return None
prog_col = prog_cols[0]
print(f" Using progress column: '{prog_col}'")
return ep_df[["frame_index", prog_col]].rename(columns={prog_col: "progress"}).values
def extract_episode_clip(video_path: Path, from_ts: float, to_ts: float, out_path: Path) -> Path:
"""Use ffmpeg to cut the episode segment from the combined video file."""
duration = to_ts - from_ts
print(f"[3/5] Extracting clip [{from_ts:.3f}s → {to_ts:.3f}s] ({duration:.2f}s) …")
cmd = [
"ffmpeg",
"-y",
"-ss",
str(from_ts),
"-i",
str(video_path),
"-t",
str(duration),
"-c:v",
"libx264",
"-preset",
"fast",
"-crf",
"18",
"-an",
str(out_path),
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg clip extraction failed:\n{result.stderr}")
return out_path
def precompute_pixels(
progress_data: np.ndarray,
n_frames: int,
frame_w: int,
frame_h: int,
) -> np.ndarray:
"""
Map each progress sample to pixel coordinates.
Returns array of shape (N, 2) with (x, y) in pixel space.
x spans full video width; y maps progress [0,1] to graph band.
"""
frame_indices = progress_data[:, 0].astype(float)
progress_vals = np.clip(progress_data[:, 1].astype(float), 0.0, 1.0)
y_top = int(frame_h * GRAPH_Y_TOP_FRAC)
y_bot = int(frame_h * GRAPH_Y_BOT_FRAC)
graph_h = y_bot - y_top
xs = (frame_indices / (n_frames - 1) * (frame_w - 1)).astype(int)
# progress=1 → y_top, progress=0 → y_bot
ys = (y_bot - progress_vals * graph_h).astype(int)
return np.stack([xs, ys], axis=1) # (N, 2)
def progress_color(t: float) -> tuple[int, int, int]:
"""Interpolate BGR color red→green based on normalised position t in [0,1]."""
r = int(255 * (1.0 - t))
g = int(255 * t)
return (0, g, r) # BGR
def prerender_fill(
pixels: np.ndarray,
frame_w: int,
frame_h: int,
) -> np.ndarray:
"""Pre-render the full grey fill polygon under the curve as a BGRA image."""
y_bot = int(frame_h * GRAPH_Y_BOT_FRAC)
fill_img = np.zeros((frame_h, frame_w, 4), dtype=np.uint8)
poly = np.concatenate(
[
pixels,
[[pixels[-1][0], y_bot], [pixels[0][0], y_bot]],
],
axis=0,
).astype(np.int32)
cv2.fillPoly(fill_img, [poly], color=(128, 128, 128, int(255 * FILL_ALPHA)))
return fill_img
def alpha_composite(base: np.ndarray, overlay_bgra: np.ndarray, x_max: int) -> None:
"""Blend overlay onto base in-place, but only for x < x_max."""
if x_max <= 0:
return
roi_b = base[:, :x_max]
roi_o = overlay_bgra[:, :x_max]
alpha = roi_o[:, :, 3:4].astype(np.float32) / 255.0
roi_b[:] = np.clip(
roi_o[:, :, :3].astype(np.float32) * alpha + roi_b.astype(np.float32) * (1.0 - alpha),
0,
255,
).astype(np.uint8)
def draw_text_outlined(
frame: np.ndarray,
text: str,
pos: tuple[int, int],
font_scale: float,
thickness: int = 1,
) -> None:
"""Draw text with a dark outline for readability on any background."""
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(frame, text, pos, font, font_scale, (0, 0, 0), thickness + 2, cv2.LINE_AA)
cv2.putText(frame, text, pos, font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA)
def composite_video(
clip_path: Path,
progress_data: np.ndarray,
out_path: Path,
fps: float,
frame_h: int,
frame_w: int,
task_name: str = "",
) -> Path:
"""Read clip frames, draw gradient progress line with fill + labels, export as GIF."""
n_total = int(cv2.VideoCapture(str(clip_path)).get(cv2.CAP_PROP_FRAME_COUNT))
pixels = precompute_pixels(progress_data, n_total, frame_w, frame_h)
y_ref = int(frame_h * GRAPH_Y_TOP_FRAC)
# Pre-render fill polygon (line is drawn per-frame with live color)
fill_img = prerender_fill(pixels, frame_w, frame_h)
# 1.0 reference line overlay (full width, drawn once)
ref_img = np.zeros((frame_h, frame_w, 4), dtype=np.uint8)
cv2.line(ref_img, (0, y_ref), (frame_w - 1, y_ref), (200, 200, 200, int(255 * REF_ALPHA)), 1, cv2.LINE_AA)
frame_indices = progress_data[:, 0].astype(int)
progress_vals = progress_data[:, 1].astype(float)
print(f"[4/4] Compositing {n_total} frames …")
cap = cv2.VideoCapture(str(clip_path))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
tmp_path = out_path.parent / (out_path.stem + "_tmp.mp4")
writer = cv2.VideoWriter(str(tmp_path), fourcc, fps, (frame_w, frame_h))
fi = 0
while True:
ret, frame = cap.read()
if not ret:
break
n_drawn = int(np.searchsorted(frame_indices, fi, side="right"))
x_cur = int(pixels[min(n_drawn, len(pixels)) - 1][0]) + 1 if n_drawn > 0 else 0
# 1. reference line (full width, always)
alpha_composite(frame, ref_img, frame_w)
# 2. grey fill under curve up to current x
alpha_composite(frame, fill_img, x_cur)
# 3. progress line — single color that transitions red→green over time
if n_drawn >= 2:
t_cur = (n_drawn - 1) / max(len(progress_vals) - 1, 1)
line_col = progress_color(t_cur)
pts = pixels[:n_drawn].reshape(-1, 1, 2).astype(np.int32)
cv2.polylines(
frame,
[pts],
isClosed=False,
color=(255, 255, 255),
thickness=SHADOW_THICKNESS,
lineType=cv2.LINE_AA,
)
cv2.polylines(
frame, [pts], isClosed=False, color=line_col, thickness=LINE_THICKNESS, lineType=cv2.LINE_AA
)
# 4. score — bottom right
if n_drawn > 0:
score = float(progress_vals[min(n_drawn, len(progress_vals)) - 1])
score_text = f"{score:.2f}"
(tw, th), _ = cv2.getTextSize(score_text, cv2.FONT_HERSHEY_SIMPLEX, SCORE_FONT_SCALE, 2)
sx = frame_w - tw - 12
sy = frame_h - 12
# coloured score matching current gradient position
t_cur = (n_drawn - 1) / max(len(progress_vals) - 1, 1)
score_col = progress_color(t_cur)
cv2.putText(
frame,
score_text,
(sx, sy),
cv2.FONT_HERSHEY_SIMPLEX,
SCORE_FONT_SCALE,
(0, 0, 0),
4,
cv2.LINE_AA,
)
cv2.putText(
frame,
score_text,
(sx, sy),
cv2.FONT_HERSHEY_SIMPLEX,
SCORE_FONT_SCALE,
score_col,
2,
cv2.LINE_AA,
)
# 5. task name — top centre
if task_name:
(tw, _), _ = cv2.getTextSize(task_name, cv2.FONT_HERSHEY_SIMPLEX, TASK_FONT_SCALE, 1)
tx = max((frame_w - tw) // 2, 4)
draw_text_outlined(frame, task_name, (tx, 22), TASK_FONT_SCALE)
writer.write(frame)
fi += 1
if fi % 100 == 0:
print(f" Frame {fi}/{n_total}", end="\r")
cap.release()
writer.release()
print()
# Convert to GIF: full resolution, 12fps, 128-color diff palette (<40MB)
gif_path = out_path.with_suffix(".gif")
palette = out_path.parent / "_palette.png"
r1 = subprocess.run( # nosec B607
[
"ffmpeg",
"-y",
"-i",
str(tmp_path),
"-vf",
f"fps=10,scale={frame_w}:-1:flags=lanczos,palettegen=max_colors=128:stats_mode=diff",
"-update",
"1",
str(palette),
],
capture_output=True,
text=True,
)
if r1.returncode != 0:
print(f" WARNING: palettegen failed:\n{r1.stderr[-500:]}")
r2 = subprocess.run( # nosec B607
[
"ffmpeg",
"-y",
"-i",
str(tmp_path),
"-i",
str(palette),
"-filter_complex",
f"fps=10,scale={frame_w}:-1:flags=lanczos[v];[v][1:v]paletteuse=dither=bayer:bayer_scale=3",
str(gif_path),
],
capture_output=True,
text=True,
)
if r2.returncode != 0:
print(f" WARNING: gif encode failed:\n{r2.stderr[-500:]}")
tmp_path.unlink(missing_ok=True)
palette.unlink(missing_ok=True)
return gif_path
def process_dataset(repo_id: str, episode: int):
safe_name = repo_id.replace("/", "_")
print(f"\n{'=' * 60}")
print(f"Processing: {repo_id} | episode {episode}")
print(f"{'=' * 60}")
# 1. Download metadata
local = download_episode(repo_id, episode)
print(f" Local cache: {local}")
# 2. Read episode metadata
ep_meta = load_episode_meta(local, episode)
print(f" Episode meta: {ep_meta}")
# 3. Download video file
video_path = download_video(repo_id, local, ep_meta["video_rel"])
# 4. Extract clip
clip_path = OUTPUT_DIR / f"{safe_name}_ep{episode}_clip.mp4"
extract_episode_clip(video_path, ep_meta["from_ts"], ep_meta["to_ts"], clip_path)
# 5. Load progress data
progress_data = load_progress(local, episode)
if progress_data is None:
print(" ERROR: Could not load sarm_progress data. Skipping overlay.")
return
n_progress = len(progress_data)
print(f" Progress frames: {n_progress}")
# 6. Get clip dimensions
cap = cv2.VideoCapture(str(clip_path))
frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
actual_fps = cap.get(cv2.CAP_PROP_FPS) or ep_meta["fps"]
cap.release()
print(f" Clip: {frame_w}×{frame_h} {n_frames} frames @ {actual_fps:.1f}fps")
# 7. Composite (draw line directly on frames)
out_path = OUTPUT_DIR / f"{safe_name}_ep{episode}_progress.mp4"
final = composite_video(
clip_path,
progress_data,
out_path,
actual_fps,
frame_h,
frame_w,
task_name=ep_meta.get("task_name", ""),
)
clip_path.unlink(missing_ok=True)
print(f"\n✓ Done: {final}")
return final
if __name__ == "__main__":
results = []
for cfg in DATASETS:
try:
out = process_dataset(cfg["repo_id"], cfg["episode"])
if out:
results.append(out)
except Exception as e:
print(f"\nERROR processing {cfg['repo_id']}: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 60)
print("Output files:")
for r in results:
print(f" {r}")