Files
Steven Palma 3409ef0dc2 refactor(cameras): cameras API extension (#2808)
* feat(cameras): add new read_latest() method

* fix(cameras): fix threading bug + clear state

* refactor(cameras): multiple improvements

* feat(camera): add context manager to camera base class

* chore(camera): slight modifications to opencv

* test(cameras): update opencv tests according to the changes

* refactor(cameras): reflect desing changes to realsense + deal with depth

* test(cameras): fix realsense tests accordingly to new changes

* refactor(cameras): update reachymini and zmq accordingly

* chore: wrap resource sensitive examples into a try/finally

* test(cameras): add test for new read_latest

* test(cameras): fix problem with image artifact in opencv tests

* test(cameras): fix test_read_latest_high_frequency expectations

* Apply suggestions from code review 1

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* chore(cameras): address feedback

* feat(cameras): add max_age_ms check in read_latest

* test(cameras): fix read_latest tests

* chore(redundancies): removing redundancies in Reachy 2 camera class

* fix(warmup): replacing the arbitrary time.sleep in by an actual warmup in the RealSense camera class

* chore(format): formatting latest changes

* chore(warning): adding a "to be implemented" warning for read_latest() in Camera base class

* chore(warning): making read_latest() warning message shorter and clearer

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
2026-01-29 11:07:47 +01:00

229 lines
7.4 KiB
Python

#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example of running a specific test:
# ```bash
# pytest tests/cameras/test_opencv.py::test_connect
# ```
from pathlib import Path
from unittest.mock import patch
import numpy as np
import pytest
from lerobot.cameras.configs import Cv2Rotation
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
pytest.importorskip("pyrealsense2")
from lerobot.cameras.realsense import RealSenseCamera, RealSenseCameraConfig
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras"
BAG_FILE_PATH = TEST_ARTIFACTS_DIR / "test_rs.bag"
# NOTE(Steven): For some reason these tests take ~20sec in macOS but only ~2sec in Linux.
def mock_rs_config_enable_device_from_file(rs_config_instance, _sn):
return rs_config_instance.enable_device_from_file(str(BAG_FILE_PATH), repeat_playback=True)
def mock_rs_config_enable_device_bad_file(rs_config_instance, _sn):
return rs_config_instance.enable_device_from_file("non_existent_file.bag", repeat_playback=True)
@pytest.fixture(name="patch_realsense", autouse=True)
def fixture_patch_realsense():
"""Automatically mock pyrealsense2.config.enable_device for all tests."""
with patch(
"pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file
) as mock:
yield mock
def test_abc_implementation():
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
config = RealSenseCameraConfig(serial_number_or_name="042")
_ = RealSenseCamera(config)
def test_connect():
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
with RealSenseCamera(config) as camera:
assert camera.is_connected
def test_connect_already_connected():
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError):
camera.connect(warmup=False)
def test_connect_invalid_camera_path(patch_realsense):
patch_realsense.side_effect = mock_rs_config_enable_device_bad_file
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(warmup=False)
def test_invalid_width_connect():
config = RealSenseCameraConfig(serial_number_or_name="042", width=99999, height=480, fps=30)
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(warmup=False)
def test_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
# TODO(Steven): Fix this test for the latest version of pyrealsense2.
@pytest.mark.skip("Skipping test: pyrealsense2 version > 2.55.1.6486")
def test_read_depth():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, use_depth=True)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
img = camera.read_depth(timeout_ms=2000) # NOTE(Steven): Reading depth takes longer in CI environments.
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
def test_disconnect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
camera.connect(warmup=False)
camera.disconnect()
assert not camera.is_connected
def test_disconnect_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
def test_async_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
def test_async_read_timeout():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0) # consumes any available frame by then
camera.async_read(timeout_ms=0) # request immediately another one
def test_async_read_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
def test_read_latest():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == img.shape
def test_read_latest_high_frequency():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
# prime with one read to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_too_old():
config = RealSenseCameraConfig(serial_number_or_name="042")
with RealSenseCamera(config) as camera:
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
@pytest.mark.parametrize(
"rotation",
[
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
],
ids=["no_rot", "rot90", "rot180", "rot270"],
)
def test_rotation(rotation):
config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == 480
assert camera.height == 640
assert img.shape[:2] == (640, 480)
else:
assert camera.width == 640
assert camera.height == 480
assert img.shape[:2] == (480, 640)