feat(VideoEncoderConfig): creating a VideoEncoderConfig to encapsulate encoding parameters

This commit is contained in:
CarolinePascal
2026-04-22 21:15:30 +02:00
parent 9787b8fa26
commit 479e444517
2 changed files with 177 additions and 116 deletions
+15 -1
View File
@@ -40,10 +40,19 @@ from .io_utils import load_episodes, write_stats
from .lerobot_dataset import LeRobotDataset
from .multi_dataset import MultiLeRobotDataset
from .pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from .pyav_utils import (
check_config_against_bundled_ffmpeg,
detect_available_encoders,
get_codec,
)
from .sampler import EpisodeAwareSampler
from .streaming_dataset import StreamingLeRobotDataset
from .utils import DEFAULT_EPISODES_PATH, create_lerobot_dataset_card
from .video_utils import VideoEncodingManager
from .video_utils import (
VideoEncoderConfig,
VideoEncodingManager,
camera_encoder_defaults,
)
# NOTE: Low-level I/O functions (cast_stats_to_numpy, get_parquet_file_size_in_mb, etc.)
# and legacy migration constants are intentionally NOT re-exported here.
@@ -58,15 +67,20 @@ __all__ = [
"LeRobotDatasetMetadata",
"MultiLeRobotDataset",
"StreamingLeRobotDataset",
"VideoEncoderConfig",
"VideoEncodingManager",
"camera_encoder_defaults",
"add_features",
"aggregate_datasets",
"aggregate_pipeline_dataset_features",
"aggregate_stats",
"check_config_against_bundled_ffmpeg",
"convert_image_to_video_dataset",
"create_initial_features",
"create_lerobot_dataset_card",
"delete_episodes",
"detect_available_encoders",
"get_codec",
"get_feature_stats",
"load_episodes",
"make_dataset",
+162 -115
View File
@@ -37,6 +37,10 @@ import torchvision
from datasets.features.features import register_feature
from PIL import Image
from lerobot.datasets.pyav_utils import (
check_config_against_bundled_ffmpeg,
detect_available_encoders,
)
from lerobot.utils.import_utils import get_safe_default_video_backend
logger = logging.getLogger(__name__)
@@ -54,68 +58,150 @@ HW_ENCODERS = [
VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1", "auto"} | set(HW_ENCODERS)
LIBSVTAV1_DEFAULT_PRESET: int = 12
@dataclass
class VideoEncoderConfig:
"""Video encoder configuration.
Attributes:
vcodec: FFmpeg encoder name. ``"auto"`` is resolved during
construction (HW encoder if available, else ``libsvtav1``).
pix_fmt: Pixel format (e.g. ``"yuv420p"``).
g: GOP size (keyframe interval).
crf: Quality level — mapped to the native quality parameter of the
codec (``crf`` for software, ``qp`` for NVENC/VAAPI,
``q:v`` for VideoToolbox, ``global_quality`` for QSV).
preset: Speed/quality preset. Accepted type is per-codec.
fast_decode: Fast-decode tuning. For libsvtav1 this is a level
(0-4); for h264/hevc any non-zero value enables ``tune=fastdecode``.
video_backend: Python library driving FFmpeg for encoding. Only ``"pyav"`` is currently supported.
extra_options: Free-form dictionary of additional FFmpeg options
(e.g. ``{"tune": "film", "profile:v": "high", "bf": 2}``).
"""
vcodec: str = "libsvtav1"
pix_fmt: str = "yuv420p"
g: int | None = 2
crf: int | None = 30
preset: int | str | None = None
fast_decode: int = 0
# TODO(CarolinePascal): add torchcodec support + find a way to unify the
# two backends (encoding and decoding).
video_backend: str = "pyav"
extra_options: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
self.vcodec = resolve_vcodec(self.vcodec)
# Empty-constructor ergonomics: ``VideoEncoderConfig()`` must "just work".
if self.preset is None and self.vcodec == "libsvtav1":
self.preset = LIBSVTAV1_DEFAULT_PRESET
check_config_against_bundled_ffmpeg(self)
def get_codec_options(self, encoder_threads: int | None = None) -> dict[str, str]:
"""Build codec-specific FFmpeg options from the tuning fields.
Args:
encoder_threads: Number of encoder threads set globally for all VideoEncoderConfigs. For libsvtav1, this is mapped to ``lp`` via ``svtav1-params``. For h264/hevc, this is mapped to ``threads``. Hardware encoders ignore this.
"""
return _get_codec_options(
self.vcodec,
g=self.g,
crf=self.crf,
preset=self.preset,
fast_decode=self.fast_decode,
encoder_threads=encoder_threads,
extra_options=self.extra_options,
)
def camera_encoder_defaults() -> VideoEncoderConfig:
"""Return a :class:`VideoEncoderConfig` with RGB-camera defaults."""
return VideoEncoderConfig()
def _get_codec_options(
vcodec: str,
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
) -> dict:
"""Build codec-specific options dict for video encoding."""
options = {}
*,
g: int | None,
crf: int | None,
preset: int | str | None,
fast_decode: int,
encoder_threads: int | None,
extra_options: dict[str, Any] | None = None,
) -> dict[str, str]:
"""Translate tuning options to FFmpeg codec options.
# GOP size (keyframe interval) - supported by VideoToolbox and software encoders
if g is not None and (vcodec in ("h264_videotoolbox", "hevc_videotoolbox") or vcodec not in HW_ENCODERS):
options["g"] = str(g)
``extra_options`` are merged last and take precedence over the named fields.
"""
opts: dict[str, str] = {}
# Quality control (codec-specific parameter names)
if crf is not None:
if vcodec in ("h264", "hevc", "libsvtav1"):
options["crf"] = str(crf)
elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
quality = max(1, min(100, int(100 - crf * 2)))
options["q:v"] = str(quality)
elif vcodec in ("h264_nvenc", "hevc_nvenc"):
options["rc"] = "constqp"
options["qp"] = str(crf)
elif vcodec in ("h264_vaapi",):
options["qp"] = str(crf)
elif vcodec in ("h264_qsv",):
options["global_quality"] = str(crf)
def set_if(key: str, value: Any) -> None:
if value is not None:
opts[key] = str(value)
# Preset (only for libsvtav1)
if vcodec == "libsvtav1":
options["preset"] = str(preset) if preset is not None else "12"
set_if("g", g)
set_if("crf", crf)
set_if("preset", preset)
svtav1_parts: list[str] = []
if fast_decode:
svtav1_parts.append(f"fast-decode={fast_decode}")
if encoder_threads is not None:
svtav1_parts.append(f"lp={encoder_threads}")
if svtav1_parts:
opts["svtav1-params"] = ":".join(svtav1_parts)
elif vcodec in ("h264", "hevc"):
set_if("g", g)
set_if("crf", crf)
set_if("preset", preset)
if fast_decode:
opts["tune"] = "fastdecode"
set_if("threads", encoder_threads)
elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
set_if("g", g)
if crf is not None:
opts["q:v"] = str(max(1, min(100, 100 - crf * 2)))
elif vcodec in ("h264_nvenc", "hevc_nvenc"):
opts["rc"] = "constqp"
set_if("qp", crf)
set_if("preset", preset)
elif vcodec == "h264_vaapi":
set_if("qp", crf)
elif vcodec == "h264_qsv":
set_if("global_quality", crf)
set_if("preset", preset)
else:
set_if("g", g)
set_if("crf", crf)
set_if("preset", preset)
return options
if extra_options:
for k, v in extra_options.items():
opts.setdefault(k, str(v))
def detect_available_hw_encoders() -> list[str]:
"""Probe PyAV/FFmpeg for available hardware video encoders."""
available = []
for codec_name in HW_ENCODERS:
try:
av.codec.Codec(codec_name, "w")
available.append(codec_name)
except Exception: # nosec B110
logger.debug("HW encoder '%s' not available", codec_name) # nosec B110
return available
return opts
def resolve_vcodec(vcodec: str) -> str:
"""Validate vcodec and resolve 'auto' to best available HW encoder, fallback to libsvtav1."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
if vcodec != "auto":
if vcodec != "auto" and detect_available_encoders(vcodec) != []:
logger.info(f"Using video codec: {vcodec}")
return vcodec
available = detect_available_hw_encoders()
for encoder in HW_ENCODERS:
if encoder in available:
logger.info(f"Auto-selected video codec: {encoder}")
return encoder
logger.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
return "libsvtav1"
elif vcodec == "auto":
available = detect_available_encoders()
for encoder in HW_ENCODERS:
if encoder in available:
logger.info(f"Auto-selected video codec: {encoder}")
return encoder
else:
logger.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
return "libsvtav1"
def decode_video_frames(
@@ -400,18 +486,17 @@ def encode_video_frames(
imgs_dir: Path | str,
video_path: Path | str,
fps: int,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int | None = 2,
crf: int | None = 30,
fast_decode: int = 0,
camera_encoder_config: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
*,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
preset: int | None = None,
encoder_threads: int | None = None,
) -> None:
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
vcodec = resolve_vcodec(vcodec)
if camera_encoder_config is None:
camera_encoder_config = VideoEncoderConfig()
vcodec = camera_encoder_config.vcodec
pix_fmt = camera_encoder_config.pix_fmt
video_path = Path(video_path)
imgs_dir = Path(imgs_dir)
@@ -422,42 +507,18 @@ def encode_video_frames(
video_path.parent.mkdir(parents=True, exist_ok=True)
# Encoders/pixel formats incompatibility check
if (vcodec == "libsvtav1" or vcodec == "hevc") and pix_fmt == "yuv444p":
logger.warning(
f"Incompatible pixel format 'yuv444p' for codec {vcodec}, auto-selecting format 'yuv420p'"
)
pix_fmt = "yuv420p"
# Get input frames
template = "frame-" + ("[0-9]" * 6) + ".png"
input_list = sorted(
glob.glob(str(imgs_dir / template)), key=lambda x: int(x.split("-")[-1].split(".")[0])
)
# Define video output frame size (assuming all input frames are the same size)
if len(input_list) == 0:
raise FileNotFoundError(f"No images found in {imgs_dir}.")
with Image.open(input_list[0]) as dummy_image:
width, height = dummy_image.size
# Define video codec options
video_options = _get_codec_options(vcodec, g, crf, preset)
if fast_decode:
key = "svtav1-params" if vcodec == "libsvtav1" else "tune"
value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode"
video_options[key] = value
if encoder_threads is not None:
if vcodec == "libsvtav1":
lp_param = f"lp={encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(encoder_threads)
video_options = camera_encoder_config.get_codec_options(encoder_threads)
# Set logging level
if log_level is not None:
@@ -591,26 +652,20 @@ class _CameraEncoderThread(threading.Thread):
fps: int,
vcodec: str,
pix_fmt: str,
g: int | None,
crf: int | None,
preset: int | None,
codec_options: dict[str, str],
frame_queue: queue.Queue,
result_queue: queue.Queue,
stop_event: threading.Event,
encoder_threads: int | None = None,
):
super().__init__(daemon=True)
self.video_path = video_path
self.fps = fps
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self.codec_options = codec_options
self.frame_queue = frame_queue
self.result_queue = result_queue
self.stop_event = stop_event
self.encoder_threads = encoder_threads
def run(self) -> None:
from .compute_stats import RunningQuantileStats, auto_downsample_height_width
@@ -646,19 +701,9 @@ class _CameraEncoderThread(threading.Thread):
# Open container on first frame (to get width/height)
if container is None:
height, width = frame_data.shape[:2]
video_options = _get_codec_options(self.vcodec, self.g, self.crf, self.preset)
if self.encoder_threads is not None:
if self.vcodec == "libsvtav1":
lp_param = f"lp={self.encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(self.encoder_threads)
Path(self.video_path).parent.mkdir(parents=True, exist_ok=True)
container = av.open(str(self.video_path), "w")
output_stream = container.add_stream(self.vcodec, self.fps, options=video_options)
output_stream = container.add_stream(self.vcodec, self.fps, options=self.codec_options)
output_stream.pix_fmt = self.pix_fmt
output_stream.width = width
output_stream.height = height
@@ -724,22 +769,25 @@ class StreamingVideoEncoder:
def __init__(
self,
fps: int,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
queue_maxsize: int = 30,
camera_encoder_config: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
*,
queue_maxsize: int = 30,
):
"""
Args:
fps: Frames per second for the output videos.
camera_encoder_config: Video encoder settings applied to all cameras.
When ``None``, :class:`VideoEncoderConfig` defaults are used.
encoder_threads: Number of encoder threads (global setting).
``None`` lets the codec decide.
queue_maxsize: Max frames to buffer per camera before
back-pressure drops frames.
"""
self.fps = fps
self.vcodec = resolve_vcodec(vcodec)
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self._camera_encoder_config = camera_encoder_config or VideoEncoderConfig()
self._encoder_threads = encoder_threads
self.queue_maxsize = queue_maxsize
self.encoder_threads = encoder_threads
self._frame_queues: dict[str, queue.Queue] = {}
self._result_queues: dict[str, queue.Queue] = {}
@@ -770,18 +818,17 @@ class StreamingVideoEncoder:
temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir))
video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4"
vcodec = self._camera_encoder_config.vcodec
codec_options = self._camera_encoder_config.get_codec_options(self._encoder_threads)
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=self.fps,
vcodec=self.vcodec,
pix_fmt=self.pix_fmt,
g=self.g,
crf=self.crf,
preset=self.preset,
vcodec=vcodec,
pix_fmt=self._camera_encoder_config.pix_fmt,
codec_options=codec_options,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
encoder_threads=self.encoder_threads,
)
encoder_thread.start()