improve(dataset-tools): image2video editing tools : Multiple episodes per video file (#2811)

* improve image2video

* add episodes video encoding

* fix mypy failing

* iterate on review

* nit

* remove max, and let it be optional

* iterate more

* update docs

* fix test

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
This commit is contained in:
Jade Choghari
2026-01-20 11:04:22 +01:00
committed by GitHub
parent b2ff219624
commit 79688a09f2
4 changed files with 611 additions and 391 deletions
+13 -6
View File
@@ -95,26 +95,26 @@ Convert an image-based dataset to video format, creating a new LeRobotDataset wh
# Local-only: Save to a custom output directory (no hub push) # Local-only: Save to a custom output directory (no hub push)
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--operation.output_dir /path/to/output/pusht_video --operation.output_dir /path/to/output/pusht_video
# Save with new repo_id (local storage) # Save with new repo_id (local storage)
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \ --new_repo_id lerobot/pusht_video \
--operation.type convert_to_video --operation.type convert_image_to_video
# Convert and push to Hugging Face Hub # Convert and push to Hugging Face Hub
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \ --new_repo_id lerobot/pusht_video \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--push_to_hub true --push_to_hub true
# Convert with custom video codec and quality settings # Convert with custom video codec and quality settings
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--operation.output_dir outputs/pusht_video \ --operation.output_dir outputs/pusht_video \
--operation.vcodec libsvtav1 \ --operation.vcodec libsvtav1 \
--operation.pix_fmt yuv420p \ --operation.pix_fmt yuv420p \
@@ -124,16 +124,23 @@ lerobot-edit-dataset \
# Convert only specific episodes # Convert only specific episodes
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--operation.output_dir outputs/pusht_video \ --operation.output_dir outputs/pusht_video \
--operation.episode_indices "[0, 1, 2, 5, 10]" --operation.episode_indices "[0, 1, 2, 5, 10]"
# Convert with multiple workers for parallel processing # Convert with multiple workers for parallel processing
lerobot-edit-dataset \ lerobot-edit-dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--operation.output_dir outputs/pusht_video \ --operation.output_dir outputs/pusht_video \
--operation.num_workers 8 --operation.num_workers 8
# For memory-constrained systems, users can now specify limits:
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type convert_to_video \
--operation.max_episodes_per_batch 50 \
--operation.max_frames_per_batch 10000
``` ```
**Parameters:** **Parameters:**
+561 -1
View File
@@ -26,6 +26,7 @@ This module provides utilities for:
import logging import logging
import shutil import shutil
from collections.abc import Callable from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path from pathlib import Path
import datasets import datasets
@@ -51,7 +52,8 @@ from lerobot.datasets.utils import (
write_stats, write_stats,
write_tasks, write_tasks,
) )
from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.datasets.video_utils import encode_video_frames, get_video_info
from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE
def _load_episode_with_stats(src_dataset: LeRobotDataset, episode_idx: int) -> dict: def _load_episode_with_stats(src_dataset: LeRobotDataset, episode_idx: int) -> dict:
@@ -1083,3 +1085,561 @@ def _copy_episodes_metadata_and_stats(
else: else:
if src_dataset.meta.stats: if src_dataset.meta.stats:
write_stats(src_dataset.meta.stats, dst_meta.root) write_stats(src_dataset.meta.stats, dst_meta.root)
def _save_episode_images_for_video(
dataset: LeRobotDataset,
imgs_dir: Path,
img_key: str,
episode_index: int,
num_workers: int = 4,
) -> None:
"""Save images from a specific episode and camera to disk for video encoding.
Args:
dataset: The LeRobot dataset to extract images from
imgs_dir: Directory to save images to
img_key: The image key (camera) to extract
episode_index: Index of the episode to save
num_workers: Number of threads for parallel image saving
"""
# Create directory
imgs_dir.mkdir(parents=True, exist_ok=True)
# Get dataset without torch format for PIL image access
hf_dataset = dataset.hf_dataset.with_format(None)
# Select only this camera's images
imgs_dataset = hf_dataset.select_columns(img_key)
# Get episode start and end indices
from_idx = dataset.meta.episodes["dataset_from_index"][episode_index]
to_idx = dataset.meta.episodes["dataset_to_index"][episode_index]
# Get all items for this episode
episode_dataset = imgs_dataset.select(range(from_idx, to_idx))
# Define function to save a single image
def save_single_image(i_item_tuple):
i, item = i_item_tuple
img = item[img_key]
# Use frame-XXXXXX.png format to match encode_video_frames expectations
img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100)
return i
# Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png)
items = list(enumerate(episode_dataset))
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(save_single_image, item) for item in items]
for future in as_completed(futures):
future.result() # This will raise any exceptions that occurred
def _save_batch_episodes_images(
dataset: LeRobotDataset,
imgs_dir: Path,
img_key: str,
episode_indices: list[int],
num_workers: int = 4,
) -> list[float]:
"""Save images from multiple episodes to disk for batch video encoding.
Args:
dataset: The LeRobot dataset to extract images from
imgs_dir: Directory to save images to
img_key: The image key (camera) to extract
episode_indices: List of episode indices to save
num_workers: Number of threads for parallel image saving
Returns:
List of episode durations in seconds
"""
imgs_dir.mkdir(parents=True, exist_ok=True)
hf_dataset = dataset.hf_dataset.with_format(None)
imgs_dataset = hf_dataset.select_columns(img_key)
# Define function to save a single image with global frame index
# Defined once outside the loop to avoid repeated closure creation
def save_single_image(i_item_tuple, base_frame_idx, img_key_param):
i, item = i_item_tuple
img = item[img_key_param]
# Use global frame index for naming
img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.png"), quality=100)
return i
episode_durations = []
frame_idx = 0
for ep_idx in episode_indices:
# Get episode range
from_idx = dataset.meta.episodes["dataset_from_index"][ep_idx]
to_idx = dataset.meta.episodes["dataset_to_index"][ep_idx]
episode_length = to_idx - from_idx
episode_durations.append(episode_length / dataset.fps)
# Get episode images
episode_dataset = imgs_dataset.select(range(from_idx, to_idx))
# Save images
items = list(enumerate(episode_dataset))
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(save_single_image, item, frame_idx, img_key) for item in items]
for future in as_completed(futures):
future.result()
frame_idx += episode_length
return episode_durations
def _iter_episode_batches(
episode_indices: list[int],
episode_lengths: dict[int, int],
size_per_frame_mb: float,
video_file_size_limit: float,
max_episodes: int | None,
max_frames: int | None,
):
"""Generator that yields batches of episode indices for video encoding.
Groups episodes into batches that respect size and memory constraints:
- Stays under video file size limit
- Respects maximum episodes per batch (if specified)
- Respects maximum frames per batch (if specified)
Args:
episode_indices: List of episode indices to batch
episode_lengths: Dictionary mapping episode index to episode length
size_per_frame_mb: Estimated size per frame in MB
video_file_size_limit: Maximum video file size in MB
max_episodes: Maximum number of episodes per batch (None = no limit)
max_frames: Maximum number of frames per batch (None = no limit)
Yields:
List of episode indices for each batch
"""
batch_episodes = []
estimated_size = 0.0
total_frames = 0
for ep_idx in episode_indices:
ep_length = episode_lengths[ep_idx]
ep_estimated_size = ep_length * size_per_frame_mb
# we check if adding this episode would exceed any constraint
would_exceed_size = estimated_size > 0 and estimated_size + ep_estimated_size >= video_file_size_limit
would_exceed_episodes = max_episodes is not None and len(batch_episodes) >= max_episodes
would_exceed_frames = max_frames is not None and total_frames + ep_length > max_frames
if batch_episodes and (would_exceed_size or would_exceed_episodes or would_exceed_frames):
# yield current batch before adding this episode
yield batch_episodes
# start a new batch with current episode
batch_episodes = [ep_idx]
estimated_size = ep_estimated_size
total_frames = ep_length
else:
# add to current batch
batch_episodes.append(ep_idx)
estimated_size += ep_estimated_size
total_frames += ep_length
# yield final batch if not empty
if batch_episodes:
yield batch_episodes
def _estimate_frame_size_via_calibration(
dataset: LeRobotDataset,
img_key: str,
episode_indices: list[int],
temp_dir: Path,
fps: int,
vcodec: str,
pix_fmt: str,
g: int,
crf: int,
fast_decode: int,
num_calibration_frames: int = 30,
) -> float:
"""Estimate MB per frame by encoding a small calibration sample.
Encodes a representative sample of frames using the exact codec parameters
to measure actual compression ratio, which is more accurate than heuristics.
Args:
dataset: Source dataset with images.
img_key: Image key to calibrate (e.g., "observation.images.top").
episode_indices: List of episode indices being processed.
temp_dir: Temporary directory for calibration files.
fps: Frames per second for video encoding.
vcodec: Video codec (libsvtav1, h264, hevc).
pix_fmt: Pixel format (yuv420p, etc.).
g: GOP size (group of pictures).
crf: Constant Rate Factor (quality).
fast_decode: Fast decode tuning parameter.
num_calibration_frames: Number of frames to use for calibration (default: 30).
Returns:
Estimated size in MB per frame based on actual encoding.
"""
calibration_dir = temp_dir / "calibration" / img_key
calibration_dir.mkdir(parents=True, exist_ok=True)
try:
# Select a representative episode (prefer middle episode if available)
calibration_ep_idx = episode_indices[len(episode_indices) // 2]
# Get episode range
from_idx = dataset.meta.episodes["dataset_from_index"][calibration_ep_idx]
to_idx = dataset.meta.episodes["dataset_to_index"][calibration_ep_idx]
episode_length = to_idx - from_idx
# Use up to num_calibration_frames from this episode
num_frames = min(num_calibration_frames, episode_length)
# Get frames from dataset
hf_dataset = dataset.hf_dataset.with_format(None)
sample_indices = range(from_idx, from_idx + num_frames)
# Save calibration frames
for i, idx in enumerate(sample_indices):
img = hf_dataset[idx][img_key]
img.save(str(calibration_dir / f"frame-{i:06d}.png"), quality=100)
# Encode calibration video
calibration_video_path = calibration_dir / "calibration.mp4"
encode_video_frames(
imgs_dir=calibration_dir,
video_path=calibration_video_path,
fps=fps,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
overwrite=True,
)
# Measure actual compressed size
video_size_bytes = calibration_video_path.stat().st_size
video_size_mb = video_size_bytes / BYTES_PER_MIB
size_per_frame_mb = video_size_mb / num_frames
logging.info(
f" Calibration: {num_frames} frames -> {video_size_mb:.2f} MB "
f"= {size_per_frame_mb:.4f} MB/frame for {img_key}"
)
return size_per_frame_mb
finally:
# Clean up calibration files
if calibration_dir.exists():
shutil.rmtree(calibration_dir)
def _copy_data_without_images(
src_dataset: LeRobotDataset,
dst_meta: LeRobotDatasetMetadata,
episode_indices: list[int],
img_keys: list[str],
) -> None:
"""Copy data files without image columns.
Args:
src_dataset: Source dataset
dst_meta: Destination metadata
episode_indices: Episodes to include
img_keys: Image keys to remove
"""
from lerobot.datasets.utils import DATA_DIR
data_dir = src_dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
episode_set = set(episode_indices)
for src_path in tqdm(parquet_files, desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
# Filter to only include selected episodes
df = df[df["episode_index"].isin(episode_set)].copy()
if len(df) == 0:
continue
# Remove image columns
columns_to_drop = [col for col in img_keys if col in df.columns]
if columns_to_drop:
df = df.drop(columns=columns_to_drop)
# Get chunk and file indices from path
relative_path = src_path.relative_to(src_dataset.root)
chunk_dir = relative_path.parts[1]
file_name = relative_path.parts[2]
chunk_idx = int(chunk_dir.split("-")[1])
file_idx = int(file_name.split("-")[1].split(".")[0])
# Write to destination without pandas index
dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet"
dst_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(dst_path, index=False)
# Video conversion constants
BYTES_PER_KIB = 1024
BYTES_PER_MIB = BYTES_PER_KIB * BYTES_PER_KIB
def convert_image_to_video_dataset(
dataset: LeRobotDataset,
output_dir: Path,
repo_id: str | None = None,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int = 2,
crf: int = 30,
fast_decode: int = 0,
episode_indices: list[int] | None = None,
num_workers: int = 4,
max_episodes_per_batch: int | None = None,
max_frames_per_batch: int | None = None,
) -> LeRobotDataset:
"""Convert image-to-video dataset.
Creates a new LeRobotDataset with images encoded as videos, following the proper
LeRobot dataset structure with videos stored in chunked MP4 files.
Args:
dataset: The source LeRobot dataset with images
output_dir: Directory to save the new video dataset
repo_id: Repository ID for the new dataset (default: original_id + "_video")
vcodec: Video codec (default: libsvtav1)
pix_fmt: Pixel format (default: yuv420p)
g: Group of pictures size (default: 2)
crf: Constant rate factor (default: 30)
fast_decode: Fast decode tuning (default: 0)
episode_indices: List of episode indices to convert (None = all episodes)
num_workers: Number of threads for parallel processing (default: 4)
max_episodes_per_batch: Maximum episodes per video batch to avoid memory issues (None = no limit)
max_frames_per_batch: Maximum frames per video batch to avoid memory issues (None = no limit)
Returns:
New LeRobotDataset with images encoded as videos
"""
# Check that it's an image dataset
if len(dataset.meta.video_keys) > 0:
raise ValueError(
f"This operation is for image datasets only. Video dataset provided: {dataset.repo_id}"
)
# Get all image keys
hf_dataset = dataset.hf_dataset.with_format(None)
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
if len(img_keys) == 0:
raise ValueError(f"No image keys found in dataset {dataset.repo_id}")
# Determine which episodes to process
if episode_indices is None:
episode_indices = list(range(dataset.meta.total_episodes))
if repo_id is None:
repo_id = f"{dataset.repo_id}_video"
logging.info(
f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}"
)
logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}")
# Create new features dict, converting image features to video features
new_features = {}
for key, value in dataset.meta.features.items():
if key not in img_keys:
new_features[key] = value
else:
# Convert image key to video format
new_features[key] = value.copy()
new_features[key]["dtype"] = "video" # Change dtype from "image" to "video"
# Video info will be updated after episodes are encoded
# Create new metadata for video dataset
new_meta = LeRobotDatasetMetadata.create(
repo_id=repo_id,
fps=dataset.meta.fps,
features=new_features,
robot_type=dataset.meta.robot_type,
root=output_dir,
use_videos=True,
chunks_size=dataset.meta.chunks_size,
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
)
# Create temporary directory for image extraction
temp_dir = output_dir / "temp_images"
temp_dir.mkdir(parents=True, exist_ok=True)
# Process all episodes and batch encode videos
# Use dictionary for O(1) episode metadata lookups instead of O(n) linear search
all_episode_metadata = {}
fps = int(dataset.fps)
try:
# Build episode metadata entries first
logging.info("Building episode metadata...")
cumulative_frame_idx = 0
for ep_idx in episode_indices:
src_episode = dataset.meta.episodes[ep_idx]
ep_length = src_episode["length"]
ep_meta = {
"episode_index": ep_idx,
"length": ep_length,
"dataset_from_index": cumulative_frame_idx,
"dataset_to_index": cumulative_frame_idx + ep_length,
}
if "data/chunk_index" in src_episode:
ep_meta["data/chunk_index"] = src_episode["data/chunk_index"]
ep_meta["data/file_index"] = src_episode["data/file_index"]
all_episode_metadata[ep_idx] = ep_meta
cumulative_frame_idx += ep_length
# Process each camera and batch encode multiple episodes together
video_file_size_limit = new_meta.video_files_size_in_mb
# Pre-compute episode lengths for batching
episode_lengths = {ep_idx: dataset.meta.episodes["length"][ep_idx] for ep_idx in episode_indices}
for img_key in tqdm(img_keys, desc="Processing cameras"):
# Estimate size per frame by encoding a small calibration sample
# This provides accurate compression ratio for the specific codec parameters
size_per_frame_mb = _estimate_frame_size_via_calibration(
dataset=dataset,
img_key=img_key,
episode_indices=episode_indices,
temp_dir=temp_dir,
fps=fps,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
)
logging.info(f"Processing camera: {img_key}")
chunk_idx, file_idx = 0, 0
cumulative_timestamp = 0.0
# Process episodes in batches to stay under size limit
for batch_episodes in _iter_episode_batches(
episode_indices=episode_indices,
episode_lengths=episode_lengths,
size_per_frame_mb=size_per_frame_mb,
video_file_size_limit=video_file_size_limit,
max_episodes=max_episodes_per_batch,
max_frames=max_frames_per_batch,
):
total_frames_in_batch = sum(episode_lengths[idx] for idx in batch_episodes)
logging.info(
f" Encoding batch of {len(batch_episodes)} episodes "
f"({batch_episodes[0]}-{batch_episodes[-1]}) = {total_frames_in_batch} frames"
)
# Save images for all episodes in this batch
imgs_dir = temp_dir / f"batch_{chunk_idx}_{file_idx}" / img_key
episode_durations = _save_batch_episodes_images(
dataset=dataset,
imgs_dir=imgs_dir,
img_key=img_key,
episode_indices=batch_episodes,
num_workers=num_workers,
)
# Encode all batched episodes into single video
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
video_path.parent.mkdir(parents=True, exist_ok=True)
encode_video_frames(
imgs_dir=imgs_dir,
video_path=video_path,
fps=fps,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
overwrite=True,
)
# Clean up temporary images
shutil.rmtree(imgs_dir)
# Update metadata for each episode in the batch
for ep_idx, duration in zip(batch_episodes, episode_durations, strict=True):
from_timestamp = cumulative_timestamp
to_timestamp = cumulative_timestamp + duration
cumulative_timestamp = to_timestamp
# Find episode metadata entry and add video metadata (O(1) dictionary lookup)
ep_meta = all_episode_metadata[ep_idx]
ep_meta[f"videos/{img_key}/chunk_index"] = chunk_idx
ep_meta[f"videos/{img_key}/file_index"] = file_idx
ep_meta[f"videos/{img_key}/from_timestamp"] = from_timestamp
ep_meta[f"videos/{img_key}/to_timestamp"] = to_timestamp
# Move to next video file for next batch
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, new_meta.chunks_size)
cumulative_timestamp = 0.0
# Copy and transform data files (removing image columns)
_copy_data_without_images(dataset, new_meta, episode_indices, img_keys)
# Save episode metadata
episodes_df = pd.DataFrame(list(all_episode_metadata.values()))
episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet"
episodes_path.parent.mkdir(parents=True, exist_ok=True)
episodes_df.to_parquet(episodes_path, index=False)
# Update metadata info
new_meta.info["total_episodes"] = len(episode_indices)
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata.values())
new_meta.info["total_tasks"] = dataset.meta.total_tasks
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
# Update video info for all image keys (now videos)
# We need to manually set video info since update_video_info() checks video_keys first
for img_key in img_keys:
if not new_meta.features[img_key].get("info", None):
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=0, file_index=0
)
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
write_info(new_meta.info, new_meta.root)
# Copy stats and tasks
if dataset.meta.stats is not None:
# Remove image stats
new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys}
write_stats(new_stats, new_meta.root)
if dataset.meta.tasks is not None:
write_tasks(dataset.meta.tasks, new_meta.root)
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
logging.info(f"Completed converting {dataset.repo_id} to video format")
logging.info(f"New dataset saved to: {output_dir}")
# Return new dataset
return LeRobotDataset(repo_id=repo_id, root=output_dir)
+32 -379
View File
@@ -66,23 +66,23 @@ Remove camera feature:
--operation.type remove_feature \ --operation.type remove_feature \
--operation.feature_names "['observation.images.top']" --operation.feature_names "['observation.images.top']"
Convert image dataset to video format (saves locally): Convert image dataset to video format and save locally:
python -m lerobot.scripts.lerobot_edit_dataset \ python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--operation.output_dir /path/to/output/pusht_video --operation.output_dir /path/to/output/pusht_video
Convert image dataset and save with new repo_id: Convert image dataset to video format and save with new repo_id:
python -m lerobot.scripts.lerobot_edit_dataset \ python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \ --new_repo_id lerobot/pusht_video \
--operation.type convert_to_video --operation.type convert_image_to_video
Convert and push to hub: Convert image dataset to video format and push to hub:
python -m lerobot.scripts.lerobot_edit_dataset \ python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht_image \ --repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \ --new_repo_id lerobot/pusht_video \
--operation.type convert_to_video \ --operation.type convert_image_to_video \
--push_to_hub true --push_to_hub true
Using JSON config file: Using JSON config file:
@@ -92,24 +92,19 @@ Using JSON config file:
import logging import logging
import shutil import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import pandas as pd
from tqdm import tqdm
from lerobot.configs import parser from lerobot.configs import parser
from lerobot.datasets.dataset_tools import ( from lerobot.datasets.dataset_tools import (
convert_image_to_video_dataset,
delete_episodes, delete_episodes,
merge_datasets, merge_datasets,
remove_feature, remove_feature,
split_dataset, split_dataset,
) )
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import write_stats, write_tasks from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.datasets.video_utils import encode_video_frames, get_video_info
from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE
from lerobot.utils.utils import init_logging from lerobot.utils.utils import init_logging
@@ -138,8 +133,8 @@ class RemoveFeatureConfig:
@dataclass @dataclass
class ConvertToVideoConfig: class ConvertImageToVideoConfig:
type: str = "convert_to_video" type: str = "convert_image_to_video"
output_dir: str | None = None output_dir: str | None = None
vcodec: str = "libsvtav1" vcodec: str = "libsvtav1"
pix_fmt: str = "yuv420p" pix_fmt: str = "yuv420p"
@@ -148,12 +143,16 @@ class ConvertToVideoConfig:
fast_decode: int = 0 fast_decode: int = 0
episode_indices: list[int] | None = None episode_indices: list[int] | None = None
num_workers: int = 4 num_workers: int = 4
max_episodes_per_batch: int | None = None
max_frames_per_batch: int | None = None
@dataclass @dataclass
class EditDatasetConfig: class EditDatasetConfig:
repo_id: str repo_id: str
operation: DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertToVideoConfig operation: (
DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertImageToVideoConfig
)
root: str | None = None root: str | None = None
new_repo_id: str | None = None new_repo_id: str | None = None
push_to_hub: bool = False push_to_hub: bool = False
@@ -297,362 +296,7 @@ def handle_remove_feature(cfg: EditDatasetConfig) -> None:
LeRobotDataset(output_repo_id, root=output_dir).push_to_hub() LeRobotDataset(output_repo_id, root=output_dir).push_to_hub()
def save_episode_images_for_video( def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None:
dataset: LeRobotDataset,
imgs_dir: Path,
img_key: str,
episode_index: int,
num_workers: int = 4,
) -> None:
"""Save images from a specific episode and camera to disk for video encoding.
Args:
dataset: The LeRobot dataset to extract images from
imgs_dir: Directory to save images to
img_key: The image key (camera) to extract
episode_index: Index of the episode to save
num_workers: Number of threads for parallel image saving
"""
# Create directory
imgs_dir.mkdir(parents=True, exist_ok=True)
# Get dataset without torch format for PIL image access
hf_dataset = dataset.hf_dataset.with_format(None)
# Select only this camera's images
imgs_dataset = hf_dataset.select_columns(img_key)
# Get episode start and end indices
from_idx = dataset.meta.episodes["dataset_from_index"][episode_index]
to_idx = dataset.meta.episodes["dataset_to_index"][episode_index]
# Get all items for this episode
episode_dataset = imgs_dataset.select(range(from_idx, to_idx))
# Define function to save a single image
def save_single_image(i_item_tuple):
i, item = i_item_tuple
img = item[img_key]
# Use frame-XXXXXX.png format to match encode_video_frames expectations
img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100)
return i
# Save images with proper naming convention for encode_video_frames (frame-XXXXXX.png)
items = list(enumerate(episode_dataset))
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(save_single_image, item) for item in items]
for future in as_completed(futures):
future.result() # This will raise any exceptions that occurred
def encode_episode_videos(
dataset: LeRobotDataset,
new_meta: LeRobotDatasetMetadata,
episode_index: int,
vcodec: str,
pix_fmt: str,
g: int,
crf: int,
fast_decode: int,
temp_dir: Path,
num_image_workers: int = 4,
) -> dict[str, dict]:
"""Encode videos for a single episode and return video metadata.
Args:
dataset: Source dataset with images
new_meta: Metadata object for the new video dataset
episode_index: Episode index to process
vcodec: Video codec
pix_fmt: Pixel format
g: Group of pictures size
crf: Constant rate factor
fast_decode: Fast decode tuning
temp_dir: Temporary directory for images
num_image_workers: Number of workers for saving images
Returns:
Dictionary mapping video keys to their metadata (chunk_index, file_index, timestamps)
"""
hf_dataset = dataset.hf_dataset.with_format(None)
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
video_metadata = {}
fps = int(dataset.fps) # Convert to int for PyAV compatibility
episode_length = dataset.meta.episodes["length"][episode_index]
episode_duration = episode_length / dataset.fps # Use original fps for duration calculation
for img_key in img_keys:
# Save images temporarily
imgs_dir = temp_dir / f"episode_{episode_index:06d}" / img_key
save_episode_images_for_video(dataset, imgs_dir, img_key, episode_index, num_image_workers)
# Determine chunk and file indices
# For simplicity, we'll put each episode in its own file
chunk_idx = episode_index // new_meta.chunks_size
file_idx = episode_index % new_meta.chunks_size
# Create video path in the new dataset structure
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
video_path.parent.mkdir(parents=True, exist_ok=True)
# Encode video
encode_video_frames(
imgs_dir=imgs_dir,
video_path=video_path,
fps=fps,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
overwrite=True,
)
# Clean up temporary images
shutil.rmtree(imgs_dir)
# Store video metadata
video_metadata[img_key] = {
f"videos/{img_key}/chunk_index": chunk_idx,
f"videos/{img_key}/file_index": file_idx,
f"videos/{img_key}/from_timestamp": 0.0,
f"videos/{img_key}/to_timestamp": episode_duration,
}
return video_metadata
def convert_dataset_to_videos(
dataset: LeRobotDataset,
output_dir: Path,
repo_id: str | None = None,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int = 2,
crf: int = 30,
fast_decode: int = 0,
episode_indices: list[int] | None = None,
num_workers: int = 4,
) -> LeRobotDataset:
"""Convert image-based dataset to video-based dataset.
Creates a new LeRobotDataset with videos instead of images, following the proper
LeRobot dataset structure with videos stored in chunked MP4 files.
Args:
dataset: The source LeRobot dataset with images
output_dir: Directory to save the new video dataset
repo_id: Repository ID for the new dataset (default: original_id + "_video")
vcodec: Video codec (default: libsvtav1)
pix_fmt: Pixel format (default: yuv420p)
g: Group of pictures size (default: 2)
crf: Constant rate factor (default: 30)
fast_decode: Fast decode tuning (default: 0)
episode_indices: List of episode indices to convert (None = all episodes)
num_workers: Number of threads for parallel processing (default: 4)
Returns:
New LeRobotDataset with videos
"""
# Check that it's an image dataset
if len(dataset.meta.video_keys) > 0:
raise ValueError(
f"This operation is for image datasets only. Video dataset provided: {dataset.repo_id}"
)
# Get all image keys
hf_dataset = dataset.hf_dataset.with_format(None)
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
if len(img_keys) == 0:
raise ValueError(f"No image keys found in dataset {dataset.repo_id}")
# Determine which episodes to process
if episode_indices is None:
episode_indices = list(range(dataset.meta.total_episodes))
if repo_id is None:
repo_id = f"{dataset.repo_id}_video"
logging.info(
f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}"
)
logging.info(f"Video codec: {vcodec}, pixel format: {pix_fmt}, GOP: {g}, CRF: {crf}")
# Create new features dict, converting image features to video features
new_features = {}
for key, value in dataset.meta.features.items():
if key not in img_keys:
new_features[key] = value
else:
# Convert image key to video format
new_features[key] = value.copy()
new_features[key]["dtype"] = "video" # Change dtype from "image" to "video"
# Video info will be updated after episodes are encoded
# Create new metadata for video dataset
new_meta = LeRobotDatasetMetadata.create(
repo_id=repo_id,
fps=dataset.meta.fps,
features=new_features,
robot_type=dataset.meta.robot_type,
root=output_dir,
use_videos=True,
chunks_size=dataset.meta.chunks_size,
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
)
# Create temporary directory for image extraction
temp_dir = output_dir / "temp_images"
temp_dir.mkdir(parents=True, exist_ok=True)
# Process each episode
all_episode_metadata = []
try:
for ep_idx in tqdm(episode_indices, desc="Converting episodes to videos"):
# Get episode metadata from source
src_episode = dataset.meta.episodes[ep_idx]
# Encode videos for this episode
video_metadata = encode_episode_videos(
dataset=dataset,
new_meta=new_meta,
episode_index=ep_idx,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
temp_dir=temp_dir,
num_image_workers=num_workers,
)
# Build episode metadata
episode_meta = {
"episode_index": ep_idx,
"length": src_episode["length"],
"dataset_from_index": ep_idx * src_episode["length"],
"dataset_to_index": (ep_idx + 1) * src_episode["length"],
}
# Add video metadata
for img_key in img_keys:
episode_meta.update(video_metadata[img_key])
# Add data chunk/file info (using same structure as source)
if "data/chunk_index" in src_episode:
episode_meta["data/chunk_index"] = src_episode["data/chunk_index"]
episode_meta["data/file_index"] = src_episode["data/file_index"]
all_episode_metadata.append(episode_meta)
# Copy and transform data files (removing image columns)
_copy_data_without_images(dataset, new_meta, episode_indices, img_keys)
# Save episode metadata
episodes_df = pd.DataFrame(all_episode_metadata)
episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet"
episodes_path.parent.mkdir(parents=True, exist_ok=True)
episodes_df.to_parquet(episodes_path, index=False)
# Update metadata info
new_meta.info["total_episodes"] = len(episode_indices)
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata)
new_meta.info["total_tasks"] = dataset.meta.total_tasks
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
# Update video info for all image keys (now videos)
# We need to manually set video info since update_video_info() checks video_keys first
for img_key in img_keys:
if not new_meta.features[img_key].get("info", None):
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=0, file_index=0
)
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
from lerobot.datasets.utils import write_info
write_info(new_meta.info, new_meta.root)
# Copy stats and tasks
if dataset.meta.stats is not None:
# Remove image stats
new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys}
write_stats(new_stats, new_meta.root)
if dataset.meta.tasks is not None:
write_tasks(dataset.meta.tasks, new_meta.root)
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
logging.info(f"✓ Completed converting {dataset.repo_id} to video format")
logging.info(f"New dataset saved to: {output_dir}")
# Return new dataset
return LeRobotDataset(repo_id=repo_id, root=output_dir)
def _copy_data_without_images(
src_dataset: LeRobotDataset,
dst_meta: LeRobotDatasetMetadata,
episode_indices: list[int],
img_keys: list[str],
) -> None:
"""Copy data files without image columns.
Args:
src_dataset: Source dataset
dst_meta: Destination metadata
episode_indices: Episodes to include
img_keys: Image keys to remove
"""
from lerobot.datasets.utils import DATA_DIR
data_dir = src_dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
episode_set = set(episode_indices)
for src_path in tqdm(parquet_files, desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
# Filter to only include selected episodes
df = df[df["episode_index"].isin(episode_set)].copy()
if len(df) == 0:
continue
# Remove image columns
columns_to_drop = [col for col in img_keys if col in df.columns]
if columns_to_drop:
df = df.drop(columns=columns_to_drop)
# Get chunk and file indices from path
relative_path = src_path.relative_to(src_dataset.root)
chunk_dir = relative_path.parts[1]
file_name = relative_path.parts[2]
chunk_idx = int(chunk_dir.split("-")[1])
file_idx = int(file_name.split("-")[1].split(".")[0])
# Write to destination without pandas index
dst_path = dst_meta.root / f"data/chunk-{chunk_idx:03d}/file-{file_idx:03d}.parquet"
dst_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(dst_path, index=False)
def handle_convert_to_video(cfg: EditDatasetConfig) -> None:
# Note: Parser may create any config type with the right fields, so we access fields directly # Note: Parser may create any config type with the right fields, so we access fields directly
# instead of checking isinstance() # instead of checking isinstance()
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root) dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
@@ -664,8 +308,12 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None:
if cfg.new_repo_id: if cfg.new_repo_id:
# Use new_repo_id for both local storage and hub push # Use new_repo_id for both local storage and hub push
output_repo_id = cfg.new_repo_id output_repo_id = cfg.new_repo_id
output_dir = Path(cfg.root) / cfg.new_repo_id if cfg.root else HF_LEROBOT_HOME / cfg.new_repo_id # Place new dataset as a sibling to the original dataset
logging.info(f"Saving to new dataset: {cfg.new_repo_id}") # Get the parent of the actual dataset root (not cfg.root which might be the lerobot cache dir)
# Extract just the dataset name (after last slash) for the local directory
local_dir_name = cfg.new_repo_id.split("/")[-1]
output_dir = dataset.root.parent / local_dir_name
logging.info(f"Saving to new dataset: {cfg.new_repo_id} at {output_dir}")
elif output_dir_config: elif output_dir_config:
# Use custom output directory for local-only storage # Use custom output directory for local-only storage
output_dir = Path(output_dir_config) output_dir = Path(output_dir_config)
@@ -675,12 +323,15 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None:
else: else:
# Auto-generate name: append "_video" to original repo_id # Auto-generate name: append "_video" to original repo_id
output_repo_id = f"{cfg.repo_id}_video" output_repo_id = f"{cfg.repo_id}_video"
output_dir = Path(cfg.root) / output_repo_id if cfg.root else HF_LEROBOT_HOME / output_repo_id # Place new dataset as a sibling to the original dataset
# Extract just the dataset name (after last slash) for the local directory
local_dir_name = output_repo_id.split("/")[-1]
output_dir = dataset.root.parent / local_dir_name
logging.info(f"Saving to auto-generated location: {output_dir}") logging.info(f"Saving to auto-generated location: {output_dir}")
logging.info(f"Converting dataset {cfg.repo_id} to video format") logging.info(f"Converting dataset {cfg.repo_id} to video format")
new_dataset = convert_dataset_to_videos( new_dataset = convert_image_to_video_dataset(
dataset=dataset, dataset=dataset,
output_dir=output_dir, output_dir=output_dir,
repo_id=output_repo_id, repo_id=output_repo_id,
@@ -691,6 +342,8 @@ def handle_convert_to_video(cfg: EditDatasetConfig) -> None:
fast_decode=getattr(cfg.operation, "fast_decode", 0), fast_decode=getattr(cfg.operation, "fast_decode", 0),
episode_indices=getattr(cfg.operation, "episode_indices", None), episode_indices=getattr(cfg.operation, "episode_indices", None),
num_workers=getattr(cfg.operation, "num_workers", 4), num_workers=getattr(cfg.operation, "num_workers", 4),
max_episodes_per_batch=getattr(cfg.operation, "max_episodes_per_batch", None),
max_frames_per_batch=getattr(cfg.operation, "max_frames_per_batch", None),
) )
logging.info("Video dataset created successfully!") logging.info("Video dataset created successfully!")
@@ -718,8 +371,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
handle_merge(cfg) handle_merge(cfg)
elif operation_type == "remove_feature": elif operation_type == "remove_feature":
handle_remove_feature(cfg) handle_remove_feature(cfg)
elif operation_type == "convert_to_video": elif operation_type == "convert_image_to_video":
handle_convert_to_video(cfg) handle_convert_image_to_video(cfg)
else: else:
raise ValueError( raise ValueError(
f"Unknown operation type: {operation_type}\n" f"Unknown operation type: {operation_type}\n"
+5 -5
View File
@@ -29,7 +29,7 @@ from lerobot.datasets.dataset_tools import (
remove_feature, remove_feature,
split_dataset, split_dataset,
) )
from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos from lerobot.scripts.lerobot_edit_dataset import convert_image_to_video_dataset
@pytest.fixture @pytest.fixture
@@ -1050,7 +1050,7 @@ def test_modify_features_preserves_file_structure(sample_dataset, tmp_path):
assert "reward" in modified_dataset.meta.features assert "reward" in modified_dataset.meta.features
def test_convert_dataset_to_videos(tmp_path): def test_convert_image_to_video_dataset(tmp_path):
"""Test converting lerobot/pusht_image dataset to video format.""" """Test converting lerobot/pusht_image dataset to video format."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.lerobot_dataset import LeRobotDataset
@@ -1071,7 +1071,7 @@ def test_convert_dataset_to_videos(tmp_path):
assert "observation.image" in source_dataset.meta.features assert "observation.image" in source_dataset.meta.features
# Convert to video dataset (only first 2 episodes for speed) # Convert to video dataset (only first 2 episodes for speed)
video_dataset = convert_dataset_to_videos( video_dataset = convert_image_to_video_dataset(
dataset=source_dataset, dataset=source_dataset,
output_dir=output_dir, output_dir=output_dir,
repo_id="lerobot/pusht_video", repo_id="lerobot/pusht_video",
@@ -1113,7 +1113,7 @@ def test_convert_dataset_to_videos(tmp_path):
shutil.rmtree(output_dir) shutil.rmtree(output_dir)
def test_convert_dataset_to_videos_subset_episodes(tmp_path): def test_convert_image_to_video_dataset_subset_episodes(tmp_path):
"""Test converting only specific episodes from lerobot/pusht_image to video format.""" """Test converting only specific episodes from lerobot/pusht_image to video format."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.lerobot_dataset import LeRobotDataset
@@ -1132,7 +1132,7 @@ def test_convert_dataset_to_videos_subset_episodes(tmp_path):
# Convert only episode 0 to video (subset of loaded episodes) # Convert only episode 0 to video (subset of loaded episodes)
episode_indices = [0] episode_indices = [0]
video_dataset = convert_dataset_to_videos( video_dataset = convert_image_to_video_dataset(
dataset=source_dataset, dataset=source_dataset,
output_dir=output_dir, output_dir=output_dir,
repo_id="lerobot/pusht_video_subset", repo_id="lerobot/pusht_video_subset",