test(depth encoding): updating and cleaning video/depth encoding tests

This commit is contained in:
CarolinePascal
2026-06-12 19:17:59 +02:00
parent 291d2e982c
commit 6b2d5cec4c
3 changed files with 325 additions and 72 deletions
+288 -64
View File
@@ -37,7 +37,15 @@ from lerobot.datasets.video_utils import (
get_video_info,
reencode_video,
)
from tests.fixtures.constants import DUMMY_VIDEO_INFO
from tests.fixtures.constants import (
DUMMY_DEPTH_FEATURES,
DUMMY_DEPTH_KEY,
DUMMY_DEPTH_VIDEO_INFO_FULL,
DUMMY_VIDEO_FEATURES,
DUMMY_VIDEO_INFO,
DUMMY_VIDEO_KEY,
)
from tests.fixtures.dataset_factories import add_frames
# Per-codec skip markers — validation tests only fire when the codec is available
@@ -48,12 +56,67 @@ def _require_encoder(vcodec: str) -> pytest.MarkDecorator:
# Reusable per-encoder skip markers built by ``_require_encoder`` (which
# returns a ``pytest.MarkDecorator``). Decorating a test with one of these
# restricts it to environments where that encoder is available.
require_libsvtav1 = _require_encoder("libsvtav1")
require_h264 = _require_encoder("h264")
require_hevc = _require_encoder("hevc")
# H.264 encoder variants: videotoolbox / nvenc / vaapi / qsv.
require_videotoolbox = _require_encoder("h264_videotoolbox")
require_nvenc = _require_encoder("h264_nvenc")
require_vaapi = _require_encoder("h264_vaapi")
require_qsv = _require_encoder("h264_qsv")
# ``artifacts/encoded_videos`` located one directory above this file's directory.
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "encoded_videos"
def _write_RGB_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
    """Fill ``imgs_dir`` with ``num_frames`` random uint8 RGB PNG frames.

    The directory (and any missing parents) is created first. Frames are
    ``(height, width, 3)`` arrays of uniform noise in ``[0, 255]`` and are
    written as ``frame-000000.png``, ``frame-000001.png``, ...
    """
    imgs_dir.mkdir(parents=True, exist_ok=True)
    frame_shape = (height, width, 3)
    for frame_idx in range(num_frames):
        pixels = np.random.randint(0, 256, frame_shape, dtype=np.uint8)
        target = imgs_dir / f"frame-{frame_idx:06d}.png"
        write_image(pixels, target)
def _write_depth_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
    """Fill ``imgs_dir`` with ``num_frames`` synthetic uint16 depth TIFFs (millimetres).

    Each frame is the same smooth linear ramp running from 100 mm to 10 m,
    shifted by 50 mm per frame index. A ramp is used rather than white noise so
    that HEVC Main 12 on ``gray12le`` compresses it well. The values cover most
    of the default ``[DEPTH_MIN, DEPTH_MAX]`` range (expressed in metres) once
    ``quantize_depth`` resolves ``input_unit="auto"`` to millimetres.

    Frames are written as ``frame-000000.tiff``, ``frame-000001.tiff``, ...
    """
    imgs_dir.mkdir(parents=True, exist_ok=True)
    ramp = np.linspace(100.0, 10_000.0, height * width, dtype=np.float32).reshape(height, width)
    for frame_idx in range(num_frames):
        shifted = ramp + 50.0 * frame_idx
        # Clamp to the uint16 range before the cast.
        depth_mm = shifted.clip(0, 65535).astype(np.uint16)
        write_image(depth_mm, imgs_dir / f"frame-{frame_idx:06d}.tiff")
