Usable with or without mobile base

This commit is contained in:
glannuzel
2025-08-06 13:15:03 +02:00
parent ee406bdfbf
commit 75652a39b4
8 changed files with 172 additions and 128 deletions
@@ -15,7 +15,6 @@
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
# from lerobot.cameras.opencv import OpenCVCameraConfig
from lerobot.cameras.reachy2_camera import Reachy2CameraConfig
from lerobot.cameras.configs import ColorMode, Cv2Rotation
@@ -31,37 +30,38 @@ class Reachy2RobotConfig(RobotConfig):
max_relative_target: int | None = None
ip_address: str | None = "localhost"
use_external_commands: bool = False
with_mobile_base: bool = True
# cameras
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"teleop_left": Reachy2CameraConfig(
name="teleop",
image_type="left",
fps=30,
width=640,
height=480,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION
),
"teleop_right": Reachy2CameraConfig(
name="teleop",
image_type="right",
fps=30,
width=640,
height=480,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION
),
"torso_rgb": Reachy2CameraConfig(
name="depth",
image_type="rgb",
fps=30,
width=640,
height=480,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION
),
# "teleop_left": Reachy2CameraConfig(
# name="teleop",
# image_type="left",
# fps=30,
# width=640,
# height=480,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "teleop_right": Reachy2CameraConfig(
# name="teleop",
# image_type="right",
# fps=30,
# width=640,
# height=480,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "torso_rgb": Reachy2CameraConfig(
# name="depth",
# image_type="rgb",
# fps=30,
# width=640,
# height=480,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "torso_depth": Reachy2CameraConfig(
# name="depth",
# image_type="depth",
@@ -72,6 +72,84 @@ class Reachy2RobotConfig(RobotConfig):
# rotation=Cv2Rotation.NO_ROTATION,
# use_depth=True
# )
# # REAL ROBOT
# "teleop_left": Reachy2CameraConfig(
# name="teleop",
# image_type="left",
# ip_address="192.168.0.199",
# # ip_address="172.18.131.66",
# fps=30,
# width=960,
# height=720,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "teleop_right": Reachy2CameraConfig(
# name="teleop",
# image_type="right",
# ip_address="192.168.0.199",
# # ip_address="172.18.131.66",
# fps=30,
# width=960,
# height=720,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "torso_rgb": Reachy2CameraConfig(
# name="depth",
# image_type="rgb",
# ip_address="172.18.131.66",
# fps=30,
# width=1280,
# height=720,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# # REAL ROBOT REDUCED IMAGE SIZE
"teleop_left": Reachy2CameraConfig(
name="teleop",
image_type="left",
ip_address="192.168.0.199",
fps=30,
width=640,
height=480,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION
),
"teleop_right": Reachy2CameraConfig(
name="teleop",
image_type="right",
ip_address="192.168.0.199",
fps=30,
width=640,
height=480,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.NO_ROTATION
),
# # Reduced size for testing
# "teleop_left": Reachy2CameraConfig(
# name="teleop",
# image_type="left",
# ip_address="172.18.131.66",
# fps=30,
# width=480,
# height=360,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
# "teleop_right": Reachy2CameraConfig(
# name="teleop",
# image_type="right",
# ip_address="172.18.131.66",
# fps=30,
# width=480,
# height=360,
# color_mode=ColorMode.RGB,
# rotation=Cv2Rotation.NO_ROTATION
# ),
}
)
+17 -13
View File
@@ -20,10 +20,6 @@ import numpy as np
from reachy2_sdk import ReachySDK
from typing import Any
# from stretch_body.gamepad_teleop import GamePadTeleop
# from stretch_body.robot import Robot as StretchAPI
# from stretch_body.robot_params import RobotParams
from lerobot.cameras.utils import make_cameras_from_configs
from ..robot import Robot
@@ -97,13 +93,16 @@ class Reachy2Robot(Robot):
@property
def motors_features(self) -> dict:
return {**dict.fromkeys(
REACHY2_JOINTS.keys(),
float,
), **dict.fromkeys(
REACHY2_VEL.keys(),
float,
)}
if self.config.with_mobile_base:
return {**dict.fromkeys(
REACHY2_JOINTS.keys(),
float,
), **dict.fromkeys(
REACHY2_VEL.keys(),
float,
)}
else:
return dict.fromkeys(REACHY2_JOINTS.keys(), float)
@property
def is_connected(self) -> bool:
@@ -133,6 +132,8 @@ class Reachy2Robot(Robot):
def _get_state(self) -> dict:
pos_dict = {k: self.reachy.joints[v].present_position for k, v in REACHY2_JOINTS.items()}
if not self.config.with_mobile_base:
return pos_dict
vel_dict = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
return {**pos_dict, **vel_dict}
@@ -165,12 +166,15 @@ class Reachy2Robot(Robot):
vel[REACHY2_VEL[key]] = val
else:
self.reachy.joints[REACHY2_JOINTS[key]].goal_position = val
self.reachy.mobile_base.set_goal_speed(vel["vx"], vel["vy"], vel["vtheta"])
if self.config.with_mobile_base:
self.reachy.mobile_base.set_goal_speed(vel["vx"], vel["vy"], vel["vtheta"])
# We don't send the goal positions if we control Reachy 2 externally
if not self.use_external_commands:
self.reachy.send_goal_positions()
self.reachy.mobile_base.send_speed_command()
if self.config.with_mobile_base:
self.reachy.mobile_base.send_speed_command()
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
return action
+11 -35
View File
@@ -18,41 +18,17 @@ EPISODE_TIME_SEC = 4
TASK_DESCRIPTION = "Grab a cube with Reachy 2"
REACHY2_MOTORS = {
"neck_yaw.pos": "head.neck.yaw",
"neck_pitch.pos": "head.neck.pitch",
"neck_roll.pos": "head.neck.roll",
"r_shoulder_pitch.pos": "r_arm.shoulder.pitch",
"r_shoulder_roll.pos": "r_arm.shoulder.roll",
"r_elbow_yaw.pos": "r_arm.elbow.yaw",
"r_elbow_pitch.pos": "r_arm.elbow.pitch",
"r_wrist_roll.pos": "r_arm.wrist.roll",
"r_wrist_pitch.pos": "r_arm.wrist.pitch",
"r_wrist_yaw.pos": "r_arm.wrist.yaw",
"r_gripper.pos": "r_arm.gripper",
"l_shoulder_pitch.pos": "l_arm.shoulder.pitch",
"l_shoulder_roll.pos": "l_arm.shoulder.roll",
"l_elbow_yaw.pos": "l_arm.elbow.yaw",
"l_elbow_pitch.pos": "l_arm.elbow.pitch",
"l_wrist_roll.pos": "l_arm.wrist.roll",
"l_wrist_pitch.pos": "l_arm.wrist.pitch",
"l_wrist_yaw.pos": "l_arm.wrist.yaw",
"l_gripper.pos": "l_arm.gripper",
"l_antenna.pos": "head.l_antenna",
"r_antenna.pos": "head.r_antenna",
}
# Create the robot configuration
robot_config = Reachy2RobotConfig(
ip_address="192.168.0.199",
id="test_reachy",
id="reachy2-pvt02",
with_mobile_base=False,
)
# Initialize the robot
robot = Reachy2Robot(robot_config)
# Instantiate a client for starting and intermediate positions
reachy = ReachySDK(robot_config.ip_address)
# reachy = ReachySDK(robot_config.ip_address)
# Initialize the policy
policy = ACTPolicy.from_pretrained("pepijn223/grab_cube_2")
@@ -105,10 +81,10 @@ l_arm_goal = [action["l_shoulder_pitch.pos"],
action["l_wrist_pitch.pos"],
action["l_wrist_yaw.pos"]]
reachy.head.goto(neck_goal)
reachy.r_arm.goto(r_arm_goal)
reachy.r_arm.gripper.goto(100.0)
reachy.l_arm.goto(l_arm_goal, wait=True)
robot.reachy.head.goto(neck_goal)
robot.reachy.r_arm.goto(r_arm_goal)
robot.reachy.r_arm.gripper.goto(100.0)
robot.reachy.l_arm.goto(l_arm_goal, wait=True)
time.sleep(1.0)
for episode_idx in range(NUM_EPISODES):
@@ -127,10 +103,10 @@ for episode_idx in range(NUM_EPISODES):
)
# Set the robot back to the initial pose
reachy.head.goto(neck_goal)
reachy.r_arm.goto(r_arm_goal)
reachy.r_arm.gripper.goto(100.0)
reachy.l_arm.goto(l_arm_goal, wait=True)
robot.reachy.head.goto(neck_goal)
robot.reachy.r_arm.goto(r_arm_goal)
robot.reachy.r_arm.gripper.goto(100.0)
robot.reachy.l_arm.goto(l_arm_goal, wait=True)
time.sleep(1.0)
dataset.save_episode()
+11 -8
View File
@@ -1,7 +1,8 @@
from lerobot.robots.reachy2 import Reachy2Robot, Reachy2RobotConfig
import time
REACHY2_MOTORS = {
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_JOINTS = {
"neck_yaw.pos": "head.neck.yaw",
"neck_pitch.pos": "head.neck.pitch",
"neck_roll.pos": "head.neck.roll",
@@ -23,9 +24,12 @@ REACHY2_MOTORS = {
"l_gripper.pos": "l_arm.gripper",
"l_antenna.pos": "head.l_antenna",
"r_antenna.pos": "head.r_antenna",
# "mobile_base.vx": "mobile_base.vx",
# "mobile_base.vy": "mobile_base.vy",
# "mobile_base.vtheta": "mobile_base.vtheta",
}
REACHY2_VEL = {
"mobile_base.vx": "vx",
"mobile_base.vy": "vy",
"mobile_base.vtheta": "vtheta",
}
@@ -45,10 +49,9 @@ print(f"action_features: {robot.action_features}\n")
def get_action(robot):
my_keys = REACHY2_MOTORS.keys()
my_values = [robot.reachy.joints[motor].present_position for motor in REACHY2_MOTORS.values()]
action = dict(zip(my_keys, my_values))
return action
pos_dict = {k: robot.reachy.joints[v].present_position for k, v in REACHY2_JOINTS.items()}
vel_dict = {k: robot.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
return {**pos_dict, **vel_dict}
action = get_action(robot)
+4 -5
View File
@@ -17,14 +17,13 @@ TASK_DESCRIPTION = "Grab a cube in Mujoco simulation"
# Create the robot and teleoperator configurations
robot_config = Reachy2RobotConfig(
# ip_address="localhost",
# ip_address="172.18.131.66",
ip_address="192.168.0.200",
ip_address="192.168.0.199",
id="test_reachy",
with_mobile_base=False,
)
teleop_config = Reachy2FakeTeleoperatorConfig(
# ip_address="172.18.131.66",
ip_address="192.168.0.200",
ip_address="192.168.0.199",
with_mobile_base=False,
)
# Initialize the robot and teleoperator
+8 -32
View File
@@ -13,43 +13,18 @@ import numpy as np
import time
REACHY2_MOTORS = {
"neck_yaw.pos": "head.neck.yaw",
"neck_pitch.pos": "head.neck.pitch",
"neck_roll.pos": "head.neck.roll",
"r_shoulder_pitch.pos": "r_arm.shoulder.pitch",
"r_shoulder_roll.pos": "r_arm.shoulder.roll",
"r_elbow_yaw.pos": "r_arm.elbow.yaw",
"r_elbow_pitch.pos": "r_arm.elbow.pitch",
"r_wrist_roll.pos": "r_arm.wrist.roll",
"r_wrist_pitch.pos": "r_arm.wrist.pitch",
"r_wrist_yaw.pos": "r_arm.wrist.yaw",
"r_gripper.pos": "r_arm.gripper",
"l_shoulder_pitch.pos": "l_arm.shoulder.pitch",
"l_shoulder_roll.pos": "l_arm.shoulder.roll",
"l_elbow_yaw.pos": "l_arm.elbow.yaw",
"l_elbow_pitch.pos": "l_arm.elbow.pitch",
"l_wrist_roll.pos": "l_arm.wrist.roll",
"l_wrist_pitch.pos": "l_arm.wrist.pitch",
"l_wrist_yaw.pos": "l_arm.wrist.yaw",
"l_gripper.pos": "l_arm.gripper",
"l_antenna.pos": "head.l_antenna",
"r_antenna.pos": "head.r_antenna",
}
# Create the robot configuration
robot_config = Reachy2RobotConfig(
# ip_address="localhost",
# ip_address="172.18.131.66",
ip_address="192.168.0.199",
id="reachy2-pvt02",
with_mobile_base=False,
)
# Initialize the robot
robot = Reachy2Robot(robot_config)
reachy = ReachySDK(robot_config.ip_address)
# reachy = ReachySDK(robot_config.ip_address)
# Create the dataset
@@ -59,6 +34,7 @@ actions = dataset.hf_dataset.select_columns("action")
# Connect the robot
robot.connect()
# Go smoothly to the first action
action_array = actions[0]["action"]
action = {}
for i, name in enumerate(dataset.features["action"]["names"]):
@@ -80,9 +56,9 @@ l_arm_goal = [action["l_shoulder_pitch.pos"],
action["l_wrist_pitch.pos"],
action["l_wrist_yaw.pos"]]
reachy.head.goto(neck_goal)
reachy.r_arm.goto(r_arm_goal)
reachy.l_arm.goto(l_arm_goal, wait=True)
robot.reachy.head.goto(neck_goal)
robot.reachy.r_arm.goto(r_arm_goal)
robot.reachy.l_arm.goto(l_arm_goal, wait=True)
for idx in range(dataset.num_frames):
start_episode_t = time.perf_counter()
@@ -98,4 +74,4 @@ for idx in range(dataset.num_frames):
busy_wait(1 / dataset.fps - dt_s)
# Clean up
# robot.disconnect()
robot.disconnect()
@@ -24,3 +24,4 @@ from ..config import TeleoperatorConfig
class Reachy2FakeTeleoperatorConfig(TeleoperatorConfig):
# Port to connect to the arm
ip_address: str | None = "localhost"
with_mobile_base: bool = True
@@ -78,13 +78,16 @@ class Reachy2FakeTeleoperator(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
return {**dict.fromkeys(
REACHY2_JOINTS.keys(),
float,
), **dict.fromkeys(
REACHY2_VEL.keys(),
float,
)}
if self.config.with_mobile_base:
return {**dict.fromkeys(
REACHY2_JOINTS.keys(),
float,
), **dict.fromkeys(
REACHY2_VEL.keys(),
float,
)}
else:
return dict.fromkeys(REACHY2_JOINTS.keys(), float)
@property
def feedback_features(self) -> dict[str, type]:
@@ -114,6 +117,10 @@ class Reachy2FakeTeleoperator(Teleoperator):
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
joint_action = {k: self.reachy.joints[v].goal_position for k, v in REACHY2_JOINTS.items()}
if not self.config.with_mobile_base:
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return joint_action
vel_action = {k: self.reachy.mobile_base.last_cmd_vel[v] for k, v in REACHY2_VEL.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")