fix(robots): add reachy2 fixes (#2783)

* fix(robots): add reachy2 fixes

* tests(robots): remove reachy sdk stub
This commit is contained in:
Steven Palma
2026-01-12 18:05:16 +01:00
committed by GitHub
parent 6b8d4c75a6
commit b8ec1152d4
15 changed files with 165 additions and 246 deletions
+34 -19
View File
@@ -38,6 +38,7 @@ docker run --rm -it \
start_rviz:=true start_sdk_server:=true mujoco:=true
```
> [!NOTE]
> If MuJoCo runs slowly (low simulation frequency), append `-e LD_LIBRARY_PATH="/opt/host-libs:$LD_LIBRARY_PATH" \` to the previous command to improve performance:
>
> ```
@@ -141,7 +142,7 @@ If you choose this option but still want to use the VR teleoperation application
First add reachy2 and reachy2_teleoperator to the imports of the record script. Then you can use the following command:
```bash
python -m lerobot.record \
lerobot-record \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.id=r2-0000 \
@@ -150,6 +151,7 @@ python -m lerobot.record \
--teleop.type=reachy2_teleoperator \
--teleop.ip_address=192.168.0.200 \
--teleop.with_mobile_base=false \
--robot.with_torso_camera=true \
--dataset.repo_id=pollen_robotics/record_test \
--dataset.single_task="Reachy 2 recording test" \
--dataset.num_episodes=1 \
@@ -165,7 +167,7 @@ python -m lerobot.record \
**Extended setup overview (all options included):**
```bash
python -m lerobot.record \
lerobot-record \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.use_external_commands=true \
@@ -177,6 +179,8 @@ python -m lerobot.record \
--robot.with_left_teleop_camera=true \
--robot.with_right_teleop_camera=true \
--robot.with_torso_camera=false \
--robot.camera_width=640 \
--robot.camera_height=480 \
--robot.disable_torque_on_disconnect=false \
--robot.max_relative_target=5.0 \
--teleop.type=reachy2_teleoperator \
@@ -212,9 +216,10 @@ Must be set to true if a compliant Reachy 2 is used to control another one.
From our initial tests, recording **all** joints when only some are moving can reduce model quality with certain policies.
To avoid this, you can exclude specific parts from recording and replay using:
````
```bash
--robot.with_<part>=false
```,
```
with `<part>` being one of : `mobile_base`, `l_arm`, `r_arm", `neck`, `antennas`.
It determine whether the corresponding part is recorded in the observations. True if not set.
@@ -222,49 +227,60 @@ By default, **all parts are recorded**.
The same per-part mechanism is available in `reachy2_teleoperator` as well.
````
```bash
--teleop.with\_<part>
```
with `<part>` being one of : `mobile_base`, `l_arm`, `r_arm", `neck`, `antennas`.
Determine whether the corresponding part is recorded in the actions. True if not set.
> **Important:** In a given session, the **enabled parts must match** on both the robot and the teleoperator.
For example, if the robot runs with `--robot.with_mobile_base=false`, the teleoperator must disable the same part `--teleoperator.with_mobile_base=false`.
> For example, if the robot runs with `--robot.with_mobile_base=false`, the teleoperator must disable the same part `--teleoperator.with_mobile_base=false`.
##### Use the relevant cameras
You can do the same for **cameras**. By default, only the **teleoperation cameras** are recorded (both `left_teleop_camera` and `right_teleop_camera`). Enable or disable each camera with:
You can do the same for **cameras**. Enable or disable each camera with default parameters using:
```bash
--robot.with_left_teleop_camera=<true|false> \
--robot.with_right_teleop_camera=<true|false> \
--robot.with_torso_camera=<true|false>
```
--robot.with_left_teleop_camera=<true|false>
--robot.with_right_teleop_camera=<true|false>
--robot.with_torso_camera=<true|false>
By default, no camera is recorded, all camera arguments are set to `false`.
If you want to, you can use custom `width` and `height` parameters for Reachy 2's cameras using the `--robot.camera_width` & `--robot.camera_height` argument:
````
```bash
--robot.camera_width=1920 \
--robot.camera_height=1080
```
This will change the resolution of all 3 default robot cameras (enabled by the above bool arguments).
If you want, you can add additional cameras other than the ones in the robot as usual with:
```bash
--robot.cameras="{ extra: {type: opencv, index_or_path: 42, width: 640, height: 480, fps: 30}}" \
```
## Step 2: Replay
Make sure the robot is configured with the same parts as the dataset:
```bash
python -m lerobot.replay \
lerobot-replay \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.use_external_commands=false \
--robot.with_mobile_base=false \
--dataset.repo_id=pollen_robotics/record_test \
--dataset.episode=0
--display_data=true
````
```
## Step 3: Train
```bash
python -m lerobot.scripts.train \
lerobot-train \
--dataset.repo_id=pollen_robotics/record_test \
--policy.type=act \
--output_dir=outputs/train/reachy2_test \
@@ -277,10 +293,9 @@ python -m lerobot.scripts.train \
## Step 4: Evaluate
```bash
python -m lerobot.record \
lerobot-eval \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--display_data=false \
--dataset.repo_id=pollen_robotics/eval_record_test \
--dataset.single_task="Evaluate reachy2 policy" \
--dataset.num_episodes=10 \
+1 -1
View File
@@ -111,7 +111,7 @@ unitree_g1 = [
"pyzmq>=26.2.1,<28.0.0",
"onnxruntime>=1.16.0,<2.0.0"
]
reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486,<2.57.0 ; sys_platform != 'darwin'",
@@ -35,18 +35,19 @@ class Reachy2CameraConfig(CameraConfig):
name="teleop",
image_type="left",
ip_address="192.168.0.200", # IP address of the robot
fps=15,
port=50065, # Port of the camera server
width=640,
height=480,
fps=30, # Not configurable for Reachy 2 cameras
color_mode=ColorMode.RGB,
) # Left teleop camera, 640x480 @ 15FPS
) # Left teleop camera, 640x480 @ 30FPS
```
Attributes:
name: Name of the camera device. Can be "teleop" or "depth".
image_type: Type of image stream. For "teleop" camera, can be "left" or "right".
For "depth" camera, can be "rgb" or "depth". (depth is not supported yet)
fps: Requested frames per second for the color stream.
fps: Requested frames per second for the color stream. Not configurable for Reachy 2 cameras.
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
@@ -62,7 +63,6 @@ class Reachy2CameraConfig(CameraConfig):
color_mode: ColorMode = ColorMode.RGB
ip_address: str | None = "localhost"
port: int = 50065
# use_depth: bool = False
def __post_init__(self) -> None:
if self.name not in ["teleop", "depth"]:
@@ -16,12 +16,13 @@
Provides the Reachy2Camera class for capturing frames from Reachy 2 cameras using Reachy 2's CameraManager.
"""
from __future__ import annotations
import logging
import os
import platform
import time
from threading import Event, Lock, Thread
from typing import Any
from typing import TYPE_CHECKING, Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -30,10 +31,19 @@ if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np # type: ignore # TODO: add type stubs for numpy
from reachy2_sdk.media.camera import CameraView # type: ignore # TODO: add type stubs for reachy2_sdk
from reachy2_sdk.media.camera_manager import ( # type: ignore # TODO: add type stubs for reachy2_sdk
CameraManager,
)
from lerobot.utils.import_utils import _reachy2_sdk_available
if TYPE_CHECKING or _reachy2_sdk_available:
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
else:
CameraManager = None
class CameraView:
LEFT = 0
RIGHT = 1
from lerobot.utils.errors import DeviceNotConnectedError
@@ -69,17 +79,10 @@ class Reachy2Camera(Camera):
self.config = config
self.fps = config.fps
self.color_mode = config.color_mode
self.cam_manager: CameraManager | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
def __str__(self) -> str:
return f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
@@ -100,44 +103,23 @@ class Reachy2Camera(Camera):
def connect(self, warmup: bool = True) -> None:
"""
Connects to the Reachy2 CameraManager as specified in the configuration.
Raises:
DeviceNotConnectedError: If the camera is not connected.
"""
self.cam_manager = CameraManager(host=self.config.ip_address, port=self.config.port)
if self.cam_manager is None:
raise DeviceNotConnectedError(f"Could not connect to {self}.")
self.cam_manager.initialize_cameras()
logger.info(f"{self} connected.")
@staticmethod
def find_cameras(ip_address: str = "localhost", port: int = 50065) -> list[dict[str, Any]]:
def find_cameras() -> list[dict[str, Any]]:
"""
Detects available Reachy 2 cameras.
Returns:
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'name', 'stereo',
and the default profile properties (width, height, fps).
Detection not implemented for Reachy2 cameras.
"""
initialized_cameras = []
camera_manager = CameraManager(host=ip_address, port=port)
for camera in [camera_manager.teleop, camera_manager.depth]:
if camera is None:
continue
height, width, _, _, _, _, _ = camera.get_parameters()
camera_info = {
"name": camera._cam_info.name,
"stereo": camera._cam_info.stereo,
"default_profile": {
"width": width,
"height": height,
"fps": 30,
},
}
initialized_cameras.append(camera_info)
camera_manager.disconnect()
return initialized_cameras
raise NotImplementedError("Camera detection is not implemented for Reachy2 cameras.")
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
@@ -155,95 +137,49 @@ class Reachy2Camera(Camera):
(height, width, channels), using the specified or default
color mode and applying any configured rotation.
"""
start_time = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
frame: NDArray[Any] = np.empty((0, 0, 3), dtype=np.uint8)
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(
CameraView.LEFT, size=(self.config.width, self.config.height)
)[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(
CameraView.RIGHT, size=(self.config.width, self.config.height)
)[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
elif self.config.image_type == "rgb":
frame = self.cam_manager.depth.get_frame(size=(self.config.width, self.config.height))[0]
else:
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
elif self.config.image_type == "rgb":
frame = self.cam_manager.depth.get_frame(size=(640, 480))[0]
raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.")
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return frame
def _read_loop(self) -> None:
"""
Internal loop run by the background thread for asynchronous reading.
On each iteration:
1. Reads a color frame
2. Stores result in latest_frame (thread-safe)
3. Sets new_frame_event to notify listeners
Stops on DeviceNotConnectedError, logs other errors and continues.
"""
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
while not self.stop_event.is_set():
try:
color_image = self.read()
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True
self.thread.start()
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
self.thread = None
self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
Reads the latest available frame.
This method retrieves the most recent frame captured by the background
read thread. It does not block waiting for the camera hardware directly,
but may wait up to timeout_ms for the background thread to provide a frame.
This method retrieves the most recent frame available in Reachy 2's low-level software.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
@@ -261,22 +197,10 @@ class Reachy2Camera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
)
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
frame = self.read()
if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
raise RuntimeError(f"Internal error: No frame available for {self}.")
return frame
@@ -287,12 +211,9 @@ class Reachy2Camera(Camera):
Raises:
DeviceNotConnectedError: If the camera is already disconnected.
"""
if not self.is_connected and self.thread is None:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} not connected.")
if self.thread is not None:
self._stop_read_thread()
if self.cam_manager is not None:
self.cam_manager.disconnect()
@@ -30,6 +30,8 @@ class Reachy2RobotConfig(RobotConfig):
# IP address of the Reachy 2 robot
ip_address: str | None = "localhost"
# Port of the Reachy 2 robot
port: int = 50065
# If True, turn_off_smoothly() will be sent to the robot before disconnecting.
disable_torque_on_disconnect: bool = False
@@ -51,11 +53,16 @@ class Reachy2RobotConfig(RobotConfig):
# Robot cameras
# Set to True if you want to use the corresponding cameras in the observations.
# By default, only the teleop cameras are used.
with_left_teleop_camera: bool = True
with_right_teleop_camera: bool = True
# By default, no camera is used.
with_left_teleop_camera: bool = False
with_right_teleop_camera: bool = False
with_torso_camera: bool = False
# Camera parameters
camera_width: int = 640
camera_height: int = 480
# For cameras other than the 3 default Reachy 2 cameras.
cameras: dict[str, CameraConfig] = field(default_factory=dict)
def __post_init__(self) -> None:
@@ -65,9 +72,10 @@ class Reachy2RobotConfig(RobotConfig):
name="teleop",
image_type="left",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
port=self.port,
width=self.camera_width,
height=self.camera_height,
fps=30, # Not configurable for Reachy 2 cameras
color_mode=ColorMode.RGB,
)
if self.with_right_teleop_camera:
@@ -75,9 +83,10 @@ class Reachy2RobotConfig(RobotConfig):
name="teleop",
image_type="right",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
port=self.port,
width=self.camera_width,
height=self.camera_height,
fps=30, # Not configurable for Reachy 2 cameras
color_mode=ColorMode.RGB,
)
if self.with_torso_camera:
@@ -85,9 +94,10 @@ class Reachy2RobotConfig(RobotConfig):
name="depth",
image_type="rgb",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
port=self.port,
width=self.camera_width,
height=self.camera_height,
fps=30, # Not configurable for Reachy 2 cameras
color_mode=ColorMode.RGB,
)
+8 -2
View File
@@ -13,19 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import time
from typing import Any
from typing import TYPE_CHECKING, Any
import numpy as np
from reachy2_sdk import ReachySDK
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.utils.import_utils import _reachy2_sdk_available
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_reachy2 import Reachy2RobotConfig
if TYPE_CHECKING or _reachy2_sdk_available:
from reachy2_sdk import ReachySDK
else:
ReachySDK = None
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_NECK_JOINTS = {
"neck_yaw.pos": "head.neck.yaw",
+3
View File
@@ -73,6 +73,7 @@ from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.reachy2_camera.configuration_reachy2_camera import Reachy2CameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.cameras.zmq.configuration_zmq import ZMQCameraConfig # noqa: F401
from lerobot.configs import parser
@@ -103,6 +104,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
reachy2,
so_follower,
unitree_g1,
)
@@ -114,6 +116,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
reachy2_teleoperator,
so_leader,
)
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
+1
View File
@@ -59,6 +59,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
reachy2,
so_follower,
unitree_g1,
)
@@ -76,6 +76,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
reachy2,
so_follower,
)
from lerobot.teleoperators import ( # noqa: F401
@@ -88,6 +89,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
reachy2_teleoperator,
so_leader,
)
from lerobot.utils.import_utils import register_third_party_plugins
@@ -13,11 +13,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import time
from typing import TYPE_CHECKING
from reachy2_sdk import ReachySDK
from lerobot.utils.import_utils import _reachy2_sdk_available
if TYPE_CHECKING or _reachy2_sdk_available:
from reachy2_sdk import ReachySDK
else:
ReachySDK = None
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_reachy2_teleoperator import Reachy2TeleoperatorConfig
@@ -75,6 +84,7 @@ class Reachy2Teleoperator(Teleoperator):
def __init__(self, config: Reachy2TeleoperatorConfig):
super().__init__(config)
self.config = config
self.reachy: None | ReachySDK = None
@@ -117,9 +127,13 @@ class Reachy2Teleoperator(Teleoperator):
return self.reachy.is_connected() if self.reachy is not None else False
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.reachy = ReachySDK(self.config.ip_address)
if not self.is_connected:
raise ConnectionError()
raise DeviceNotConnectedError()
logger.info(f"{self} connected.")
@property
@@ -135,23 +149,24 @@ class Reachy2Teleoperator(Teleoperator):
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
if self.reachy and self.is_connected:
if self.config.use_present_position:
joint_action = {
k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()
}
else:
joint_action = {k: self.reachy.joints[v].goal_position for k, v in self.joints_dict.items()}
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if not self.config.with_mobile_base:
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return joint_action
joint_action: dict[str, float] = {}
vel_action: dict[str, float] = {}
if self.config.use_present_position:
vel_action = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
else:
vel_action = {k: self.reachy.mobile_base.last_cmd_vel[v] for k, v in REACHY2_VEL.items()}
if self.config.use_present_position:
joint_action = {k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()}
else:
joint_action = {k: self.reachy.joints[v].goal_position for k, v in self.joints_dict.items()}
if not self.config.with_mobile_base:
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return joint_action
if self.config.use_present_position:
vel_action = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
else:
vel_action = {k: self.reachy.mobile_base.last_cmd_vel[v] for k, v in REACHY2_VEL.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return {**joint_action, **vel_action}
@@ -160,5 +175,5 @@ class Reachy2Teleoperator(Teleoperator):
raise NotImplementedError
def disconnect(self) -> None:
if self.reachy and self.is_connected:
if self.is_connected:
self.reachy.disconnect()
+1
View File
@@ -64,6 +64,7 @@ def is_package_available(pkg_name: str, return_version: bool = False) -> tuple[b
_transformers_available = is_package_available("transformers")
_peft_available = is_package_available("peft")
_scipy_available = is_package_available("scipy")
_reachy2_sdk_available = is_package_available("reachy2_sdk")
def make_device_from_device_class(config: ChoiceRegistry) -> Any:
+2 -12
View File
@@ -20,6 +20,8 @@ from unittest.mock import MagicMock, patch
import numpy as np
import pytest
pytest.importorskip("reachy2_sdk")
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
from lerobot.utils.errors import DeviceNotConnectedError
@@ -127,24 +129,12 @@ def test_async_read(camera):
try:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect()
def test_async_read_timeout(camera):
camera.connect()
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
def test_read_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
-1
View File
@@ -28,7 +28,6 @@ pytest_plugins = [
"tests.fixtures.files",
"tests.fixtures.hub",
"tests.fixtures.optimizers",
"tests.plugins.reachy2_sdk",
]
-46
View File
@@ -1,46 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import types
from unittest.mock import MagicMock
def _install_reachy2_sdk_stub():
sdk = types.ModuleType("reachy2_sdk")
sdk.__path__ = []
sdk.ReachySDK = MagicMock(name="ReachySDK")
media = types.ModuleType("reachy2_sdk.media")
media.__path__ = []
camera = types.ModuleType("reachy2_sdk.media.camera")
camera.CameraView = MagicMock(name="CameraView")
camera_manager = types.ModuleType("reachy2_sdk.media.camera_manager")
camera_manager.CameraManager = MagicMock(name="CameraManager")
sdk.media = media
media.camera = camera
media.camera_manager = camera_manager
# Register in sys.modules
sys.modules.setdefault("reachy2_sdk", sdk)
sys.modules.setdefault("reachy2_sdk.media", media)
sys.modules.setdefault("reachy2_sdk.media.camera", camera)
sys.modules.setdefault("reachy2_sdk.media.camera_manager", camera_manager)
def pytest_sessionstart(session):
_install_reachy2_sdk_stub()
+2
View File
@@ -19,6 +19,8 @@ from unittest.mock import MagicMock, patch
import numpy as np
import pytest
pytest.importorskip("reachy2_sdk")
from lerobot.robots.reachy2 import (
REACHY2_ANTENNAS_JOINTS,
REACHY2_L_ARM_JOINTS,