def _encode_video(
    path: Path,
    num_frames: int = 4,
    fps: int = 30,
    cfg: VideoEncoderConfig | None = None,
    depth: bool = False,
) -> Path:
    """Write synthetic frames next to ``path`` and encode them into ``path``.

    Args:
        path: Destination video file. Source frames are written to a sibling
            directory named ``imgs_<path.stem>``.
        num_frames: Number of synthetic frames to generate.
        fps: Frame rate forwarded to ``encode_video_frames``.
        cfg: Encoder configuration. With ``depth=False`` it is forwarded as-is
            (``None`` leaves the choice to the library default). With
            ``depth=True`` a ``None`` value is replaced by a default
            :class:`DepthEncoderConfig` (HEVC Main 12 / ``gray12le``).
        depth: When true, write uint16 depth TIFFs instead of uint8 RGB PNG
            noise.

    Returns:
        ``path``, for call-site convenience.
    """
    imgs_dir = path.parent / f"imgs_{path.stem}"
    if depth:
        _write_depth_frames(imgs_dir, num_frames=num_frames)
        # Explicit ``None`` check: only a missing config falls back to the
        # default, never a supplied object that happens to be falsy.
        if cfg is None:
            cfg = DepthEncoderConfig()
    else:
        _write_RGB_frames(imgs_dir, num_frames=num_frames)
    encode_video_frames(imgs_dir, path, fps=fps, video_encoder=cfg, overwrite=True)
    return path
def _read_feature_info(dataset: LeRobotDataset, key: str = DUMMY_VIDEO_KEY) -> dict:
    """Return the ``info`` entry stored for feature ``key`` in the dataset's info file.

    The file at ``dataset.root / INFO_PATH`` is re-read from disk on every call.
    """
    raw_text = (dataset.root / INFO_PATH).read_text()
    features = json.loads(raw_text)["features"]
    return features[key]["info"]
# ─── VideoEncoderConfig / codec options ──────────────────────────────
@@ -87,7 +150,7 @@ class TestCodecOptions:
assert opts["q:v"] == 40
assert "crf" not in opts
@_require_encoder("h264_nvenc")
@require_nvenc
def test_nvenc_options(self):
cfg = VideoEncoderConfig(vcodec="h264_nvenc", g=2, crf=25, preset=None)
opts = cfg.get_codec_options()
@@ -96,12 +159,12 @@ class TestCodecOptions:
assert "crf" not in opts
assert opts["g"] == 2
@_require_encoder("h264_vaapi")
@require_vaapi
def test_vaapi_options(self):
cfg = VideoEncoderConfig(vcodec="h264_vaapi", crf=28, preset=None)
assert cfg.get_codec_options()["qp"] == 28
@_require_encoder("h264_qsv")
@require_qsv
def test_qsv_options(self):
cfg = VideoEncoderConfig(vcodec="h264_qsv", crf=25, preset=None)
assert cfg.get_codec_options()["global_quality"] == 25
@@ -313,59 +376,6 @@ class TestEncoderDetection:
assert "h264_nvenc" in VALID_VIDEO_CODECS
# ``artifacts/encoded_videos`` located one directory above this file's directory.
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent.joinpath("artifacts", "encoded_videos")

# Key of the single video feature used by persistence tests; declared first so
# the feature dict below and its lookup key cannot drift apart.
VIDEO_KEY = "observation.images.cam"

