feat(robots): Unitree G1 WBC implementation (#2876)

* move locomotion from examples to robot, move controller to teleoperator class

* modify teleoperate to send back actions to robot

* whole body controller

* add holosoma to locomotros

* various updates

* update joint zeroing etc

* ensure safefail with locomotion

* add unitree locomotion

* launch camera from g1 server

* publish at varying framerates

* fix async read in camera

* attempting to fix camera lag

* test camera speedup

* training

* inference works

* remove logging from pi0

* remove logging

* push local changes

* testing

* final changes

* revert control_utils

* revert utils

* revert

* revert g1

* revert again:

* revert utils

* push recents

* remove examples

* remove junk

* remove mjlog

* revergt edit_dataset

* Update lerobot_edit_dataset.py

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>

* undo teleop changes

* revert logging

* remove loggings

* remove loogs

* revert dataset tools

* Update dataset_tools.py

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>

* move gravity to utils

* revert changes

* remove matplotlib viewer (rerun works fine)

* factory revert

* send policy action directly

* recent changes

* implement flexible action space

* send empty command if arms are missing

* rename locomotion to controller

* add init

* implement feedback

* add feedback for teleoperator

* fix ruff

* fix ruff

* use read_latest

* fix zmq camera

* revert exo_serial

* simplify PR

* revert exo_changes

* revert camera_zmq

* Update camera_zmq.py

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>

* remove frame duplication from zmq server

* revert channerfactoryinitialize

* keep channelfactoryinitialize

* remove zeroing out logic

* fix typo

* refactor teleop class

* simplify teleop further

* import armindex at the top

* fix visualizer again

* revert ik helper

* push stuff

* simplify image_server

* update image_server

* asd

* add threading logic

* simplify ik helper stuff

* simplify holosoma

* fix names

* fix docs

* revert leg override

* clean connect

* fix controller

* fix ruff

* clean teleoperator

* set_from_wireless

* avoid double initializations

* refactor robot class

* fix pre-commit

* update docs

* update docs format

* add teleop instructions

* unitree_g1 specific exception in record/teleoperate

* add thumbnail to docs

* add thumbnail to doc

* refactor(unitree): multiple improvements (#3103)

* refactor(unitree): multiple improvements

* test(unitree): added tests + improved installation instructions

* refactor(robots): minor changes unitree robot kinematic

* chore(robots): rename g1 kinematics file

---------

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <steven.palma@huggingface.co>
This commit is contained in:
Martino Russi
2026-03-08 11:33:24 +01:00
committed by GitHub
parent 6139b133ca
commit 4f2ef024d8
24 changed files with 1504 additions and 637 deletions
+267
View File
@@ -0,0 +1,267 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Unitree G1 robot. Meant to be run in an environment where the Unitree SDK is installed."""
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from lerobot.utils.import_utils import _unitree_sdk_available
if not _unitree_sdk_available:
pytest.skip("Unitree SDK not available", allow_module_level=True)
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.g1_utils import (
NUM_MOTORS,
REMOTE_AXES,
REMOTE_BUTTONS,
REMOTE_KEYS,
G1_29_JointArmIndex,
G1_29_JointIndex,
default_remote_input,
get_gravity_orientation,
)
# ---------------------------------------------------------------------------
# Unit tests for g1_utils (no SDK needed)
# ---------------------------------------------------------------------------
class TestG1Utils:
def test_num_motors(self):
assert NUM_MOTORS == 29
def test_joint_index_count(self):
assert len(G1_29_JointIndex) == 29
def test_joint_arm_index_count(self):
assert len(G1_29_JointArmIndex) == 14
def test_arm_indices_are_subset_of_full(self):
full_values = {j.value for j in G1_29_JointIndex}
arm_values = {j.value for j in G1_29_JointArmIndex}
assert arm_values.issubset(full_values)
def test_arm_indices_start_at_15(self):
assert min(j.value for j in G1_29_JointArmIndex) == 15
assert max(j.value for j in G1_29_JointArmIndex) == 28
def test_enum_naming_consistency(self):
"""Verify all wrist joints use consistent PascalCase naming."""
wrist_joints = [j for j in G1_29_JointIndex if "Wrist" in j.name]
for j in wrist_joints:
# Should be "WristYaw", "WristPitch", "WristRoll" — no lowercase after "Wrist"
after_wrist = j.name.split("Wrist")[1]
assert after_wrist[0].isupper(), f"{j.name} has inconsistent casing after 'Wrist'"
def test_remote_keys_structure(self):
assert len(REMOTE_AXES) == 4
assert len(REMOTE_BUTTONS) == 16
assert len(REMOTE_KEYS) == 20
assert REMOTE_KEYS == REMOTE_AXES + REMOTE_BUTTONS
def test_default_remote_input(self):
d = default_remote_input()
assert len(d) == 20
assert all(v == 0.0 for v in d.values())
assert set(d.keys()) == set(REMOTE_KEYS)
def test_gravity_orientation_identity(self):
"""Quaternion [1, 0, 0, 0] (no rotation) should give gravity along -z."""
g = get_gravity_orientation([1.0, 0.0, 0.0, 0.0])
assert g.shape == (3,)
assert g.dtype == np.float32
np.testing.assert_allclose(g, [0.0, 0.0, -1.0], atol=1e-6)
def test_gravity_orientation_dtype(self):
g = get_gravity_orientation(np.array([1.0, 0.0, 0.0, 0.0]))
assert g.dtype == np.float32
# ---------------------------------------------------------------------------
# Unit tests for UnitreeG1Config (no SDK needed)
# ---------------------------------------------------------------------------
class TestUnitreeG1Config:
def test_default_config(self):
cfg = UnitreeG1Config()
assert len(cfg.kp) == 29
assert len(cfg.kd) == 29
assert len(cfg.default_positions) == 29
assert cfg.is_simulation is True
assert cfg.controller is None
assert cfg.gravity_compensation is False
def test_gains_are_positive(self):
cfg = UnitreeG1Config()
assert all(v > 0 for v in cfg.kp)
assert all(v > 0 for v in cfg.kd)
def test_config_copies_gains(self):
"""Each config instance should have its own copy of gains."""
cfg1 = UnitreeG1Config()
cfg2 = UnitreeG1Config()
cfg1.kp[0] = 999.0
assert cfg2.kp[0] != 999.0
# ---------------------------------------------------------------------------
# Robot mock and integration tests
# ---------------------------------------------------------------------------
def _make_lowstate_msg_mock():
"""Create a mock that mimics the SDK LowState_ message."""
msg = MagicMock()
for i in range(29):
motor = MagicMock()
motor.q = float(i) * 0.1
motor.dq = float(i) * 0.01
motor.tau_est = float(i) * 0.001
motor.temperature = 30.0 + i
msg.motor_state.__getitem__ = lambda self, idx, _motors={}: _motors.setdefault(
idx, MagicMock(q=idx * 0.1, dq=idx * 0.01, tau_est=idx * 0.001, temperature=30.0 + idx)
)
msg.imu_state.quaternion = [1.0, 0.0, 0.0, 0.0]
msg.imu_state.gyroscope = [0.1, 0.2, 0.3]
msg.imu_state.accelerometer = [0.0, 0.0, 9.81]
msg.imu_state.rpy = [0.0, 0.0, 0.0]
msg.imu_state.temperature = 25.0
msg.wireless_remote = b"\x00" * 40
msg.mode_machine = 0
return msg
def _make_sdk_mocks():
"""Create mocks for the Unitree SDK modules used by UnitreeG1."""
lowcmd_default = MagicMock()
lowcmd_default.mode_pr = 0
lowcmd_default.motor_cmd = [MagicMock() for _ in range(35)]
crc_mock = MagicMock()
crc_mock.Crc.return_value = 0
lowstate_msg = _make_lowstate_msg_mock()
subscriber_mock = MagicMock()
subscriber_mock.Read.return_value = lowstate_msg
publisher_mock = MagicMock()
return {
"lowcmd_default": lowcmd_default,
"crc_mock": crc_mock,
"subscriber_mock": subscriber_mock,
"publisher_mock": publisher_mock,
"lowstate_msg": lowstate_msg,
}
@pytest.fixture
def unitree_g1():
"""Create a UnitreeG1 robot with all SDK dependencies mocked."""
mocks = _make_sdk_mocks()
mock_channel_init = MagicMock()
mock_channel_pub = MagicMock(return_value=mocks["publisher_mock"])
mock_channel_sub = MagicMock(return_value=mocks["subscriber_mock"])
with (
patch(
"lerobot.robots.unitree_g1.unitree_g1.make_cameras_from_configs",
return_value={},
),
patch(
"lerobot.robots.unitree_g1.unitree_g1.G1_29_ArmIK",
return_value=MagicMock(),
),
patch(
"lerobot.robots.unitree_g1.unitree_g1._SDKChannelFactoryInitialize",
mock_channel_init,
),
patch(
"lerobot.robots.unitree_g1.unitree_g1._SDKChannelPublisher",
mock_channel_pub,
),
patch(
"lerobot.robots.unitree_g1.unitree_g1._SDKChannelSubscriber",
mock_channel_sub,
),
patch(
"lerobot.robots.unitree_g1.unitree_g1.unitree_hg_msg_dds__LowCmd_",
MagicMock(return_value=mocks["lowcmd_default"]),
),
patch(
"lerobot.robots.unitree_g1.unitree_g1.hg_LowCmd",
MagicMock,
),
patch(
"lerobot.robots.unitree_g1.unitree_g1.hg_LowState",
MagicMock,
),
patch(
"lerobot.robots.unitree_g1.unitree_g1.CRC",
MagicMock(return_value=mocks["crc_mock"]),
),
):
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
cfg = UnitreeG1Config(is_simulation=True, gravity_compensation=False)
robot = UnitreeG1(cfg)
yield robot, mocks
if robot.is_connected:
robot.disconnect()
def test_init_state(unitree_g1):
robot, _ = unitree_g1
assert not robot.is_connected
assert robot.controller is None
def test_observation_features(unitree_g1):
robot, _ = unitree_g1
features = robot.observation_features
# Should have .q for all 29 joints (no cameras configured)
assert len(features) == 29
for joint in G1_29_JointIndex:
assert f"{joint.name}.q" in features
def test_action_features_no_controller(unitree_g1):
robot, _ = unitree_g1
features = robot.action_features
# Without controller: all 29 joints
assert len(features) == 29
for joint in G1_29_JointIndex:
assert f"{joint.name}.q" in features
def test_get_observation_before_connect(unitree_g1):
robot, _ = unitree_g1
obs = robot.get_observation()
assert obs == {}
def test_disconnect_idempotent(unitree_g1):
robot, _ = unitree_g1
# Should not raise even when not connected
robot.disconnect()
@@ -0,0 +1,309 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Unitree G1 teleoperator. Meant to be run in an environment where the Unitree SDK is installed."""
from unittest.mock import MagicMock
import pytest
from lerobot.utils.import_utils import _unitree_sdk_available
if not _unitree_sdk_available:
pytest.skip("Unitree SDK not available", allow_module_level=True)
from lerobot.robots.unitree_g1.g1_utils import REMOTE_AXES
from lerobot.teleoperators.unitree_g1.config_unitree_g1 import (
ExoskeletonArmPortConfig,
UnitreeG1TeleoperatorConfig,
)
from lerobot.teleoperators.unitree_g1.unitree_g1 import RemoteController, UnitreeG1Teleoperator
# ---------------------------------------------------------------------------
# Tests for RemoteController
# ---------------------------------------------------------------------------
def _make_joystick_mock():
"""Create a mock Joystick class matching the SDK interface."""
joystick = MagicMock()
# Axes are Axis objects with .data attribute
joystick.lx = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
joystick.ly = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
joystick.rx = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
joystick.ry = MagicMock(data=0.0, smooth=0.03, deadzone=0.01)
# Buttons are Button objects with .data attribute
for name in ["RB", "LB", "start", "back", "RT", "LT", "A", "B", "X", "Y", "up", "right", "down", "left"]:
setattr(joystick, name, MagicMock(data=0))
return joystick
@pytest.fixture
def remote_controller():
"""Create a RemoteController with a mocked Joystick."""
mock_joystick = _make_joystick_mock()
rc = RemoteController()
rc._joystick = mock_joystick
yield rc, mock_joystick
def test_remote_controller_init(remote_controller):
rc, _ = remote_controller
assert rc.lx == 0.0
assert rc.ly == 0.0
assert rc.rx == 0.0
assert rc.ry == 0.0
assert len(rc.button) == 16
assert all(b == 0 for b in rc.button)
def test_sync_remote_action(remote_controller):
rc, _ = remote_controller
rc.lx = 0.5
rc.ly = -0.3
rc.rx = 0.1
rc.ry = 0.0
rc._sync_remote_action()
assert rc.remote_action["remote.lx"] == 0.5
assert rc.remote_action["remote.ly"] == -0.3
assert rc.remote_action["remote.rx"] == 0.1
assert rc.remote_action["remote.ry"] == 0.0
def test_set_from_wireless_calls_extract(remote_controller):
rc, mock_joystick = remote_controller
# Set up the mock to populate data after extract
mock_joystick.lx.data = 0.5
mock_joystick.ly.data = -0.3
mock_joystick.rx.data = 0.1
mock_joystick.ry.data = 0.0
wireless_data = b"\x00" * 40
rc.set_from_wireless(wireless_data)
mock_joystick.extract.assert_called_once_with(wireless_data)
assert rc.lx == 0.5
assert rc.ly == -0.3
def test_set_from_wireless_short_data(remote_controller):
rc, mock_joystick = remote_controller
rc.set_from_wireless(b"\x00" * 10) # Too short
mock_joystick.extract.assert_not_called()
def test_set_from_wireless_buttons(remote_controller):
rc, mock_joystick = remote_controller
# Simulate RB pressed
mock_joystick.RB.data = 1
mock_joystick.lx.data = 0.0
mock_joystick.ly.data = 0.0
mock_joystick.rx.data = 0.0
mock_joystick.ry.data = 0.0
rc.set_from_wireless(b"\x00" * 40)
assert rc.button[0] == 1 # RB maps to button[0]
def test_set_from_exo_left(remote_controller):
rc, _ = remote_controller
rc.use_left_exo_joystick = True
rc.left_center_x = 2048
rc.left_center_y = 2048
raw16 = [0] * 16
raw16[11] = 3048 # X axis: (3048 - 2048) / 2047.5 ≈ 0.488
raw16[13] = 1048 # Y axis: (1048 - 2048) / 2047.5 ≈ -0.488
raw16[12] = 0 # Button pressed (below ADC_HALF)
rc.set_from_exo(raw16, "left")
assert rc.lx == pytest.approx((3048 - 2048) / 2047.5, abs=1e-3)
assert rc.ly == pytest.approx((1048 - 2048) / 2047.5, abs=1e-3)
assert rc.button[4] == 1 # Left button maps to button[4]
def test_set_from_exo_clears_button(remote_controller):
rc, _ = remote_controller
rc.use_left_exo_joystick = True
rc.button[4] = 1 # Pre-set
raw16 = [0] * 16
raw16[12] = 4000 # Button NOT pressed (above ADC_HALF)
rc.set_from_exo(raw16, "left")
assert rc.button[4] == 0 # Should be cleared
def test_set_from_exo_ignored_when_not_enabled(remote_controller):
rc, _ = remote_controller
rc.use_left_exo_joystick = False
raw16 = [0] * 16
raw16[11] = 3000
rc.set_from_exo(raw16, "left")
assert rc.lx == 0.0 # Unchanged
# ---------------------------------------------------------------------------
# Tests for UnitreeG1TeleoperatorConfig (no SDK needed)
# ---------------------------------------------------------------------------
class TestTeleoperatorConfig:
def test_default_config(self):
cfg = UnitreeG1TeleoperatorConfig()
assert cfg.left_arm_config.port == ""
assert cfg.right_arm_config.port == ""
assert cfg.frozen_joints == ""
def test_config_with_ports(self):
cfg = UnitreeG1TeleoperatorConfig(
left_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM0"),
right_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM1"),
)
assert cfg.left_arm_config.port == "/dev/ttyACM0"
assert cfg.right_arm_config.port == "/dev/ttyACM1"
# ---------------------------------------------------------------------------
# Tests for UnitreeG1Teleoperator
# ---------------------------------------------------------------------------
@pytest.fixture
def teleop_remote_only():
"""Create a UnitreeG1Teleoperator in remote-only mode (no exo arms)."""
cfg = UnitreeG1TeleoperatorConfig() # No ports = remote-only mode
teleop = UnitreeG1Teleoperator(cfg)
yield teleop
def test_remote_only_connect(teleop_remote_only):
"""Remote-only mode should connect immediately without serial ports."""
teleop = teleop_remote_only
teleop.connect()
assert teleop.is_connected
assert not teleop._arm_control_enabled
def test_remote_only_action_features(teleop_remote_only):
teleop = teleop_remote_only
features = teleop.action_features
# Remote-only: just the 4 remote axes
assert set(features.keys()) == set(REMOTE_AXES)
def test_feedback_features(teleop_remote_only):
teleop = teleop_remote_only
features = teleop.feedback_features
assert "wireless_remote" in features
assert features["wireless_remote"] is bytes
def test_remote_only_get_action(teleop_remote_only):
teleop = teleop_remote_only
teleop.connect()
action = teleop.get_action()
assert set(action.keys()) == set(REMOTE_AXES)
assert all(isinstance(v, float) for v in action.values())
def test_send_feedback(teleop_remote_only):
teleop = teleop_remote_only
teleop.connect()
# Should not raise
teleop.send_feedback({"wireless_remote": b"\x00" * 40})
def test_send_feedback_missing_key(teleop_remote_only):
teleop = teleop_remote_only
teleop.connect()
# Should not raise even with missing key
teleop.send_feedback({"other_key": 42})
def test_asymmetric_exo_ports_raises():
"""Configuring only one exo port should raise ValueError."""
cfg = UnitreeG1TeleoperatorConfig(
left_arm_config=ExoskeletonArmPortConfig(port="/dev/ttyACM0"),
# right_arm_config left empty
)
with pytest.raises(ValueError, match="set both left/right"):
UnitreeG1Teleoperator(cfg)
# ---------------------------------------------------------------------------
# Tests for ExoskeletonArm (needs serial mock)
# ---------------------------------------------------------------------------
class TestExoskeletonArm:
def test_parse_raw16_valid(self):
from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
line = b"100 200 300 400 500 600 700 800 900 1000 1100 1200 1300 1400 1500 1600\n"
result = parse_raw16(line)
assert result is not None
assert len(result) == 16
assert result[0] == 100
assert result[15] == 1600
def test_parse_raw16_too_short(self):
from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
line = b"100 200 300\n"
assert parse_raw16(line) is None
def test_parse_raw16_garbage(self):
from lerobot.teleoperators.unitree_g1.exo_serial import parse_raw16
assert parse_raw16(b"not numbers at all\n") is None
assert parse_raw16(b"\xff\xfe\xfd\n") is None
assert parse_raw16(b"") is None
def test_calibrate_requires_connection(self):
from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
arm = ExoskeletonArm(
port="/dev/null",
calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
side="left",
)
with pytest.raises(RuntimeError, match="not connected"):
arm.calibrate()
def test_is_connected_false_by_default(self):
from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
arm = ExoskeletonArm(
port="/dev/null",
calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
side="left",
)
assert not arm.is_connected
assert not arm.is_calibrated
def test_read_raw_when_disconnected(self):
from lerobot.teleoperators.unitree_g1.exo_serial import ExoskeletonArm
arm = ExoskeletonArm(
port="/dev/null",
calibration_fpath=MagicMock(is_file=MagicMock(return_value=False)),
side="left",
)
assert arm.read_raw() is None