mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-19 02:29:47 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6e01006d94 | |||
| 5547757cea | |||
| 197fcdb807 | |||
| e8503f5fe6 | |||
| 7559641c55 |
@@ -31,6 +31,7 @@ from .dataset_tools import (
|
||||
modify_features,
|
||||
modify_tasks,
|
||||
recompute_stats,
|
||||
reencode_dataset,
|
||||
remove_feature,
|
||||
split_dataset,
|
||||
)
|
||||
@@ -77,6 +78,7 @@ __all__ = [
|
||||
"modify_features",
|
||||
"modify_tasks",
|
||||
"recompute_stats",
|
||||
"reencode_dataset",
|
||||
"remove_feature",
|
||||
"resolve_delta_timestamps",
|
||||
"safe_stop_image_writer",
|
||||
|
||||
@@ -26,7 +26,7 @@ This module provides utilities for:
|
||||
import logging
|
||||
import shutil
|
||||
from collections.abc import Callable
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
|
||||
import datasets
|
||||
@@ -61,11 +61,13 @@ from .utils import (
|
||||
DEFAULT_DATA_FILE_SIZE_IN_MB,
|
||||
DEFAULT_DATA_PATH,
|
||||
DEFAULT_EPISODES_PATH,
|
||||
VIDEO_DIR,
|
||||
update_chunk_file_indices,
|
||||
)
|
||||
from .video_utils import (
|
||||
encode_video_frames,
|
||||
get_video_info,
|
||||
reencode_video,
|
||||
)
|
||||
|
||||
|
||||
@@ -1884,3 +1886,83 @@ def convert_image_to_video_dataset(
|
||||
|
||||
# Return new dataset
|
||||
return LeRobotDataset(repo_id=repo_id, root=output_dir)
|
||||
|
||||
|
||||
def _reencode_video_worker(args: tuple) -> Path:
|
||||
"""Picklable worker for :func:`reencode_dataset`'s process pool."""
|
||||
video_path, camera_encoder, encoder_threads = args
|
||||
reencode_video(
|
||||
input_video_path=video_path,
|
||||
output_video_path=video_path,
|
||||
camera_encoder=camera_encoder,
|
||||
encoder_threads=encoder_threads,
|
||||
overwrite=True,
|
||||
)
|
||||
return video_path
|
||||
|
||||
|
||||
def reencode_dataset(
|
||||
dataset: LeRobotDataset,
|
||||
camera_encoder: VideoEncoderConfig,
|
||||
encoder_threads: int | None = None,
|
||||
num_workers: int | None = None,
|
||||
) -> LeRobotDataset:
|
||||
"""Re-encode every video in a dataset with a new set of encoding parameters.
|
||||
|
||||
Videos are re-encoded in-place and the video information in ``info.json`` is refreshed.
|
||||
|
||||
Args:
|
||||
dataset: An existing :class:`LeRobotDataset` whose videos will be
|
||||
re-encoded.
|
||||
camera_encoder: Target encoder configuration applied to every video
|
||||
file.
|
||||
encoder_threads: Per-encoder thread count forwarded to
|
||||
:func:`reencode_video`. ``None`` lets the codec decide.
|
||||
num_workers: Number of parallel processes. ``None`` or ``0`` means
|
||||
sequential (no multiprocessing); ``1+`` spawns a
|
||||
:class:`~concurrent.futures.ProcessPoolExecutor`.
|
||||
|
||||
Returns:
|
||||
The same :class:`LeRobotDataset` instance with its metadata updated
|
||||
on disk.
|
||||
"""
|
||||
meta = dataset.meta
|
||||
video_paths_list = []
|
||||
|
||||
# Only re-encode if the videos are not already encoded with the given video encoding parameters
|
||||
for video_key in meta.video_keys:
|
||||
current_info = meta.info.features[video_key].get("info", {})
|
||||
current_encoder = VideoEncoderConfig.from_video_info(current_info)
|
||||
if current_encoder != camera_encoder:
|
||||
video_paths_list.extend((meta.root / VIDEO_DIR / video_key).rglob("*.mp4"))
|
||||
else:
|
||||
logging.info(f"{video_key} videos are already encoded with {camera_encoder}. Nothing to do.")
|
||||
|
||||
if len(video_paths_list) == 0:
|
||||
logging.warning("Dataset has no videos to re-encode.")
|
||||
return dataset
|
||||
logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}")
|
||||
|
||||
worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list]
|
||||
if num_workers and num_workers > 1:
|
||||
with ProcessPoolExecutor(max_workers=num_workers) as pool:
|
||||
futures = [pool.submit(_reencode_video_worker, args) for args in worker_args]
|
||||
for future in tqdm(
|
||||
as_completed(futures),
|
||||
total=len(futures),
|
||||
desc="Re-encoding videos",
|
||||
):
|
||||
future.result()
|
||||
else:
|
||||
for args in tqdm(worker_args, desc="Re-encoding videos"):
|
||||
_reencode_video_worker(args)
|
||||
|
||||
# Refresh video info in metadata for every video key.
|
||||
for vid_key in meta.video_keys:
|
||||
video_path = meta.root / meta.get_video_file_path(0, vid_key)
|
||||
meta.info.features[vid_key]["info"] = get_video_info(video_path, camera_encoder=camera_encoder)
|
||||
|
||||
write_info(meta.info, meta.root)
|
||||
logging.info("Dataset metadata updated.")
|
||||
|
||||
return dataset
|
||||
|
||||
@@ -403,6 +403,92 @@ def encode_video_frames(
|
||||
raise OSError(f"Video encoding did not work. File not found: {video_path}.")
|
||||
|
||||
|
||||
def reencode_video(
|
||||
input_video_path: Path | str,
|
||||
output_video_path: Path | str,
|
||||
camera_encoder: VideoEncoderConfig | None = None,
|
||||
encoder_threads: int | None = None,
|
||||
log_level: int | None = av.logging.WARNING,
|
||||
overwrite: bool = False,
|
||||
) -> None:
|
||||
"""Re-encode a video file using the given encoder configuration.
|
||||
|
||||
Args:
|
||||
input_video_path: Existing video file to read.
|
||||
output_video_path: Path for the re-encoded file.
|
||||
camera_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`.
|
||||
encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`.
|
||||
log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING.
|
||||
overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning.
|
||||
"""
|
||||
|
||||
camera_encoder = camera_encoder or camera_encoder_defaults()
|
||||
|
||||
output_video_path = Path(output_video_path)
|
||||
|
||||
if output_video_path.exists() and not overwrite:
|
||||
logger.warning(f"Video file already exists: {output_video_path}. Skipping re-encode.")
|
||||
return
|
||||
|
||||
output_video_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True)
|
||||
vcodec = camera_encoder.vcodec
|
||||
pix_fmt = camera_encoder.pix_fmt
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file:
|
||||
tmp_output_video_path = tmp_named_file.name
|
||||
|
||||
if log_level is not None:
|
||||
logging.getLogger("libav").setLevel(log_level)
|
||||
|
||||
try:
|
||||
with av.open(input_video_path, mode="r") as src:
|
||||
try:
|
||||
in_stream = src.streams.video[0]
|
||||
except IndexError as e:
|
||||
raise ValueError(f"No video stream in {input_video_path}") from e
|
||||
|
||||
fps = (
|
||||
in_stream.base_rate
|
||||
) # We allow fractional fps though LeRobotDataset only supports integer fps
|
||||
width = int(in_stream.width)
|
||||
height = int(in_stream.height)
|
||||
|
||||
with av.open(
|
||||
tmp_output_video_path,
|
||||
mode="w",
|
||||
options={
|
||||
"movflags": "faststart"
|
||||
}, # faststart is to move the metadata to the beginning of the file to speed up loading
|
||||
) as dst:
|
||||
out_stream = dst.add_stream(vcodec, fps, options=video_options)
|
||||
out_stream.pix_fmt = pix_fmt
|
||||
out_stream.width = width
|
||||
out_stream.height = height
|
||||
|
||||
for frame in src.decode(in_stream):
|
||||
frame = frame.reformat(width=width, height=height, format=pix_fmt)
|
||||
packet = out_stream.encode(frame)
|
||||
if packet:
|
||||
dst.mux(packet)
|
||||
|
||||
packet = out_stream.encode()
|
||||
if packet:
|
||||
dst.mux(packet)
|
||||
|
||||
shutil.move(tmp_output_video_path, output_video_path)
|
||||
except Exception:
|
||||
Path(tmp_output_video_path).unlink(missing_ok=True)
|
||||
raise
|
||||
finally:
|
||||
if log_level is not None:
|
||||
av.logging.restore_default_callback()
|
||||
|
||||
if not output_video_path.exists():
|
||||
raise OSError(f"Video re-encoding did not work. File not found: {output_video_path}.")
|
||||
|
||||
|
||||
def concatenate_video_files(
|
||||
input_video_paths: list[Path | str],
|
||||
output_video_path: Path,
|
||||
|
||||
@@ -178,6 +178,31 @@ Recompute stats for relative actions and push to hub:
|
||||
--operation.num_workers 4 \
|
||||
--push_to_hub true
|
||||
|
||||
Re-encode all videos in a dataset (saves to lerobot/pusht_reencoded by default):
|
||||
lerobot-edit-dataset \
|
||||
--repo_id lerobot/pusht \
|
||||
--operation.type reencode_videos \
|
||||
--operation.camera_encoder.vcodec h264 \
|
||||
--operation.camera_encoder.pix_fmt yuv420p \
|
||||
--operation.camera_encoder.crf 23
|
||||
|
||||
Re-encode videos into a new dataset using 4 parallel processes:
|
||||
lerobot-edit-dataset \
|
||||
--repo_id lerobot/pusht \
|
||||
--new_repo_id lerobot/pusht_h264 \
|
||||
--operation.type reencode_videos \
|
||||
--operation.camera_encoder.vcodec h264 \
|
||||
--operation.camera_encoder.crf 23 \
|
||||
--operation.num_workers 4
|
||||
|
||||
Re-encode videos in-place (overwrites original dataset):
|
||||
lerobot-edit-dataset \
|
||||
--repo_id lerobot/pusht \
|
||||
--new_repo_id lerobot/pusht \
|
||||
--operation.type reencode_videos \
|
||||
--operation.camera_encoder.vcodec h264 \
|
||||
--operation.overwrite true
|
||||
|
||||
Using JSON config file:
|
||||
lerobot-edit-dataset \
|
||||
--config_path path/to/edit_config.json
|
||||
@@ -200,6 +225,7 @@ from lerobot.datasets import (
|
||||
merge_datasets,
|
||||
modify_tasks,
|
||||
recompute_stats,
|
||||
reencode_dataset,
|
||||
remove_feature,
|
||||
split_dataset,
|
||||
)
|
||||
@@ -268,6 +294,15 @@ class RecomputeStatsConfig(OperationConfig):
|
||||
overwrite: bool = False
|
||||
|
||||
|
||||
@OperationConfig.register_subclass("reencode_videos")
|
||||
@dataclass
|
||||
class ReencodeVideosConfig(OperationConfig):
|
||||
camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults)
|
||||
num_workers: int = 0
|
||||
encoder_threads: int | None = None
|
||||
overwrite: bool = False
|
||||
|
||||
|
||||
@OperationConfig.register_subclass("info")
|
||||
@dataclass
|
||||
class InfoConfig(OperationConfig):
|
||||
@@ -634,6 +669,58 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None:
|
||||
dataset.push_to_hub()
|
||||
|
||||
|
||||
def handle_reencode_videos(cfg: EditDatasetConfig) -> None:
|
||||
if not isinstance(cfg.operation, ReencodeVideosConfig):
|
||||
raise ValueError("Operation config must be ReencodeVideosConfig")
|
||||
|
||||
output_repo_id, input_root, output_root = _resolve_io_paths(
|
||||
cfg.repo_id,
|
||||
cfg.new_repo_id,
|
||||
cfg.root,
|
||||
cfg.new_root,
|
||||
default_new_repo_id=f"{cfg.repo_id}_reencoded",
|
||||
)
|
||||
in_place = output_root == input_root
|
||||
|
||||
if in_place and not cfg.operation.overwrite:
|
||||
raise ValueError(
|
||||
f"reencode_videos would overwrite the dataset in-place at {input_root}. "
|
||||
"Pass --operation.overwrite true to allow in-place modification, "
|
||||
"or use --new_repo_id / --new_root to write to a different location. "
|
||||
f"Default output repo_id when neither is set: '{cfg.repo_id}_reencoded'."
|
||||
)
|
||||
|
||||
if in_place:
|
||||
logging.warning(
|
||||
f"Overwriting dataset videos in-place at {input_root}. The original videos will be lost."
|
||||
)
|
||||
dataset = LeRobotDataset(cfg.repo_id, root=input_root)
|
||||
else:
|
||||
logging.info(f"Copying dataset from {input_root} to {output_root}")
|
||||
if output_root.exists():
|
||||
backup_path = output_root.with_name(output_root.name + "_old")
|
||||
logging.warning(f"Output directory {output_root} already exists. Moving to {backup_path}")
|
||||
if backup_path.exists():
|
||||
shutil.rmtree(backup_path)
|
||||
shutil.move(output_root, backup_path)
|
||||
shutil.copytree(input_root, output_root)
|
||||
dataset = LeRobotDataset(output_repo_id, root=output_root)
|
||||
|
||||
logging.info(f"Re-encoding videos in {output_repo_id} with {cfg.operation.camera_encoder}")
|
||||
reencode_dataset(
|
||||
dataset,
|
||||
camera_encoder=cfg.operation.camera_encoder,
|
||||
encoder_threads=cfg.operation.encoder_threads,
|
||||
num_workers=cfg.operation.num_workers,
|
||||
)
|
||||
|
||||
logging.info(f"All videos re-encoded at {dataset.root}")
|
||||
|
||||
if cfg.push_to_hub:
|
||||
logging.info(f"Pushing to hub as {output_repo_id}...")
|
||||
dataset.push_to_hub()
|
||||
|
||||
|
||||
def _get_dataset_size(repo_path):
|
||||
import os
|
||||
|
||||
@@ -707,6 +794,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
|
||||
handle_convert_image_to_video(cfg)
|
||||
elif operation_type == "recompute_stats":
|
||||
handle_recompute_stats(cfg)
|
||||
elif operation_type == "reencode_videos":
|
||||
handle_reencode_videos(cfg)
|
||||
elif operation_type == "info":
|
||||
handle_info(cfg)
|
||||
else:
|
||||
|
||||
@@ -23,6 +23,7 @@ import torch
|
||||
|
||||
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
|
||||
|
||||
|
||||
from lerobot.configs import VideoEncoderConfig
|
||||
from lerobot.datasets.dataset_tools import (
|
||||
add_features,
|
||||
@@ -31,9 +32,12 @@ from lerobot.datasets.dataset_tools import (
|
||||
merge_datasets,
|
||||
modify_features,
|
||||
modify_tasks,
|
||||
reencode_dataset,
|
||||
remove_feature,
|
||||
split_dataset,
|
||||
)
|
||||
from lerobot.datasets.io_utils import load_info
|
||||
from tests.datasets.test_video_encoding import _add_frames, require_h264, require_libsvtav1
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -1326,3 +1330,41 @@ def test_convert_image_to_video_dataset_subset_episodes(tmp_path):
|
||||
|
||||
if output_dir.exists():
|
||||
shutil.rmtree(output_dir)
|
||||
|
||||
|
||||
# ─── reencode_dataset ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@require_libsvtav1
|
||||
@require_h264
|
||||
def test_reencode_dataset_multi_key_multiprocessing(
|
||||
tmp_path, empty_lerobot_dataset_factory, features_factory
|
||||
):
|
||||
"""Re-encode a two-camera dataset with num_workers=2 and verify metadata refresh."""
|
||||
features = features_factory(use_videos=True)
|
||||
initial_cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
|
||||
dataset = empty_lerobot_dataset_factory(
|
||||
root=tmp_path / "ds",
|
||||
features=features,
|
||||
use_videos=True,
|
||||
camera_encoder=initial_cfg,
|
||||
)
|
||||
|
||||
_add_frames(dataset, num_frames=4)
|
||||
dataset.save_episode()
|
||||
_add_frames(dataset, num_frames=4)
|
||||
dataset.save_episode()
|
||||
dataset.finalize()
|
||||
|
||||
assert len(dataset.meta.video_keys) == 2
|
||||
|
||||
target_cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv420p")
|
||||
|
||||
result = reencode_dataset(dataset, camera_encoder=target_cfg, num_workers=2)
|
||||
|
||||
assert result is dataset
|
||||
|
||||
persisted_info = load_info(dataset.root)
|
||||
for vk in dataset.meta.video_keys:
|
||||
persisted_encoder = VideoEncoderConfig.from_video_info(persisted_info.features[vk].get("info", {}))
|
||||
assert persisted_encoder == target_cfg
|
||||
|
||||
@@ -35,6 +35,7 @@ from lerobot.datasets.video_utils import (
|
||||
concatenate_video_files,
|
||||
encode_video_frames,
|
||||
get_video_info,
|
||||
reencode_video,
|
||||
)
|
||||
from tests.fixtures.constants import DUMMY_VIDEO_INFO
|
||||
|
||||
@@ -347,16 +348,22 @@ def _read_feature_info(dataset: LeRobotDataset) -> dict:
|
||||
return info["features"][VIDEO_KEY]["info"]
|
||||
|
||||
|
||||
def _add_frames(dataset: LeRobotDataset, num_frames: int) -> None:
|
||||
shape = dataset.meta.features[VIDEO_KEY]["shape"]
|
||||
def _add_frames(dataset: LeRobotDataset, num_frames: int, video_keys: list[str] | None = None) -> None:
|
||||
from lerobot.utils.constants import DEFAULT_FEATURES
|
||||
|
||||
if video_keys is None:
|
||||
video_keys = dataset.meta.video_keys
|
||||
for _ in range(num_frames):
|
||||
dataset.add_frame(
|
||||
{
|
||||
VIDEO_KEY: np.random.randint(0, 256, shape, dtype=np.uint8),
|
||||
"action": np.zeros(2, dtype=np.float32),
|
||||
"task": "test",
|
||||
}
|
||||
)
|
||||
frame: dict = {"task": "test"}
|
||||
for key, ft in dataset.meta.features.items():
|
||||
if key in DEFAULT_FEATURES:
|
||||
continue
|
||||
shape = ft["shape"]
|
||||
if key in video_keys:
|
||||
frame[key] = np.random.randint(0, 256, shape, dtype=np.uint8)
|
||||
else:
|
||||
frame[key] = np.zeros(shape, dtype=np.float32)
|
||||
dataset.add_frame(frame)
|
||||
|
||||
|
||||
class TestGetVideoInfo:
|
||||
@@ -474,6 +481,30 @@ class TestEncodeVideoFrames:
|
||||
assert info["video.extra_options"] == {}
|
||||
|
||||
|
||||
class TestReencodeVideo:
|
||||
@require_libsvtav1
|
||||
@require_h264
|
||||
def test_reencode_video(self, tmp_path):
|
||||
src = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
|
||||
out = tmp_path / "reencoded.mp4"
|
||||
cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv444p")
|
||||
reencode_video(src, out, camera_encoder=cfg, overwrite=True)
|
||||
|
||||
assert out.exists()
|
||||
with av.open(str(out)) as container:
|
||||
n_frames = sum(1 for _ in container.decode(video=0))
|
||||
assert n_frames == 4
|
||||
|
||||
info = get_video_info(out, camera_encoder=cfg)
|
||||
assert info["video.codec"] == "h264"
|
||||
assert info["video.pix_fmt"] == "yuv444p"
|
||||
assert info["video.height"] == 64
|
||||
assert info["video.width"] == 96
|
||||
assert info["video.fps"] == 30
|
||||
assert info["video.g"] == 6
|
||||
assert info["video.crf"] == 23
|
||||
|
||||
|
||||
class TestConcatenateVideoFiles:
|
||||
def test_two_clips_frame_count(self, tmp_path):
|
||||
"""Output frame count equals the sum of the two input frame counts."""
|
||||
|
||||
Reference in New Issue
Block a user