# Default video feature set used by persistence tests.
VIDEO_FEATURES = {
    VIDEO_KEY: {
        "dtype": "video",
        "shape": (64, 96, 3),
        "names": ["height", "width", "channels"],
    },
    "action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}
def _write_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
    """Fill ``imgs_dir`` with ``num_frames`` random uint8 RGB PNG frames.

    The directory is created if needed; frames are named
    ``frame-000000.png``, ``frame-000001.png``, ...
    """
    imgs_dir.mkdir(parents=True, exist_ok=True)
    frame_shape = (height, width, 3)
    for frame_idx in range(num_frames):
        pixels = np.random.randint(0, 256, frame_shape, dtype=np.uint8)
        write_image(pixels, imgs_dir / f"frame-{frame_idx:06d}.png")
def _encode_video(
    path: Path,
    num_frames: int = 4,
    fps: int = 30,
    cfg: VideoEncoderConfig | None = None,
) -> Path:
    """Write random RGB frames next to ``path``, encode them into ``path`` and return it.

    Source frames go to a sibling directory named ``imgs_<path.stem>``; ``cfg``
    is forwarded unchanged to ``encode_video_frames`` and any existing file at
    ``path`` is overwritten.
    """
    frames_dir = path.parent / f"imgs_{path.stem}"
    _write_frames(frames_dir, num_frames=num_frames)
    encode_video_frames(frames_dir, path, fps=fps, video_encoder=cfg, overwrite=True)
    return path
def _read_feature_info(dataset: LeRobotDataset) -> dict:
    """Return the ``info`` entry stored for ``VIDEO_KEY`` in the dataset's info file.

    The file at ``dataset.root / INFO_PATH`` is re-read from disk on every call.
    """
    raw_text = (dataset.root / INFO_PATH).read_text()
    features = json.loads(raw_text)["features"]
    return features[VIDEO_KEY]["info"]
def _add_frames(dataset: LeRobotDataset, num_frames: int, video_keys: list[str] | None = None) -> None:
    """Append ``num_frames`` synthetic frames to ``dataset``.

    Features listed in ``video_keys`` (default: ``dataset.meta.video_keys``)
    receive random uint8 arrays; every other feature receives float32 zeros.
    Keys found in ``DEFAULT_FEATURES`` are not filled in here. Each frame
    carries the task label ``"test"``.
    """
    from lerobot.utils.constants import DEFAULT_FEATURES

    keys_as_video = dataset.meta.video_keys if video_keys is None else video_keys

    for _ in range(num_frames):
        frame: dict = {"task": "test"}
        for name, spec in dataset.meta.features.items():
            if name in DEFAULT_FEATURES:
                continue
            shape = spec["shape"]
            frame[name] = (
                np.random.randint(0, 256, shape, dtype=np.uint8)
                if name in keys_as_video
                else np.zeros(shape, dtype=np.float32)
            )
        dataset.add_frame(frame)
class TestGetVideoInfo:
def test_returns_all_stream_fields(self):
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4")
@@ -439,7 +449,7 @@ class TestEncodeVideoFrames:
def test_overwrite_false_skips_existing_file(self, tmp_path):
imgs_dir = tmp_path / "imgs"
_write_frames(imgs_dir)
_write_RGB_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
sentinel = b"pre-existing content"
video_path.write_bytes(sentinel)
@@ -451,7 +461,7 @@ class TestEncodeVideoFrames:
@require_libsvtav1
def test_overwrite_true_replaces_existing_file(self, tmp_path):
imgs_dir = tmp_path / "imgs"
_write_frames(imgs_dir)
_write_RGB_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
video_path.write_bytes(b"stale content")
@@ -572,10 +582,10 @@ class TestEncoderConfigPersistence:
def test_first_episode_save_persists_encoder_config(self, tmp_path, empty_lerobot_dataset_factory):
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
root=tmp_path / "ds", features=DUMMY_VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
)
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
@@ -595,14 +605,14 @@ class TestEncoderConfigPersistence:
def test_second_episode_does_not_overwrite_encoder_fields(self, tmp_path, empty_lerobot_dataset_factory):
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
root=tmp_path / "ds", features=DUMMY_VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
)
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
first_info = dict(_read_feature_info(dataset))
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
@@ -629,3 +639,217 @@ class TestFromVideoInfo:
# ``{}`` placeholder (typical after a merge with disagreeing sources)
# must not leak into the reconstructed config.
assert cfg.extra_options == VideoEncoderConfig().extra_options
# ─── Depth-specific encoding tests ────────────────────────────────────
class TestEncodeDepthVideoFrames:
    """Depth counterpart of :class:`TestEncodeVideoFrames`.

    Runs ``encode_video_frames`` end-to-end with a
    :class:`DepthEncoderConfig` (HEVC Main 12 / ``gray12le``) over synthetic
    uint16 depth TIFFs and inspects the resulting file.
    """

    @require_hevc
    def test_produces_readable_file(self, tmp_path):
        encoded = _encode_video(tmp_path / "out.mp4", depth=True)
        assert encoded.exists()

        stream_info = get_video_info(encoded, video_encoder=DepthEncoderConfig())
        expected = {
            "video.height": 64,
            "video.width": 96,
            "video.codec": "hevc",
            "video.pix_fmt": "gray12le",
            "video.channels": 1,
        }
        for field, value in expected.items():
            assert stream_info[field] == value
        assert stream_info["is_depth_map"] is True

    @require_hevc
    def test_frame_count_and_duration_match_input(self, tmp_path):
        num_frames, fps = 10, 30
        encoded = _encode_video(tmp_path / "out.mp4", num_frames=num_frames, fps=fps, depth=True)

        with av.open(str(encoded)) as container:
            stream = container.streams.video[0]
            decoded_count = 0
            for _ in container.decode(stream):
                decoded_count += 1
            # Prefer the stream-level duration; fall back to the container's.
            if stream.duration is not None:
                duration = float(stream.duration * stream.time_base)
            else:
                duration = float(container.duration / av.time_base)

        assert decoded_count == num_frames
        assert abs(duration - num_frames / fps) < 0.1

    def test_overwrite_false_skips_existing_file(self, tmp_path):
        """Codec-agnostic: file-system semantics must hold even without an HEVC encoder."""
        frames_dir = tmp_path / "imgs"
        _write_depth_frames(frames_dir)
        target = tmp_path / "out.mp4"
        placeholder = b"pre-existing depth content"
        target.write_bytes(placeholder)

        encode_video_frames(frames_dir, target, fps=30, video_encoder=DepthEncoderConfig(), overwrite=False)

        # The pre-existing bytes must be left untouched.
        assert target.read_bytes() == placeholder

    @require_hevc
    def test_overwrite_true_replaces_existing_file(self, tmp_path):
        frames_dir = tmp_path / "imgs"
        _write_depth_frames(frames_dir)
        target = tmp_path / "out.mp4"
        target.write_bytes(b"stale content")

        encode_video_frames(frames_dir, target, fps=30, video_encoder=DepthEncoderConfig(), overwrite=True)

        stream_info = get_video_info(target, video_encoder=DepthEncoderConfig())
        assert stream_info["video.height"] == 64
        assert stream_info["video.pix_fmt"] == "gray12le"
        assert stream_info["is_depth_map"] is True

    @require_hevc
    def test_custom_encoder_config_fields_stored_in_info(self, tmp_path):
        """All stream-derived and depth-encoder config fields are present after encoding."""
        depth_cfg = DepthEncoderConfig(
            vcodec="hevc",
            pix_fmt="gray12le",
            g=4,
            crf=25,
            depth_min=0.05,
            depth_max=8.0,
            shift=2.5,
            use_log=False,
        )
        encoded = _encode_video(tmp_path / "out.mp4", num_frames=4, fps=30, cfg=depth_cfg, depth=True)
        stream_info = get_video_info(encoded, video_encoder=depth_cfg)

        expected_equal = {
            # Stream-derived
            "video.height": 64,
            "video.width": 96,
            "video.channels": 1,
            "video.codec": "hevc",
            "video.pix_fmt": "gray12le",
            "video.fps": 30,
            # Base encoder config
            "video.g": 4,
            "video.crf": 25,
            "video.fast_decode": 0,
            "video.video_backend": "pyav",
            "video.extra_options": {},
            # Depth-specific tuning
            "video.depth_min": 0.05,
            "video.depth_max": 8.0,
            "video.shift": 2.5,
        }
        for field, value in expected_equal.items():
            assert stream_info[field] == value

        # Boolean flags are checked by identity, not mere truthiness.
        assert stream_info["is_depth_map"] is True
        assert stream_info["has_audio"] is False
        assert stream_info["video.use_log"] is False
