mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-11 22:59:50 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8e6952150f |
@@ -0,0 +1,956 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
SONIC planner with full mode control.
|
||||
|
||||
Keyboard controls:
|
||||
N / P - next / previous motion set
|
||||
1-8 - select mode within current set
|
||||
WASD - movement direction
|
||||
Q / E - rotate facing left / right
|
||||
9 / 0 - decrease / increase speed
|
||||
- / = - decrease / increase height
|
||||
R - force replan
|
||||
Space - emergency stop -> IDLE
|
||||
Esc - quit
|
||||
|
||||
Gamepad controls (Unitree wireless controller):
|
||||
Left stick Y - speed (forward = fast, back = stop)
|
||||
Left stick X - movement direction (offset from facing)
|
||||
Right stick X - facing direction (incremental rotation)
|
||||
Right stick Y - height (up = tall 0.8m, down = low 0.1m)
|
||||
Buttons - unused (mode selection is keyboard-only)
|
||||
"""
|
||||
|
||||
import argparse, gc, math, select, sys, termios, tty
|
||||
import multiprocessing as mp
|
||||
import threading, time
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
|
||||
import numpy as np
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
|
||||
|
||||
# ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_ANGLES = np.array([
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0,
|
||||
-0.312, 0.0, 0.0, 0.669, -0.363, 0.0,
|
||||
0.0, 0.0, 0.0,
|
||||
0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0,
|
||||
0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0,
|
||||
], dtype=np.float32)
|
||||
|
||||
NATURAL_FREQ = 10.0 * 2.0 * np.pi
|
||||
ARMATURE = {"5020": 0.003609725, "7520_14": 0.010177520, "7520_22": 0.025101925, "4010": 0.00425}
|
||||
EFFORT = {"5020": 25.0, "7520_14": 88.0, "7520_22": 139.0, "4010": 5.0}
|
||||
|
||||
def _action_scale(k):
|
||||
return 0.25 * EFFORT[k] / (ARMATURE[k] * NATURAL_FREQ**2)
|
||||
|
||||
_J = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \
|
||||
["7520_14","5020","5020"] + \
|
||||
["5020","5020","5020","5020","5020","4010","4010"] * 2
|
||||
ACTION_SCALE = np.array([_action_scale(k) for k in _J], dtype=np.float32)
|
||||
|
||||
CONTROL_DT = 0.02
|
||||
DEFAULT_HEIGHT = 0.788740
|
||||
TOKEN_DIM = 64
|
||||
ENCODER_UPDATE_EVERY = 5
|
||||
DEBUG_PRINT_EVERY = 100
|
||||
MOTION_LOOK_AHEAD_STEPS = 2
|
||||
INITIAL_RANDOM_SEED = 1234
|
||||
MIN_TOKENS, MAX_TOKENS = 6, 16
|
||||
K = MAX_TOKENS - MIN_TOKENS + 1
|
||||
DEADZONE = 0.05
|
||||
BLEND_FRAMES = 8
|
||||
|
||||
REPLAN_INTERVAL = {
|
||||
"running": 0.1, "crawling": 0.2, "boxing": 1.0, "default": 1.0
|
||||
}
|
||||
|
||||
ISAACLAB_TO_MUJOCO = np.array([
|
||||
0, 3, 6, 9, 13, 17, 1, 4, 7, 10, 14, 18, 2, 5, 8,
|
||||
11, 15, 19, 21, 23, 25, 27, 12, 16, 20, 22, 24, 26, 28
|
||||
], dtype=np.int32)
|
||||
|
||||
MUJOCO_TO_ISAACLAB = np.array([
|
||||
0, 6, 12, 1, 7, 13, 2, 8, 14, 3, 9, 15, 22, 4, 10,
|
||||
16, 23, 5, 11, 17, 24, 18, 25, 19, 26, 20, 27, 21, 28
|
||||
], dtype=np.int32)
|
||||
|
||||
def _to_mujoco(a): return a[MUJOCO_TO_ISAACLAB]
|
||||
def _to_runtime(a): r = np.zeros(29, np.float32); r[MUJOCO_TO_ISAACLAB] = a; return r
|
||||
|
||||
DEFAULT_ANGLES_MUJOCO = _to_mujoco(DEFAULT_ANGLES)
|
||||
ENCODER_STANDING_REF = DEFAULT_ANGLES.copy()
|
||||
|
||||
LOWER_BODY_IL = np.array([0,3,6,9,13,17,1,4,7,10,14,18], dtype=np.int32)
|
||||
WRIST_IL = np.array([23,24,25,26,27,28], dtype=np.int32)
|
||||
VR_TARGET_DEF = np.zeros(9, dtype=np.float32)
|
||||
VR_ORN_DEF = np.array([1,0,0,0,1,0,0,0,1,0,0,0], dtype=np.float32)
|
||||
SMPL_DEF = np.zeros(720, dtype=np.float32)
|
||||
|
||||
# ── PD gains ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def _kp_kd():
|
||||
s = lambda k: ARMATURE[k] * NATURAL_FREQ**2
|
||||
d = lambda k: 2.0 * 2.0 * ARMATURE[k] * NATURAL_FREQ
|
||||
_kp_keys = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \
|
||||
["7520_14","5020","5020"] + \
|
||||
["5020","5020","5020","5020","5020","4010","4010"] * 2
|
||||
_kd_keys = _kp_keys
|
||||
_double = {4,5,10,11,13,14} # ankle + waist indices with factor 2
|
||||
kp = np.array([2*s(k) if i in _double else s(k) for i,k in enumerate(_kp_keys)], dtype=np.float32)
|
||||
kd = np.array([2*d(k) if i in _double else d(k) for i,k in enumerate(_kd_keys)], dtype=np.float32)
|
||||
return kp, kd
|
||||
|
||||
# ── Quaternion helpers ────────────────────────────────────────────────────────
|
||||
|
||||
def quat_conj(q):
|
||||
return np.array([q[0], -q[1], -q[2], -q[3]], dtype=np.float32)
|
||||
|
||||
def quat_mul(q1, q2):
|
||||
w1,x1,y1,z1 = q1; w2,x2,y2,z2 = q2
|
||||
return np.array([
|
||||
w1*w2 - x1*x2 - y1*y2 - z1*z2,
|
||||
w1*x2 + x1*w2 + y1*z2 - z1*y2,
|
||||
w1*y2 - x1*z2 + y1*w2 + z1*x2,
|
||||
w1*z2 + x1*y2 - y1*x2 + z1*w2,
|
||||
], dtype=np.float32)
|
||||
|
||||
def gravity_dir(q):
|
||||
q = q / (np.linalg.norm(q) + 1e-8)
|
||||
qv = np.array([0, 0, 0, -1], dtype=np.float32)
|
||||
return quat_mul(quat_mul(quat_conj(q), qv), q)[1:]
|
||||
|
||||
def quat_to_6d(q):
|
||||
w,x,y,z = q
|
||||
return np.array([
|
||||
1-2*(y*y+z*z), 2*(x*y-z*w),
|
||||
2*(x*y+z*w), 1-2*(x*x+z*z),
|
||||
2*(x*z-y*w), 2*(y*z+x*w),
|
||||
], dtype=np.float32)
|
||||
|
||||
def calc_heading(q):
|
||||
w,x,y,z = q
|
||||
return float(np.arctan2(2*(x*y + w*z), 1-2*(y*y+z*z)))
|
||||
|
||||
def heading_quat(q, sign=1.0):
|
||||
a = sign * calc_heading(q) / 2.0
|
||||
return np.array([np.cos(a), 0, 0, np.sin(a)], dtype=np.float64)
|
||||
|
||||
heading_quat_inv = lambda q: heading_quat(q, -1.0)
|
||||
|
||||
def quat_slerp(q0, q1, t):
|
||||
q0 = q0 / (np.linalg.norm(q0)+1e-12); q1 = q1 / (np.linalg.norm(q1)+1e-12)
|
||||
dot = float(np.dot(q0, q1))
|
||||
if dot < 0: q1, dot = -q1, -dot
|
||||
dot = min(dot, 1.0)
|
||||
if dot > 0.9995:
|
||||
r = q0 + t*(q1-q0); return r/(np.linalg.norm(r)+1e-12)
|
||||
th = np.arccos(dot); st = np.sin(th)
|
||||
return (np.sin((1-t)*th)/st)*q0 + (np.sin(t*th)/st)*q1
|
||||
|
||||
def quat_slerp_batch(q0, q1, t):
|
||||
q0 = q0 / (np.linalg.norm(q0,axis=1,keepdims=True)+1e-12)
|
||||
q1 = q1 / (np.linalg.norm(q1,axis=1,keepdims=True)+1e-12)
|
||||
dot = np.sum(q0*q1, axis=1); neg = dot<0
|
||||
q1=q1.copy(); q1[neg]=-q1[neg]; dot[neg]=-dot[neg]; dot=np.clip(dot,-1,1)
|
||||
lin = dot>0.9995; th=np.arccos(dot); st=np.where(np.sin(th)==0,1,np.sin(th))
|
||||
c0=np.sin((1-t)*th)/st; c1=np.sin(t*th)/st
|
||||
c0[lin]=1-t[lin]; c1[lin]=t[lin]
|
||||
r = c0[:,None]*q0 + c1[:,None]*q1
|
||||
return r / (np.linalg.norm(r,axis=1,keepdims=True)+1e-12)
|
||||
|
||||
# ── Locomotion modes ──────────────────────────────────────────────────────────
|
||||
|
||||
class LocomotionMode(IntEnum):
|
||||
IDLE=0; SLOW_WALK=1; WALK=2; RUN=3; SQUAT=4; KNEEL_TWO_LEGS=5; KNEEL=6
|
||||
LYING_FACE_DOWN=7; CRAWLING=8; IDLE_BOXING=9; WALK_BOXING=10
|
||||
LEFT_PUNCH=11; RIGHT_PUNCH=12; RANDOM_PUNCH=13; ELBOW_CRAWLING=14
|
||||
LEFT_HOOK=15; RIGHT_HOOK=16; FORWARD_JUMP=17; STEALTH_WALK=18
|
||||
INJURED_WALK=19; LEDGE_WALKING=20; OBJECT_CARRYING=21; STEALTH_WALK_2=22
|
||||
HAPPY_DANCE_WALK=23; ZOMBIE_WALK=24; GUN_WALK=25; SCARE_WALK=26
|
||||
|
||||
LM = LocomotionMode
|
||||
|
||||
MOTION_SETS = [
|
||||
("Standing", [LM.SLOW_WALK, LM.WALK, LM.RUN, LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK]),
|
||||
("Squat / Low", [LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.CRAWLING, LM.ELBOW_CRAWLING]),
|
||||
("Boxing", [LM.IDLE_BOXING, LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH,
|
||||
LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK]),
|
||||
("Styled Walks", [LM.LEDGE_WALKING, LM.OBJECT_CARRYING, LM.STEALTH_WALK_2,
|
||||
LM.HAPPY_DANCE_WALK, LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK]),
|
||||
]
|
||||
|
||||
STATIC_MODES = {LM.IDLE, LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.LYING_FACE_DOWN, LM.IDLE_BOXING}
|
||||
STANDING_MODES = {LM.IDLE, LM.SLOW_WALK, LM.WALK, LM.RUN, LM.IDLE_BOXING, LM.WALK_BOXING,
|
||||
LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK,
|
||||
LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK, LM.LEDGE_WALKING,
|
||||
LM.OBJECT_CARRYING, LM.STEALTH_WALK_2, LM.HAPPY_DANCE_WALK,
|
||||
LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK}
|
||||
BOXING_MODES = {LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH,
|
||||
LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK}
|
||||
SPEED_RANGES = {LM.SLOW_WALK:(0.2,0.8), LM.WALK:(0.8,1.5), LM.RUN:(1.5,3.0),
|
||||
LM.CRAWLING:(0.4,1.0), LM.ELBOW_CRAWLING:(0.7,1.0)}
|
||||
|
||||
def clamp_mode_params(ms):
|
||||
m = LM(ms.mode)
|
||||
ms.height = -1.0 if m in STANDING_MODES else max(0.1, min(0.8, ms.height if ms.height>=0 else 0.2))
|
||||
if m in STATIC_MODES:
|
||||
ms.speed = -1.0
|
||||
elif m in SPEED_RANGES:
|
||||
lo, hi = SPEED_RANGES[m]
|
||||
ms.speed = max(lo, min(hi, ms.speed if ms.speed>=0 else lo))
|
||||
elif m in BOXING_MODES:
|
||||
ms.speed = max(0.7, min(1.5, ms.speed if ms.speed>=0 else 0.7))
|
||||
else:
|
||||
ms.speed = -1.0
|
||||
|
||||
def replan_interval(mode):
|
||||
m = LM(mode)
|
||||
if m == LM.RUN: return REPLAN_INTERVAL["running"]
|
||||
if m == LM.CRAWLING: return REPLAN_INTERVAL["crawling"]
|
||||
if m in {LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK}:
|
||||
return REPLAN_INTERVAL["boxing"]
|
||||
return REPLAN_INTERVAL["default"]
|
||||
|
||||
# ── Movement state ────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class MovementState:
|
||||
mode: int = 0
|
||||
speed: float = -1.0
|
||||
height: float = -1.0
|
||||
facing_angle: float = 0.0
|
||||
movement_angle: float = 0.0
|
||||
has_movement: bool = False
|
||||
motion_set_idx: int = 0
|
||||
needs_replan: bool = False
|
||||
|
||||
@property
|
||||
def movement_direction(self):
|
||||
if not self.has_movement: return (0.0, 0.0, 0.0)
|
||||
return (math.cos(self.movement_angle), math.sin(self.movement_angle), 0.0)
|
||||
|
||||
@property
|
||||
def facing_direction(self):
|
||||
return (math.cos(self.facing_angle), math.sin(self.facing_angle), 0.0)
|
||||
|
||||
def status_line(self):
|
||||
return (f"[{MOTION_SETS[self.motion_set_idx][0]}] mode={self.mode}({LM(self.mode).name}) "
|
||||
f"spd={'default' if self.speed<0 else f'{self.speed:.1f}'} "
|
||||
f"hgt={'default' if self.height<0 else f'{self.height:.2f}'} "
|
||||
f"facing={math.degrees(self.facing_angle):.0f}° "
|
||||
f"{'moving' if self.has_movement else 'still'}")
|
||||
|
||||
# ── Encoder / Decoder ─────────────────────────────────────────────────────────
|
||||
|
||||
class StandingEncoderDecoder:
|
||||
def __init__(self, encoder, decoder):
|
||||
self.encoder, self.decoder = encoder, decoder
|
||||
self.encoder_input = encoder.get_inputs()[0].name
|
||||
self.decoder_input = decoder.get_inputs()[0].name
|
||||
enc_dim = int(encoder.get_inputs()[0].shape[1])
|
||||
dec_dim = int(decoder.get_inputs()[0].shape[1])
|
||||
if enc_dim != 1762 or dec_dim != 994:
|
||||
raise RuntimeError(f"Unexpected dims encoder={enc_dim}, decoder={dec_dim}")
|
||||
self.token = np.zeros(TOKEN_DIM, np.float32)
|
||||
self.last_action_mj = np.zeros(29, np.float32)
|
||||
self.h_q_mj = [np.zeros(29, np.float32)] * 10
|
||||
self.h_dq_mj = [np.zeros(29, np.float32)] * 10
|
||||
self.h_ang = [np.zeros(3, np.float32)] * 10
|
||||
self.h_act_mj = [np.zeros(29, np.float32)] * 10
|
||||
self.h_quat = [np.array([1,0,0,0], np.float32)] * 10
|
||||
self.init_base_quat = np.array([1,0,0,0], np.float32)
|
||||
self.init_ref_quat = np.array([1,0,0,0], np.float32)
|
||||
self._heading_init = False
|
||||
self.encode_mode = 0
|
||||
self.vr_3point_local_target = VR_TARGET_DEF.copy()
|
||||
self.vr_3point_local_orn_target = VR_ORN_DEF.copy()
|
||||
self.smpl_joints_10frame_step1 = SMPL_DEF.copy()
|
||||
self.set_zero_reference()
|
||||
|
||||
def update_history(self, q, dq, ang, quat):
|
||||
quat = quat / (np.linalg.norm(quat)+1e-8)
|
||||
q_mj = _to_mujoco(q); dq_mj = _to_mujoco(dq)
|
||||
self.h_q_mj = [q_mj - DEFAULT_ANGLES_MUJOCO] + self.h_q_mj[:-1]
|
||||
self.h_dq_mj = [dq_mj] + self.h_dq_mj[:-1]
|
||||
self.h_ang = [ang.copy()] + self.h_ang[:-1]
|
||||
self.h_act_mj = [self.last_action_mj.copy()] + self.h_act_mj[:-1]
|
||||
self.h_quat = [quat.copy()] + self.h_quat[:-1]
|
||||
if not self._heading_init:
|
||||
self.init_base_quat = quat.copy(); self._heading_init = True
|
||||
|
||||
def _heading_quat(self, q):
|
||||
h = calc_heading(q) / 2.0
|
||||
return np.array([np.cos(h), 0, 0, np.sin(h)], np.float32)
|
||||
|
||||
def _heading_quat_inv(self, q):
|
||||
h = calc_heading(q) / 2.0
|
||||
return np.array([np.cos(-h), 0, 0, np.sin(-h)], np.float32)
|
||||
|
||||
def _anchor_6d(self, base_quat, ref_quat=None):
|
||||
if ref_quat is None: ref_quat = self.init_ref_quat
|
||||
delta = quat_mul(self._heading_quat(self.init_base_quat), self._heading_quat_inv(self.init_ref_quat))
|
||||
new_ref = quat_mul(delta, ref_quat)
|
||||
return quat_to_6d(quat_mul(quat_conj(base_quat), new_ref))
|
||||
|
||||
def set_zero_reference(self):
|
||||
self.motion_joint_positions = [ENCODER_STANDING_REF.copy()]
|
||||
self.motion_joint_velocities = [np.zeros(29, np.float32)]
|
||||
self.motion_body_quats = [np.array([1,0,0,0], np.float32)]
|
||||
self.motion_body_z = [DEFAULT_HEIGHT]
|
||||
self.motion_timesteps = 1
|
||||
self.freeze_ref_frame = 0
|
||||
self.init_ref_quat = self.motion_body_quats[0].copy()
|
||||
|
||||
def build_encoder_obs(self):
|
||||
obs = np.zeros(1762, np.float32)
|
||||
obs[0] = float(self.encode_mode)
|
||||
rf = min(self.freeze_ref_frame, self.motion_timesteps - 1)
|
||||
ref_pos, ref_quat = self.motion_joint_positions[rf], self.motion_body_quats[rf]
|
||||
if self.encode_mode == 0:
|
||||
for f in range(10):
|
||||
obs[4+29*f:4+29*(f+1)] = ref_pos
|
||||
obs[601+6*f:601+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat)
|
||||
elif self.encode_mode == 1:
|
||||
ref_lower = ref_pos[LOWER_BODY_IL]
|
||||
for f in range(10):
|
||||
obs[661+12*f:661+12*(f+1)] = ref_lower
|
||||
obs[901:910] = self.vr_3point_local_target
|
||||
obs[910:922] = self.vr_3point_local_orn_target
|
||||
obs[595:601] = self._anchor_6d(self.h_quat[0], ref_quat)
|
||||
elif self.encode_mode == 2:
|
||||
obs[922:1642] = self.smpl_joints_10frame_step1
|
||||
for f in range(10):
|
||||
obs[1642+6*f:1642+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat)
|
||||
obs[1702+6*f:1702+6*(f+1)] = ref_pos[WRIST_IL]
|
||||
else:
|
||||
raise RuntimeError(f"Unsupported encoder mode: {self.encode_mode}")
|
||||
return obs
|
||||
|
||||
def build_decoder_obs(self):
|
||||
obs = np.zeros(994, np.float32); off = 0
|
||||
obs[off:off+64] = self.token; off += 64
|
||||
for h, sz in [(list(reversed(self.h_ang)),3), (list(reversed(self.h_q_mj)),29),
|
||||
(list(reversed(self.h_dq_mj)),29), (list(reversed(self.h_act_mj)),29)]:
|
||||
for f in range(10): obs[off:off+sz] = h[f]; off += sz
|
||||
for q in reversed(self.h_quat):
|
||||
obs[off:off+3] = gravity_dir(q); off += 3
|
||||
assert off == 994, f"Decoder obs mismatch: {off}"
|
||||
return obs
|
||||
|
||||
def run_encoder(self):
|
||||
return self.encoder.run(None, {self.encoder_input: self.build_encoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32)
|
||||
|
||||
def step(self, robot_obs, update_encoder, debug=False):
|
||||
jnames = [m.name for m in G1_29_JointIndex]
|
||||
q = np.array([robot_obs.get(f"{n}.q", DEFAULT_ANGLES[m.value]) for m,n in zip(G1_29_JointIndex,jnames)], np.float32)
|
||||
dq = np.array([robot_obs.get(f"{n}.dq", 0.0) for n in jnames], np.float32)
|
||||
quat = np.array([robot_obs.get("imu.quat.w",1), robot_obs.get("imu.quat.x",0),
|
||||
robot_obs.get("imu.quat.y",0), robot_obs.get("imu.quat.z",0)], np.float32)
|
||||
ang = np.array([robot_obs.get(f"imu.gyro.{a}",0) for a in "xyz"], np.float32)
|
||||
self.update_history(q, dq, ang, quat)
|
||||
if update_encoder: self.token = self.run_encoder()
|
||||
action_mj = self.decoder.run(None, {self.decoder_input: self.build_decoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32)
|
||||
self.last_action_mj = action_mj.copy()
|
||||
target = DEFAULT_ANGLES + action_mj[ISAACLAB_TO_MUJOCO] * ACTION_SCALE
|
||||
if debug:
|
||||
delta = target - q
|
||||
print(f"token_norm={np.linalg.norm(self.token):.4f} action_norm={np.linalg.norm(action_mj):.4f} "
|
||||
f"delta_max={np.max(np.abs(delta)):.4f} delta_rms={np.sqrt(np.mean(delta**2)):.4f}")
|
||||
return {f"{m.name}.q": float(target[m.value]) for m in G1_29_JointIndex}
|
||||
|
||||
def print_input_diagnostics(self):
|
||||
print("\n[Diag] Standing reference checks")
|
||||
names = {0:"g1", 1:"teleop", 2:"smpl"}
|
||||
print(f" encoder mode: {self.encode_mode} ({names.get(self.encode_mode,'unknown')})")
|
||||
print(f" DEFAULT_ANGLES range: [{DEFAULT_ANGLES.min():+.4f}, {DEFAULT_ANGLES.max():+.4f}]")
|
||||
print(f" anchor_6d(identity): {self._anchor_6d(np.array([1,0,0,0],np.float32), np.array([1,0,0,0],np.float32))}")
|
||||
print(f" gravity(identity): {gravity_dir(np.array([1,0,0,0],np.float32))} (expect [0,0,-1])")
|
||||
dec0 = self.build_decoder_obs()
|
||||
print(f" decoder q-delta max: {np.max(np.abs(dec0[94:384])):.6f}")
|
||||
print(f" decoder dq max: {np.max(np.abs(dec0[384:674])):.6f}")
|
||||
|
||||
# ── Planner motion buffer ─────────────────────────────────────────────────────
|
||||
|
||||
class PlannerMotion:
|
||||
def __init__(self, max_frames=1500):
|
||||
self.timesteps = 0
|
||||
self.joint_positions = np.zeros((max_frames, 29), np.float64)
|
||||
self.joint_velocities = np.zeros((max_frames, 29), np.float64)
|
||||
self.body_positions = np.zeros((max_frames, 3), np.float64)
|
||||
self.body_quaternions = np.zeros((max_frames, 4), np.float64)
|
||||
self.body_quaternions[:, 0] = 1.0
|
||||
|
||||
# ── Subprocess planner ────────────────────────────────────────────────────────
|
||||
|
||||
def _resample_30_to_50(qpos, n30):
|
||||
t50 = int(np.floor(n30 / 30.0 * 50))
|
||||
f30 = np.arange(t50) / 50.0 * 30.0
|
||||
f0 = np.floor(f30).astype(int)
|
||||
f1 = np.minimum(f0+1, n30-1)
|
||||
frac, w0 = (f30-f0).astype(np.float64), None
|
||||
w0 = 1.0 - frac
|
||||
jp = (w0[:,None]*qpos[f0,7:36] + frac[:,None]*qpos[f1,7:36])[:,MUJOCO_TO_ISAACLAB]
|
||||
jv = np.zeros_like(jp)
|
||||
if t50 >= 2: jv[:t50-1] = (jp[1:] - jp[:-1]) * 50.0; jv[-1] = jv[-2]
|
||||
return {
|
||||
"timesteps": t50,
|
||||
"joint_positions": jp,
|
||||
"joint_velocities": jv,
|
||||
"body_positions": w0[:,None]*qpos[f0,:3] + frac[:,None]*qpos[f1,:3],
|
||||
"body_quaternions": quat_slerp_batch(qpos[f0,3:7], qpos[f1,3:7], frac),
|
||||
}
|
||||
|
||||
def _build_planner_inputs(ctx, ms_dict, version, seed):
|
||||
inp = {
|
||||
"context_mujoco_qpos": ctx.astype(np.float32).reshape(1,4,36),
|
||||
"target_vel": np.array([ms_dict["speed"]], np.float32),
|
||||
"mode": np.array([ms_dict["mode"]], np.int64),
|
||||
"movement_direction": np.array(ms_dict["movement_direction"], np.float32).reshape(1,3),
|
||||
"facing_direction": np.array(ms_dict["facing_direction"], np.float32).reshape(1,3),
|
||||
"random_seed": np.array([seed], np.int64),
|
||||
}
|
||||
if version >= 1:
|
||||
allowed = np.zeros((1,K), np.int64); allowed[0,:6] = 1
|
||||
inp.update({
|
||||
"height": np.array([ms_dict["height"]], np.float32),
|
||||
"has_specific_target": np.array([[0]], np.int64),
|
||||
"specific_target_positions": np.zeros((1,4,3), np.float32),
|
||||
"specific_target_headings": np.zeros((1,4), np.float32),
|
||||
"allowed_pred_num_tokens": allowed,
|
||||
})
|
||||
return inp
|
||||
|
||||
def _planner_worker(path, req_q, res_q, stop_evt, version, seed):
|
||||
so = ort.SessionOptions(); so.log_severity_level = 3
|
||||
sess = ort.InferenceSession(path, sess_options=so, providers=["CPUExecutionProvider"])
|
||||
while not stop_evt.is_set():
|
||||
try: ctx, gf, ms_dict = req_q.get(timeout=0.05)
|
||||
except Exception: continue
|
||||
try:
|
||||
inp = _build_planner_inputs(ctx, ms_dict, version, seed)
|
||||
t0 = time.time()
|
||||
qpos_out, num_pred = sess.run(None, inp)
|
||||
t_inf = time.time()
|
||||
n = int(num_pred.flat[0])
|
||||
qpos = qpos_out[0,:n]
|
||||
if np.any(np.isnan(qpos)): continue
|
||||
motion = _resample_30_to_50(qpos, n)
|
||||
motion["gen_frame"] = gf
|
||||
print(f"[Planner] inf={1000*(t_inf-t0):.1f}ms total={1000*(time.time()-t0):.1f}ms frames={n}", flush=True)
|
||||
while not res_q.empty():
|
||||
try: res_q.get_nowait()
|
||||
except Exception: break
|
||||
res_q.put(motion)
|
||||
except Exception as e:
|
||||
print(f"[Planner] Error: {e}", flush=True)
|
||||
|
||||
# ── SonicPlanner ──────────────────────────────────────────────────────────────
|
||||
|
||||
class SonicPlanner:
|
||||
def __init__(self, session, planner_path):
|
||||
self.session = session
|
||||
self.planner_path = planner_path
|
||||
self.gen_frame = 0
|
||||
self.random_seed = INITIAL_RANDOM_SEED
|
||||
self.version = 1 if len(session.get_inputs()) >= 11 else 0
|
||||
self.motion_50hz = PlannerMotion()
|
||||
self._snapshot = PlannerMotion()
|
||||
self._req_q = self._res_q = self._stop_evt = self._proc = None
|
||||
self._ctrl = None
|
||||
|
||||
def _build_inputs(self, ctx, ms):
|
||||
return _build_planner_inputs(
|
||||
ctx,
|
||||
{"mode": ms.mode, "speed": ms.speed, "height": ms.height,
|
||||
"movement_direction": list(ms.movement_direction),
|
||||
"facing_direction": list(ms.facing_direction)},
|
||||
self.version, self.random_seed)
|
||||
|
||||
@staticmethod
|
||||
def build_initial_context(joint_positions):
|
||||
ctx = np.zeros((4,36), np.float32)
|
||||
for n in range(4):
|
||||
ctx[n,2] = DEFAULT_HEIGHT; ctx[n,3] = 1.0
|
||||
ctx[n,7:36] = joint_positions.astype(np.float32)
|
||||
return ctx
|
||||
|
||||
def _context_from_controller(self, current_frame):
|
||||
ctrl = self._ctrl
|
||||
gen_frame = current_frame + MOTION_LOOK_AHEAD_STEPS
|
||||
t_arr = gen_frame/50.0 + np.arange(4)/30.0
|
||||
f50 = t_arr * 50.0
|
||||
with ctrl.motion_lock:
|
||||
ts = ctrl.motion_timesteps
|
||||
bp = ctrl.motion_body_pos[:ts].copy()
|
||||
bq = ctrl.motion_body_quats[:ts].copy()
|
||||
jp = ctrl.motion_joint_positions[:ts].copy()
|
||||
f0 = np.minimum(np.floor(f50).astype(int), ts-1)
|
||||
f1 = np.minimum(f0+1, ts-1)
|
||||
frac, w0 = f50-f0, None; w0 = 1.0-frac
|
||||
ctx = np.zeros((4,36), np.float32)
|
||||
ctx[:,0:3] = w0[:,None]*bp[f0] + frac[:,None]*bp[f1]
|
||||
ctx[:,3:7] = quat_slerp_batch(bq[f0], bq[f1], frac)
|
||||
ij = w0[:,None]*jp[f0] + frac[:,None]*jp[f1]
|
||||
ctx[:,7:36] = ij[:,ISAACLAB_TO_MUJOCO]
|
||||
self.gen_frame = gen_frame
|
||||
return ctx
|
||||
|
||||
def _load_motion_in_place(self, qpos, n30, target=None):
|
||||
if target is None: target = self.motion_50hz
|
||||
r = _resample_30_to_50(qpos, n30)
|
||||
n = r["timesteps"]; target.timesteps = n
|
||||
target.joint_positions[:n] = r["joint_positions"]
|
||||
target.joint_velocities[:n] = r["joint_velocities"]
|
||||
target.body_positions[:n] = r["body_positions"]
|
||||
target.body_quaternions[:n] = r["body_quaternions"]
|
||||
return target
|
||||
|
||||
def initialize(self, joint_positions, ms):
|
||||
ctx = self.build_initial_context(joint_positions)
|
||||
qpos_out, num_pred = self.session.run(None, self._build_inputs(ctx, ms))
|
||||
n = int(num_pred.flat[0]); qpos = qpos_out[0,:n]
|
||||
if np.any(np.isnan(qpos)): raise RuntimeError("Planner initial output contains NaN")
|
||||
print(f"[Planner] Init: {n} frames @ 30 Hz")
|
||||
self._load_motion_in_place(qpos, n)
|
||||
print(f"[Planner] Resampled to {self.motion_50hz.timesteps} frames @ 50 Hz")
|
||||
return self.motion_50hz
|
||||
|
||||
def request_replan(self, cursor, ms):
|
||||
if self._req_q is None: return
|
||||
ctx = self._context_from_controller(cursor)
|
||||
ms_dict = {"mode": ms.mode, "speed": ms.speed, "height": ms.height,
|
||||
"movement_direction": list(ms.movement_direction),
|
||||
"facing_direction": list(ms.facing_direction)}
|
||||
while not self._req_q.empty():
|
||||
try: self._req_q.get_nowait()
|
||||
except Exception: break
|
||||
self._req_q.put((ctx, self.gen_frame, ms_dict))
|
||||
|
||||
def try_get_new_motion(self):
|
||||
if self._res_q is None: return None
|
||||
result = None
|
||||
while not self._res_q.empty():
|
||||
try: result = self._res_q.get_nowait()
|
||||
except Exception: break
|
||||
if result is None: return None
|
||||
n, gf = result["timesteps"], result["gen_frame"]
|
||||
s = self._snapshot; s.timesteps = n
|
||||
s.joint_positions[:n] = result["joint_positions"]
|
||||
s.joint_velocities[:n] = result["joint_velocities"]
|
||||
s.body_positions[:n] = result["body_positions"]
|
||||
s.body_quaternions[:n] = result["body_quaternions"]
|
||||
return s, gf
|
||||
|
||||
def start_subprocess(self, controller):
|
||||
self._ctrl = controller
|
||||
self._req_q, self._res_q, self._stop_evt = mp.Queue(), mp.Queue(), mp.Event()
|
||||
self._proc = mp.Process(
|
||||
target=_planner_worker,
|
||||
args=(self.planner_path, self._req_q, self._res_q,
|
||||
self._stop_evt, self.version, self.random_seed),
|
||||
daemon=True)
|
||||
self._proc.start()
|
||||
print(f"[Planner] Background process started (PID={self._proc.pid})")
|
||||
|
||||
def stop_subprocess(self):
|
||||
if self._stop_evt: self._stop_evt.set()
|
||||
if self._proc:
|
||||
self._proc.join(timeout=3.0)
|
||||
if self._proc.is_alive(): self._proc.terminate()
|
||||
print("[Planner] Background process stopped")
|
||||
for q in (self._req_q, self._res_q):
|
||||
if q: q.close()
|
||||
|
||||
# ── PlannerController ─────────────────────────────────────────────────────────
|
||||
|
||||
class PlannerController(StandingEncoderDecoder):
|
||||
def __init__(self, planner, encoder, decoder):
|
||||
super().__init__(encoder, decoder)
|
||||
self.planner = planner
|
||||
self.ref_cursor = 0
|
||||
self.motion_timesteps = 0
|
||||
self.motion_joint_positions = np.zeros((1500,29), np.float64)
|
||||
self.motion_joint_velocities = np.zeros((1500,29), np.float64)
|
||||
self.motion_body_quats = np.zeros((1500,4), np.float64); self.motion_body_quats[:,0] = 1.0
|
||||
self.motion_body_pos = np.zeros((1500,3), np.float64)
|
||||
self.init_ref_quat = np.array([1,0,0,0], np.float64)
|
||||
self.heading_init_base_quat = np.array([1,0,0,0], np.float64)
|
||||
self.delta_heading = 0.0
|
||||
self.reinit_heading = False
|
||||
self.playing = self.first_motion = False
|
||||
self.motion_lock = threading.Lock()
|
||||
|
||||
def load_initial_motion(self, motion):
|
||||
with self.motion_lock:
|
||||
n = motion.timesteps
|
||||
self.motion_timesteps = n
|
||||
self.motion_joint_positions[:n] = motion.joint_positions[:n]
|
||||
self.motion_joint_velocities[:n] = motion.joint_velocities[:n]
|
||||
self.motion_body_quats[:n] = motion.body_quaternions[:n]
|
||||
self.motion_body_pos[:n] = motion.body_positions[:n]
|
||||
self.init_ref_quat = motion.body_quaternions[0].copy()
|
||||
self.ref_cursor = 0; self.first_motion = True
|
||||
self.playing = True; self.delta_heading = 0.0
|
||||
|
||||
def blend_new_motion(self, new_motion, gen_frame):
|
||||
with self.motion_lock:
|
||||
cur = self.ref_cursor
|
||||
new_len = gen_frame - cur + new_motion.timesteps
|
||||
if new_len <= 0: return
|
||||
f_arr = np.arange(new_len)
|
||||
f_old = np.minimum(f_arr + cur, self.motion_timesteps - 1)
|
||||
f_new = np.clip(f_arr + cur - gen_frame, 0, new_motion.timesteps - 1)
|
||||
blend_start = max(0, gen_frame - cur)
|
||||
w_new = np.clip((f_arr - blend_start) / BLEND_FRAMES if BLEND_FRAMES > 0
|
||||
else np.ones(new_len), 0.0, 1.0)
|
||||
w_old = 1.0 - w_new
|
||||
self.motion_joint_positions[:new_len] = w_old[:,None]*self.motion_joint_positions[f_old] + w_new[:,None]*new_motion.joint_positions[f_new]
|
||||
self.motion_joint_velocities[:new_len] = w_old[:,None]*self.motion_joint_velocities[f_old] + w_new[:,None]*new_motion.joint_velocities[f_new]
|
||||
self.motion_body_pos[:new_len] = w_old[:,None]*self.motion_body_pos[f_old] + w_new[:,None]*new_motion.body_positions[f_new]
|
||||
self.motion_body_quats[:new_len] = quat_slerp_batch(self.motion_body_quats[f_old], new_motion.body_quaternions[f_new], w_new)
|
||||
self.motion_timesteps = new_len; self.first_motion = False; self.ref_cursor = 0
|
||||
self.init_ref_quat = self.motion_body_quats[0].copy()
|
||||
|
||||
def _heading_apply_delta(self):
|
||||
delta = quat_mul(heading_quat(self.heading_init_base_quat).astype(np.float32),
|
||||
heading_quat_inv(self.init_ref_quat).astype(np.float32))
|
||||
if self.delta_heading:
|
||||
h = self.delta_heading / 2.0
|
||||
delta = quat_mul(np.array([np.cos(h),0,0,np.sin(h)], np.float32), delta)
|
||||
return delta
|
||||
|
||||
def _anchor_6d(self, base_quat, ref_quat=None):
|
||||
if ref_quat is None: ref_quat = self.init_ref_quat
|
||||
new_ref = quat_mul(self._heading_apply_delta(), ref_quat.astype(np.float32))
|
||||
return quat_to_6d(quat_mul(quat_conj(base_quat.astype(np.float32)), new_ref))
|
||||
|
||||
def build_encoder_obs(self):
|
||||
obs = np.zeros(1762, np.float32); obs[0] = float(self.encode_mode)
|
||||
with self.motion_lock:
|
||||
for f in range(10):
|
||||
tf = min(self.ref_cursor + f*5 if self.playing else self.ref_cursor,
|
||||
self.motion_timesteps - 1)
|
||||
obs[4+29*f:4+29*(f+1)] = self.motion_joint_positions[tf].astype(np.float32)
|
||||
if self.playing:
|
||||
obs[294+29*f:294+29*(f+1)] = self.motion_joint_velocities[tf].astype(np.float32)
|
||||
obs[601+6*f:601+6*(f+1)] = self._anchor_6d(
|
||||
self.h_quat[0], self.motion_body_quats[tf].astype(np.float32))
|
||||
return obs
|
||||
|
||||
def step(self, robot_obs, update_encoder, debug=False):
|
||||
if robot_obs and (self.first_motion or self.reinit_heading):
|
||||
q = robot_obs.get("imu.quaternion")
|
||||
if q is not None:
|
||||
self.heading_init_base_quat = np.array(q, np.float64)
|
||||
with self.motion_lock:
|
||||
rf = min(self.ref_cursor, self.motion_timesteps - 1)
|
||||
self.init_ref_quat = self.motion_body_quats[rf].copy()
|
||||
self.delta_heading = 0.0
|
||||
self.first_motion = False
|
||||
self.reinit_heading = False
|
||||
print(f"[Heading] init quat: {self.heading_init_base_quat}")
|
||||
return super().step(robot_obs, update_encoder=update_encoder, debug=debug)
|
||||
|
||||
def advance_cursor(self, wall_dt):
|
||||
if not self.playing: return
|
||||
frames = max(1, round(wall_dt / CONTROL_DT))
|
||||
with self.motion_lock:
|
||||
self.ref_cursor = min(self.ref_cursor + frames, self.motion_timesteps - 1)
|
||||
|
||||
# ── Keyboard ──────────────────────────────────────────────────────────────────
|
||||
|
||||
class RawKeyboard:
|
||||
def __init__(self):
|
||||
self.fd = sys.stdin.fileno()
|
||||
self.old = termios.tcgetattr(self.fd)
|
||||
def __enter__(self): tty.setcbreak(self.fd); return self
|
||||
def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||||
def get_key(self):
|
||||
return sys.stdin.read(1) if select.select([sys.stdin],[],[],0)[0] else None
|
||||
|
||||
def process_keyboard(key, ms, controller=None):
|
||||
if key is None: return False
|
||||
if key == '\x1b': return True
|
||||
if key == ' ':
|
||||
ms.mode = LM.IDLE; ms.speed = ms.height = -1.0
|
||||
ms.has_movement = False; ms.needs_replan = True
|
||||
if controller: controller.playing = False; controller.reinit_heading = True
|
||||
print("\n >> EMERGENCY STOP -> IDLE"); return False
|
||||
if key in ('r','R'):
|
||||
ms.needs_replan = True; print("\n >> Manual replan"); return False
|
||||
if key in ('n','N','p','P'):
|
||||
ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ('n','N') else -1)) % len(MOTION_SETS)
|
||||
name, modes = MOTION_SETS[ms.motion_set_idx]
|
||||
print(f"\n >> Motion set: {name}")
|
||||
[print(f" {i+1}: {m.name}") for i,m in enumerate(modes)]
|
||||
return False
|
||||
if key.isdigit() and key not in ('9','0'):
|
||||
idx = int(key) - 1; modes = MOTION_SETS[ms.motion_set_idx][1]
|
||||
if 0 <= idx < len(modes):
|
||||
ms.mode = modes[idx]; ms.needs_replan = True
|
||||
if controller: controller.playing = True; controller.reinit_heading = True
|
||||
print(f"\n >> Mode: {LM(ms.mode).name} ({ms.mode}) [replanning...]")
|
||||
return False
|
||||
if key == '9':
|
||||
ms.speed = max(0.0, (ms.speed if ms.speed>=0 else 1.0) - 0.1)
|
||||
print(f"\n >> Speed: {ms.speed:.1f}"); return False
|
||||
if key == '0':
|
||||
ms.speed = min(5.0, (ms.speed if ms.speed>=0 else 1.0) + 0.1)
|
||||
print(f"\n >> Speed: {ms.speed:.1f}"); return False
|
||||
if key == '-':
|
||||
ms.height = max(0.2, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) - 0.02)
|
||||
print(f"\n >> Height: {ms.height:.2f}"); return False
|
||||
if key == '=':
|
||||
ms.height = min(1.0, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) + 0.02)
|
||||
print(f"\n >> Height: {ms.height:.2f}"); return False
|
||||
if key.lower() == 'w': ms.movement_angle = ms.facing_angle
|
||||
elif key.lower() == 's': ms.movement_angle = ms.facing_angle + math.pi
|
||||
elif key.lower() == 'a': ms.movement_angle = ms.facing_angle + math.pi/2
|
||||
elif key.lower() == 'd': ms.movement_angle = ms.facing_angle - math.pi/2
|
||||
if key.lower() in ('w','s','a','d'):
|
||||
ms.has_movement = ms.needs_replan = True
|
||||
elif key.lower() == 'q':
|
||||
ms.facing_angle += 0.1
|
||||
if controller: controller.delta_heading += 0.1
|
||||
print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°")
|
||||
elif key.lower() == 'e':
|
||||
ms.facing_angle -= 0.1
|
||||
if controller: controller.delta_heading -= 0.1
|
||||
print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°")
|
||||
return False
|
||||
|
||||
_joy_prev_active = False
|
||||
|
||||
|
||||
def _parse_wireless(wr):
|
||||
"""Parse wireless_remote (bytes or int-array) into (lx, ly, rx, ry)."""
|
||||
import struct as _st
|
||||
if not isinstance(wr, (bytes, bytearray)):
|
||||
wr = bytes(wr)
|
||||
if len(wr) < 24:
|
||||
return None
|
||||
lx = _st.unpack("f", wr[4:8])[0]
|
||||
rx = _st.unpack("f", wr[8:12])[0]
|
||||
ry = _st.unpack("f", wr[12:16])[0]
|
||||
ly = _st.unpack("f", wr[20:24])[0]
|
||||
return lx, ly, rx, ry
|
||||
|
||||
|
||||
def process_joystick(obs, ms, controller=None):
|
||||
"""Joystick mirrors keyboard: left stick=WASD, right stick X=Q/E, right stick Y=height."""
|
||||
global _joy_prev_active
|
||||
wr = obs.get("wireless_remote")
|
||||
if wr is None:
|
||||
return
|
||||
parsed = _parse_wireless(wr)
|
||||
if parsed is None:
|
||||
return
|
||||
lx, ly, rx, ry = parsed
|
||||
|
||||
# Dead zone + negate both Y axes (bridge already flips them once)
|
||||
lx = 0.0 if abs(lx) < DEADZONE else lx
|
||||
ly = 0.0 if abs(ly) < DEADZONE else -ly
|
||||
rx = 0.0 if abs(rx) < DEADZONE else rx
|
||||
ry = 0.0 if abs(ry) < DEADZONE else -ry
|
||||
|
||||
left_active = abs(lx) > 0 or abs(ly) > 0
|
||||
|
||||
# Left stick → WASD (movement direction relative to facing)
|
||||
if left_active:
|
||||
ms.movement_angle = ms.facing_angle + math.atan2(-lx, -ly)
|
||||
ms.has_movement = True
|
||||
if not _joy_prev_active:
|
||||
ms.needs_replan = True
|
||||
_joy_prev_active = True
|
||||
elif _joy_prev_active and not (abs(rx) > 0 or abs(ry) > 0):
|
||||
_joy_prev_active = False
|
||||
ms.has_movement = False
|
||||
|
||||
# Right stick X → Q/E (facing rotation, ~1 rad/s at full deflection)
|
||||
if abs(rx) > 0:
|
||||
delta = -0.02 * rx
|
||||
ms.facing_angle += delta
|
||||
if controller:
|
||||
controller.delta_heading += delta
|
||||
|
||||
# Right stick Y → -/= (height adjustment, ~0.25/s at full deflection)
|
||||
if abs(ry) > 0:
|
||||
step = -0.005 * ry
|
||||
ms.height = max(0.1, min(1.0, (ms.height if ms.height >= 0 else DEFAULT_HEIGHT) + step))
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
|
||||
parser.add_argument("--ip", type=str, default=None,
|
||||
help="Robot IP for real hardware (e.g. 192.168.123.164). "
|
||||
"Omit for simulation.")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 60)
|
||||
print("SONIC planner - full mode control")
|
||||
print(" N/P cycle sets | 1-8 select mode | WASD move")
|
||||
print(" Q/E rotate | 9/0 speed | -/= height")
|
||||
print(" R replan | Space IDLE | Esc quit")
|
||||
if args.ip:
|
||||
print(f" Robot IP: {args.ip}")
|
||||
else:
|
||||
print(" Mode: simulation")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx")
|
||||
encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx")
|
||||
decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx")
|
||||
|
||||
providers = ort.get_available_providers()
|
||||
use_gpu = "CUDAExecutionProvider" in providers
|
||||
gpu_ep = (["CUDAExecutionProvider","CPUExecutionProvider"] if use_gpu else ["CPUExecutionProvider"])
|
||||
so = ort.SessionOptions(); so.log_severity_level = 3
|
||||
|
||||
print(f"[ONNX] enc/dec={'GPU' if use_gpu else 'CPU'}, planner=CPU")
|
||||
planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=["CPUExecutionProvider"])
|
||||
encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=gpu_ep)
|
||||
decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=gpu_ep)
|
||||
print(f"[Planner] version={'v1+' if len(planner_sess.get_inputs())>=11 else 'v0'}")
|
||||
|
||||
cfg = UnitreeG1Config()
|
||||
if args.ip:
|
||||
cfg.is_simulation = False
|
||||
cfg.robot_ip = args.ip
|
||||
robot = UnitreeG1(cfg); robot.connect()
|
||||
kp, kd = _kp_kd(); robot.kp = kp.copy(); robot.kd = kd.copy()
|
||||
|
||||
ms = MovementState()
|
||||
planner = SonicPlanner(planner_sess, planner_path)
|
||||
controller = PlannerController(planner, encoder_sess, decoder_sess)
|
||||
|
||||
motion = planner.initialize(DEFAULT_ANGLES, ms)
|
||||
controller.load_initial_motion(motion)
|
||||
controller.print_input_diagnostics()
|
||||
planner.start_subprocess(controller)
|
||||
|
||||
print(f"\nStarting: {MOTION_SETS[0][0]}")
|
||||
[print(f" {i+1}: {m.name}") for i,m in enumerate(MOTION_SETS[0][1])]
|
||||
|
||||
with RawKeyboard() as kb:
|
||||
try:
|
||||
gc.disable(); gc_timer = 0.0
|
||||
robot.reset(CONTROL_DT, DEFAULT_ANGLES); time.sleep(1.0)
|
||||
|
||||
step = 0; last_status = replan_timer = 0.0
|
||||
loop_t = enc_t = dec_t = obs_t = act_t = []
|
||||
slow_n = blend_n = 0; stall_src = ""; did_blend = False
|
||||
prev_end = time.time(); t_start = time.time()
|
||||
|
||||
log_path = "/tmp/sonic_pose_log.csv"
|
||||
jnames = [m.name for m in G1_29_JointIndex]
|
||||
with open(log_path, "w") as log_f:
|
||||
log_f.write("t,step,cursor,ts,blend,mode," +
|
||||
",".join(f"q{i}" for i in range(29)) + "," +
|
||||
",".join(f"ref{i}" for i in range(29)) + "," +
|
||||
",".join(f"act{i}" for i in range(29)) +
|
||||
",delta_max,action_norm,token_norm\n")
|
||||
|
||||
while not robot._shutdown_event.is_set():
|
||||
t0 = time.time()
|
||||
if process_keyboard(kb.get_key(), ms, controller): break
|
||||
|
||||
obs = robot.get_observation(); t_obs = time.time()
|
||||
obs_t.append(1000*(t_obs - t0))
|
||||
if not obs:
|
||||
step += 1; prev_end = time.time()
|
||||
time.sleep(max(0.0, CONTROL_DT-(time.time()-t0))); continue
|
||||
|
||||
process_joystick(obs, ms, controller)
|
||||
clamp_mode_params(ms)
|
||||
|
||||
is_static = LM(ms.mode) in STATIC_MODES
|
||||
do_req = ms.needs_replan and step > 0
|
||||
if do_req: ms.needs_replan = False; replan_timer = 0.0
|
||||
elif not is_static and step > 0 and ms.speed != 0:
|
||||
replan_timer += CONTROL_DT
|
||||
if replan_timer >= replan_interval(ms.mode):
|
||||
do_req = True; replan_timer = 0.0
|
||||
if do_req: planner.request_replan(controller.ref_cursor, ms)
|
||||
|
||||
do_enc = (step % ENCODER_UPDATE_EVERY == 0)
|
||||
t_step = time.time()
|
||||
action = controller.step(obs, update_encoder=do_enc, debug=(step % DEBUG_PRINT_EVERY == 0))
|
||||
step_ms = 1000*(time.time()-t_step)
|
||||
(enc_t if do_enc else dec_t).append(step_ms)
|
||||
|
||||
t_act = time.time()
|
||||
robot.send_action(action)
|
||||
act_t.append(1000*(time.time()-t_act))
|
||||
|
||||
result = planner.try_get_new_motion()
|
||||
t_blend = time.time()
|
||||
if result:
|
||||
controller.blend_new_motion(*result)
|
||||
blend_ms = 1000*(time.time()-t_blend)
|
||||
blend_n += 1; did_blend = True
|
||||
else:
|
||||
blend_ms = 0.0
|
||||
|
||||
if step % 5 == 0:
|
||||
t_rel = time.time() - t_start
|
||||
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
|
||||
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
|
||||
cur, ts = controller.ref_cursor, controller.motion_timesteps
|
||||
q_ref = controller.motion_joint_positions[min(cur,ts-1)] if ts > 0 else np.zeros(29)
|
||||
log_f.write(f"{t_rel:.4f},{step},{cur},{ts},{int(did_blend)},{ms.mode}," +
|
||||
",".join(f"{v:.6f}" for v in q_r) + "," +
|
||||
",".join(f"{v:.6f}" for v in q_ref) + "," +
|
||||
",".join(f"{v:.6f}" for v in a_v) + "," +
|
||||
f"{np.max(np.abs(a_v-q_r)):.6f},"
|
||||
f"{np.linalg.norm(a_v):.6f},"
|
||||
f"{np.linalg.norm(controller.token):.6f}\n")
|
||||
did_blend = False
|
||||
|
||||
now = time.time(); loop_ms = 1000*(now-t0)
|
||||
wall_dt = now - prev_end; loop_t.append(loop_ms)
|
||||
if loop_ms > 50:
|
||||
stall_src = (f"[STALL] {loop_ms:.0f}ms: "
|
||||
f"obs={obs_t[-1]:.0f} blend={blend_ms:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}")
|
||||
if loop_ms > CONTROL_DT*1500: slow_n += 1
|
||||
|
||||
controller.advance_cursor(wall_dt)
|
||||
|
||||
if now - last_status > 2.0:
|
||||
def _avg(l): return sum(l)/len(l) if l else 0
|
||||
hz = 1000/_avg(loop_t) if _avg(loop_t) else 0
|
||||
print(f"\r {ms.status_line()} step={step} ref={controller.ref_cursor}/{controller.motion_timesteps} "
|
||||
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t,default=0):.1f}) hz={hz:.0f} "
|
||||
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
|
||||
f"slow={slow_n} blends={blend_n}", end="", flush=True)
|
||||
if stall_src: print(f"\n {stall_src}"); stall_src = ""
|
||||
last_status = now
|
||||
loop_t=enc_t=dec_t=obs_t=act_t=[]; slow_n=blend_n=0
|
||||
|
||||
prev_end = time.time()
|
||||
gc_timer += CONTROL_DT
|
||||
if gc_timer >= 10.0: gc.collect(); gc_timer = 0.0
|
||||
step += 1
|
||||
time.sleep(max(0.0, CONTROL_DT-(time.time()-t0)))
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
gc.enable()
|
||||
print(f"\n[Log] Saved to {log_path}")
|
||||
planner.stop_subprocess()
|
||||
print("\nStopping...")
|
||||
if robot.is_connected: robot.disconnect()
|
||||
print("Done.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user