diff --git a/tests/cameras/test_reachy2_camera.py b/tests/cameras/test_reachy2_camera.py new file mode 100644 index 000000000..958f803ac --- /dev/null +++ b/tests/cameras/test_reachy2_camera.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Example of running a specific test: +# ```bash +# pytest tests/cameras/test_opencv.py::test_connect +# ``` + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import time +import pytest + +from lerobot.cameras.configs import Cv2Rotation +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig + + +PARAMS = [ + ("teleop", "left"), + ("teleop", "right"), + ("depth", "rgb"), + # ("depth", "depth"), # Depth camera is not available yet +] + + +def _make_cam_manager_mock(): + c = MagicMock(name="CameraManagerMock") + + teleop = MagicMock(name="TeleopCam") + teleop.width = 640 + teleop.height = 480 + teleop.get_frame = MagicMock( + side_effect=lambda *_, **__: ( + np.zeros((480, 640, 3), dtype=np.uint8), + time.time(), + ) + ) + + depth = MagicMock(name="DepthCam") + depth.width = 640 + depth.height = 480 + depth.get_frame = MagicMock( + side_effect=lambda *_, **__: ( + np.zeros((480, 640, 3), dtype=np.uint8), + time.time(), + ) + ) + + c.is_connected.return_value = True + c.teleop = teleop + c.depth = depth + + def _connect(): + c.teleop = teleop + c.depth = depth + c.is_connected.return_value = True + + def _disconnect(): + c.teleop = None + c.depth = None + c.is_connected.return_value = False + + c.connect = MagicMock(side_effect=_connect) + c.disconnect = MagicMock(side_effect=_disconnect) + + # Mock methods + c.initialize_cameras = MagicMock() + + return c + + +@pytest.fixture( + params=PARAMS, + # ids=["teleop-left", "teleop-right", "torso-rgb", "torso-depth"], # Depth camera is not available yet + ids=["teleop-left", "teleop-right", "torso-rgb"], +) +def camera(request): + name, image_type = request.param + with ( + patch( + "lerobot.cameras.reachy2_camera.reachy2_camera.CameraManager", + side_effect=lambda *a, **k: _make_cam_manager_mock(), + ), + ): + config = Reachy2CameraConfig(name=name, image_type=image_type) + cam = Reachy2Camera(config) + yield cam + if cam.is_connected: + cam.disconnect() + + +def test_connect(camera): + camera.connect() + assert camera.is_connected + camera.cam_manager.initialize_cameras.assert_called_once() + + +def test_read(camera): + camera.connect() + + img = camera.read() + if camera.config.name == "teleop": + camera.cam_manager.teleop.get_frame.assert_called_once() + elif camera.config.name == "depth": + camera.cam_manager.depth.get_frame.assert_called_once() + assert isinstance(img, np.ndarray) + assert img.shape == (480, 640, 3) + + +def test_disconnect(camera): + camera.connect() + + camera.disconnect() + assert not camera.is_connected + + +def test_async_read(camera): + camera.connect() + try: + img = camera.async_read() + + assert camera.thread is not None + assert camera.thread.is_alive() + assert isinstance(img, np.ndarray) + finally: + if camera.is_connected: + camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends + + +def test_async_read_timeout(camera): + camera.connect() + try: + with pytest.raises(TimeoutError): + camera.async_read(timeout_ms=0) + finally: + if camera.is_connected: + camera.disconnect() + + +def test_read_before_connect(camera): + with pytest.raises(DeviceNotConnectedError): + _ = camera.read() + + +def test_disconnect_before_connect(camera): + with pytest.raises(DeviceNotConnectedError): + camera.disconnect() + + +def test_async_read_before_connect(camera): + with pytest.raises(DeviceNotConnectedError): + _ = camera.async_read() diff --git a/tests/robots/test_reachy2.py b/tests/robots/test_reachy2.py index 669376ca3..470199573 100644 --- a/tests/robots/test_reachy2.py +++ b/tests/robots/test_reachy2.py @@ -56,7 +56,7 @@ REACHY2_VEL = { } PARAMS = [ - {}, # config par défaut + {}, # default config {"with_mobile_base": False}, {"with_mobile_base": False, "with_l_arm": False, "with_antennas": False}, {"with_r_arm": False, "with_neck": False, "with_antennas": False},