refactor(cameras): cameras API extension (#2808)

* feat(cameras): add new read_latest() method

* fix(cameras): fix threading bug + clear state

* refactor(cameras): multiple improvements

* feat(camera): add context manager to camera base class

* chore(camera): slight modifications to opencv

* test(cameras): update opencv tests according to the changes

* refactor(cameras): reflect desing changes to realsense + deal with depth

* test(cameras): fix realsense tests accordingly to new changes

* refactor(cameras): update reachymini and zmq accordingly

* chore: wrap resource sensitive examples into a try/finally

* test(cameras): add test for new read_latest

* test(cameras): fix problem with image artifact in opencv tests

* test(cameras): fix test_read_latest_high_frequency expectations

* Apply suggestions from code review 1

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* chore(cameras): address feedback

* feat(cameras): add max_age_ms check in read_latest

* test(cameras): fix read_latest tests

* chore(redundancies): removing redundancies in Reachy 2 camera class

* fix(warmup): replacing the arbitrary time.sleep in by an actual warmup in the RealSense camera class

* chore(format): formatting latest changes

* chore(warning): adding a "to be implemented" warning for read_latest() in Camera base class

* chore(warning): making read_latest() warning message shorter and clearer

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
This commit is contained in:
Steven Palma
2026-01-29 04:07:47 -06:00
committed by GitHub
parent 4483184875
commit 3409ef0dc2
21 changed files with 1179 additions and 641 deletions
+127 -57
View File
@@ -20,7 +20,9 @@
# ```
from pathlib import Path
from unittest.mock import patch
import cv2
import numpy as np
import pytest
@@ -28,6 +30,50 @@ from lerobot.cameras.configs import Cv2Rotation
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
RealVideoCapture = cv2.VideoCapture
class MockLoopingVideoCapture:
"""
Wraps the real OpenCV VideoCapture.
Motivation: cv2.VideoCapture(file.png) is only valid for one read.
Strategy: Read the file once & return the cached frame for subsequent reads.
Consequence: No recurrent I/O operations, but we keep the test artifacts simple.
"""
def __init__(self, *args, **kwargs):
args_clean = [str(a) if isinstance(a, Path) else a for a in args]
self._real_vc = RealVideoCapture(*args_clean, **kwargs)
self._cached_frame = None
def read(self):
ret, frame = self._real_vc.read()
if ret:
self._cached_frame = frame
return ret, frame
if not ret and self._cached_frame is not None:
return True, self._cached_frame.copy()
return ret, frame
def __getattr__(self, name):
return getattr(self._real_vc, name)
@pytest.fixture(autouse=True)
def patch_opencv_videocapture():
"""
Automatically patches cv2.VideoCapture for all tests.
"""
module_path = OpenCVCamera.__module__
target = f"{module_path}.cv2.VideoCapture"
with patch(target, new=MockLoopingVideoCapture):
yield
# NOTE(Steven): more tests + assertions?
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras"
DEFAULT_PNG_FILE_PATH = TEST_ARTIFACTS_DIR / "image_160x120.png"
@@ -43,25 +89,22 @@ def test_abc_implementation():
def test_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
camera.connect(warmup=False)
assert camera.is_connected
with OpenCVCamera(config) as camera:
assert camera.is_connected
def test_connect_already_connected():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(warmup=False)
with OpenCVCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError):
camera.connect()
def test_connect_invalid_camera_path():
config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png")
camera = OpenCVCamera(config)
with pytest.raises(ConnectionError):
@@ -74,27 +117,25 @@ def test_invalid_width_connect():
width=99999, # Invalid width to trigger error
height=480,
)
camera = OpenCVCamera(config)
camera = OpenCVCamera(config)
with pytest.raises(RuntimeError):
camera.connect(warmup=False)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0)
img = camera.read()
assert isinstance(img, np.ndarray)
with OpenCVCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
@@ -119,32 +160,22 @@ def test_disconnect_before_connect():
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_async_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0)
try:
with OpenCVCamera(config) as camera:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
def test_async_read_timeout():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
with OpenCVCamera(config) as camera, pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0) # consumes any available frame by then
camera.async_read(timeout_ms=0) # request immediately another one
def test_async_read_before_connect():
@@ -155,6 +186,50 @@ def test_async_read_before_connect():
_ = camera.async_read()
def test_read_latest():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# ensure at least one fresh frame is captured
frame = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == frame.shape
def test_read_latest_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_high_frequency():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# prime to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_too_old():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
def test_fourcc_configuration():
"""Test FourCC configuration validation and application."""
@@ -181,18 +256,15 @@ def test_fourcc_configuration():
def test_fourcc_with_camera():
"""Test FourCC functionality with actual camera connection."""
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG")
camera = OpenCVCamera(config)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG", warmup_s=0)
# Connect should work with MJPG specified
camera.connect(warmup=False)
assert camera.is_connected
with OpenCVCamera(config) as camera:
assert camera.is_connected
# Read should work normally
img = camera.read()
assert isinstance(img, np.ndarray)
camera.disconnect()
# Read should work normally
img = camera.read()
assert isinstance(img, np.ndarray)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
@@ -211,18 +283,16 @@ def test_rotation(rotation, index_or_path):
dimensions = filename.split("_")[-1].split(".")[0] # Assumes filenames format (_wxh.png)
original_width, original_height = map(int, dimensions.split("x"))
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation, warmup_s=0)
with OpenCVCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == original_height
assert camera.height == original_width
assert img.shape[:2] == (original_width, original_height)
else:
assert camera.width == original_width
assert camera.height == original_height
assert img.shape[:2] == (original_height, original_width)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == original_height
assert camera.height == original_width
assert img.shape[:2] == (original_width, original_height)
else:
assert camera.width == original_width
assert camera.height == original_height
assert img.shape[:2] == (original_height, original_width)
+38
View File
@@ -150,6 +150,44 @@ def test_async_read_before_connect(camera):
_ = camera.async_read()
def test_read_latest(camera):
camera.connect()
frame = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == frame.shape
def test_read_latest_before_connect(camera):
# camera fixture yields an unconnected camera instance
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_high_frequency(camera):
camera.connect()
# prime to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_too_old(camera):
camera.connect()
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
def test_wrong_camera_name():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="wrong-name", image_type="left")
+68 -46
View File
@@ -62,19 +62,15 @@ def test_abc_implementation():
def test_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
camera.connect(warmup=False)
assert camera.is_connected
with RealSenseCamera(config) as camera:
assert camera.is_connected
def test_connect_already_connected():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
camera.connect(warmup=False)
with pytest.raises(DeviceAlreadyConnectedError):
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError):
camera.connect(warmup=False)
@@ -96,12 +92,10 @@ def test_invalid_width_connect():
def test_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
img = camera.read()
assert isinstance(img, np.ndarray)
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
# TODO(Steven): Fix this test for the latest version of pyrealsense2.
@@ -142,32 +136,21 @@ def test_disconnect_before_connect():
def test_async_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
try:
with RealSenseCamera(config) as camera:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
def test_async_read_timeout():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0) # consumes any available frame by then
camera.async_read(timeout_ms=0) # request immediately another one
def test_async_read_before_connect():
@@ -178,6 +161,47 @@ def test_async_read_before_connect():
_ = camera.async_read()
def test_read_latest():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == img.shape
def test_read_latest_high_frequency():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
# prime with one read to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_too_old():
config = RealSenseCameraConfig(serial_number_or_name="042")
with RealSenseCamera(config) as camera:
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
@pytest.mark.parametrize(
"rotation",
[
@@ -189,18 +213,16 @@ def test_async_read_before_connect():
ids=["no_rot", "rot90", "rot180", "rot270"],
)
def test_rotation(rotation):
config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == 480
assert camera.height == 640
assert img.shape[:2] == (640, 480)
else:
assert camera.width == 640
assert camera.height == 480
assert img.shape[:2] == (480, 640)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == 480
assert camera.height == 640
assert img.shape[:2] == (640, 480)
else:
assert camera.width == 640
assert camera.height == 480
assert img.shape[:2] == (480, 640)