Run pre-commit hooks

This commit is contained in:
glannuzel
2025-08-22 10:42:27 +02:00
parent fd75203cfa
commit 6560f47d15
7 changed files with 71 additions and 93 deletions
+18 -14
View File
@@ -9,12 +9,12 @@
- **webrtc >= 2.0.1.1**
- Install [Reachy 2 teleoperation application](https://docs.pollen-robotics.com/teleoperation/teleoperation-introduction/discover-teleoperation/).
The teleoperation version must be **>=v1.2.0**
The teleoperation version must be **>=v1.2.0**
Currently, the teleoperation is done through Pollen Robotics teleoperation, which is not included in LeRobot.
We will work on teleoperators directly included soon!
Currently, the teleoperation is done through Pollen Robotics teleoperation, which is not included in LeRobot.
We will work on teleoperators directly included soon!
We advise to have one computer running the teleoperation, while another one is running the recording session with LeRobot.
We advise to have one computer running the teleoperation, while another one is running the recording session with LeRobot.
### Install LeRobot
@@ -28,15 +28,16 @@ pip install -e ".[reachy2]"
### Get Reachy 2 IP address
Before starting teleoperation and data recording, you need to find the [robot IP address](https://docs.pollen-robotics.com/getting-started/setup-reachy2/connect-reachy2/).
Before starting teleoperation and data recording, you need to find the [robot IP address](https://docs.pollen-robotics.com/getting-started/setup-reachy2/connect-reachy2/).
We strongly advise to get all your devices (computer, robot) plugged through ethernet cables.
### (Optional but recommended) Install pollen_data_acquisition_server
The installation of the data acquisition server depends on the way you decide to manage your recording sessions for Reachy 2.
The easier way in currently to use this server, so that you can control the session directly from the VR teleoperation application.
The installation of the data acquisition server depends on the way you decide to manage your recording sessions for Reachy 2.
The easier way in currently to use this server, so that you can control the session directly from the VR teleoperation application.
In your LeRobot environment, install the server from source:
```bash
git clone https://github.com/pollen-robotics/pollen_data_acquisition_server.git
cd pollen_data_acquisition_server
@@ -46,15 +47,16 @@ pip install -e .
## Step 1: Recording
There are two ways of managing recording sessions using the teleoeperation application for Reachy 2:
- using the LeRobot record script: LeRobot is in charge of the session management, and decides when to start and stop an episode. The teleoperation application is only used to control the robot's movements.
- using the included data acquisition server for Reachy 2 (recommended): the teleoperation application does manage the session management, requesting LeRobot to start or stop an episode, in addition to controlling the robot's movements.
### Option 1: Using lerobot.record
Reachy 2 is supported as the other robots by the record features.
Here is an example of record, without mobile base:
```bash
python -m lerobot.record \
--robot.type=reachy2 \
@@ -79,26 +81,28 @@ Then get into the teleoperation application and choose "Standard session".
### Option 2: Using Pollen data acquisition server (recommended)
Make sure you have installed pollen_data_acquisition_server, as explained in the Setup section.
Make sure you have installed pollen_data_acquisition_server, as explained in the Setup section.
Launch the data acquisition server to be able to manage your session directly from the teleoperation application:
```bash
python -m data_acquisition_server.server --display_data=true
```
Then get into the teleoperation application and choose "Data acquisition session".
Then get into the teleoperation application and choose "Data acquisition session".
You can then setup your session by following the screens displayed.
### Choose which parts to use!
From our first tests, we saw that recording all the joints with some policies while only some of them are moving does reduce the quality of the models.
That is why you can choose which parts to record / replay using the arguments `--robot.with_{part}=false`,
with part being one of : "mobile_base", "l_arm", "r_arm", "neck", "antennas".
From our first tests, we saw that recording all the joints with some policies while only some of them are moving does reduce the quality of the models.
That is why you can choose which parts to record / replay using the arguments `--robot.with_{part}=false`,
with part being one of : "mobile_base", "l_arm", "r_arm", "neck", "antennas".
By default, all parts are recorded.
You can do the same for the cameras, where only the teleop cameras are recording by default.
You can do the same for the cameras, where only the teleop cameras are recording by default.
An extended setup for Reachy 2 would look like:
```bash
python -m lerobot.record \
--robot.type=reachy2 \
@@ -14,7 +14,7 @@
from dataclasses import dataclass
from ..configs import CameraConfig, ColorMode, Cv2Rotation
from ..configs import CameraConfig, ColorMode
@CameraConfig.register_subclass("reachy2_camera")
@@ -30,14 +30,14 @@ class Reachy2CameraConfig(CameraConfig):
```python
# Basic configurations
Reachy2CameraConfig(
name="teleop",
image_type="left",
ip_address="192.168.0.200", # IP address of the robot
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
) # Left teleop camera, 640x480 @ 15FPS
name="teleop",
image_type="left",
ip_address="192.168.0.200", # IP address of the robot
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
) # Left teleop camera, 640x480 @ 15FPS
```
Attributes:
@@ -75,4 +75,6 @@ class Reachy2CameraConfig(CameraConfig):
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided.")
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
@@ -24,17 +24,15 @@ from threading import Event, Lock, Thread
from typing import Any
# Fix MSMF hardware transform compatibility for Windows before importing cv2
if (
platform.system() == "Windows"
and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ
):
if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ:
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
import numpy as np
from lerobot.errors import DeviceNotConnectedError
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
from lerobot.errors import DeviceNotConnectedError
from ..camera import Camera
from .configuration_reachy2_camera import ColorMode, Reachy2CameraConfig
@@ -77,9 +75,7 @@ class Reachy2Camera(Camera):
self.new_frame_event: Event = Event()
def __str__(self) -> str:
return (
f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
)
return f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
@property
def is_connected(self) -> bool:
@@ -94,18 +90,14 @@ class Reachy2Camera(Camera):
"""
Connects to the Reachy2 CameraManager as specified in the configuration.
"""
self.cam_manager = CameraManager(
host=self.config.ip_address, port=self.config.port
)
self.cam_manager = CameraManager(host=self.config.ip_address, port=self.config.port)
self.cam_manager.initialize_cameras()
logger.info(f"{self} connected.")
print(f"{self} connected.")
@staticmethod
def find_cameras(
ip_address: str = "localhost", port: int = 50065
) -> list[dict[str, Any]]:
def find_cameras(ip_address: str = "localhost", port: int = 50065) -> list[dict[str, Any]]:
"""
Detects available Reachy 2 cameras.
@@ -162,13 +154,9 @@ class Reachy2Camera(Camera):
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(
CameraView.LEFT, size=(640, 480)
)[0]
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(
CameraView.RIGHT, size=(640, 480)
)[0]
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
@@ -208,9 +196,7 @@ class Reachy2Camera(Camera):
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(
f"Error reading frame in background thread for {self}: {e}"
)
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
@@ -274,9 +260,7 @@ class Reachy2Camera(Camera):
self.new_frame_event.clear()
if frame is None:
raise RuntimeError(
f"Internal error: Event set but no frame available for {self}."
)
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
return frame
@@ -1,5 +1,3 @@
import time
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
camera_config = Reachy2CameraConfig(name="teleop", image_type="left")
@@ -13,10 +13,9 @@
# limitations under the License.
from dataclasses import dataclass
from typing import Dict
from lerobot.cameras import CameraConfig
from lerobot.cameras.configs import ColorMode, Cv2Rotation
from lerobot.cameras.configs import ColorMode
from lerobot.cameras.reachy2_camera import Reachy2CameraConfig
from ..config import RobotConfig
@@ -54,7 +53,7 @@ class Reachy2RobotConfig(RobotConfig):
def __post_init__(self) -> None:
# Add cameras
self.cameras: Dict[str, CameraConfig] = {}
self.cameras: dict[str, CameraConfig] = {}
if self.with_left_teleop_camera:
self.cameras["teleop_left"] = Reachy2CameraConfig(
name="teleop",
+19 -30
View File
@@ -15,12 +15,13 @@
# limitations under the License.
import time
from typing import Any, Dict, Optional, Tuple
from typing import Any
import numpy as np
from lerobot.cameras.utils import make_cameras_from_configs
from reachy2_sdk import ReachySDK
from lerobot.cameras.utils import make_cameras_from_configs
from ..robot import Robot
from .configuration_reachy2 import Reachy2RobotConfig
@@ -83,27 +84,24 @@ class Reachy2Robot(Robot):
self.reachy: None | ReachySDK = None
self.cameras = make_cameras_from_configs(config.cameras)
self.logs: Dict[str, float] = {}
self.logs: dict[str, float] = {}
self.joints_dict: Dict[str, str] = self._generate_joints_dict()
self.joints_dict: dict[str, str] = self._generate_joints_dict()
@property
def observation_features(self) -> Dict[str, Any]:
def observation_features(self) -> dict[str, Any]:
return {**self.motors_features, **self.camera_features}
@property
def action_features(self) -> Dict[str, type]:
def action_features(self) -> dict[str, type]:
return self.motors_features
@property
def camera_features(self) -> Dict[str, Tuple[Optional[int], Optional[int], int]]:
return {
cam: (self.cameras[cam].height, self.cameras[cam].width, 3)
for cam in self.cameras
}
def camera_features(self) -> dict[str, tuple[int | None, int | None, int]]:
return {cam: (self.cameras[cam].height, self.cameras[cam].width, 3) for cam in self.cameras}
@property
def motors_features(self) -> Dict[str, type]:
def motors_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {
**dict.fromkeys(
@@ -144,7 +142,7 @@ class Reachy2Robot(Robot):
def calibrate(self) -> None:
pass
def _generate_joints_dict(self) -> Dict[str, str]:
def _generate_joints_dict(self) -> dict[str, str]:
self.joints = {}
if self.config.with_neck:
self.joints.update(REACHY2_NECK_JOINTS)
@@ -156,23 +154,18 @@ class Reachy2Robot(Robot):
self.joints.update(REACHY2_ANTENNAS_JOINTS)
return self.joints
def _get_state(self) -> Dict[str, float]:
def _get_state(self) -> dict[str, float]:
if self.reachy is not None:
pos_dict = {
k: self.reachy.joints[v].present_position
for k, v in self.joints_dict.items()
}
pos_dict = {k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()}
if not self.config.with_mobile_base:
return pos_dict
vel_dict = {
k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()
}
vel_dict = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
return {**pos_dict, **vel_dict}
else:
return {}
def get_observation(self) -> Dict[str, np.ndarray]:
obs_dict: Dict[str, Any] = {}
def get_observation(self) -> dict[str, np.ndarray]:
obs_dict: dict[str, Any] = {}
# Read Reachy 2 state
before_read_t = time.perf_counter()
@@ -185,7 +178,7 @@ class Reachy2Robot(Robot):
return obs_dict
def send_action(self, action: Dict[str, Any]) -> Dict[str, Any]:
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if self.reachy is not None:
if not self.is_connected:
raise ConnectionError()
@@ -196,18 +189,14 @@ class Reachy2Robot(Robot):
for key, val in action.items():
if key not in self.joints_dict:
if key not in REACHY2_VEL:
raise KeyError(
f"Key '{key}' is not a valid motor key in Reachy 2."
)
raise KeyError(f"Key '{key}' is not a valid motor key in Reachy 2.")
else:
vel[REACHY2_VEL[key]] = val
else:
self.reachy.joints[self.joints_dict[key]].goal_position = float(val)
if self.config.with_mobile_base:
self.reachy.mobile_base.set_goal_speed(
vel["vx"], vel["vy"], vel["vtheta"]
)
self.reachy.mobile_base.set_goal_speed(vel["vx"], vel["vy"], vel["vtheta"])
# We don't send the goal positions if we control Reachy 2 externally
if not self.use_external_commands:
@@ -25,7 +25,6 @@ import time
# )
from reachy2_sdk import ReachySDK
from ..teleoperator import Teleoperator
from .config_reachy2_fake_teleoperator import Reachy2FakeTeleoperatorConfig
@@ -102,13 +101,16 @@ class Reachy2FakeTeleoperator(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {**dict.fromkeys(
self.joints_dict.keys(),
float,
), **dict.fromkeys(
REACHY2_VEL.keys(),
float,
)}
return {
**dict.fromkeys(
self.joints_dict.keys(),
float,
),
**dict.fromkeys(
REACHY2_VEL.keys(),
float,
),
}
else:
return dict.fromkeys(self.joints_dict.keys(), float)