class TestDepthEncoderConfigPersistence:
    """Depth counterpart of :class:`TestEncoderConfigPersistence`.

    Saving the first episode must store the ``DepthEncoderConfig`` as
    ``video.<field>`` entries, including the depth-specific ``depth_min`` /
    ``depth_max`` / ``shift`` / ``use_log``, under
    ``info["features"][<depth_key>]["info"]``.
    """

    @staticmethod
    def _depth_cfg():
        """Build the explicit depth encoder config shared by both tests."""
        return DepthEncoderConfig(
            vcodec="hevc",
            pix_fmt="gray12le",
            g=2,
            crf=30,
            depth_min=0.05,
            depth_max=8.0,
            shift=2.5,
            use_log=False,
        )

    def _new_dataset(self, tmp_path, empty_lerobot_dataset_factory):
        """Create an empty depth-video dataset under ``tmp_path / "ds"``."""
        return empty_lerobot_dataset_factory(
            root=tmp_path / "ds",
            features=DUMMY_DEPTH_FEATURES,
            use_videos=True,
            depth_encoder=self._depth_cfg(),
        )

    @require_hevc
    def test_first_episode_save_persists_depth_encoder_config(self, tmp_path, empty_lerobot_dataset_factory):
        dataset = self._new_dataset(tmp_path, empty_lerobot_dataset_factory)
        add_frames(dataset, num_frames=4)
        dataset.save_episode()
        dataset.finalize()

        persisted = _read_feature_info(dataset, key=DUMMY_DEPTH_KEY)

        expected_equal = {
            # Stream-derived
            "video.height": 64,
            "video.width": 96,
            "video.fps": 30,
            "video.codec": "hevc",
            "video.pix_fmt": "gray12le",
            # Base encoder config
            "video.g": 2,
            "video.crf": 30,
            "video.fast_decode": 0,
            "video.video_backend": "pyav",
            "video.extra_options": {},
            # Depth-specific tuning
            "video.depth_min": 0.05,
            "video.depth_max": 8.0,
            "video.shift": 2.5,
        }
        for field, value in expected_equal.items():
            assert persisted[field] == value

        # Boolean flags are checked by identity, not mere truthiness.
        assert persisted["is_depth_map"] is True
        assert persisted["video.use_log"] is False

    @require_hevc
    def test_second_episode_does_not_overwrite_depth_encoder_fields(
        self, tmp_path, empty_lerobot_dataset_factory
    ):
        dataset = self._new_dataset(tmp_path, empty_lerobot_dataset_factory)

        add_frames(dataset, num_frames=4)
        dataset.save_episode()
        # Copy the info as it stands after the first episode.
        after_first = dict(_read_feature_info(dataset, key=DUMMY_DEPTH_KEY))

        add_frames(dataset, num_frames=4)
        dataset.save_episode()
        dataset.finalize()

        after_second = _read_feature_info(dataset, key=DUMMY_DEPTH_KEY)
        assert after_second == after_first
