Introduce VideoEncoderConfig and route video encoding through it.

Re-exports the new config and the pyav helpers from lerobot.datasets, and
replaces the per-call codec arguments of encode_video_frames,
_CameraEncoderThread and StreamingVideoEncoder with a VideoEncoderConfig plus
a global encoder_threads setting.

diff --git a/src/lerobot/datasets/__init__.py b/src/lerobot/datasets/__init__.py
index 6c42959a5..2515a1fd2 100644
--- a/src/lerobot/datasets/__init__.py
+++ b/src/lerobot/datasets/__init__.py
@@ -40,10 +40,19 @@ from .io_utils import load_episodes, write_stats
 from .lerobot_dataset import LeRobotDataset
 from .multi_dataset import MultiLeRobotDataset
 from .pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
+from .pyav_utils import (
+    check_config_against_bundled_ffmpeg,
+    detect_available_encoders,
+    get_codec,
+)
 from .sampler import EpisodeAwareSampler
 from .streaming_dataset import StreamingLeRobotDataset
 from .utils import DEFAULT_EPISODES_PATH, create_lerobot_dataset_card
-from .video_utils import VideoEncodingManager
+from .video_utils import (
+    VideoEncoderConfig,
+    VideoEncodingManager,
+    camera_encoder_defaults,
+)
 
 # NOTE: Low-level I/O functions (cast_stats_to_numpy, get_parquet_file_size_in_mb, etc.)
 # and legacy migration constants are intentionally NOT re-exported here.
