add g1 teleoperation (#2791)

* add gravity compensation

* add g1 teleoperation

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
This commit is contained in:
Martino Russi
2026-01-28 15:17:38 +01:00
committed by GitHub
parent bf337e716d
commit 149628dfd5
16 changed files with 1581 additions and 5 deletions
+99 -1
View File
@@ -188,7 +188,105 @@ Press `Ctrl+C` to stop the policy.
## Running in Simulation Mode (MuJoCo) ## Running in Simulation Mode (MuJoCo)
You can now test policies before unleashing them on the physical robot using MuJoCo. To do so simply set `is_simulation=True` in config. You can test policies before deploying on the physical robot using MuJoCo simulation. Set `is_simulation=True` in config or pass `--robot.is_simulation=true` via CLI.
### Calibrate Exoskeleton Teleoperator
```bash
lerobot-calibrate \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo
```
### Teleoperate in Simulation
```bash
lerobot-teleoperate \
--robot.type=unitree_g1 \
--robot.is_simulation=true \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--fps=100
```
### Record Dataset in Simulation
```bash
python -m lerobot.scripts.lerobot_record \
--robot.type=unitree_g1 \
--robot.is_simulation=true \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--dataset.repo_id=your-username/dataset-name \
--dataset.single_task="Test" \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
```
Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim)
---
## Running on Real Robot
Once the robot server is running on the G1 (see Part 3), you can teleoperate and record on the real robot.
### Start the Camera Server
On the robot, start the ZMQ image server:
```bash
python src/lerobot/cameras/zmq/image_server.py
```
Keep this running in a separate terminal for camera streaming during recording.
### Teleoperate Real Robot
```bash
lerobot-teleoperate \
--robot.type=unitree_g1 \
--robot.is_simulation=false \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--fps=100
```
### Record Dataset on Real Robot
```bash
python -m lerobot.scripts.lerobot_record \
--robot.type=unitree_g1 \
--robot.is_simulation=false \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "172.18.129.215", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--dataset.repo_id=your-username/dataset-name \
--dataset.single_task="Test" \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
```
**Note**: Update `server_address` to match your robot's camera server IP.
Example real robot dataset: [nepyope/teleop_test_real](https://huggingface.co/datasets/nepyope/teleop_test_real)
---
## Additional Resources ## Additional Resources
+5 -1
View File
@@ -111,7 +111,11 @@ hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"] lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
unitree_g1 = [ unitree_g1 = [
"pyzmq>=26.2.1,<28.0.0", "pyzmq>=26.2.1,<28.0.0",
"onnxruntime>=1.16.0,<2.0.0" "onnxruntime>=1.16.0,<2.0.0",
"pin>=3.0.0,<4.0.0",
"meshcat>=0.3.0,<0.4.0",
"matplotlib>=3.9.0,<4.0.0",
"casadi>=3.6.0,<4.0.0",
] ]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"] reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
kinematics = ["lerobot[placo-dep]"] kinematics = ["lerobot[placo-dep]"]
@@ -65,3 +65,6 @@ class UnitreeG1Config(RobotConfig):
# Cameras (ZMQ-based remote cameras) # Cameras (ZMQ-based remote cameras)
cameras: dict[str, CameraConfig] = field(default_factory=dict) cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Compensates for gravity on the unitree's arms using the arm ik solver
gravity_compensation: bool = False
+1 -1
View File
@@ -18,7 +18,7 @@ from enum import IntEnum
# ruff: noqa: N801, N815 # ruff: noqa: N801, N815
NUM_MOTORS = 35 NUM_MOTORS = 29
class G1_29_JointArmIndex(IntEnum): class G1_29_JointArmIndex(IntEnum):
@@ -0,0 +1,313 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import numpy as np
logger = logging.getLogger(__name__)
parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent2_dir)
class WeightedMovingFilter:
def __init__(self, weights, data_size=14):
self._window_size = len(weights)
self._weights = np.array(weights)
self._data_size = data_size
self._filtered_data = np.zeros(self._data_size)
self._data_queue = []
def _apply_filter(self):
if len(self._data_queue) < self._window_size:
return self._data_queue[-1]
data_array = np.array(self._data_queue)
temp_filtered_data = np.zeros(self._data_size)
for i in range(self._data_size):
temp_filtered_data[i] = np.convolve(data_array[:, i], self._weights, mode="valid")[-1]
return temp_filtered_data
def add_data(self, new_data):
assert len(new_data) == self._data_size
if len(self._data_queue) > 0 and np.array_equal(
new_data, self._data_queue[-1]
): # skip duplicate data
return
if len(self._data_queue) >= self._window_size:
self._data_queue.pop(0)
self._data_queue.append(new_data)
self._filtered_data = self._apply_filter()
@property
def filtered_data(self):
return self._filtered_data
class G1_29_ArmIK: # noqa: N801
def __init__(self, unit_test=False):
import casadi
import pinocchio as pin
from huggingface_hub import snapshot_download
from pinocchio import casadi as cpin
self._pin = pin
np.set_printoptions(precision=5, suppress=True, linewidth=200)
self.unit_test = unit_test
self.repo_path = snapshot_download("lerobot/unitree-g1-mujoco")
urdf_path = os.path.join(self.repo_path, "assets", "g1_body29_hand14.urdf")
mesh_dir = os.path.join(self.repo_path, "assets")
self.robot = self._pin.RobotWrapper.BuildFromURDF(urdf_path, mesh_dir)
self.mixed_jointsToLockIDs = [
"left_hip_pitch_joint",
"left_hip_roll_joint",
"left_hip_yaw_joint",
"left_knee_joint",
"left_ankle_pitch_joint",
"left_ankle_roll_joint",
"right_hip_pitch_joint",
"right_hip_roll_joint",
"right_hip_yaw_joint",
"right_knee_joint",
"right_ankle_pitch_joint",
"right_ankle_roll_joint",
"waist_yaw_joint",
"waist_roll_joint",
"waist_pitch_joint",
"left_hand_thumb_0_joint",
"left_hand_thumb_1_joint",
"left_hand_thumb_2_joint",
"left_hand_middle_0_joint",
"left_hand_middle_1_joint",
"left_hand_index_0_joint",
"left_hand_index_1_joint",
"right_hand_thumb_0_joint",
"right_hand_thumb_1_joint",
"right_hand_thumb_2_joint",
"right_hand_index_0_joint",
"right_hand_index_1_joint",
"right_hand_middle_0_joint",
"right_hand_middle_1_joint",
]
self.reduced_robot = self.robot.buildReducedRobot(
list_of_joints_to_lock=self.mixed_jointsToLockIDs,
reference_configuration=np.array([0.0] * self.robot.model.nq),
)
# Arm joint names in G1 motor order (G1_29_JointArmIndex)
self._arm_joint_names_g1 = [
"left_shoulder_pitch_joint",
"left_shoulder_roll_joint",
"left_shoulder_yaw_joint",
"left_elbow_joint",
"left_wrist_roll_joint",
"left_wrist_pitch_joint",
"left_wrist_yaw_joint",
"right_shoulder_pitch_joint",
"right_shoulder_roll_joint",
"right_shoulder_yaw_joint",
"right_elbow_joint",
"right_wrist_roll_joint",
"right_wrist_pitch_joint",
"right_wrist_yaw_joint",
]
# Pinocchio uses its own joint order in q; build index mapping.
self._arm_joint_names_pin = sorted(
self._arm_joint_names_g1,
key=lambda name: self.reduced_robot.model.idx_qs[self.reduced_robot.model.getJointId(name)],
)
logger.info(f"Pinocchio arm joint order: {self._arm_joint_names_pin}")
self._arm_reorder_g1_to_pin = [
self._arm_joint_names_g1.index(name) for name in self._arm_joint_names_pin
]
# Inverse mapping to return tau in G1 motor order.
self._arm_reorder_pin_to_g1 = np.argsort(self._arm_reorder_g1_to_pin)
self.reduced_robot.model.addFrame(
self._pin.Frame(
"L_ee",
self.reduced_robot.model.getJointId("left_wrist_yaw_joint"),
self._pin.SE3(np.eye(3), np.array([0.05, 0, 0]).T),
self._pin.FrameType.OP_FRAME,
)
)
self.reduced_robot.model.addFrame(
self._pin.Frame(
"R_ee",
self.reduced_robot.model.getJointId("right_wrist_yaw_joint"),
self._pin.SE3(np.eye(3), np.array([0.05, 0, 0]).T),
self._pin.FrameType.OP_FRAME,
)
)
# Creating Casadi models and data for symbolic computing
self.cmodel = cpin.Model(self.reduced_robot.model)
self.cdata = self.cmodel.createData()
# Creating symbolic variables
self.cq = casadi.SX.sym("q", self.reduced_robot.model.nq, 1)
self.cTf_l = casadi.SX.sym("tf_l", 4, 4)
self.cTf_r = casadi.SX.sym("tf_r", 4, 4)
cpin.framesForwardKinematics(self.cmodel, self.cdata, self.cq)
# Get the hand joint ID and define the error function
self.L_hand_id = self.reduced_robot.model.getFrameId("L_ee")
self.R_hand_id = self.reduced_robot.model.getFrameId("R_ee")
self.translational_error = casadi.Function(
"translational_error",
[self.cq, self.cTf_l, self.cTf_r],
[
casadi.vertcat(
self.cdata.oMf[self.L_hand_id].translation - self.cTf_l[:3, 3],
self.cdata.oMf[self.R_hand_id].translation - self.cTf_r[:3, 3],
)
],
)
self.rotational_error = casadi.Function(
"rotational_error",
[self.cq, self.cTf_l, self.cTf_r],
[
casadi.vertcat(
cpin.log3(self.cdata.oMf[self.L_hand_id].rotation @ self.cTf_l[:3, :3].T),
cpin.log3(self.cdata.oMf[self.R_hand_id].rotation @ self.cTf_r[:3, :3].T),
)
],
)
# Defining the optimization problem
self.opti = casadi.Opti()
self.var_q = self.opti.variable(self.reduced_robot.model.nq)
self.var_q_last = self.opti.parameter(self.reduced_robot.model.nq) # for smooth
self.param_tf_l = self.opti.parameter(4, 4)
self.param_tf_r = self.opti.parameter(4, 4)
self.translational_cost = casadi.sumsqr(
self.translational_error(self.var_q, self.param_tf_l, self.param_tf_r)
)
self.rotation_cost = casadi.sumsqr(
self.rotational_error(self.var_q, self.param_tf_l, self.param_tf_r)
)
self.regularization_cost = casadi.sumsqr(self.var_q)
self.smooth_cost = casadi.sumsqr(self.var_q - self.var_q_last)
# Setting optimization constraints and goals
self.opti.subject_to(
self.opti.bounded(
self.reduced_robot.model.lowerPositionLimit,
self.var_q,
self.reduced_robot.model.upperPositionLimit,
)
)
self.opti.minimize(
50 * self.translational_cost
+ self.rotation_cost
+ 0.02 * self.regularization_cost
+ 0.1 * self.smooth_cost
)
opts = {
"ipopt": {"print_level": 0, "max_iter": 50, "tol": 1e-6},
"print_time": False, # print or not
"calc_lam_p": False, # https://github.com/casadi/casadi/wiki/FAQ:-Why-am-I-getting-%22NaN-detected%22in-my-optimization%3F
}
self.opti.solver("ipopt", opts)
self.init_data = np.zeros(self.reduced_robot.model.nq)
self.smooth_filter = WeightedMovingFilter(np.array([0.4, 0.3, 0.2, 0.1]), 14)
def solve_ik(self, left_wrist, right_wrist, current_lr_arm_motor_q=None, current_lr_arm_motor_dq=None):
if current_lr_arm_motor_q is not None:
self.init_data = current_lr_arm_motor_q
self.opti.set_initial(self.var_q, self.init_data)
self.opti.set_value(self.param_tf_l, left_wrist)
self.opti.set_value(self.param_tf_r, right_wrist)
self.opti.set_value(self.var_q_last, self.init_data) # for smooth
try:
self.opti.solve()
sol_q = self.opti.value(self.var_q)
self.smooth_filter.add_data(sol_q)
sol_q = self.smooth_filter.filtered_data
if current_lr_arm_motor_dq is not None:
v = current_lr_arm_motor_dq * 0.0
else:
v = (sol_q - self.init_data) * 0.0
self.init_data = sol_q
sol_tauff = self._pin.rnea(
self.reduced_robot.model,
self.reduced_robot.data,
sol_q,
v,
np.zeros(self.reduced_robot.model.nv),
)
return sol_q, sol_tauff
except Exception as e:
logger.error(f"ERROR in convergence, plotting debug info.{e}")
sol_q = self.opti.debug.value(self.var_q)
self.smooth_filter.add_data(sol_q)
sol_q = self.smooth_filter.filtered_data
if current_lr_arm_motor_dq is not None:
v = current_lr_arm_motor_dq * 0.0
else:
v = (sol_q - self.init_data) * 0.0
self.init_data = sol_q
logger.error(
f"sol_q:{sol_q} \nmotorstate: \n{current_lr_arm_motor_q} \nleft_pose: \n{left_wrist} \nright_pose: \n{right_wrist}"
)
return current_lr_arm_motor_q, np.zeros(self.reduced_robot.model.nv)
def solve_tau(self, current_lr_arm_motor_q=None, current_lr_arm_motor_dq=None):
try:
q_g1 = np.array(current_lr_arm_motor_q, dtype=float)
if q_g1.shape[0] != len(self._arm_joint_names_g1):
raise ValueError(f"Expected {len(self._arm_joint_names_g1)} arm joints, got {q_g1.shape[0]}")
q_pin = q_g1[self._arm_reorder_g1_to_pin]
sol_tauff = self._pin.rnea(
self.reduced_robot.model,
self.reduced_robot.data,
q_pin,
np.zeros(self.reduced_robot.model.nv),
np.zeros(self.reduced_robot.model.nv),
)
return sol_tauff[self._arm_reorder_pin_to_g1]
except Exception as e:
logger.error(f"ERROR in convergence, plotting debug info.{e}")
return np.zeros(self.reduced_robot.model.nv)
+18 -1
View File
@@ -27,7 +27,8 @@ import numpy as np
from lerobot.cameras.utils import make_cameras_from_configs from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.envs.factory import make_env from lerobot.envs.factory import make_env
from lerobot.processor import RobotAction, RobotObservation from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex, G1_29_JointIndex
from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
from ..robot import Robot from ..robot import Robot
from .config_unitree_g1 import UnitreeG1Config from .config_unitree_g1 import UnitreeG1Config
@@ -127,6 +128,8 @@ class UnitreeG1(Robot):
self.subscribe_thread = None self.subscribe_thread = None
self.remote_controller = self.RemoteController() self.remote_controller = self.RemoteController()
self.arm_ik = G1_29_ArmIK()
def _subscribe_motor_state(self): # polls robot state @ 250Hz def _subscribe_motor_state(self): # polls robot state @ 250Hz
while not self._shutdown_event.is_set(): while not self._shutdown_event.is_set():
start_time = time.time() start_time = time.time()
@@ -361,6 +364,20 @@ class UnitreeG1(Robot):
self.msg.motor_cmd[motor.value].kd = self.kd[motor.value] self.msg.motor_cmd[motor.value].kd = self.kd[motor.value]
self.msg.motor_cmd[motor.value].tau = 0 self.msg.motor_cmd[motor.value].tau = 0
if self.config.gravity_compensation:
# Build action_np from motor commands (arm joints are indices 15-28, local indices 0-13)
action_np = np.zeros(14)
arm_start_idx = G1_29_JointArmIndex.kLeftShoulderPitch.value # 15
for joint in G1_29_JointArmIndex:
local_idx = joint.value - arm_start_idx
action_np[local_idx] = self.msg.motor_cmd[joint.value].q
tau = self.arm_ik.solve_tau(action_np)
# Apply tau back to motor commands
for joint in G1_29_JointArmIndex:
local_idx = joint.value - arm_start_idx
self.msg.motor_cmd[joint.value].tau = tau[local_idx]
self.msg.crc = self.crc.Crc(self.msg) self.msg.crc = self.crc.Crc(self.msg)
self.lowcmd_publisher.Write(self.msg) self.lowcmd_publisher.Write(self.msg)
return action return action
+1
View File
@@ -55,6 +55,7 @@ from lerobot.teleoperators import ( # noqa: F401
omx_leader, omx_leader,
openarm_leader, openarm_leader,
so_leader, so_leader,
unitree_g1,
) )
from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.utils import init_logging from lerobot.utils.utils import init_logging
+2 -1
View File
@@ -107,7 +107,7 @@ from lerobot.robots import ( # noqa: F401
openarm_follower, openarm_follower,
reachy2, reachy2,
so_follower, so_follower,
unitree_g1, unitree_g1 as unitree_g1_robot,
) )
from lerobot.teleoperators import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401
Teleoperator, Teleoperator,
@@ -120,6 +120,7 @@ from lerobot.teleoperators import ( # noqa: F401
openarm_leader, openarm_leader,
reachy2_teleoperator, reachy2_teleoperator,
so_leader, so_leader,
unitree_g1,
) )
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
from lerobot.utils.constants import ACTION, OBS_STR from lerobot.utils.constants import ACTION, OBS_STR
@@ -79,6 +79,7 @@ from lerobot.robots import ( # noqa: F401
openarm_follower, openarm_follower,
reachy2, reachy2,
so_follower, so_follower,
unitree_g1 as unitree_g1_robot,
) )
from lerobot.teleoperators import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401
Teleoperator, Teleoperator,
@@ -93,6 +94,7 @@ from lerobot.teleoperators import ( # noqa: F401
openarm_leader, openarm_leader,
reachy2_teleoperator, reachy2_teleoperator,
so_leader, so_leader,
unitree_g1,
) )
from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.robot_utils import precise_sleep from lerobot.utils.robot_utils import precise_sleep
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_unitree_g1 import ExoskeletonArmPortConfig, UnitreeG1TeleoperatorConfig
from .exo_calib import ExoskeletonCalibration, ExoskeletonJointCalibration
from .exo_ik import ExoskeletonIKHelper
from .exo_serial import ExoskeletonArm
from .unitree_g1 import UnitreeG1Teleoperator
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from ..config import TeleoperatorConfig
@dataclass
class ExoskeletonArmPortConfig:
"""Serial port configuration for individual exoskeleton arm."""
port: str = ""
baud_rate: int = 115200
@TeleoperatorConfig.register_subclass("unitree_g1")
@dataclass
class UnitreeG1TeleoperatorConfig(TeleoperatorConfig):
left_arm_config: ExoskeletonArmPortConfig = field(default_factory=ExoskeletonArmPortConfig)
right_arm_config: ExoskeletonArmPortConfig = field(default_factory=ExoskeletonArmPortConfig)
# Frozen joints (comma-separated joint names that won't be moved by IK)
frozen_joints: str = ""
@@ -0,0 +1,446 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module handles calibration of hall effect sensors used in the exoskeleton.
Each joint has a pair of ADC channels outputting sin and cos values that trace an ellipse
as the joint rotates due to imprecision in magnet/sensor placement. We fit this ellipse to a unit circle,
and calculate arctan2 of the unit circle to get the joint angle.
We then store the ellipse parameters and the zero offset for each joint to be used at runtime.
"""
import json
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
import serial
logger = logging.getLogger(__name__)
# exoskeleton joint names -> ADC channel pairs. TODO: add wrist pitch and wrist yaw
JOINTS = {
"shoulder_pitch": (0, 1),
"shoulder_yaw": (2, 3),
"shoulder_roll": (4, 5),
"elbow_flex": (6, 7),
"wrist_roll": (14, 15),
}
@dataclass
class ExoskeletonJointCalibration:
name: str # joint name
center_fit: list[float] # center of the ellipse
T: list[list[float]] # 2x2 transformation matrix
zero_offset: float = 0.0 # angle at neutral pose
@dataclass
class ExoskeletonCalibration:
"""Full calibration data for an exoskeleton arm."""
version: int = 2
side: str = ""
adc_max: int = 2**12 - 1
joints: list[ExoskeletonJointCalibration] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"version": self.version,
"side": self.side,
"adc_max": self.adc_max,
"joints": [
{
"name": j.name,
"center_fit": j.center_fit,
"T": j.T,
"zero_offset": j.zero_offset,
}
for j in self.joints
],
}
@classmethod
def from_dict(cls, data: dict) -> "ExoskeletonCalibration":
joints = [
ExoskeletonJointCalibration(
name=j["name"],
center_fit=j["center_fit"],
T=j["T"],
zero_offset=j.get("zero_offset", 0.0),
)
for j in data.get("joints", [])
]
return cls(
version=data.get("version", 2),
side=data.get("side", ""),
adc_max=data.get("adc_max", 2**12 - 1),
joints=joints,
)
@dataclass(frozen=True)
class CalibParams:
fit_every: float = 0.15
min_fit_points: int = 60
fit_window: int = 900
max_fit_points: int = 300
trim_low: float = 0.05
trim_high: float = 0.95
median_window: int = 5
history: int = 3500
draw_hz: float = 120.0
sample_count: int = 50
def normalize_angle(angle: float) -> float:
while angle > np.pi:
angle -= 2 * np.pi
while angle < -np.pi:
angle += 2 * np.pi
return angle
def joint_z_and_angle(raw16: list[int], j: ExoskeletonJointCalibration) -> tuple[np.ndarray, float]:
"""
Applies calibration to each joint: raw → centered → ellipse-to-circle → angle.
"""
pair = JOINTS[j.name]
s, c = raw16[pair[0]], raw16[pair[1]] # get sin and cos
p = np.array([float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2]) # center the raw values
z = np.asarray(j.T) @ (
p - np.asarray(j.center_fit)
) # center the ellipse and invert the transformation matrix to get unit circle coords
ang = float(np.arctan2(z[1], z[0])) - j.zero_offset # calculate the anvgle and apply the zero offset
return z, normalize_angle(-ang) # ensure range is [-pi, pi]
def exo_raw_to_angles(raw16: list[int], calib: ExoskeletonCalibration) -> dict[str, float]:
"""Convert raw sensor readings to joint angles using calibration."""
return {j.name: joint_z_and_angle(raw16, j)[1] for j in calib.joints}
def run_exo_calibration(
ser: serial.Serial,
side: str,
save_path: Path,
params: CalibParams | None = None,
) -> ExoskeletonCalibration:
"""
Run interactive calibration for an exoskeleton arm.
"""
try:
import cv2
import matplotlib.pyplot as plt
except ImportError as e:
raise ImportError(
"Calibration requires matplotlib and opencv-python. "
"Install with: pip install matplotlib opencv-python"
) from e
from .exo_serial import read_raw_from_serial
params = params or CalibParams()
joint_list = list(JOINTS.items()) # Convert dict to list for indexing
logger.info(f"Starting calibration for {side} exoskeleton arm")
def running_median(win: deque) -> float:
return float(np.median(np.fromiter(win, dtype=float)))
def read_joint_point(raw16: list[int], pair: tuple[int, int]):
s, c = raw16[pair[0]], raw16[pair[1]]
return float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2, float(s), float(c)
def select_fit_subset(xs, ys):
"""Select and filter points for ellipse fitting. Trims outliers by radius and downsamples."""
n = min(params.fit_window, len(xs))
if n <= 0:
return None, None
x = np.asarray(list(xs)[-n:], dtype=float) # most recent n samples
y = np.asarray(list(ys)[-n:], dtype=float)
r = np.sqrt(x * x + y * y) # radius from origin
if len(r) >= 20:
lo, hi = np.quantile(r, params.trim_low), np.quantile(r, params.trim_high) # outlier bounds
keep = (r >= lo) & (r <= hi)
x, y = x[keep], y[keep] # remove outliers
if len(x) > params.max_fit_points:
idx = np.linspace(0, len(x) - 1, params.max_fit_points).astype(int) # downsample evenly
x, y = x[idx], y[idx]
return x, y
def fit_ellipse_opencv(x, y):
"""Fit ellipse to (x,y) points using OpenCV. Returns center, axes, rotation matrix, and outline."""
x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
if len(x) < 5:
return None
pts = np.stack([x, y], axis=1).astype(np.float32).reshape(-1, 1, 2)
try:
(xc, yc), (w, h), angle_deg = cv2.fitEllipse(pts) # returns center, axes, rotation in degrees
except cv2.error:
return None
a, b = float(w) * 0.5, float(h) * 0.5 # get ellipse major and minor semi-axes
phi = np.deg2rad(float(angle_deg)) # to rad
if b > a: # ensure major axis is a
a, b = b, a
phi += np.pi / 2.0
if not np.isfinite(a) or not np.isfinite(b) or a <= 1e-6 or b <= 1e-6:
return None
cp, sp = float(np.cos(phi)), float(np.sin(phi)) #
rot = np.array([[cp, -sp], [sp, cp]], dtype=float) # 2x2 rotation matrix
center = np.array([float(xc), float(yc)], dtype=float) # offset vector
tt = np.linspace(0, 2 * np.pi, 360)
outline = (rot @ np.stack([a * np.cos(tt), b * np.sin(tt)])).T + center # for viz
return {"center": center, "a": a, "b": b, "R": rot, "ex": outline[:, 0], "ey": outline[:, 1]}
# Setup matplotlib
plt.ion()
fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(12, 6))
ax0.set_xlabel("cos - center")
ax0.set_ylabel("sin - center")
ax0.grid(True, alpha=0.25)
ax0.set_aspect("equal", adjustable="box")
ax1.set_title("Unit circle + angle")
ax1.set_xlabel("x")
ax1.set_ylabel("y")
ax1.grid(True, alpha=0.25)
ax1.set_aspect("equal", adjustable="box")
tt = np.linspace(0, 2 * np.pi, 360)
ax1.plot(np.cos(tt), np.sin(tt), "k-", linewidth=1)
ax0.set_xlim(-2200, 2200)
ax0.set_ylim(-2200, 2200)
ax1.set_xlim(-1.4, 1.4)
ax1.set_ylim(-1.4, 1.4)
sc0 = ax0.scatter([], [], s=6, animated=True)
(ell_line,) = ax0.plot([], [], "r-", linewidth=2, animated=True)
sc1 = ax1.scatter([], [], s=6, animated=True)
(radius_line,) = ax1.plot([], [], "g-", linewidth=2, animated=True)
angle_text = ax1.text(
0.02, 0.98, "", transform=ax1.transAxes, va="top", ha="left", fontsize=12, animated=True
)
fig.canvas.draw()
bg0 = fig.canvas.copy_from_bbox(ax0.bbox)
bg1 = fig.canvas.copy_from_bbox(ax1.bbox)
# State
joints_out = []
joint_idx = 0
phase = "ellipse"
advance_requested = False
zero_samples = []
def on_key(event):
nonlocal advance_requested
if event.key in ("n", "N", "enter", " "):
advance_requested = True
fig.canvas.mpl_connect("key_press_event", on_key)
def reset_state():
return {
"xs": deque(maxlen=params.history),
"ys": deque(maxlen=params.history),
"xu": deque(maxlen=params.history),
"yu": deque(maxlen=params.history),
"win_s": deque(maxlen=params.median_window),
"win_c": deque(maxlen=params.median_window),
"ellipse_cache": None,
"T": None,
"center_fit": None,
"have_transform": False,
"latest_z": None,
"last_fit": 0.0,
}
state = reset_state()
last_draw = 0.0
name, pair = joint_list[joint_idx]
fig.canvas.manager.set_window_title(f"[{joint_idx + 1}/{len(joint_list)}] {name} - ELLIPSE")
ax0.set_title(f"{name} raw (filtered)")
logger.info(f"[{joint_idx + 1}/{len(joint_list)}] Calibrating {name}")
logger.info("Step 1: Move joint around to map ellipse, then press 'n'")
try:
while plt.fignum_exists(fig.number):
name, pair = joint_list[joint_idx]
# Handles calibration GUI state: ellipse → zero_pose → next joint -> ellipse -> ...
if phase == "ellipse" and advance_requested and state["have_transform"]:
joints_out.append(
{
"name": name,
"center_fit": state["center_fit"].tolist(),
"T": state["T"].tolist(),
}
)
logger.info(f" -> Ellipse saved for {name}")
phase, zero_samples, advance_requested = "zero_pose", [], False
fig.canvas.manager.set_window_title(f"[{joint_idx + 1}/{len(joint_list)}] {name} - ZERO POSE")
ax0.set_title(f"{name} - hold zero pose")
fig.canvas.draw()
bg0, bg1 = fig.canvas.copy_from_bbox(ax0.bbox), fig.canvas.copy_from_bbox(ax1.bbox)
logger.info(f"Step 2: Hold {name} in zero position, then press 'n'")
elif phase == "ellipse" and advance_requested and not state["have_transform"]:
logger.info(" (Need valid fit first - keep moving the joint)")
advance_requested = False
elif phase == "zero_pose" and advance_requested:
if len(zero_samples) >= params.sample_count:
zero_offset = float(np.mean(zero_samples[-params.sample_count :]))
joints_out[-1]["zero_offset"] = zero_offset
logger.info(f" -> {name} zero: {zero_offset:+.3f} rad ({np.degrees(zero_offset):+.1f}°)")
joint_idx += 1
advance_requested = False
if joint_idx >= len(joint_list):
# All joints done
calib = ExoskeletonCalibration(
version=2,
side=side,
adc_max=2**12 - 1,
joints=[
ExoskeletonJointCalibration(
name=j["name"],
center_fit=j["center_fit"],
T=j["T"],
zero_offset=j.get("zero_offset", 0.0),
)
for j in joints_out
],
)
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "w") as f:
json.dump(calib.to_dict(), f, indent=2)
logger.info(f"Saved calibration to {save_path}")
logger.info("Calibration complete!")
plt.close(fig)
return calib
# Next joint
phase, state = "ellipse", reset_state()
name, pair = joint_list[joint_idx]
fig.canvas.manager.set_window_title(
f"[{joint_idx + 1}/{len(joint_list)}] {name} - ELLIPSE"
)
ax0.set_title(f"{name} raw (filtered)")
fig.canvas.draw()
bg0, bg1 = fig.canvas.copy_from_bbox(ax0.bbox), fig.canvas.copy_from_bbox(ax1.bbox)
logger.info(f"[{joint_idx + 1}/{len(joint_list)}] Calibrating {name}")
logger.info("Step 1: Move joint around to map ellipse, then press 'n'")
else:
logger.info(
f" (Collecting samples: {len(zero_samples)}/{params.sample_count} - hold still)"
)
advance_requested = False
# Read sensor
raw16 = read_raw_from_serial(ser)
if raw16 is not None:
x_raw, y_raw, s_raw, c_raw = read_joint_point(raw16, pair)
if phase == "ellipse":
if state["have_transform"]:
z = state["T"] @ (np.array([x_raw, y_raw]) - state["center_fit"])
state["xu"].append(float(z[0]))
state["yu"].append(float(z[1]))
state["latest_z"] = (float(z[0]), float(z[1]))
state["win_s"].append(s_raw)
state["win_c"].append(c_raw)
if len(state["win_s"]) >= max(3, params.median_window):
state["ys"].append(running_median(state["win_s"]) - (2**12 - 1) / 2)
state["xs"].append(running_median(state["win_c"]) - (2**12 - 1) / 2)
else:
jdata = joints_out[-1]
z = np.array(jdata["T"]) @ (np.array([x_raw, y_raw]) - np.array(jdata["center_fit"]))
zero_samples.append(float(np.arctan2(z[1], z[0])))
state["latest_z"] = (float(z[0]), float(z[1]))
# Ellipse fitting
t = time.time()
if (
phase == "ellipse"
and (t - state["last_fit"]) >= params.fit_every
and len(state["xs"]) >= params.min_fit_points
):
xfit, yfit = select_fit_subset(state["xs"], state["ys"])
if xfit is not None and len(xfit) >= params.min_fit_points:
fit = fit_ellipse_opencv(xfit, yfit)
if fit is not None:
state["center_fit"] = fit["center"]
state["T"] = np.diag([1.0 / fit["a"], 1.0 / fit["b"]]) @ fit["R"].T
state["ellipse_cache"] = (fit["ex"], fit["ey"])
state["have_transform"] = True
state["last_fit"] = t
# Drawing
if (t - last_draw) >= 1.0 / params.draw_hz:
fig.canvas.restore_region(bg0)
fig.canvas.restore_region(bg1)
if phase == "ellipse":
sc0.set_offsets(np.c_[state["xs"], state["ys"]] if state["xs"] else np.empty((0, 2)))
ax0.draw_artist(sc0)
ell_line.set_data(*state["ellipse_cache"] if state["ellipse_cache"] else ([], []))
ax0.draw_artist(ell_line)
sc1.set_offsets(np.c_[state["xu"], state["yu"]] if state["xu"] else np.empty((0, 2)))
ax1.draw_artist(sc1)
if state["latest_z"]:
zx, zy = state["latest_z"]
radius_line.set_data([0.0, zx], [0.0, zy])
ang = float(np.arctan2(zy, zx))
angle_text.set_text(
f"angle: {ang:+.3f} rad ({np.degrees(ang):+.1f}°)\nmove {name}, press 'n' to advance"
)
else:
radius_line.set_data([], [])
angle_text.set_text("(waiting for fit)")
else:
sc0.set_offsets(np.empty((0, 2)))
ax0.draw_artist(sc0)
ell_line.set_data([], [])
ax0.draw_artist(ell_line)
if state["latest_z"]:
zx, zy = state["latest_z"]
sc1.set_offsets([[zx, zy]])
radius_line.set_data([0.0, zx], [0.0, zy])
ang = float(np.arctan2(zy, zx))
angle_text.set_text(
f"Zero pose for {name}\nangle: {ang:+.3f} rad\nsamples: {len(zero_samples)}/{params.sample_count}\nhold still, press 'n'"
)
else:
sc1.set_offsets(np.empty((0, 2)))
radius_line.set_data([], [])
angle_text.set_text("(waiting for data)")
ax1.draw_artist(sc1)
ax1.draw_artist(radius_line)
ax1.draw_artist(angle_text)
fig.canvas.blit(ax0.bbox)
fig.canvas.blit(ax1.bbox)
fig.canvas.flush_events()
last_draw = t
plt.pause(0.001)
finally:
plt.close(fig)
@@ -0,0 +1,353 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IK helper for exoskeleton-to-G1 teleoperation. We map Exoskeleton joint angles to end-effector pose in world frame,
visualizing the result in meshcat after calibration.
"""
import logging
import os
from dataclasses import dataclass
import numpy as np
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex
from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
from .exo_calib import JOINTS
logger = logging.getLogger(__name__)
def _frame_id(model, name: str) -> int | None:
try:
fid = model.getFrameId(name)
return fid if 0 <= fid < model.nframes else None
except Exception:
return None
@dataclass
class ArmCfg:
side: str # "left" | "right"
urdf: str # exo_left.urdf / exo_right.urdf
root: str # "exo_left" / "exo_right"
g1_ee: str # "l_ee" / "r_ee"
offset: np.ndarray # world offset for viz + target
marker_prefix: str # "left" / "right"
class Markers:
"""Creates meshcat visualization primitives, showing end-effector frames of exoskeleton and G1"""
def __init__(self, viewer):
self.v = viewer
def sphere(self, path: str, r: float, rgba: tuple[float, float, float, float]):
import meshcat.geometry as mg
c = (int(rgba[0] * 255) << 16) | (int(rgba[1] * 255) << 8) | int(rgba[2] * 255)
self.v[path].set_object(
mg.Sphere(r),
mg.MeshPhongMaterial(color=c, opacity=rgba[3], transparent=rgba[3] < 1.0),
)
def axes(self, path: str, axis_len: float = 0.1, axis_w: int = 6):
import meshcat.geometry as mg
pts = np.array(
[[0, 0, 0], [axis_len, 0, 0], [0, 0, 0], [0, axis_len, 0], [0, 0, 0], [0, 0, axis_len]],
dtype=np.float32,
).T
cols = np.array(
[[1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 1, 0], [0, 0, 1], [0, 0, 1]],
dtype=np.float32,
).T
self.v[path].set_object(
mg.LineSegments(
mg.PointsGeometry(position=pts, color=cols),
mg.LineBasicMaterial(linewidth=axis_w, vertexColors=True),
)
)
def tf(self, path: str, mat: np.ndarray):
self.v[path].set_transform(mat)
class ExoskeletonIKHelper:
"""
- Loads G1 robot and exoskeleton URDF models via Pinocchio
- Computes forward kinematics on exoskeleton to get end-effector poses
- Solves inverse kinematics on G1 to match those poses
- Provides meshcat visualization showing both robots and targets
Args:
frozen_joints: List of G1 joint names to exclude from IK (kept at neutral).
"""
def __init__(self, frozen_joints: list[str] | None = None):
try:
import pinocchio as pin
except ImportError as e:
raise ImportError("ik mode needs pinocchio: pip install pin") from e
self.pin = pin
self.frozen_joints = frozen_joints or []
self.g1_ik = G1_29_ArmIK()
self.robot_g1 = self.g1_ik.reduced_robot
self.robot_g1.data = self.robot_g1.model.createData()
self.q_g1 = pin.neutral(self.robot_g1.model)
assets_dir = os.path.join(self.g1_ik.repo_path, "assets")
self.frozen_idx = self._frozen_joint_indices()
self.arms = [
ArmCfg(
side="left",
urdf=os.path.join(assets_dir, "exo_left.urdf"),
root="exo_left",
g1_ee="L_ee",
offset=np.array([0.6, 0.3, 0.0]),
marker_prefix="left",
),
ArmCfg(
side="right",
urdf=os.path.join(assets_dir, "exo_right.urdf"),
root="exo_right",
g1_ee="R_ee",
offset=np.array([0.6, -0.3, 0.0]),
marker_prefix="right",
),
]
self.exo = {} # side -> pin.RobotWrapper
self.q_exo = {} # side -> q
self.ee_id_exo = {} # side -> frame id
self.qmap = {} # side -> {joint_name: q_idx}
self.ee_id_g1 = {} # side -> frame id
self._load_exo_models(assets_dir)
for a in self.arms:
self.ee_id_g1[a.side] = _frame_id(self.robot_g1.model, a.g1_ee)
self.viewer = None
self.markers: Markers | None = None
self.viz_g1 = None
self.viz_exo = {} # side -> viz
def _frozen_joint_indices(self) -> dict[str, int]:
out = {}
m = self.robot_g1.model
for name in self.frozen_joints:
if name in m.names:
jid = m.getJointId(name)
out[name] = m.idx_qs[jid]
logger.info(f"freezing joint: {name} (q_idx={out[name]})")
return out
def _find_exo_ee(self, model, ee_name: str = "ee") -> int:
ee = _frame_id(model, ee_name)
if ee is not None:
return ee
for fid in reversed(range(model.nframes)):
if model.frames[fid].type == self.pin.FrameType.BODY:
return fid
return 0
def _build_joint_map(self, robot) -> dict[str, int]:
m = robot.model
return {n: m.idx_qs[m.getJointId(n)] for n in JOINTS if n in m.names}
def _load_exo_models(self, assets_dir: str):
pin = self.pin
for a in self.arms:
if not os.path.exists(a.urdf):
logger.warning(f"{a.side} exo urdf not found: {a.urdf}")
continue
r = pin.RobotWrapper.BuildFromURDF(a.urdf, assets_dir)
self.exo[a.side] = r
self.q_exo[a.side] = pin.neutral(r.model)
self.ee_id_exo[a.side] = self._find_exo_ee(r.model)
self.qmap[a.side] = self._build_joint_map(r)
logger.info(f"loaded {a.side} exo urdf: {a.urdf}")
def init_visualization(self):
"""
Creates a browser-based visualization of exoskeleton and G1 robot,
highlighting end-effector frames and target positions.
"""
try:
from pinocchio.visualize import MeshcatVisualizer
except ImportError as e:
logger.warning(f"meshcat viz unavailable: {e}")
return
# g1
self.viz_g1 = MeshcatVisualizer(
self.robot_g1.model, self.robot_g1.collision_model, self.robot_g1.visual_model
)
self.viz_g1.initViewer(open=True)
self.viz_g1.loadViewerModel("g1")
self.viz_g1.display(self.q_g1)
self.viewer = self.viz_g1.viewer
self.markers = Markers(self.viewer)
# exos
for a in self.arms:
if a.side not in self.exo:
continue
r = self.exo[a.side]
v = MeshcatVisualizer(r.model, r.collision_model, r.visual_model)
v.initViewer(open=False)
v.viewer = self.viewer
v.loadViewerModel(a.root)
offset_tf = np.eye(4)
offset_tf[:3, 3] = a.offset
self.viewer[a.root].set_transform(offset_tf)
v.display(self.q_exo[a.side])
self.viz_exo[a.side] = v
# markers
for a in self.arms:
p = a.marker_prefix
self.markers.sphere(f"markers/{p}_exo_ee", 0.012, (0.2, 1.0, 0.2, 0.9))
self.markers.sphere(f"markers/{p}_g1_ee", 0.015, (1.0, 0.2, 0.2, 0.9))
self.markers.sphere(f"markers/{p}_ik_target", 0.015, (0.1, 0.3, 1.0, 0.9))
self.markers.axes(f"markers/{p}_exo_axes", 0.06)
self.markers.axes(f"markers/{p}_g1_axes", 0.08)
logger.info(f"meshcat viz initialized: {self.viewer.url()}")
print(f"\nmeshcat url: {self.viewer.url()}\n")
def _fk_target_world(self, side: str, angles: dict[str, float]) -> np.ndarray | None:
"""returns wrist frame target to be used for G1 IK in 4x4 homogeneous transform. Takes offset into account."""
if side not in self.exo or not angles:
return None
pin = self.pin
q = self.q_exo[side]
qmap = self.qmap[side]
for name, ang in angles.items():
idx = qmap.get(name)
if idx is not None:
q[idx] = float(ang)
r = self.exo[side]
pin.forwardKinematics(r.model, r.data, q)
pin.updateFramePlacements(r.model, r.data)
ee = r.data.oMf[self.ee_id_exo[side]]
target = np.eye(4)
target[:3, :3] = ee.rotation
# offset gets applied in world space
cfg = next(a for a in self.arms if a.side == side)
target[:3, 3] = cfg.offset + ee.translation
return target
def update_visualization(self):
if self.viewer is None or self.markers is None:
return
pin = self.pin
# g1
if self.viz_g1 is not None:
self.viz_g1.display(self.q_g1)
pin.forwardKinematics(self.robot_g1.model, self.robot_g1.data, self.q_g1)
pin.updateFramePlacements(self.robot_g1.model, self.robot_g1.data)
for a in self.arms:
fid = self.ee_id_g1.get(a.side)
if fid is None:
continue
ee_tf = self.robot_g1.data.oMf[fid].homogeneous
p = a.marker_prefix
self.markers.tf(f"markers/{p}_g1_ee", ee_tf)
self.markers.tf(f"markers/{p}_g1_axes", ee_tf)
# exos
for a in self.arms:
side = a.side
v = self.viz_exo.get(side)
if v is None:
continue
v.display(self.q_exo[side])
r = self.exo[side]
pin.forwardKinematics(r.model, r.data, self.q_exo[side])
pin.updateFramePlacements(r.model, r.data)
ee = r.data.oMf[self.ee_id_exo[side]]
world_tf = (pin.SE3(np.eye(3), a.offset) * ee).homogeneous
p = a.marker_prefix
self.markers.tf(f"markers/{p}_exo_ee", world_tf)
self.markers.tf(f"markers/{p}_exo_axes", world_tf)
target_tf = np.eye(4)
target_tf[:3, :3] = ee.rotation
target_tf[:3, 3] = a.offset + ee.translation
self.markers.tf(f"markers/{p}_ik_target", target_tf)
def compute_g1_joints_from_exo(
self,
left_angles: dict[str, float],
right_angles: dict[str, float],
) -> dict[str, float]:
"""
Performs FK on exoskeleton to get end-effector poses in world frame,
after which it solves IK on G1 to return joint angles matching those poses in G1 motor order.
"""
pin = self.pin
targets = {
"left": self._fk_target_world("left", left_angles),
"right": self._fk_target_world("right", right_angles),
}
# fallback to current g1 ee pose if missing target
pin.forwardKinematics(self.robot_g1.model, self.robot_g1.data, self.q_g1)
pin.updateFramePlacements(self.robot_g1.model, self.robot_g1.data)
for a in self.arms:
if targets[a.side] is not None:
continue
fid = self.ee_id_g1.get(a.side)
if fid is not None:
targets[a.side] = self.robot_g1.data.oMf[fid].homogeneous
if targets["left"] is None or targets["right"] is None:
logger.warning("missing ik targets, returning current pose")
return {}
frozen_vals = {n: self.q_g1[i] for n, i in self.frozen_idx.items()}
self.q_g1, _ = self.g1_ik.solve_ik(
targets["left"], targets["right"], current_lr_arm_motor_q=self.q_g1
)
for n, i in self.frozen_idx.items():
self.q_g1[i] = frozen_vals[n]
return {
f"{j.name}.q": float(self.q_g1[i])
for i, j in enumerate(G1_29_JointArmIndex)
if i < len(self.q_g1)
}
@@ -0,0 +1,119 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from dataclasses import dataclass
from pathlib import Path
import serial
from .exo_calib import ExoskeletonCalibration, exo_raw_to_angles, run_exo_calibration
logger = logging.getLogger(__name__)
def parse_raw16(line: bytes) -> list[int] | None:
try:
parts = line.decode("utf-8", errors="ignore").split()
if len(parts) < 16:
return None
return [int(x) for x in parts[:16]]
except Exception:
return None
def read_raw_from_serial(ser) -> list[int] | None:
"""Read latest sample from serial; if buffer is backed up, keep only the newest."""
last = None
while ser.in_waiting > 0:
b = ser.readline()
if not b:
break
raw16 = parse_raw16(b)
if raw16 is not None:
last = raw16
if last is None:
b = ser.readline()
if b:
last = parse_raw16(b)
return last
@dataclass
class ExoskeletonArm:
port: str
calibration_fpath: Path
side: str
baud_rate: int = 115200
_ser: serial.Serial | None = None
calibration: ExoskeletonCalibration | None = None
def __post_init__(self):
if self.calibration_fpath.is_file():
self._load_calibration()
@property
def is_connected(self) -> bool:
return self._ser is not None and getattr(self._ser, "is_open", False)
@property
def is_calibrated(self) -> bool:
return self.calibration is not None
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
return
try:
self._ser = serial.Serial(self.port, self.baud_rate, timeout=0.02)
self._ser.reset_input_buffer()
logger.info(f"connected: {self.port}")
except serial.SerialException as e:
raise ConnectionError(f"failed to connect to {self.port}: {e}") from e
if calibrate and not self.is_calibrated:
self.calibrate()
def disconnect(self) -> None:
if self._ser:
try:
self._ser.close()
finally:
self._ser = None
def _load_calibration(self) -> None:
try:
data = json.loads(self.calibration_fpath.read_text())
self.calibration = ExoskeletonCalibration.from_dict(data)
logger.info(f"loaded calibration: {self.calibration_fpath}")
except Exception as e:
logger.warning(f"failed to load calibration: {e}")
def read_raw(self) -> list[int] | None:
if not self._ser:
return None
return read_raw_from_serial(self._ser)
def get_angles(self) -> dict[str, float]:
if not self.calibration:
raise RuntimeError("exoskeleton not calibrated")
raw = self.read_raw()
return {} if raw is None else exo_raw_to_angles(raw, self.calibration)
def calibrate(self) -> None:
ser = self._ser
self.calibration = run_exo_calibration(ser, self.side, self.calibration_fpath)
@@ -0,0 +1,157 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
from ..teleoperator import Teleoperator
from .config_unitree_g1 import UnitreeG1TeleoperatorConfig
from .exo_ik import ExoskeletonIKHelper
from .exo_serial import ExoskeletonArm
logger = logging.getLogger(__name__)
class UnitreeG1Teleoperator(Teleoperator):
"""
Bimanual exoskeleton arms teleoperator for Unitree G1 arms.
Uses inverse kinematics: exoskeleton FK computes end-effector pose,
G1 IK solves for joint angles.
"""
config_class = UnitreeG1TeleoperatorConfig
name = "unitree_g1"
def __init__(self, config: UnitreeG1TeleoperatorConfig):
super().__init__(config)
self.config = config
# Setup calibration directory
self.calibration_dir = (
config.calibration_dir
if config.calibration_dir
else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name
)
self.calibration_dir.mkdir(parents=True, exist_ok=True)
left_id = f"{config.id}_left" if config.id else "left"
right_id = f"{config.id}_right" if config.id else "right"
# Create exoskeleton arm instances
self.left_arm = ExoskeletonArm(
port=config.left_arm_config.port,
baud_rate=config.left_arm_config.baud_rate,
calibration_fpath=self.calibration_dir / f"{left_id}.json",
side="left",
)
self.right_arm = ExoskeletonArm(
port=config.right_arm_config.port,
baud_rate=config.right_arm_config.baud_rate,
calibration_fpath=self.calibration_dir / f"{right_id}.json",
side="right",
)
self.ik_helper: ExoskeletonIKHelper | None = None
@cached_property
def action_features(self) -> dict[str, type]:
return {f"{name}.q": float for name in self._g1_joint_names}
@cached_property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
frozen_joints = [j.strip() for j in self.config.frozen_joints.split(",") if j.strip()]
self.ik_helper = ExoskeletonIKHelper(frozen_joints=frozen_joints)
logger.info("IK helper initialized")
def calibrate(self) -> None:
if not self.left_arm.is_calibrated:
logger.info("Starting calibration for left arm...")
self.left_arm.calibrate()
else:
logger.info("Left arm already calibrated. Skipping.")
if not self.right_arm.is_calibrated:
logger.info("Starting calibration for right arm...")
self.right_arm.calibrate()
else:
logger.info("Right arm already calibrated. Skipping.")
logger.info("Starting visualization to verify calibration...")
self.run_visualization_loop()
def configure(self) -> None:
pass
def get_action(self) -> dict[str, float]:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Exoskeleton arms do not support feedback")
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
def run_visualization_loop(self):
"""Run interactive Meshcat visualization loop to verify tracking."""
if self.ik_helper is None:
frozen_joints = [j.strip() for j in self.config.frozen_joints.split(",") if j.strip()]
self.ik_helper = ExoskeletonIKHelper(frozen_joints=frozen_joints)
self.ik_helper.init_visualization()
print("\n" + "=" * 60)
print("Visualization running! Move the exoskeletons to test tracking.")
print("Press Ctrl+C to exit.")
print("=" * 60 + "\n")
try:
while True:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
self.ik_helper.update_visualization()
time.sleep(0.01)
except KeyboardInterrupt:
print("\n\nVisualization stopped.")
@cached_property
def _g1_joint_names(self) -> list[str]:
return [joint.name for joint in G1_29_JointIndex]
+4
View File
@@ -75,6 +75,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
from .homunculus import HomunculusArm from .homunculus import HomunculusArm
return HomunculusArm(config) return HomunculusArm(config)
elif config.type == "unitree_g1":
from .unitree_g1 import UnitreeG1Teleoperator
return UnitreeG1Teleoperator(config)
elif config.type == "bi_so_leader": elif config.type == "bi_so_leader":
from .bi_so_leader import BiSOLeader from .bi_so_leader import BiSOLeader