test(depth): cleaning up depth tests

This commit is contained in:
CarolinePascal
2026-06-12 19:00:17 +02:00
parent 979f7b1187
commit 291d2e982c
+134 -197
View File
@@ -1,12 +1,14 @@
"""Tests for the depth-integration feature.
Covers quantization/dequantization round-trips (depth_utils), image writer
depth support (image_writer), hardware→dataset feature routing
(feature_utils), video info helpers (video_utils / configs.video), and
feature-to-file-format routing through the dataset writer.
Covers:
- ``depth_utils`` quantize/dequantize round-trips and backend agreement.
- Image-writer support for single-channel depth.
- Hardware-feature → depth flag routing.
- Feature-to-file-format routing through the dataset writer.
Depth metadata detection on ``LeRobotDatasetMetadata.depth_keys`` (canonical
and legacy marker variants) lives in ``test_dataset_metadata.py``.
Depth metadata detection on ``LeRobotDatasetMetadata.depth_keys`` lives in
``test_dataset_metadata.py``. Depth video encoding/decoding lives in
``test_video_encoding.py``.
"""
from pathlib import Path
@@ -18,169 +20,159 @@ import pytest
import torch
from lerobot.configs import DepthEncoderConfig
from lerobot.configs.video import DEPTH_QMAX, VALID_VIDEO_CODECS
from lerobot.configs.video import DEFAULT_DEPTH_MAX, DEFAULT_DEPTH_MIN, DEPTH_QMAX
from lerobot.datasets.depth_utils import dequantize_depth, quantize_depth
from lerobot.datasets.image_writer import (
image_array_to_pil_image,
save_kwargs_for_path,
write_image,
)
from lerobot.datasets.pyav_utils import get_pix_fmt_channels
from lerobot.datasets.image_writer import image_array_to_pil_image, write_image
from tests.fixtures.constants import (
DEFAULT_FPS,
DUMMY_CAMERA_FEATURES,
DUMMY_DEPTH_CAMERA_FEATURES,
DUMMY_MOTOR_FEATURES,
DUMMY_REPO_ID,
DUMMY_CAMERA_FEATURES_WITH_DEPTH,
DUMMY_CHW
)
from tests.fixtures.dataset_factories import add_frames
H, W = 48, 64
DEPTH_MIN = 0.01
DEPTH_MAX = 10.0
_, H, W = DUMMY_CHW
def _depth_metres_ramp() -> np.ndarray:
"""Linearly-spaced float32 depth in metres covering the default range."""
return np.linspace(DEFAULT_DEPTH_MIN, DEFAULT_DEPTH_MAX, H * W, dtype=np.float32).reshape(H, W)
# ── 1. Quantize / Dequantize round-trips ────────────────────────────
# ── 1. Quantize / dequantize round-trips ──────────────────────────────
class TestQuantizeDequantize:
"""Core numerical tests for depth_utils.quantize_depth / dequantize_depth."""
"""Numerical contract of ``quantize_depth`` / ``dequantize_depth``."""
def _make_depth_metres(self) -> np.ndarray:
"""Linearly-spaced float32 depth in metres covering the default range."""
return np.linspace(DEPTH_MIN, DEPTH_MAX, H * W, dtype=np.float32).reshape(H, W)
def test_roundtrip_linear_metres(self):
depth = self._make_depth_metres()
quantized = quantize_depth(depth, use_log=False, video_backend=None)
recovered = dequantize_depth(quantized, use_log=False, output_unit="m")
assert recovered.shape == (H, W, 1), f"Expected (H,W,1), got {recovered.shape}"
assert recovered.dtype == np.float32
tol = (DEPTH_MAX - DEPTH_MIN) / DEPTH_QMAX
np.testing.assert_allclose(recovered[..., 0], depth, atol=tol + 1e-6)
def test_roundtrip_log_metres(self):
depth = self._make_depth_metres()
quantized = quantize_depth(depth, use_log=True, video_backend=None)
recovered = dequantize_depth(quantized, use_log=True, output_unit="m")
assert recovered.shape == (H, W, 1)
near = depth < 1.0
far = depth > 8.0
err_near = np.abs(recovered[..., 0][near] - depth[near])
err_far = np.abs(recovered[..., 0][far] - depth[far])
assert err_near.mean() < err_far.mean(), "Log quant should be more precise at close range"
def test_roundtrip_mm_uint16_input(self):
depth_mm = np.linspace(10, 10000, H * W, dtype=np.float64).reshape(H, W).astype(np.uint16)
quantized = quantize_depth(depth_mm, use_log=False, video_backend=None, input_unit="mm")
recovered = dequantize_depth(quantized, use_log=False, output_unit="mm")
assert recovered.dtype == np.uint16
tol_mm = (DEPTH_MAX - DEPTH_MIN) * 1000.0 / DEPTH_QMAX
np.testing.assert_allclose(
recovered[..., 0].astype(np.float64), depth_mm.astype(np.float64), atol=tol_mm + 1.0
@pytest.mark.parametrize("use_log", [False, True])
@pytest.mark.parametrize("output_unit", ["m", "mm"])
@pytest.mark.parametrize("output_channel_last", [False, True])
def test_roundtrip(self, use_log, output_unit, output_channel_last):
"""quantize → dequantize recovers depth; layout and unit are honored."""
depth = _depth_metres_ramp()
quantized = quantize_depth(depth, use_log=use_log, video_backend=None)
recovered = dequantize_depth(
quantized,
use_log=use_log,
output_unit=output_unit,
output_tensor=False,
output_channel_last=output_channel_last,
)
def test_quantize_clamps_out_of_range(self):
depth = np.array([[0.001, 99.0]], dtype=np.float32)
quantized = quantize_depth(depth, use_log=False, video_backend=None)
assert quantized[0, 0] == 0
assert quantized[0, 1] == DEPTH_QMAX
expected_shape = (H, W, 1) if output_channel_last else (1, H, W)
assert recovered.shape == expected_shape
def test_quantize_accepts_torch_tensor(self):
t = torch.rand(H, W, dtype=torch.float32) * (DEPTH_MAX - DEPTH_MIN) + DEPTH_MIN
result = quantize_depth(t, video_backend=None)
assert isinstance(result, np.ndarray)
assert result.dtype == np.uint16
recovered_m = recovered.astype(np.float32)
if output_unit == "mm":
recovered_m = recovered_m / 1000.0
recovered_2d = recovered_m[..., 0] if output_channel_last else recovered_m[0]
def test_quantize_squeezes_channel_dim(self):
depth = self._make_depth_metres()
for shape in [(H, W, 1), (1, H, W)]:
reshaped = depth.reshape(shape)
quantized = quantize_depth(reshaped, video_backend=None)
assert quantized.ndim == 2, f"Input shape {shape} should be squeezed to 2D"
if use_log:
# Log mode: tighter near-range error than far-range (the whole point).
near = depth < 1.0
far = depth > 8.0
err_near = np.abs(recovered_2d[near] - depth[near])
err_far = np.abs(recovered_2d[far] - depth[far])
assert err_near.mean() < err_far.mean()
else:
# Linear mode: bounded by quant step + 1 mm of unit-conversion rounding.
tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3
np.testing.assert_allclose(recovered_2d, depth, atol=tol)
def test_quantize_returns_pyav_frame(self):
depth = self._make_depth_metres()
result = quantize_depth(depth, video_backend="pyav")
assert isinstance(result, av.VideoFrame)
@pytest.mark.parametrize("use_log", [False, True])
@pytest.mark.parametrize("output_unit", ["m", "mm"])
def test_numpy_torch_agree(self, use_log, output_unit):
"""Batched torch path produces the same values as the numpy path."""
T = 3
per_frame = np.linspace(0, DEPTH_QMAX, H * W, dtype=np.uint16).reshape(H, W)
batch_np = np.broadcast_to(per_frame[None, None, ...], (T, 1, H, W)).copy()
batch_t = torch.from_numpy(batch_np.astype(np.int32)) # torch.uint16 support is patchy.
def test_dequantize_output_tensor(self):
quantized = np.full((H, W), DEPTH_QMAX // 2, dtype=np.uint16)
result = dequantize_depth(quantized, output_unit="m", output_tensor=True)
assert isinstance(result, torch.Tensor)
assert result.shape == (H, W, 1)
ref = dequantize_depth(batch_np, use_log=use_log, output_unit=output_unit, output_tensor=False)
out = dequantize_depth(batch_t, use_log=use_log, output_unit=output_unit, output_tensor=True)
assert isinstance(out, torch.Tensor)
assert out.shape == (T, 1, H, W)
# ``m``: float32 noise (~10 µm in log mode, after ``exp``) — still 200× below the ~2 mm quant step.
# ``mm`` + tensor stays in float32 (no uint16 round-trip), so allow 1 mm slop.
atol = 1e-5 if output_unit == "m" else 1.0
np.testing.assert_allclose(
out.cpu().numpy().astype(np.float64), ref.astype(np.float64), atol=atol
)
@pytest.mark.parametrize(
"input_shape,output_shape",
[
((H, W), (1, H, W)),
((1, H, W), (1, H, W)),
((H, W, 1), (1, H, W)),
((3, 1, H, W), (3, 1, H, W)),
((3, H, W, 1), (3, 1, H, W)),
],
)
def test_input_layouts_accepted(self, input_shape, output_shape):
"""All documented input layouts decode to the channel-first default."""
quantized = np.full(input_shape, DEPTH_QMAX // 2, dtype=np.uint16)
out = dequantize_depth(quantized, output_unit="m", output_tensor=False)
assert out.shape == output_shape
def test_pyav_frame_roundtrip(self):
"""quantize → av.VideoFrame → dequantize works."""
depth = _depth_metres_ramp()
frame = quantize_depth(depth, use_log=False, video_backend="pyav")
assert isinstance(frame, av.VideoFrame)
recovered = dequantize_depth(frame, use_log=False, output_unit="m", output_tensor=False)
assert recovered.shape == (1, H, W)
tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3
np.testing.assert_allclose(recovered[0], depth, atol=tol)
def test_invalid_log_params_raises(self):
depth = np.ones((4, 4), dtype=np.float32)
with pytest.raises(ValueError, match="depth_min \\+ shift must be positive"):
quantize_depth(depth, depth_min=1.0, shift=-2.0, use_log=True, video_backend=None)
with pytest.raises(ValueError, match=r"depth_min \+ shift must be positive"):
quantize_depth(
_depth_metres_ramp(), depth_min=1.0, shift=-2.0, use_log=True, video_backend=None
)
# ── 2. Image writer depth support ───────────────────────────────────
# ── 2. Image writer depth support ─────────────────────────────────────
class TestImageWriterDepth:
"""image_array_to_pil_image and write_image for single-channel depth maps."""
"""``image_array_to_pil_image`` and ``write_image`` for depth maps."""
def test_pil_uint16_grayscale(self):
arr = np.arange(H * W, dtype=np.uint16).reshape(H, W)
@pytest.mark.parametrize("dtype,expected_mode", [(np.uint16, "I;16"), (np.float32, "F")])
@pytest.mark.parametrize("shape", [(H, W), (H, W, 1), (1, H, W)])
def test_pil_depth_modes_and_squeeze(self, dtype, expected_mode, shape):
"""Single-channel depth converts to PIL with the right mode and (W, H) size."""
arr = np.zeros(shape, dtype=dtype)
img = image_array_to_pil_image(arr)
assert isinstance(img, PIL.Image.Image)
assert img.mode == "I;16"
assert img.mode == expected_mode
assert img.size == (W, H)
def test_pil_float32_grayscale(self):
arr = np.random.rand(H, W).astype(np.float32)
img = image_array_to_pil_image(arr)
assert img.mode == "F"
def test_pil_squeeze_hwc1_and_1hw(self):
arr_uint16 = np.zeros((H, W), dtype=np.uint16)
for input_arr in [arr_uint16.reshape(H, W, 1), arr_uint16.reshape(1, H, W)]:
img = image_array_to_pil_image(input_arr)
assert img.size == (W, H)
def test_save_kwargs_png_vs_tiff(self):
png_kw = save_kwargs_for_path(Path("frame.png"), compress_level=5)
assert png_kw == {"compress_level": 5}
tiff_kw = save_kwargs_for_path(Path("frame.tiff"), compress_level=5)
assert tiff_kw == {"compression": "raw"}
assert save_kwargs_for_path(Path("frame.jpg"), compress_level=5) == {}
def test_write_image_tiff_roundtrip(self, tmp_path):
"""uint16 depth round-trips through .tiff."""
arr = np.arange(H * W, dtype=np.uint16).reshape(H, W)
fpath = tmp_path / "depth.tiff"
write_image(arr, fpath)
assert fpath.exists()
with PIL.Image.open(fpath) as loaded:
recovered = np.array(loaded)
np.testing.assert_array_equal(recovered, arr)
# ── 3. Feature routing ──────────────────────────────────────────────
# ── 3. Hardware-feature → depth flag ──────────────────────────────────
class TestHwToDatasetFeaturesDepth:
"""hw_to_dataset_features marks single-channel cameras as depth."""
"""``hw_to_dataset_features`` flags single-channel cameras as depth."""
def test_single_channel_cam_marked_depth(self):
@pytest.mark.parametrize("channels,is_depth", [(1, True), (3, False)])
def test_depth_marker_by_channels(self, channels, is_depth):
from lerobot.utils.feature_utils import hw_to_dataset_features
features = hw_to_dataset_features({"cam": (480, 640, 1)}, prefix="observation")
ft = features["observation.images.cam"]
assert ft["info"]["is_depth_map"] is True
def test_three_channel_cam_not_depth(self):
from lerobot.utils.feature_utils import hw_to_dataset_features
features = hw_to_dataset_features({"cam": (480, 640, 3)}, prefix="observation")
ft = features["observation.images.cam"]
assert ft["info"]["is_depth_map"] is False
features = hw_to_dataset_features({"cam": (480, 640, channels)}, prefix="observation")
assert features["observation.images.cam"]["info"]["is_depth_map"] is is_depth
def test_invalid_channel_count_raises(self):
from lerobot.utils.feature_utils import hw_to_dataset_features
@@ -189,65 +181,24 @@ class TestHwToDatasetFeaturesDepth:
hw_to_dataset_features({"cam": (480, 640, 2)}, prefix="observation")
# ── 4. Video info depth flag ────────────────────────────────────────
# ── 4. Feature-to-file-format routing ────────────────────────────────
class TestVideoInfoDepthFlag:
"""Misc depth-related constants and helpers in video_utils / configs."""
def test_get_pix_fmt_channels_gray(self):
assert get_pix_fmt_channels("gray12le") == 1
assert get_pix_fmt_channels("gray8") == 1
def test_ffv1_in_valid_codecs(self):
assert "ffv1" in VALID_VIDEO_CODECS
# ── 5. Feature-to-file-format routing ───────────────────────────────
def _build_mixed_features(dtype: str) -> dict:
"""Build a feature dict with one RGB camera and one depth camera.
Uses shapes from ``DUMMY_CAMERA_FEATURES`` and ``DUMMY_DEPTH_CAMERA_FEATURES``
defined in ``tests.fixtures.constants``.
"""
rgb_cam = next(iter(DUMMY_CAMERA_FEATURES.values()))
depth_cam = next(iter(DUMMY_DEPTH_CAMERA_FEATURES.values()))
return {
"observation.images.rgb": {"dtype": dtype, **rgb_cam},
"observation.images.depth": {"dtype": dtype, **depth_cam},
**{k: {"dtype": v["dtype"], **v} for k, v in DUMMY_MOTOR_FEATURES.items()},
}
def _make_mixed_frame(features: dict) -> dict:
"""Build a valid frame dict matching the given feature schema."""
frame: dict = {"task": "test task"}
for key, ft in features.items():
shape = ft["shape"]
if ft["dtype"] in ("image", "video"):
channels = shape[-1]
if channels == 1:
frame[key] = np.random.randint(0, 4095, shape, dtype=np.uint16)
else:
frame[key] = np.random.randint(0, 255, shape, dtype=np.uint8)
else:
frame[key] = np.random.randn(*shape).astype(ft["dtype"])
return frame
# Keys derived from DUMMY_CAMERA_FEATURES_WITH_DEPTH; pick one RGB and the depth camera.
RGB_KEY = next(iter(DUMMY_CAMERA_FEATURES))
DEPTH_KEY = next(iter(DUMMY_DEPTH_CAMERA_FEATURES))
class TestFeatureFileRouting:
"""Verify that depth vs RGB features are routed to the correct file format."""
"""Depth vs RGB features route to the correct file format."""
NUM_FRAMES = 5
def test_no_video_depth_tiff_rgb_png(self, tmp_path):
"""Without video encoding: depth -> .tiff, RGB -> .png."""
def test_image_mode_depth_tiff_rgb_png(self, tmp_path, features_factory):
"""Without video encoding: depth .tiff, RGB .png."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset
features = _build_mixed_features(dtype="image")
features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=False)
dataset = LeRobotDataset.create(
repo_id=DUMMY_REPO_ID,
fps=DEFAULT_FPS,
@@ -256,27 +207,20 @@ class TestFeatureFileRouting:
use_videos=False,
)
for _ in range(self.NUM_FRAMES):
dataset.add_frame(_make_mixed_frame(features))
add_frames(dataset, num_frames=self.NUM_FRAMES)
buf = dataset.writer.episode_buffer
depth_paths = [Path(p) for p in buf["observation.images.depth"]]
rgb_paths = [Path(p) for p in buf["observation.images.rgb"]]
assert all(p.suffix == ".tiff" for p in depth_paths), "Depth frames should be .tiff"
assert all(p.suffix == ".png" for p in rgb_paths), "RGB frames should be .png"
assert all(p.exists() for p in depth_paths), "Depth TIFF files should exist on disk"
assert all(p.exists() for p in rgb_paths), "RGB PNG files should exist on disk"
assert all(Path(p).suffix == ".tiff" for p in buf[DEPTH_KEY])
assert all(Path(p).suffix == ".png" for p in buf[RGB_KEY])
dataset.save_episode()
dataset.finalize()
def test_video_depth_uses_depth_encoder(self, tmp_path):
"""With streaming video encoding: depth keys use DepthEncoderConfig, RGB keys do not."""
def test_video_mode_depth_uses_depth_encoder(self, tmp_path, features_factory):
"""With streaming video encoding: depth DepthEncoderConfig, RGB does not."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset
features = _build_mixed_features(dtype="video")
features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=True)
dataset = LeRobotDataset.create(
repo_id=DUMMY_REPO_ID,
fps=DEFAULT_FPS,
@@ -286,19 +230,12 @@ class TestFeatureFileRouting:
streaming_encoding=True,
)
assert dataset.writer._streaming_encoder is not None
add_frames(dataset, num_frames=self.NUM_FRAMES)
encoder = dataset.writer._streaming_encoder
for _ in range(self.NUM_FRAMES):
dataset.add_frame(_make_mixed_frame(features))
rgb_thread = encoder._threads["observation.images.rgb"]
depth_thread = encoder._threads["observation.images.depth"]
assert not isinstance(rgb_thread.video_encoder, DepthEncoderConfig)
assert isinstance(depth_thread.video_encoder, DepthEncoderConfig)
assert depth_thread.is_depth is True
assert rgb_thread.is_depth is False
assert encoder is not None
assert isinstance(encoder._threads[DEPTH_KEY].video_encoder, DepthEncoderConfig)
assert not isinstance(encoder._threads[RGB_KEY].video_encoder, DepthEncoderConfig)
dataset.save_episode()
dataset.finalize()