Files
lerobot/tests/cameras/test_opencv.py
2026-03-08 14:02:33 +01:00

300 lines
9.4 KiB
Python

#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example of running a specific test:
# ```bash
# pytest tests/cameras/test_opencv.py::test_connect
# ```
from pathlib import Path
from unittest.mock import patch
import cv2
import numpy as np
import pytest
from lerobot.cameras.configs import Cv2Rotation
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
# Alias to the genuine OpenCV class, bound at import time. MockLoopingVideoCapture
# uses it to open the underlying capture.
RealVideoCapture = cv2.VideoCapture
class MockLoopingVideoCapture:
    """
    Wraps the real OpenCV VideoCapture so a still image can be read repeatedly.

    Motivation: cv2.VideoCapture(file.png) is only valid for one read.
    Strategy: Read the file once & return the cached frame for subsequent reads.
    Consequence: No recurrent I/O operations, but we keep the test artifacts simple.
    """

    def __init__(self, *args, **kwargs):
        """Open the wrapped capture, forwarding every argument.

        Positional ``Path`` arguments are converted to ``str`` before being
        forwarded; keyword arguments are passed through untouched.
        """
        args_clean = [str(a) if isinstance(a, Path) else a for a in args]
        self._real_vc = RealVideoCapture(*args_clean, **kwargs)
        self._cached_frame = None

    def read(self):
        """Read a frame, replaying the last successful one once the source fails.

        Returns:
            The ``(ret, frame)`` pair from the wrapped capture while it succeeds.
            After a failed read, ``(True, <copy of the last good frame>)`` if a
            frame was ever cached, otherwise the wrapped capture's failing result.
        """
        ret, frame = self._real_vc.read()
        if ret:
            # Cache a private copy: the caller owns `frame` and may edit it in
            # place, which must not leak into the frames replayed later.
            self._cached_frame = frame.copy()
            return ret, frame
        if self._cached_frame is not None:
            return True, self._cached_frame.copy()
        return ret, frame

    def __getattr__(self, name):
        """Delegate any attribute not found on the wrapper to the wrapped capture."""
        if name == "_real_vc":
            # `_real_vc` is missing (e.g. __init__ was bypassed or failed). Looking it
            # up through `self` again would recurse until RecursionError.
            raise AttributeError(name)
        return getattr(self._real_vc, name)
@pytest.fixture(autouse=True)
def patch_opencv_videocapture():
    """
    Replace ``cv2.VideoCapture``, as seen from the OpenCVCamera module, with
    MockLoopingVideoCapture for the duration of every test.
    """
    # Build the patch target from the module that defines OpenCVCamera.
    with patch(f"{OpenCVCamera.__module__}.cv2.VideoCapture", new=MockLoopingVideoCapture):
        yield
# NOTE(Steven): more tests + assertions?
# Directory holding the still images used as fake camera sources
# (two levels above this file, then artifacts/cameras).
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent.joinpath("artifacts", "cameras")
# Image sizes covered by the parametrized tests, written as "<width>x<height>".
TEST_IMAGE_SIZES = ["128x128", "160x120", "320x180", "480x270"]
# One PNG per size above, named image_<width>x<height>.png.
TEST_IMAGE_PATHS = [TEST_ARTIFACTS_DIR.joinpath(f"image_{dims}.png") for dims in TEST_IMAGE_SIZES]
# Image used by the tests that are not parametrized over sizes.
DEFAULT_PNG_FILE_PATH = TEST_ARTIFACTS_DIR.joinpath("image_160x120.png")
def test_abc_implementation():
    """OpenCVCamera must be instantiable; it would raise if an abstract method/property were left unimplemented."""
    OpenCVCamera(OpenCVCameraConfig(index_or_path=0))
def test_connect():
    """The camera should report being connected inside its context manager."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        assert cam.is_connected
def test_connect_already_connected():
    """Calling ``connect()`` from inside the camera's context manager should raise DeviceAlreadyConnectedError."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        with pytest.raises(DeviceAlreadyConnectedError):
            cam.connect()
def test_connect_invalid_camera_path():
    """Connecting to a path that does not exist should raise ConnectionError."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path="nonexistent/camera.png"))
    with pytest.raises(ConnectionError):
        cam.connect(warmup=False)
def test_invalid_width_connect():
    """Connecting with an invalid requested width should raise RuntimeError."""
    # 99999 is a deliberately invalid width, chosen to trigger the error.
    bad_size_config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, width=99999, height=480)
    cam = OpenCVCamera(bad_size_config)
    with pytest.raises(RuntimeError):
        cam.connect(warmup=False)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_read(index_or_path):
    """A synchronous ``read()`` should return a numpy array for every test image."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=index_or_path)
    with OpenCVCamera(cam_config) as cam:
        frame = cam.read()
        assert isinstance(frame, np.ndarray)
def test_read_before_connect():
    """``read()`` on a camera that was never connected should raise DeviceNotConnectedError."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH))
    with pytest.raises(DeviceNotConnectedError):
        cam.read()
def test_disconnect():
    """After ``connect()`` followed by ``disconnect()`` the camera should no longer report being connected."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH))
    cam.connect(warmup=False)
    cam.disconnect()
    assert not cam.is_connected
