mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-12 15:19:43 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ee24f64ae5 | |||
| 123b9f7851 | |||
| a6c3a0fa09 | |||
| c2fb644613 | |||
| 1d07a4aefd |
@@ -4,11 +4,12 @@ This guide covers the complete setup process for the Unitree G1 humanoid, from i
|
||||
|
||||
## About the Unitree G1
|
||||
|
||||
We offer support for both 29 and 23 DOF G1. In this first PR we introduce:
|
||||
We offer support for both 29 and 23 DOF G1. We introduce:
|
||||
|
||||
- **`unitree g1` robot class, handling low level communication with the humanoid**
|
||||
- **ZMQ socket bridge** for remote communication over WiFi, allowing one to deploy policies remotely instead of over ethernet or directly on the Orin
|
||||
- **GR00T locomotion policy** for bipedal walking and balance
|
||||
- **MuJoCo simulation mode** for testing policies without the physical robot
|
||||
|
||||
---
|
||||
|
||||
@@ -191,6 +192,10 @@ Press `Ctrl+C` to stop the policy.
|
||||
|
||||
---
|
||||
|
||||
## Extra: Running in Simulation Mode (MuJoCo)
|
||||
|
||||
You can now test and develop policies without a physical robot using MuJoCo. to do so set `is_simulation=True` in config.
|
||||
|
||||
## Additional Resources
|
||||
|
||||
- [Unitree SDK Documentation](https://github.com/unitreerobotics/unitree_sdk2_python)
|
||||
|
||||
@@ -130,13 +130,13 @@ lerobot-train \
|
||||
--policy.path="lerobot/xvla-base" \
|
||||
--policy.repo_id="HF_USER/xvla-your-robot" \
|
||||
--policy.dtype=bfloat16 \
|
||||
--steps=3000 \
|
||||
--policy.action_mode=auto \
|
||||
--steps=20000 \
|
||||
--policy.device=cuda \
|
||||
--policy.freeze_vision_encoder=false \
|
||||
--policy.freeze_language_encoder=false \
|
||||
--policy.train_policy_transformer=true \
|
||||
--policy.train_soft_prompts=true \
|
||||
--policy.action_mode=YOUR_ACTION_MODE
|
||||
```
|
||||
|
||||
### Training Parameters Explained
|
||||
|
||||
@@ -0,0 +1,454 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WBT (Whole Body Tracking) Dance Policy for Unitree G1
|
||||
|
||||
Uses ONNX model with motion data baked in.
|
||||
Pattern matches gr00t_locomotion.py - uses UnitreeG1 robot class.
|
||||
|
||||
Usage:
|
||||
python examples/unitree_g1/dance.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import numpy as np
|
||||
import onnx
|
||||
import onnxruntime as ort
|
||||
import pinocchio as pin
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# =============================================================================
|
||||
# CONFIGURATION
|
||||
# =============================================================================
|
||||
|
||||
DANCE_ONNX_PATH = "examples/unitree_g1/fastsac_g1_29dof_dancing.onnx"
|
||||
CONTROL_DT = 0.02 # 50 Hz
|
||||
NUM_DOFS = 29
|
||||
|
||||
# Default joint positions (holosoma training defaults)
|
||||
DEFAULT_DOF_POS = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # Left leg (6)
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # Right leg (6)
|
||||
0.0, 0.0, 0.0, # Waist (3)
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # Left arm (7)
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # Right arm (7)
|
||||
], dtype=np.float32)
|
||||
|
||||
# Stiff hold KP/KD (for initialization)
|
||||
STIFF_KP = np.array([
|
||||
150, 150, 200, 200, 40, 40,
|
||||
150, 150, 200, 200, 40, 40,
|
||||
200, 200, 100,
|
||||
100, 100, 100, 100, 50, 50, 50,
|
||||
100, 100, 100, 100, 50, 50, 50,
|
||||
], dtype=np.float32)
|
||||
|
||||
STIFF_KD = np.array([
|
||||
2.5, 2.5, 2.5, 2.5, 2.5, 2.5,
|
||||
2.5, 2.5, 2.5, 2.5, 2.5, 2.5,
|
||||
5.0, 5.0, 5.0,
|
||||
2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5,
|
||||
2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5,
|
||||
], dtype=np.float32)
|
||||
|
||||
# Joints to freeze at 0 with high KP
|
||||
FROZEN_JOINTS = [13, 14, 20, 21, 27, 28]
|
||||
FROZEN_KP = 500.0
|
||||
FROZEN_KD = 5.0
|
||||
|
||||
# =============================================================================
|
||||
# QUATERNION UTILITIES
|
||||
# =============================================================================
|
||||
|
||||
def quat_inverse(q):
|
||||
return np.concatenate((q[:, 0:1], -q[:, 1:]), axis=1)
|
||||
|
||||
def quat_mul(a, b):
|
||||
a, b = a.reshape(-1, 4), b.reshape(-1, 4)
|
||||
w1, x1, y1, z1 = a[..., 0], a[..., 1], a[..., 2], a[..., 3]
|
||||
w2, x2, y2, z2 = b[..., 0], b[..., 1], b[..., 2], b[..., 3]
|
||||
ww = (z1 + x1) * (x2 + y2)
|
||||
yy = (w1 - y1) * (w2 + z2)
|
||||
zz = (w1 + y1) * (w2 - z2)
|
||||
xx = ww + yy + zz
|
||||
qq = 0.5 * (xx + (z1 - x1) * (x2 - y2))
|
||||
w = qq - ww + (z1 - y1) * (y2 - z2)
|
||||
x = qq - xx + (x1 + w1) * (x2 + w2)
|
||||
y = qq - yy + (w1 - x1) * (y2 + z2)
|
||||
z = qq - zz + (z1 + y1) * (w2 - x2)
|
||||
return np.stack([w, x, y, z]).T.reshape(a.shape)
|
||||
|
||||
def subtract_frame_transforms(q01, q02):
|
||||
return quat_mul(quat_inverse(q01), q02)
|
||||
|
||||
def matrix_from_quat(q):
|
||||
r, i, j, k = q[..., 0], q[..., 1], q[..., 2], q[..., 3]
|
||||
two_s = 2.0 / (q * q).sum(-1)
|
||||
o = np.stack((
|
||||
1 - two_s * (j*j + k*k), two_s * (i*j - k*r), two_s * (i*k + j*r),
|
||||
two_s * (i*j + k*r), 1 - two_s * (i*i + k*k), two_s * (j*k - i*r),
|
||||
two_s * (i*k - j*r), two_s * (j*k + i*r), 1 - two_s * (i*i + j*j),
|
||||
), -1)
|
||||
return o.reshape(q.shape[:-1] + (3, 3))
|
||||
|
||||
def xyzw_to_wxyz(xyzw):
|
||||
return np.concatenate([xyzw[:, -1:], xyzw[:, :3]], axis=1)
|
||||
|
||||
def quat_to_rpy(q):
|
||||
w, x, y, z = q
|
||||
roll = np.arctan2(2*(w*x + y*z), 1 - 2*(x**2 + y**2))
|
||||
pitch = np.arcsin(np.clip(2*(w*y - z*x), -1, 1))
|
||||
yaw = np.arctan2(2*(w*z + x*y), 1 - 2*(y**2 + z**2))
|
||||
return roll, pitch, yaw
|
||||
|
||||
def rpy_to_quat(rpy):
|
||||
roll, pitch, yaw = rpy
|
||||
cy, sy = np.cos(yaw*0.5), np.sin(yaw*0.5)
|
||||
cp, sp = np.cos(pitch*0.5), np.sin(pitch*0.5)
|
||||
cr, sr = np.cos(roll*0.5), np.sin(roll*0.5)
|
||||
return np.array([cr*cp*cy + sr*sp*sy, sr*cp*cy - cr*sp*sy,
|
||||
cr*sp*cy + sr*cp*sy, cr*cp*sy - sr*sp*cy])
|
||||
|
||||
# =============================================================================
|
||||
# PINOCCHIO FK
|
||||
# =============================================================================
|
||||
|
||||
DOF_NAMES = (
|
||||
"left_hip_pitch_joint", "left_hip_roll_joint", "left_hip_yaw_joint",
|
||||
"left_knee_joint", "left_ankle_pitch_joint", "left_ankle_roll_joint",
|
||||
"right_hip_pitch_joint", "right_hip_roll_joint", "right_hip_yaw_joint",
|
||||
"right_knee_joint", "right_ankle_pitch_joint", "right_ankle_roll_joint",
|
||||
"waist_yaw_joint", "waist_roll_joint", "waist_pitch_joint",
|
||||
"left_shoulder_pitch_joint", "left_shoulder_roll_joint", "left_shoulder_yaw_joint", "left_elbow_joint",
|
||||
"left_wrist_roll_joint", "left_wrist_pitch_joint", "left_wrist_yaw_joint",
|
||||
"right_shoulder_pitch_joint", "right_shoulder_roll_joint", "right_shoulder_yaw_joint", "right_elbow_joint",
|
||||
"right_wrist_roll_joint", "right_wrist_pitch_joint", "right_wrist_yaw_joint",
|
||||
)
|
||||
|
||||
|
||||
class PinocchioFK:
|
||||
"""Pinocchio forward kinematics for torso_link orientation."""
|
||||
|
||||
def __init__(self, urdf_text: str):
|
||||
root = ElementTree.fromstring(urdf_text)
|
||||
for parent in root.iter():
|
||||
for child in list(parent):
|
||||
if child.tag.split("}")[-1] in {"visual", "collision"}:
|
||||
parent.remove(child)
|
||||
xml_text = '<?xml version="1.0"?>\n' + ElementTree.tostring(root, encoding="unicode")
|
||||
|
||||
self.model = pin.buildModelFromXML(xml_text, pin.JointModelFreeFlyer())
|
||||
self.data = self.model.createData()
|
||||
|
||||
pin_names = [n for n in self.model.names if n not in ["universe", "root_joint"]]
|
||||
self.idx_map = np.array([DOF_NAMES.index(n) for n in pin_names])
|
||||
self.ref_frame_id = self.model.getFrameId("torso_link")
|
||||
logger.info(f"Pinocchio FK: {len(pin_names)} joints, torso_link frame={self.ref_frame_id}")
|
||||
|
||||
def get_torso_quat(self, pos, quat_wxyz, dof_pos):
|
||||
"""Get torso_link orientation in world frame."""
|
||||
quat_xyzw = np.array([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
|
||||
config = np.concatenate([pos, quat_xyzw, dof_pos[self.idx_map]])
|
||||
pin.framesForwardKinematics(self.model, self.data, config)
|
||||
coeffs = pin.Quaternion(self.data.oMf[self.ref_frame_id].rotation).coeffs()
|
||||
return np.array([coeffs[3], coeffs[0], coeffs[1], coeffs[2]]).reshape(1, 4)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DANCE CONTROLLER
|
||||
# =============================================================================
|
||||
|
||||
class DanceController:
|
||||
"""
|
||||
Handles WBT dance policy for the Unitree G1 robot.
|
||||
|
||||
This controller manages:
|
||||
- 29-joint observation processing
|
||||
- Pinocchio FK for torso orientation
|
||||
- Policy inference with motion data from ONNX
|
||||
"""
|
||||
|
||||
def __init__(self, policy, robot, pinocchio_fk, motor_kp, motor_kd, action_scale):
|
||||
self.policy = policy
|
||||
self.robot = robot
|
||||
self.pinocchio_fk = pinocchio_fk
|
||||
self.motor_kp = motor_kp
|
||||
self.motor_kd = motor_kd
|
||||
self.action_scale = action_scale
|
||||
|
||||
self.obs_dim = policy.get_inputs()[0].shape[1]
|
||||
self.last_action = np.zeros((1, NUM_DOFS), dtype=np.float32)
|
||||
self.motion_command = None
|
||||
self.ref_quat_xyzw = None
|
||||
self.timestep = 0
|
||||
self.yaw_offset = 0.0
|
||||
|
||||
# Get initial motion data from ONNX
|
||||
dummy = np.zeros((1, self.obs_dim), dtype=np.float32)
|
||||
outs = self.policy.run(["joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": dummy, "time_step": np.array([[0]], dtype=np.float32)})
|
||||
self.motion_command = np.concatenate(outs[0:2], axis=1)
|
||||
self.ref_quat_xyzw = outs[2]
|
||||
self.motion_start_pose = outs[0].flatten()
|
||||
|
||||
# Thread management
|
||||
self.dance_running = False
|
||||
self.dance_thread = None
|
||||
|
||||
logger.info(f"DanceController: obs_dim={self.obs_dim}, action_scale={action_scale}")
|
||||
|
||||
def capture_yaw_offset(self):
|
||||
"""Capture robot's current yaw for relative tracking."""
|
||||
robot_state = self.robot.lowstate_buffer.get_data()
|
||||
if robot_state and self.pinocchio_fk:
|
||||
quat = np.array(robot_state.imu_state.quaternion, dtype=np.float32)
|
||||
dof = np.array([robot_state.motor_state[i].q for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
torso_q = self.pinocchio_fk.get_torso_quat(np.zeros(3), quat, dof)
|
||||
_, _, self.yaw_offset = quat_to_rpy(torso_q.flatten())
|
||||
logger.info(f"Captured yaw offset: {np.degrees(self.yaw_offset):.1f}°")
|
||||
|
||||
def _remove_yaw_offset(self, quat_wxyz):
|
||||
"""Remove stored yaw offset from orientation."""
|
||||
if abs(self.yaw_offset) < 1e-6:
|
||||
return quat_wxyz
|
||||
yaw_q = rpy_to_quat((0, 0, -self.yaw_offset)).reshape(1, 4)
|
||||
return quat_mul(yaw_q, quat_wxyz)
|
||||
|
||||
def run_step(self):
|
||||
"""Single dance step - reads state, runs policy, sends commands."""
|
||||
robot_state = self.robot.lowstate_buffer.get_data()
|
||||
if robot_state is None:
|
||||
return
|
||||
|
||||
# Read robot state
|
||||
quat = np.array(robot_state.imu_state.quaternion, dtype=np.float32)
|
||||
ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32)
|
||||
dof_pos = np.array([robot_state.motor_state[i].q for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
dof_vel = np.array([robot_state.motor_state[i].dq for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
|
||||
# Compute motion_ref_ori_b using FK
|
||||
if self.pinocchio_fk:
|
||||
torso_q = self.pinocchio_fk.get_torso_quat(np.zeros(3), quat, dof_pos)
|
||||
torso_q = self._remove_yaw_offset(torso_q)
|
||||
motion_ori = xyzw_to_wxyz(self.ref_quat_xyzw)
|
||||
rel_quat = subtract_frame_transforms(torso_q, motion_ori)
|
||||
ori_b = matrix_from_quat(rel_quat)[..., :2].reshape(1, -1)
|
||||
else:
|
||||
ori_b = np.zeros((1, 6), dtype=np.float32)
|
||||
|
||||
dof_rel = (dof_pos - DEFAULT_DOF_POS).reshape(1, -1)
|
||||
|
||||
# Build observation (alphabetical order)
|
||||
obs_dict = {
|
||||
"actions": self.last_action,
|
||||
"base_ang_vel": ang_vel.reshape(1, 3),
|
||||
"dof_pos": dof_rel,
|
||||
"dof_vel": dof_vel.reshape(1, -1),
|
||||
"motion_command": self.motion_command,
|
||||
"motion_ref_ori_b": ori_b,
|
||||
}
|
||||
obs = np.concatenate([obs_dict[k].astype(np.float32) for k in sorted(obs_dict.keys())], axis=1)
|
||||
obs = np.clip(obs, -100, 100)
|
||||
|
||||
# Run policy
|
||||
outs = self.policy.run(["actions", "joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": obs, "time_step": np.array([[self.timestep]], dtype=np.float32)})
|
||||
|
||||
action = np.clip(outs[0], -100, 100)
|
||||
self.motion_command = np.concatenate(outs[1:3], axis=1)
|
||||
self.ref_quat_xyzw = outs[3]
|
||||
self.last_action = action.copy()
|
||||
|
||||
# Compute target positions
|
||||
target_pos = DEFAULT_DOF_POS + action.flatten() * self.action_scale
|
||||
|
||||
# Send commands
|
||||
for i in range(NUM_DOFS):
|
||||
if i in FROZEN_JOINTS:
|
||||
self.robot.msg.motor_cmd[i].q = 0.0
|
||||
self.robot.msg.motor_cmd[i].kp = FROZEN_KP
|
||||
self.robot.msg.motor_cmd[i].kd = FROZEN_KD
|
||||
else:
|
||||
self.robot.msg.motor_cmd[i].q = float(target_pos[i])
|
||||
self.robot.msg.motor_cmd[i].kp = self.motor_kp[i]
|
||||
self.robot.msg.motor_cmd[i].kd = self.motor_kd[i]
|
||||
self.robot.msg.motor_cmd[i].qd = 0
|
||||
self.robot.msg.motor_cmd[i].tau = 0
|
||||
|
||||
self.robot.send_action(self.robot.msg)
|
||||
self.timestep += 1
|
||||
|
||||
def _dance_thread_loop(self):
|
||||
"""Background thread that runs the dance policy."""
|
||||
logger.info("Dance thread started")
|
||||
while self.dance_running:
|
||||
start_time = time.time()
|
||||
try:
|
||||
self.run_step()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in dance loop: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
sleep_time = max(0, CONTROL_DT - elapsed)
|
||||
time.sleep(sleep_time)
|
||||
logger.info("Dance thread stopped")
|
||||
|
||||
def start_dance_thread(self):
|
||||
"""Start the dance control thread."""
|
||||
if self.dance_running:
|
||||
logger.warning("Dance thread already running")
|
||||
return
|
||||
|
||||
# Reset state for fresh start
|
||||
self.timestep = 0
|
||||
self.last_action.fill(0)
|
||||
|
||||
# Re-get initial motion data
|
||||
dummy = np.zeros((1, self.obs_dim), dtype=np.float32)
|
||||
outs = self.policy.run(["joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": dummy, "time_step": np.array([[0]], dtype=np.float32)})
|
||||
self.motion_command = np.concatenate(outs[0:2], axis=1)
|
||||
self.ref_quat_xyzw = outs[2]
|
||||
|
||||
self.capture_yaw_offset()
|
||||
|
||||
logger.info("Starting dance control thread...")
|
||||
self.dance_running = True
|
||||
self.dance_thread = threading.Thread(target=self._dance_thread_loop, daemon=True)
|
||||
self.dance_thread.start()
|
||||
|
||||
def stop_dance_thread(self):
|
||||
"""Stop the dance control thread."""
|
||||
if not self.dance_running:
|
||||
return
|
||||
|
||||
logger.info("Stopping dance control thread...")
|
||||
self.dance_running = False
|
||||
if self.dance_thread:
|
||||
self.dance_thread.join(timeout=2.0)
|
||||
logger.info("Dance control thread stopped")
|
||||
|
||||
def reset_to_motion_pose(self, duration: float = 3.0):
|
||||
"""Move robot to initial motion pose over given duration."""
|
||||
logger.info(f"Moving to dance start pose ({duration}s)...")
|
||||
|
||||
robot_state = self.robot.lowstate_buffer.get_data()
|
||||
init_pos = np.array([robot_state.motor_state[i].q for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
target_pos = self.motion_start_pose
|
||||
|
||||
num_steps = int(duration / CONTROL_DT)
|
||||
for step in range(num_steps):
|
||||
alpha = step / num_steps
|
||||
interp = init_pos * (1 - alpha) + target_pos * alpha
|
||||
|
||||
for i in range(NUM_DOFS):
|
||||
if i in FROZEN_JOINTS:
|
||||
self.robot.msg.motor_cmd[i].q = 0.0
|
||||
self.robot.msg.motor_cmd[i].kp = FROZEN_KP
|
||||
self.robot.msg.motor_cmd[i].kd = FROZEN_KD
|
||||
else:
|
||||
self.robot.msg.motor_cmd[i].q = float(interp[i])
|
||||
self.robot.msg.motor_cmd[i].kp = STIFF_KP[i]
|
||||
self.robot.msg.motor_cmd[i].kd = STIFF_KD[i]
|
||||
self.robot.msg.motor_cmd[i].qd = 0
|
||||
self.robot.msg.motor_cmd[i].tau = 0
|
||||
|
||||
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
|
||||
self.robot.lowcmd_publisher.Write(self.robot.msg)
|
||||
time.sleep(CONTROL_DT)
|
||||
|
||||
logger.info("At dance start pose!")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN
|
||||
# =============================================================================
|
||||
|
||||
def load_dance_policy(onnx_path: str):
|
||||
"""Load dance policy and extract metadata."""
|
||||
logger.info(f"Loading dance policy: {onnx_path}")
|
||||
|
||||
policy = ort.InferenceSession(onnx_path)
|
||||
model = onnx.load(onnx_path)
|
||||
metadata = {p.key: json.loads(p.value) for p in model.metadata_props}
|
||||
|
||||
motor_kp = np.array(metadata.get("kp", STIFF_KP), dtype=np.float32)
|
||||
motor_kd = np.array(metadata.get("kd", STIFF_KD), dtype=np.float32)
|
||||
action_scale = float(metadata.get("action_scale", 1.0))
|
||||
urdf_text = metadata.get("robot_urdf", None)
|
||||
|
||||
logger.info(f" Obs dim: {policy.get_inputs()[0].shape[1]}")
|
||||
logger.info(f" Action scale: {action_scale}")
|
||||
logger.info(f" KP range: [{motor_kp.min():.1f}, {motor_kp.max():.1f}]")
|
||||
|
||||
# Build Pinocchio FK if URDF available
|
||||
pinocchio_fk = None
|
||||
if urdf_text:
|
||||
logger.info(" Building Pinocchio FK from URDF...")
|
||||
pinocchio_fk = PinocchioFK(urdf_text)
|
||||
else:
|
||||
logger.warning(" No URDF in metadata - FK will not work!")
|
||||
|
||||
return policy, pinocchio_fk, motor_kp, motor_kd, action_scale
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="WBT Dance Policy for Unitree G1")
|
||||
parser.add_argument("--onnx", type=str, default=DANCE_ONNX_PATH, help="Path to dance ONNX model")
|
||||
parser.add_argument("--sim", action="store_true", help="Run in simulation mode")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 70)
|
||||
print("💃 WBT DANCE POLICY")
|
||||
print("=" * 70)
|
||||
|
||||
# Load policy
|
||||
policy, pinocchio_fk, motor_kp, motor_kd, action_scale = load_dance_policy(args.onnx)
|
||||
|
||||
# Initialize robot
|
||||
logger.info("Initializing robot...")
|
||||
config = UnitreeG1Config()
|
||||
robot = UnitreeG1(config)
|
||||
logger.info("Robot connected!")
|
||||
|
||||
# Create controller
|
||||
controller = DanceController(policy, robot, pinocchio_fk, motor_kp, motor_kd, action_scale)
|
||||
|
||||
try:
|
||||
# Move to start pose
|
||||
controller.reset_to_motion_pose(duration=3.0)
|
||||
|
||||
# Start dancing
|
||||
controller.start_dance_thread()
|
||||
|
||||
logger.info("Dancing! Press Ctrl+C to stop.")
|
||||
print("-" * 70)
|
||||
|
||||
# Log status periodically
|
||||
while True:
|
||||
time.sleep(2.0)
|
||||
logger.info(f"timestep={controller.timestep}")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nStopping...")
|
||||
finally:
|
||||
controller.stop_dance_thread()
|
||||
robot.disconnect()
|
||||
|
||||
print("\nDone!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
@@ -0,0 +1,479 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Example: Holosoma Whole-Body Locomotion (23-DOF and 29-DOF)
|
||||
|
||||
This example demonstrates loading Holosoma whole-body locomotion policies
|
||||
and running them on the Unitree G1 robot.
|
||||
|
||||
Supports both:
|
||||
- 23-DOF native policies (82D observations, 23D actions)
|
||||
- 29-DOF policies (100D observations, 29D actions)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# =============================================================================
|
||||
# 29-DOF Configuration
|
||||
# =============================================================================
|
||||
# fmt: off
|
||||
HOLOSOMA_29DOF_DEFAULT_ANGLES = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # left leg
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # right leg
|
||||
0.0, 0.0, 0.0, # waist (yaw, roll, pitch)
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # left arm
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
HOLOSOMA_29DOF_KP = np.array([
|
||||
40.179238471, 99.098427777, 40.179238471, 99.098427777, 28.501246196, 28.501246196, # left leg
|
||||
40.179238471, 99.098427777, 40.179238471, 99.098427777, 28.501246196, 28.501246196, # right leg
|
||||
40.179238471, 28.501246196, 28.501246196, # waist
|
||||
14.250623098, 14.250623098, 14.250623098, 14.250623098, 14.250623098, 16.778327481, 16.778327481, # left arm
|
||||
14.250623098, 14.250623098, 14.250623098, 14.250623098, 14.250623098, 16.778327481, 16.778327481, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
HOLOSOMA_29DOF_KD = np.array([
|
||||
2.557889765, 6.308801854, 2.557889765, 6.308801854, 1.814445687, 1.814445687, # left leg
|
||||
2.557889765, 6.308801854, 2.557889765, 6.308801854, 1.814445687, 1.814445687, # right leg
|
||||
2.557889765, 1.814445687, 1.814445687, # waist
|
||||
0.907222843, 0.907222843, 0.907222843, 0.907222843, 0.907222843, 1.068141502, 1.068141502, # left arm
|
||||
0.907222843, 0.907222843, 0.907222843, 0.907222843, 0.907222843, 1.068141502, 1.068141502, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
# =============================================================================
|
||||
# 23-DOF Configuration (native G1-23: no waist_roll/pitch, no wrist_pitch/yaw)
|
||||
# Derived from 29-DOF Holosoma values
|
||||
# =============================================================================
|
||||
# Joint order: 6 left leg, 6 right leg, 1 waist_yaw, 5 left arm, 5 right arm
|
||||
HOLOSOMA_23DOF_DEFAULT_ANGLES = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # left leg (from 29-DOF)
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # right leg (from 29-DOF)
|
||||
0.0, # waist_yaw only (from 29-DOF)
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, # left arm first 5 joints (from 29-DOF)
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, # right arm first 5 joints (from 29-DOF)
|
||||
], dtype=np.float32)
|
||||
|
||||
HOLOSOMA_23DOF_KP = np.array([
|
||||
40.179238471, 99.098427777, 40.179238471, 99.098427777, 28.501246196, 28.501246196, # left leg
|
||||
40.179238471, 99.098427777, 40.179238471, 99.098427777, 28.501246196, 28.501246196, # right leg
|
||||
40.179238471, # waist_yaw
|
||||
14.250623098, 14.250623098, 14.250623098, 14.250623098, 14.250623098, # left arm
|
||||
14.250623098, 14.250623098, 14.250623098, 14.250623098, 14.250623098, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
HOLOSOMA_23DOF_KD = np.array([
|
||||
2.557889765, 6.308801854, 2.557889765, 6.308801854, 1.814445687, 1.814445687, # left leg
|
||||
2.557889765, 6.308801854, 2.557889765, 6.308801854, 1.814445687, 1.814445687, # right leg
|
||||
2.557889765, # waist_yaw
|
||||
0.907222843, 0.907222843, 0.907222843, 0.907222843, 0.907222843, # left arm
|
||||
0.907222843, 0.907222843, 0.907222843, 0.907222843, 0.907222843, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
# Maps 23-DOF policy index → 29-DOF motor index
|
||||
# 23-DOF: legs(0-11), waist_yaw(12), L_arm(13-17), R_arm(18-22)
|
||||
# 29-DOF: legs(0-11), waist(12-14), L_arm(15-21), R_arm(22-28)
|
||||
DOF_23_TO_MOTOR_MAP = [
|
||||
0, 1, 2, 3, 4, 5, # left leg → motor 0-5
|
||||
6, 7, 8, 9, 10, 11, # right leg → motor 6-11
|
||||
12, # waist_yaw → motor 12
|
||||
15, 16, 17, 18, 19, # left arm (skip wrist_pitch/yaw) → motor 15-19
|
||||
22, 23, 24, 25, 26, # right arm (skip wrist_pitch/yaw) → motor 22-26
|
||||
]
|
||||
# fmt: on
|
||||
|
||||
# Control parameters
|
||||
LOCOMOTION_CONTROL_DT = 0.02 # 50Hz
|
||||
LOCOMOTION_ACTION_SCALE = 0.25
|
||||
ANG_VEL_SCALE = 0.25
|
||||
DOF_POS_SCALE = 1.0
|
||||
DOF_VEL_SCALE = 0.05
|
||||
GAIT_PERIOD = 1.0
|
||||
|
||||
DEFAULT_HOLOSOMA_REPO_ID = "nepyope/holosoma_locomotion"
|
||||
|
||||
|
||||
def load_holosoma_policy(
|
||||
repo_id: str = DEFAULT_HOLOSOMA_REPO_ID,
|
||||
policy_name: str = "fastsac",
|
||||
local_path: str | None = None,
|
||||
) -> tuple[ort.InferenceSession, int]:
|
||||
"""Load Holosoma policy and detect observation dimension.
|
||||
|
||||
Returns:
|
||||
(policy, obs_dim) tuple where obs_dim is 82 (23-DOF) or 100 (29-DOF)
|
||||
"""
|
||||
if local_path is not None:
|
||||
logger.info(f"Loading policy from local path: {local_path}")
|
||||
policy_path = local_path
|
||||
else:
|
||||
logger.info(f"Loading policy from Hugging Face Hub: {repo_id}")
|
||||
policy_path = hf_hub_download(repo_id=repo_id, filename=f"{policy_name}_g1_29dof.onnx")
|
||||
|
||||
policy = ort.InferenceSession(policy_path)
|
||||
|
||||
# Detect observation dimension from model input shape
|
||||
input_shape = policy.get_inputs()[0].shape
|
||||
obs_dim = input_shape[1] if len(input_shape) > 1 else input_shape[0]
|
||||
|
||||
logger.info(f"Policy loaded successfully")
|
||||
logger.info(f" Input: {policy.get_inputs()[0].name}, shape: {input_shape} → obs_dim={obs_dim}")
|
||||
logger.info(f" Output: {policy.get_outputs()[0].name}, shape: {policy.get_outputs()[0].shape}")
|
||||
|
||||
return policy, obs_dim
|
||||
|
||||
|
||||
class HolosomaLocomotionController:
|
||||
"""
|
||||
Handles Holosoma whole-body locomotion for Unitree G1.
|
||||
Supports both 23-DOF (82D obs) and 29-DOF (100D obs) policies.
|
||||
"""
|
||||
|
||||
def __init__(self, policy, robot, config, obs_dim: int = 100):
|
||||
self.policy = policy
|
||||
self.robot = robot
|
||||
self.config = config
|
||||
self.obs_dim = obs_dim
|
||||
|
||||
# Detect policy type from observation dimension
|
||||
self.is_23dof = (obs_dim == 82)
|
||||
self.num_dof = 23 if self.is_23dof else 29
|
||||
|
||||
# Velocity commands
|
||||
self.locomotion_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
|
||||
|
||||
# State variables sized for policy type
|
||||
self.qj = np.zeros(self.num_dof, dtype=np.float32)
|
||||
self.dqj = np.zeros(self.num_dof, dtype=np.float32)
|
||||
self.locomotion_action = np.zeros(self.num_dof, dtype=np.float32)
|
||||
self.locomotion_obs = np.zeros(obs_dim, dtype=np.float32)
|
||||
self.last_unscaled_action = np.zeros(self.num_dof, dtype=np.float32)
|
||||
|
||||
# Select config based on DOF
|
||||
if self.is_23dof:
|
||||
self.default_angles = HOLOSOMA_23DOF_DEFAULT_ANGLES
|
||||
self.kp = HOLOSOMA_23DOF_KP
|
||||
self.kd = HOLOSOMA_23DOF_KD
|
||||
self.motor_map = DOF_23_TO_MOTOR_MAP
|
||||
else:
|
||||
self.default_angles = HOLOSOMA_29DOF_DEFAULT_ANGLES
|
||||
self.kp = HOLOSOMA_29DOF_KP
|
||||
self.kd = HOLOSOMA_29DOF_KD
|
||||
self.motor_map = list(range(29)) # Identity map for 29-DOF
|
||||
|
||||
# Phase state for gait
|
||||
self.phase = np.zeros((1, 2), dtype=np.float32)
|
||||
self.phase[0, 0] = 0.0
|
||||
self.phase[0, 1] = np.pi
|
||||
self.phase_dt = 2 * np.pi / (50.0 * GAIT_PERIOD)
|
||||
self.is_standing = False
|
||||
|
||||
self.counter = 0
|
||||
self.locomotion_running = False
|
||||
self.locomotion_thread = None
|
||||
|
||||
logger.info(f"HolosomaLocomotionController initialized")
|
||||
logger.info(f" Mode: {'23-DOF (82D obs)' if self.is_23dof else '29-DOF (100D obs)'}")
|
||||
logger.info(f" Action dim: {self.num_dof}")
|
||||
|
||||
def holosoma_locomotion_run(self):
|
||||
"""Main locomotion loop - handles both 23-DOF and 29-DOF."""
|
||||
self.counter += 1
|
||||
|
||||
if self.counter == 1:
|
||||
print("\n" + "=" * 60)
|
||||
print(f"🚀 RUNNING HOLOSOMA {self.num_dof}-DOF LOCOMOTION POLICY")
|
||||
print(f" {self.obs_dim}D observations → {self.num_dof}D actions")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
robot_state = self.robot.get_observation()
|
||||
if robot_state is None:
|
||||
return
|
||||
|
||||
# Remote controller
|
||||
if robot_state.wireless_remote is not None:
|
||||
self.robot.remote_controller.set(robot_state.wireless_remote)
|
||||
else:
|
||||
self.robot.remote_controller.lx = 0.0
|
||||
self.robot.remote_controller.ly = 0.0
|
||||
self.robot.remote_controller.rx = 0.0
|
||||
self.robot.remote_controller.ry = 0.0
|
||||
|
||||
# Deadzone
|
||||
ly = self.robot.remote_controller.ly if abs(self.robot.remote_controller.ly) > 0.1 else 0.0
|
||||
lx = self.robot.remote_controller.lx if abs(self.robot.remote_controller.lx) > 0.1 else 0.0
|
||||
rx = self.robot.remote_controller.rx if abs(self.robot.remote_controller.rx) > 0.1 else 0.0
|
||||
|
||||
self.locomotion_cmd[0] = ly
|
||||
self.locomotion_cmd[1] = -lx
|
||||
self.locomotion_cmd[2] = -rx
|
||||
|
||||
# Read joint states using motor map
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
self.qj[i] = robot_state.motor_state[motor_idx].q
|
||||
self.dqj[i] = robot_state.motor_state[motor_idx].dq
|
||||
|
||||
# IMU
|
||||
quat = robot_state.imu_state.quaternion
|
||||
ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32)
|
||||
gravity_orientation = self.robot.get_gravity_orientation(quat)
|
||||
|
||||
# Scale observations
|
||||
qj_obs = (self.qj - self.default_angles) * DOF_POS_SCALE
|
||||
dqj_obs = self.dqj * DOF_VEL_SCALE
|
||||
ang_vel_scaled = ang_vel * ANG_VEL_SCALE
|
||||
|
||||
# Phase update
|
||||
cmd_norm = np.linalg.norm(self.locomotion_cmd[:2])
|
||||
ang_cmd_norm = np.abs(self.locomotion_cmd[2])
|
||||
|
||||
if cmd_norm < 0.01 and ang_cmd_norm < 0.01:
|
||||
self.phase[0, :] = np.pi * np.ones(2)
|
||||
self.is_standing = True
|
||||
elif self.is_standing:
|
||||
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
|
||||
self.is_standing = False
|
||||
else:
|
||||
phase_tp1 = self.phase + self.phase_dt
|
||||
self.phase = np.fmod(phase_tp1 + np.pi, 2 * np.pi) - np.pi
|
||||
|
||||
sin_phase = np.sin(self.phase[0, :])
|
||||
cos_phase = np.cos(self.phase[0, :])
|
||||
|
||||
# Build observation (format depends on DOF)
|
||||
if self.is_23dof:
|
||||
# 82D: [23 actions, 3 ang_vel, 1 cmd_yaw, 2 cmd_lin, 2 cos, 23 pos, 23 vel, 3 grav, 2 sin]
|
||||
self.locomotion_obs[0:23] = self.last_unscaled_action
|
||||
self.locomotion_obs[23:26] = ang_vel_scaled
|
||||
self.locomotion_obs[26] = self.locomotion_cmd[2]
|
||||
self.locomotion_obs[27:29] = self.locomotion_cmd[:2]
|
||||
self.locomotion_obs[29:31] = cos_phase
|
||||
self.locomotion_obs[31:54] = qj_obs
|
||||
self.locomotion_obs[54:77] = dqj_obs
|
||||
self.locomotion_obs[77:80] = gravity_orientation
|
||||
self.locomotion_obs[80:82] = sin_phase
|
||||
else:
|
||||
# 100D: [29 actions, 3 ang_vel, 1 cmd_yaw, 2 cmd_lin, 2 cos, 29 pos, 29 vel, 3 grav, 2 sin]
|
||||
self.locomotion_obs[0:29] = self.last_unscaled_action
|
||||
self.locomotion_obs[29:32] = ang_vel_scaled
|
||||
self.locomotion_obs[32] = self.locomotion_cmd[2]
|
||||
self.locomotion_obs[33:35] = self.locomotion_cmd[:2]
|
||||
self.locomotion_obs[35:37] = cos_phase
|
||||
self.locomotion_obs[37:66] = qj_obs
|
||||
self.locomotion_obs[66:95] = dqj_obs
|
||||
self.locomotion_obs[95:98] = gravity_orientation
|
||||
self.locomotion_obs[98:100] = sin_phase
|
||||
|
||||
# Policy inference
|
||||
obs_input = self.locomotion_obs.reshape(1, -1).astype(np.float32)
|
||||
ort_inputs = {self.policy.get_inputs()[0].name: obs_input}
|
||||
ort_outs = self.policy.run(None, ort_inputs)
|
||||
|
||||
raw_action = ort_outs[0].squeeze()
|
||||
clipped_action = np.clip(raw_action, -100.0, 100.0)
|
||||
|
||||
self.last_unscaled_action = clipped_action.copy()
|
||||
self.locomotion_action = clipped_action * LOCOMOTION_ACTION_SCALE
|
||||
|
||||
# Debug
|
||||
if self.counter <= 3:
|
||||
print(f"\n[Holosoma Debug #{self.counter}]")
|
||||
print(f" Phase: ({self.phase[0, 0]:.3f}, {self.phase[0, 1]:.3f})")
|
||||
print(f" Cmd: ({self.locomotion_cmd[0]:.2f}, {self.locomotion_cmd[1]:.2f}, {self.locomotion_cmd[2]:.2f})")
|
||||
print(f" Action range: [{raw_action.min():.3f}, {raw_action.max():.3f}]")
|
||||
|
||||
# Compute target positions
|
||||
target_dof_pos = self.default_angles + self.locomotion_action
|
||||
|
||||
# Send commands to motors via motor map
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].q = target_dof_pos[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = self.kp[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = self.kd[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# For 23-DOF: zero out missing joints (waist_roll/pitch, wrist_pitch/yaw)
|
||||
if self.is_23dof:
|
||||
missing_motors = [13, 14, 20, 21, 27, 28] # waist_roll, waist_pitch, wrist_pitch/yaw
|
||||
for motor_idx in missing_motors:
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = 40.0
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = 2.0
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
self.robot.send_action(self.robot.msg)
|
||||
|
||||
def _locomotion_thread_loop(self):
|
||||
logger.info("Locomotion thread started")
|
||||
while self.locomotion_running:
|
||||
start_time = time.time()
|
||||
try:
|
||||
self.holosoma_locomotion_run()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in locomotion loop: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
sleep_time = max(0, LOCOMOTION_CONTROL_DT - elapsed)
|
||||
time.sleep(sleep_time)
|
||||
logger.info("Locomotion thread stopped")
|
||||
|
||||
def start_locomotion_thread(self):
|
||||
if self.locomotion_running:
|
||||
logger.warning("Locomotion thread already running")
|
||||
return
|
||||
logger.info("Starting locomotion control thread...")
|
||||
self.locomotion_running = True
|
||||
self.locomotion_thread = threading.Thread(target=self._locomotion_thread_loop, daemon=True)
|
||||
self.locomotion_thread.start()
|
||||
logger.info("Locomotion control thread started!")
|
||||
|
||||
def stop_locomotion_thread(self):
|
||||
if not self.locomotion_running:
|
||||
return
|
||||
logger.info("Stopping locomotion control thread...")
|
||||
self.locomotion_running = False
|
||||
if self.locomotion_thread:
|
||||
self.locomotion_thread.join(timeout=2.0)
|
||||
logger.info("Locomotion control thread stopped")
|
||||
|
||||
def reset_robot(self):
|
||||
"""Move joints to default position."""
|
||||
logger.info(f"Moving {self.num_dof} joints to default position...")
|
||||
|
||||
total_time = 3.0
|
||||
num_step = int(total_time / self.robot.control_dt)
|
||||
|
||||
robot_state = self.robot.get_observation()
|
||||
|
||||
# Record current positions
|
||||
init_dof_pos = np.zeros(self.num_dof, dtype=np.float32)
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
init_dof_pos[i] = robot_state.motor_state[motor_idx].q
|
||||
|
||||
# Interpolate to target
|
||||
for step in range(num_step):
|
||||
alpha = step / num_step
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
target = self.default_angles[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].q = init_dof_pos[i] * (1 - alpha) + target * alpha
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = self.kp[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = self.kd[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Zero missing joints for 23-DOF
|
||||
if self.is_23dof:
|
||||
for motor_idx in [13, 14, 20, 21, 27, 28]:
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = 40.0
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = 2.0
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
|
||||
self.robot.lowcmd_publisher.Write(self.robot.msg)
|
||||
time.sleep(self.robot.control_dt)
|
||||
|
||||
logger.info(f"Reached default position ({self.num_dof} joints)")
|
||||
|
||||
# Hold for 2 seconds
|
||||
logger.info("Holding default position for 2 seconds...")
|
||||
hold_steps = int(2.0 / self.robot.control_dt)
|
||||
for _ in range(hold_steps):
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].q = self.default_angles[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = self.kp[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = self.kd[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
if self.is_23dof:
|
||||
for motor_idx in [13, 14, 20, 21, 27, 28]:
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = 40.0
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = 2.0
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
|
||||
self.robot.lowcmd_publisher.Write(self.robot.msg)
|
||||
time.sleep(self.robot.control_dt)
|
||||
|
||||
logger.info("Ready to start locomotion!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Holosoma Locomotion Controller for Unitree G1")
|
||||
parser.add_argument("--repo-id", type=str, default=DEFAULT_HOLOSOMA_REPO_ID)
|
||||
parser.add_argument("--policy", type=str, default="fastsac", choices=["fastsac", "ppo"])
|
||||
parser.add_argument("--local-path", type=str, default=None, help="Path to local ONNX file")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load policy and detect dimensions
|
||||
policy, obs_dim = load_holosoma_policy(
|
||||
repo_id=args.repo_id,
|
||||
policy_name=args.policy,
|
||||
local_path=args.local_path,
|
||||
)
|
||||
|
||||
# Initialize robot
|
||||
config = UnitreeG1Config()
|
||||
robot = UnitreeG1(config)
|
||||
|
||||
# Initialize controller with detected obs_dim
|
||||
controller = HolosomaLocomotionController(
|
||||
policy=policy,
|
||||
robot=robot,
|
||||
config=config,
|
||||
obs_dim=obs_dim,
|
||||
)
|
||||
|
||||
try:
|
||||
#controller.reset_robot()
|
||||
controller.start_locomotion_thread()
|
||||
|
||||
logger.info(f"Robot initialized with Holosoma {'23-DOF' if obs_dim == 82 else '29-DOF'} policy")
|
||||
logger.info("Use remote controller: LY=fwd/back, LX=left/right, RX=rotate")
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
while True:
|
||||
time.sleep(1.0)
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping locomotion...")
|
||||
controller.stop_locomotion_thread()
|
||||
print("Done!")
|
||||
@@ -0,0 +1,607 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Locomotion ↔ Dance Toggle for Unitree G1
|
||||
|
||||
Press Enter to instantly switch between locomotion and dance modes.
|
||||
- Starts in LOCOMOTION mode (joystick control)
|
||||
- Press Enter → DANCE mode (resets to frame 0)
|
||||
- Press Enter → LOCOMOTION mode
|
||||
- Repeat...
|
||||
|
||||
Auto-recovery feature:
|
||||
- If robot tilts beyond threshold during dance, auto-switches to locomotion
|
||||
- When robot recovers (tilt below recovery threshold), resumes dance from where it left off
|
||||
|
||||
Usage:
|
||||
python examples/unitree_g1/locomotion_to_dance.py
|
||||
python examples/unitree_g1/locomotion_to_dance.py --tilt-threshold 25 --recovery-threshold 10
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import select
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import numpy as np
|
||||
import onnx
|
||||
import onnxruntime as ort
|
||||
import pinocchio as pin
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# =============================================================================
|
||||
# CONFIGURATION
|
||||
# =============================================================================
|
||||
|
||||
NUM_DOFS = 29
|
||||
CONTROL_DT = 0.02 # 50Hz
|
||||
|
||||
# Locomotion config
|
||||
DEFAULT_HOLOSOMA_REPO_ID = "nepyope/holosoma_locomotion"
|
||||
LOCOMOTION_ACTION_SCALE = 0.25
|
||||
ANG_VEL_SCALE = 0.25
|
||||
DOF_POS_SCALE = 1.0
|
||||
DOF_VEL_SCALE = 0.05
|
||||
GAIT_PERIOD = 1.0
|
||||
|
||||
# Dance config
|
||||
DANCE_ONNX_PATH = "examples/unitree_g1/fastsac_g1_29dof_dancing.onnx"
|
||||
FROZEN_JOINTS = [13, 14, 20, 21, 27, 28]
|
||||
FROZEN_KP = 500.0
|
||||
FROZEN_KD = 5.0
|
||||
|
||||
# fmt: off
|
||||
# 29-DOF defaults (holosoma training)
|
||||
DEFAULT_29DOF_ANGLES = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # left leg
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # right leg
|
||||
0.0, 0.0, 0.0, # waist
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # left arm
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0, # right arm
|
||||
], dtype=np.float32)
|
||||
|
||||
DEFAULT_29DOF_KP = np.array([
|
||||
40.179, 99.098, 40.179, 99.098, 28.501, 28.501,
|
||||
40.179, 99.098, 40.179, 99.098, 28.501, 28.501,
|
||||
40.179, 28.501, 28.501,
|
||||
14.251, 14.251, 14.251, 14.251, 14.251, 16.778, 16.778,
|
||||
14.251, 14.251, 14.251, 14.251, 14.251, 16.778, 16.778,
|
||||
], dtype=np.float32)
|
||||
|
||||
DEFAULT_29DOF_KD = np.array([
|
||||
2.558, 6.309, 2.558, 6.309, 1.814, 1.814,
|
||||
2.558, 6.309, 2.558, 6.309, 1.814, 1.814,
|
||||
2.558, 1.814, 1.814,
|
||||
0.907, 0.907, 0.907, 0.907, 0.907, 1.068, 1.068,
|
||||
0.907, 0.907, 0.907, 0.907, 0.907, 1.068, 1.068,
|
||||
], dtype=np.float32)
|
||||
|
||||
# 23-DOF config (no waist_roll/pitch, no wrist_pitch/yaw)
|
||||
DEFAULT_23DOF_ANGLES = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # left leg
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0, # right leg
|
||||
0.0, # waist_yaw only
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, # left arm (5 joints)
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, # right arm (5 joints)
|
||||
], dtype=np.float32)
|
||||
|
||||
DEFAULT_23DOF_KP = np.array([
|
||||
40.179, 99.098, 40.179, 99.098, 28.501, 28.501,
|
||||
40.179, 99.098, 40.179, 99.098, 28.501, 28.501,
|
||||
40.179,
|
||||
14.251, 14.251, 14.251, 14.251, 14.251,
|
||||
14.251, 14.251, 14.251, 14.251, 14.251,
|
||||
], dtype=np.float32)
|
||||
|
||||
DEFAULT_23DOF_KD = np.array([
|
||||
2.558, 6.309, 2.558, 6.309, 1.814, 1.814,
|
||||
2.558, 6.309, 2.558, 6.309, 1.814, 1.814,
|
||||
2.558,
|
||||
0.907, 0.907, 0.907, 0.907, 0.907,
|
||||
0.907, 0.907, 0.907, 0.907, 0.907,
|
||||
], dtype=np.float32)
|
||||
|
||||
# 23-DOF policy index → 29-DOF motor index
|
||||
DOF_23_TO_MOTOR = [
|
||||
0, 1, 2, 3, 4, 5, # left leg
|
||||
6, 7, 8, 9, 10, 11, # right leg
|
||||
12, # waist_yaw
|
||||
15, 16, 17, 18, 19, # left arm (skip wrist_pitch/yaw)
|
||||
22, 23, 24, 25, 26, # right arm (skip wrist_pitch/yaw)
|
||||
]
|
||||
MISSING_23DOF_MOTORS = [13, 14, 20, 21, 27, 28]
|
||||
# fmt: on
|
||||
|
||||
# =============================================================================
|
||||
# QUATERNION UTILITIES
|
||||
# =============================================================================
|
||||
|
||||
def quat_inverse(q):
|
||||
return np.concatenate((q[:, 0:1], -q[:, 1:]), axis=1)
|
||||
|
||||
def quat_mul(a, b):
|
||||
a, b = a.reshape(-1, 4), b.reshape(-1, 4)
|
||||
w1, x1, y1, z1 = a[..., 0], a[..., 1], a[..., 2], a[..., 3]
|
||||
w2, x2, y2, z2 = b[..., 0], b[..., 1], b[..., 2], b[..., 3]
|
||||
ww = (z1 + x1) * (x2 + y2)
|
||||
yy = (w1 - y1) * (w2 + z2)
|
||||
zz = (w1 + y1) * (w2 - z2)
|
||||
xx = ww + yy + zz
|
||||
qq = 0.5 * (xx + (z1 - x1) * (x2 - y2))
|
||||
w = qq - ww + (z1 - y1) * (y2 - z2)
|
||||
x = qq - xx + (x1 + w1) * (x2 + w2)
|
||||
y = qq - yy + (w1 - x1) * (y2 + z2)
|
||||
z = qq - zz + (z1 + y1) * (w2 - x2)
|
||||
return np.stack([w, x, y, z]).T.reshape(a.shape)
|
||||
|
||||
def subtract_frame_transforms(q01, q02):
|
||||
return quat_mul(quat_inverse(q01), q02)
|
||||
|
||||
def matrix_from_quat(q):
|
||||
r, i, j, k = q[..., 0], q[..., 1], q[..., 2], q[..., 3]
|
||||
two_s = 2.0 / (q * q).sum(-1)
|
||||
o = np.stack((
|
||||
1 - two_s * (j*j + k*k), two_s * (i*j - k*r), two_s * (i*k + j*r),
|
||||
two_s * (i*j + k*r), 1 - two_s * (i*i + k*k), two_s * (j*k - i*r),
|
||||
two_s * (i*k - j*r), two_s * (j*k + i*r), 1 - two_s * (i*i + j*j),
|
||||
), -1)
|
||||
return o.reshape(q.shape[:-1] + (3, 3))
|
||||
|
||||
def xyzw_to_wxyz(xyzw):
|
||||
return np.concatenate([xyzw[:, -1:], xyzw[:, :3]], axis=1)
|
||||
|
||||
def quat_to_rpy(q):
|
||||
w, x, y, z = q
|
||||
roll = np.arctan2(2*(w*x + y*z), 1 - 2*(x**2 + y**2))
|
||||
pitch = np.arcsin(np.clip(2*(w*y - z*x), -1, 1))
|
||||
yaw = np.arctan2(2*(w*z + x*y), 1 - 2*(y**2 + z**2))
|
||||
return roll, pitch, yaw
|
||||
|
||||
def rpy_to_quat(rpy):
|
||||
roll, pitch, yaw = rpy
|
||||
cy, sy = np.cos(yaw*0.5), np.sin(yaw*0.5)
|
||||
cp, sp = np.cos(pitch*0.5), np.sin(pitch*0.5)
|
||||
cr, sr = np.cos(roll*0.5), np.sin(roll*0.5)
|
||||
return np.array([cr*cp*cy + sr*sp*sy, sr*cp*cy - cr*sp*sy,
|
||||
cr*sp*cy + sr*cp*sy, cr*cp*sy - sr*sp*cy])
|
||||
|
||||
# =============================================================================
|
||||
# PINOCCHIO FK
|
||||
# =============================================================================
|
||||
|
||||
DOF_NAMES = (
|
||||
"left_hip_pitch_joint", "left_hip_roll_joint", "left_hip_yaw_joint",
|
||||
"left_knee_joint", "left_ankle_pitch_joint", "left_ankle_roll_joint",
|
||||
"right_hip_pitch_joint", "right_hip_roll_joint", "right_hip_yaw_joint",
|
||||
"right_knee_joint", "right_ankle_pitch_joint", "right_ankle_roll_joint",
|
||||
"waist_yaw_joint", "waist_roll_joint", "waist_pitch_joint",
|
||||
"left_shoulder_pitch_joint", "left_shoulder_roll_joint", "left_shoulder_yaw_joint", "left_elbow_joint",
|
||||
"left_wrist_roll_joint", "left_wrist_pitch_joint", "left_wrist_yaw_joint",
|
||||
"right_shoulder_pitch_joint", "right_shoulder_roll_joint", "right_shoulder_yaw_joint", "right_elbow_joint",
|
||||
"right_wrist_roll_joint", "right_wrist_pitch_joint", "right_wrist_yaw_joint",
|
||||
)
|
||||
|
||||
|
||||
class PinocchioFK:
|
||||
def __init__(self, urdf_text: str):
|
||||
root = ElementTree.fromstring(urdf_text)
|
||||
for parent in root.iter():
|
||||
for child in list(parent):
|
||||
if child.tag.split("}")[-1] in {"visual", "collision"}:
|
||||
parent.remove(child)
|
||||
xml_text = '<?xml version="1.0"?>\n' + ElementTree.tostring(root, encoding="unicode")
|
||||
self.model = pin.buildModelFromXML(xml_text, pin.JointModelFreeFlyer())
|
||||
self.data = self.model.createData()
|
||||
pin_names = [n for n in self.model.names if n not in ["universe", "root_joint"]]
|
||||
self.idx_map = np.array([DOF_NAMES.index(n) for n in pin_names])
|
||||
self.ref_frame_id = self.model.getFrameId("torso_link")
|
||||
|
||||
def get_torso_quat(self, pos, quat_wxyz, dof_pos):
|
||||
quat_xyzw = np.array([quat_wxyz[1], quat_wxyz[2], quat_wxyz[3], quat_wxyz[0]])
|
||||
config = np.concatenate([pos, quat_xyzw, dof_pos[self.idx_map]])
|
||||
pin.framesForwardKinematics(self.model, self.data, config)
|
||||
coeffs = pin.Quaternion(self.data.oMf[self.ref_frame_id].rotation).coeffs()
|
||||
return np.array([coeffs[3], coeffs[0], coeffs[1], coeffs[2]]).reshape(1, 4)
|
||||
|
||||
def get_torso_tilt(self, pos, quat_wxyz, dof_pos):
|
||||
"""Get torso tilt angle from upright (degrees). Uses roll and pitch."""
|
||||
torso_q = self.get_torso_quat(pos, quat_wxyz, dof_pos)
|
||||
roll, pitch, _ = quat_to_rpy(torso_q.flatten())
|
||||
# Tilt is the angle from vertical - combine roll and pitch
|
||||
tilt_rad = np.sqrt(roll**2 + pitch**2)
|
||||
return np.degrees(tilt_rad), np.degrees(roll), np.degrees(pitch)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# LOCOMOTION CONTROLLER
|
||||
# =============================================================================
|
||||
|
||||
class LocomotionController:
|
||||
"""Holosoma whole-body locomotion (23-DOF or 29-DOF)."""
|
||||
|
||||
def __init__(self, policy, robot, obs_dim: int):
|
||||
self.policy = policy
|
||||
self.robot = robot
|
||||
self.obs_dim = obs_dim
|
||||
|
||||
# Detect DOF mode
|
||||
self.is_23dof = (obs_dim == 82)
|
||||
self.num_dof = 23 if self.is_23dof else 29
|
||||
|
||||
if self.is_23dof:
|
||||
self.default_angles = DEFAULT_23DOF_ANGLES
|
||||
self.kp = DEFAULT_23DOF_KP
|
||||
self.kd = DEFAULT_23DOF_KD
|
||||
self.motor_map = DOF_23_TO_MOTOR
|
||||
logger.info("Locomotion: 23-DOF (82D obs)")
|
||||
else:
|
||||
self.default_angles = DEFAULT_29DOF_ANGLES
|
||||
self.kp = DEFAULT_29DOF_KP
|
||||
self.kd = DEFAULT_29DOF_KD
|
||||
self.motor_map = list(range(29))
|
||||
logger.info("Locomotion: 29-DOF (100D obs)")
|
||||
|
||||
self.cmd = np.zeros(3, dtype=np.float32)
|
||||
self.qj = np.zeros(self.num_dof, dtype=np.float32)
|
||||
self.dqj = np.zeros(self.num_dof, dtype=np.float32)
|
||||
self.obs = np.zeros(obs_dim, dtype=np.float32)
|
||||
self.last_action = np.zeros(self.num_dof, dtype=np.float32)
|
||||
|
||||
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
|
||||
self.phase_dt = 2 * np.pi / (50.0 * GAIT_PERIOD)
|
||||
self.is_standing = True
|
||||
|
||||
def run_step(self):
|
||||
"""Single locomotion step."""
|
||||
state = self.robot.lowstate_buffer.get_data()
|
||||
if state is None:
|
||||
return
|
||||
|
||||
# Joystick
|
||||
if state.wireless_remote is not None:
|
||||
self.robot.remote_controller.set(state.wireless_remote)
|
||||
|
||||
ly = self.robot.remote_controller.ly if abs(self.robot.remote_controller.ly) > 0.1 else 0.0
|
||||
lx = self.robot.remote_controller.lx if abs(self.robot.remote_controller.lx) > 0.1 else 0.0
|
||||
rx = self.robot.remote_controller.rx if abs(self.robot.remote_controller.rx) > 0.1 else 0.0
|
||||
self.cmd[0], self.cmd[1], self.cmd[2] = ly, -lx, -rx
|
||||
|
||||
# Read joints via motor map
|
||||
for i in range(self.num_dof):
|
||||
self.qj[i] = state.motor_state[self.motor_map[i]].q
|
||||
self.dqj[i] = state.motor_state[self.motor_map[i]].dq
|
||||
|
||||
# IMU
|
||||
quat = state.imu_state.quaternion
|
||||
ang_vel = np.array(state.imu_state.gyroscope, dtype=np.float32)
|
||||
gravity = self.robot.get_gravity_orientation(quat)
|
||||
|
||||
# Scale
|
||||
qj_obs = (self.qj - self.default_angles) * DOF_POS_SCALE
|
||||
dqj_obs = self.dqj * DOF_VEL_SCALE
|
||||
ang_vel_s = ang_vel * ANG_VEL_SCALE
|
||||
|
||||
# Phase
|
||||
cmd_mag = np.linalg.norm(self.cmd[:2])
|
||||
ang_mag = abs(self.cmd[2])
|
||||
if cmd_mag < 0.01 and ang_mag < 0.01:
|
||||
self.phase[0, :] = np.pi
|
||||
self.is_standing = True
|
||||
elif self.is_standing:
|
||||
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
|
||||
self.is_standing = False
|
||||
else:
|
||||
self.phase = np.fmod(self.phase + self.phase_dt + np.pi, 2*np.pi) - np.pi
|
||||
|
||||
sin_ph, cos_ph = np.sin(self.phase[0]), np.cos(self.phase[0])
|
||||
|
||||
# Build obs
|
||||
if self.is_23dof:
|
||||
self.obs[0:23] = self.last_action
|
||||
self.obs[23:26] = ang_vel_s
|
||||
self.obs[26] = self.cmd[2]
|
||||
self.obs[27:29] = self.cmd[:2]
|
||||
self.obs[29:31] = cos_ph
|
||||
self.obs[31:54] = qj_obs
|
||||
self.obs[54:77] = dqj_obs
|
||||
self.obs[77:80] = gravity
|
||||
self.obs[80:82] = sin_ph
|
||||
else:
|
||||
self.obs[0:29] = self.last_action
|
||||
self.obs[29:32] = ang_vel_s
|
||||
self.obs[32] = self.cmd[2]
|
||||
self.obs[33:35] = self.cmd[:2]
|
||||
self.obs[35:37] = cos_ph
|
||||
self.obs[37:66] = qj_obs
|
||||
self.obs[66:95] = dqj_obs
|
||||
self.obs[95:98] = gravity
|
||||
self.obs[98:100] = sin_ph
|
||||
|
||||
# Inference
|
||||
obs_in = self.obs.reshape(1, -1).astype(np.float32)
|
||||
ort_in = {self.policy.get_inputs()[0].name: obs_in}
|
||||
raw_action = self.policy.run(None, ort_in)[0].squeeze()
|
||||
clipped = np.clip(raw_action, -100.0, 100.0)
|
||||
self.last_action = clipped.copy()
|
||||
scaled = clipped * LOCOMOTION_ACTION_SCALE
|
||||
target = self.default_angles + scaled
|
||||
|
||||
# Send commands
|
||||
for i in range(self.num_dof):
|
||||
motor_idx = self.motor_map[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].q = float(target[i])
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = self.kp[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = self.kd[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Zero missing joints for 23-DOF
|
||||
if self.is_23dof:
|
||||
for idx in MISSING_23DOF_MOTORS:
|
||||
self.robot.msg.motor_cmd[idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[idx].qd = 0
|
||||
self.robot.msg.motor_cmd[idx].kp = 40.0
|
||||
self.robot.msg.motor_cmd[idx].kd = 2.0
|
||||
self.robot.msg.motor_cmd[idx].tau = 0
|
||||
|
||||
self.robot.send_action(self.robot.msg)
|
||||
|
||||
def reset(self):
|
||||
"""Reset state for fresh start."""
|
||||
self.last_action.fill(0)
|
||||
self.phase = np.array([[0.0, np.pi]], dtype=np.float32)
|
||||
self.is_standing = True
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DANCE CONTROLLER
|
||||
# =============================================================================
|
||||
|
||||
class DanceController:
|
||||
"""WBT dance policy with FK for torso tracking."""
|
||||
|
||||
def __init__(self, policy, robot, pinocchio_fk, motor_kp, motor_kd, action_scale):
|
||||
self.policy = policy
|
||||
self.robot = robot
|
||||
self.pinocchio_fk = pinocchio_fk
|
||||
self.motor_kp = motor_kp
|
||||
self.motor_kd = motor_kd
|
||||
self.action_scale = action_scale
|
||||
|
||||
self.obs_dim = policy.get_inputs()[0].shape[1]
|
||||
self.last_action = np.zeros((1, NUM_DOFS), dtype=np.float32)
|
||||
self.motion_command = None
|
||||
self.ref_quat_xyzw = None
|
||||
self.timestep = 0
|
||||
self.yaw_offset = 0.0
|
||||
|
||||
logger.info(f"Dance: obs_dim={self.obs_dim}, action_scale={action_scale}")
|
||||
|
||||
def initialize(self, reset_to_frame_0: bool = True):
|
||||
"""Initialize dance. If reset_to_frame_0=True, starts from frame 0. Otherwise resumes."""
|
||||
if reset_to_frame_0:
|
||||
self.timestep = 0
|
||||
self.last_action.fill(0)
|
||||
|
||||
# Get initial motion data at frame 0
|
||||
dummy = np.zeros((1, self.obs_dim), dtype=np.float32)
|
||||
outs = self.policy.run(["joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": dummy, "time_step": np.array([[0]], dtype=np.float32)})
|
||||
self.motion_command = np.concatenate(outs[0:2], axis=1)
|
||||
self.ref_quat_xyzw = outs[2]
|
||||
logger.info("Dance: reset to frame 0")
|
||||
else:
|
||||
# Resume from current timestep - just update motion command for current frame
|
||||
dummy = np.zeros((1, self.obs_dim), dtype=np.float32)
|
||||
outs = self.policy.run(["joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": dummy, "time_step": np.array([[self.timestep]], dtype=np.float32)})
|
||||
self.motion_command = np.concatenate(outs[0:2], axis=1)
|
||||
self.ref_quat_xyzw = outs[2]
|
||||
logger.info(f"Dance: resuming from frame {self.timestep}")
|
||||
|
||||
# Capture yaw offset
|
||||
state = self.robot.lowstate_buffer.get_data()
|
||||
if state and self.pinocchio_fk:
|
||||
quat = np.array(state.imu_state.quaternion, dtype=np.float32)
|
||||
dof = np.array([state.motor_state[i].q for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
torso_q = self.pinocchio_fk.get_torso_quat(np.zeros(3), quat, dof)
|
||||
_, _, self.yaw_offset = quat_to_rpy(torso_q.flatten())
|
||||
logger.info(f"Dance yaw offset: {np.degrees(self.yaw_offset):.1f}°")
|
||||
|
||||
def _remove_yaw_offset(self, quat_wxyz):
|
||||
if abs(self.yaw_offset) < 1e-6:
|
||||
return quat_wxyz
|
||||
yaw_q = rpy_to_quat((0, 0, -self.yaw_offset)).reshape(1, 4)
|
||||
return quat_mul(yaw_q, quat_wxyz)
|
||||
|
||||
def run_step(self):
|
||||
"""Single dance step."""
|
||||
state = self.robot.lowstate_buffer.get_data()
|
||||
if state is None:
|
||||
return
|
||||
|
||||
quat = np.array(state.imu_state.quaternion, dtype=np.float32)
|
||||
ang_vel = np.array(state.imu_state.gyroscope, dtype=np.float32)
|
||||
dof_pos = np.array([state.motor_state[i].q for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
dof_vel = np.array([state.motor_state[i].dq for i in range(NUM_DOFS)], dtype=np.float32)
|
||||
|
||||
# FK for torso orientation
|
||||
if self.pinocchio_fk:
|
||||
torso_q = self.pinocchio_fk.get_torso_quat(np.zeros(3), quat, dof_pos)
|
||||
torso_q = self._remove_yaw_offset(torso_q)
|
||||
motion_ori = xyzw_to_wxyz(self.ref_quat_xyzw)
|
||||
rel_quat = subtract_frame_transforms(torso_q, motion_ori)
|
||||
ori_b = matrix_from_quat(rel_quat)[..., :2].reshape(1, -1)
|
||||
else:
|
||||
ori_b = np.zeros((1, 6), dtype=np.float32)
|
||||
|
||||
dof_rel = (dof_pos - DEFAULT_29DOF_ANGLES).reshape(1, -1)
|
||||
|
||||
# Build obs (alphabetical)
|
||||
obs_dict = {
|
||||
"actions": self.last_action,
|
||||
"base_ang_vel": ang_vel.reshape(1, 3),
|
||||
"dof_pos": dof_rel,
|
||||
"dof_vel": dof_vel.reshape(1, -1),
|
||||
"motion_command": self.motion_command,
|
||||
"motion_ref_ori_b": ori_b,
|
||||
}
|
||||
obs = np.concatenate([obs_dict[k].astype(np.float32) for k in sorted(obs_dict.keys())], axis=1)
|
||||
obs = np.clip(obs, -100, 100)
|
||||
|
||||
# Inference
|
||||
outs = self.policy.run(["actions", "joint_pos", "joint_vel", "ref_quat_xyzw"],
|
||||
{"obs": obs, "time_step": np.array([[self.timestep]], dtype=np.float32)})
|
||||
action = np.clip(outs[0], -100, 100)
|
||||
self.motion_command = np.concatenate(outs[1:3], axis=1)
|
||||
self.ref_quat_xyzw = outs[3]
|
||||
self.last_action = action.copy()
|
||||
|
||||
target = DEFAULT_29DOF_ANGLES + action.flatten() * self.action_scale
|
||||
|
||||
# Send commands
|
||||
for i in range(NUM_DOFS):
|
||||
if i in FROZEN_JOINTS:
|
||||
self.robot.msg.motor_cmd[i].q = 0.0
|
||||
self.robot.msg.motor_cmd[i].kp = FROZEN_KP
|
||||
self.robot.msg.motor_cmd[i].kd = FROZEN_KD
|
||||
else:
|
||||
self.robot.msg.motor_cmd[i].q = float(target[i])
|
||||
self.robot.msg.motor_cmd[i].kp = self.motor_kp[i]
|
||||
self.robot.msg.motor_cmd[i].kd = self.motor_kd[i]
|
||||
self.robot.msg.motor_cmd[i].qd = 0
|
||||
self.robot.msg.motor_cmd[i].tau = 0
|
||||
|
||||
self.robot.send_action(self.robot.msg)
|
||||
self.timestep += 1
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN
|
||||
# =============================================================================
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Locomotion ↔ Dance Toggle")
|
||||
parser.add_argument("--loco-repo", type=str, default=DEFAULT_HOLOSOMA_REPO_ID)
|
||||
parser.add_argument("--dance-onnx", type=str, default=DANCE_ONNX_PATH)
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 70)
|
||||
print("🚶 LOCOMOTION ↔ 💃 DANCE")
|
||||
print("=" * 70)
|
||||
print("Press ENTER to toggle between modes")
|
||||
print("=" * 70)
|
||||
|
||||
# Load locomotion policy
|
||||
logger.info("Loading locomotion policy...")
|
||||
loco_path = hf_hub_download(repo_id=args.loco_repo, filename="fastsac_g1_29dof.onnx")
|
||||
loco_policy = ort.InferenceSession(loco_path)
|
||||
loco_obs_dim = loco_policy.get_inputs()[0].shape[1]
|
||||
logger.info(f"Locomotion: {loco_obs_dim}D obs")
|
||||
|
||||
# Load dance policy
|
||||
logger.info("Loading dance policy...")
|
||||
dance_policy = ort.InferenceSession(args.dance_onnx)
|
||||
dance_model = onnx.load(args.dance_onnx)
|
||||
dance_meta = {p.key: json.loads(p.value) for p in dance_model.metadata_props}
|
||||
dance_kp = np.array(dance_meta.get("kp", DEFAULT_29DOF_KP), dtype=np.float32)
|
||||
dance_kd = np.array(dance_meta.get("kd", DEFAULT_29DOF_KD), dtype=np.float32)
|
||||
dance_action_scale = float(dance_meta.get("action_scale", 1.0))
|
||||
logger.info(f"Dance: {dance_policy.get_inputs()[0].shape[1]}D obs, scale={dance_action_scale}")
|
||||
|
||||
# Build Pinocchio FK
|
||||
pinocchio_fk = None
|
||||
if "robot_urdf" in dance_meta:
|
||||
logger.info("Building Pinocchio FK...")
|
||||
pinocchio_fk = PinocchioFK(dance_meta["robot_urdf"])
|
||||
|
||||
# Initialize robot
|
||||
logger.info("Initializing robot...")
|
||||
config = UnitreeG1Config()
|
||||
robot = UnitreeG1(config)
|
||||
logger.info("Robot connected!")
|
||||
|
||||
# Create controllers
|
||||
loco_ctrl = LocomotionController(loco_policy, robot, loco_obs_dim)
|
||||
dance_ctrl = DanceController(dance_policy, robot, pinocchio_fk, dance_kp, dance_kd, dance_action_scale)
|
||||
|
||||
# State
|
||||
mode = "locomotion"
|
||||
toggle_event = threading.Event()
|
||||
shutdown = threading.Event()
|
||||
|
||||
# Input thread
|
||||
def input_loop():
|
||||
while not shutdown.is_set():
|
||||
if select.select([sys.stdin], [], [], 0.1)[0]:
|
||||
sys.stdin.readline()
|
||||
toggle_event.set()
|
||||
|
||||
input_thread = threading.Thread(target=input_loop, daemon=True)
|
||||
input_thread.start()
|
||||
|
||||
print("\n🚶 LOCOMOTION MODE - Use joystick to walk")
|
||||
print(" Press ENTER to switch to DANCE")
|
||||
print("-" * 70)
|
||||
|
||||
step = 0
|
||||
try:
|
||||
while not shutdown.is_set():
|
||||
t0 = time.time()
|
||||
|
||||
# Check toggle
|
||||
if toggle_event.is_set():
|
||||
toggle_event.clear()
|
||||
if mode == "locomotion":
|
||||
mode = "dance"
|
||||
dance_ctrl.initialize()
|
||||
print("\n" + "=" * 70)
|
||||
print("💃 DANCE MODE (frame 0)")
|
||||
print(" Press ENTER to switch to LOCOMOTION")
|
||||
print("=" * 70)
|
||||
else:
|
||||
mode = "locomotion"
|
||||
loco_ctrl.reset()
|
||||
print("\n" + "=" * 70)
|
||||
print("🚶 LOCOMOTION MODE")
|
||||
print(" Press ENTER to switch to DANCE")
|
||||
print("=" * 70)
|
||||
|
||||
# Run controller
|
||||
if mode == "locomotion":
|
||||
loco_ctrl.run_step()
|
||||
else:
|
||||
dance_ctrl.run_step()
|
||||
|
||||
# Log
|
||||
if step % 100 == 0:
|
||||
if mode == "locomotion":
|
||||
print(f"[LOCO ] step={step:5d} cmd=[{loco_ctrl.cmd[0]:.2f},{loco_ctrl.cmd[1]:.2f},{loco_ctrl.cmd[2]:.2f}]")
|
||||
else:
|
||||
print(f"[DANCE] step={step:5d} timestep={dance_ctrl.timestep}")
|
||||
|
||||
step += 1
|
||||
elapsed = time.time() - t0
|
||||
if elapsed < CONTROL_DT:
|
||||
time.sleep(CONTROL_DT - elapsed)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nStopping...")
|
||||
finally:
|
||||
shutdown.set()
|
||||
robot.disconnect()
|
||||
|
||||
print("Done!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,447 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Example: Unitree RL 12-DOF Legs-Only Locomotion (TorchScript)
|
||||
|
||||
This example demonstrates loading a 12-DOF legs-only locomotion policy
|
||||
(TorchScript .pt format) and running it on the Unitree G1 robot.
|
||||
|
||||
Key characteristics:
|
||||
- Single TorchScript policy (.pt)
|
||||
- 47D observations, 12D actions (legs only)
|
||||
- Phase-based gait timing
|
||||
- Arms and waist held at fixed positions
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from huggingface_hub import hf_hub_download
|
||||
from scipy.spatial.transform import Rotation as R
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 12-DOF leg joint configuration
|
||||
# Joint order: [L_hip_pitch, L_hip_roll, L_hip_yaw, L_knee, L_ankle_pitch, L_ankle_roll,
|
||||
# R_hip_pitch, R_hip_roll, R_hip_yaw, R_knee, R_ankle_pitch, R_ankle_roll]
|
||||
LEG_JOINT_INDICES = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
|
||||
|
||||
# Default leg angles for standing
|
||||
DEFAULT_LEG_ANGLES = np.array([
|
||||
-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, # left leg
|
||||
-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, # right leg
|
||||
], dtype=np.float32)
|
||||
|
||||
# KP/KD for leg joints
|
||||
LEG_KPS = np.array([150, 150, 150, 300, 40, 40, 150, 150, 150, 300, 40, 40], dtype=np.float32)
|
||||
LEG_KDS = np.array([6, 6, 6, 4, 2, 2, 6, 6, 6, 4, 2, 2], dtype=np.float32)
|
||||
|
||||
# Waist configuration (held at zero)
|
||||
WAIST_JOINT_INDICES = [12, 13, 14] # yaw, roll, pitch
|
||||
WAIST_KPS = np.array([250, 250, 250], dtype=np.float32)
|
||||
WAIST_KDS = np.array([5, 5, 5], dtype=np.float32)
|
||||
|
||||
# Arm configuration (indices 15-28, held at initial position)
|
||||
ARM_JOINT_INDICES = list(range(15, 29))
|
||||
ARM_KPS = np.array([80, 80, 80, 80, 40, 40, 40, # left arm (shoulder + wrist)
|
||||
80, 80, 80, 80, 40, 40, 40], dtype=np.float32) # right arm
|
||||
ARM_KDS = np.array([3, 3, 3, 3, 1.5, 1.5, 1.5,
|
||||
3, 3, 3, 3, 1.5, 1.5, 1.5], dtype=np.float32)
|
||||
|
||||
# Control parameters
|
||||
LOCOMOTION_CONTROL_DT = 0.02 # 50Hz control rate
|
||||
LOCOMOTION_ACTION_SCALE = 0.25
|
||||
ANG_VEL_SCALE = 0.25
|
||||
DOF_POS_SCALE = 1.0
|
||||
DOF_VEL_SCALE = 0.05
|
||||
CMD_SCALE = np.array([2.0, 2.0, 0.25], dtype=np.float32)
|
||||
MAX_CMD = np.array([0.8, 0.5, 1.57], dtype=np.float32) # max vx, vy, yaw_rate
|
||||
|
||||
# Gait parameters
|
||||
GAIT_PERIOD = 0.8 # seconds
|
||||
|
||||
DEFAULT_REPO_ID = "nepyope/unitree_rl_locomotion"
|
||||
|
||||
|
||||
def load_torchscript_policy(
|
||||
repo_id: str = DEFAULT_REPO_ID,
|
||||
filename: str = "motion.pt",
|
||||
) -> torch.jit.ScriptModule:
|
||||
"""Load TorchScript locomotion policy from Hugging Face Hub.
|
||||
|
||||
Args:
|
||||
repo_id: Hugging Face Hub repository ID containing the policy.
|
||||
filename: Policy filename (default: motion.pt).
|
||||
"""
|
||||
logger.info(f"Loading TorchScript policy from Hugging Face Hub ({repo_id}/{filename})...")
|
||||
|
||||
policy_path = hf_hub_download(
|
||||
repo_id=repo_id,
|
||||
filename=filename,
|
||||
)
|
||||
|
||||
policy = torch.jit.load(policy_path)
|
||||
policy.eval()
|
||||
|
||||
logger.info("TorchScript policy loaded successfully")
|
||||
|
||||
return policy
|
||||
|
||||
|
||||
class UnitreeRLLocomotionController:
|
||||
"""
|
||||
Handles 12-DOF legs-only locomotion control for the Unitree G1 robot.
|
||||
|
||||
This controller manages:
|
||||
- Single TorchScript policy
|
||||
- 47D observations (single frame)
|
||||
- 12D action output (legs only)
|
||||
- Arms and waist held at fixed positions
|
||||
- Phase-based gait timing
|
||||
"""
|
||||
|
||||
def __init__(self, policy, robot, config):
|
||||
self.policy = policy
|
||||
self.robot = robot
|
||||
self.config = config
|
||||
|
||||
# Velocity commands (vx, vy, yaw_rate)
|
||||
self.locomotion_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
|
||||
|
||||
# State variables (12 DOF legs)
|
||||
self.qj = np.zeros(12, dtype=np.float32)
|
||||
self.dqj = np.zeros(12, dtype=np.float32)
|
||||
self.locomotion_action = np.zeros(12, dtype=np.float32)
|
||||
self.locomotion_obs = np.zeros(47, dtype=np.float32)
|
||||
|
||||
# Initial arm positions (captured on reset)
|
||||
self.initial_arm_positions = np.zeros(14, dtype=np.float32)
|
||||
|
||||
# Counter for phase calculation
|
||||
self.counter = 0
|
||||
|
||||
# Thread management
|
||||
self.locomotion_running = False
|
||||
self.locomotion_thread = None
|
||||
|
||||
logger.info("UnitreeRLLocomotionController initialized")
|
||||
logger.info(" Observation dim: 47, Action dim: 12 (legs only)")
|
||||
|
||||
def locomotion_run(self):
|
||||
"""12-DOF legs-only locomotion policy loop."""
|
||||
self.counter += 1
|
||||
|
||||
if self.counter == 1:
|
||||
print("\n" + "=" * 60)
|
||||
print("🚀 RUNNING UNITREE RL 12-DOF LOCOMOTION POLICY")
|
||||
print(" 47D observations → 12D actions (legs only)")
|
||||
print(" Arms and waist held at fixed positions")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
# Get current observation
|
||||
robot_state = self.robot.get_observation()
|
||||
if robot_state is None:
|
||||
return
|
||||
|
||||
# Get command from remote controller
|
||||
if robot_state.wireless_remote is not None:
|
||||
self.robot.remote_controller.set(robot_state.wireless_remote)
|
||||
else:
|
||||
self.robot.remote_controller.lx = 0.0
|
||||
self.robot.remote_controller.ly = 0.0
|
||||
self.robot.remote_controller.rx = 0.0
|
||||
self.robot.remote_controller.ry = 0.0
|
||||
|
||||
self.locomotion_cmd[0] = self.robot.remote_controller.ly # forward/backward
|
||||
self.locomotion_cmd[1] = self.robot.remote_controller.lx * -1 # left/right (inverted)
|
||||
self.locomotion_cmd[2] = self.robot.remote_controller.rx * -1 # yaw (inverted)
|
||||
|
||||
# Get leg joint positions and velocities (12 DOF)
|
||||
for i, motor_idx in enumerate(LEG_JOINT_INDICES):
|
||||
self.qj[i] = robot_state.motor_state[motor_idx].q
|
||||
self.dqj[i] = robot_state.motor_state[motor_idx].dq
|
||||
|
||||
# Get IMU data
|
||||
quat = robot_state.imu_state.quaternion
|
||||
ang_vel = np.array(robot_state.imu_state.gyroscope, dtype=np.float32)
|
||||
|
||||
# Scale observations
|
||||
gravity_orientation = self.robot.get_gravity_orientation(quat)
|
||||
qj_obs = (self.qj - DEFAULT_LEG_ANGLES) * DOF_POS_SCALE
|
||||
dqj_obs = self.dqj * DOF_VEL_SCALE
|
||||
ang_vel_scaled = ang_vel * ANG_VEL_SCALE
|
||||
|
||||
# Calculate phase
|
||||
count = self.counter * LOCOMOTION_CONTROL_DT
|
||||
phase = (count % GAIT_PERIOD) / GAIT_PERIOD
|
||||
sin_phase = np.sin(2 * np.pi * phase)
|
||||
cos_phase = np.cos(2 * np.pi * phase)
|
||||
|
||||
# Build 47D observation vector
|
||||
# [0:3] - angular velocity (scaled)
|
||||
# [3:6] - gravity orientation
|
||||
# [6:9] - velocity command (scaled)
|
||||
# [9:21] - joint positions (12D, relative to default)
|
||||
# [21:33] - joint velocities (12D, scaled)
|
||||
# [33:45] - previous actions (12D)
|
||||
# [45] - sin_phase
|
||||
# [46] - cos_phase
|
||||
self.locomotion_obs[0:3] = ang_vel_scaled
|
||||
self.locomotion_obs[3:6] = gravity_orientation
|
||||
self.locomotion_obs[6:9] = self.locomotion_cmd * CMD_SCALE * MAX_CMD
|
||||
self.locomotion_obs[9:21] = qj_obs
|
||||
self.locomotion_obs[21:33] = dqj_obs
|
||||
self.locomotion_obs[33:45] = self.locomotion_action
|
||||
self.locomotion_obs[45] = sin_phase
|
||||
self.locomotion_obs[46] = cos_phase
|
||||
|
||||
# Run policy inference (TorchScript)
|
||||
obs_tensor = torch.from_numpy(self.locomotion_obs).unsqueeze(0).float()
|
||||
with torch.no_grad():
|
||||
action_tensor = self.policy(obs_tensor)
|
||||
self.locomotion_action = action_tensor.squeeze().numpy()
|
||||
|
||||
# Transform action to target joint positions
|
||||
target_leg_pos = DEFAULT_LEG_ANGLES + self.locomotion_action * LOCOMOTION_ACTION_SCALE
|
||||
|
||||
# Debug logging (first 3 iterations)
|
||||
if self.counter <= 3:
|
||||
print(f"\n[Unitree RL Debug #{self.counter}]")
|
||||
print(f" Phase: {phase:.3f} (sin={sin_phase:.3f}, cos={cos_phase:.3f})")
|
||||
print(f" Cmd (vx, vy, yaw): ({self.locomotion_cmd[0]:.2f}, {self.locomotion_cmd[1]:.2f}, {self.locomotion_cmd[2]:.2f})")
|
||||
print(f" Action range: [{self.locomotion_action.min():.3f}, {self.locomotion_action.max():.3f}]")
|
||||
|
||||
# Send commands to LEG motors (0-11)
|
||||
for i, motor_idx in enumerate(LEG_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = target_leg_pos[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = LEG_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = LEG_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold WAIST motors at zero (12, 13, 14)
|
||||
for i, motor_idx in enumerate(WAIST_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = WAIST_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = WAIST_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold ARM motors at initial position (15-28)
|
||||
for i, motor_idx in enumerate(ARM_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = self.initial_arm_positions[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = ARM_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = ARM_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Send command
|
||||
self.robot.send_action(self.robot.msg)
|
||||
|
||||
def _locomotion_thread_loop(self):
|
||||
"""Background thread that runs the locomotion policy at specified rate."""
|
||||
logger.info("Locomotion thread started")
|
||||
while self.locomotion_running:
|
||||
start_time = time.time()
|
||||
try:
|
||||
self.locomotion_run()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in locomotion loop: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
# Sleep to maintain control rate
|
||||
elapsed = time.time() - start_time
|
||||
sleep_time = max(0, LOCOMOTION_CONTROL_DT - elapsed)
|
||||
time.sleep(sleep_time)
|
||||
logger.info("Locomotion thread stopped")
|
||||
|
||||
def start_locomotion_thread(self):
|
||||
if self.locomotion_running:
|
||||
logger.warning("Locomotion thread already running")
|
||||
return
|
||||
|
||||
logger.info("Starting locomotion control thread...")
|
||||
self.locomotion_running = True
|
||||
self.locomotion_thread = threading.Thread(target=self._locomotion_thread_loop, daemon=True)
|
||||
self.locomotion_thread.start()
|
||||
|
||||
logger.info("Locomotion control thread started!")
|
||||
|
||||
def stop_locomotion_thread(self):
|
||||
if not self.locomotion_running:
|
||||
return
|
||||
|
||||
logger.info("Stopping locomotion control thread...")
|
||||
self.locomotion_running = False
|
||||
if self.locomotion_thread:
|
||||
self.locomotion_thread.join(timeout=2.0)
|
||||
logger.info("Locomotion control thread stopped")
|
||||
|
||||
def reset_robot(self):
|
||||
"""Move legs to default standing position over 2 seconds (arms are captured and held)."""
|
||||
logger.info("Moving legs to default position...")
|
||||
|
||||
total_time = 2.0
|
||||
num_step = int(total_time / self.robot.control_dt)
|
||||
|
||||
# Get current state
|
||||
robot_state = self.robot.get_observation()
|
||||
|
||||
# Capture initial arm positions (to hold during locomotion)
|
||||
for i, motor_idx in enumerate(ARM_JOINT_INDICES):
|
||||
self.initial_arm_positions[i] = robot_state.motor_state[motor_idx].q
|
||||
logger.info(f"Captured initial arm positions: {self.initial_arm_positions[:4]}...")
|
||||
|
||||
# Record current leg positions
|
||||
init_leg_pos = np.zeros(12, dtype=np.float32)
|
||||
for i, motor_idx in enumerate(LEG_JOINT_INDICES):
|
||||
init_leg_pos[i] = robot_state.motor_state[motor_idx].q
|
||||
|
||||
# Interpolate legs to default position
|
||||
for step in range(num_step):
|
||||
alpha = step / num_step
|
||||
|
||||
# Interpolate leg positions
|
||||
for i, motor_idx in enumerate(LEG_JOINT_INDICES):
|
||||
target_pos = DEFAULT_LEG_ANGLES[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].q = (
|
||||
init_leg_pos[i] * (1 - alpha) + target_pos * alpha
|
||||
)
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = LEG_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = LEG_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold waist at zero
|
||||
for i, motor_idx in enumerate(WAIST_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = WAIST_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = WAIST_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold arms at initial position
|
||||
for i, motor_idx in enumerate(ARM_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = self.initial_arm_positions[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = ARM_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = ARM_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
|
||||
self.robot.lowcmd_publisher.Write(self.robot.msg)
|
||||
time.sleep(self.robot.control_dt)
|
||||
|
||||
logger.info("Reached default leg position")
|
||||
|
||||
# Hold position for 2 seconds
|
||||
logger.info("Holding default position for 2 seconds...")
|
||||
hold_time = 2.0
|
||||
num_hold_steps = int(hold_time / self.robot.control_dt)
|
||||
|
||||
for _ in range(num_hold_steps):
|
||||
# Hold legs at default
|
||||
for i, motor_idx in enumerate(LEG_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = DEFAULT_LEG_ANGLES[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = LEG_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = LEG_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold waist at zero
|
||||
for i, motor_idx in enumerate(WAIST_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = 0.0
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = WAIST_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = WAIST_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
# Hold arms at initial position
|
||||
for i, motor_idx in enumerate(ARM_JOINT_INDICES):
|
||||
self.robot.msg.motor_cmd[motor_idx].q = self.initial_arm_positions[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].qd = 0
|
||||
self.robot.msg.motor_cmd[motor_idx].kp = ARM_KPS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].kd = ARM_KDS[i]
|
||||
self.robot.msg.motor_cmd[motor_idx].tau = 0
|
||||
|
||||
self.robot.msg.crc = self.robot.crc.Crc(self.robot.msg)
|
||||
self.robot.lowcmd_publisher.Write(self.robot.msg)
|
||||
time.sleep(self.robot.control_dt)
|
||||
|
||||
logger.info("Ready to start locomotion!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Unitree RL 12-DOF Locomotion Controller for Unitree G1")
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
default=DEFAULT_REPO_ID,
|
||||
help=f"Hugging Face Hub repo ID for policy (default: {DEFAULT_REPO_ID})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--filename",
|
||||
type=str,
|
||||
default="motion.pt",
|
||||
help="Policy filename (default: motion.pt)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load policy
|
||||
policy = load_torchscript_policy(repo_id=args.repo_id, filename=args.filename)
|
||||
|
||||
# Initialize robot
|
||||
config = UnitreeG1Config()
|
||||
robot = UnitreeG1(config)
|
||||
|
||||
# Initialize locomotion controller
|
||||
locomotion_controller = UnitreeRLLocomotionController(
|
||||
policy=policy,
|
||||
robot=robot,
|
||||
config=config,
|
||||
)
|
||||
|
||||
# Reset robot and start locomotion thread
|
||||
try:
|
||||
locomotion_controller.reset_robot()
|
||||
locomotion_controller.start_locomotion_thread()
|
||||
|
||||
# Log status
|
||||
logger.info("Robot initialized with Unitree RL locomotion policy")
|
||||
logger.info("Locomotion controller running in background thread")
|
||||
logger.info("Use remote controller to command velocity:")
|
||||
logger.info(" Left stick Y: forward/backward")
|
||||
logger.info(" Left stick X: left/right")
|
||||
logger.info(" Right stick X: rotate")
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
# Keep robot alive
|
||||
while True:
|
||||
time.sleep(1.0)
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping locomotion...")
|
||||
locomotion_controller.stop_locomotion_thread()
|
||||
print("Done!")
|
||||
|
||||
@@ -263,6 +263,7 @@ default.extend-ignore-identifiers-re = [
|
||||
"ein",
|
||||
"thw",
|
||||
"inpt",
|
||||
"ROBOTIS",
|
||||
]
|
||||
|
||||
# TODO: Uncomment when ready to use
|
||||
|
||||
@@ -26,4 +26,4 @@ DEFAULT_OBS_QUEUE_TIMEOUT = 2
|
||||
SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05"]
|
||||
|
||||
# TODO: Add all other robots
|
||||
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower"]
|
||||
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower", "omx_follower"]
|
||||
|
||||
@@ -54,6 +54,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
bi_so100_follower,
|
||||
koch_follower,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# OMX is a fully open-source robot from ROBOTIS.
|
||||
# More information at: https://ai.robotis.com/omx/introduction_omx.html
|
||||
|
||||
from .config_omx_follower import OmxFollowerConfig
|
||||
from .omx_follower import OmxFollower
|
||||
@@ -0,0 +1,39 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("omx_follower")
|
||||
@dataclass
|
||||
class OmxFollowerConfig(RobotConfig):
|
||||
# Port to connect to the arm
|
||||
port: str
|
||||
|
||||
disable_torque_on_disconnect: bool = True
|
||||
|
||||
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
||||
# Set this to a positive scalar to have the same value for all motors, or a dictionary that maps motor
|
||||
# names to the max_relative_target value for that motor.
|
||||
max_relative_target: float | dict[str, float] | None = None
|
||||
|
||||
# cameras
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
|
||||
# Set to `True` for backward compatibility with previous policies/dataset
|
||||
use_degrees: bool = False
|
||||
@@ -0,0 +1,225 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
DriveMode,
|
||||
DynamixelMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
from ..robot import Robot
|
||||
from ..utils import ensure_safe_goal_position
|
||||
from .config_omx_follower import OmxFollowerConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OmxFollower(Robot):
|
||||
"""
|
||||
- [OMX](https://github.com/ROBOTIS-GIT/open_manipulator),
|
||||
expansion, developed by Woojin Wie and Junha Cha from [ROBOTIS](https://ai.robotis.com/)
|
||||
"""
|
||||
|
||||
config_class = OmxFollowerConfig
|
||||
name = "omx_follower"
|
||||
|
||||
def __init__(self, config: OmxFollowerConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"shoulder_pan": Motor(11, "xl430-w250", norm_mode_body),
|
||||
"shoulder_lift": Motor(12, "xl430-w250", norm_mode_body),
|
||||
"elbow_flex": Motor(13, "xl430-w250", norm_mode_body),
|
||||
"wrist_flex": Motor(14, "xl330-m288", norm_mode_body),
|
||||
"wrist_roll": Motor(15, "xl330-m288", norm_mode_body),
|
||||
"gripper": Motor(16, "xl330-m288", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
"""
|
||||
For OMX robots that come pre-calibrated:
|
||||
- If default calibration from package doesn't match motors, read from motors and save
|
||||
- This allows using pre-calibrated robots without manual calibration
|
||||
- If no calibration file exists, use factory default values (homing_offset=0, range_min=0, range_max=4095)
|
||||
"""
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.bus.disable_torque()
|
||||
logger.info(f"\nUsing factory default calibration values for {self}")
|
||||
logger.info(f"\nWriting default configuration of {self} to the motors")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Drive_Mode", motor, DriveMode.NON_INVERTED.value)
|
||||
|
||||
self.calibration = {}
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=0,
|
||||
range_min=0,
|
||||
range_max=4095,
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
self._save_calibration()
|
||||
logger.info(f"Calibration saved to {self.calibration_fpath}")
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.configure_motors()
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
|
||||
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling
|
||||
# the arm, you could end up with a servo with a position 0 or 4095 at a crucial point
|
||||
for motor in self.bus.motors:
|
||||
if motor != "gripper":
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current. For
|
||||
# the follower gripper, it means it can grasp an object without forcing too much even tho, its
|
||||
# goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with
|
||||
# our finger to make it move, and it will move back to its original target position when we
|
||||
# release the force.
|
||||
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
|
||||
# Set better PID values to close the gap between recorded states and actions
|
||||
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor
|
||||
self.bus.write("Position_P_Gain", "elbow_flex", 1500)
|
||||
self.bus.write("Position_I_Gain", "elbow_flex", 0)
|
||||
self.bus.write("Position_D_Gain", "elbow_flex", 600)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, float]) -> dict[str, float]:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
`max_relative_target`. In this case, the action sent differs from original action.
|
||||
Thus, this function always returns the action actually sent.
|
||||
|
||||
Args:
|
||||
action (dict[str, float]): The goal positions for the motors.
|
||||
|
||||
Returns:
|
||||
dict[str, float]: The action sent to the motors, potentially clipped.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
# /!\ Slower fps expected due to reading from the follower.
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.bus.sync_read("Present_Position")
|
||||
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
|
||||
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
|
||||
|
||||
# Send goal position to the arm
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
_GAINS: dict[str, dict[str, list[float]]] = {
|
||||
@@ -51,5 +53,11 @@ class UnitreeG1Config(RobotConfig):
|
||||
|
||||
control_dt: float = 1.0 / 250.0 # 250Hz
|
||||
|
||||
# launch mujoco simulation
|
||||
is_simulation: bool = False
|
||||
|
||||
# socket config for ZMQ bridge
|
||||
robot_ip: str = "192.168.123.164"
|
||||
robot_ip: str = "172.18.129.215"
|
||||
|
||||
# cameras (optional)
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
|
||||
@@ -0,0 +1,302 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Standalone keyboard control script for Unitree G1 robot.
|
||||
|
||||
This script provides keyboard-based velocity control for the G1 robot's
|
||||
locomotion system. It can be run alongside the main robot control to
|
||||
provide manual movement commands.
|
||||
|
||||
Usage:
|
||||
python keyboard_control.py [--robot-ip IP] [--simulation]
|
||||
|
||||
Controls:
|
||||
W/S: Forward/Backward
|
||||
A/D: Strafe Left/Right
|
||||
Q/E: Rotate Left/Right
|
||||
R/F: Raise/Lower Height (GR00T policies only)
|
||||
Z: Stop (zero all velocity commands)
|
||||
ESC/Ctrl+C: Exit
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import select
|
||||
import time
|
||||
import numpy as np
|
||||
|
||||
# Terminal handling for non-blocking keyboard input
|
||||
try:
|
||||
import termios
|
||||
import tty
|
||||
HAS_TERMIOS = True
|
||||
except ImportError:
|
||||
HAS_TERMIOS = False
|
||||
print("Warning: termios not available. Keyboard controls require Linux/macOS.")
|
||||
|
||||
|
||||
class KeyboardController:
|
||||
"""Handles keyboard input and converts to locomotion commands."""
|
||||
|
||||
def __init__(self, callback=None):
|
||||
"""
|
||||
Initialize keyboard controller.
|
||||
|
||||
Args:
|
||||
callback: Optional function called when commands change.
|
||||
Signature: callback(vx, vy, yaw, height)
|
||||
"""
|
||||
self.callback = callback
|
||||
self.running = False
|
||||
|
||||
# Locomotion commands
|
||||
self.vx = 0.0 # Forward/backward velocity
|
||||
self.vy = 0.0 # Left/right velocity (strafe)
|
||||
self.yaw = 0.0 # Rotation rate
|
||||
self.height = 0.74 # Base height (for GR00T policies)
|
||||
|
||||
# Command limits
|
||||
self.vx_limit = (-0.8, 0.8)
|
||||
self.vy_limit = (-0.5, 0.5)
|
||||
self.yaw_limit = (-1.0, 1.0)
|
||||
self.height_limit = (0.50, 1.00)
|
||||
|
||||
# Increments per keypress
|
||||
self.vx_increment = 0.4
|
||||
self.vy_increment = 0.25
|
||||
self.yaw_increment = 0.5
|
||||
self.height_increment = 0.05
|
||||
|
||||
self._old_terminal_settings = None
|
||||
|
||||
def get_commands(self) -> tuple[float, float, float, float]:
|
||||
"""Get current command values as tuple (vx, vy, yaw, height)."""
|
||||
return (self.vx, self.vy, self.yaw, self.height)
|
||||
|
||||
def get_commands_array(self) -> np.ndarray:
|
||||
"""Get velocity commands as numpy array [vx, vy, yaw]."""
|
||||
return np.array([self.vx, self.vy, self.yaw], dtype=np.float32)
|
||||
|
||||
def reset_commands(self):
|
||||
"""Reset all commands to zero (stop)."""
|
||||
self.vx = 0.0
|
||||
self.vy = 0.0
|
||||
self.yaw = 0.0
|
||||
self._notify_callback()
|
||||
|
||||
def _clamp(self, value: float, limits: tuple[float, float]) -> float:
|
||||
"""Clamp value to limits."""
|
||||
return max(limits[0], min(limits[1], value))
|
||||
|
||||
def _notify_callback(self):
|
||||
"""Call callback with current commands if set."""
|
||||
if self.callback:
|
||||
self.callback(self.vx, self.vy, self.yaw, self.height)
|
||||
|
||||
def process_key(self, key: str) -> bool:
|
||||
"""
|
||||
Process a single key press and update commands.
|
||||
|
||||
Args:
|
||||
key: Single character key that was pressed.
|
||||
|
||||
Returns:
|
||||
True if key was handled, False otherwise.
|
||||
"""
|
||||
key = key.lower()
|
||||
handled = True
|
||||
|
||||
if key == 'w':
|
||||
self.vx = self._clamp(self.vx + self.vx_increment, self.vx_limit)
|
||||
elif key == 's':
|
||||
self.vx = self._clamp(self.vx - self.vx_increment, self.vx_limit)
|
||||
elif key == 'a':
|
||||
self.vy = self._clamp(self.vy + self.vy_increment, self.vy_limit)
|
||||
elif key == 'd':
|
||||
self.vy = self._clamp(self.vy - self.vy_increment, self.vy_limit)
|
||||
elif key == 'q':
|
||||
self.yaw = self._clamp(self.yaw + self.yaw_increment, self.yaw_limit)
|
||||
elif key == 'e':
|
||||
self.yaw = self._clamp(self.yaw - self.yaw_increment, self.yaw_limit)
|
||||
elif key == 'r':
|
||||
self.height = self._clamp(self.height + self.height_increment, self.height_limit)
|
||||
elif key == 'f':
|
||||
self.height = self._clamp(self.height - self.height_increment, self.height_limit)
|
||||
elif key == 'z':
|
||||
self.reset_commands()
|
||||
return True # Already notified in reset_commands
|
||||
else:
|
||||
handled = False
|
||||
|
||||
if handled:
|
||||
self._notify_callback()
|
||||
|
||||
return handled
|
||||
|
||||
def _setup_terminal(self):
|
||||
"""Set terminal to raw mode for single character input."""
|
||||
if HAS_TERMIOS:
|
||||
self._old_terminal_settings = termios.tcgetattr(sys.stdin)
|
||||
tty.setcbreak(sys.stdin.fileno())
|
||||
|
||||
def _restore_terminal(self):
|
||||
"""Restore terminal to original settings."""
|
||||
if HAS_TERMIOS and self._old_terminal_settings is not None:
|
||||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_terminal_settings)
|
||||
self._old_terminal_settings = None
|
||||
|
||||
def run(self):
|
||||
"""Run the keyboard listener loop (blocking)."""
|
||||
if not HAS_TERMIOS:
|
||||
print("Error: Keyboard controls require termios (Linux/macOS)")
|
||||
return
|
||||
|
||||
self.running = True
|
||||
self._print_controls()
|
||||
|
||||
try:
|
||||
self._setup_terminal()
|
||||
|
||||
while self.running:
|
||||
# Check for keyboard input with timeout
|
||||
if select.select([sys.stdin], [], [], 0.1)[0]:
|
||||
key = sys.stdin.read(1)
|
||||
|
||||
# Handle escape sequences (arrow keys, etc.)
|
||||
if key == '\x1b': # ESC
|
||||
self.running = False
|
||||
break
|
||||
|
||||
if self.process_key(key):
|
||||
self._print_status()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
finally:
|
||||
self._restore_terminal()
|
||||
print("\nKeyboard controls stopped")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the keyboard listener."""
|
||||
self.running = False
|
||||
|
||||
def _print_controls(self):
|
||||
"""Print control instructions."""
|
||||
print("\n" + "=" * 60)
|
||||
print("KEYBOARD CONTROLS ACTIVE")
|
||||
print("=" * 60)
|
||||
print(" W/S: Forward/Backward")
|
||||
print(" A/D: Strafe Left/Right")
|
||||
print(" Q/E: Rotate Left/Right")
|
||||
print(" R/F: Raise/Lower Height (±5cm)")
|
||||
print(" Z: Stop (zero all commands)")
|
||||
print(" ESC: Exit")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
def _print_status(self):
|
||||
"""Print current command status."""
|
||||
print(f"[CMD] vx={self.vx:+.2f}, vy={self.vy:+.2f}, yaw={self.yaw:+.2f} | height={self.height:.3f}m")
|
||||
|
||||
|
||||
class RobotKeyboardController(KeyboardController):
|
||||
"""Keyboard controller that directly updates a robot's locomotion commands."""
|
||||
|
||||
def __init__(self, robot):
|
||||
"""
|
||||
Initialize with a UnitreeG1 robot instance.
|
||||
|
||||
Args:
|
||||
robot: UnitreeG1 robot instance with locomotion_cmd attribute.
|
||||
"""
|
||||
super().__init__()
|
||||
self.robot = robot
|
||||
|
||||
# Initialize from robot's current state if available
|
||||
if hasattr(robot, 'locomotion_cmd'):
|
||||
self.vx = robot.locomotion_cmd[0]
|
||||
self.vy = robot.locomotion_cmd[1]
|
||||
self.yaw = robot.locomotion_cmd[2]
|
||||
|
||||
if hasattr(robot, 'groot_height_cmd'):
|
||||
self.height = robot.groot_height_cmd
|
||||
|
||||
def _notify_callback(self):
|
||||
"""Update robot's locomotion commands directly."""
|
||||
if hasattr(self.robot, 'locomotion_cmd'):
|
||||
self.robot.locomotion_cmd[0] = self.vx
|
||||
self.robot.locomotion_cmd[1] = self.vy
|
||||
self.robot.locomotion_cmd[2] = self.yaw
|
||||
|
||||
if hasattr(self.robot, 'groot_height_cmd'):
|
||||
self.robot.groot_height_cmd = self.height
|
||||
|
||||
|
||||
def start_keyboard_control_thread(robot) -> tuple:
|
||||
"""
|
||||
Start keyboard controls for a robot in a background thread.
|
||||
|
||||
Args:
|
||||
robot: UnitreeG1 robot instance.
|
||||
|
||||
Returns:
|
||||
Tuple of (controller, thread) for later stopping.
|
||||
"""
|
||||
import threading
|
||||
|
||||
controller = RobotKeyboardController(robot)
|
||||
thread = threading.Thread(target=controller.run, daemon=True)
|
||||
thread.start()
|
||||
|
||||
return controller, thread
|
||||
|
||||
|
||||
def stop_keyboard_control_thread(controller, thread, timeout: float = 2.0):
|
||||
"""
|
||||
Stop the keyboard control thread.
|
||||
|
||||
Args:
|
||||
controller: KeyboardController instance.
|
||||
thread: Thread running the controller.
|
||||
timeout: Max time to wait for thread to stop.
|
||||
"""
|
||||
controller.stop()
|
||||
thread.join(timeout=timeout)
|
||||
|
||||
|
||||
def main():
|
||||
"""Standalone keyboard control with optional robot connection."""
|
||||
parser = argparse.ArgumentParser(description="Keyboard control for Unitree G1")
|
||||
parser.add_argument("--standalone", action="store_true",
|
||||
help="Run in standalone mode (just print commands, no robot)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.standalone:
|
||||
# Standalone mode - just demonstrate keyboard input
|
||||
def print_callback(vx, vy, yaw, height):
|
||||
print(f" → Would send: vx={vx:+.2f}, vy={vy:+.2f}, yaw={yaw:+.2f}, height={height:.3f}")
|
||||
|
||||
controller = KeyboardController(callback=print_callback)
|
||||
print("Running in STANDALONE mode (no robot connection)")
|
||||
controller.run()
|
||||
else:
|
||||
print("To use with a robot, import and use RobotKeyboardController:")
|
||||
print("")
|
||||
print(" from lerobot.robots.unitree_g1.keyboard_control import (")
|
||||
print(" RobotKeyboardController,")
|
||||
print(" start_keyboard_control_thread,")
|
||||
print(" stop_keyboard_control_thread")
|
||||
print(" )")
|
||||
print("")
|
||||
print(" # Start keyboard controls")
|
||||
print(" controller, thread = start_keyboard_control_thread(robot)")
|
||||
print("")
|
||||
print(" # ... robot runs ...")
|
||||
print("")
|
||||
print(" # Stop keyboard controls")
|
||||
print(" stop_keyboard_control_thread(controller, thread)")
|
||||
print("")
|
||||
print("Or run with --standalone to test keyboard input without a robot.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -99,11 +99,12 @@ def state_forward_loop(
|
||||
lowstate_sub: ChannelSubscriber,
|
||||
lowstate_sock: zmq.Socket,
|
||||
state_period: float,
|
||||
shutdown_event: threading.Event,
|
||||
) -> None:
|
||||
"""Read observation from DDS and forward to ZMQ clients."""
|
||||
last_state_time = 0.0
|
||||
|
||||
while True:
|
||||
while not shutdown_event.is_set():
|
||||
# read from DDS
|
||||
msg = lowstate_sub.Read()
|
||||
if msg is None:
|
||||
@@ -128,7 +129,10 @@ def cmd_forward_loop(
|
||||
) -> None:
|
||||
"""Receive commands from ZMQ and forward to DDS."""
|
||||
while True:
|
||||
payload = lowcmd_sock.recv()
|
||||
try:
|
||||
payload = lowcmd_sock.recv()
|
||||
except zmq.ContextTerminated:
|
||||
break
|
||||
msg_dict = json.loads(payload.decode("utf-8"))
|
||||
|
||||
topic = msg_dict.get("topic", "")
|
||||
@@ -182,30 +186,26 @@ def main() -> None:
|
||||
lowstate_sock.bind(f"tcp://0.0.0.0:{LOWSTATE_PORT}")
|
||||
|
||||
state_period = 0.002 # ~500 hz
|
||||
shutdown_event = threading.Event()
|
||||
|
||||
# start observation forwarding thread
|
||||
# start observation forwarding in background thread
|
||||
t_state = threading.Thread(
|
||||
target=state_forward_loop,
|
||||
args=(lowstate_sub, lowstate_sock, state_period),
|
||||
daemon=True,
|
||||
args=(lowstate_sub, lowstate_sock, state_period, shutdown_event),
|
||||
)
|
||||
t_state.start()
|
||||
|
||||
# start action forwarding thread
|
||||
t_cmd = threading.Thread(
|
||||
target=cmd_forward_loop,
|
||||
args=(lowcmd_sock, lowcmd_pub_debug, crc),
|
||||
daemon=True,
|
||||
)
|
||||
t_cmd.start()
|
||||
|
||||
print("bridge running (lowstate -> zmq, lowcmd -> dds)")
|
||||
# keep main thread alive so daemon threads don't exit
|
||||
|
||||
# run command forwarding in main thread
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1.0)
|
||||
cmd_forward_loop(lowcmd_sock, lowcmd_pub_debug, crc)
|
||||
except KeyboardInterrupt:
|
||||
print("shutting down bridge...")
|
||||
finally:
|
||||
shutdown_event.set()
|
||||
ctx.term() # terminates blocking zmq.recv() calls
|
||||
t_state.join(timeout=2.0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -30,12 +30,8 @@ from unitree_sdk2py.idl.unitree_hg.msg.dds_ import (
|
||||
)
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
|
||||
from lerobot.envs.factory import make_env
|
||||
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
|
||||
from lerobot.robots.unitree_g1.unitree_sdk2_socket import (
|
||||
ChannelFactoryInitialize,
|
||||
ChannelPublisher,
|
||||
ChannelSubscriber,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
from .config_unitree_g1 import UnitreeG1Config
|
||||
@@ -127,7 +123,21 @@ class UnitreeG1(Robot):
|
||||
|
||||
self.control_dt = config.control_dt
|
||||
|
||||
if config.is_simulation:
|
||||
from unitree_sdk2py.core.channel import (
|
||||
ChannelFactoryInitialize,
|
||||
ChannelPublisher,
|
||||
ChannelSubscriber,
|
||||
)
|
||||
else:
|
||||
from lerobot.robots.unitree_g1.unitree_sdk2_socket import (
|
||||
ChannelFactoryInitialize,
|
||||
ChannelPublisher,
|
||||
ChannelSubscriber,
|
||||
)
|
||||
|
||||
# connect robot
|
||||
self.ChannelFactoryInitialize = ChannelFactoryInitialize
|
||||
self.connect()
|
||||
|
||||
# initialize direct motor control interface
|
||||
@@ -138,8 +148,8 @@ class UnitreeG1(Robot):
|
||||
self.lowstate_buffer = DataBuffer()
|
||||
|
||||
# initialize subscribe thread to read robot state
|
||||
self._shutdown_event = threading.Event()
|
||||
self.subscribe_thread = threading.Thread(target=self._subscribe_motor_state)
|
||||
self.subscribe_thread.daemon = True
|
||||
self.subscribe_thread.start()
|
||||
|
||||
while not self.is_connected:
|
||||
@@ -174,7 +184,7 @@ class UnitreeG1(Robot):
|
||||
self.remote_controller = self.RemoteController()
|
||||
|
||||
def _subscribe_motor_state(self): # polls robot state @ 250Hz
|
||||
while True:
|
||||
while not self._shutdown_event.is_set():
|
||||
start_time = time.time()
|
||||
msg = self.lowstate_subscriber.Read()
|
||||
if msg is not None:
|
||||
@@ -218,10 +228,17 @@ class UnitreeG1(Robot):
|
||||
pass
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None: # connect to DDS
|
||||
ChannelFactoryInitialize(0)
|
||||
if self.config.is_simulation:
|
||||
self.ChannelFactoryInitialize(0, "lo")
|
||||
self.mujoco_env = make_env("lerobot/unitree-g1-mujoco", trust_remote_code=True)
|
||||
else:
|
||||
self.ChannelFactoryInitialize(0)
|
||||
|
||||
def disconnect(self):
|
||||
pass
|
||||
self._shutdown_event.set()
|
||||
self.subscribe_thread.join(timeout=2.0)
|
||||
if self.config.is_simulation:
|
||||
self.mujoco_env["hub_env"][0].envs[0].kill_sim()
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
return self.lowstate_buffer.get_data()
|
||||
|
||||
@@ -28,6 +28,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
|
||||
from .koch_follower import KochFollower
|
||||
|
||||
return KochFollower(config)
|
||||
elif config.type == "omx_follower":
|
||||
from .omx_follower import OmxFollower
|
||||
|
||||
return OmxFollower(config)
|
||||
elif config.type == "so100_follower":
|
||||
from .so100_follower import SO100Follower
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
koch_follower,
|
||||
lekiwi,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
@@ -49,6 +50,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
homunculus,
|
||||
koch_leader,
|
||||
make_teleoperator_from_config,
|
||||
omx_leader,
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
|
||||
@@ -46,6 +46,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
RobotConfig,
|
||||
koch_follower,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
@@ -54,6 +55,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
gamepad,
|
||||
koch_leader,
|
||||
make_teleoperator_from_config,
|
||||
omx_leader,
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
|
||||
@@ -97,9 +97,11 @@ from lerobot.robots import ( # noqa: F401
|
||||
hope_jr,
|
||||
koch_follower,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
from lerobot.robots.unitree_g1 import config_unitree_g1 # noqa: F401
|
||||
from lerobot.teleoperators import ( # noqa: F401
|
||||
Teleoperator,
|
||||
TeleoperatorConfig,
|
||||
@@ -107,6 +109,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
homunculus,
|
||||
koch_leader,
|
||||
make_teleoperator_from_config,
|
||||
omx_leader,
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
@@ -195,9 +198,8 @@ class RecordConfig:
|
||||
cli_overrides = parser.get_cli_overrides("policy")
|
||||
self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides)
|
||||
self.policy.pretrained_path = policy_path
|
||||
|
||||
if self.teleop is None and self.policy is None:
|
||||
raise ValueError("Choose a policy, a teleoperator or both to control the robot")
|
||||
# Note: teleop and policy can both be None for robots with built-in control (e.g. unitree_g1)
|
||||
# This is validated in record() after the robot is instantiated
|
||||
|
||||
@classmethod
|
||||
def __get_path_fields__(cls) -> list[str]:
|
||||
@@ -270,7 +272,12 @@ def record_loop(
|
||||
for t in teleop
|
||||
if isinstance(
|
||||
t,
|
||||
(so100_leader.SO100Leader | so101_leader.SO101Leader | koch_leader.KochLeader),
|
||||
(
|
||||
so100_leader.SO100Leader
|
||||
| so101_leader.SO101Leader
|
||||
| koch_leader.KochLeader
|
||||
| omx_leader.OmxLeader
|
||||
),
|
||||
)
|
||||
),
|
||||
None,
|
||||
@@ -333,6 +340,13 @@ def record_loop(
|
||||
base_action = robot._from_keyboard_to_base_action(keyboard_action)
|
||||
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
act_processed_teleop = teleop_action_processor((act, obs))
|
||||
elif policy is None and teleop is None and dataset is not None:
|
||||
# Observation-only recording (robot controls itself, e.g. unitree_g1)
|
||||
# Record observations, extract action-relevant values (positions) from obs
|
||||
# Filter obs_processed to only include keys that match action_features
|
||||
action_keys = set(robot.action_features.keys())
|
||||
action_values = {k: v for k, v in obs_processed.items() if k in action_keys}
|
||||
robot_action_to_send = None
|
||||
else:
|
||||
logging.info(
|
||||
"No policy or teleoperator provided, skipping action generation."
|
||||
@@ -345,15 +359,17 @@ def record_loop(
|
||||
if policy is not None and act_processed_policy is not None:
|
||||
action_values = act_processed_policy
|
||||
robot_action_to_send = robot_action_processor((act_processed_policy, obs))
|
||||
else:
|
||||
elif teleop is not None:
|
||||
action_values = act_processed_teleop
|
||||
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
|
||||
# else: observation-only mode, action_values already set above
|
||||
|
||||
# Send action to robot
|
||||
# Action can eventually be clipped using `max_relative_target`,
|
||||
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
|
||||
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
|
||||
_sent_action = robot.send_action(robot_action_to_send)
|
||||
# Send action to robot (skip if observation-only mode)
|
||||
if robot_action_to_send is not None:
|
||||
# Action can eventually be clipped using `max_relative_target`,
|
||||
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
|
||||
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
|
||||
_sent_action = robot.send_action(robot_action_to_send)
|
||||
|
||||
# Write to dataset
|
||||
if dataset is not None:
|
||||
|
||||
@@ -58,6 +58,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
hope_jr,
|
||||
koch_follower,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
|
||||
@@ -33,6 +33,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
koch_follower,
|
||||
lekiwi,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
@@ -40,6 +41,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
TeleoperatorConfig,
|
||||
koch_leader,
|
||||
make_teleoperator_from_config,
|
||||
omx_leader,
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
@@ -47,6 +49,8 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
COMPATIBLE_DEVICES = [
|
||||
"koch_follower",
|
||||
"koch_leader",
|
||||
"omx_follower",
|
||||
"omx_leader",
|
||||
"so100_follower",
|
||||
"so100_leader",
|
||||
"so101_follower",
|
||||
|
||||
@@ -75,6 +75,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
hope_jr,
|
||||
koch_follower,
|
||||
make_robot_from_config,
|
||||
omx_follower,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
)
|
||||
@@ -87,6 +88,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
keyboard,
|
||||
koch_leader,
|
||||
make_teleoperator_from_config,
|
||||
omx_leader,
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from .config_omx_leader import OmxLeaderConfig
|
||||
from .omx_leader import OmxLeader
|
||||
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from ..config import TeleoperatorConfig
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("omx_leader")
|
||||
@dataclass
|
||||
class OmxLeaderConfig(TeleoperatorConfig):
|
||||
# Port to connect to the arm
|
||||
port: str
|
||||
|
||||
# Sets the arm in torque mode with the gripper motor set to this value. This makes it possible to squeeze
|
||||
# the gripper and have it spring back to an open position on its own.
|
||||
gripper_open_pos: float = 37.0
|
||||
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
DriveMode,
|
||||
DynamixelMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_omx_leader import OmxLeaderConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OmxLeader(Teleoperator):
|
||||
"""
|
||||
- [OMX](https://github.com/ROBOTIS-GIT/open_manipulator),
|
||||
expansion, developed by Woojin Wie and Junha Cha from [ROBOTIS](https://ai.robotis.com/)
|
||||
"""
|
||||
|
||||
config_class = OmxLeaderConfig
|
||||
name = "omx_leader"
|
||||
|
||||
def __init__(self, config: OmxLeaderConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"shoulder_pan": Motor(1, "xl330-m288", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_lift": Motor(2, "xl330-m288", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_flex": Motor(3, "xl330-m288", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_flex": Motor(4, "xl330-m288", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_roll": Motor(5, "xl330-m288", MotorNormMode.RANGE_M100_100),
|
||||
"gripper": Motor(6, "xl330-m077", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.bus.is_connected
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.bus.disable_torque()
|
||||
logger.info(f"\nUsing factory default calibration values for {self}")
|
||||
logger.info(f"\nWriting default configuration of {self} to the motors")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
for motor in self.bus.motors:
|
||||
if motor == "gripper":
|
||||
self.bus.write("Drive_Mode", motor, DriveMode.INVERTED.value)
|
||||
else:
|
||||
self.bus.write("Drive_Mode", motor, DriveMode.NON_INVERTED.value)
|
||||
drive_modes = {motor: 1 if motor == "gripper" else 0 for motor in self.bus.motors}
|
||||
|
||||
self.calibration = {}
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=drive_modes[motor],
|
||||
homing_offset=0,
|
||||
range_min=0,
|
||||
range_max=4095,
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
self._save_calibration()
|
||||
logger.info(f"Calibration saved to {self.calibration_fpath}")
|
||||
|
||||
def configure(self) -> None:
|
||||
self.bus.disable_torque()
|
||||
self.bus.configure_motors()
|
||||
for motor in self.bus.motors:
|
||||
if motor != "gripper":
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
|
||||
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while
|
||||
# assembling the arm, you could end up with a servo with a position 0 or 4095 at a crucial
|
||||
# point
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
||||
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
||||
# its goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
||||
# to make it move, and it will move back to its original target position when we release the force.
|
||||
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
# Set gripper's goal pos in current position mode so that we can use it as a trigger.
|
||||
self.bus.enable_torque("gripper")
|
||||
if self.is_calibrated:
|
||||
self.bus.write("Goal_Position", "gripper", self.config.gripper_open_pos)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
start = time.perf_counter()
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO(rcadene, aliberts): Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect()
|
||||
logger.info(f"{self} disconnected.")
|
||||
@@ -41,6 +41,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
|
||||
from .koch_leader import KochLeader
|
||||
|
||||
return KochLeader(config)
|
||||
elif config.type == "omx_leader":
|
||||
from .omx_leader import OmxLeader
|
||||
|
||||
return OmxLeader(config)
|
||||
elif config.type == "so100_leader":
|
||||
from .so100_leader import SO100Leader
|
||||
|
||||
|
||||
Reference in New Issue
Block a user