From 06e944214933dad587460d2d73fb90ddf721511c Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Fri, 12 Jun 2026 19:00:17 +0200 Subject: [PATCH] test(depth): cleaning up depth tests --- tests/datasets/test_depth.py | 331 ++++++++++++++--------------------- 1 file changed, 134 insertions(+), 197 deletions(-) diff --git a/tests/datasets/test_depth.py b/tests/datasets/test_depth.py index 9a6c1f942..de2839f8d 100644 --- a/tests/datasets/test_depth.py +++ b/tests/datasets/test_depth.py @@ -1,12 +1,14 @@ """Tests for the depth-integration feature. -Covers quantization/dequantization round-trips (depth_utils), image writer -depth support (image_writer), hardware→dataset feature routing -(feature_utils), video info helpers (video_utils / configs.video), and -feature-to-file-format routing through the dataset writer. +Covers: +- ``depth_utils`` quantize/dequantize round-trips and backend agreement. +- Image-writer support for single-channel depth. +- Hardware-feature → depth flag routing. +- Feature-to-file-format routing through the dataset writer. -Depth metadata detection on ``LeRobotDatasetMetadata.depth_keys`` (canonical -and legacy marker variants) lives in ``test_dataset_metadata.py``. +Depth metadata detection on ``LeRobotDatasetMetadata.depth_keys`` lives in +``test_dataset_metadata.py``. Depth video encoding/decoding lives in +``test_video_encoding.py``. """ from pathlib import Path @@ -18,169 +20,159 @@ import pytest import torch from lerobot.configs import DepthEncoderConfig -from lerobot.configs.video import DEPTH_QMAX, VALID_VIDEO_CODECS +from lerobot.configs.video import DEFAULT_DEPTH_MAX, DEFAULT_DEPTH_MIN, DEPTH_QMAX from lerobot.datasets.depth_utils import dequantize_depth, quantize_depth -from lerobot.datasets.image_writer import ( - image_array_to_pil_image, - save_kwargs_for_path, - write_image, -) -from lerobot.datasets.pyav_utils import get_pix_fmt_channels +from lerobot.datasets.image_writer import image_array_to_pil_image, write_image from tests.fixtures.constants import ( DEFAULT_FPS, DUMMY_CAMERA_FEATURES, DUMMY_DEPTH_CAMERA_FEATURES, - DUMMY_MOTOR_FEATURES, DUMMY_REPO_ID, + DUMMY_CAMERA_FEATURES_WITH_DEPTH, + DUMMY_CHW ) +from tests.fixtures.dataset_factories import add_frames -H, W = 48, 64 -DEPTH_MIN = 0.01 -DEPTH_MAX = 10.0 +_, H, W = DUMMY_CHW + +def _depth_metres_ramp() -> np.ndarray: + """Linearly-spaced float32 depth in metres covering the default range.""" + return np.linspace(DEFAULT_DEPTH_MIN, DEFAULT_DEPTH_MAX, H * W, dtype=np.float32).reshape(H, W) -# ── 1. Quantize / Dequantize round-trips ──────────────────────────── +# ── 1. Quantize / dequantize round-trips ────────────────────────────── class TestQuantizeDequantize: - """Core numerical tests for depth_utils.quantize_depth / dequantize_depth.""" + """Numerical contract of ``quantize_depth`` / ``dequantize_depth``.""" - def _make_depth_metres(self) -> np.ndarray: - """Linearly-spaced float32 depth in metres covering the default range.""" - return np.linspace(DEPTH_MIN, DEPTH_MAX, H * W, dtype=np.float32).reshape(H, W) - - def test_roundtrip_linear_metres(self): - depth = self._make_depth_metres() - quantized = quantize_depth(depth, use_log=False, video_backend=None) - recovered = dequantize_depth(quantized, use_log=False, output_unit="m") - - assert recovered.shape == (H, W, 1), f"Expected (H,W,1), got {recovered.shape}" - assert recovered.dtype == np.float32 - tol = (DEPTH_MAX - DEPTH_MIN) / DEPTH_QMAX - np.testing.assert_allclose(recovered[..., 0], depth, atol=tol + 1e-6) - - def test_roundtrip_log_metres(self): - depth = self._make_depth_metres() - quantized = quantize_depth(depth, use_log=True, video_backend=None) - recovered = dequantize_depth(quantized, use_log=True, output_unit="m") - - assert recovered.shape == (H, W, 1) - near = depth < 1.0 - far = depth > 8.0 - err_near = np.abs(recovered[..., 0][near] - depth[near]) - err_far = np.abs(recovered[..., 0][far] - depth[far]) - assert err_near.mean() < err_far.mean(), "Log quant should be more precise at close range" - - def test_roundtrip_mm_uint16_input(self): - depth_mm = np.linspace(10, 10000, H * W, dtype=np.float64).reshape(H, W).astype(np.uint16) - quantized = quantize_depth(depth_mm, use_log=False, video_backend=None, input_unit="mm") - recovered = dequantize_depth(quantized, use_log=False, output_unit="mm") - - assert recovered.dtype == np.uint16 - tol_mm = (DEPTH_MAX - DEPTH_MIN) * 1000.0 / DEPTH_QMAX - np.testing.assert_allclose( - recovered[..., 0].astype(np.float64), depth_mm.astype(np.float64), atol=tol_mm + 1.0 + @pytest.mark.parametrize("use_log", [False, True]) + @pytest.mark.parametrize("output_unit", ["m", "mm"]) + @pytest.mark.parametrize("output_channel_last", [False, True]) + def test_roundtrip(self, use_log, output_unit, output_channel_last): + """quantize → dequantize recovers depth; layout and unit are honored.""" + depth = _depth_metres_ramp() + quantized = quantize_depth(depth, use_log=use_log, video_backend=None) + recovered = dequantize_depth( + quantized, + use_log=use_log, + output_unit=output_unit, + output_tensor=False, + output_channel_last=output_channel_last, ) - def test_quantize_clamps_out_of_range(self): - depth = np.array([[0.001, 99.0]], dtype=np.float32) - quantized = quantize_depth(depth, use_log=False, video_backend=None) - assert quantized[0, 0] == 0 - assert quantized[0, 1] == DEPTH_QMAX + expected_shape = (H, W, 1) if output_channel_last else (1, H, W) + assert recovered.shape == expected_shape - def test_quantize_accepts_torch_tensor(self): - t = torch.rand(H, W, dtype=torch.float32) * (DEPTH_MAX - DEPTH_MIN) + DEPTH_MIN - result = quantize_depth(t, video_backend=None) - assert isinstance(result, np.ndarray) - assert result.dtype == np.uint16 + recovered_m = recovered.astype(np.float32) + if output_unit == "mm": + recovered_m = recovered_m / 1000.0 + recovered_2d = recovered_m[..., 0] if output_channel_last else recovered_m[0] - def test_quantize_squeezes_channel_dim(self): - depth = self._make_depth_metres() - for shape in [(H, W, 1), (1, H, W)]: - reshaped = depth.reshape(shape) - quantized = quantize_depth(reshaped, video_backend=None) - assert quantized.ndim == 2, f"Input shape {shape} should be squeezed to 2D" + if use_log: + # Log mode: tighter near-range error than far-range (the whole point). + near = depth < 1.0 + far = depth > 8.0 + err_near = np.abs(recovered_2d[near] - depth[near]) + err_far = np.abs(recovered_2d[far] - depth[far]) + assert err_near.mean() < err_far.mean() + else: + # Linear mode: bounded by quant step + 1 mm of unit-conversion rounding. + tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3 + np.testing.assert_allclose(recovered_2d, depth, atol=tol) - def test_quantize_returns_pyav_frame(self): - depth = self._make_depth_metres() - result = quantize_depth(depth, video_backend="pyav") - assert isinstance(result, av.VideoFrame) + @pytest.mark.parametrize("use_log", [False, True]) + @pytest.mark.parametrize("output_unit", ["m", "mm"]) + def test_numpy_torch_agree(self, use_log, output_unit): + """Batched torch path produces the same values as the numpy path.""" + T = 3 + per_frame = np.linspace(0, DEPTH_QMAX, H * W, dtype=np.uint16).reshape(H, W) + batch_np = np.broadcast_to(per_frame[None, None, ...], (T, 1, H, W)).copy() + batch_t = torch.from_numpy(batch_np.astype(np.int32)) # torch.uint16 support is patchy. - def test_dequantize_output_tensor(self): - quantized = np.full((H, W), DEPTH_QMAX // 2, dtype=np.uint16) - result = dequantize_depth(quantized, output_unit="m", output_tensor=True) - assert isinstance(result, torch.Tensor) - assert result.shape == (H, W, 1) + ref = dequantize_depth(batch_np, use_log=use_log, output_unit=output_unit, output_tensor=False) + out = dequantize_depth(batch_t, use_log=use_log, output_unit=output_unit, output_tensor=True) + + assert isinstance(out, torch.Tensor) + assert out.shape == (T, 1, H, W) + # ``m``: float32 noise (~10 µm in log mode, after ``exp``) — still 200× below the ~2 mm quant step. + # ``mm`` + tensor stays in float32 (no uint16 round-trip), so allow 1 mm slop. + atol = 1e-5 if output_unit == "m" else 1.0 + np.testing.assert_allclose( + out.cpu().numpy().astype(np.float64), ref.astype(np.float64), atol=atol + ) + + @pytest.mark.parametrize( + "input_shape,output_shape", + [ + ((H, W), (1, H, W)), + ((1, H, W), (1, H, W)), + ((H, W, 1), (1, H, W)), + ((3, 1, H, W), (3, 1, H, W)), + ((3, H, W, 1), (3, 1, H, W)), + ], + ) + def test_input_layouts_accepted(self, input_shape, output_shape): + """All documented input layouts decode to the channel-first default.""" + quantized = np.full(input_shape, DEPTH_QMAX // 2, dtype=np.uint16) + out = dequantize_depth(quantized, output_unit="m", output_tensor=False) + assert out.shape == output_shape + + def test_pyav_frame_roundtrip(self): + """quantize → av.VideoFrame → dequantize works.""" + depth = _depth_metres_ramp() + frame = quantize_depth(depth, use_log=False, video_backend="pyav") + assert isinstance(frame, av.VideoFrame) + + recovered = dequantize_depth(frame, use_log=False, output_unit="m", output_tensor=False) + assert recovered.shape == (1, H, W) + tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3 + np.testing.assert_allclose(recovered[0], depth, atol=tol) def test_invalid_log_params_raises(self): - depth = np.ones((4, 4), dtype=np.float32) - with pytest.raises(ValueError, match="depth_min \\+ shift must be positive"): - quantize_depth(depth, depth_min=1.0, shift=-2.0, use_log=True, video_backend=None) + with pytest.raises(ValueError, match=r"depth_min \+ shift must be positive"): + quantize_depth( + _depth_metres_ramp(), depth_min=1.0, shift=-2.0, use_log=True, video_backend=None + ) -# ── 2. Image writer depth support ─────────────────────────────────── +# ── 2. Image writer depth support ───────────────────────────────────── class TestImageWriterDepth: - """image_array_to_pil_image and write_image for single-channel depth maps.""" + """``image_array_to_pil_image`` and ``write_image`` for depth maps.""" - def test_pil_uint16_grayscale(self): - arr = np.arange(H * W, dtype=np.uint16).reshape(H, W) + @pytest.mark.parametrize("dtype,expected_mode", [(np.uint16, "I;16"), (np.float32, "F")]) + @pytest.mark.parametrize("shape", [(H, W), (H, W, 1), (1, H, W)]) + def test_pil_depth_modes_and_squeeze(self, dtype, expected_mode, shape): + """Single-channel depth converts to PIL with the right mode and (W, H) size.""" + arr = np.zeros(shape, dtype=dtype) img = image_array_to_pil_image(arr) - assert isinstance(img, PIL.Image.Image) - assert img.mode == "I;16" + assert img.mode == expected_mode assert img.size == (W, H) - def test_pil_float32_grayscale(self): - arr = np.random.rand(H, W).astype(np.float32) - img = image_array_to_pil_image(arr) - assert img.mode == "F" - - def test_pil_squeeze_hwc1_and_1hw(self): - arr_uint16 = np.zeros((H, W), dtype=np.uint16) - for input_arr in [arr_uint16.reshape(H, W, 1), arr_uint16.reshape(1, H, W)]: - img = image_array_to_pil_image(input_arr) - assert img.size == (W, H) - - def test_save_kwargs_png_vs_tiff(self): - png_kw = save_kwargs_for_path(Path("frame.png"), compress_level=5) - assert png_kw == {"compress_level": 5} - - tiff_kw = save_kwargs_for_path(Path("frame.tiff"), compress_level=5) - assert tiff_kw == {"compression": "raw"} - - assert save_kwargs_for_path(Path("frame.jpg"), compress_level=5) == {} - def test_write_image_tiff_roundtrip(self, tmp_path): + """uint16 depth round-trips through .tiff.""" arr = np.arange(H * W, dtype=np.uint16).reshape(H, W) fpath = tmp_path / "depth.tiff" write_image(arr, fpath) - - assert fpath.exists() with PIL.Image.open(fpath) as loaded: recovered = np.array(loaded) np.testing.assert_array_equal(recovered, arr) -# ── 3. Feature routing ────────────────────────────────────────────── +# ── 3. Hardware-feature → depth flag ────────────────────────────────── class TestHwToDatasetFeaturesDepth: - """hw_to_dataset_features marks single-channel cameras as depth.""" + """``hw_to_dataset_features`` flags single-channel cameras as depth.""" - def test_single_channel_cam_marked_depth(self): + @pytest.mark.parametrize("channels,is_depth", [(1, True), (3, False)]) + def test_depth_marker_by_channels(self, channels, is_depth): from lerobot.utils.feature_utils import hw_to_dataset_features - features = hw_to_dataset_features({"cam": (480, 640, 1)}, prefix="observation") - ft = features["observation.images.cam"] - assert ft["info"]["is_depth_map"] is True - - def test_three_channel_cam_not_depth(self): - from lerobot.utils.feature_utils import hw_to_dataset_features - - features = hw_to_dataset_features({"cam": (480, 640, 3)}, prefix="observation") - ft = features["observation.images.cam"] - assert ft["info"]["is_depth_map"] is False + features = hw_to_dataset_features({"cam": (480, 640, channels)}, prefix="observation") + assert features["observation.images.cam"]["info"]["is_depth_map"] is is_depth def test_invalid_channel_count_raises(self): from lerobot.utils.feature_utils import hw_to_dataset_features @@ -189,65 +181,24 @@ class TestHwToDatasetFeaturesDepth: hw_to_dataset_features({"cam": (480, 640, 2)}, prefix="observation") -# ── 4. Video info depth flag ──────────────────────────────────────── +# ── 4. Feature-to-file-format routing ──────────────────────────────── -class TestVideoInfoDepthFlag: - """Misc depth-related constants and helpers in video_utils / configs.""" - - def test_get_pix_fmt_channels_gray(self): - assert get_pix_fmt_channels("gray12le") == 1 - assert get_pix_fmt_channels("gray8") == 1 - - def test_ffv1_in_valid_codecs(self): - assert "ffv1" in VALID_VIDEO_CODECS - - -# ── 5. Feature-to-file-format routing ─────────────────────────────── - - -def _build_mixed_features(dtype: str) -> dict: - """Build a feature dict with one RGB camera and one depth camera. - - Uses shapes from ``DUMMY_CAMERA_FEATURES`` and ``DUMMY_DEPTH_CAMERA_FEATURES`` - defined in ``tests.fixtures.constants``. - """ - rgb_cam = next(iter(DUMMY_CAMERA_FEATURES.values())) - depth_cam = next(iter(DUMMY_DEPTH_CAMERA_FEATURES.values())) - return { - "observation.images.rgb": {"dtype": dtype, **rgb_cam}, - "observation.images.depth": {"dtype": dtype, **depth_cam}, - **{k: {"dtype": v["dtype"], **v} for k, v in DUMMY_MOTOR_FEATURES.items()}, - } - - -def _make_mixed_frame(features: dict) -> dict: - """Build a valid frame dict matching the given feature schema.""" - frame: dict = {"task": "test task"} - for key, ft in features.items(): - shape = ft["shape"] - if ft["dtype"] in ("image", "video"): - channels = shape[-1] - if channels == 1: - frame[key] = np.random.randint(0, 4095, shape, dtype=np.uint16) - else: - frame[key] = np.random.randint(0, 255, shape, dtype=np.uint8) - else: - frame[key] = np.random.randn(*shape).astype(ft["dtype"]) - return frame +# Keys derived from DUMMY_CAMERA_FEATURES_WITH_DEPTH; pick one RGB and the depth camera. +RGB_KEY = next(iter(DUMMY_CAMERA_FEATURES)) +DEPTH_KEY = next(iter(DUMMY_DEPTH_CAMERA_FEATURES)) class TestFeatureFileRouting: - """Verify that depth vs RGB features are routed to the correct file format.""" + """Depth vs RGB features route to the correct file format.""" NUM_FRAMES = 5 - def test_no_video_depth_tiff_rgb_png(self, tmp_path): - """Without video encoding: depth -> .tiff, RGB -> .png.""" + def test_image_mode_depth_tiff_rgb_png(self, tmp_path, features_factory): + """Without video encoding: depth → .tiff, RGB → .png.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset - features = _build_mixed_features(dtype="image") - + features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=False) dataset = LeRobotDataset.create( repo_id=DUMMY_REPO_ID, fps=DEFAULT_FPS, @@ -256,27 +207,20 @@ class TestFeatureFileRouting: use_videos=False, ) - for _ in range(self.NUM_FRAMES): - dataset.add_frame(_make_mixed_frame(features)) + add_frames(dataset, num_frames=self.NUM_FRAMES) buf = dataset.writer.episode_buffer - depth_paths = [Path(p) for p in buf["observation.images.depth"]] - rgb_paths = [Path(p) for p in buf["observation.images.rgb"]] - - assert all(p.suffix == ".tiff" for p in depth_paths), "Depth frames should be .tiff" - assert all(p.suffix == ".png" for p in rgb_paths), "RGB frames should be .png" - assert all(p.exists() for p in depth_paths), "Depth TIFF files should exist on disk" - assert all(p.exists() for p in rgb_paths), "RGB PNG files should exist on disk" + assert all(Path(p).suffix == ".tiff" for p in buf[DEPTH_KEY]) + assert all(Path(p).suffix == ".png" for p in buf[RGB_KEY]) dataset.save_episode() dataset.finalize() - def test_video_depth_uses_depth_encoder(self, tmp_path): - """With streaming video encoding: depth keys use DepthEncoderConfig, RGB keys do not.""" + def test_video_mode_depth_uses_depth_encoder(self, tmp_path, features_factory): + """With streaming video encoding: depth → DepthEncoderConfig, RGB does not.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset - features = _build_mixed_features(dtype="video") - + features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=True) dataset = LeRobotDataset.create( repo_id=DUMMY_REPO_ID, fps=DEFAULT_FPS, @@ -286,19 +230,12 @@ class TestFeatureFileRouting: streaming_encoding=True, ) - assert dataset.writer._streaming_encoder is not None + add_frames(dataset, num_frames=self.NUM_FRAMES) + encoder = dataset.writer._streaming_encoder - - for _ in range(self.NUM_FRAMES): - dataset.add_frame(_make_mixed_frame(features)) - - rgb_thread = encoder._threads["observation.images.rgb"] - depth_thread = encoder._threads["observation.images.depth"] - - assert not isinstance(rgb_thread.video_encoder, DepthEncoderConfig) - assert isinstance(depth_thread.video_encoder, DepthEncoderConfig) - assert depth_thread.is_depth is True - assert rgb_thread.is_depth is False + assert encoder is not None + assert isinstance(encoder._threads[DEPTH_KEY].video_encoder, DepthEncoderConfig) + assert not isinstance(encoder._threads[RGB_KEY].video_encoder, DepthEncoderConfig) dataset.save_episode() dataset.finalize()