def test_disconnect_before_connect():
    """``disconnect()`` on a camera that was never connected should raise DeviceNotConnectedError."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH))
    with pytest.raises(DeviceNotConnectedError):
        cam.disconnect()
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_async_read(index_or_path):
    """``async_read()`` should return a numpy array and leave a live thread on ``camera.thread``."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=index_or_path)
    with OpenCVCamera(cam_config) as cam:
        frame = cam.async_read()

        # Check the thread first, then the returned frame.
        assert cam.thread is not None
        assert cam.thread.is_alive()
        assert isinstance(frame, np.ndarray)
@pytest.mark.skip("Skipping test: async_read 0 timeout behavior may be flaky/non-deterministic.")
def test_async_read_timeout():
    """Two back-to-back ``async_read(timeout_ms=0)`` calls are expected to raise TimeoutError (currently skipped)."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        with pytest.raises(TimeoutError):
            # First call consumes any frame available by then ...
            cam.async_read(timeout_ms=0)
            # ... and the second immediately asks for another one.
            cam.async_read(timeout_ms=0)
def test_async_read_before_connect():
    """``async_read()`` on a camera that was never connected should raise DeviceNotConnectedError."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH))
    with pytest.raises(DeviceNotConnectedError):
        cam.async_read()
def test_read_latest():
    """``read_latest()`` should return a numpy array with the same shape as a frame from ``read()``."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        # A blocking read first, so at least one fresh frame has been captured.
        reference = cam.read()
        newest = cam.read_latest()

        assert isinstance(newest, np.ndarray)
        assert newest.shape == reference.shape
def test_read_latest_before_connect():
    """``read_latest()`` on a camera that was never connected should raise DeviceNotConnectedError."""
    cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH))
    with pytest.raises(DeviceNotConnectedError):
        cam.read_latest()
def test_read_latest_high_frequency():
    """Twenty consecutive ``read_latest()`` calls should each return an array shaped like the primed frame."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        # Prime with a blocking read so frames are available.
        primed = cam.read()

        attempts = 20
        for _ in range(attempts):
            newest = cam.read_latest()
            assert isinstance(newest, np.ndarray)
            assert newest.shape == primed.shape
def test_read_latest_too_old():
    """``read_latest(max_age_ms=0)`` should raise TimeoutError because any frame is immediately too old."""
    cam_config = OpenCVCameraConfig(warmup_s=0, index_or_path=DEFAULT_PNG_FILE_PATH)
    with OpenCVCamera(cam_config) as cam:
        # Prime with a blocking read so frames are available.
        cam.read()

        with pytest.raises(TimeoutError):
            cam.read_latest(max_age_ms=0)
def test_fourcc_configuration():
    """Test FourCC configuration validation and application."""
    # MJPG comes first (main use case), followed by a few other common formats.
    accepted_codes = ("MJPG", "YUYV", "YUY2", "RGB3")
    for code in accepted_codes:
        cam = OpenCVCamera(OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc=code))
        assert cam.config.fourcc == code

    # Invalid codes must be rejected when the config is constructed.
    rejected_codes = ("ABC", "ABCDE", "")
    for code in rejected_codes:
        with pytest.raises(ValueError):
            OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc=code)
def test_fourcc_with_camera():
    """Test FourCC functionality with actual camera connection."""
    mjpg_config = OpenCVCameraConfig(
        index_or_path=DEFAULT_PNG_FILE_PATH,
        fourcc="MJPG",
        warmup_s=0,
    )
    with OpenCVCamera(mjpg_config) as cam:
        # Connecting must succeed with MJPG requested ...
        assert cam.is_connected

        # ... and a plain read must still return a frame.
        frame = cam.read()
        assert isinstance(frame, np.ndarray)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
@pytest.mark.parametrize(
    "rotation",
    [
        Cv2Rotation.NO_ROTATION,
        Cv2Rotation.ROTATE_90,
        Cv2Rotation.ROTATE_180,
        Cv2Rotation.ROTATE_270,
    ],
    ids=["no_rot", "rot90", "rot180", "rot270"],
)
def test_rotation(rotation, index_or_path):
    """Check reported and actual frame dimensions for each rotation setting.

    For 90/270 degree rotations the camera's width/height and the frame shape
    are expected to be swapped relative to the source image; otherwise they
    must match the source image.
    """
    # Assumes filenames format (_wxh.png): take the text after the last "_" and before the first ".".
    size_token = Path(index_or_path).name.rsplit("_", 1)[-1].partition(".")[0]
    src_width, src_height = (int(part) for part in size_token.split("x"))

    cam_config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation, warmup_s=0)
    with OpenCVCamera(cam_config) as cam:
        frame = cam.read()
        assert isinstance(frame, np.ndarray)

        # Quarter turns swap the axes; no rotation and 180 degrees keep them.
        if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
            want_width, want_height = src_height, src_width
        else:
            want_width, want_height = src_width, src_height

        assert cam.width == want_width
        assert cam.height == want_height
        # ndarray shape is (rows, cols), i.e. (height, width).
        assert frame.shape[:2] == (want_height, want_width)