From 6560f47d1542d38d1cbd57ffe5a2e26a70c4e333 Mon Sep 17 00:00:00 2001 From: glannuzel Date: Fri, 22 Aug 2025 10:42:27 +0200 Subject: [PATCH] Run pre-commit hooks --- docs/source/reachy2.mdx | 32 ++++++------ .../configuration_reachy2_camera.py | 22 +++++---- .../cameras/reachy2_camera/reachy2_camera.py | 36 ++++---------- .../reachy2_camera/test_reachy2_camera.py | 2 - .../robots/reachy2/configuration_reachy2.py | 5 +- src/lerobot/robots/reachy2/robot_reachy2.py | 49 +++++++------------ .../reachy2_fake_teleoperator.py | 18 ++++--- 7 files changed, 71 insertions(+), 93 deletions(-) diff --git a/docs/source/reachy2.mdx b/docs/source/reachy2.mdx index f56f23b08..33219675c 100644 --- a/docs/source/reachy2.mdx +++ b/docs/source/reachy2.mdx @@ -9,12 +9,12 @@ - **webrtc >= 2.0.1.1** - Install [Reachy 2 teleoperation application](https://docs.pollen-robotics.com/teleoperation/teleoperation-introduction/discover-teleoperation/). -The teleoperation version must be **>=v1.2.0** + The teleoperation version must be **>=v1.2.0** -Currently, the teleoperation is done through Pollen Robotics teleoperation, which is not included in LeRobot. -We will work on teleoperators directly included soon! +Currently, the teleoperation is done through Pollen Robotics teleoperation, which is not included in LeRobot. +We will work on teleoperators directly included soon! -We advise to have one computer running the teleoperation, while another one is running the recording session with LeRobot. +We advise to have one computer running the teleoperation, while another one is running the recording session with LeRobot. ### Install LeRobot @@ -28,15 +28,16 @@ pip install -e ".[reachy2]" ### Get Reachy 2 IP address -Before starting teleoperation and data recording, you need to find the [robot IP address](https://docs.pollen-robotics.com/getting-started/setup-reachy2/connect-reachy2/). +Before starting teleoperation and data recording, you need to find the [robot IP address](https://docs.pollen-robotics.com/getting-started/setup-reachy2/connect-reachy2/). We strongly advise to get all your devices (computer, robot) plugged through ethernet cables. ### (Optional but recommended) Install pollen_data_acquisition_server -The installation of the data acquisition server depends on the way you decide to manage your recording sessions for Reachy 2. -The easier way in currently to use this server, so that you can control the session directly from the VR teleoperation application. +The installation of the data acquisition server depends on the way you decide to manage your recording sessions for Reachy 2. +The easier way in currently to use this server, so that you can control the session directly from the VR teleoperation application. In your LeRobot environment, install the server from source: + ```bash git clone https://github.com/pollen-robotics/pollen_data_acquisition_server.git cd pollen_data_acquisition_server @@ -46,15 +47,16 @@ pip install -e . ## Step 1: Recording There are two ways of managing recording sessions using the teleoeperation application for Reachy 2: + - using the LeRobot record script: LeRobot is in charge of the session management, and decides when to start and stop an episode. The teleoperation application is only used to control the robot's movements. - using the included data acquisition server for Reachy 2 (recommended): the teleoperation application does manage the session management, requesting LeRobot to start or stop an episode, in addition to controlling the robot's movements. - ### Option 1: Using lerobot.record Reachy 2 is supported as the other robots by the record features. Here is an example of record, without mobile base: + ```bash python -m lerobot.record \ --robot.type=reachy2 \ @@ -79,26 +81,28 @@ Then get into the teleoperation application and choose "Standard session". ### Option 2: Using Pollen data acquisition server (recommended) -Make sure you have installed pollen_data_acquisition_server, as explained in the Setup section. +Make sure you have installed pollen_data_acquisition_server, as explained in the Setup section. Launch the data acquisition server to be able to manage your session directly from the teleoperation application: + ```bash python -m data_acquisition_server.server --display_data=true ``` -Then get into the teleoperation application and choose "Data acquisition session". +Then get into the teleoperation application and choose "Data acquisition session". You can then setup your session by following the screens displayed. ### Choose which parts to use! -From our first tests, we saw that recording all the joints with some policies while only some of them are moving does reduce the quality of the models. -That is why you can choose which parts to record / replay using the arguments `--robot.with_{part}=false`, -with part being one of : "mobile_base", "l_arm", "r_arm", "neck", "antennas". +From our first tests, we saw that recording all the joints with some policies while only some of them are moving does reduce the quality of the models. +That is why you can choose which parts to record / replay using the arguments `--robot.with_{part}=false`, +with part being one of : "mobile_base", "l_arm", "r_arm", "neck", "antennas". By default, all parts are recorded. -You can do the same for the cameras, where only the teleop cameras are recording by default. +You can do the same for the cameras, where only the teleop cameras are recording by default. An extended setup for Reachy 2 would look like: + ```bash python -m lerobot.record \ --robot.type=reachy2 \ diff --git a/src/lerobot/cameras/reachy2_camera/configuration_reachy2_camera.py b/src/lerobot/cameras/reachy2_camera/configuration_reachy2_camera.py index 7164b26b1..80aeefb8a 100644 --- a/src/lerobot/cameras/reachy2_camera/configuration_reachy2_camera.py +++ b/src/lerobot/cameras/reachy2_camera/configuration_reachy2_camera.py @@ -14,7 +14,7 @@ from dataclasses import dataclass -from ..configs import CameraConfig, ColorMode, Cv2Rotation +from ..configs import CameraConfig, ColorMode @CameraConfig.register_subclass("reachy2_camera") @@ -30,14 +30,14 @@ class Reachy2CameraConfig(CameraConfig): ```python # Basic configurations Reachy2CameraConfig( - name="teleop", - image_type="left", - ip_address="192.168.0.200", # IP address of the robot - fps=15, - width=640, - height=480, - color_mode=ColorMode.RGB, - ) # Left teleop camera, 640x480 @ 15FPS + name="teleop", + image_type="left", + ip_address="192.168.0.200", # IP address of the robot + fps=15, + width=640, + height=480, + color_mode=ColorMode.RGB, + ) # Left teleop camera, 640x480 @ 15FPS ``` Attributes: @@ -75,4 +75,6 @@ class Reachy2CameraConfig(CameraConfig): ) if self.color_mode not in ["rgb", "bgr"]: - raise ValueError(f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided.") + raise ValueError( + f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided." + ) diff --git a/src/lerobot/cameras/reachy2_camera/reachy2_camera.py b/src/lerobot/cameras/reachy2_camera/reachy2_camera.py index 2cefe559e..9ca293543 100644 --- a/src/lerobot/cameras/reachy2_camera/reachy2_camera.py +++ b/src/lerobot/cameras/reachy2_camera/reachy2_camera.py @@ -24,17 +24,15 @@ from threading import Event, Lock, Thread from typing import Any # Fix MSMF hardware transform compatibility for Windows before importing cv2 -if ( - platform.system() == "Windows" - and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ -): +if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ: os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0" import cv2 import numpy as np -from lerobot.errors import DeviceNotConnectedError from reachy2_sdk.media.camera import CameraView from reachy2_sdk.media.camera_manager import CameraManager +from lerobot.errors import DeviceNotConnectedError + from ..camera import Camera from .configuration_reachy2_camera import ColorMode, Reachy2CameraConfig @@ -77,9 +75,7 @@ class Reachy2Camera(Camera): self.new_frame_event: Event = Event() def __str__(self) -> str: - return ( - f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})" - ) + return f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})" @property def is_connected(self) -> bool: @@ -94,18 +90,14 @@ class Reachy2Camera(Camera): """ Connects to the Reachy2 CameraManager as specified in the configuration. """ - self.cam_manager = CameraManager( - host=self.config.ip_address, port=self.config.port - ) + self.cam_manager = CameraManager(host=self.config.ip_address, port=self.config.port) self.cam_manager.initialize_cameras() logger.info(f"{self} connected.") print(f"{self} connected.") @staticmethod - def find_cameras( - ip_address: str = "localhost", port: int = 50065 - ) -> list[dict[str, Any]]: + def find_cameras(ip_address: str = "localhost", port: int = 50065) -> list[dict[str, Any]]: """ Detects available Reachy 2 cameras. @@ -162,13 +154,9 @@ class Reachy2Camera(Camera): if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"): if self.config.image_type == "left": - frame = self.cam_manager.teleop.get_frame( - CameraView.LEFT, size=(640, 480) - )[0] + frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0] elif self.config.image_type == "right": - frame = self.cam_manager.teleop.get_frame( - CameraView.RIGHT, size=(640, 480) - )[0] + frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0] elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"): if self.config.image_type == "depth": frame = self.cam_manager.depth.get_depth_frame()[0] @@ -208,9 +196,7 @@ class Reachy2Camera(Camera): except DeviceNotConnectedError: break except Exception as e: - logger.warning( - f"Error reading frame in background thread for {self}: {e}" - ) + logger.warning(f"Error reading frame in background thread for {self}: {e}") def _start_read_thread(self) -> None: """Starts or restarts the background read thread if it's not running.""" @@ -274,9 +260,7 @@ class Reachy2Camera(Camera): self.new_frame_event.clear() if frame is None: - raise RuntimeError( - f"Internal error: Event set but no frame available for {self}." - ) + raise RuntimeError(f"Internal error: Event set but no frame available for {self}.") return frame diff --git a/src/lerobot/cameras/reachy2_camera/test_reachy2_camera.py b/src/lerobot/cameras/reachy2_camera/test_reachy2_camera.py index ff2764594..e74c1f46c 100644 --- a/src/lerobot/cameras/reachy2_camera/test_reachy2_camera.py +++ b/src/lerobot/cameras/reachy2_camera/test_reachy2_camera.py @@ -1,5 +1,3 @@ -import time - from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig camera_config = Reachy2CameraConfig(name="teleop", image_type="left") diff --git a/src/lerobot/robots/reachy2/configuration_reachy2.py b/src/lerobot/robots/reachy2/configuration_reachy2.py index 63bbd2aa2..432e48144 100644 --- a/src/lerobot/robots/reachy2/configuration_reachy2.py +++ b/src/lerobot/robots/reachy2/configuration_reachy2.py @@ -13,10 +13,9 @@ # limitations under the License. from dataclasses import dataclass -from typing import Dict from lerobot.cameras import CameraConfig -from lerobot.cameras.configs import ColorMode, Cv2Rotation +from lerobot.cameras.configs import ColorMode from lerobot.cameras.reachy2_camera import Reachy2CameraConfig from ..config import RobotConfig @@ -54,7 +53,7 @@ class Reachy2RobotConfig(RobotConfig): def __post_init__(self) -> None: # Add cameras - self.cameras: Dict[str, CameraConfig] = {} + self.cameras: dict[str, CameraConfig] = {} if self.with_left_teleop_camera: self.cameras["teleop_left"] = Reachy2CameraConfig( name="teleop", diff --git a/src/lerobot/robots/reachy2/robot_reachy2.py b/src/lerobot/robots/reachy2/robot_reachy2.py index 2190e14d8..bf60f3014 100644 --- a/src/lerobot/robots/reachy2/robot_reachy2.py +++ b/src/lerobot/robots/reachy2/robot_reachy2.py @@ -15,12 +15,13 @@ # limitations under the License. import time -from typing import Any, Dict, Optional, Tuple +from typing import Any import numpy as np -from lerobot.cameras.utils import make_cameras_from_configs from reachy2_sdk import ReachySDK +from lerobot.cameras.utils import make_cameras_from_configs + from ..robot import Robot from .configuration_reachy2 import Reachy2RobotConfig @@ -83,27 +84,24 @@ class Reachy2Robot(Robot): self.reachy: None | ReachySDK = None self.cameras = make_cameras_from_configs(config.cameras) - self.logs: Dict[str, float] = {} + self.logs: dict[str, float] = {} - self.joints_dict: Dict[str, str] = self._generate_joints_dict() + self.joints_dict: dict[str, str] = self._generate_joints_dict() @property - def observation_features(self) -> Dict[str, Any]: + def observation_features(self) -> dict[str, Any]: return {**self.motors_features, **self.camera_features} @property - def action_features(self) -> Dict[str, type]: + def action_features(self) -> dict[str, type]: return self.motors_features @property - def camera_features(self) -> Dict[str, Tuple[Optional[int], Optional[int], int]]: - return { - cam: (self.cameras[cam].height, self.cameras[cam].width, 3) - for cam in self.cameras - } + def camera_features(self) -> dict[str, tuple[int | None, int | None, int]]: + return {cam: (self.cameras[cam].height, self.cameras[cam].width, 3) for cam in self.cameras} @property - def motors_features(self) -> Dict[str, type]: + def motors_features(self) -> dict[str, type]: if self.config.with_mobile_base: return { **dict.fromkeys( @@ -144,7 +142,7 @@ class Reachy2Robot(Robot): def calibrate(self) -> None: pass - def _generate_joints_dict(self) -> Dict[str, str]: + def _generate_joints_dict(self) -> dict[str, str]: self.joints = {} if self.config.with_neck: self.joints.update(REACHY2_NECK_JOINTS) @@ -156,23 +154,18 @@ class Reachy2Robot(Robot): self.joints.update(REACHY2_ANTENNAS_JOINTS) return self.joints - def _get_state(self) -> Dict[str, float]: + def _get_state(self) -> dict[str, float]: if self.reachy is not None: - pos_dict = { - k: self.reachy.joints[v].present_position - for k, v in self.joints_dict.items() - } + pos_dict = {k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()} if not self.config.with_mobile_base: return pos_dict - vel_dict = { - k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items() - } + vel_dict = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()} return {**pos_dict, **vel_dict} else: return {} - def get_observation(self) -> Dict[str, np.ndarray]: - obs_dict: Dict[str, Any] = {} + def get_observation(self) -> dict[str, np.ndarray]: + obs_dict: dict[str, Any] = {} # Read Reachy 2 state before_read_t = time.perf_counter() @@ -185,7 +178,7 @@ class Reachy2Robot(Robot): return obs_dict - def send_action(self, action: Dict[str, Any]) -> Dict[str, Any]: + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: if self.reachy is not None: if not self.is_connected: raise ConnectionError() @@ -196,18 +189,14 @@ class Reachy2Robot(Robot): for key, val in action.items(): if key not in self.joints_dict: if key not in REACHY2_VEL: - raise KeyError( - f"Key '{key}' is not a valid motor key in Reachy 2." - ) + raise KeyError(f"Key '{key}' is not a valid motor key in Reachy 2.") else: vel[REACHY2_VEL[key]] = val else: self.reachy.joints[self.joints_dict[key]].goal_position = float(val) if self.config.with_mobile_base: - self.reachy.mobile_base.set_goal_speed( - vel["vx"], vel["vy"], vel["vtheta"] - ) + self.reachy.mobile_base.set_goal_speed(vel["vx"], vel["vy"], vel["vtheta"]) # We don't send the goal positions if we control Reachy 2 externally if not self.use_external_commands: diff --git a/src/lerobot/teleoperators/reachy2_fake_teleoperator/reachy2_fake_teleoperator.py b/src/lerobot/teleoperators/reachy2_fake_teleoperator/reachy2_fake_teleoperator.py index c77bc0ed4..b790e9f4d 100644 --- a/src/lerobot/teleoperators/reachy2_fake_teleoperator/reachy2_fake_teleoperator.py +++ b/src/lerobot/teleoperators/reachy2_fake_teleoperator/reachy2_fake_teleoperator.py @@ -25,7 +25,6 @@ import time # ) from reachy2_sdk import ReachySDK - from ..teleoperator import Teleoperator from .config_reachy2_fake_teleoperator import Reachy2FakeTeleoperatorConfig @@ -102,13 +101,16 @@ class Reachy2FakeTeleoperator(Teleoperator): @property def action_features(self) -> dict[str, type]: if self.config.with_mobile_base: - return {**dict.fromkeys( - self.joints_dict.keys(), - float, - ), **dict.fromkeys( - REACHY2_VEL.keys(), - float, - )} + return { + **dict.fromkeys( + self.joints_dict.keys(), + float, + ), + **dict.fromkeys( + REACHY2_VEL.keys(), + float, + ), + } else: return dict.fromkeys(self.joints_dict.keys(), float)