diff --git a/docs/source/unitree_g1.mdx b/docs/source/unitree_g1.mdx
index fa7159154..39bd7832b 100644
--- a/docs/source/unitree_g1.mdx
+++ b/docs/source/unitree_g1.mdx
@@ -1,23 +1,49 @@
# Unitree G1
-This guide covers the complete setup process for the Unitree G1 humanoid, from initial connection to running gr00t_wbc locomotion.
+
-## About
-
-We support both 29 and 23 DOF G1 EDU version. We introduce:
-
-- **`unitree g1` robot class, handling low level read/write from/to the humanoid**
-- **ZMQ socket bridge** for remote communication and camera streaming, allowing for remote policy deployment over wlan, eth or directly on the robot
-- **Locomotion policies** from NVIDIA gr00t and Amazon FAR Holosoma
-- **Simulation mode** for testing policies without the physical robot in mujoco
+The Unitree G1 humanoid is now supported in LeRobot! You can teleoperate, train locomanipulation policies, test in sim, and more. Both 29 and 23 DoF variants are supported.
---
-## Connection guide
+## Part 1: Getting Started
-### Step 1: Configure Ethernet Interface
+### Install LeRobot on Your Machine
-Set a static IP on the same subnet as the robot:
+```bash
+conda create -y -n lerobot python=3.12
+conda activate lerobot
+git clone https://github.com/unitreerobotics/unitree_sdk2_python.git
+cd unitree_sdk2_python && pip install -e .
+git clone https://github.com/huggingface/lerobot.git
+cd lerobot
+pip install -e '.[unitree_g1]'
+```
+
+### Test the Installation (Simulation)
+
+```bash
+lerobot-teleoperate \
+ --robot.type=unitree_g1 \
+ --robot.is_simulation=true \
+ --teleop.type=unitree_g1 \
+ --teleop.id=wbc_unitree \
+ --robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
+ --display_data=true
+```
+
+This will launch a [MuJoCo sim instance](https://huggingface.co/lerobot/unitree-g1-mujoco/tree/main) for the G1.
+
+- Press `9` to release the robot
+- Press `7` / `8` to increase / decrease waist height
+
+### Connect to the Robot
+
+The G1's Ethernet IP is fixed at `192.168.123.164`. Your machine must have a static IP on the same subnet: `192.168.123.x` where `x ≠ 164`.
```bash
# Replace 'enp131s0' with your ethernet interface name (check with `ip a`)
@@ -26,272 +52,200 @@ sudo ip addr add 192.168.123.200/24 dev enp131s0
sudo ip link set enp131s0 up
```
-**Note**: The G1's Ethernet IP is fixed at `192.168.123.164`. Your computer must use `192.168.123.x` with x ≠ 164.
-
-### Step 2: SSH into the Robot
+### SSH into the Robot
```bash
ssh unitree@192.168.123.164
# Password: 123
```
-You should now be connected to the G1's Orin.
+### Install LeRobot on the G1
+
+From the robot:
+
+```bash
+conda create -y -n lerobot python=3.12
+conda activate lerobot
+git clone https://github.com/unitreerobotics/unitree_sdk2_python.git
+cd unitree_sdk2_python && pip install -e .
+git clone https://github.com/huggingface/lerobot.git
+cd lerobot
+pip install -e '.[unitree_g1]'
+```
+
+> **Note:** The Unitree SDK requires CycloneDDS v0.10.2. See the [Unitree SDK docs](https://github.com/unitreerobotics/unitree_sdk2_python) for details.
---
## Part 2: Enable WiFi on the Robot
-Wlan0 is disabled by default on the G1. To enable it:
-
-### Step 1: Enable WiFi Hardware
+Wi-Fi connectivity is blocked by default on the G1. To activate:
```bash
-sudo rfkill unblock wifi
sudo rfkill unblock all
-
-# Bring up wlan0
sudo ip link set wlan0 up
-
-# Enable NetworkManager control of wlan0
sudo nmcli radio wifi on
sudo nmcli device set wlan0 managed yes
sudo systemctl restart NetworkManager
```
-### Step 2: Enable Internet Forwarding
-
-**On your laptop:**
+**On your laptop** (share internet via Ethernet):
```bash
-# Enable IP forwarding
sudo sysctl -w net.ipv4.ip_forward=1
-# Set up NAT (replace wlp132s0f0 with your WiFi interface)
+# Replace wlp132s0f0 with your WiFi interface name
sudo iptables -t nat -A POSTROUTING -o wlp132s0f0 -s 192.168.123.0/24 -j MASQUERADE
sudo iptables -A FORWARD -i wlp132s0f0 -o enp131s0 -m state --state RELATED,ESTABLISHED -j ACCEPT
sudo iptables -A FORWARD -i enp131s0 -o wlp132s0f0 -j ACCEPT
```
-**On the G1:**
+**On the G1** (set default route through your laptop):
```bash
-# Add laptop as default gateway
sudo ip route del default 2>/dev/null || true
sudo ip route add default via 192.168.123.200 dev eth0
echo "nameserver 8.8.8.8" | sudo tee /etc/resolv.conf
-# Test connection
+# Verify
ping -c 3 8.8.8.8
```
-### Step 3: Connect to WiFi Network
+**Connect to a WiFi network:**
```bash
-# List available networks
nmcli device wifi list
-# Connect to your WiFi (example)
sudo nmcli connection add type wifi ifname wlan0 con-name "YourNetwork" ssid "YourNetwork"
sudo nmcli connection modify "YourNetwork" wifi-sec.key-mgmt wpa-psk
sudo nmcli connection modify "YourNetwork" wifi-sec.psk "YourPassword"
sudo nmcli connection modify "YourNetwork" connection.autoconnect yes
sudo nmcli connection up "YourNetwork"
-# Check WiFi IP address
ip a show wlan0
```
-### Step 4: SSH Over WiFi
-
-Once connected to WiFi, note the robot's IP address and disconnect the Ethernet cable. You can now SSH over WiFi:
+You can now SSH over WiFi:
```bash
-ssh unitree@
+ssh unitree@
# Password: 123
```
-Replace `` with your robot's actual WiFi IP address.
-
---
-## Part 3: Robot Server Setup
+## Part 3: Teleoperation & Locomotion
-### Step 1: Install LeRobot on the Orin
-
-SSH into the robot and install LeRobot:
-
-```bash
-ssh unitree@
-
-conda create -y -n lerobot python=3.12
-conda activate lerobot
-git clone https://github.com/huggingface/lerobot.git
-cd lerobot
-pip install -e '.[unitree_g1]'
-git clone https://github.com/unitreerobotics/unitree_sdk2_python.git
-cd unitree_sdk2_python && pip install -e .
-```
-
-**Note**: The Unitree SDK requires CycloneDDS v0.10.2 to be installed. See the [Unitree SDK documentation](https://github.com/unitreerobotics/unitree_sdk2_python) for details.
-
-### Step 2: Run the Robot Server
+### Run the Robot Server
On the robot:
```bash
-python src/lerobot/robots/unitree_g1/run_g1_server.py
+python src/lerobot/robots/unitree_g1/run_g1_server.py --camera
```
-**Important**: Keep this terminal running. The server must be active for remote control.
+### Run the Locomotion Policy
+
+```bash
+lerobot-teleoperate \
+ --robot.type=unitree_g1 \
+ --robot.is_simulation=false \
+ --robot.robot_ip= \
+ --teleop.type=unitree_g1 \
+ --teleop.id=wbc_unitree \
+ --robot.cameras='{"global_view": {"type": "zmq", "server_address": "", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
+ --display_data=true \
+ --robot.controller=HolosomaLocomotionController
+```
+
+We support both [HolosomaLocomotionController](https://github.com/amazon-far/holosoma) and [GrootLocomotionController](https://github.com/NVlabs/GR00T-WholeBodyControl).
---
-## Part 4: Controlling the robot
+## Part 4: Loco-Manipulation with the Homunculus Exoskeleton
-With the robot server running, you can now control the robot remotely. Let's launch a locomotion policy
+We provide a loco-manipulation solution via the Homunculus Exoskeleton — an open-source 7 DoF exoskeleton for whole-body control. Assembly instructions [here](https://github.com/nepyope/hmc_exo).
-### Step 1: Install LeRobot on your machine
-
-```bash
-conda create -y -n lerobot python=3.12
-conda activate lerobot
-git clone https://github.com/huggingface/lerobot.git
-cd lerobot
-pip install -e '.[unitree_g1]'
-git clone https://github.com/unitreerobotics/unitree_sdk2_python.git
-cd unitree_sdk2_python && pip install -e .
-```
-
-### Step 2: Update Robot IP in Config
-
-Edit the config file to match your robot's WiFi IP:
-
-```python
-# In src/lerobot/robots/unitree_g1/config_unitree_g1.py
-robot_ip: str = "" # Replace with your robot's WiFi IP.
-```
-
-### Step 3: Run the Locomotion Policy
-
-```bash
-# Run GR00T locomotion controller
-python examples/unitree_g1/gr00t_locomotion.py --repo-id "nepyope/GR00T-WholeBodyControl_g1"
-
-# Run Holosoma locomotion controller
-python examples/unitree_g1/holosoma_locomotion.py
-
-```
-
-Press `Ctrl+C` to stop the policy.
-
----
-
-## Running in Simulation Mode (MuJoCo)
-
-You can test policies before deploying on the physical robot using MuJoCo simulation. Set `is_simulation=True` in config or pass `--robot.is_simulation=true` via CLI.
-
-### Calibrate Exoskeleton Teleoperator
+### Calibrate
```bash
lerobot-calibrate \
- --teleop.type=unitree_g1 \
- --teleop.left_arm_config.port=/dev/ttyACM1 \
- --teleop.right_arm_config.port=/dev/ttyACM0 \
- --teleop.id=exo
+ --teleop.type=unitree_g1 \
+ --teleop.left_arm_config.port=/dev/ttyACM1 \
+ --teleop.right_arm_config.port=/dev/ttyACM0 \
+ --teleop.id=exo
```
-### Teleoperate in Simulation
+During calibration move each joint through its entire range. After fitting, move the joint in a neutral position and press `n` to advance.
-```bash
-lerobot-teleoperate \
- --robot.type=unitree_g1 \
- --robot.is_simulation=true \
- --teleop.type=unitree_g1 \
- --teleop.left_arm_config.port=/dev/ttyACM1 \
- --teleop.right_arm_config.port=/dev/ttyACM0 \
- --teleop.id=exo \
- --fps=100
-```
-
-### Record Dataset in Simulation
+### Record a Dataset
```bash
lerobot-record \
- --robot.type=unitree_g1 \
- --robot.is_simulation=true \
- --robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
- --teleop.type=unitree_g1 \
- --teleop.left_arm_config.port=/dev/ttyACM1 \
- --teleop.right_arm_config.port=/dev/ttyACM0 \
- --teleop.id=exo \
- --dataset.repo_id=your-username/dataset-name \
- --dataset.single_task="Test" \
- --dataset.num_episodes=2 \
- --dataset.episode_time_s=5 \
- --dataset.reset_time_s=5 \
- --dataset.push_to_hub=true \
- --dataset.streaming_encoding=true \
- # --dataset.vcodec=auto \
- --dataset.encoder_threads=2
+ --robot.type=unitree_g1 \
+ --robot.is_simulation=true \
+ --robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
+ --teleop.type=unitree_g1 \
+ --teleop.left_arm_config.port=/dev/ttyACM1 \
+ --teleop.right_arm_config.port=/dev/ttyACM0 \
+ --teleop.id=exo \
+ --dataset.repo_id=your-username/dataset-name \
+ --dataset.single_task="Test" \
+ --dataset.num_episodes=2 \
+ --dataset.episode_time_s=5 \
+ --dataset.reset_time_s=5 \
+ --dataset.push_to_hub=true \
+ --dataset.streaming_encoding=true \
+ --dataset.encoder_threads=2
```
-Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim)
+> **Note:** Omit `--teleop.left_arm_config.port` and `--teleop.right_arm_config.port` if you're only using the joystick.
+
+Example dataset: [nepyope/unitree_box_move_blue_full](https://huggingface.co/datasets/nepyope/unitree_box_move_blue_full)
---
-## Running on Real Robot
+## Part 5: Training & Inference
-Once the robot server is running on the G1 (see Part 3), you can teleoperate and record on the real robot.
-
-### Start the Camera Server
-
-On the robot, start the ZMQ image server:
+### Train
```bash
-python src/lerobot/cameras/zmq/image_server.py
+python src/lerobot/scripts/lerobot_train.py \
+ --dataset.repo_id=your-username/dataset-name \
+ --policy.type=pi05 \
+ --output_dir=./outputs/pi05_training \
+ --job_name=pi05_training \
+ --policy.repo_id=your-username/your-repo-id \
+ --policy.pretrained_path=lerobot/pi05_base \
+ --policy.compile_model=true \
+ --policy.gradient_checkpointing=true \
+ --wandb.enable=true \
+ --policy.dtype=bfloat16 \
+ --policy.freeze_vision_encoder=false \
+ --policy.train_expert_only=false \
+ --steps=3000 \
+ --policy.device=cuda \
+ --batch_size=32
```
-Keep this running in a separate terminal for camera streaming during recording.
+### Inference with RTC
-### Teleoperate Real Robot
+Once trained, we recommend deploying policies using inference-time RTC:
```bash
-lerobot-teleoperate \
- --robot.type=unitree_g1 \
- --robot.is_simulation=false \
- --teleop.type=unitree_g1 \
- --teleop.left_arm_config.port=/dev/ttyACM1 \
- --teleop.right_arm_config.port=/dev/ttyACM0 \
- --teleop.id=exo \
- --fps=100
+python examples/rtc/eval_with_real_robot.py \
+ --policy.path=your-username/your-repo-id \
+ --policy.device=cuda \
+ --robot.type=unitree_g1 \
+ --robot.is_simulation=false \
+ --robot.controller=HolosomaLocomotionController \
+ --robot.cameras='{"global_view": {"type": "zmq", "server_address": "", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
+ --task="task_description" \
+ --duration=1000 \
+ --fps=30 \
+ --rtc.enabled=true
```
-### Record Dataset on Real Robot
-
-```bash
-lerobot-record \
- --robot.type=unitree_g1 \
- --robot.is_simulation=false \
- --robot.cameras='{"global_view": {"type": "zmq", "server_address": "172.18.129.215", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
- --teleop.type=unitree_g1 \
- --teleop.left_arm_config.port=/dev/ttyACM1 \
- --teleop.right_arm_config.port=/dev/ttyACM0 \
- --teleop.id=exo \
- --dataset.repo_id=your-username/dataset-name \
- --dataset.single_task="Test" \
- --dataset.num_episodes=2 \
- --dataset.episode_time_s=5 \
- --dataset.reset_time_s=5 \
- --dataset.push_to_hub=true \
- --dataset.streaming_encoding=true \
- # --dataset.vcodec=auto \
- --dataset.encoder_threads=2
-```
-
-**Note**: Update `server_address` to match your robot's camera server IP.
-
-Example real robot dataset: [nepyope/teleop_test_real](https://huggingface.co/datasets/nepyope/teleop_test_real)
-
---
## Additional Resources
@@ -300,8 +254,8 @@ Example real robot dataset: [nepyope/teleop_test_real](https://huggingface.co/da
- [GR00T-WholeBodyControl](https://github.com/NVlabs/GR00T-WholeBodyControl)
- [Holosoma](https://github.com/amazon-far/holosoma)
- [LeRobot Documentation](https://github.com/huggingface/lerobot)
-- [Unitree_IL_Lerobot](https://github.com/unitreerobotics/unitree_IL_lerobot)
+- [Unitree IL LeRobot](https://github.com/unitreerobotics/unitree_IL_lerobot)
---
-_Last updated: December 2025_
+_Last updated: March 2026_
diff --git a/examples/rtc/eval_with_real_robot.py b/examples/rtc/eval_with_real_robot.py
index 4c803eb7e..9d9e1364a 100644
--- a/examples/rtc/eval_with_real_robot.py
+++ b/examples/rtc/eval_with_real_robot.py
@@ -78,6 +78,7 @@ from torch import Tensor
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
+from lerobot.cameras.zmq.configuration_zmq import ZMQCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import RTCAttentionSchedule
@@ -97,6 +98,7 @@ from lerobot.robots import ( # noqa: F401
bi_so_follower,
koch_follower,
so_follower,
+ unitree_g1,
)
from lerobot.robots.utils import make_robot_from_config
from lerobot.utils.constants import OBS_IMAGES
diff --git a/pyproject.toml b/pyproject.toml
index d75d6b788..696d8597d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -119,11 +119,13 @@ gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
unitree_g1 = [
+ "unitree-sdk2==1.0.1",
"pyzmq>=26.2.1,<28.0.0",
"onnxruntime>=1.16.0,<2.0.0",
"pin>=3.0.0,<4.0.0",
"meshcat>=0.3.0,<0.4.0",
"lerobot[matplotlib-dep]",
+ "lerobot[pygame-dep]",
"casadi>=3.6.0,<4.0.0",
]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
@@ -206,6 +208,7 @@ all = [
"lerobot[metaworld]",
"lerobot[sarm]",
"lerobot[peft]",
+ # "lerobot[unitree_g1]", TODO: Unitree requires specific installation instructions for unitree_sdk2
]
[project.scripts]
diff --git a/src/lerobot/cameras/zmq/camera_zmq.py b/src/lerobot/cameras/zmq/camera_zmq.py
index 16523b50a..2fbe50d8b 100644
--- a/src/lerobot/cameras/zmq/camera_zmq.py
+++ b/src/lerobot/cameras/zmq/camera_zmq.py
@@ -181,7 +181,7 @@ class ZMQCamera(Camera):
try:
message = self.socket.recv_string()
except Exception as e:
- # Check for ZMQ timeout (EAGAIN/Again) without requiring global zmq import
+ # zmq is lazy-imported in connect(), so check by name to avoid a top-level import
if type(e).__name__ == "Again":
raise TimeoutError(f"{self} timeout after {self.timeout_ms}ms") from e
raise
diff --git a/src/lerobot/cameras/zmq/image_server.py b/src/lerobot/cameras/zmq/image_server.py
index 2da366cef..8222b9fee 100644
--- a/src/lerobot/cameras/zmq/image_server.py
+++ b/src/lerobot/cameras/zmq/image_server.py
@@ -23,6 +23,7 @@ import base64
import contextlib
import json
import logging
+import threading
import time
from collections import deque
@@ -42,10 +43,57 @@ def encode_image(image: np.ndarray, quality: int = 80) -> str:
return base64.b64encode(buffer).decode("utf-8")
+class CameraCaptureThread:
+ """Background thread that continuously captures and encodes frames from a camera."""
+
+ def __init__(self, camera: OpenCVCamera, name: str):
+ self.camera = camera
+ self.name = name
+ self.latest_encoded: str | None = None # Pre-encoded JPEG as base64
+ self.latest_timestamp: float = 0.0
+ self.frame_lock = threading.Lock()
+ self.running = False
+ self.thread: threading.Thread | None = None
+
+ def start(self):
+ """Start the capture thread."""
+ self.running = True
+ self.thread = threading.Thread(target=self._capture_loop, daemon=True)
+ self.thread.start()
+
+ def stop(self):
+ """Stop the capture thread."""
+ self.running = False
+ if self.thread:
+ self.thread.join(timeout=1.0)
+
+ def _capture_loop(self):
+ """Continuously capture and encode frames at the camera's native rate."""
+ while self.running:
+ try:
+ frame = self.camera.read() # Blocks at camera's native rate
+ timestamp = time.time()
+ # Encode immediately in capture thread (this is the slow part)
+ encoded = encode_image(frame)
+ with self.frame_lock:
+ self.latest_encoded = encoded
+ self.latest_timestamp = timestamp
+ except Exception as e:
+ logger.warning(f"Camera {self.name} capture error: {e}")
+ time.sleep(0.01)
+
+ def get_latest(self) -> tuple[str | None, float]:
+ """Get the latest encoded frame and its timestamp."""
+ with self.frame_lock:
+ return self.latest_encoded, self.latest_timestamp
+
+
class ImageServer:
def __init__(self, config: dict, port: int = 5555):
+ # fps controls the publish loop rate (how often frames are sent over ZMQ), not the camera capture rate
self.fps = config.get("fps", 30)
self.cameras: dict[str, OpenCVCamera] = {}
+ self.capture_threads: dict[str, CameraCaptureThread] = {}
for name, cfg in config.get("cameras", {}).items():
shape = cfg.get("shape", [480, 640])
@@ -61,6 +109,10 @@ class ImageServer:
self.cameras[name] = camera
logger.info(f"Camera {name}: {shape[1]}x{shape[0]}")
+ # Create capture thread for this camera
+ capture_thread = CameraCaptureThread(camera, name)
+ self.capture_threads[name] = capture_thread
+
# ZMQ PUB socket
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
@@ -73,6 +125,18 @@ class ImageServer:
def run(self):
frame_count = 0
frame_times = deque(maxlen=60)
+ last_published_ts: dict[str, float] = {}
+
+ # Start all capture threads
+ for capture_thread in self.capture_threads.values():
+ capture_thread.start()
+
+ # Wait for first frames to be captured and encoded
+ logger.info("Waiting for cameras to start capturing...")
+ for name, capture_thread in self.capture_threads.items():
+ while capture_thread.get_latest()[0] is None:
+ time.sleep(0.01)
+ logger.info(f"Camera {name} ready (capture + encode in background)")
try:
while True:
@@ -80,10 +144,12 @@ class ImageServer:
# Build message
message = {"timestamps": {}, "images": {}}
- for name, cam in self.cameras.items():
- frame = cam.read() # Returns RGB
- message["timestamps"][name] = time.time()
- message["images"][name] = encode_image(frame)
+ for name, capture_thread in self.capture_threads.items():
+ encoded, timestamp = capture_thread.get_latest()
+ if encoded is not None and timestamp > last_published_ts.get(name, 0.0):
+ message["timestamps"][name] = timestamp
+ message["images"][name] = encoded
+ last_published_ts[name] = timestamp
# Send as JSON string (suppress if buffer full)
with contextlib.suppress(zmq.Again):
@@ -102,6 +168,8 @@ class ImageServer:
except KeyboardInterrupt:
pass
finally:
+ for capture_thread in self.capture_threads.values():
+ capture_thread.stop()
for cam in self.cameras.values():
cam.disconnect()
self.socket.close()
diff --git a/src/lerobot/robots/unitree_g1/__init__.py b/src/lerobot/robots/unitree_g1/__init__.py
index d91be150f..ef3a9d05e 100644
--- a/src/lerobot/robots/unitree_g1/__init__.py
+++ b/src/lerobot/robots/unitree_g1/__init__.py
@@ -16,3 +16,5 @@
from .config_unitree_g1 import UnitreeG1Config
from .unitree_g1 import UnitreeG1
+
+__all__ = ["UnitreeG1", "UnitreeG1Config"]
diff --git a/src/lerobot/robots/unitree_g1/config_unitree_g1.py b/src/lerobot/robots/unitree_g1/config_unitree_g1.py
index 1b81214a6..b786c2a33 100644
--- a/src/lerobot/robots/unitree_g1/config_unitree_g1.py
+++ b/src/lerobot/robots/unitree_g1/config_unitree_g1.py
@@ -27,11 +27,10 @@ _GAINS: dict[str, dict[str, list[float]]] = {
}, # pitch, roll, yaw, knee, ankle_pitch, ankle_roll
"right_leg": {"kp": [150, 150, 150, 300, 40, 40], "kd": [2, 2, 2, 4, 2, 2]},
"waist": {"kp": [250, 250, 250], "kd": [5, 5, 5]}, # yaw, roll, pitch
- "left_arm": {"kp": [80, 80, 80, 80], "kd": [3, 3, 3, 3]}, # shoulder_pitch/roll/yaw, elbow
+ "left_arm": {"kp": [50, 50, 80, 80], "kd": [3, 3, 3, 3]}, # shoulder_pitch/roll/yaw, elbow
"left_wrist": {"kp": [40, 40, 40], "kd": [1.5, 1.5, 1.5]}, # roll, pitch, yaw
- "right_arm": {"kp": [80, 80, 80, 80], "kd": [3, 3, 3, 3]},
+ "right_arm": {"kp": [50, 50, 80, 80], "kd": [3, 3, 3, 3]},
"right_wrist": {"kp": [40, 40, 40], "kd": [1.5, 1.5, 1.5]},
- "other": {"kp": [80, 80, 80, 80, 80, 80], "kd": [3, 3, 3, 3, 3, 3]},
}
@@ -68,3 +67,7 @@ class UnitreeG1Config(RobotConfig):
# Compensates for gravity on the unitree's arms using the arm ik solver
gravity_compensation: bool = False
+
+ # Lower-body controller class name, e.g. "GrootLocomotionController" or
+ # "HolosomaLocomotionController". None disables it.
+ controller: str | None = None
diff --git a/src/lerobot/robots/unitree_g1/robot_kinematic_processor.py b/src/lerobot/robots/unitree_g1/g1_kinematics.py
similarity index 87%
rename from src/lerobot/robots/unitree_g1/robot_kinematic_processor.py
rename to src/lerobot/robots/unitree_g1/g1_kinematics.py
index d086a9986..f57320a11 100644
--- a/src/lerobot/robots/unitree_g1/robot_kinematic_processor.py
+++ b/src/lerobot/robots/unitree_g1/g1_kinematics.py
@@ -16,13 +16,11 @@
import logging
import os
-import sys
+from collections import deque
import numpy as np
logger = logging.getLogger(__name__)
-parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-sys.path.append(parent2_dir)
class WeightedMovingFilter:
@@ -31,18 +29,14 @@ class WeightedMovingFilter:
self._weights = np.array(weights)
self._data_size = data_size
self._filtered_data = np.zeros(self._data_size)
- self._data_queue = []
+ self._data_queue = deque(maxlen=self._window_size)
def _apply_filter(self):
if len(self._data_queue) < self._window_size:
return self._data_queue[-1]
data_array = np.array(self._data_queue)
- temp_filtered_data = np.zeros(self._data_size)
- for i in range(self._data_size):
- temp_filtered_data[i] = np.convolve(data_array[:, i], self._weights, mode="valid")[-1]
-
- return temp_filtered_data
+ return data_array.T @ self._weights
def add_data(self, new_data):
assert len(new_data) == self._data_size
@@ -52,9 +46,6 @@ class WeightedMovingFilter:
): # skip duplicate data
return
- if len(self._data_queue) >= self._window_size:
- self._data_queue.pop(0)
-
self._data_queue.append(new_data)
self._filtered_data = self._apply_filter()
@@ -71,8 +62,6 @@ class G1_29_ArmIK: # noqa: N801
from pinocchio import casadi as cpin
self._pin = pin
- np.set_printoptions(precision=5, suppress=True, linewidth=200)
-
self.unit_test = unit_test
self.repo_path = snapshot_download("lerobot/unitree-g1-mujoco")
@@ -249,50 +238,35 @@ class G1_29_ArmIK: # noqa: N801
self.opti.set_value(self.param_tf_r, right_wrist)
self.opti.set_value(self.var_q_last, self.init_data) # for smooth
+ converged = True
try:
self.opti.solve()
-
sol_q = self.opti.value(self.var_q)
- self.smooth_filter.add_data(sol_q)
- sol_q = self.smooth_filter.filtered_data
-
- if current_lr_arm_motor_dq is not None:
- v = current_lr_arm_motor_dq * 0.0
- else:
- v = (sol_q - self.init_data) * 0.0
-
- self.init_data = sol_q
-
- sol_tauff = self._pin.rnea(
- self.reduced_robot.model,
- self.reduced_robot.data,
- sol_q,
- v,
- np.zeros(self.reduced_robot.model.nv),
- )
-
- return sol_q, sol_tauff
-
except Exception as e:
- logger.error(f"ERROR in convergence, plotting debug info.{e}")
-
+ converged = False
+ logger.error(f"IK convergence error: {e}")
sol_q = self.opti.debug.value(self.var_q)
- self.smooth_filter.add_data(sol_q)
- sol_q = self.smooth_filter.filtered_data
- if current_lr_arm_motor_dq is not None:
- v = current_lr_arm_motor_dq * 0.0
- else:
- v = (sol_q - self.init_data) * 0.0
-
- self.init_data = sol_q
+ self.smooth_filter.add_data(sol_q)
+ sol_q = self.smooth_filter.filtered_data
+ self.init_data = sol_q
+ if not converged:
logger.error(
f"sol_q:{sol_q} \nmotorstate: \n{current_lr_arm_motor_q} \nleft_pose: \n{left_wrist} \nright_pose: \n{right_wrist}"
)
-
return current_lr_arm_motor_q, np.zeros(self.reduced_robot.model.nv)
+ sol_tauff = self._pin.rnea(
+ self.reduced_robot.model,
+ self.reduced_robot.data,
+ sol_q,
+ np.zeros(self.reduced_robot.model.nv),
+ np.zeros(self.reduced_robot.model.nv),
+ )
+
+ return sol_q, sol_tauff
+
def solve_tau(self, current_lr_arm_motor_q=None, current_lr_arm_motor_dq=None):
try:
q_g1 = np.array(current_lr_arm_motor_q, dtype=float)
diff --git a/src/lerobot/robots/unitree_g1/g1_utils.py b/src/lerobot/robots/unitree_g1/g1_utils.py
index 4e37bdcef..91f009b26 100644
--- a/src/lerobot/robots/unitree_g1/g1_utils.py
+++ b/src/lerobot/robots/unitree_g1/g1_utils.py
@@ -14,12 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import importlib
from enum import IntEnum
+import numpy as np
+
# ruff: noqa: N801, N815
NUM_MOTORS = 29
+REMOTE_AXES = ("remote.lx", "remote.ly", "remote.rx", "remote.ry")
+REMOTE_BUTTONS = tuple(f"remote.button.{i}" for i in range(16))
+REMOTE_KEYS = REMOTE_AXES + REMOTE_BUTTONS
+
+
+def default_remote_input() -> dict[str, float]:
+ """Return a zeroed-out remote input dict (axes + buttons)."""
+ return dict.fromkeys(REMOTE_KEYS, 0.0)
+
+
+def get_gravity_orientation(quaternion: list[float] | np.ndarray) -> np.ndarray:
+ """Get gravity orientation from quaternion [w, x, y, z]."""
+ qw, qx, qy, qz = quaternion
+ gravity_orientation = np.zeros(3, dtype=np.float32)
+ gravity_orientation[0] = 2 * (-qz * qx + qw * qy)
+ gravity_orientation[1] = -2 * (qz * qy + qw * qx)
+ gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz)
+ return gravity_orientation
+
class G1_29_JointArmIndex(IntEnum):
# Left arm
@@ -29,7 +51,7 @@ class G1_29_JointArmIndex(IntEnum):
kLeftElbow = 18
kLeftWristRoll = 19
kLeftWristPitch = 20
- kLeftWristyaw = 21
+ kLeftWristYaw = 21
# Right arm
kRightShoulderPitch = 22
@@ -41,6 +63,21 @@ class G1_29_JointArmIndex(IntEnum):
kRightWristYaw = 28
+def make_locomotion_controller(name: str | None):
+ """Instantiate a locomotion controller by class name. Returns None if name is None."""
+ if name is None:
+ return None
+ controllers = {
+ "GrootLocomotionController": "lerobot.robots.unitree_g1.gr00t_locomotion",
+ "HolosomaLocomotionController": "lerobot.robots.unitree_g1.holosoma_locomotion",
+ }
+ module_path = controllers.get(name)
+ if module_path is None:
+ raise ValueError(f"Unknown controller: {name!r}. Available: {list(controllers)}")
+ module = importlib.import_module(module_path)
+ return getattr(module, name)()
+
+
class G1_29_JointIndex(IntEnum):
# Left leg
kLeftHipPitch = 0
@@ -69,7 +106,7 @@ class G1_29_JointIndex(IntEnum):
kLeftElbow = 18
kLeftWristRoll = 19
kLeftWristPitch = 20
- kLeftWristyaw = 21
+ kLeftWristYaw = 21
# Right arm
kRightShoulderPitch = 22
diff --git a/examples/unitree_g1/gr00t_locomotion.py b/src/lerobot/robots/unitree_g1/gr00t_locomotion.py
similarity index 59%
rename from examples/unitree_g1/gr00t_locomotion.py
rename to src/lerobot/robots/unitree_g1/gr00t_locomotion.py
index 0123b5206..31166e123 100644
--- a/examples/unitree_g1/gr00t_locomotion.py
+++ b/src/lerobot/robots/unitree_g1/gr00t_locomotion.py
@@ -14,20 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import argparse
import logging
-import time
from collections import deque
import numpy as np
import onnxruntime as ort
from huggingface_hub import hf_hub_download
-from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
-from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
-from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
+from lerobot.robots.unitree_g1.g1_utils import (
+ REMOTE_AXES,
+ REMOTE_BUTTONS,
+ G1_29_JointIndex,
+ get_gravity_orientation,
+)
-logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -36,18 +36,13 @@ GROOT_DEFAULT_ANGLES[[0, 6]] = -0.1 # Hip pitch
GROOT_DEFAULT_ANGLES[[3, 9]] = 0.3 # Knee
GROOT_DEFAULT_ANGLES[[4, 10]] = -0.2 # Ankle pitch
-MISSING_JOINTS = []
-G1_MODEL = "g1_23" # Or "g1_29"
-if G1_MODEL == "g1_23":
- MISSING_JOINTS = [12, 14, 20, 21, 27, 28] # Waist yaw/pitch, wrist pitch/yaw
-
# Control parameters
ACTION_SCALE = 0.25
CONTROL_DT = 0.02 # 50Hz
ANG_VEL_SCALE: float = 0.25
DOF_POS_SCALE: float = 1.0
DOF_VEL_SCALE: float = 0.05
-CMD_SCALE: list = [2.0, 2.0, 0.25]
+CMD_SCALE: list[float] = [2.0, 2.0, 0.25]
DEFAULT_GROOT_REPO_ID = "nepyope/GR00T-WholeBodyControl_g1"
@@ -85,11 +80,11 @@ def load_groot_policies(
class GrootLocomotionController:
"""GR00T lower-body locomotion controller for the Unitree G1."""
- def __init__(self, policy_balance, policy_walk, robot, config):
- self.policy_balance = policy_balance
- self.policy_walk = policy_walk
- self.robot = robot
- self.config = config
+ control_dt = CONTROL_DT # Expose for unitree_g1.py
+
+ def __init__(self):
+ # Load policies
+ self.policy_balance, self.policy_walk = load_groot_policies()
self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) # vx, vy, theta_dot
@@ -109,45 +104,60 @@ class GrootLocomotionController:
logger.info("GrootLocomotionController initialized")
- def run_step(self):
- # Get current observation
- obs = self.robot.get_observation()
+ def reset(self) -> None:
+ """Reset internal state for a new episode."""
+ self.cmd[:] = 0.0
+ self.groot_qj_all[:] = 0.0
+ self.groot_dqj_all[:] = 0.0
+ self.groot_action[:] = 0.0
+ self.groot_obs_single[:] = 0.0
+ self.groot_obs_stacked[:] = 0.0
+ self.groot_height_cmd = 0.74
+ self.groot_orientation_cmd[:] = 0.0
+ self.groot_obs_history.clear()
+ for _ in range(6):
+ self.groot_obs_history.append(np.zeros(86, dtype=np.float32))
- if not obs:
- return
+ def run_step(self, action: dict, lowstate) -> dict:
+ """Run one step of the locomotion controller.
- # Get command from remote controller
- if obs["remote.buttons"][0]: # R1 - raise waist
+ Args:
+ action: Action dict containing remote.lx/ly/rx/ry and buttons
+ lowstate: Robot lowstate containing motor positions/velocities and IMU
+
+ Returns:
+ Action dict for lower body joints (0-14)
+ """
+ if lowstate is None:
+ return {}
+
+ buttons = [int(action.get(k, 0)) for k in REMOTE_BUTTONS]
+ if buttons[0]: # R1 - raise waist
self.groot_height_cmd += 0.001
self.groot_height_cmd = np.clip(self.groot_height_cmd, 0.50, 1.00)
- if obs["remote.buttons"][4]: # R2 - lower waist
+ if buttons[4]: # R2 - lower waist
self.groot_height_cmd -= 0.001
self.groot_height_cmd = np.clip(self.groot_height_cmd, 0.50, 1.00)
- self.cmd[0] = obs["remote.ly"] # Forward/backward
- self.cmd[1] = obs["remote.lx"] * -1 # Left/right
- self.cmd[2] = obs["remote.rx"] * -1 # Rotation rate
+ lx, ly, rx, _ry = (action.get(k, 0.0) for k in REMOTE_AXES)
+ self.cmd[0] = ly # Forward/backward
+ self.cmd[1] = -lx # Left/right (negated)
+ self.cmd[2] = -rx # Rotation rate (negated)
- # Get joint positions and velocities from flat dict
+ # Get joint positions and velocities from lowstate
for motor in G1_29_JointIndex:
- name = motor.name
idx = motor.value
- self.groot_qj_all[idx] = obs[f"{name}.q"]
- self.groot_dqj_all[idx] = obs[f"{name}.dq"]
-
- # Adapt observation for g1_23dof
- for idx in MISSING_JOINTS:
- self.groot_qj_all[idx] = 0.0
- self.groot_dqj_all[idx] = 0.0
+ self.groot_qj_all[idx] = lowstate.motor_state[idx].q
+ self.groot_dqj_all[idx] = lowstate.motor_state[idx].dq
# Scale joint positions and velocities
qj_obs = self.groot_qj_all.copy()
dqj_obs = self.groot_dqj_all.copy()
# Express IMU data in gravity frame of reference
- quat = [obs["imu.quat.w"], obs["imu.quat.x"], obs["imu.quat.y"], obs["imu.quat.z"]]
- ang_vel = np.array([obs["imu.gyro.x"], obs["imu.gyro.y"], obs["imu.gyro.z"]], dtype=np.float32)
- gravity_orientation = self.robot.get_gravity_orientation(quat)
+ quat = lowstate.imu_state.quaternion
+ ang_vel = np.array(lowstate.imu_state.gyroscope, dtype=np.float32)
+ gravity_orientation = get_gravity_orientation(quat)
# Scale joint positions and velocities before policy inference
qj_obs = (qj_obs - GROOT_DEFAULT_ANGLES) * DOF_POS_SCALE
@@ -186,73 +196,10 @@ class GrootLocomotionController:
# Transform action back to target joint positions
target_dof_pos_15 = GROOT_DEFAULT_ANGLES[:15] + self.groot_action * ACTION_SCALE
- # Build action dict (only first 15 joints for GR00T)
+ # Build action dict
action_dict = {}
for i in range(15):
motor_name = G1_29_JointIndex(i).name
action_dict[f"{motor_name}.q"] = float(target_dof_pos_15[i])
- # Zero out missing joints for g1_23dof
- for joint_idx in MISSING_JOINTS:
- motor_name = G1_29_JointIndex(joint_idx).name
- action_dict[f"{motor_name}.q"] = 0.0
-
- # Send action to robot
- self.robot.send_action(action_dict)
-
-
-def run(repo_id: str = DEFAULT_GROOT_REPO_ID) -> None:
- """Main function to run the GR00T locomotion controller.
-
- Args:
- repo_id: Hugging Face Hub repository ID for GR00T policies.
- """
- # Load policies
- policy_balance, policy_walk = load_groot_policies(repo_id=repo_id)
-
- # Initialize robot
- config = UnitreeG1Config()
- robot = UnitreeG1(config)
-
- robot.connect()
-
- # Initialize gr00T locomotion controller
- groot_controller = GrootLocomotionController(
- policy_balance=policy_balance,
- policy_walk=policy_walk,
- robot=robot,
- config=config,
- )
-
- try:
- robot.reset(CONTROL_DT, GROOT_DEFAULT_ANGLES)
-
- logger.info("Use joystick: LY=fwd/back, LX=left/right, RX=rotate, R1=raise waist, R2=lower waist")
- logger.info("Press Ctrl+C to stop")
-
- # Run step
- while not robot._shutdown_event.is_set():
- start_time = time.time()
- groot_controller.run_step()
- elapsed = time.time() - start_time
- sleep_time = max(0, CONTROL_DT - elapsed)
- time.sleep(sleep_time)
- except KeyboardInterrupt:
- logger.info("Stopping locomotion...")
- finally:
- if robot.is_connected:
- robot.disconnect()
- logger.info("Done!")
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="GR00T Locomotion Controller for Unitree G1")
- parser.add_argument(
- "--repo-id",
- type=str,
- default=DEFAULT_GROOT_REPO_ID,
- help=f"Hugging Face Hub repo ID for GR00T policies (default: {DEFAULT_GROOT_REPO_ID})",
- )
- args = parser.parse_args()
-
- run(repo_id=args.repo_id)
+ return action_dict
diff --git a/examples/unitree_g1/holosoma_locomotion.py b/src/lerobot/robots/unitree_g1/holosoma_locomotion.py
similarity index 53%
rename from examples/unitree_g1/holosoma_locomotion.py
rename to src/lerobot/robots/unitree_g1/holosoma_locomotion.py
index 3a07023de..857bb97bc 100644
--- a/examples/unitree_g1/holosoma_locomotion.py
+++ b/src/lerobot/robots/unitree_g1/holosoma_locomotion.py
@@ -14,21 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import argparse
import json
import logging
-import time
import numpy as np
import onnx
import onnxruntime as ort
from huggingface_hub import hf_hub_download
-from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
-from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
-from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
+from lerobot.robots.unitree_g1.g1_utils import (
+ REMOTE_AXES,
+ G1_29_JointArmIndex,
+ G1_29_JointIndex,
+ get_gravity_orientation,
+)
-logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DEFAULT_ANGLES = np.zeros(29, dtype=np.float32)
@@ -40,18 +40,13 @@ DEFAULT_ANGLES[16] = 0.2 # Left shoulder roll
DEFAULT_ANGLES[23] = -0.2 # Right shoulder roll
DEFAULT_ANGLES[[18, 25]] = 0.6 # Elbow
-MISSING_JOINTS = []
-G1_MODEL = "g1_23" # Or "g1_29"
-if G1_MODEL == "g1_23":
- MISSING_JOINTS = [12, 14, 20, 21, 27, 28] # Waist yaw/pitch, wrist pitch/yaw
-
# Control parameters
ACTION_SCALE = 0.25
-CONTROL_DT = 0.02 # 50Hz
+CONTROL_DT = 0.005 # 200Hz
ANG_VEL_SCALE = 0.25
DOF_POS_SCALE = 1.0
DOF_VEL_SCALE = 0.05
-GAIT_PERIOD = 1.0
+GAIT_PERIOD = 0.5
DEFAULT_HOLOSOMA_REPO_ID = "nepyope/holosoma_locomotion"
@@ -87,7 +82,7 @@ def load_policy(
logger.info(f"Policy loaded: {policy.get_inputs()[0].shape} → {policy.get_outputs()[0].shape}")
# Extract KP/KD from ONNX metadata
- model = onnx.load(policy_path)
+ model = onnx.load(policy_path, load_external_data=False)
metadata = {prop.key: prop.value for prop in model.metadata_props}
if "kp" not in metadata or "kd" not in metadata:
@@ -101,15 +96,13 @@ def load_policy(
class HolosomaLocomotionController:
- """Holosoma whole-body locomotion controller for Unitree G1."""
+ """Holosoma lower-body locomotion controller for Unitree G1."""
- def __init__(self, policy, robot, kp: np.ndarray, kd: np.ndarray):
- self.policy = policy
- self.robot = robot
+ control_dt = CONTROL_DT # Expose for unitree_g1.py
- # Override robot's PD gains with policy gains
- self.robot.kp = kp
- self.robot.kd = kd
+ def __init__(self):
+ # Load policy and gains
+ self.policy, self.kp, self.kd = load_policy()
self.cmd = np.zeros(3, dtype=np.float32)
@@ -124,35 +117,55 @@ class HolosomaLocomotionController:
self.phase_dt = 2 * np.pi / ((1.0 / CONTROL_DT) * GAIT_PERIOD)
self.is_standing = True
- def run_step(self):
- # Get current observation
- obs = self.robot.get_observation()
+ logger.info("HolosomaLocomotionController initialized")
- if not obs:
- return
+ def reset(self) -> None:
+ """Reset internal state for a new episode."""
+ self.cmd[:] = 0.0
+ self.qj[:] = 0.0
+ self.dqj[:] = 0.0
+ self.obs[:] = 0.0
+ self.last_action[:] = 0.0
+ self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
+ self.is_standing = True
- # Get command from remote controller
- ly = obs["remote.ly"] if abs(obs["remote.ly"]) > 0.1 else 0.0
- lx = obs["remote.lx"] if abs(obs["remote.lx"]) > 0.1 else 0.0
- rx = obs["remote.rx"] if abs(obs["remote.rx"]) > 0.1 else 0.0
+ def run_step(self, action: dict, lowstate) -> dict:
+ """Run one step of the locomotion controller.
+
+ Args:
+ action: Action dict containing remote.lx/ly/rx/ry
+ lowstate: Robot lowstate containing motor positions/velocities and IMU
+
+ Returns:
+ Action dict for lower body joints (0-14)
+ """
+ if lowstate is None:
+ return {}
+
+ lx, ly, rx, _ry = (action.get(k, 0.0) for k in REMOTE_AXES)
+ ly = ly if abs(ly) > 0.1 else 0.0
+ lx = lx if abs(lx) > 0.1 else 0.0
+ rx = rx if abs(rx) > 0.1 else 0.0
+ ly = np.clip(ly, -0.3, 0.3)
+ lx = np.clip(lx, -0.3, 0.3)
self.cmd[:] = [ly, -lx, -rx]
- # Get joint positions and velocities
+ # Get joint positions and velocities from lowstate
for motor in G1_29_JointIndex:
- name = motor.name
idx = motor.value
- self.qj[idx] = obs[f"{name}.q"]
- self.dqj[idx] = obs[f"{name}.dq"]
+ self.qj[idx] = lowstate.motor_state[idx].q
+ self.dqj[idx] = lowstate.motor_state[idx].dq
- # Adapt observation for g1_23dof
- for idx in MISSING_JOINTS:
- self.qj[idx] = 0.0
- self.dqj[idx] = 0.0
+ # Hide arm positions from policy (show DEFAULT_ANGLES instead)
+ # This prevents policy from reacting to teleop arm movements
+ for arm_joint in G1_29_JointArmIndex:
+ self.qj[arm_joint.value] = DEFAULT_ANGLES[arm_joint.value]
+ self.dqj[arm_joint.value] = 0.0
# Express IMU data in gravity frame of reference
- quat = [obs["imu.quat.w"], obs["imu.quat.x"], obs["imu.quat.y"], obs["imu.quat.z"]]
- ang_vel = np.array([obs["imu.gyro.x"], obs["imu.gyro.y"], obs["imu.gyro.z"]], dtype=np.float32)
- gravity = self.robot.get_gravity_orientation(quat)
+ quat = lowstate.imu_state.quaternion
+ ang_vel = np.array(lowstate.imu_state.gyroscope, dtype=np.float32)
+ gravity = get_gravity_orientation(quat)
# Scale joint positions and velocities before policy inference
qj_obs = (self.qj - DEFAULT_ANGLES) * DOF_POS_SCALE
@@ -186,79 +199,16 @@ class HolosomaLocomotionController:
# Run policy inference
ort_in = {self.policy.get_inputs()[0].name: self.obs.reshape(1, -1).astype(np.float32)}
raw_action = self.policy.run(None, ort_in)[0].squeeze()
- action = np.clip(raw_action, -100.0, 100.0)
- self.last_action = action.copy()
+ policy_action = np.clip(raw_action, -100.0, 100.0)
+ self.last_action = policy_action.copy()
# Transform action back to target joint positions
- target = DEFAULT_ANGLES + action * ACTION_SCALE
+ target = DEFAULT_ANGLES + policy_action * ACTION_SCALE
- # Build action dict
+ # Build action dict (first 15 joints only)
action_dict = {}
- for motor in G1_29_JointIndex:
- action_dict[f"{motor.name}.q"] = float(target[motor.value])
+ for i in range(15):
+ motor_name = G1_29_JointIndex(i).name
+ action_dict[f"{motor_name}.q"] = float(target[i])
- # Zero out missing joints for g1_23dof
- for joint_idx in MISSING_JOINTS:
- motor_name = G1_29_JointIndex(joint_idx).name
- action_dict[f"{motor_name}.q"] = 0.0
-
- # Send action to robot
- self.robot.send_action(action_dict)
-
-
-def run(repo_id: str = DEFAULT_HOLOSOMA_REPO_ID, policy_type: str = "fastsac") -> None:
- """Main function to run the Holosoma locomotion controller.
-
- Args:
- repo_id: Hugging Face Hub repository ID for Holosoma policies.
- policy_type: Policy type to use ('fastsac' or 'ppo').
- """
- # Load policy and gains
- policy, kp, kd = load_policy(repo_id=repo_id, policy_type=policy_type)
-
- # Initialize robot
- config = UnitreeG1Config()
- robot = UnitreeG1(config)
- robot.connect()
-
- holosoma_controller = HolosomaLocomotionController(policy, robot, kp, kd)
-
- try:
- robot.reset(CONTROL_DT, DEFAULT_ANGLES)
-
- logger.info("Use joystick: LY=fwd/back, LX=left/right, RX=rotate")
- logger.info("Press Ctrl+C to stop")
-
- # Run step
- while not robot._shutdown_event.is_set():
- start_time = time.time()
- holosoma_controller.run_step()
- elapsed = time.time() - start_time
- sleep_time = max(0, CONTROL_DT - elapsed)
- time.sleep(sleep_time)
- except KeyboardInterrupt:
- logger.info("Stopping locomotion...")
- finally:
- if robot.is_connected:
- robot.disconnect()
- logger.info("Done!")
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="Holosoma Locomotion Controller for Unitree G1")
- parser.add_argument(
- "--repo-id",
- type=str,
- default=DEFAULT_HOLOSOMA_REPO_ID,
- help=f"Hugging Face Hub repo ID for Holosoma policies (default: {DEFAULT_HOLOSOMA_REPO_ID})",
- )
- parser.add_argument(
- "--policy",
- type=str,
- choices=["fastsac", "ppo"],
- default="fastsac",
- help="Policy type to use: 'fastsac' (default) or 'ppo'",
- )
- args = parser.parse_args()
-
- run(repo_id=args.repo_id, policy_type=args.policy)
+ return action_dict
diff --git a/src/lerobot/robots/unitree_g1/run_g1_server.py b/src/lerobot/robots/unitree_g1/run_g1_server.py
index 70166b590..b5bd0baf8 100644
--- a/src/lerobot/robots/unitree_g1/run_g1_server.py
+++ b/src/lerobot/robots/unitree_g1/run_g1_server.py
@@ -24,6 +24,7 @@ This server runs on the robot and forwards:
Uses JSON for secure serialization instead of pickle.
"""
+import argparse
import base64
import contextlib
import json
@@ -38,6 +39,8 @@ from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as hg_LowCmd, LowState_ as hg_LowState
from unitree_sdk2py.utils.crc import CRC
+from lerobot.cameras.zmq.image_server import ImageServer
+
# DDS topic names follow Unitree SDK naming conventions
# ruff: noqa: N816
kTopicLowCommand_Debug = "rt/lowcmd" # action to robot
@@ -150,6 +153,32 @@ def cmd_forward_loop(
def main() -> None:
"""Main entry point for the robot server bridge."""
+ parser = argparse.ArgumentParser(description="DDS-to-ZMQ bridge server for Unitree G1")
+ parser.add_argument("--camera", action="store_true", help="Also launch camera server")
+ parser.add_argument("--camera-device", type=int, default=4, help="Camera device ID (default: 4)")
+ parser.add_argument("--camera-fps", type=int, default=30, help="Camera FPS (default: 30)")
+ parser.add_argument("--camera-width", type=int, default=640, help="Camera width (default: 640)")
+ parser.add_argument("--camera-height", type=int, default=480, help="Camera height (default: 480)")
+ parser.add_argument("--camera-port", type=int, default=5555, help="Camera ZMQ port (default: 5555)")
+ args = parser.parse_args()
+
+ # Optionally start camera server in background thread
+ camera_thread = None
+ if args.camera:
+ camera_config = {
+ "fps": args.camera_fps,
+ "cameras": {
+ "head_camera": {
+ "device_id": args.camera_device,
+ "shape": [args.camera_height, args.camera_width],
+ }
+ },
+ }
+ camera_server = ImageServer(camera_config, port=args.camera_port)
+ camera_thread = threading.Thread(target=camera_server.run, daemon=True)
+ camera_thread.start()
+ print(f"Camera server started on port {args.camera_port} (device {args.camera_device})")
+
# initialize DDS
ChannelFactoryInitialize(0)
@@ -206,6 +235,8 @@ def main() -> None:
shutdown_event.set()
ctx.term() # terminates blocking zmq.recv() calls
t_state.join(timeout=2.0)
+ if camera_thread is not None:
+ camera_thread.join(timeout=2.0)
if __name__ == "__main__":
diff --git a/src/lerobot/robots/unitree_g1/unitree_g1.py b/src/lerobot/robots/unitree_g1/unitree_g1.py
index df0de8f19..41146ebe6 100644
--- a/src/lerobot/robots/unitree_g1/unitree_g1.py
+++ b/src/lerobot/robots/unitree_g1/unitree_g1.py
@@ -14,27 +14,67 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import annotations
+
import logging
-import struct
import threading
import time
from dataclasses import dataclass, field
from functools import cached_property
-from typing import Any
+from typing import TYPE_CHECKING, Protocol, runtime_checkable
import numpy as np
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.envs.factory import make_env
from lerobot.processor import RobotAction, RobotObservation
-from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex, G1_29_JointIndex
-from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
+from lerobot.robots.unitree_g1.g1_kinematics import G1_29_ArmIK
+from lerobot.robots.unitree_g1.g1_utils import (
+ REMOTE_AXES,
+ REMOTE_KEYS,
+ G1_29_JointArmIndex,
+ G1_29_JointIndex,
+ default_remote_input,
+ make_locomotion_controller,
+)
+from lerobot.utils.import_utils import _unitree_sdk_available
from ..robot import Robot
from .config_unitree_g1 import UnitreeG1Config
+if TYPE_CHECKING or _unitree_sdk_available:
+ from unitree_sdk2py.core.channel import (
+ ChannelFactoryInitialize as _SDKChannelFactoryInitialize,
+ ChannelPublisher as _SDKChannelPublisher,
+ ChannelSubscriber as _SDKChannelSubscriber,
+ )
+ from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
+ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import (
+ LowCmd_ as hg_LowCmd,
+ LowState_ as hg_LowState,
+ )
+ from unitree_sdk2py.utils.crc import CRC
+else:
+ _SDKChannelFactoryInitialize = None
+ _SDKChannelPublisher = None
+ _SDKChannelSubscriber = None
+ unitree_hg_msg_dds__LowCmd_ = None
+ hg_LowCmd = None
+ hg_LowState = None
+ CRC = None
+
logger = logging.getLogger(__name__)
+
+@runtime_checkable
+class LocomotionController(Protocol):
+ control_dt: float
+
+ def run_step(self, action: dict, lowstate) -> dict: ...
+
+ def reset(self) -> None: ...
+
+
# DDS topic names follow Unitree SDK naming conventions
# ruff: noqa: N816
kTopicLowCommand_Debug = "rt/lowcmd"
@@ -63,7 +103,7 @@ class IMUState:
class G1_29_LowState: # noqa: N801
motor_state: list[MotorState] = field(default_factory=lambda: [MotorState() for _ in G1_29_JointIndex])
imu_state: IMUState = field(default_factory=IMUState)
- wireless_remote: Any = None # Raw wireless remote data
+ wireless_remote: bytes | None = None # Raw wireless remote data
mode_machine: int = 0 # Robot mode
@@ -71,25 +111,6 @@ class UnitreeG1(Robot):
config_class = UnitreeG1Config
name = "unitree_g1"
- # unitree remote controller
- class RemoteController:
- def __init__(self):
- self.lx = 0
- self.ly = 0
- self.rx = 0
- self.ry = 0
- self.button = [0] * 16
-
- def set(self, data):
- # wireless_remote
- keys = struct.unpack("H", data[2:4])[0]
- for i in range(16):
- self.button[i] = (keys & (1 << i)) >> i
- self.lx = struct.unpack("f", data[4:8])[0]
- self.rx = struct.unpack("f", data[8:12])[0]
- self.ry = struct.unpack("f", data[12:16])[0]
- self.ly = struct.unpack("f", data[20:24])[0]
-
def __init__(self, config: UnitreeG1Config):
super().__init__(config)
@@ -103,11 +124,9 @@ class UnitreeG1(Robot):
# Import channel classes based on mode
if config.is_simulation:
- from unitree_sdk2py.core.channel import (
- ChannelFactoryInitialize,
- ChannelPublisher,
- ChannelSubscriber,
- )
+ self._ChannelFactoryInitialize = _SDKChannelFactoryInitialize
+ self._ChannelPublisher = _SDKChannelPublisher
+ self._ChannelSubscriber = _SDKChannelSubscriber
else:
from lerobot.robots.unitree_g1.unitree_sdk2_socket import (
ChannelFactoryInitialize,
@@ -115,22 +134,30 @@ class UnitreeG1(Robot):
ChannelSubscriber,
)
- # Store for use in connect()
- self._ChannelFactoryInitialize = ChannelFactoryInitialize
- self._ChannelPublisher = ChannelPublisher
- self._ChannelSubscriber = ChannelSubscriber
+ self._ChannelFactoryInitialize = ChannelFactoryInitialize
+ self._ChannelPublisher = ChannelPublisher
+ self._ChannelSubscriber = ChannelSubscriber
# Initialize state variables
self.sim_env = None
self._env_wrapper = None
self._lowstate = None
+ self._lowstate_lock = threading.Lock()
self._shutdown_event = threading.Event()
self.subscribe_thread = None
- self.remote_controller = self.RemoteController()
- self.arm_ik = G1_29_ArmIK()
+ self.arm_ik = G1_29_ArmIK() if config.gravity_compensation else None
- def _subscribe_motor_state(self): # polls robot state @ 250Hz
+ # Lower-body controller loaded dynamically
+ self.controller: LocomotionController | None = make_locomotion_controller(config.controller)
+
+ # Controller thread state
+ self._controller_thread = None
+ self._controller_action_lock = threading.Lock()
+ self.controller_input = default_remote_input()
+ self.controller_output = {}
+
+ def _subscribe_lowstate(self): # polls robot state @ 250Hz
while not self._shutdown_event.is_set():
start_time = time.time()
@@ -143,11 +170,11 @@ class UnitreeG1(Robot):
lowstate = G1_29_LowState()
# Capture motor states using jointindex
- for id in G1_29_JointIndex:
- lowstate.motor_state[id].q = msg.motor_state[id].q
- lowstate.motor_state[id].dq = msg.motor_state[id].dq
- lowstate.motor_state[id].tau_est = msg.motor_state[id].tau_est
- lowstate.motor_state[id].temperature = msg.motor_state[id].temperature
+ for joint in G1_29_JointIndex:
+ lowstate.motor_state[joint].q = msg.motor_state[joint].q
+ lowstate.motor_state[joint].dq = msg.motor_state[joint].dq
+ lowstate.motor_state[joint].tau_est = msg.motor_state[joint].tau_est
+ lowstate.motor_state[joint].temperature = msg.motor_state[joint].temperature
# Capture IMU state
lowstate.imu_state.quaternion = list(msg.imu_state.quaternion)
@@ -162,31 +189,106 @@ class UnitreeG1(Robot):
# Capture mode_machine
lowstate.mode_machine = msg.mode_machine
- self._lowstate = lowstate
+ with self._lowstate_lock:
+ self._lowstate = lowstate
current_time = time.time()
all_t_elapsed = current_time - start_time
sleep_time = max(0, (self.control_dt - all_t_elapsed)) # maintain constant control dt
time.sleep(sleep_time)
+ def publish_lowcmd(
+ self,
+ action: RobotAction,
+ kp: np.ndarray | list[float] | None = None,
+ kd: np.ndarray | list[float] | None = None,
+ tau: np.ndarray | list[float] | None = None,
+ ) -> None: # writes robot command whenever requested
+ for motor in G1_29_JointIndex:
+ key = f"{motor.name}.q"
+ if key in action:
+ self.msg.motor_cmd[motor.value].q = action[key]
+ self.msg.motor_cmd[motor.value].qd = 0
+ self.msg.motor_cmd[motor.value].kp = (
+ kp[motor.value] if kp is not None else self.kp[motor.value]
+ )
+ self.msg.motor_cmd[motor.value].kd = (
+ kd[motor.value] if kd is not None else self.kd[motor.value]
+ )
+ self.msg.motor_cmd[motor.value].tau = tau[motor.value] if tau is not None else 0.0
+
+ self.msg.crc = self.crc.Crc(self.msg)
+ self.lowcmd_publisher.Write(self.msg)
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
+ return {
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
+ }
+
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._motors_ft, **self._cameras_ft}
+
@cached_property
def action_features(self) -> dict[str, type]:
- return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
+ if self.controller is None:
+ return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
- def calibrate(self) -> None: # robot is already calibrated
+ arm_features = {f"{G1_29_JointArmIndex(motor).name}.q": float for motor in G1_29_JointArmIndex}
+ remote_features = dict.fromkeys(REMOTE_AXES, float)
+ return {**arm_features, **remote_features}
+
+ def _controller_loop(self):
+ """Background thread that runs controller at policy's control_dt."""
+ control_dt = self.controller.control_dt
+ logger.info(f"Controller loop starting with control_dt={control_dt} ({1.0 / control_dt:.1f}Hz)")
+
+ loop_count = 0
+ last_log_time = time.time()
+
+ while not self._shutdown_event.is_set():
+ start_time = time.time()
+
+ with self._lowstate_lock:
+ lowstate = self._lowstate
+
+ if lowstate is not None and self.controller is not None:
+ loop_count += 1
+ if time.time() - last_log_time >= 5.0: # Log every 5 seconds
+ actual_hz = loop_count / (time.time() - last_log_time)
+ logger.info(
+ f"Controller actual rate: {actual_hz:.1f}Hz (target: {1.0 / control_dt:.1f}Hz)"
+ )
+ loop_count = 0
+ last_log_time = time.time()
+ # Read controller input snapshot
+ with self._controller_action_lock:
+ controller_input = dict(self.controller_input)
+
+ # Run controller step
+ controller_action = self.controller.run_step(controller_input, lowstate)
+
+ # Write controller output snapshot
+ with self._controller_action_lock:
+ self.controller_output = dict(controller_action)
+
+ ctrl_kp = self.controller.kp if hasattr(self.controller, "kp") else None
+ ctrl_kd = self.controller.kd if hasattr(self.controller, "kd") else None
+ self.publish_lowcmd(controller_action, kp=ctrl_kp, kd=ctrl_kd)
+
+ elapsed = time.time() - start_time
+ sleep_time = max(0, control_dt - elapsed)
+ time.sleep(sleep_time)
+
+ def calibrate(self) -> None:
+ # TODO: implement g1_29 calibration
pass
def configure(self) -> None:
pass
def connect(self, calibrate: bool = True) -> None: # connect to DDS
- from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
- from unitree_sdk2py.idl.unitree_hg.msg.dds_ import (
- LowCmd_ as hg_LowCmd,
- LowState_ as hg_LowState,
- )
- from unitree_sdk2py.utils.crc import CRC
-
# Initialize DDS channel and simulation environment
if self.config.is_simulation:
self._ChannelFactoryInitialize(0, "lo")
@@ -194,7 +296,7 @@ class UnitreeG1(Robot):
# Extract the actual gym env from the dict structure
self.sim_env = self._env_wrapper["hub_env"][0].envs[0]
else:
- self._ChannelFactoryInitialize(0)
+ self._ChannelFactoryInitialize(0, config=self.config)
# Initialize direct motor control interface
self.lowcmd_publisher = self._ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
@@ -203,7 +305,7 @@ class UnitreeG1(Robot):
self.lowstate_subscriber.Init()
# Start subscribe thread to read robot state
- self.subscribe_thread = threading.Thread(target=self._subscribe_motor_state)
+ self.subscribe_thread = threading.Thread(target=self._subscribe_lowstate)
self.subscribe_thread.start()
# Connect cameras
@@ -220,25 +322,53 @@ class UnitreeG1(Robot):
# Wait for first state message to arrive
lowstate = None
+ deadline = time.time() + 10.0
while lowstate is None:
- lowstate = self._lowstate
+ with self._lowstate_lock:
+ lowstate = self._lowstate
if lowstate is None:
+ if time.time() > deadline:
+ raise TimeoutError("Timed out waiting for robot state (10s)")
+ logger.warning("[UnitreeG1] Waiting for robot state...")
time.sleep(0.01)
- logger.warning("[UnitreeG1] Waiting for robot state...")
- logger.warning("[UnitreeG1] Connected to robot.")
+ logger.info("[UnitreeG1] Connected to robot.")
self.msg.mode_machine = lowstate.mode_machine
- # Initialize all motors with unified kp/kd from config
self.kp = np.array(self.config.kp, dtype=np.float32)
self.kd = np.array(self.config.kd, dtype=np.float32)
- for id in G1_29_JointIndex:
- self.msg.motor_cmd[id].mode = 1
- self.msg.motor_cmd[id].kp = self.kp[id.value]
- self.msg.motor_cmd[id].kd = self.kd[id.value]
- self.msg.motor_cmd[id].q = lowstate.motor_state[id.value].q
+ for joint in G1_29_JointIndex:
+ self.msg.motor_cmd[joint].mode = 1
+ self.msg.motor_cmd[joint].kp = self.kp[joint.value]
+ self.msg.motor_cmd[joint].kd = self.kd[joint.value]
+ self.msg.motor_cmd[joint].q = lowstate.motor_state[joint.value].q
+
+ # Start controller thread if enabled
+ if self.controller is not None:
+ self._controller_thread = threading.Thread(target=self._controller_loop, daemon=True)
+ self._controller_thread.start()
+ fps = int(1.0 / self.controller.control_dt)
+ logger.info(f"Controller thread started ({fps}Hz)")
+
+ def _send_zero_torque(self) -> None:
+ """Send a zero-gain command to make joints passive before shutting down."""
+ try:
+ with self._lowstate_lock:
+ lowstate = self._lowstate
+ if lowstate is None:
+ return
+ action = {f"{motor.name}.q": lowstate.motor_state[motor.value].q for motor in G1_29_JointIndex}
+ zero_gains = np.zeros(29, dtype=np.float32)
+ self.publish_lowcmd(action, kp=zero_gains, kd=zero_gains, tau=zero_gains)
+ logger.info("Sent zero-torque command for safe shutdown")
+ except Exception as e:
+ logger.warning(f"Failed to send zero-torque on disconnect: {e}")
def disconnect(self):
+ # Put robot in passive mode before stopping threads
+ if not self.config.is_simulation:
+ self._send_zero_torque()
+
# Signal thread to stop and unblock any waits
self._shutdown_event.set()
@@ -248,6 +378,12 @@ class UnitreeG1(Robot):
if self.subscribe_thread.is_alive():
logger.warning("Subscribe thread did not stop cleanly")
+ # Wait for controller thread to finish
+ if self._controller_thread is not None:
+ self._controller_thread.join(timeout=2.0)
+ if self._controller_thread.is_alive():
+ logger.warning("Controller thread did not stop cleanly")
+
# Close simulation environment
if self.config.is_simulation and self.sim_env is not None:
try:
@@ -274,7 +410,8 @@ class UnitreeG1(Robot):
cam.disconnect()
def get_observation(self) -> RobotObservation:
- lowstate = self._lowstate
+ with self._lowstate_lock:
+ lowstate = self._lowstate
if lowstate is None:
return {}
@@ -313,14 +450,9 @@ class UnitreeG1(Robot):
obs["imu.rpy.pitch"] = lowstate.imu_state.rpy[1]
obs["imu.rpy.yaw"] = lowstate.imu_state.rpy[2]
- # Controller - parse wireless_remote and add to obs
- if lowstate.wireless_remote and len(lowstate.wireless_remote) >= 24:
- self.remote_controller.set(lowstate.wireless_remote)
- obs["remote.buttons"] = self.remote_controller.button.copy()
- obs["remote.lx"] = self.remote_controller.lx
- obs["remote.ly"] = self.remote_controller.ly
- obs["remote.rx"] = self.remote_controller.rx
- obs["remote.ry"] = self.remote_controller.ry
+ # Wireless remote (raw bytes for teleoperator)
+ if lowstate.wireless_remote:
+ obs["wireless_remote"] = lowstate.wireless_remote
# Cameras - read images from ZMQ cameras
for cam_name, cam in self._cameras.items():
@@ -328,73 +460,63 @@ class UnitreeG1(Robot):
return obs
+ def send_action(self, action: RobotAction) -> RobotAction:
+ action_to_publish = action
+ if self.controller is not None:
+ # Controller thread owns legs/waist. Here we only update joystick inputs
+ # and publish arm targets from the teleoperator.
+ self._update_controller_action(action)
+ arm_prefixes = tuple(j.name for j in G1_29_JointArmIndex)
+ action_to_publish = {
+ key: value
+ for key, value in action.items()
+ if key.endswith(".q") and key.startswith(arm_prefixes)
+ }
+
+ tau = None
+ if self.config.gravity_compensation and self.arm_ik is not None:
+ tau = np.zeros(29, dtype=np.float32)
+ action_np = np.array(
+ [
+ action_to_publish.get(f"{joint.name}.q", self.msg.motor_cmd[joint.value].q)
+ for joint in G1_29_JointArmIndex
+ ],
+ dtype=np.float32,
+ )
+ arm_tau = self.arm_ik.solve_tau(action_np)
+ arm_start_idx = G1_29_JointArmIndex.kLeftShoulderPitch.value
+ for joint in G1_29_JointArmIndex:
+ local_idx = joint.value - arm_start_idx
+ tau[joint.value] = arm_tau[local_idx]
+
+ self.publish_lowcmd(action_to_publish, tau=tau)
+ return action
+
+ def _update_controller_action(self, action: RobotAction) -> None:
+ """Update controller input state from incoming teleop action."""
+ with self._controller_action_lock:
+ for key in REMOTE_KEYS:
+ if key in action:
+ self.controller_input[key] = action[key]
+
@property
def is_calibrated(self) -> bool:
return True
@property
def is_connected(self) -> bool:
- return self._lowstate is not None
+ with self._lowstate_lock:
+ return self._lowstate is not None
@property
def _motors_ft(self) -> dict[str, type]:
+ """Joint positions for all 29 joints."""
return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
@property
def cameras(self) -> dict:
return self._cameras
- @property
- def _cameras_ft(self) -> dict[str, tuple]:
- return {
- cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
- }
-
- @cached_property
- def observation_features(self) -> dict[str, type | tuple]:
- return {**self._motors_ft, **self._cameras_ft}
-
- def send_action(self, action: RobotAction) -> RobotAction:
- for motor in G1_29_JointIndex:
- key = f"{motor.name}.q"
- if key in action:
- self.msg.motor_cmd[motor.value].q = action[key]
- self.msg.motor_cmd[motor.value].qd = 0
- self.msg.motor_cmd[motor.value].kp = self.kp[motor.value]
- self.msg.motor_cmd[motor.value].kd = self.kd[motor.value]
- self.msg.motor_cmd[motor.value].tau = 0
-
- if self.config.gravity_compensation:
- # Build action_np from motor commands (arm joints are indices 15-28, local indices 0-13)
- action_np = np.zeros(14)
- arm_start_idx = G1_29_JointArmIndex.kLeftShoulderPitch.value # 15
- for joint in G1_29_JointArmIndex:
- local_idx = joint.value - arm_start_idx
- action_np[local_idx] = self.msg.motor_cmd[joint.value].q
- tau = self.arm_ik.solve_tau(action_np)
-
- # Apply tau back to motor commands
- for joint in G1_29_JointArmIndex:
- local_idx = joint.value - arm_start_idx
- self.msg.motor_cmd[joint.value].tau = tau[local_idx]
-
- self.msg.crc = self.crc.Crc(self.msg)
- self.lowcmd_publisher.Write(self.msg)
- return action
-
- def get_gravity_orientation(self, quaternion): # get gravity orientation from quaternion
- """Get gravity orientation from quaternion."""
- qw = quaternion[0]
- qx = quaternion[1]
- qy = quaternion[2]
- qz = quaternion[3]
-
- gravity_orientation = np.zeros(3)
- gravity_orientation[0] = 2 * (-qz * qx + qw * qy)
- gravity_orientation[1] = -2 * (qz * qy + qw * qx)
- gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz)
- return gravity_orientation
-
def reset(
self,
control_dt: float | None = None,
@@ -407,15 +529,9 @@ class UnitreeG1(Robot):
if self.config.is_simulation and self.sim_env is not None:
self.sim_env.reset()
-
- for motor in G1_29_JointIndex:
- self.msg.motor_cmd[motor.value].q = default_positions[motor.value]
- self.msg.motor_cmd[motor.value].qd = 0
- self.msg.motor_cmd[motor.value].kp = self.kp[motor.value]
- self.msg.motor_cmd[motor.value].kd = self.kd[motor.value]
- self.msg.motor_cmd[motor.value].tau = 0
- self.msg.crc = self.crc.Crc(self.msg)
- self.lowcmd_publisher.Write(self.msg)
+ self.publish_lowcmd(
+ {f"{motor.name}.q": float(default_positions[motor.value]) for motor in G1_29_JointIndex}
+ )
else:
total_time = 3.0
num_steps = int(total_time / control_dt)
@@ -446,4 +562,8 @@ class UnitreeG1(Robot):
sleep_time = max(0, control_dt - elapsed)
time.sleep(sleep_time)
+ # Reset controller internal state (gait phase, obs history, etc.)
+ if self.controller is not None and hasattr(self.controller, "reset"):
+ self.controller.reset()
+
logger.info("Reached default position")
diff --git a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py
index ad96df965..0f1f8f8d6 100644
--- a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py
+++ b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py
@@ -22,6 +22,8 @@ import zmq
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
+# Module-level ZMQ state mirrors the Unitree SDK's global ChannelFactory Singleton.
+# Only one robot connection per process is supported.
_ctx: zmq.Context | None = None
_lowcmd_sock: zmq.Socket | None = None
_lowstate_sock: zmq.Socket | None = None
@@ -97,17 +99,22 @@ def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]:
}
-def ChannelFactoryInitialize(*args: Any, **kwargs: Any) -> None: # noqa: N802
+def ChannelFactoryInitialize(domain_id: int = 0, config: Any = None) -> None: # noqa: N802
"""
Initialize ZMQ sockets for robot communication.
This function mimics the Unitree SDK's ChannelFactoryInitialize but uses
ZMQ sockets to connect to the robot server bridge instead of DDS.
+
+ Args:
+ domain_id: Ignored (for API compatibility with Unitree SDK)
+ config: UnitreeG1Config instance with robot_ip
"""
global _ctx, _lowcmd_sock, _lowstate_sock
# read socket config
- config = UnitreeG1Config()
+ if config is None:
+ config = UnitreeG1Config()
robot_ip = config.robot_ip
ctx = zmq.Context.instance()
diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py
index 72708ba23..dc682fe6f 100644
--- a/src/lerobot/scripts/lerobot_record.py
+++ b/src/lerobot/scripts/lerobot_record.py
@@ -369,6 +369,8 @@ def record_loop(
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
elif policy is None and isinstance(teleop, Teleoperator):
+ if robot.name == "unitree_g1":
+ teleop.send_feedback(obs)
act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
@@ -556,10 +558,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
):
log_say("Reset the environment", cfg.play_sounds)
- # reset g1 robot
- if robot.name == "unitree_g1":
- robot.reset()
-
record_loop(
robot=robot,
events=events,
diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py
index dad479b2e..f050d572a 100644
--- a/src/lerobot/scripts/lerobot_teleoperate.py
+++ b/src/lerobot/scripts/lerobot_teleoperate.py
@@ -60,6 +60,7 @@ import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
+from lerobot.cameras.zmq.configuration_zmq import ZMQCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.processor import (
RobotAction,
@@ -153,7 +154,6 @@ def teleop_loop(
display_len = max(len(key) for key in robot.action_features)
start = time.perf_counter()
-
while True:
loop_start = time.perf_counter()
@@ -163,6 +163,9 @@ def teleop_loop(
# given that it is the identity processor as default
obs = robot.get_observation()
+ if robot.name == "unitree_g1":
+ teleop.send_feedback(obs)
+
# Get teleop action
raw_action = teleop.get_action()
diff --git a/src/lerobot/teleoperators/unitree_g1/__init__.py b/src/lerobot/teleoperators/unitree_g1/__init__.py
index 45955a0e2..5e67538b8 100644
--- a/src/lerobot/teleoperators/unitree_g1/__init__.py
+++ b/src/lerobot/teleoperators/unitree_g1/__init__.py
@@ -19,3 +19,13 @@ from .exo_calib import ExoskeletonCalibration, ExoskeletonJointCalibration
from .exo_ik import ExoskeletonIKHelper
from .exo_serial import ExoskeletonArm
from .unitree_g1 import UnitreeG1Teleoperator
+
+__all__ = [
+ "ExoskeletonArmPortConfig",
+ "ExoskeletonCalibration",
+ "ExoskeletonIKHelper",
+ "ExoskeletonJointCalibration",
+ "ExoskeletonArm",
+ "UnitreeG1Teleoperator",
+ "UnitreeG1TeleoperatorConfig",
+]
diff --git a/src/lerobot/teleoperators/unitree_g1/exo_calib.py b/src/lerobot/teleoperators/unitree_g1/exo_calib.py
index 2927a1b55..b90e8fd7e 100644
--- a/src/lerobot/teleoperators/unitree_g1/exo_calib.py
+++ b/src/lerobot/teleoperators/unitree_g1/exo_calib.py
@@ -35,6 +35,9 @@ import serial
logger = logging.getLogger(__name__)
+ADC_MAX = 2**12 - 1
+ADC_HALF = ADC_MAX / 2
+
# exoskeleton joint names -> ADC channel pairs. TODO: add wrist pitch and wrist yaw
JOINTS = {
"shoulder_pitch": (0, 1),
@@ -59,7 +62,7 @@ class ExoskeletonCalibration:
version: int = 2
side: str = ""
- adc_max: int = 2**12 - 1
+ adc_max: int = ADC_MAX
joints: list[ExoskeletonJointCalibration] = field(default_factory=list)
def to_dict(self) -> dict:
@@ -92,7 +95,7 @@ class ExoskeletonCalibration:
return cls(
version=data.get("version", 2),
side=data.get("side", ""),
- adc_max=data.get("adc_max", 2**12 - 1),
+ adc_max=data.get("adc_max", ADC_MAX),
joints=joints,
)
@@ -112,11 +115,8 @@ class CalibParams:
def normalize_angle(angle: float) -> float:
- while angle > np.pi:
- angle -= 2 * np.pi
- while angle < -np.pi:
- angle += 2 * np.pi
- return angle
+ """Normalize angle to [-pi, pi]."""
+ return float(np.arctan2(np.sin(angle), np.cos(angle)))
def joint_z_and_angle(raw16: list[int], j: ExoskeletonJointCalibration) -> tuple[np.ndarray, float]:
@@ -125,7 +125,7 @@ def joint_z_and_angle(raw16: list[int], j: ExoskeletonJointCalibration) -> tuple
"""
pair = JOINTS[j.name]
s, c = raw16[pair[0]], raw16[pair[1]] # get sin and cos
- p = np.array([float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2]) # center the raw values
+ p = np.array([float(c) - ADC_HALF, float(s) - ADC_HALF]) # center the raw values
z = np.asarray(j.T) @ (
p - np.asarray(j.center_fit)
) # center the ellipse and invert the transformation matrix to get unit circle coords
@@ -167,7 +167,7 @@ def run_exo_calibration(
def read_joint_point(raw16: list[int], pair: tuple[int, int]):
s, c = raw16[pair[0]], raw16[pair[1]]
- return float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2, float(s), float(c)
+ return float(c) - ADC_HALF, float(s) - ADC_HALF, float(s), float(c)
def select_fit_subset(xs, ys):
"""Select and filter points for ellipse fitting. Trims outliers by radius and downsamples."""
@@ -317,7 +317,7 @@ def run_exo_calibration(
calib = ExoskeletonCalibration(
version=2,
side=side,
- adc_max=2**12 - 1,
+ adc_max=ADC_MAX,
joints=[
ExoskeletonJointCalibration(
name=j["name"],
@@ -367,8 +367,8 @@ def run_exo_calibration(
state["win_s"].append(s_raw)
state["win_c"].append(c_raw)
if len(state["win_s"]) >= max(3, params.median_window):
- state["ys"].append(running_median(state["win_s"]) - (2**12 - 1) / 2)
- state["xs"].append(running_median(state["win_c"]) - (2**12 - 1) / 2)
+ state["ys"].append(running_median(state["win_s"]) - ADC_HALF)
+ state["xs"].append(running_median(state["win_c"]) - ADC_HALF)
else:
jdata = joints_out[-1]
z = np.array(jdata["T"]) @ (np.array([x_raw, y_raw]) - np.array(jdata["center_fit"]))
diff --git a/src/lerobot/teleoperators/unitree_g1/exo_ik.py b/src/lerobot/teleoperators/unitree_g1/exo_ik.py
index 92519540f..3fd18d2f8 100644
--- a/src/lerobot/teleoperators/unitree_g1/exo_ik.py
+++ b/src/lerobot/teleoperators/unitree_g1/exo_ik.py
@@ -25,8 +25,8 @@ from dataclasses import dataclass
import numpy as np
+from lerobot.robots.unitree_g1.g1_kinematics import G1_29_ArmIK
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex
-from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
from .exo_calib import JOINTS
diff --git a/src/lerobot/teleoperators/unitree_g1/exo_serial.py b/src/lerobot/teleoperators/unitree_g1/exo_serial.py
index 1211c57cc..4f45997c0 100644
--- a/src/lerobot/teleoperators/unitree_g1/exo_serial.py
+++ b/src/lerobot/teleoperators/unitree_g1/exo_serial.py
@@ -32,25 +32,29 @@ def parse_raw16(line: bytes) -> list[int] | None:
if len(parts) < 16:
return None
return [int(x) for x in parts[:16]]
- except Exception:
+ except (ValueError, IndexError):
return None
def read_raw_from_serial(ser) -> list[int] | None:
"""Read latest sample from serial; if buffer is backed up, keep only the newest."""
- last = None
- while ser.in_waiting > 0:
- b = ser.readline()
- if not b:
- break
- raw16 = parse_raw16(b)
- if raw16 is not None:
- last = raw16
- if last is None:
- b = ser.readline()
- if b:
- last = parse_raw16(b)
- return last
+ try:
+ last = None
+ while ser.in_waiting > 0:
+ b = ser.readline()
+ if not b:
+ break
+ raw16 = parse_raw16(b)
+ if raw16 is not None:
+ last = raw16
+ if last is None:
+ b = ser.readline()
+ if b:
+ last = parse_raw16(b)
+ return last
+ except serial.SerialException as e:
+ logger.warning(f"Serial read error: {e}")
+ return None
@dataclass
@@ -115,5 +119,6 @@ class ExoskeletonArm:
return {} if raw is None else exo_raw_to_angles(raw, self.calibration)
def calibrate(self) -> None:
- ser = self._ser
- self.calibration = run_exo_calibration(ser, self.side, self.calibration_fpath)
+ if not self.is_connected:
+ raise RuntimeError("Cannot calibrate: exoskeleton not connected")
+ self.calibration = run_exo_calibration(self._ser, self.side, self.calibration_fpath)
diff --git a/src/lerobot/teleoperators/unitree_g1/unitree_g1.py b/src/lerobot/teleoperators/unitree_g1/unitree_g1.py
index 3779d83ec..242613e7e 100644
--- a/src/lerobot/teleoperators/unitree_g1/unitree_g1.py
+++ b/src/lerobot/teleoperators/unitree_g1/unitree_g1.py
@@ -17,9 +17,22 @@
import logging
import time
from functools import cached_property
+from typing import TYPE_CHECKING, Any
-from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
+from lerobot.robots.unitree_g1.g1_utils import REMOTE_AXES, G1_29_JointArmIndex
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
+from lerobot.utils.import_utils import _unitree_sdk_available
+
+if TYPE_CHECKING or _unitree_sdk_available:
+ from unitree_sdk2py.utils.joystick import Joystick
+else:
+
+ class Joystick:
+ def __init__(self):
+ raise ImportError(
+ "unitree_sdk2py is required for RemoteController. Install with: pip install unitree_sdk2py"
+ )
+
from ..teleoperator import Teleoperator
from .config_unitree_g1 import UnitreeG1TeleoperatorConfig
@@ -29,6 +42,120 @@ from .exo_serial import ExoskeletonArm
logger = logging.getLogger(__name__)
+class RemoteController:
+ """Unitree remote controller data parser for joystick and button state."""
+
+ # ADC parameters for exoskeleton joystick (12-bit ADC)
+ ADC_MAX = 4095
+ ADC_HALF = ADC_MAX / 2
+ JOYSTICK_X_IDX = 11 # X axis in raw ADC array
+ JOYSTICK_BTN_IDX = 12 # Button in raw ADC array
+ JOYSTICK_Y_IDX = 13 # Y axis in raw ADC array
+
+ # Map SDK named buttons to positional indices matching the wireless_remote
+ # byte layout (little-endian uint16 from bytes 2-3).
+ _BUTTON_MAP: list[str] = [
+ "RB",
+ "LB",
+ "start",
+ "back",
+ "RT",
+ "LT",
+ "",
+ "",
+ "A",
+ "B",
+ "X",
+ "Y",
+ "up",
+ "right",
+ "down",
+ "left",
+ ]
+
+ def __init__(self):
+ self.lx = 0.0
+ self.ly = 0.0
+ self.rx = 0.0
+ self.ry = 0.0
+ self.button = [0] * 16
+ self.remote_action = dict.fromkeys(REMOTE_AXES, 0.0)
+
+ # SDK joystick parser for wireless remote bytes
+ self._joystick = Joystick()
+ # Disable axis smoothing and deadzone to preserve raw values
+ for axis in (self._joystick.lx, self._joystick.ly, self._joystick.rx, self._joystick.ry):
+ axis.smooth = 1.0
+ axis.deadzone = 0.0
+
+ # Joystick center calibration (read at connect time)
+ self.left_center_x = self.ADC_HALF
+ self.left_center_y = self.ADC_HALF
+ self.right_center_x = self.ADC_HALF
+ self.right_center_y = self.ADC_HALF
+
+ # Whether to use exo joystick (detected at connect time)
+ self.use_left_exo_joystick = False
+ self.use_right_exo_joystick = False
+
+ def _sync_remote_action(self) -> None:
+ self.remote_action.update(zip(REMOTE_AXES, (self.lx, self.ly, self.rx, self.ry), strict=True))
+
+ def calibrate_center(self, raw16: list[int] | None, side: str) -> None:
+ if raw16 is None or len(raw16) < 16:
+ logger.info(f"{side.capitalize()} exo joystick: no data available")
+ return
+
+ btn_val = raw16[self.JOYSTICK_BTN_IDX]
+ logger.info(f"{side.capitalize()} exo joystick button ADC: {btn_val} (threshold: {self.ADC_HALF})")
+ if btn_val <= self.ADC_HALF:
+ logger.info(f"{side.capitalize()} exo joystick not detected (button below threshold)")
+ return
+
+ x = raw16[self.JOYSTICK_X_IDX]
+ y = raw16[self.JOYSTICK_Y_IDX]
+ if side == "left":
+ self.use_left_exo_joystick = True
+ self.left_center_x, self.left_center_y = x, y
+ else:
+ self.use_right_exo_joystick = True
+ self.right_center_x, self.right_center_y = x, y
+ logger.info(f"{side.capitalize()} exo joystick enabled, center: x={x}, y={y}")
+
+ def set_from_exo(self, raw16: list[int] | None, side: str) -> None:
+ if raw16 is None or len(raw16) < 16:
+ return
+
+ if side == "left":
+ if not self.use_left_exo_joystick:
+ return
+ self.lx = (raw16[self.JOYSTICK_X_IDX] - self.left_center_x) / self.ADC_HALF
+ self.ly = (raw16[self.JOYSTICK_Y_IDX] - self.left_center_y) / self.ADC_HALF
+ self.button[4] = 1 if raw16[self.JOYSTICK_BTN_IDX] < self.ADC_HALF else 0
+ return
+
+ if not self.use_right_exo_joystick:
+ return
+ self.rx = (raw16[self.JOYSTICK_X_IDX] - self.right_center_x) / self.ADC_HALF
+ self.ry = (raw16[self.JOYSTICK_Y_IDX] - self.right_center_y) / self.ADC_HALF
+ self.button[0] = 1 if raw16[self.JOYSTICK_BTN_IDX] < self.ADC_HALF else 0
+
+ def set_from_wireless(self, wireless_remote: bytes) -> None:
+ """Parse Unitree wireless remote raw bytes into joystick + button state."""
+ if len(wireless_remote) < 24:
+ return
+ self._joystick.extract(wireless_remote)
+
+ self.lx = self._joystick.lx.data
+ self.ly = self._joystick.ly.data
+ self.rx = self._joystick.rx.data
+ self.ry = self._joystick.ry.data
+
+ for i, name in enumerate(self._BUTTON_MAP):
+ if name:
+ self.button[i] = getattr(self._joystick, name).data
+
+
class UnitreeG1Teleoperator(Teleoperator):
"""
Bimanual exoskeleton arms teleoperator for Unitree G1 arms.
@@ -43,6 +170,13 @@ class UnitreeG1Teleoperator(Teleoperator):
def __init__(self, config: UnitreeG1TeleoperatorConfig):
super().__init__(config)
self.config = config
+ left_exo_enabled = bool(config.left_arm_config.port.strip())
+ right_exo_enabled = bool(config.right_arm_config.port.strip())
+ if left_exo_enabled != right_exo_enabled:
+ raise ValueError(
+ "Invalid exo config: set both left/right exo ports, or leave both empty for remote-only mode."
+ )
+ self._arm_control_enabled = left_exo_enabled and right_exo_enabled
# Setup calibration directory
self.calibration_dir = (
@@ -70,24 +204,37 @@ class UnitreeG1Teleoperator(Teleoperator):
)
self.ik_helper: ExoskeletonIKHelper | None = None
+ self.remote_controller = RemoteController()
@cached_property
def action_features(self) -> dict[str, type]:
- return {f"{name}.q": float for name in self._g1_joint_names}
+ remote_features = dict.fromkeys(self.remote_controller.remote_action, float)
+ if not self._arm_control_enabled:
+ return remote_features
+ joint_features = {f"{name}.q": float for name in self._g1_arm_joint_names}
+ return {**joint_features, **remote_features}
@cached_property
def feedback_features(self) -> dict[str, type]:
- return {}
+ return {"wireless_remote": bytes}
@property
def is_connected(self) -> bool:
+ if not self._arm_control_enabled:
+ return True
return self.left_arm.is_connected and self.right_arm.is_connected
@property
def is_calibrated(self) -> bool:
+ if not self._arm_control_enabled:
+ return True
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def connect(self, calibrate: bool = True) -> None:
+ if not self._arm_control_enabled:
+ logger.warning("Exo ports not fully configured; teleop will send joystick only (no arm actions)")
+ return
+
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -95,6 +242,13 @@ class UnitreeG1Teleoperator(Teleoperator):
self.ik_helper = ExoskeletonIKHelper(frozen_joints=frozen_joints)
logger.info("IK helper initialized")
+ time.sleep(0.1) # Give serial time to populate buffer
+
+ left_raw = self.left_arm.read_raw()
+ right_raw = self.right_arm.read_raw()
+ self.remote_controller.calibrate_center(left_raw, "left")
+ self.remote_controller.calibrate_center(right_raw, "right")
+
def calibrate(self) -> None:
if not self.left_arm.is_calibrated:
logger.info("Starting calibration for left arm...")
@@ -115,12 +269,33 @@ class UnitreeG1Teleoperator(Teleoperator):
pass
def get_action(self) -> dict[str, float]:
- left_angles = self.left_arm.get_angles()
- right_angles = self.right_arm.get_angles()
- return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
+ joint_action = {}
+ left_raw = None
+ right_raw = None
+ if self._arm_control_enabled:
+ left_raw = self.left_arm.read_raw()
+ right_raw = self.right_arm.read_raw()
- def send_feedback(self, feedback: dict[str, float]) -> None:
- raise NotImplementedError("Exoskeleton arms do not support feedback")
+ left_angles = self.left_arm.get_angles()
+ right_angles = self.right_arm.get_angles()
+ joint_action = self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
+
+ # Wireless remote has priority when non-zero; otherwise, use exo joystick.
+ rc = self.remote_controller
+ wireless_active = (
+ abs(rc.lx) > 1e-3 or abs(rc.ly) > 1e-3 or abs(rc.rx) > 1e-3 or abs(rc.ry) > 1e-3
+ ) or any(rc.button)
+ if self._arm_control_enabled and not wireless_active:
+ rc.set_from_exo(left_raw, "left")
+ rc.set_from_exo(right_raw, "right")
+
+ rc._sync_remote_action()
+ return {**joint_action, **rc.remote_action}
+
+ def send_feedback(self, feedback: dict[str, Any]) -> None:
+ wireless_remote = feedback.get("wireless_remote")
+ if wireless_remote is not None:
+ self.remote_controller.set_from_wireless(wireless_remote)
def disconnect(self) -> None:
self.left_arm.disconnect()
@@ -153,5 +328,5 @@ class UnitreeG1Teleoperator(Teleoperator):
print("\n\nVisualization stopped.")
@cached_property
- def _g1_joint_names(self) -> list[str]:
- return [joint.name for joint in G1_29_JointIndex]
+ def _g1_arm_joint_names(self) -> list[str]:
+ return [joint.name for joint in G1_29_JointArmIndex]
diff --git a/src/lerobot/utils/import_utils.py b/src/lerobot/utils/import_utils.py
index c33a73589..cae445e06 100644
--- a/src/lerobot/utils/import_utils.py
+++ b/src/lerobot/utils/import_utils.py
@@ -74,6 +74,8 @@ _peft_available = is_package_available("peft")
_scipy_available = is_package_available("scipy")
_reachy2_sdk_available = is_package_available("reachy2_sdk")
_can_available = is_package_available("python-can", "can")
+_unitree_sdk_available = is_package_available("unitree-sdk2", "unitree_sdk2py")
+_pygame_available = is_package_available("pygame")
def make_device_from_device_class(config: ChoiceRegistry) -> Any:
diff --git a/tests/robots/test_unitree_g1.py b/tests/robots/test_unitree_g1.py
new file mode 100644
index 000000000..8cc85b572
--- /dev/null
+++ b/tests/robots/test_unitree_g1.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for Unitree G1 robot. Meant to be run in an environment where the Unitree SDK is installed."""
+
+from unittest.mock import MagicMock, patch
+
+import numpy as np
+import pytest
+
+from lerobot.utils.import_utils import _unitree_sdk_available
+
+if not _unitree_sdk_available:
+ pytest.skip("Unitree SDK not available", allow_module_level=True)
+
+from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
+from lerobot.robots.unitree_g1.g1_utils import (
+ NUM_MOTORS,
+ REMOTE_AXES,
+ REMOTE_BUTTONS,
+ REMOTE_KEYS,
+ G1_29_JointArmIndex,
+ G1_29_JointIndex,
+ default_remote_input,
+ get_gravity_orientation,
+)
+
+# ---------------------------------------------------------------------------
+# Unit tests for g1_utils (no SDK needed)
+# ---------------------------------------------------------------------------
+
+
+class TestG1Utils:
+ def test_num_motors(self):
+ assert NUM_MOTORS == 29
+
+ def test_joint_index_count(self):
+ assert len(G1_29_JointIndex) == 29
+
+ def test_joint_arm_index_count(self):
+ assert len(G1_29_JointArmIndex) == 14
+
+ def test_arm_indices_are_subset_of_full(self):
+ full_values = {j.value for j in G1_29_JointIndex}
+ arm_values = {j.value for j in G1_29_JointArmIndex}
+ assert arm_values.issubset(full_values)
+
+ def test_arm_indices_start_at_15(self):
+ assert min(j.value for j in G1_29_JointArmIndex) == 15
+ assert max(j.value for j in G1_29_JointArmIndex) == 28
+
+ def test_enum_naming_consistency(self):
+ """Verify all wrist joints use consistent PascalCase naming."""
+ wrist_joints = [j for j in G1_29_JointIndex if "Wrist" in j.name]
+ for j in wrist_joints:
+ # Should be "WristYaw", "WristPitch", "WristRoll" — no lowercase after "Wrist"
+ after_wrist = j.name.split("Wrist")[1]
+ assert after_wrist[0].isupper(), f"{j.name} has inconsistent casing after 'Wrist'"
+
+ def test_remote_keys_structure(self):
+ assert len(REMOTE_AXES) == 4
+ assert len(REMOTE_BUTTONS) == 16
+ assert len(REMOTE_KEYS) == 20
+ assert REMOTE_KEYS == REMOTE_AXES + REMOTE_BUTTONS
+
+ def test_default_remote_input(self):
+ d = default_remote_input()
+ assert len(d) == 20
+ assert all(v == 0.0 for v in d.values())
+ assert set(d.keys()) == set(REMOTE_KEYS)
+
+ def test_gravity_orientation_identity(self):
+ """Quaternion [1, 0, 0, 0] (no rotation) should give gravity along -z."""
+ g = get_gravity_orientation([1.0, 0.0, 0.0, 0.0])
+ assert g.shape == (3,)
+ assert g.dtype == np.float32
+ np.testing.assert_allclose(g, [0.0, 0.0, -1.0], atol=1e-6)
+
+ def test_gravity_orientation_dtype(self):
+ g = get_gravity_orientation(np.array([1.0, 0.0, 0.0, 0.0]))
+ assert g.dtype == np.float32
+
+
+# ---------------------------------------------------------------------------
+# Unit tests for UnitreeG1Config (no SDK needed)
+# ---------------------------------------------------------------------------
+
+
+class TestUnitreeG1Config:
+ def test_default_config(self):
+ cfg = UnitreeG1Config()
+ assert len(cfg.kp) == 29
+ assert len(cfg.kd) == 29
+ assert len(cfg.default_positions) == 29
+ assert cfg.is_simulation is True
+ assert cfg.controller is None
+ assert cfg.gravity_compensation is False
+
+ def test_gains_are_positive(self):
+ cfg = UnitreeG1Config()
+ assert all(v > 0 for v in cfg.kp)
+ assert all(v > 0 for v in cfg.kd)
+
+ def test_config_copies_gains(self):
+ """Each config instance should have its own copy of gains."""
+ cfg1 = UnitreeG1Config()
+ cfg2 = UnitreeG1Config()
+ cfg1.kp[0] = 999.0
+ assert cfg2.kp[0] != 999.0
+
+
+# ---------------------------------------------------------------------------
+# Robot mock and integration tests
+# ---------------------------------------------------------------------------
+
+
+def _make_lowstate_msg_mock():
+ """Create a mock that mimics the SDK LowState_ message."""
+ msg = MagicMock()
+ for i in range(29):
+ motor = MagicMock()
+ motor.q = float(i) * 0.1
+ motor.dq = float(i) * 0.01
+ motor.tau_est = float(i) * 0.001
+ motor.temperature = 30.0 + i
+ msg.motor_state.__getitem__ = lambda self, idx, _motors={}: _motors.setdefault(
+ idx, MagicMock(q=idx * 0.1, dq=idx * 0.01, tau_est=idx * 0.001, temperature=30.0 + idx)
+ )
+
+ msg.imu_state.quaternion = [1.0, 0.0, 0.0, 0.0]
+ msg.imu_state.gyroscope = [0.1, 0.2, 0.3]
+ msg.imu_state.accelerometer = [0.0, 0.0, 9.81]
+ msg.imu_state.rpy = [0.0, 0.0, 0.0]
+ msg.imu_state.temperature = 25.0
+ msg.wireless_remote = b"\x00" * 40
+ msg.mode_machine = 0
+ return msg
+
+
+def _make_sdk_mocks():
+ """Create mocks for the Unitree SDK modules used by UnitreeG1."""
+ lowcmd_default = MagicMock()
+ lowcmd_default.mode_pr = 0
+ lowcmd_default.motor_cmd = [MagicMock() for _ in range(35)]
+
+ crc_mock = MagicMock()
+ crc_mock.Crc.return_value = 0
+
+ lowstate_msg = _make_lowstate_msg_mock()
+
+ subscriber_mock = MagicMock()
+ subscriber_mock.Read.return_value = lowstate_msg
+
+ publisher_mock = MagicMock()
+
+ return {
+ "lowcmd_default": lowcmd_default,
+ "crc_mock": crc_mock,
+ "subscriber_mock": subscriber_mock,
+ "publisher_mock": publisher_mock,
+ "lowstate_msg": lowstate_msg,
+ }
+
+
+@pytest.fixture
+def unitree_g1():
+ """Create a UnitreeG1 robot with all SDK dependencies mocked."""
+ mocks = _make_sdk_mocks()
+
+ mock_channel_init = MagicMock()
+ mock_channel_pub = MagicMock(return_value=mocks["publisher_mock"])
+ mock_channel_sub = MagicMock(return_value=mocks["subscriber_mock"])
+
+ with (
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.make_cameras_from_configs",
+ return_value={},
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.G1_29_ArmIK",
+ return_value=MagicMock(),
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1._SDKChannelFactoryInitialize",
+ mock_channel_init,
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1._SDKChannelPublisher",
+ mock_channel_pub,
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1._SDKChannelSubscriber",
+ mock_channel_sub,
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.unitree_hg_msg_dds__LowCmd_",
+ MagicMock(return_value=mocks["lowcmd_default"]),
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.hg_LowCmd",
+ MagicMock,
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.hg_LowState",
+ MagicMock,
+ ),
+ patch(
+ "lerobot.robots.unitree_g1.unitree_g1.CRC",
+ MagicMock(return_value=mocks["crc_mock"]),
+ ),
+ ):
+ from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
+
+ cfg = UnitreeG1Config(is_simulation=True, gravity_compensation=False)
+ robot = UnitreeG1(cfg)
+ yield robot, mocks
+ if robot.is_connected:
+ robot.disconnect()
+
+
+def test_init_state(unitree_g1):
+ robot, _ = unitree_g1
+ assert not robot.is_connected
+ assert robot.controller is None
+
+
+def test_observation_features(unitree_g1):
+ robot, _ = unitree_g1
+ features = robot.observation_features
+ # Should have .q for all 29 joints (no cameras configured)
+ assert len(features) == 29
+ for joint in G1_29_JointIndex:
+ assert f"{joint.name}.q" in features
+
+
+def test_action_features_no_controller(unitree_g1):
+ robot, _ = unitree_g1
+ features = robot.action_features
+ # Without controller: all 29 joints
+ assert len(features) == 29
+ for joint in G1_29_JointIndex:
+ assert f"{joint.name}.q" in features
+
+
+def test_get_observation_before_connect(unitree_g1):
+ robot, _ = unitree_g1
+ obs = robot.get_observation()
+ assert obs == {}
+
+
+def test_disconnect_idempotent(unitree_g1):
+ robot, _ = unitree_g1
+ # Should not raise even when not connected
+ robot.disconnect()
diff --git a/tests/teleoperators/test_unitree_g1_teleoperator.py b/tests/teleoperators/test_unitree_g1_teleoperator.py
new file mode 100644
index 000000000..52f4a8482
--- /dev/null
+++ b/tests/teleoperators/test_unitree_g1_teleoperator.py
@@ -0,0 +1,309 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for Unitree G1 teleoperator. Meant to be run in an environment where the Unitree SDK is installed."""
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from lerobot.utils.import_utils import _unitree_sdk_available
+
+if not _unitree_sdk_available:
+ pytest.skip("Unitree SDK not available", allow_module_level=True)
+
+from lerobot.robots.unitree_g1.g1_utils import REMOTE_AXES
+from lerobot.teleoperators.unitree_g1.config_unitree_g1 import (
+ ExoskeletonArmPortConfig,
+ UnitreeG1TeleoperatorConfig,
+)
+from lerobot.teleoperators.unitree_g1.unitree_g1 import RemoteController, UnitreeG1Teleoperator
+
+# ---------------------------------------------------------------------------
+# Tests for RemoteController
+# ---------------------------------------------------------------------------
+
+
+def _make_joystick_mock():
+ """Create a mock Joystick class matching the SDK interface."""
+ joystick = MagicMock()
+ # Axes are Axis objects with .data attribute
+ joystick.lx = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
+ joystick.ly = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
+ joystick.rx = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
+ joystick.ry = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
+ # Buttons are Button objects with .data attribute
+ for name in ["RB", "LB", "start", "back", "RT", "LT", "A", "B", "X", "Y", "up", "right", "down", "left"]:
+ setattr(joystick, name, MagicMock(data=0))
+ return joystick
+
+
+@pytest.fixture
+def remote_controller():
+ """Create a RemoteController with a mocked Joystick."""
+ mock_joystick = _make_joystick_mock()
+
+ rc = RemoteController()
+ rc._joystick = mock_joystick
+ yield rc, mock_joystick
+
+
+def test_remote_controller_init(remote_controller):
+ rc, _ = remote_controller
+ assert rc.lx == 0.0
+ assert rc.ly == 0.0
+ assert rc.rx == 0.0
+ assert rc.ry == 0.0
+ assert len(rc.button) == 16
+ assert all(b == 0 for b in rc.button)
+
+
+def test_sync_remote_action(remote_controller):
+ rc, _ = remote_controller
+ rc.lx = 0.5
+ rc.ly = -0.3
+ rc.rx = 0.1
+ rc.ry = 0.0
+ rc._sync_remote_action()
+
+ assert rc.remote_action["remote.lx"] == 0.5
+ assert rc.remote_action["remote.ly"] == -0.3
+ assert rc.remote_action["remote.rx"] == 0.1
+ assert rc.remote_action["remote.ry"] == 0.0
+
+
+def test_set_from_wireless_calls_extract(remote_controller):
+ rc, mock_joystick = remote_controller
+ # Set up the mock to populate data after extract
+ mock_joystick.lx.data = 0.5
+ mock_joystick.ly.data = -0.3
+ mock_joystick.rx.data = 0.1
+ mock_joystick.ry.data = 0.0
+
+ wireless_data = b"\x00" * 40
+ rc.set_from_wireless(wireless_data)
+
+ mock_joystick.extract.assert_called_once_with(wireless_data)
+ assert rc.lx == 0.5
+ assert rc.ly == -0.3
+
+
+def test_set_from_wireless_short_data(remote_controller):
+ rc, mock_joystick = remote_controller
+ rc.set_from_wireless(b"\x00" * 10) # Too short
+ mock_joystick.extract.assert_not_called()
+
+
+def test_set_from_wireless_buttons(remote_controller):
+ rc, mock_joystick = remote_controller
+ # Simulate RB pressed
+ mock_joystick.RB.data = 1
+ mock_joystick.lx.data = 0.0
+ mock_joystick.ly.data = 0.0
+ mock_joystick.rx.data = 0.0
+ mock_joystick.ry.data = 0.0
+
+ rc.set_from_wireless(b"\x00" * 40)
+ assert rc.button[0] == 1 # RB maps to button[0]
+
+
+def test_set_from_exo_left(remote_controller):
+ rc, _ = remote_controller
+ rc.use_left_exo_joystick = True
+ rc.left_center_x = 2048
+ rc.left_center_y = 2048
+
+ raw16 = [0] * 16
+ raw16[11] = 3048 # X axis: (3048 - 2048) / 2047.5 ≈ 0.488
+ raw16[13] = 1048 # Y axis: (1048 - 2048) / 2047.5 ≈ -0.488
+ raw16[12] = 0 # Button pressed (below ADC_HALF)
+
+ rc.set_from_exo(raw16, "left")
+ assert rc.lx == pytest.approx((3048 - 2048) / 2047.5, abs=1e-3)
+ assert rc.ly == pytest.approx((1048 - 2048) / 2047.5, abs=1e-3)
+ assert rc.button[4] == 1 # Left button maps to button[4]
+
+
+def test_set_from_exo_clears_button(remote_controller):
+ rc, _ = remote_controller
+ rc.use_left_exo_joystick = True
+ rc.button[4] = 1 # Pre-set
+
+ raw16 = [0] * 16
+ raw16[12] = 4000 # Button NOT pressed (above ADC_HALF)
+
+ rc.set_from_exo(raw16, "left")
+ assert rc.button[4] == 0 # Should be cleared
+
+
+def test_set_from_exo_ignored_when_not_enabled(remote_controller):
+ rc, _ = remote_controller
+ rc.use_left_exo_joystick = False
+ raw16 = [0] * 16
+ raw16[11] = 3000
+
+ rc.set_from_exo(raw16, "left")
+ assert rc.lx == 0.0 # Unchanged
+
+
+# ---------------------------------------------------------------------------
+# Tests for UnitreeG1TeleoperatorConfig (no SDK needed)
+# ---------------------------------------------------------------------------
+
+
+class TestTeleoperatorConfig:
+ def test_default_config(self):
+ cfg = UnitreeG1TeleoperatorConfig()
+ assert cfg.left_arm_config.port == ""
+ assert cfg.right_arm_config.port == ""
+ assert cfg.frozen_joints == ""
+
+ def test_config_with_ports(self):
+ cfg = UnitreeG1TeleoperatorConfig(
+ left_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM0"),
+ right_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM1"),
+ )
+ assert cfg.left_arm_config.port == "/dev/ttyACM0"
+ assert cfg.right_arm_config.port == "/dev/ttyACM1"
+
+
+# ---------------------------------------------------------------------------
+# Tests for UnitreeG1Teleoperator
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def teleop_remote_only():
+ """Create a UnitreeG1Teleoperator in remote-only mode (no exo arms)."""
+ cfg = UnitreeG1TeleoperatorConfig() # No ports = remote-only mode
+ teleop = UnitreeG1Teleoperator(cfg)
+ yield teleop
+
+
+def test_remote_only_connect(teleop_remote_only):
+ """Remote-only mode should connect immediately without serial ports."""
+ teleop = teleop_remote_only
+ teleop.connect()
+ assert teleop.is_connected
+ assert not teleop._arm_control_enabled
+
+
+def test_remote_only_action_features(teleop_remote_only):
+ teleop = teleop_remote_only
+ features = teleop.action_features
+ # Remote-only: just the 4 remote axes
+ assert set(features.keys()) == set(REMOTE_AXES)
+
+
+def test_feedback_features(teleop_remote_only):
+ teleop = teleop_remote_only
+ features = teleop.feedback_features
+ assert "wireless_remote" in features
+ assert features["wireless_remote"] is bytes
+
+
+def test_remote_only_get_action(teleop_remote_only):
+ teleop = teleop_remote_only
+ teleop.connect()
+ action = teleop.get_action()
+ assert set(action.keys()) == set(REMOTE_AXES)
+ assert all(isinstance(v, float) for v in action.values())
+
+
+def test_send_feedback(teleop_remote_only):
+ teleop = teleop_remote_only
+ teleop.connect()
+ # Should not raise
+ teleop.send_feedback({"wireless_remote": b"\x00" * 40})
+
+
+def test_send_feedback_missing_key(teleop_remote_only):
+ teleop = teleop_remote_only
+ teleop.connect()
+ # Should not raise even with missing key
+ teleop.send_feedback({"other_key": 42})
+
+
+def test_asymmetric_exo_ports_raises():
+ """Configuring only one exo port should raise ValueError."""
+ cfg = UnitreeG1TeleoperatorConfig(
+ left_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM0"),
+ # right_arm_config left empty
+ )
+ with pytest.raises(ValueError, match="set both left/right"):
+ UnitreeG1Teleoperator(cfg)
+
+
+# ---------------------------------------------------------------------------
+# Tests for ExoskeletonArm (needs serial mock)
+# ---------------------------------------------------------------------------
+
+
+class TestExoskeletonArm:
+ def test_parse_raw16_valid(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
+
+ line = b"100 200 300 400 500 600 700 800 900 1000 1100 1200 1300 1400 1500 1600\n"
+ result = parse_raw16(line)
+ assert result is not None
+ assert len(result) == 16
+ assert result[0] == 100
+ assert result[15] == 1600
+
+ def test_parse_raw16_too_short(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
+
+ line = b"100 200 300\n"
+ assert parse_raw16(line) is None
+
+ def test_parse_raw16_garbage(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
+
+ assert parse_raw16(b"not numbers at all\n") is None
+ assert parse_raw16(b"\xff\xfe\xfd\n") is None
+ assert parse_raw16(b"") is None
+
+ def test_calibrate_requires_connection(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
+
+ arm = ExoskeletonArm(
+ port="/dev/null",
+ calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
+ side="left",
+ )
+ with pytest.raises(RuntimeError, match="not connected"):
+ arm.calibrate()
+
+ def test_is_connected_false_by_default(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
+
+ arm = ExoskeletonArm(
+ port="/dev/null",
+ calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
+ side="left",
+ )
+ assert not arm.is_connected
+ assert not arm.is_calibrated
+
+ def test_read_raw_when_disconnected(self):
+ from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
+
+ arm = ExoskeletonArm(
+ port="/dev/null",
+ calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
+ side="left",
+ )
+ assert arm.read_raw() is None