@@ -58,15 +67,20 @@ __all__ = [
     "LeRobotDatasetMetadata",
     "MultiLeRobotDataset",
     "StreamingLeRobotDataset",
+    "VideoEncoderConfig",
     "VideoEncodingManager",
+    "camera_encoder_defaults",
     "add_features",
     "aggregate_datasets",
     "aggregate_pipeline_dataset_features",
     "aggregate_stats",
+    "check_config_against_bundled_ffmpeg",
     "convert_image_to_video_dataset",
     "create_initial_features",
     "create_lerobot_dataset_card",
     "delete_episodes",
+    "detect_available_encoders",
+    "get_codec",
     "get_feature_stats",
     "load_episodes",
     "make_dataset",
diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py
index 8303dbaea..8f5004df6 100644
--- a/src/lerobot/datasets/video_utils.py
+++ b/src/lerobot/datasets/video_utils.py
@@ -37,6 +37,10 @@ import torchvision
 from datasets.features.features import register_feature
 from PIL import Image
 
+from lerobot.datasets.pyav_utils import (
+    check_config_against_bundled_ffmpeg,
+    detect_available_encoders,
+)
 from lerobot.utils.import_utils import get_safe_default_video_backend
 
 logger = logging.getLogger(__name__)
@@ -54,68 +58,156 @@ HW_ENCODERS = [
 
 VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1", "auto"} | set(HW_ENCODERS)
 
+LIBSVTAV1_DEFAULT_PRESET: int = 12
+
+
+@dataclass
+class VideoEncoderConfig:
+    """Video encoder configuration.
+
+    Attributes:
+        vcodec: FFmpeg encoder name. ``"auto"`` is resolved during
+            construction (HW encoder if available, else ``libsvtav1``).
+        pix_fmt: Pixel format (e.g. ``"yuv420p"``).
+        g: GOP size (keyframe interval).
+        crf: Quality level — mapped to the native quality parameter of the
+            codec (``crf`` for software, ``qp`` for NVENC/VAAPI,
+            ``q:v`` for VideoToolbox, ``global_quality`` for QSV).
+        preset: Speed/quality preset. Accepted type is per-codec.
+        fast_decode: Fast-decode tuning. For libsvtav1 this is a level
+            (0-4); for h264/hevc any non-zero value enables ``tune=fastdecode``.
+        video_backend: Python library driving FFmpeg for encoding. Only ``"pyav"`` is currently supported.
+        extra_options: Free-form dictionary of additional FFmpeg options
+            (e.g. ``{"tune": "film", "profile:v": "high", "bf": 2}``).
+    """
+
+    vcodec: str = "libsvtav1"
+    pix_fmt: str = "yuv420p"
+    g: int | None = 2
+    crf: int | None = 30
+    preset: int | str | None = None
+    fast_decode: int = 0
+    # TODO(CarolinePascal): add torchcodec support + find a way to unify the
+    # two backends (encoding and decoding).
+    video_backend: str = "pyav"
+    extra_options: dict[str, Any] = field(default_factory=dict)
+
+    def __post_init__(self) -> None:
+        self.vcodec = resolve_vcodec(self.vcodec)
+
+        # Empty-constructor ergonomics: ``VideoEncoderConfig()`` must "just work".
+        if self.preset is None and self.vcodec == "libsvtav1":
+            self.preset = LIBSVTAV1_DEFAULT_PRESET
+
+        check_config_against_bundled_ffmpeg(self)
+
+    def get_codec_options(self, encoder_threads: int | None = None) -> dict[str, str]:
+        """Build codec-specific FFmpeg options from the tuning fields.
+
+        Args:
+            encoder_threads: Number of encoder threads set globally for all VideoEncoderConfigs. For libsvtav1, this is mapped to ``lp`` via ``svtav1-params``. For h264/hevc, this is mapped to ``threads``. Hardware encoders ignore this.
+        """
+        return _get_codec_options(
+            self.vcodec,
+            g=self.g,
+            crf=self.crf,
+            preset=self.preset,
+            fast_decode=self.fast_decode,
+            encoder_threads=encoder_threads,
+            extra_options=self.extra_options,
+        )
+
+
+def camera_encoder_defaults() -> VideoEncoderConfig:
+    """Return a :class:`VideoEncoderConfig` with RGB-camera defaults."""
+    return VideoEncoderConfig()
+
 
 def _get_codec_options(
     vcodec: str,
-    g: int | None = 2,
-    crf: int | None = 30,
-    preset: int | None = None,
-) -> dict:
-    """Build codec-specific options dict for video encoding."""
-    options = {}
+    *,
+    g: int | None,
+    crf: int | None,
+    preset: int | str | None,
+    fast_decode: int,
+    encoder_threads: int | None,
+    extra_options: dict[str, Any] | None = None,
+) -> dict[str, str]:
+    """Translate tuning options to FFmpeg codec options.
 
-    # GOP size (keyframe interval) - supported by VideoToolbox and software encoders
-    if g is not None and (vcodec in ("h264_videotoolbox", "hevc_videotoolbox") or vcodec not in HW_ENCODERS):
-        options["g"] = str(g)
+    ``extra_options`` are merged last and take precedence over the named fields.
+    """
+    opts: dict[str, str] = {}
 
-    # Quality control (codec-specific parameter names)
-    if crf is not None:
-        if vcodec in ("h264", "hevc", "libsvtav1"):
-            options["crf"] = str(crf)
-        elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
-            quality = max(1, min(100, int(100 - crf * 2)))
-            options["q:v"] = str(quality)
-        elif vcodec in ("h264_nvenc", "hevc_nvenc"):
-            options["rc"] = "constqp"
-            options["qp"] = str(crf)
-        elif vcodec in ("h264_vaapi",):
-            options["qp"] = str(crf)
-        elif vcodec in ("h264_qsv",):
-            options["global_quality"] = str(crf)
+    def set_if(key: str, value: Any) -> None:
+        """Store ``value`` under ``key`` as a string, skipping ``None``."""
+        if value is not None:
+            opts[key] = str(value)
 
-    # Preset (only for libsvtav1)
     if vcodec == "libsvtav1":
-        options["preset"] = str(preset) if preset is not None else "12"
+        set_if("g", g)
+        set_if("crf", crf)
+        set_if("preset", preset)
+        svtav1_parts: list[str] = []
+        if fast_decode:
+            svtav1_parts.append(f"fast-decode={fast_decode}")
+        if encoder_threads is not None:
+            svtav1_parts.append(f"lp={encoder_threads}")
+        if svtav1_parts:
+            opts["svtav1-params"] = ":".join(svtav1_parts)
+    elif vcodec in ("h264", "hevc"):
+        set_if("g", g)
+        set_if("crf", crf)
+        set_if("preset", preset)
+        if fast_decode:
+            opts["tune"] = "fastdecode"
+        set_if("threads", encoder_threads)
+    elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
+        set_if("g", g)
+        if crf is not None:
+            opts["q:v"] = str(max(1, min(100, 100 - crf * 2)))
+    elif vcodec in ("h264_nvenc", "hevc_nvenc"):
+        # Only select constant-QP rate control when a QP value is actually supplied.
+        if crf is not None:
+            opts["rc"] = "constqp"
+            opts["qp"] = str(crf)
+        set_if("preset", preset)
+    elif vcodec == "h264_vaapi":
+        set_if("qp", crf)
+    elif vcodec == "h264_qsv":
+        set_if("global_quality", crf)
+        set_if("preset", preset)
+    else:
+        set_if("g", g)
+        set_if("crf", crf)
+        set_if("preset", preset)
 
-    return options
+    if extra_options:
+        # Assigned (not setdefault) so user-supplied options override the named fields above.
+        for k, v in extra_options.items():
+            opts[k] = str(v)
 
-
-def detect_available_hw_encoders() -> list[str]:
-    """Probe PyAV/FFmpeg for available hardware video encoders."""
-    available = []
-    for codec_name in HW_ENCODERS:
-        try:
-            av.codec.Codec(codec_name, "w")
-            available.append(codec_name)
-        except Exception:  # nosec B110
-            logger.debug("HW encoder '%s' not available", codec_name)  # nosec B110
-    return available
+    return opts
 
 
 def resolve_vcodec(vcodec: str) -> str:
     """Validate vcodec and resolve 'auto' to best available HW encoder, fallback to libsvtav1."""
     if vcodec not in VALID_VIDEO_CODECS:
         raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
-    if vcodec != "auto":
+    if vcodec != "auto" and detect_available_encoders(vcodec) != []:
         logger.info(f"Using video codec: {vcodec}")
         return vcodec
-    available = detect_available_hw_encoders()
-    for encoder in HW_ENCODERS:
-        if encoder in available:
-            logger.info(f"Auto-selected video codec: {encoder}")
-            return encoder
-    logger.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
+    if vcodec == "auto":
+        available = detect_available_encoders()
+        for encoder in HW_ENCODERS:
+            if encoder in available:
+                logger.info(f"Auto-selected video codec: {encoder}")
+                return encoder
+        logger.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
+    else:
+        logger.warning("Video codec '%s' is not available, falling back to 'libsvtav1'", vcodec)
+    # Every path that did not return above falls back to the software encoder,
+    # so this function never returns None.
     return "libsvtav1"
 
 
 def decode_video_frames(
@@ -400,18 +492,17 @@ def encode_video_frames(
     imgs_dir: Path | str,
     video_path: Path | str,
     fps: int,
-    vcodec: str = "libsvtav1",
-    pix_fmt: str = "yuv420p",
-    g: int | None = 2,
-    crf: int | None = 30,
-    fast_decode: int = 0,
+    camera_encoder_config: VideoEncoderConfig | None = None,
+    encoder_threads: int | None = None,
+    *,
     log_level: int | None = av.logging.WARNING,
     overwrite: bool = False,
-    preset: int | None = None,
-    encoder_threads: int | None = None,
 ) -> None:
     """More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
-    vcodec = resolve_vcodec(vcodec)
+    if camera_encoder_config is None:
+        camera_encoder_config = VideoEncoderConfig()
+    vcodec = camera_encoder_config.vcodec
+    pix_fmt = camera_encoder_config.pix_fmt
 
     video_path = Path(video_path)
     imgs_dir = Path(imgs_dir)
@@ -422,42 +513,18 @@ def encode_video_frames(
 
     video_path.parent.mkdir(parents=True, exist_ok=True)
 
-    # Encoders/pixel formats incompatibility check
-    if (vcodec == "libsvtav1" or vcodec == "hevc") and pix_fmt == "yuv444p":
-        logger.warning(
-            f"Incompatible pixel format 'yuv444p' for codec {vcodec}, auto-selecting format 'yuv420p'"
-        )
-        pix_fmt = "yuv420p"
-
     # Get input frames
     template = "frame-" + ("[0-9]" * 6) + ".png"
     input_list = sorted(
         glob.glob(str(imgs_dir / template)), key=lambda x: int(x.split("-")[-1].split(".")[0])
     )
 
-    # Define video output frame size (assuming all input frames are the same size)
     if len(input_list) == 0:
         raise FileNotFoundError(f"No images found in {imgs_dir}.")
     with Image.open(input_list[0]) as dummy_image:
         width, height = dummy_image.size
 
-    # Define video codec options
-    video_options = _get_codec_options(vcodec, g, crf, preset)
-
-    if fast_decode:
-        key = "svtav1-params" if vcodec == "libsvtav1" else "tune"
-        value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode"
-        video_options[key] = value
-
-    if encoder_threads is not None:
-        if vcodec == "libsvtav1":
-            lp_param = f"lp={encoder_threads}"
-            if "svtav1-params" in video_options:
-                video_options["svtav1-params"] += f":{lp_param}"
-            else:
-                video_options["svtav1-params"] = lp_param
-        else:
-            video_options["threads"] = str(encoder_threads)
+    video_options = camera_encoder_config.get_codec_options(encoder_threads)
 
     # Set logging level
     if log_level is not None:
@@ -591,26 +658,20 @@ class _CameraEncoderThread(threading.Thread):
         fps: int,
         vcodec: str,
         pix_fmt: str,
-        g: int | None,
-        crf: int | None,
-        preset: int | None,
+        codec_options: dict[str, str],
         frame_queue: queue.Queue,
         result_queue: queue.Queue,
         stop_event: threading.Event,
-        encoder_threads: int | None = None,
     ):
         super().__init__(daemon=True)
         self.video_path = video_path
         self.fps = fps
         self.vcodec = vcodec
         self.pix_fmt = pix_fmt
-        self.g = g
-        self.crf = crf
-        self.preset = preset
+        self.codec_options = codec_options
         self.frame_queue = frame_queue
         self.result_queue = result_queue
         self.stop_event = stop_event
-        self.encoder_threads = encoder_threads
 
     def run(self) -> None:
         from .compute_stats import RunningQuantileStats, auto_downsample_height_width
@@ -646,19 +707,9 @@ class _CameraEncoderThread(threading.Thread):
                 # Open container on first frame (to get width/height)
                 if container is None:
                     height, width = frame_data.shape[:2]
-                    video_options = _get_codec_options(self.vcodec, self.g, self.crf, self.preset)
-                    if self.encoder_threads is not None:
-                        if self.vcodec == "libsvtav1":
-                            lp_param = f"lp={self.encoder_threads}"
-                            if "svtav1-params" in video_options:
-                                video_options["svtav1-params"] += f":{lp_param}"
-                            else:
-                                video_options["svtav1-params"] = lp_param
-                        else:
-                            video_options["threads"] = str(self.encoder_threads)
                     Path(self.video_path).parent.mkdir(parents=True, exist_ok=True)
                     container = av.open(str(self.video_path), "w")
-                    output_stream = container.add_stream(self.vcodec, self.fps, options=video_options)
+                    output_stream = container.add_stream(self.vcodec, self.fps, options=self.codec_options)
                     output_stream.pix_fmt = self.pix_fmt
                     output_stream.width = width
                     output_stream.height = height
@@ -724,22 +775,25 @@ class StreamingVideoEncoder:
     def __init__(
         self,
         fps: int,
-        vcodec: str = "libsvtav1",
-        pix_fmt: str = "yuv420p",
-        g: int | None = 2,
-        crf: int | None = 30,
-        preset: int | None = None,
-        queue_maxsize: int = 30,
+        camera_encoder_config: VideoEncoderConfig | None = None,
         encoder_threads: int | None = None,
+        *,
+        queue_maxsize: int = 30,
     ):
+        """
+        Args:
+            fps: Frames per second for the output videos.
+            camera_encoder_config: Video encoder settings applied to all cameras.
+                When ``None``, :class:`VideoEncoderConfig` defaults are used.
+            encoder_threads: Number of encoder threads (global setting).
+                ``None`` lets the codec decide.
+            queue_maxsize: Max frames to buffer per camera before
+                back-pressure drops frames.
+        """
         self.fps = fps
-        self.vcodec = resolve_vcodec(vcodec)
-        self.pix_fmt = pix_fmt
-        self.g = g
-        self.crf = crf
-        self.preset = preset
+        self._camera_encoder_config = camera_encoder_config or VideoEncoderConfig()
+        self._encoder_threads = encoder_threads
         self.queue_maxsize = queue_maxsize
-        self.encoder_threads = encoder_threads
 
         self._frame_queues: dict[str, queue.Queue] = {}
         self._result_queues: dict[str, queue.Queue] = {}
@@ -770,18 +824,17 @@ class StreamingVideoEncoder:
             temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir))
             video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4"
 
+            vcodec = self._camera_encoder_config.vcodec
+            codec_options = self._camera_encoder_config.get_codec_options(self._encoder_threads)
             encoder_thread = _CameraEncoderThread(
                 video_path=video_path,
                 fps=self.fps,
-                vcodec=self.vcodec,
-                pix_fmt=self.pix_fmt,
-                g=self.g,
-                crf=self.crf,
-                preset=self.preset,
+                vcodec=vcodec,
+                pix_fmt=self._camera_encoder_config.pix_fmt,
+                codec_options=codec_options,
                 frame_queue=frame_queue,
                 result_queue=result_queue,
                 stop_event=stop_event,
-                encoder_threads=self.encoder_threads,
             )
             encoder_thread.start()
 