Update reachy2_cameras depth + CameraManager

This commit is contained in:
glannuzel
2025-08-26 18:35:31 +02:00
parent ecca518072
commit 4259c38856
3 changed files with 32 additions and 39 deletions
@@ -23,7 +23,7 @@ class Reachy2CameraConfig(CameraConfig):
"""Configuration class for Reachy 2 camera devices.
This class provides configuration options for Reachy 2 cameras,
supporting both the teleop and torso cameras. It includes settings
supporting both the teleop and depth cameras. It includes settings
for resolution, frame rate, color mode, and the selection of the cameras.
Example configurations:
@@ -41,9 +41,9 @@ class Reachy2CameraConfig(CameraConfig):
```
Attributes:
name: Name of the camera device. Can be "teleop" or "torso".
name: Name of the camera device. Can be "teleop" or "depth".
image_type: Type of image stream. For "teleop" camera, can be "left" or "right".
For "torso" camera, can be "rgb" or "depth". (depth is not supported yet)
For "depth" camera, can be "rgb" or "depth". (depth is not supported yet)
fps: Requested frames per second for the color stream.
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
@@ -63,15 +63,15 @@ class Reachy2CameraConfig(CameraConfig):
# use_depth: bool = False
def __post_init__(self):
if self.name not in ["teleop", "torso"]:
raise ValueError(f"`name` is expected to be 'teleop' or 'torso', but {self.name} is provided.")
if self.name not in ["teleop", "depth"]:
raise ValueError(f"`name` is expected to be 'teleop' or 'depth', but {self.name} is provided.")
if (
(self.name == "teleop" and self.image_type not in ["left", "right"])
or (self.name == "torso")
or (self.name == "depth")
and self.image_type not in ["rgb", "depth"]
):
raise ValueError(
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for torso camera, but {self.image_type} is provided."
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for depth camera, but {self.image_type} is provided."
)
if self.color_mode not in ["rgb", "bgr"]:
@@ -68,6 +68,8 @@ class Reachy2Camera(Camera):
self.fps = config.fps
self.color_mode = config.color_mode
self.cam_manager: CameraManager | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
@@ -80,11 +82,12 @@ class Reachy2Camera(Camera):
@property
def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened."""
return (
(self.cam_manager.teleop is not None and self.cam_manager.depth is not None)
if self.cam_manager is not None
else False
)
if self.config.name == "teleop":
return self.cam_manager._grpc_connected and self.cam_manager.teleop if self.cam_manager else False
elif self.config.name == "depth":
return self.cam_manager._grpc_connected and self.cam_manager.depth if self.cam_manager else False
else:
raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.")
def connect(self, warmup: bool = True):
"""
@@ -94,7 +97,6 @@ class Reachy2Camera(Camera):
self.cam_manager.initialize_cameras()
logger.info(f"{self} connected.")
print(f"{self} connected.")
@staticmethod
def find_cameras(ip_address: str = "localhost", port: int = 50065) -> list[dict[str, Any]]:
@@ -152,22 +154,25 @@ class Reachy2Camera(Camera):
frame = None
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
elif self.config.image_type == "rgb":
frame = self.cam_manager.depth.get_frame(size=(640, 480))[0]
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
else:
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
elif self.config.image_type == "rgb":
frame = self.cam_manager.depth.get_frame(size=(640, 480))[0]
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
@@ -1,12 +0,0 @@
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
camera_config = Reachy2CameraConfig(name="teleop", image_type="left")
camera = Reachy2Camera(camera_config)
camera.connect()
frame = camera.read()
print(frame)
frame_async = camera.async_read()
print(frame_async)