Update cameras

This commit is contained in:
glannuzel
2025-08-21 17:34:35 +02:00
parent b07c3b271c
commit fc6b709da0
3 changed files with 66 additions and 48 deletions
@@ -20,31 +20,36 @@ from ..configs import CameraConfig, ColorMode, Cv2Rotation
@CameraConfig.register_subclass("reachy2_camera") @CameraConfig.register_subclass("reachy2_camera")
@dataclass @dataclass
class Reachy2CameraConfig(CameraConfig): class Reachy2CameraConfig(CameraConfig):
"""Configuration class for OpenCV-based camera devices or video files. """Configuration class for Reachy 2 camera devices.
This class provides configuration options for cameras accessed through OpenCV, This class provides configuration options for Reachy 2 cameras,
supporting both physical camera devices and video files. It includes settings supporting both the teleop and torso cameras. It includes settings
for resolution, frame rate, color mode, and image rotation. for resolution, frame rate, color mode, and the selection of the cameras.
Example configurations: Example configurations:
```python ```python
# Basic configurations # Basic configurations
OpenCVCameraConfig(0, 30, 1280, 720) # 1280x720 @ 30FPS Reachy2CameraConfig(
OpenCVCameraConfig(/dev/video4, 60, 640, 480) # 640x480 @ 60FPS name="teleop",
image_type="left",
# Advanced configurations ip_address="192.168.0.200", # IP address of the robot
OpenCVCameraConfig(128422271347, 30, 640, 480, rotation=Cv2Rotation.ROTATE_90) # With 90° rotation fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
) # Left teleop camera, 640x480 @ 15FPS
``` ```
Attributes: Attributes:
index_or_path: Either an integer representing the camera device index, name: Name of the camera device. Can be "teleop" or "torso".
or a Path object pointing to a video file. image_type: Type of image stream. For "teleop" camera, can be "left" or "right".
For "torso" camera, can be "rgb" or "depth". (depth is not supported yet)
fps: Requested frames per second for the color stream. fps: Requested frames per second for the color stream.
width: Requested frame width in pixels for the color stream. width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream. height: Requested frame height in pixels for the color stream.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB. color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation. ip_address: IP address of the robot. Defaults to "localhost".
warmup_s: Time reading frames before returning from connect (in seconds) port: Port number for the camera server. Defaults to 50065.
Note: Note:
- Only 3-channel color output (RGB/BGR) is currently supported. - Only 3-channel color output (RGB/BGR) is currently supported.
@@ -53,11 +58,21 @@ class Reachy2CameraConfig(CameraConfig):
name: str name: str
image_type: str image_type: str
color_mode: ColorMode = ColorMode.RGB color_mode: ColorMode = ColorMode.RGB
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
ip_address: str | None = "localhost" ip_address: str | None = "localhost"
port: int = 50065 port: int = 50065
# use_depth: bool = False # use_depth: bool = False
def __post_init__(self): def __post_init__(self):
if self.name not in ["teleop", "torso"]:
raise ValueError(f"`name` is expected to be 'teleop' or 'torso', but {self.name} is provided.")
if (
(self.name == "teleop" and self.image_type not in ["left", "right"])
or (self.name == "torso")
and self.image_type not in ["rgb", "depth"]
):
raise ValueError(
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for torso camera, but {self.image_type} is provided."
)
if self.color_mode not in ["rgb", "bgr"]: if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided.") raise ValueError(f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided.")
@@ -13,15 +13,13 @@
# limitations under the License. # limitations under the License.
""" """
Provides the OpenCVCamera class for capturing frames from cameras using OpenCV. Provides the Reachy2Camera class for capturing frames from Reachy 2 cameras using Reachy 2's CameraManager.
""" """
import logging import logging
import math
import os import os
import platform import platform
import time import time
from pathlib import Path
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from typing import Any from typing import Any
@@ -33,8 +31,7 @@ if (
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0" os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 import cv2
import numpy as np import numpy as np
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.errors import DeviceNotConnectedError
from reachy2_sdk import ReachySDK
from reachy2_sdk.media.camera import CameraView from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager from reachy2_sdk.media.camera_manager import CameraManager
@@ -61,7 +58,7 @@ class Reachy2Camera(Camera):
def __init__(self, config: Reachy2CameraConfig): def __init__(self, config: Reachy2CameraConfig):
""" """
Initializes the OpenCVCamera instance. Initializes the Reachy2Camera instance.
Args: Args:
config: The configuration settings for the camera. config: The configuration settings for the camera.
@@ -73,8 +70,6 @@ class Reachy2Camera(Camera):
self.fps = config.fps self.fps = config.fps
self.color_mode = config.color_mode self.color_mode = config.color_mode
self.reachy2_sdk: ReachySDK | None = None
self.thread: Thread | None = None self.thread: Thread | None = None
self.stop_event: Event | None = None self.stop_event: Event | None = None
self.frame_lock: Lock = Lock() self.frame_lock: Lock = Lock()
@@ -90,12 +85,8 @@ class Reachy2Camera(Camera):
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened.""" """Checks if the camera is currently connected and opened."""
return ( return (
( (self.cam_manager.teleop is not None and self.cam_manager.depth is not None)
self.reachy.is_connected() if self.cam_manager is not None
and self.reachy.cameras.teleop is not None
and self.reachy.cameras.depth is not None
)
if self.reachy is not None
else False else False
) )
@@ -103,8 +94,6 @@ class Reachy2Camera(Camera):
""" """
Connects to the Reachy2 CameraManager as specified in the configuration. Connects to the Reachy2 CameraManager as specified in the configuration.
""" """
self.reachy = ReachySDK(self.config.ip_address)
self.cam_manager = CameraManager( self.cam_manager = CameraManager(
host=self.config.ip_address, port=self.config.port host=self.config.ip_address, port=self.config.port
) )
@@ -114,23 +103,43 @@ class Reachy2Camera(Camera):
print(f"{self} connected.") print(f"{self} connected.")
@staticmethod @staticmethod
def find_cameras() -> list[dict[str, Any]]: def find_cameras(
ip_address: str = "localhost", port: int = 50065
) -> list[dict[str, Any]]:
""" """
Detects available Reachy 2 cameras. Detects available Reachy 2 cameras.
Returns: Returns:
List[Dict[str, Any]]: A list of dictionaries, List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (port index or path), where each dictionary contains 'name', 'stereo',
and the default profile properties (width, height, fps, format). and the default profile properties (width, height, fps).
""" """
pass initialized_cameras = []
camera_manager = CameraManager(host=ip_address, port=port)
for camera in [camera_manager.teleop, camera_manager.depth]:
if camera is None:
continue
height, width, _, _, _, _, _ = camera.get_parameters()
camera_info = {
"name": camera._cam_info.name,
"stereo": camera._cam_info.stereo,
"default_profile": {
"width": width,
"height": height,
"fps": 30,
},
}
initialized_cameras.append(camera_info)
return initialized_cameras
def read(self, color_mode: ColorMode | None = None) -> np.ndarray: def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
""" """
Reads a single frame synchronously from the camera. Reads a single frame synchronously from the camera.
This is a blocking call. It waits for the next available frame from the This is a blocking call.
camera hardware via OpenCV.
Args: Args:
color_mode (Optional[ColorMode]): If specified, overrides the default color_mode (Optional[ColorMode]): If specified, overrides the default
@@ -165,9 +174,9 @@ class Reachy2Camera(Camera):
frame = self.cam_manager.depth.get_frame(size=(640, 480))[0] frame = self.cam_manager.depth.get_frame(size=(640, 480))[0]
if frame is None: if frame is None:
return None return np.empty((0, 0, 3), dtype=np.uint8)
if frame is not None and self.config.color_mode == "rgb": if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
read_duration_ms = (time.perf_counter() - start_time) * 1e3 read_duration_ms = (time.perf_counter() - start_time) * 1e3
@@ -271,10 +280,7 @@ class Reachy2Camera(Camera):
def disconnect(self): def disconnect(self):
""" """
Disconnects from the camera and cleans up resources. Stops the background read thread (if running).
Stops the background read thread (if running) and releases the OpenCV
VideoCapture object.
Raises: Raises:
DeviceNotConnectedError: If the camera is already disconnected. DeviceNotConnectedError: If the camera is already disconnected.
@@ -60,33 +60,30 @@ class Reachy2RobotConfig(RobotConfig):
name="teleop", name="teleop",
image_type="left", image_type="left",
ip_address=self.ip_address, ip_address=self.ip_address,
fps=30, fps=15,
width=640, width=640,
height=480, height=480,
color_mode=ColorMode.RGB, color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION,
) )
if self.with_right_teleop_camera: if self.with_right_teleop_camera:
self.cameras["teleop_right"] = Reachy2CameraConfig( self.cameras["teleop_right"] = Reachy2CameraConfig(
name="teleop", name="teleop",
image_type="right", image_type="right",
ip_address=self.ip_address, ip_address=self.ip_address,
fps=30, fps=15,
width=640, width=640,
height=480, height=480,
color_mode=ColorMode.RGB, color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION,
) )
if self.with_torso_camera: if self.with_torso_camera:
self.cameras["torso_rgb"] = Reachy2CameraConfig( self.cameras["torso_rgb"] = Reachy2CameraConfig(
name="depth", name="depth",
image_type="rgb", image_type="rgb",
ip_address=self.ip_address, ip_address=self.ip_address,
fps=30, fps=15,
width=640, width=640,
height=480, height=480,
color_mode=ColorMode.RGB, color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION,
) )
super().__post_init__() super().__post_init__()