class TestDepthFromVideoInfo:
    """Depth counterpart of :class:`TestFromVideoInfo`.

    ``DepthEncoderConfig.from_video_info`` rebuilds a depth encoder config
    from the ``video.*`` keys persisted in a dataset's ``info.json``.
    """

    @require_hevc
    def test_reconstructs_from_dummy_depth_video_info(self):
        source = DUMMY_DEPTH_VIDEO_INFO_FULL
        rebuilt = DepthEncoderConfig.from_video_info(source)

        # No alias for ``"hevc"``; the canonical stream codec is reused as-is.
        assert rebuilt.vcodec == "hevc"

        # Config attribute -> persisted key. The last four entries are the
        # depth-specific tuning values that round-trip through ``info.json``.
        round_tripped = {
            "pix_fmt": "video.pix_fmt",
            "g": "video.g",
            "crf": "video.crf",
            "fast_decode": "video.fast_decode",
            "video_backend": "video.video_backend",
            "depth_min": "video.depth_min",
            "depth_max": "video.depth_max",
            "shift": "video.shift",
            "use_log": "video.use_log",
        }
        for attr, info_key in round_tripped.items():
            assert getattr(rebuilt, attr) == source[info_key]

        # ``{}`` placeholder (typical after a merge with disagreeing sources)
        # must not leak into the reconstructed config.
        assert rebuilt.extra_options == DepthEncoderConfig().extra_options
