Merge branch 'main' into feature/add-multitask-dit

This commit is contained in:
Bryson Jones
2026-01-07 08:11:43 -08:00
committed by GitHub
7 changed files with 434 additions and 218 deletions
+25 -30
View File
@@ -1,21 +1,21 @@
# Unitree G1 Robot Setup and Control # Unitree G1
This guide covers the complete setup process for the Unitree G1 humanoid, from initial connection to running gr00t_wbc locomotion. This guide covers the complete setup process for the Unitree G1 humanoid, from initial connection to running gr00t_wbc locomotion.
## About the Unitree G1 ## About
We offer support for both 29 and 23 DOF G1. We introduce: We support both 29 and 23 DOF G1 EDU version. We introduce:
- **`unitree g1` robot class, handling low level communication with the humanoid** - **`unitree g1` robot class, handling low level read/write from/to the humanoid**
- **ZMQ socket bridge** for remote communication over WiFi, allowing one to deploy policies remotely instead of over ethernet or directly on the Orin - **ZMQ socket bridge** for remote communication over wlan, allowing for remote policy deployment as well as over eth or directly on the Orin
- **GR00T locomotion policy** for bipedal walking and balance - **Locomotion policies** from NVIDIA gr00t and Amazon FAR Holosoma
- **MuJoCo simulation mode** for testing policies without the physical robot - **Simulation mode** for testing policies without the physical robot in mujoco
--- ---
## Part 1: Connect to Robot over Ethernet ## Connection guide
### Step 1: Configure Your Computer's Ethernet Interface ### Step 1: Configure Ethernet Interface
Set a static IP on the same subnet as the robot: Set a static IP on the same subnet as the robot:
@@ -26,7 +26,7 @@ sudo ip addr add 192.168.123.200/24 dev enp131s0
sudo ip link set enp131s0 up sudo ip link set enp131s0 up
``` ```
**Note**: The robot's Ethernet IP is fixed at `192.168.123.164`. Your computer must use `192.168.123.x` where x ≠ 164. **Note**: The G1's Ethernet IP is fixed at `192.168.123.164`. Your computer must use `192.168.123.x` with x ≠ 164.
### Step 2: SSH into the Robot ### Step 2: SSH into the Robot
@@ -35,25 +35,24 @@ ssh unitree@192.168.123.164
# Password: 123 # Password: 123
``` ```
You should now be connected to the robot's onboard computer. You should now be connected to the G1's Orin.
--- ---
## Part 2: Enable WiFi on the Robot ## Part 2: Enable WiFi on the Robot
Once connected via Ethernet, follow these steps to enable WiFi: Wlan0 is disabled by default on the G1. To enable it:
### Step 1: Enable WiFi Hardware ### Step 1: Enable WiFi Hardware
```bash ```bash
# Unblock WiFi radio
sudo rfkill unblock wifi sudo rfkill unblock wifi
sudo rfkill unblock all sudo rfkill unblock all
# Bring up WiFi interface # Bring up wlan0
sudo ip link set wlan0 up sudo ip link set wlan0 up
# Enable NetworkManager control # Enable NetworkManager control of wlan0
sudo nmcli radio wifi on sudo nmcli radio wifi on
sudo nmcli device set wlan0 managed yes sudo nmcli device set wlan0 managed yes
sudo systemctl restart NetworkManager sudo systemctl restart NetworkManager
@@ -73,7 +72,7 @@ sudo iptables -A FORWARD -i wlp132s0f0 -o enp131s0 -m state --state RELATED,ESTA
sudo iptables -A FORWARD -i enp131s0 -o wlp132s0f0 -j ACCEPT sudo iptables -A FORWARD -i enp131s0 -o wlp132s0f0 -j ACCEPT
``` ```
**On the robot:** **On the G1:**
```bash ```bash
# Add laptop as default gateway # Add laptop as default gateway
@@ -147,9 +146,9 @@ python src/lerobot/robots/unitree_g1/run_g1_server.py
--- ---
## Part 4: Running GR00T Locomotion ## Part 4: Controlling the robot
With the robot server running, you can now control the robot from your laptop. With the robot server running, you can now control the robot remotely. Let's launch a locomotion policy
### Step 1: Install LeRobot on your machine ### Step 1: Install LeRobot on your machine
@@ -172,34 +171,30 @@ Edit the config file to match your robot's WiFi IP:
robot_ip: str = "<YOUR_ROBOT_IP>" # Replace with your robot's WiFi IP. robot_ip: str = "<YOUR_ROBOT_IP>" # Replace with your robot's WiFi IP.
``` ```
**Note**: When running directly on the G1 (not remotely), set `robot_ip: str = "127.0.0.1"` instead.
### Step 3: Run the Locomotion Policy ### Step 3: Run the Locomotion Policy
```bash ```bash
# Run GR00T locomotion controller # Run GR00T locomotion controller
python examples/unitree_g1/gr00t_locomotion.py --repo-id "nepyope/GR00T-WholeBodyControl_g1" python examples/unitree_g1/gr00t_locomotion.py --repo-id "nepyope/GR00T-WholeBodyControl_g1"
# Run Holosoma locomotion controller
python examples/unitree_g1/holosoma_locomotion.py
``` ```
### Step 4: Control with Remote
- **Left stick**: Forward/backward and left/right movement
- **Right stick**: Rotation
- **R1 button**: Raise waist height
- **R2 button**: Lower waist height
Press `Ctrl+C` to stop the policy. Press `Ctrl+C` to stop the policy.
--- ---
## Extra: Running in Simulation Mode (MuJoCo) ## Running in Simulation Mode (MuJoCo)
You can now test and develop policies without a physical robot using MuJoCo. to do so set `is_simulation=True` in config. You can now test and develop policies without a physical robot using MuJoCo. To do so simply set `is_simulation=True` in config.
## Additional Resources ## Additional Resources
- [Unitree SDK Documentation](https://github.com/unitreerobotics/unitree_sdk2_python) - [Unitree SDK Documentation](https://github.com/unitreerobotics/unitree_sdk2_python)
- [GR00T Policy Repository](https://huggingface.co/nepyope/GR00T-WholeBodyControl_g1) - [GR00T-WholeBodyControl](https://github.com/NVlabs/GR00T-WholeBodyControl)
- [Holosoma](https://github.com/amazon-far/holosoma)
- [LeRobot Documentation](https://github.com/huggingface/lerobot) - [LeRobot Documentation](https://github.com/huggingface/lerobot)
- [Unitree_IL_Lerobot](https://github.com/unitreerobotics/unitree_IL_lerobot) - [Unitree_IL_Lerobot](https://github.com/unitreerobotics/unitree_IL_lerobot)
+82 -168
View File
@@ -13,16 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Example: GR00T Locomotion with Pre-loaded Policies
This example demonstrates the NEW pattern for loading GR00T policies externally
and passing them to the robot class.
"""
import argparse import argparse
import logging import logging
import threading
import time import time
from collections import deque from collections import deque
@@ -31,24 +24,26 @@ import onnxruntime as ort
from huggingface_hub import hf_hub_download from huggingface_hub import hf_hub_download
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1 from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
GROOT_DEFAULT_ANGLES = np.zeros(29, dtype=np.float32) GROOT_DEFAULT_ANGLES = np.zeros(29, dtype=np.float32)
GROOT_DEFAULT_ANGLES[[0, 6]] = -0.1 # hip pitch GROOT_DEFAULT_ANGLES[[0, 6]] = -0.1 # Hip pitch
GROOT_DEFAULT_ANGLES[[3, 9]] = 0.3 # knee GROOT_DEFAULT_ANGLES[[3, 9]] = 0.3 # Knee
GROOT_DEFAULT_ANGLES[[4, 10]] = -0.2 # ankle pitch GROOT_DEFAULT_ANGLES[[4, 10]] = -0.2 # Ankle pitch
MISSING_JOINTS = [] MISSING_JOINTS = []
G1_MODEL = "g1_23" # or "g1_29" G1_MODEL = "g1_23" # Or "g1_29"
if G1_MODEL == "g1_23": if G1_MODEL == "g1_23":
MISSING_JOINTS = [12, 14, 20, 21, 27, 28] # waist yaw/pitch, wrist pitch/yaw MISSING_JOINTS = [12, 14, 20, 21, 27, 28] # Waist yaw/pitch, wrist pitch/yaw
LOCOMOTION_ACTION_SCALE = 0.25
LOCOMOTION_CONTROL_DT = 0.02
# Control parameters
ACTION_SCALE = 0.25
CONTROL_DT = 0.02 # 50Hz
ANG_VEL_SCALE: float = 0.25 ANG_VEL_SCALE: float = 0.25
DOF_POS_SCALE: float = 1.0 DOF_POS_SCALE: float = 1.0
DOF_VEL_SCALE: float = 0.05 DOF_VEL_SCALE: float = 0.05
@@ -61,12 +56,12 @@ DEFAULT_GROOT_REPO_ID = "nepyope/GR00T-WholeBodyControl_g1"
def load_groot_policies( def load_groot_policies(
repo_id: str = DEFAULT_GROOT_REPO_ID, repo_id: str = DEFAULT_GROOT_REPO_ID,
) -> tuple[ort.InferenceSession, ort.InferenceSession]: ) -> tuple[ort.InferenceSession, ort.InferenceSession]:
"""Load GR00T dual-policy system (Balance + Walk) from Hugging Face Hub. """Load GR00T dual-policy system (Balance + Walk) from the hub.
Args: Args:
repo_id: Hugging Face Hub repository ID containing the ONNX policies. repo_id: Hugging Face Hub repository ID containing the ONNX policies.
""" """
logger.info(f"Loading GR00T dual-policy system from Hugging Face Hub ({repo_id})...") logger.info(f"Loading GR00T dual-policy system from the hub ({repo_id})...")
# Download ONNX policies from Hugging Face Hub # Download ONNX policies from Hugging Face Hub
balance_path = hf_hub_download( balance_path = hf_hub_download(
@@ -88,15 +83,7 @@ def load_groot_policies(
class GrootLocomotionController: class GrootLocomotionController:
""" """GR00T lower-body locomotion controller for the Unitree G1."""
Handles GR00T-style locomotion control for the Unitree G1 robot.
This controller manages:
- Dual-policy system (Balance + Walk)
- 29-joint observation processing
- 15D action output (legs + waist)
- Policy inference and motor command generation
"""
def __init__(self, policy_balance, policy_walk, robot, config): def __init__(self, policy_balance, policy_walk, robot, config):
self.policy_balance = policy_balance self.policy_balance = policy_balance
@@ -104,9 +91,9 @@ class GrootLocomotionController:
self.robot = robot self.robot = robot
self.config = config self.config = config
self.locomotion_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) # vx, vy, theta_dot self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) # vx, vy, theta_dot
# GR00T-specific state # Robot state
self.groot_qj_all = np.zeros(29, dtype=np.float32) self.groot_qj_all = np.zeros(29, dtype=np.float32)
self.groot_dqj_all = np.zeros(29, dtype=np.float32) self.groot_dqj_all = np.zeros(29, dtype=np.float32)
self.groot_action = np.zeros(15, dtype=np.float32) self.groot_action = np.zeros(15, dtype=np.float32)
@@ -116,24 +103,20 @@ class GrootLocomotionController:
self.groot_height_cmd = 0.74 # Default base height self.groot_height_cmd = 0.74 # Default base height
self.groot_orientation_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) self.groot_orientation_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
# input to gr00t is 6 frames (6*86D=516) # Input to GR00T is 6 frames (6*86D=516)
for _ in range(6): for _ in range(6):
self.groot_obs_history.append(np.zeros(86, dtype=np.float32)) self.groot_obs_history.append(np.zeros(86, dtype=np.float32))
# Thread management
self.locomotion_running = False
self.locomotion_thread = None
logger.info("GrootLocomotionController initialized") logger.info("GrootLocomotionController initialized")
def groot_locomotion_run(self): def run_step(self):
# get current observation # Get current observation
robot_state = self.robot.get_observation() robot_state = self.robot.get_observation()
if robot_state is None: if robot_state is None:
return return
# get command from remote controller # Get command from remote controller
if robot_state.wireless_remote is not None: if robot_state.wireless_remote is not None:
self.robot.remote_controller.set(robot_state.wireless_remote) self.robot.remote_controller.set(robot_state.wireless_remote)
if self.robot.remote_controller.button[0]: # R1 - raise waist if self.robot.remote_controller.button[0]: # R1 - raise waist
@@ -148,15 +131,16 @@ class GrootLocomotionController:
self.robot.remote_controller.rx = 0.0 self.robot.remote_controller.rx = 0.0
self.robot.remote_controller.ry = 0.0 self.robot.remote_controller.ry = 0.0
self.locomotion_cmd[0] = self.robot.remote_controller.ly # forward/backward self.cmd[0] = self.robot.remote_controller.ly # Forward/backward
self.locomotion_cmd[1] = self.robot.remote_controller.lx * -1 # left/right self.cmd[1] = self.robot.remote_controller.lx * -1 # Left/right
self.locomotion_cmd[2] = self.robot.remote_controller.rx * -1 # rotation rate self.cmd[2] = self.robot.remote_controller.rx * -1 # Rotation rate
# Get joint positions and velocities
for i in range(29): for i in range(29):
self.groot_qj_all[i] = robot_state.motor_state[i].q self.groot_qj_all[i] = robot_state.motor_state[i].q
self.groot_dqj_all[i] = robot_state.motor_state[i].dq self.groot_dqj_all[i] = robot_state.motor_state[i].dq
# adapt observation for g1_23dof # Adapt observation for g1_23dof
for idx in MISSING_JOINTS: for idx in MISSING_JOINTS:
self.groot_qj_all[idx] = 0.0 self.groot_qj_all[idx] = 0.0
self.groot_dqj_all[idx] = 0.0 self.groot_dqj_all[idx] = 0.0
@@ -165,18 +149,18 @@ class GrootLocomotionController:
qj_obs = self.groot_qj_all.copy() qj_obs = self.groot_qj_all.copy()
dqj_obs = self.groot_dqj_all.copy() dqj_obs = self.groot_dqj_all.copy()
# express imu data in gravity frame of reference # Express IMU data in gravity frame of reference
quat = robot_state.imu_state.quaternion quat = robot_state.imu_state.quaternion
ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32) ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32)
gravity_orientation = self.robot.get_gravity_orientation(quat) gravity_orientation = self.robot.get_gravity_orientation(quat)
# scale joint positions and velocities before policy inference # Scale joint positions and velocities before policy inference
qj_obs = (qj_obs - GROOT_DEFAULT_ANGLES) * DOF_POS_SCALE qj_obs = (qj_obs - GROOT_DEFAULT_ANGLES) * DOF_POS_SCALE
dqj_obs = dqj_obs * DOF_VEL_SCALE dqj_obs = dqj_obs * DOF_VEL_SCALE
ang_vel_scaled = ang_vel * ANG_VEL_SCALE ang_vel_scaled = ang_vel * ANG_VEL_SCALE
# build single frame observation # Build single frame observation
self.groot_obs_single[:3] = self.locomotion_cmd * np.array(CMD_SCALE) self.groot_obs_single[:3] = self.cmd * np.array(CMD_SCALE)
self.groot_obs_single[3] = self.groot_height_cmd self.groot_obs_single[3] = self.groot_height_cmd
self.groot_obs_single[4:7] = self.groot_orientation_cmd self.groot_obs_single[4:7] = self.groot_orientation_cmd
self.groot_obs_single[7:10] = ang_vel_scaled self.groot_obs_single[7:10] = ang_vel_scaled
@@ -194,113 +178,74 @@ class GrootLocomotionController:
end_idx = start_idx + 86 end_idx = start_idx + 86
self.groot_obs_stacked[start_idx:end_idx] = obs_frame self.groot_obs_stacked[start_idx:end_idx] = obs_frame
# Run policy inference (ONNX) with 516D stacked observation cmd_magnitude = np.linalg.norm(self.cmd)
cmd_magnitude = np.linalg.norm(self.locomotion_cmd)
selected_policy = ( selected_policy = (
self.policy_balance if cmd_magnitude < 0.05 else self.policy_walk self.policy_balance if cmd_magnitude < 0.05 else self.policy_walk
) # balance/standing policy for small commands, walking policy for movement commands ) # Balance/standing policy for small commands, walking policy for movement commands
# run policy inference # Run policy inference
ort_inputs = {selected_policy.get_inputs()[0].name: np.expand_dims(self.groot_obs_stacked, axis=0)} ort_inputs = {selected_policy.get_inputs()[0].name: np.expand_dims(self.groot_obs_stacked, axis=0)}
ort_outs = selected_policy.run(None, ort_inputs) ort_outs = selected_policy.run(None, ort_inputs)
self.groot_action = ort_outs[0].squeeze() self.groot_action = ort_outs[0].squeeze()
# transform action back to target joint positions # Transform action back to target joint positions
target_dof_pos_15 = GROOT_DEFAULT_ANGLES[:15] + self.groot_action * LOCOMOTION_ACTION_SCALE target_dof_pos_15 = GROOT_DEFAULT_ANGLES[:15] + self.groot_action * ACTION_SCALE
# command motors # Build action dict (only first 15 joints for GR00T)
action_dict = {}
for i in range(15): for i in range(15):
motor_idx = i motor_name = G1_29_JointIndex(i).name
self.robot.msg.motor_cmd[motor_idx].q = target_dof_pos_15[i] action_dict[f"{motor_name}.q"] = float(target_dof_pos_15[i])
self.robot.msg.motor_cmd[motor_idx].qd = 0
self.robot.msg.motor_cmd[motor_idx].kp = self.robot.kp[motor_idx]
self.robot.msg.motor_cmd[motor_idx].kd = self.robot.kd[motor_idx]
self.robot.msg.motor_cmd[motor_idx].tau = 0
# adapt action for g1_23dof # Zero out missing joints for g1_23dof
for joint_idx in MISSING_JOINTS: for joint_idx in MISSING_JOINTS:
self.robot.msg.motor_cmd[joint_idx].q = 0.0 motor_name = G1_29_JointIndex(joint_idx).name
self.robot.msg.motor_cmd[joint_idx].qd = 0 action_dict[f"{motor_name}.q"] = 0.0
self.robot.msg.motor_cmd[joint_idx].kp = self.robot.kp[joint_idx]
self.robot.msg.motor_cmd[joint_idx].kd = self.robot.kd[joint_idx]
self.robot.msg.motor_cmd[joint_idx].tau = 0
# send action to robot # Send action to robot
self.robot.send_action(self.robot.msg) self.robot.send_action(action_dict)
def _locomotion_thread_loop(self):
"""Background thread that runs the locomotion policy at specified rate.""" def run(repo_id: str = DEFAULT_GROOT_REPO_ID) -> None:
logger.info("Locomotion thread started") """Main function to run the GR00T locomotion controller.
while self.locomotion_running:
Args:
repo_id: Hugging Face Hub repository ID for GR00T policies.
"""
# Load policies
policy_balance, policy_walk = load_groot_policies(repo_id=repo_id)
# Initialize robot
config = UnitreeG1Config()
robot = UnitreeG1(config)
# Initialize gr00T locomotion controller
groot_controller = GrootLocomotionController(
policy_balance=policy_balance,
policy_walk=policy_walk,
robot=robot,
config=config,
)
try:
robot.reset(CONTROL_DT, GROOT_DEFAULT_ANGLES)
logger.info("Use joystick: LY=fwd/back, LX=left/right, RX=rotate, R1=raise waist, R2=lower waist")
logger.info("Press Ctrl+C to stop")
# Run step
while True:
start_time = time.time() start_time = time.time()
try: groot_controller.run_step()
self.groot_locomotion_run()
except Exception as e:
logger.error(f"Error in locomotion loop: {e}")
# Sleep to maintain control rate
elapsed = time.time() - start_time elapsed = time.time() - start_time
sleep_time = max(0, LOCOMOTION_CONTROL_DT - elapsed) sleep_time = max(0, CONTROL_DT - elapsed)
time.sleep(sleep_time) time.sleep(sleep_time)
logger.info("Locomotion thread stopped") except KeyboardInterrupt:
logger.info("Stopping locomotion...")
def start_locomotion_thread(self): finally:
if self.locomotion_running: if robot.is_connected:
logger.warning("Locomotion thread already running") robot.disconnect()
return logger.info("Done!")
logger.info("Starting locomotion control thread...")
self.locomotion_running = True
self.locomotion_thread = threading.Thread(target=self._locomotion_thread_loop, daemon=True)
self.locomotion_thread.start()
logger.info("Locomotion control thread started!")
def stop_locomotion_thread(self):
if not self.locomotion_running:
return
logger.info("Stopping locomotion control thread...")
self.locomotion_running = False
if self.locomotion_thread:
self.locomotion_thread.join(timeout=2.0)
logger.info("Locomotion control thread stopped")
def reset_robot(self):
"""Move robot legs to default standing position over 2 seconds (arms are not moved)."""
total_time = 3.0
num_step = int(total_time / self.robot.control_dt)
# Only control legs, not arms (first 12 joints)
default_pos = GROOT_DEFAULT_ANGLES # First 12 values are leg angles
dof_size = len(default_pos)
# Get current lowstate
robot_state = self.robot.get_observation()
# Record the current leg positions
init_dof_pos = np.zeros(dof_size, dtype=np.float32)
for i in range(dof_size):
init_dof_pos[i] = robot_state.motor_state[i].q
# Move legs to default pos
for i in range(num_step):
alpha = i / num_step
for motor_idx in range(dof_size):
target_pos = default_pos[motor_idx]
self.robot.msg.motor_cmd[motor_idx].q = (
init_dof_pos[motor_idx] * (1 - alpha) + target_pos * alpha
)
self.robot.msg.motor_cmd[motor_idx].qd = 0
self.robot.msg.motor_cmd[motor_idx].kp = self.robot.kp[motor_idx]
self.robot.msg.motor_cmd[motor_idx].kd = self.robot.kd[motor_idx]
self.robot.msg.motor_cmd[motor_idx].tau = 0
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
self.robot.lowcmd_publisher.Write(self.robot.msg)
time.sleep(self.robot.control_dt)
logger.info("Reached default position (legs only)")
if __name__ == "__main__": if __name__ == "__main__":
@@ -313,35 +258,4 @@ if __name__ == "__main__":
) )
args = parser.parse_args() args = parser.parse_args()
# load policies run(repo_id=args.repo_id)
policy_balance, policy_walk = load_groot_policies(repo_id=args.repo_id)
# initialize robot
config = UnitreeG1Config()
robot = UnitreeG1(config)
# initialize gr00t locomotion controller
groot_controller = GrootLocomotionController(
policy_balance=policy_balance,
policy_walk=policy_walk,
robot=robot,
config=config,
)
# reset legs and start locomotion thread
try:
groot_controller.reset_robot()
groot_controller.start_locomotion_thread()
# log status
logger.info("Robot initialized with GR00T locomotion policies")
logger.info("Locomotion controller running in background thread")
logger.info("Press Ctrl+C to stop")
# keep robot alive
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print("\nStopping locomotion...")
groot_controller.stop_locomotion_thread()
print("Done!")
+264
View File
@@ -0,0 +1,264 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import time
import numpy as np
import onnx
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DEFAULT_ANGLES = np.zeros(29, dtype=np.float32)
DEFAULT_ANGLES[[0, 6]] = -0.312 # Hip pitch
DEFAULT_ANGLES[[3, 9]] = 0.669 # Knee
DEFAULT_ANGLES[[4, 10]] = -0.363 # Ankle pitch
DEFAULT_ANGLES[[15, 22]] = 0.2 # Shoulder pitch
DEFAULT_ANGLES[16] = 0.2 # Left shoulder roll
DEFAULT_ANGLES[23] = -0.2 # Right shoulder roll
DEFAULT_ANGLES[[18, 25]] = 0.6 # Elbow
MISSING_JOINTS = []
G1_MODEL = "g1_23" # Or "g1_29"
if G1_MODEL == "g1_23":
MISSING_JOINTS = [12, 14, 20, 21, 27, 28] # Waist yaw/pitch, wrist pitch/yaw
# Control parameters
ACTION_SCALE = 0.25
CONTROL_DT = 0.02 # 50Hz
ANG_VEL_SCALE = 0.25
DOF_POS_SCALE = 1.0
DOF_VEL_SCALE = 0.05
GAIT_PERIOD = 1.0
DEFAULT_HOLOSOMA_REPO_ID = "nepyope/holosoma_locomotion"
# Policy filename mapping
POLICY_FILES = {
"fastsac": "fastsac_g1_29dof.onnx",
"ppo": "ppo_g1_29dof.onnx",
}
def load_policy(
repo_id: str = DEFAULT_HOLOSOMA_REPO_ID,
policy_type: str = "fastsac",
) -> tuple[ort.InferenceSession, np.ndarray, np.ndarray]:
"""Load Holosoma locomotion policy and extract KP/KD from metadata.
Args:
repo_id: Hugging Face Hub repo ID
policy_type: Either "fastsac" (default) or "ppo"
Returns:
(policy, kp, kd) tuple
"""
if policy_type not in POLICY_FILES:
raise ValueError(f"Unknown policy type: {policy_type}. Choose from: {list(POLICY_FILES.keys())}")
filename = POLICY_FILES[policy_type]
logger.info(f"Loading {policy_type.upper()} policy from: {repo_id}/{filename}")
policy_path = hf_hub_download(repo_id=repo_id, filename=filename)
policy = ort.InferenceSession(policy_path)
logger.info(f"Policy loaded: {policy.get_inputs()[0].shape}{policy.get_outputs()[0].shape}")
# Extract KP/KD from ONNX metadata
model = onnx.load(policy_path)
metadata = {prop.key: prop.value for prop in model.metadata_props}
if "kp" not in metadata or "kd" not in metadata:
raise ValueError("ONNX model must contain 'kp' and 'kd' in metadata")
kp = np.array(json.loads(metadata["kp"]), dtype=np.float32)
kd = np.array(json.loads(metadata["kd"]), dtype=np.float32)
logger.info(f"Loaded KP/KD from ONNX ({len(kp)} joints)")
return policy, kp, kd
class HolosomaLocomotionController:
"""Holosoma whole-body locomotion controller for Unitree G1."""
def __init__(self, policy, robot, kp: np.ndarray, kd: np.ndarray):
self.policy = policy
self.robot = robot
# Override robot's PD gains with policy gains
self.robot.kp = kp
self.robot.kd = kd
self.cmd = np.zeros(3, dtype=np.float32)
# Robot state
self.qj = np.zeros(29, dtype=np.float32)
self.dqj = np.zeros(29, dtype=np.float32)
self.obs = np.zeros(100, dtype=np.float32)
self.last_action = np.zeros(29, dtype=np.float32)
# Gait phase
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
self.phase_dt = 2 * np.pi / ((1.0 / CONTROL_DT) * GAIT_PERIOD)
self.is_standing = True
def run_step(self):
# Get current observation
robot_state = self.robot.get_observation()
if robot_state is None:
return
# Get command from remote controller
if robot_state.wireless_remote is not None:
self.robot.remote_controller.set(robot_state.wireless_remote)
ly = self.robot.remote_controller.ly if abs(self.robot.remote_controller.ly) > 0.1 else 0.0
lx = self.robot.remote_controller.lx if abs(self.robot.remote_controller.lx) > 0.1 else 0.0
rx = self.robot.remote_controller.rx if abs(self.robot.remote_controller.rx) > 0.1 else 0.0
self.cmd[:] = [ly, -lx, -rx]
# Get joint positions and velocities
for i in range(29):
self.qj[i] = robot_state.motor_state[i].q
self.dqj[i] = robot_state.motor_state[i].dq
# Adapt observation for g1_23dof
for idx in MISSING_JOINTS:
self.qj[idx] = 0.0
self.dqj[idx] = 0.0
# Express IMU data in gravity frame of reference
quat = robot_state.imu_state.quaternion
ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32)
gravity = self.robot.get_gravity_orientation(quat)
# Scale joint positions and velocities before policy inference
qj_obs = (self.qj - DEFAULT_ANGLES) * DOF_POS_SCALE
dqj_obs = self.dqj * DOF_VEL_SCALE
ang_vel_s = ang_vel * ANG_VEL_SCALE
# Update gait phase
if np.linalg.norm(self.cmd[:2]) < 0.01 and abs(self.cmd[2]) < 0.01:
self.phase[0, :] = np.pi
self.is_standing = True
elif self.is_standing:
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
self.is_standing = False
else:
self.phase = np.fmod(self.phase + self.phase_dt + np.pi, 2 * np.pi) - np.pi
sin_ph = np.sin(self.phase[0])
cos_ph = np.cos(self.phase[0])
# Build observations
self.obs[0:29] = self.last_action
self.obs[29:32] = ang_vel_s
self.obs[32] = self.cmd[2]
self.obs[33:35] = self.cmd[:2]
self.obs[35:37] = cos_ph
self.obs[37:66] = qj_obs
self.obs[66:95] = dqj_obs
self.obs[95:98] = gravity
self.obs[98:100] = sin_ph
# Run policy inference
ort_in = {self.policy.get_inputs()[0].name: self.obs.reshape(1, -1).astype(np.float32)}
raw_action = self.policy.run(None, ort_in)[0].squeeze()
action = np.clip(raw_action, -100.0, 100.0)
self.last_action = action.copy()
# Transform action back to target joint positions
target = DEFAULT_ANGLES + action * ACTION_SCALE
# Build action dict
action_dict = {}
for motor in G1_29_JointIndex:
action_dict[f"{motor.name}.q"] = float(target[motor.value])
# Zero out missing joints for g1_23dof
for joint_idx in MISSING_JOINTS:
motor_name = G1_29_JointIndex(joint_idx).name
action_dict[f"{motor_name}.q"] = 0.0
# Send action to robot
self.robot.send_action(action_dict)
def run(repo_id: str = DEFAULT_HOLOSOMA_REPO_ID, policy_type: str = "fastsac") -> None:
"""Main function to run the Holosoma locomotion controller.
Args:
repo_id: Hugging Face Hub repository ID for Holosoma policies.
policy_type: Policy type to use ('fastsac' or 'ppo').
"""
# Load policy and gains
policy, kp, kd = load_policy(repo_id=repo_id, policy_type=policy_type)
# Initialize robot
config = UnitreeG1Config()
robot = UnitreeG1(config)
holosoma_controller = HolosomaLocomotionController(policy, robot, kp, kd)
try:
robot.reset(CONTROL_DT, DEFAULT_ANGLES)
logger.info("Use joystick: LY=fwd/back, LX=left/right, RX=rotate")
logger.info("Press Ctrl+C to stop")
# Run step
while True:
start_time = time.time()
holosoma_controller.run_step()
elapsed = time.time() - start_time
sleep_time = max(0, CONTROL_DT - elapsed)
time.sleep(sleep_time)
except KeyboardInterrupt:
logger.info("Stopping locomotion...")
finally:
if robot.is_connected:
robot.disconnect()
logger.info("Done!")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Holosoma Locomotion Controller for Unitree G1")
parser.add_argument(
"--repo-id",
type=str,
default=DEFAULT_HOLOSOMA_REPO_ID,
help=f"Hugging Face Hub repo ID for Holosoma policies (default: {DEFAULT_HOLOSOMA_REPO_ID})",
)
parser.add_argument(
"--policy",
type=str,
choices=["fastsac", "ppo"],
default="fastsac",
help="Policy type to use: 'fastsac' (default) or 'ppo'",
)
args = parser.parse_args()
run(repo_id=args.repo_id, policy_type=args.policy)
+3 -3
View File
@@ -109,7 +109,7 @@ hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"] lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
unitree_g1 = [ unitree_g1 = [
"pyzmq>=26.2.1,<28.0.0", "pyzmq>=26.2.1,<28.0.0",
"onnxruntime>=1.16.0" "onnxruntime>=1.16.0,<2.0.0"
] ]
reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"] reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"]
kinematics = ["lerobot[placo-dep]"] kinematics = ["lerobot[placo-dep]"]
@@ -141,13 +141,13 @@ groot = [
"ninja>=1.11.1,<2.0.0", "ninja>=1.11.1,<2.0.0",
"flash-attn>=2.5.9,<3.0.0 ; sys_platform != 'darwin'" "flash-attn>=2.5.9,<3.0.0 ; sys_platform != 'darwin'"
] ]
sarm = ["lerobot[transformers-dep]", "faker>=33.0.0,<35.0.0", "matplotlib>=3.10.3,<4.0.0", "qwen-vl-utils>=0.0.14"] sarm = ["lerobot[transformers-dep]", "faker>=33.0.0,<35.0.0", "matplotlib>=3.10.3,<4.0.0", "qwen-vl-utils>=0.0.14,<0.1.0"]
xvla = ["lerobot[transformers-dep]"] xvla = ["lerobot[transformers-dep]"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"] hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
# Features # Features
async = ["lerobot[grpcio-dep]", "matplotlib>=3.10.3,<4.0.0"] async = ["lerobot[grpcio-dep]", "matplotlib>=3.10.3,<4.0.0"]
peft = ["lerobot[transformers-dep]", "peft>=0.18.0"] peft = ["lerobot[transformers-dep]", "peft>=0.18.0,<1.0.0"]
# Development # Development
dev = ["pre-commit>=3.7.0,<5.0.0", "debugpy>=1.8.1,<1.9.0", "lerobot[grpcio-dep]", "grpcio-tools==1.73.1", "mypy>=1.19.1"] dev = ["pre-commit>=3.7.0,<5.0.0", "debugpy>=1.8.1,<1.9.0", "lerobot[grpcio-dep]", "grpcio-tools==1.73.1", "mypy>=1.19.1"]
@@ -49,10 +49,14 @@ class UnitreeG1Config(RobotConfig):
kp: list[float] = field(default_factory=lambda: _DEFAULT_KP.copy()) kp: list[float] = field(default_factory=lambda: _DEFAULT_KP.copy())
kd: list[float] = field(default_factory=lambda: _DEFAULT_KD.copy()) kd: list[float] = field(default_factory=lambda: _DEFAULT_KD.copy())
# Default joint positions
default_positions: list[float] = field(default_factory=lambda: [0.0] * 29)
# Control loop timestep
control_dt: float = 1.0 / 250.0 # 250Hz control_dt: float = 1.0 / 250.0 # 250Hz
# launch mujoco simulation # Launch mujoco simulation
is_simulation: bool = True is_simulation: bool = True
# socket config for ZMQ bridge # Socket config for ZMQ bridge
robot_ip: str = "192.168.123.164" robot_ip: str = "192.168.123.164" # default G1 IP
@@ -79,11 +79,3 @@ class G1_29_JointIndex(IntEnum):
kRightWristRoll = 26 kRightWristRoll = 26
kRightWristPitch = 27 kRightWristPitch = 27
kRightWristYaw = 28 kRightWristYaw = 28
# not used
kNotUsedJoint0 = 29
kNotUsedJoint1 = 30
kNotUsedJoint2 = 31
kNotUsedJoint3 = 32
kNotUsedJoint4 = 33
kNotUsedJoint5 = 34
+53 -6
View File
@@ -43,10 +43,7 @@ logger = logging.getLogger(__name__)
kTopicLowCommand_Debug = "rt/lowcmd" kTopicLowCommand_Debug = "rt/lowcmd"
kTopicLowState = "rt/lowstate" kTopicLowState = "rt/lowstate"
G1_29_Num_Motors = 35 G1_29_Num_Motors = 29
G1_23_Num_Motors = 35
H1_2_Num_Motors = 35
H1_Num_Motors = 20
@dataclass @dataclass
@@ -266,8 +263,17 @@ class UnitreeG1(Robot):
return {**self._motors_ft, **self._cameras_ft} return {**self._motors_ft, **self._cameras_ft}
def send_action(self, action: dict[str, Any]) -> dict[str, Any]: def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
self.msg.crc = self.crc.Crc(action) for motor in G1_29_JointIndex:
self.lowcmd_publisher.Write(action) key = f"{motor.name}.q"
if key in action:
self.msg.motor_cmd[motor.value].q = action[key]
self.msg.motor_cmd[motor.value].qd = 0
self.msg.motor_cmd[motor.value].kp = self.kp[motor.value]
self.msg.motor_cmd[motor.value].kd = self.kd[motor.value]
self.msg.motor_cmd[motor.value].tau = 0
self.msg.crc = self.crc.Crc(self.msg)
self.lowcmd_publisher.Write(self.msg)
return action return action
def get_gravity_orientation(self, quaternion): # get gravity orientation from quaternion def get_gravity_orientation(self, quaternion): # get gravity orientation from quaternion
@@ -282,3 +288,44 @@ class UnitreeG1(Robot):
gravity_orientation[1] = -2 * (qz * qy + qw * qx) gravity_orientation[1] = -2 * (qz * qy + qw * qx)
gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz) gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz)
return gravity_orientation return gravity_orientation
def reset(
self,
control_dt: float | None = None,
default_positions: list[float] | None = None,
) -> None: # interpolate to default position
if control_dt is None:
control_dt = self.config.control_dt
if default_positions is None:
default_positions = np.array(self.config.default_positions, dtype=np.float32)
total_time = 3.0
num_steps = int(total_time / control_dt)
# get current state
robot_state = self.get_observation()
# record current positions
init_dof_pos = np.zeros(29, dtype=np.float32)
for i in range(29):
init_dof_pos[i] = robot_state.motor_state[i].q
# Interpolate to default position
for step in range(num_steps):
start_time = time.time()
alpha = step / num_steps
action_dict = {}
for motor in G1_29_JointIndex:
target_pos = default_positions[motor.value]
interp_pos = init_dof_pos[motor.value] * (1 - alpha) + target_pos * alpha
action_dict[f"{motor.name}.q"] = float(interp_pos)
self.send_action(action_dict)
# Maintain constant control rate
elapsed = time.time() - start_time
sleep_time = max(0, control_dt - elapsed)
time.sleep(sleep_time)
logger.info("Reached default position")