Clean code

This commit is contained in:
glannuzel
2025-08-21 15:06:12 +02:00
parent 67acccf778
commit b07c3b271c
5 changed files with 37 additions and 19 deletions
@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .reachy2_camera import Reachy2Camera
from .configuration_reachy2_camera import Reachy2CameraConfig
from .reachy2_camera import Reachy2Camera
@@ -60,6 +60,4 @@ class Reachy2CameraConfig(CameraConfig):
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
raise ValueError(f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided.")
@@ -26,13 +26,14 @@ from threading import Event, Lock, Thread
from typing import Any
# Fix MSMF hardware transform compatibility for Windows before importing cv2
if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ:
if (
platform.system() == "Windows"
and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ
):
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
import numpy as np
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from reachy2_sdk import ReachySDK
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
@@ -40,7 +41,6 @@ from reachy2_sdk.media.camera_manager import CameraManager
from ..camera import Camera
from .configuration_reachy2_camera import ColorMode, Reachy2CameraConfig
logger = logging.getLogger(__name__)
@@ -49,7 +49,7 @@ class Reachy2Camera(Camera):
Manages Reachy 2 camera using Reachy 2 CameraManager.
This class provides a high-level interface to connect to, configure, and read
frames from Reachy 2 cameras. It supports both synchronous and asynchronous
frames from Reachy 2 cameras. It supports both synchronous and asynchronous
frame reading.
An Reachy2Camera instance requires a camera name (e.g., "teleop") and an image
@@ -82,12 +82,22 @@ class Reachy2Camera(Camera):
self.new_frame_event: Event = Event()
def __str__(self) -> str:
return f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
return (
f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
)
@property
def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened."""
return (self.reachy.is_connected() and self.reachy.cameras.teleop is not None and self.reachy.cameras.depth is not None) if self.reachy is not None else False
return (
(
self.reachy.is_connected()
and self.reachy.cameras.teleop is not None
and self.reachy.cameras.depth is not None
)
if self.reachy is not None
else False
)
def connect(self, warmup: bool = True):
"""
@@ -95,7 +105,9 @@ class Reachy2Camera(Camera):
"""
self.reachy = ReachySDK(self.config.ip_address)
self.cam_manager = CameraManager(host=self.config.ip_address, port=self.config.port)
self.cam_manager = CameraManager(
host=self.config.ip_address, port=self.config.port
)
self.cam_manager.initialize_cameras()
logger.info(f"{self} connected.")
@@ -139,9 +151,13 @@ class Reachy2Camera(Camera):
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
frame = self.cam_manager.teleop.get_frame(
CameraView.LEFT, size=(640, 480)
)[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
frame = self.cam_manager.teleop.get_frame(
CameraView.RIGHT, size=(640, 480)
)[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
@@ -181,7 +197,9 @@ class Reachy2Camera(Camera):
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}")
logger.warning(
f"Error reading frame in background thread for {self}: {e}"
)
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
@@ -245,7 +263,9 @@ class Reachy2Camera(Camera):
self.new_frame_event.clear()
if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
raise RuntimeError(
f"Internal error: Event set but no frame available for {self}."
)
return frame
@@ -1,6 +1,6 @@
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
import time
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
camera_config = Reachy2CameraConfig(name="teleop", image_type="left")
camera = Reachy2Camera(camera_config)
+2 -2
View File
@@ -220,7 +220,7 @@ class Reachy2Robot(Robot):
def disconnect(self) -> None:
if self.reachy is not None:
for cam in self.cameras.values():
cam.disconnect()
self.reachy.turn_off_smoothly()
self.reachy.disconnect()
for cam in self.cameras.values():
cam.disconnect()