+32
View File
@@ -50,6 +50,16 @@ DUMMY_DEPTH_VIDEO_INFO = {
**DUMMY_VIDEO_INFO,
"is_depth_map": True,
}
# Depth video info carrying the encoder fields as well: starts from
# ``DUMMY_VIDEO_INFO`` without its ``"video.preset"`` entry, switches the codec
# and pixel format to ``hevc`` / ``gray12le``, flags the stream as a depth map
# and adds the depth tuning keys (``depth_min``, ``depth_max``, ``shift``,
# ``use_log``).
# NOTE(review): ``video.preset`` is presumably dropped because the depth
# encoder config has no preset field — confirm.
DUMMY_DEPTH_VIDEO_INFO_FULL = {
**{k: v for k, v in DUMMY_VIDEO_INFO.items() if k != "video.preset"},
"video.codec": "hevc",
"video.pix_fmt": "gray12le",
"is_depth_map": True,
"video.depth_min": 0.05,
"video.depth_max": 8.0,
"video.shift": 2.5,
"video.use_log": True,
}
DUMMY_DEPTH_CAMERA_FEATURES = {
"laptop_depth": {
"shape": (64, 96, 1),
@@ -60,3 +70,25 @@ DUMMY_DEPTH_CAMERA_FEATURES = {
DUMMY_CAMERA_FEATURES_WITH_DEPTH = {**DUMMY_CAMERA_FEATURES, **DUMMY_DEPTH_CAMERA_FEATURES}
DUMMY_CHW = (3, 96, 128)
DUMMY_HWC = (96, 128, 3)
# Feature keys used by the video-encoding persistence tests. They are declared
# before the feature sets so each dict and its lookup key cannot drift apart.
DUMMY_VIDEO_KEY = "observation.images.cam"
DUMMY_DEPTH_KEY = "observation.images.depth"

# Default video feature set: one RGB video stream plus a 2-element action.
DUMMY_VIDEO_FEATURES = {
    DUMMY_VIDEO_KEY: {
        "dtype": "video",
        "shape": (64, 96, 3),
        "names": ["height", "width", "channels"],
    },
    "action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}

# Depth variant: a single-channel video stream flagged as a depth map.
DUMMY_DEPTH_FEATURES = {
    DUMMY_DEPTH_KEY: {
        "dtype": "video",
        "shape": (64, 96, 1),
        "names": ["height", "width", "channels"],
        "info": {"is_depth_map": True},
    },
    "action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}
+5 -8
View File
@@ -38,6 +38,8 @@ from lerobot.datasets.utils import (
DEFAULT_VIDEO_PATH,
DatasetInfo,
)
from lerobot.datasets.video_utils import encode_video_frames
from lerobot.utils.constants import DEFAULT_FEATURES
from tests.fixtures.constants import (
DEFAULT_FPS,
DUMMY_CAMERA_FEATURES,
@@ -45,13 +47,9 @@ from tests.fixtures.constants import (
DUMMY_REPO_ID,
DUMMY_ROBOT_TYPE,
)
from lerobot.datasets.video_utils import encode_video_frames
from lerobot.utils.constants import DEFAULT_FEATURES
def add_frames(
dataset: LeRobotDataset, num_frames: int
) -> None:
def add_frames(dataset: LeRobotDataset, num_frames: int) -> None:
"""Append ``num_frames`` synthetic frames to ``dataset``.
Generates per-feature payloads from ``dataset.meta``: uint16 depth ramps for
@@ -59,9 +57,8 @@ def add_frames(
and float32 zeros for everything else. ``DEFAULT_FEATURES`` (timestamp,
frame_index, ...) are auto-populated by ``add_frame`` and skipped here.
"""
if video_keys is None:
video_keys = dataset.meta.video_keys
depth_keys = set(dataset.meta.depth_keys)
video_keys = dataset.meta.video_keys
depth_keys = dataset.meta.depth_keys
# Smooth gradient base reused per (H, W) to keep depth frames cheap to
# encode (HEVC Main 12 hates white noise).
_depth_base_cache: dict[tuple[int, int], np.ndarray] = {}