diff --git a/examples/unitree_g1/gr00t_locomotion.py b/examples/unitree_g1/gr00t_locomotion.py index 7a5d94157..b2aadb363 100644 --- a/examples/unitree_g1/gr00t_locomotion.py +++ b/examples/unitree_g1/gr00t_locomotion.py @@ -15,57 +15,91 @@ from collections import deque import numpy as np import onnxruntime as ort import torch -from scipy.spatial.transform import Rotation as R from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1 logger = logging.getLogger(__name__) +GROOT_DEFAULT_ANGLES = np.array( + [ + -0.1, + 0.0, + 0.0, + 0.3, + -0.2, + 0.0, # left leg + -0.1, + 0.0, + 0.0, + 0.3, + -0.2, + 0.0, # right leg + 0.0, + 0.0, + 0.0, # waist + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, # left arm (zeroed) + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, # right arm (zeroed) + ], + dtype=np.float32, +) + +JOINTS_TO_ZERO = [12, 14, 20, 21, 27, 28] # waist yaw/pitch, wrist pitch/yaw +PROBLEMATIC_JOINTS = [12, 14, 20, 21, 27, 28] + +LOCOMOTION_ACTION_SCALE = 0.25 + +LOCOMOTION_CONTROL_DT = 0.02 + + +ANG_VEL_SCALE: float = 0.25 +DOF_POS_SCALE: float = 1.0 +DOF_VEL_SCALE: float = 0.05 +CMD_SCALE: list = [2.0, 2.0, 0.25] + def load_groot_policies() -> tuple: """Load GR00T dual-policy system (Balance + Walk) from ONNX files.""" logger.info("Loading GR00T dual-policy system...") - + # Load ONNX policies - policy_balance = ort.InferenceSession("examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Balance.onnx") + policy_balance = ort.InferenceSession( + "examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Balance.onnx" + ) policy_walk = ort.InferenceSession("examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Walk.onnx") - + logger.info("GR00T policies loaded successfully") - logger.info(f" Input shape: {policy_balance.get_inputs()[0].shape}") - logger.info(f" Output shape: {policy_balance.get_outputs()[0].shape}") - + return policy_balance, policy_walk class GrootLocomotionController: """ Handles GR00T-style locomotion control for the Unitree G1 robot. - + This controller manages: - Dual-policy system (Balance + Walk) - 29-joint observation processing - 15D action output (legs + waist) - Policy inference and motor command generation """ - - # GR00T default angles for all 29 joints - GROOT_DEFAULT_ANGLES = np.array([ - -0.1, 0.0, 0.0, 0.3, -0.2, 0.0, # left leg - -0.1, 0.0, 0.0, 0.3, -0.2, 0.0, # right leg - 0.0, 0.0, 0.0, # waist - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, # left arm (zeroed) - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, # right arm (zeroed) - ], dtype=np.float32) - - # Joints to zero out in observations and commands - JOINTS_TO_ZERO = [12, 14, 20, 21, 27, 28] # waist yaw/pitch, wrist pitch/yaw - PROBLEMATIC_JOINTS = [12, 14, 20, 21, 27, 28] - + def __init__(self, policy_balance, policy_walk, robot, config): """ Initialize the GR00T locomotion controller. - + Args: policy_balance: ONNX InferenceSession for balance/standing policy policy_walk: ONNX InferenceSession for walking policy @@ -76,11 +110,9 @@ class GrootLocomotionController: self.policy_walk = policy_walk self.robot = robot self.config = config - - # Locomotion state - self.locomotion_counter = 0 + self.locomotion_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) # vx, vy, yaw_rate - + # GR00T-specific state self.groot_qj_all = np.zeros(29, dtype=np.float32) self.groot_dqj_all = np.zeros(29, dtype=np.float32) @@ -90,30 +122,27 @@ class GrootLocomotionController: self.groot_obs_stacked = np.zeros(516, dtype=np.float32) self.groot_height_cmd = 0.74 # Default base height self.groot_orientation_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) - + # Initialize history with zeros for _ in range(6): self.groot_obs_history.append(np.zeros(86, dtype=np.float32)) - + # Thread management self.locomotion_running = False self.locomotion_thread = None - + logger.info("GrootLocomotionController initialized") - + def groot_locomotion_run(self): - """GR00T-style locomotion policy loop for ONNX policies - reads all 29 joints, outputs 15D action.""" - self.locomotion_counter += 1 - - # Get current lowstate - lowstate = self.robot.lowstate_buffer.GetData() - if lowstate is None: + # Get current obs + robot_state = self.robot.get_observation() + if robot_state is None: return - + # Update remote controller from lowstate - if lowstate.wireless_remote is not None: - self.robot.remote_controller.set(lowstate.wireless_remote) - + if robot_state.wireless_remote is not None: + self.robot.remote_controller.set(robot_state.wireless_remote) + # R1/R2 buttons for height control on real robot (button indices 0 and 4) if self.robot.remote_controller.button[0]: # R1 - raise height self.groot_height_cmd += 0.001 # Small increment per timestep (~0.05m per second at 50Hz) @@ -127,49 +156,39 @@ class GrootLocomotionController: self.robot.remote_controller.ly = 0.0 self.robot.remote_controller.rx = 0.0 self.robot.remote_controller.ry = 0.0 - + # Get ALL 29 joint positions and velocities for i in range(29): - self.groot_qj_all[i] = lowstate.motor_state[i].q - self.groot_dqj_all[i] = lowstate.motor_state[i].dq - + self.groot_qj_all[i] = robot_state.motor_state[i].q + self.groot_dqj_all[i] = robot_state.motor_state[i].dq + # Get IMU data - quat = lowstate.imu_state.quaternion - ang_vel = np.array(lowstate.imu_state.gyroscope, dtype=np.float32) - - # Transform IMU if using torso IMU - if self.config.locomotion_imu_type == "torso": - waist_yaw = lowstate.motor_state[12].q # Waist yaw index - waist_yaw_omega = lowstate.motor_state[12].dq - quat, ang_vel_3d = self.robot.locomotion_transform_imu_data( - waist_yaw, waist_yaw_omega, quat, np.array([ang_vel]) - ) - ang_vel = ang_vel_3d.flatten() - - # Create observation - gravity_orientation = self.robot.locomotion_get_gravity_orientation(quat) - + quat = robot_state.imu_state.quaternion + ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32) + + gravity_orientation = self.robot.get_gravity_orientation(quat) + # Zero out specific joints in observation - for idx in self.JOINTS_TO_ZERO: + for idx in JOINTS_TO_ZERO: self.groot_qj_all[idx] = 0.0 self.groot_dqj_all[idx] = 0.0 - + # Scale joint positions and velocities qj_obs = self.groot_qj_all.copy() dqj_obs = self.groot_dqj_all.copy() - - qj_obs = (qj_obs - self.GROOT_DEFAULT_ANGLES) * self.config.dof_pos_scale - dqj_obs = dqj_obs * self.config.dof_vel_scale - ang_vel_scaled = ang_vel * self.config.groot_ang_vel_scale - + + qj_obs = (qj_obs - GROOT_DEFAULT_ANGLES) * DOF_POS_SCALE + dqj_obs = dqj_obs * DOF_VEL_SCALE + ang_vel_scaled = ang_vel * ANG_VEL_SCALE + # Get velocity commands (keyboard or remote) if not self.robot.simulation_mode: self.locomotion_cmd[0] = self.robot.remote_controller.ly self.locomotion_cmd[1] = self.robot.remote_controller.lx * -1 self.locomotion_cmd[2] = self.robot.remote_controller.rx * -1 - + # Build 86D single frame observation (GR00T format) - self.groot_obs_single[:3] = self.locomotion_cmd * np.array(self.config.groot_cmd_scale) + self.groot_obs_single[:3] = self.locomotion_cmd * np.array(CMD_SCALE) self.groot_obs_single[3] = self.groot_height_cmd self.groot_obs_single[4:7] = self.groot_orientation_cmd self.groot_obs_single[7:10] = ang_vel_scaled @@ -177,19 +196,19 @@ class GrootLocomotionController: self.groot_obs_single[13:42] = qj_obs # 29D joint positions self.groot_obs_single[42:71] = dqj_obs # 29D joint velocities self.groot_obs_single[71:86] = self.groot_action # 15D previous actions - + # Add to history and stack observations (6 frames × 86D = 516D) self.groot_obs_history.append(self.groot_obs_single.copy()) - + # Stack all 6 frames into 516D vector for i, obs_frame in enumerate(self.groot_obs_history): start_idx = i * 86 end_idx = start_idx + 86 self.groot_obs_stacked[start_idx:end_idx] = obs_frame - + # Run policy inference (ONNX) with 516D stacked observation obs_tensor = torch.from_numpy(self.groot_obs_stacked).unsqueeze(0) - + # Select appropriate policy based on command magnitude (dual-policy system) cmd_magnitude = np.linalg.norm(self.locomotion_cmd) if cmd_magnitude < 0.05: @@ -198,57 +217,48 @@ class GrootLocomotionController: else: # Use walking policy for movement commands selected_policy = self.policy_walk - + ort_inputs = {selected_policy.get_inputs()[0].name: obs_tensor.cpu().numpy()} ort_outs = selected_policy.run(None, ort_inputs) self.groot_action = ort_outs[0].squeeze() - + # Zero out waist actions (yaw=12, roll=13, pitch=14) - only use leg actions (0-11) self.groot_action[12] = 0.0 # Waist yaw self.groot_action[13] = 0.0 # Waist roll self.groot_action[14] = 0.0 # Waist pitch - + # Transform action to target joint positions (15D: legs + waist) - target_dof_pos_15 = ( - self.GROOT_DEFAULT_ANGLES[:15] + self.groot_action * self.config.locomotion_action_scale - ) - + target_dof_pos_15 = GROOT_DEFAULT_ANGLES[:15] + self.groot_action * LOCOMOTION_ACTION_SCALE + # Send commands to LEG motors (0-11) for i in range(12): motor_idx = i self.robot.msg.motor_cmd[motor_idx].q = target_dof_pos_15[i] self.robot.msg.motor_cmd[motor_idx].qd = 0 - self.robot.msg.motor_cmd[motor_idx].kp = self.config.locomotion_kps[i] - self.robot.msg.motor_cmd[motor_idx].kd = self.config.locomotion_kds[i] + self.robot.msg.motor_cmd[motor_idx].kp = self.robot.kp[motor_idx] + self.robot.msg.motor_cmd[motor_idx].kd = self.robot.kd[motor_idx] self.robot.msg.motor_cmd[motor_idx].tau = 0 - + # Send WAIST commands - but SKIP waist yaw (12) and waist pitch (14) # Only send waist roll (13) waist_roll_idx = 13 waist_roll_action_idx = 13 self.robot.msg.motor_cmd[waist_roll_idx].q = target_dof_pos_15[waist_roll_action_idx] self.robot.msg.motor_cmd[waist_roll_idx].qd = 0 - self.robot.msg.motor_cmd[waist_roll_idx].kp = self.config.locomotion_arm_waist_kps[1] - self.robot.msg.motor_cmd[waist_roll_idx].kd = self.config.locomotion_arm_waist_kds[1] + self.robot.msg.motor_cmd[waist_roll_idx].kp = self.robot.kp[waist_roll_idx] + self.robot.msg.motor_cmd[waist_roll_idx].kd = self.robot.kd[waist_roll_idx] self.robot.msg.motor_cmd[waist_roll_idx].tau = 0 - + # Zero out the problematic joints (waist yaw, waist pitch, wrist pitch/yaw) - for joint_idx in self.PROBLEMATIC_JOINTS: + for joint_idx in PROBLEMATIC_JOINTS: self.robot.msg.motor_cmd[joint_idx].q = 0.0 self.robot.msg.motor_cmd[joint_idx].qd = 0 - if joint_idx in [12, 14]: # waist - kp_idx = 0 if joint_idx == 12 else 2 # yaw or pitch - self.robot.msg.motor_cmd[joint_idx].kp = self.config.locomotion_arm_waist_kps[kp_idx] - self.robot.msg.motor_cmd[joint_idx].kd = self.config.locomotion_arm_waist_kds[kp_idx] - else: # wrists (20, 21, 27, 28) - self.robot.msg.motor_cmd[joint_idx].kp = self.robot.kp_wrist - self.robot.msg.motor_cmd[joint_idx].kd = self.robot.kd_wrist + self.robot.msg.motor_cmd[joint_idx].kp = self.robot.kp[joint_idx] + self.robot.msg.motor_cmd[joint_idx].kd = self.robot.kd[joint_idx] self.robot.msg.motor_cmd[joint_idx].tau = 0 - - # Send command - self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg) - self.robot.lowcmd_publisher.Write(self.robot.msg) - + + self.robot.send_action(self.robot.msg) + def _locomotion_thread_loop(self): """Background thread that runs the locomotion policy at specified rate.""" logger.info("Locomotion thread started") @@ -258,56 +268,49 @@ class GrootLocomotionController: self.groot_locomotion_run() except Exception as e: logger.error(f"Error in locomotion loop: {e}") - + # Sleep to maintain control rate elapsed = time.time() - start_time - sleep_time = max(0, self.config.locomotion_control_dt - elapsed) + sleep_time = max(0, LOCOMOTION_CONTROL_DT - elapsed) time.sleep(sleep_time) logger.info("Locomotion thread stopped") - + def start_locomotion_thread(self): """Start the background locomotion control thread.""" if self.locomotion_running: logger.warning("Locomotion thread already running") return - + logger.info("Starting locomotion control thread...") self.locomotion_running = True self.locomotion_thread = threading.Thread(target=self._locomotion_thread_loop, daemon=True) self.locomotion_thread.start() logger.info("Locomotion control thread started!") - + def stop_locomotion_thread(self): """Stop the background locomotion control thread.""" if not self.locomotion_running: return - + logger.info("Stopping locomotion control thread...") self.locomotion_running = False if self.locomotion_thread: self.locomotion_thread.join(timeout=2.0) logger.info("Locomotion control thread stopped") - + def init_groot_locomotion(self): """Initialize GR00T-style locomotion for ONNX policies (29 DOF, 15D actions).""" logger.info("Starting GR00T locomotion initialization...") - - # Move legs to default position - self.robot.locomotion_move_to_default_pos() - + + # Reset legs to default position + self.robot.reset_legs() + # Wait 3 seconds time.sleep(3.0) - - # Hold default leg position for 2 seconds - self.robot.locomotion_default_pos_state() - + # Start locomotion policy thread logger.info("Starting GR00T locomotion policy control...") self.start_locomotion_thread() - - logger.info("GR00T locomotion initialization complete! Policy is now running.") - logger.info("516D observations (86D × 6 frames), 15D actions (legs + waist)") - if __name__ == "__main__": @@ -335,11 +338,11 @@ if __name__ == "__main__": print("Robot initialized with GR00T locomotion policies") print("Locomotion controller running in background thread") print("Press Ctrl+C to stop") - + try: while True: time.sleep(1.0) except KeyboardInterrupt: print("\nStopping locomotion...") groot_controller.stop_locomotion_thread() - print("Done!") \ No newline at end of file + print("Done!") diff --git a/src/lerobot/robots/unitree_g1/config_unitree_g1.py b/src/lerobot/robots/unitree_g1/config_unitree_g1.py index 63f000512..62130cf1d 100644 --- a/src/lerobot/robots/unitree_g1/config_unitree_g1.py +++ b/src/lerobot/robots/unitree_g1/config_unitree_g1.py @@ -15,9 +15,6 @@ # limitations under the License. from dataclasses import dataclass, field -from typing import Any - -from lerobot.cameras import CameraConfig from ..config import RobotConfig @@ -25,66 +22,94 @@ from ..config import RobotConfig @RobotConfig.register_subclass("unitree_g1") @dataclass class UnitreeG1Config(RobotConfig): - # id: str = "unitree_g1" + # id: str = "unitree_g1" simulation_mode: bool = False - kp_high = 40.0 - kd_high = 3.0 - kp_low = 80.0 - kd_low = 3.0 - kp_wrist = 40.0 - kd_wrist = 1.5 - all_motor_q = None + + kp: list = field( + default_factory=lambda: [ + 150, + 150, + 150, + 300, + 40, + 40, # Left leg pitch, roll, yaw, knee, ankle pitch, ankle roll + 150, + 150, + 150, + 300, + 40, + 40, # Right leg pitch, roll, yaw, knee, ankle pitch, ankle roll + 250, + 250, + 250, # Waist yaw, roll, pitch + 80, + 80, + 80, + 80, # Left shoulder pitch, roll, yaw, elbow (kp_low) + 40, + 40, + 40, # Left wrist roll, pitch, yaw (kp_wrist) + 80, + 80, + 80, + 80, # Right shoulder pitch, roll, yaw, elbow (kp_low) + 40, + 40, + 40, # Right wrist roll, pitch, yaw (kp_wrist) + 80, + 80, + 80, + 80, + 80, + 80, # Other + ] + ) + + kd: list = field( + default_factory=lambda: [ + 2, + 2, + 2, + 4, + 2, + 2, # Left leg pitch, roll, yaw, knee, ankle pitch, ankle roll + 2, + 2, + 2, + 4, + 2, + 2, # Right leg pitch, roll, yaw, knee, ankle pitch, ankle roll + 5, + 5, + 5, # Waist yaw, roll, pitch + 3, + 3, + 3, + 3, # Left shoulder pitch, roll, yaw, elbow (kd_low) + 1.5, + 1.5, + 1.5, # Left wrist roll, pitch, yaw (kd_wrist) + 3, + 3, + 3, + 3, # Right shoulder pitch, roll, yaw, elbow (kd_low) + 1.5, + 1.5, + 1.5, # Right wrist roll, pitch, yaw (kd_wrist) + 3, + 3, + 3, + 3, + 3, + 3, # Other + ] + ) + arm_velocity_limit = 100.0 control_dt = 1.0 / 250.0 - all_motor_q = None - arm_velocity_limit = 100.0 - control_dt = 1.0 / 250.0 - - gradual_start_time: float | None = None - gradual_time: float | None = None - - freeze_body: bool = False - gravity_compensation: bool = True - - cameras: dict[str, CameraConfig] = field(default_factory=dict) - - # Socket communication configuration (REQUIRED) - # This robot class ONLY uses sockets to communicate with a bridge on the Orin - # Run 'python dds_to_socket.py' on the Orin first, then set this to the Orin's IP - # Example: socket_host="192.168.123.164" (Orin's wlan0 IP) - socket_host: str | None = None# = "172.18.129.215" - socket_port: int | None = None - - # Locomotion control - locomotion_control: bool = True - # Pre-loaded policies (preferred method for GR00T locomotion) - policy_walk: Any = None # Pre-loaded walk policy (ONNX InferenceSession) - policy_balance: Any = None # Pre-loaded balance policy (ONNX InferenceSession) - - # Locomotion parameters (from g1.yaml) - locomotion_control_dt: float = 0.02 - leg_joint2motor_idx: list = field(default_factory=lambda: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) - locomotion_kps: list = field(default_factory=lambda: [150, 150, 150, 300, 40, 40, 150, 150, 150, 300, 40, 40]) - locomotion_kds: list = field(default_factory=lambda: [2, 2, 2, 4, 2, 2, 2, 2, 2, 4, 2, 2]) - default_leg_angles: list = field(default_factory=lambda: [-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, -0.1, 0.0, 0.0, 0.3, -0.2, 0.0]) - arm_waist_joint2motor_idx: list = field(default_factory=lambda: [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]) - locomotion_arm_waist_kps: list = field(default_factory=lambda: [250, 250, 250, 100, 100, 50, 50, 20, 20, 20, 100, 100, 50, 50, 20, 20, 20]) - locomotion_arm_waist_kds: list = field(default_factory=lambda: [5, 5, 5, 2, 2, 2, 2, 1, 1, 1, 2, 2, 2, 2, 1, 1, 1]) - locomotion_arm_waist_target: list = field(default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) - - ang_vel_scale: float = 0.25 - dof_pos_scale: float = 1.0 - dof_vel_scale: float = 0.05 - locomotion_action_scale: float = 0.25 - cmd_scale: list = field(default_factory=lambda: [2.0, 2.0, 0.25]) - - # GR00T-specific scaling (different from regular locomotion!) - groot_ang_vel_scale: float = 0.25 # GR00T uses 0.5, not 0.25 - groot_cmd_scale: list = field(default_factory=lambda: [2.0, 2.0, 0.25]) # yaw is 0.5 for GR00T - num_locomotion_actions: int = 12 - num_locomotion_obs: int = 47 - max_cmd: list = field(default_factory=lambda: [0.8, 0.5, 1.57]) - locomotion_imu_type: str = "pelvis" # "torso" or "pelvis" \ No newline at end of file + default_leg_angles: list = field( + default_factory=lambda: [-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, -0.1, 0.0, 0.0, 0.3, -0.2, 0.0] + ) diff --git a/src/lerobot/robots/unitree_g1/unitree_g1.py b/src/lerobot/robots/unitree_g1/unitree_g1.py index 8bc9722d1..5e6c61939 100644 --- a/src/lerobot/robots/unitree_g1/unitree_g1.py +++ b/src/lerobot/robots/unitree_g1/unitree_g1.py @@ -1,19 +1,12 @@ import json import logging -import select import struct -import sys -import termios import threading import time -import tty -from collections import deque from functools import cached_property from typing import Any import numpy as np -import onnxruntime as ort -import torch from scipy.spatial.transform import Rotation as R from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import ( @@ -22,8 +15,12 @@ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import ( ) # idl for g1, h1_2 from unitree_sdk2py.utils.crc import CRC -from lerobot.cameras.utils import make_cameras_from_configs from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex, G1_29_JointIndex +from lerobot.robots.unitree_g1.unitree_sdk2_socket import ( + ChannelFactoryInitialize, + ChannelPublisher, + ChannelSubscriber, +) from ..robot import Robot from .config_unitree_g1 import UnitreeG1Config @@ -81,46 +78,42 @@ class UnitreeG1(Robot): config_class = UnitreeG1Config name = "unitree_g1" + class RemoteController: + def __init__(self): + self.lx = 0 + self.ly = 0 + self.rx = 0 + self.ry = 0 + self.button = [0] * 16 + + def set(self, data): + # wireless_remote + keys = struct.unpack("H", data[2:4])[0] + for i in range(16): + self.button[i] = (keys & (1 << i)) >> i + self.lx = struct.unpack("f", data[4:8])[0] + self.rx = struct.unpack("f", data[8:12])[0] + self.ry = struct.unpack("f", data[12:16])[0] + self.ly = struct.unpack("f", data[20:24])[0] + def __init__(self, config: UnitreeG1Config): super().__init__(config) logger.info("Initialize UnitreeG1...") self.config = config - self.cameras = make_cameras_from_configs(config.cameras) self.q_target = np.zeros(14) self.tauff_target = np.zeros(14) self.simulation_mode = config.simulation_mode - self.kp_high = config.kp_high - self.kd_high = config.kd_high - self.kp_low = config.kp_low - self.kd_low = config.kd_low - self.kp_wrist = config.kp_wrist - self.kd_wrist = config.kd_wrist - self.all_motor_q = config.all_motor_q + # Unified kp/kd arrays for all 35 motors + self.kp = np.array(config.kp, dtype=np.float32) + self.kd = np.array(config.kd, dtype=np.float32) + self.arm_velocity_limit = config.arm_velocity_limit self.control_dt = config.control_dt - self._gradual_start_time = config.gradual_start_time - self._gradual_time = config.gradual_time - - # Teleop warmup: gradually move from current position to targets over 2 seconds - self.teleop_warmup_duration = 2.0 # seconds - self.teleop_warmup_start_time = None - self.teleop_warmup_initial_q = None - - self.freeze_body = config.freeze_body - self.gravity_compensation = config.gravity_compensation - - self.calibrated = False - - from lerobot.robots.unitree_g1.unitree_sdk2_socket import ( - ChannelFactoryInitialize, - ChannelPublisher, - ChannelSubscriber, - ) # dds - + # Initialize DDS ChannelFactoryInitialize(0) # Always use debug mode (direct motor control) @@ -145,38 +138,13 @@ class UnitreeG1(Robot): self.msg = unitree_hg_msg_dds__LowCmd_() self.msg.mode_pr = 0 self.msg.mode_machine = self.get_mode_machine() - print(self.msg) - self.all_motor_q = self.get_current_motor_q() - print(self.all_motor_q) - logger.info(f"Current all body motor state q:\n{self.all_motor_q} \n") - logger.info(f"Current two arms motor state q:\n{self.get_current_dual_arm_q()}\n") - logger.info("Lock all joints except two arms...\n") - - arm_indices = {member.value for member in G1_29_JointArmIndex} + # Initialize all motors with unified kp/kd from config for id in G1_29_JointIndex: self.msg.motor_cmd[id].mode = 1 - if id.value in arm_indices: - if self._Is_wrist_motor(id): - self.msg.motor_cmd[id].kp = self.kp_wrist - self.msg.motor_cmd[id].kd = self.kd_wrist - else: - self.msg.motor_cmd[id].kp = self.kp_low - self.msg.motor_cmd[id].kd = self.kd_low - else: - if self._Is_weak_motor(id): - self.msg.motor_cmd[id].kp = self.kp_low - self.msg.motor_cmd[id].kd = self.kd_low - else: - self.msg.motor_cmd[id].kp = self.kp_high - self.msg.motor_cmd[id].kd = self.kd_high - self.msg.motor_cmd[id].q = self.all_motor_q[id] - - # Initialize control flags BEFORE starting threads - self.keyboard_thread = None - self.keyboard_running = False - self.locomotion_thread = None - self.locomotion_running = False + self.msg.motor_cmd[id].kp = self.kp[id.value] + self.msg.motor_cmd[id].kd = self.kd[id.value] + self.msg.motor_cmd[id].q = self.get_current_motor_q()[id.value] # Both update different parts of self.msg, both call Write() self.publish_thread = None @@ -187,9 +155,6 @@ class UnitreeG1(Robot): logger.warning("Arm control publish thread started") self.remote_controller = self.RemoteController() - - logger.info("Initialize G1 OK!\n") - def _subscribe_motor_state(self): while True: start_time = time.time() @@ -314,34 +279,6 @@ class UnitreeG1(Robot): """Return current state dq of the left and right arm motors.""" return np.array([self.lowstate_buffer.GetData().motor_state[id].dq for id in G1_29_JointArmIndex]) - def _Is_weak_motor(self, motor_index): - weak_motors = [ - G1_29_JointIndex.kLeftAnklePitch.value, - G1_29_JointIndex.kRightAnklePitch.value, - # Left arm - G1_29_JointIndex.kLeftShoulderPitch.value, - G1_29_JointIndex.kLeftShoulderRoll.value, - G1_29_JointIndex.kLeftShoulderYaw.value, - G1_29_JointIndex.kLeftElbow.value, - # Right arm - G1_29_JointIndex.kRightShoulderPitch.value, - G1_29_JointIndex.kRightShoulderRoll.value, - G1_29_JointIndex.kRightShoulderYaw.value, - G1_29_JointIndex.kRightElbow.value, - ] - return motor_index.value in weak_motors - - def _Is_wrist_motor(self, motor_index): - wrist_motors = [ - G1_29_JointIndex.kLeftWristRoll.value, - G1_29_JointIndex.kLeftWristPitch.value, - G1_29_JointIndex.kLeftWristyaw.value, - G1_29_JointIndex.kRightWristRoll.value, - G1_29_JointIndex.kRightWristPitch.value, - G1_29_JointIndex.kRightWristYaw.value, - ] - return motor_index.value in wrist_motors - @cached_property def action_features(self) -> dict[str, type]: return {f"{G1_29_JointArmIndex(motor).name}.pos": float for motor in G1_29_JointArmIndex} @@ -372,64 +309,8 @@ class UnitreeG1(Robot): logger.info(f"{self} disconnected.") - def get_full_robot_state(self) -> dict[str, Any]: - """ - Get full robot state including IMU and extended motor data. - - Returns: - dict with keys: - - 'imu': dict containing IMU data (quaternion, gyroscope, accelerometer, rpy, temperature) - - 'motors': list of dicts, one per motor, containing q, dq, tau_est, temperature - """ - lowstate = self.lowstate_buffer.GetData() - if lowstate is None: - raise RuntimeError("No robot state available. Is the robot connected?") - - # Extract IMU data - imu_data = { - "quaternion": lowstate.imu_state.quaternion, # [w, x, y, z] - "gyroscope": lowstate.imu_state.gyroscope, # [x, y, z] rad/s - "accelerometer": lowstate.imu_state.accelerometer, # [x, y, z] m/s² - "rpy": lowstate.imu_state.rpy, # [roll, pitch, yaw] rad - "temperature": lowstate.imu_state.temperature, # °C - } - - # Extract motor data - motors_data = [] - for i in range(G1_29_Num_Motors): - motor = lowstate.motor_state[i] - motors_data.append( - { - "id": i, - "q": motor.q, # position (rad) - "dq": motor.dq, # velocity (rad/s) - "tau_est": motor.tau_est, # estimated torque (Nm) - "temperature": motor.temperature[0] - if isinstance(motor.temperature, (list, tuple)) - else motor.temperature, # °C - } - ) - - return { - "imu": imu_data, - "motors": motors_data, - } - def get_observation(self) -> dict[str, Any]: - obs_array = self.get_current_dual_arm_q() - obs_dict = { - f"{G1_29_JointArmIndex(motor).name}.pos": val - for motor, val in zip(G1_29_JointArmIndex, obs_array, strict=True) - } - - # Capture images from cameras - for cam_key, cam in self.cameras.items(): - start = time.perf_counter() - obs_dict[cam_key] = cam.async_read() - dt_ms = (time.perf_counter() - start) * 1e3 - logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") - - return obs_dict + return self.lowstate_buffer.GetData() @property def is_calibrated(self) -> bool: @@ -454,129 +335,23 @@ class UnitreeG1(Robot): return {**self._motors_ft, **self._cameras_ft} def send_action(self, action: dict[str, Any]) -> dict[str, Any]: - # need a any to any teleoperator solution. i wanna teleoperate a horse with a shoe. action - # to action mapping, when you do teleoperate. the keys that are left empty are just set to 0 - # also what would be fun is finding all sorts of robots and adding them to lerobot, see if people do the same. - # then teleop them wiuth the glove hehe - # then we get ALL THE DATA - if self.is_calibrated: - uncalibrated_action = action.copy() - action = self.invert_calibration(action) - # if an action was 0.5 write 0 in its place - for key, value in uncalibrated_action.items(): - if value == 0.5: - action[key] = 0.0 - # check if action is within bounds - for key, value in action.items(): - if value < self.calibration[key]["range_min"] or value > self.calibration[key]["range_max"]: - raise ValueError( - f"Action value {value} for {key} is out of bounds, actions are not normalized" - ) - if self.freeze_body: - arm_joint_indices = set(range(15, 29)) # 15–28 are arms - for jid in G1_29_JointIndex: - if jid.value not in arm_joint_indices: - self.msg.motor_cmd[jid].mode = 1 - self.msg.motor_cmd[jid].q = 0.0 - self.msg.motor_cmd[jid].dq = 0.0 - self.msg.motor_cmd[jid].tau = 0.0 + self.msg.crc = self.crc.Crc(action) + self.lowcmd_publisher.Write(action) - action_np = np.stack([v for v in action.values()]) - # action_np is just zeros - # action_np = np.zeros(14) - # print(action_np) - # exit() - - tau = np.zeros(14) - - self.ctrl_dual_arm(action_np, tau) - - def apply_calibration(self, action: dict[str, float]) -> dict[str, float]: - """Map motor ranges to [0, 1].""" - calibrated = {} - for key, value in action.items(): - value = float(value.item()) - - cal = self.calibration[key] - mn, mx = cal["range_min"], cal["range_max"] - - if mx == mn: - norm = 0.0 - else: - norm = (value - mn) / (mx - mn) - norm = max(0.0, min(1.0, norm)) - - # Round to 5 decimal places to avoid floating point precision issues - calibrated[key] = round(norm, 5) - - return calibrated - - def invert_calibration(self, action: dict[str, float]) -> dict[str, float]: - """Map [0, 1] actions back to motor ranges.""" - calibrated = {} - for key, value in action.items(): - value = float(value.item()) if hasattr(value, "item") else float(value) - - cal = self.calibration[key] - mn, mx = cal["range_min"], cal["range_max"] - - # inverse mapping - real_val = mn + value * (mx - mn) - - # Round to 5 decimal places to avoid floating point precision issues - calibrated[key] = round(real_val, 5) - - return calibrated - - ###################LOCOMOTION CONTROL################### - - def locomotion_create_damping_cmd(self): - """Set all motors to damping mode (kp=0, kd=8).""" - size = len(self.msg.motor_cmd) - for i in range(size): - self.msg.motor_cmd[i].q = 0 - self.msg.motor_cmd[i].qd = 0 - self.msg.motor_cmd[i].kp = 0 - self.msg.motor_cmd[i].kd = 8 - self.msg.motor_cmd[i].tau = 0 - self.msg.crc = self.crc.Crc(self.msg) - self.lowcmd_publisher.Write(self.msg) - - def locomotion_create_zero_cmd(self): - """Set all motors to zero torque mode.""" - size = len(self.msg.motor_cmd) - for i in range(size): - self.msg.motor_cmd[i].q = 0 - self.msg.motor_cmd[i].qd = 0 - self.msg.motor_cmd[i].kp = 0 - self.msg.motor_cmd[i].kd = 0 - self.msg.motor_cmd[i].tau = 0 - self.msg.crc = self.crc.Crc(self.msg) - self.lowcmd_publisher.Write(self.msg) - - def locomotion_zero_torque_state(self): - """Enter zero torque state.""" - logger.info("Enter zero torque state.") - self.locomotion_create_zero_cmd() - time.sleep(self.config.locomotion_control_dt) - - def locomotion_move_to_default_pos(self): + def reset_legs(self): """Move robot legs to default standing position over 2 seconds (arms are not moved).""" - logger.info("Moving legs to default locomotion pos.") total_time = 2.0 - num_step = int(total_time / self.config.locomotion_control_dt) + num_step = int(total_time / self.control_dt) # Only control legs, not arms dof_idx = self.config.leg_joint2motor_idx - kps = self.config.locomotion_kps - kds = self.config.locomotion_kds default_pos = np.array(self.config.default_leg_angles, dtype=np.float32) dof_size = len(dof_idx) # Get current lowstate lowstate = self.lowstate_buffer.GetData() if lowstate is None: - logger.error("Cannot get lowstate for locomotion") + logger.error("Cannot get lowstate") return # Record the current leg positions @@ -592,55 +367,15 @@ class UnitreeG1(Robot): target_pos = default_pos[j] self.msg.motor_cmd[motor_idx].q = init_dof_pos[j] * (1 - alpha) + target_pos * alpha self.msg.motor_cmd[motor_idx].qd = 0 - self.msg.motor_cmd[motor_idx].kp = kps[j] - self.msg.motor_cmd[motor_idx].kd = kds[j] + self.msg.motor_cmd[motor_idx].kp = self.kp[motor_idx] + self.msg.motor_cmd[motor_idx].kd = self.kd[motor_idx] self.msg.motor_cmd[motor_idx].tau = 0 self.msg.crc = self.crc.Crc(self.msg) self.lowcmd_publisher.Write(self.msg) - time.sleep(self.config.locomotion_control_dt) - logger.info("Reached default locomotion position (legs only)") + time.sleep(self.control_dt) + logger.info("Reached default position (legs only)") - def locomotion_default_pos_state(self): - """Hold default leg position for 2 seconds (arms are not controlled).""" - logger.info("Enter default pos state - holding legs for 2 seconds") - - # Only control legs, not arms - for i in range(len(self.config.leg_joint2motor_idx)): - motor_idx = self.config.leg_joint2motor_idx[i] - self.msg.motor_cmd[motor_idx].q = self.config.default_leg_angles[i] - self.msg.motor_cmd[motor_idx].qd = 0 - self.msg.motor_cmd[motor_idx].kp = self.config.locomotion_kps[i] - self.msg.motor_cmd[motor_idx].kd = self.config.locomotion_kds[i] - self.msg.motor_cmd[motor_idx].tau = 0 - - # Hold leg position for 2 seconds - hold_time = 2.0 - num_steps = int(hold_time / self.config.locomotion_control_dt) - for _ in range(num_steps): - self.msg.crc = self.crc.Crc(self.msg) - self.lowcmd_publisher.Write(self.msg) - time.sleep(self.config.locomotion_control_dt) - logger.info("Finished holding default leg position") - - class RemoteController: - def __init__(self): - self.lx = 0 - self.ly = 0 - self.rx = 0 - self.ry = 0 - self.button = [0] * 16 - - def set(self, data): - # wireless_remote - keys = struct.unpack("H", data[2:4])[0] - for i in range(16): - self.button[i] = (keys & (1 << i)) >> i - self.lx = struct.unpack("f", data[4:8])[0] - self.rx = struct.unpack("f", data[8:12])[0] - self.ry = struct.unpack("f", data[12:16])[0] - self.ly = struct.unpack("f", data[20:24])[0] - - def locomotion_get_gravity_orientation(self, quaternion): + def get_gravity_orientation(self, quaternion): """Get gravity orientation from quaternion.""" qw = quaternion[0] qx = quaternion[1] @@ -651,10 +386,9 @@ class UnitreeG1(Robot): gravity_orientation[0] = 2 * (-qz * qx + qw * qy) gravity_orientation[1] = -2 * (qz * qy + qw * qx) gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz) - return gravity_orientation - def locomotion_transform_imu_data(self, waist_yaw, waist_yaw_omega, imu_quat, imu_omega): + def transform_imu_data(self, waist_yaw, waist_yaw_omega, imu_quat, imu_omega): """Transform IMU data from torso to pelvis frame.""" RzWaist = R.from_euler("z", waist_yaw).as_matrix() R_torso = R.from_quat([imu_quat[1], imu_quat[2], imu_quat[3], imu_quat[0]]).as_matrix()