#!/usr/bin/env python """ SONIC planner with full mode control. Keyboard controls: N / P - next / previous motion set 1-8 - select mode within current set WASD - movement direction Q / E - rotate facing left / right 9 / 0 - decrease / increase speed - / = - decrease / increase height R - force replan Space - emergency stop -> IDLE Esc - quit Gamepad controls (Unitree wireless controller): Left stick Y - speed (forward = fast, back = stop) Left stick X - movement direction (offset from facing) Right stick X - facing direction (incremental rotation) Right stick Y - height (up = tall 0.8m, down = low 0.1m) Buttons - unused (mode selection is keyboard-only) """ import argparse, gc, math, select, sys, termios, tty import multiprocessing as mp import threading, time from dataclasses import dataclass from enum import IntEnum import numpy as np import onnxruntime as ort from huggingface_hub import hf_hub_download from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1 from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex # ── Constants ──────────────────────────────────────────────────────────────── DEFAULT_ANGLES = np.array([ -0.312, 0.0, 0.0, 0.669, -0.363, 0.0, -0.312, 0.0, 0.0, 0.669, -0.363, 0.0, 0.0, 0.0, 0.0, 0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0, 0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0, ], dtype=np.float32) NATURAL_FREQ = 10.0 * 2.0 * np.pi ARMATURE = {"5020": 0.003609725, "7520_14": 0.010177520, "7520_22": 0.025101925, "4010": 0.00425} EFFORT = {"5020": 25.0, "7520_14": 88.0, "7520_22": 139.0, "4010": 5.0} def _action_scale(k): return 0.25 * EFFORT[k] / (ARMATURE[k] * NATURAL_FREQ**2) _J = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \ ["7520_14","5020","5020"] + \ ["5020","5020","5020","5020","5020","4010","4010"] * 2 ACTION_SCALE = np.array([_action_scale(k) for k in _J], dtype=np.float32) CONTROL_DT = 0.02 DEFAULT_HEIGHT = 0.788740 TOKEN_DIM = 64 ENCODER_UPDATE_EVERY = 5 DEBUG_PRINT_EVERY = 100 MOTION_LOOK_AHEAD_STEPS = 2 INITIAL_RANDOM_SEED = 1234 MIN_TOKENS, MAX_TOKENS = 6, 16 K = MAX_TOKENS - MIN_TOKENS + 1 DEADZONE = 0.05 BLEND_FRAMES = 8 REPLAN_INTERVAL = { "running": 0.1, "crawling": 0.2, "boxing": 1.0, "default": 1.0 } ISAACLAB_TO_MUJOCO = np.array([ 0, 3, 6, 9, 13, 17, 1, 4, 7, 10, 14, 18, 2, 5, 8, 11, 15, 19, 21, 23, 25, 27, 12, 16, 20, 22, 24, 26, 28 ], dtype=np.int32) MUJOCO_TO_ISAACLAB = np.array([ 0, 6, 12, 1, 7, 13, 2, 8, 14, 3, 9, 15, 22, 4, 10, 16, 23, 5, 11, 17, 24, 18, 25, 19, 26, 20, 27, 21, 28 ], dtype=np.int32) def _to_mujoco(a): return a[MUJOCO_TO_ISAACLAB] def _to_runtime(a): r = np.zeros(29, np.float32); r[MUJOCO_TO_ISAACLAB] = a; return r DEFAULT_ANGLES_MUJOCO = _to_mujoco(DEFAULT_ANGLES) ENCODER_STANDING_REF = DEFAULT_ANGLES.copy() LOWER_BODY_IL = np.array([0,3,6,9,13,17,1,4,7,10,14,18], dtype=np.int32) WRIST_IL = np.array([23,24,25,26,27,28], dtype=np.int32) VR_TARGET_DEF = np.zeros(9, dtype=np.float32) VR_ORN_DEF = np.array([1,0,0,0,1,0,0,0,1,0,0,0], dtype=np.float32) SMPL_DEF = np.zeros(720, dtype=np.float32) # ── PD gains ───────────────────────────────────────────────────────────────── def _kp_kd(): s = lambda k: ARMATURE[k] * NATURAL_FREQ**2 d = lambda k: 2.0 * 2.0 * ARMATURE[k] * NATURAL_FREQ _kp_keys = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \ ["7520_14","5020","5020"] + \ ["5020","5020","5020","5020","5020","4010","4010"] * 2 _kd_keys = _kp_keys _double = {4,5,10,11,13,14} # ankle + waist indices with factor 2 kp = np.array([2*s(k) if i in _double else s(k) for i,k in enumerate(_kp_keys)], dtype=np.float32) kd = np.array([2*d(k) if i in _double else d(k) for i,k in enumerate(_kd_keys)], dtype=np.float32) return kp, kd # ── Quaternion helpers ──────────────────────────────────────────────────────── def quat_conj(q): return np.array([q[0], -q[1], -q[2], -q[3]], dtype=np.float32) def quat_mul(q1, q2): w1,x1,y1,z1 = q1; w2,x2,y2,z2 = q2 return np.array([ w1*w2 - x1*x2 - y1*y2 - z1*z2, w1*x2 + x1*w2 + y1*z2 - z1*y2, w1*y2 - x1*z2 + y1*w2 + z1*x2, w1*z2 + x1*y2 - y1*x2 + z1*w2, ], dtype=np.float32) def gravity_dir(q): q = q / (np.linalg.norm(q) + 1e-8) qv = np.array([0, 0, 0, -1], dtype=np.float32) return quat_mul(quat_mul(quat_conj(q), qv), q)[1:] def quat_to_6d(q): w,x,y,z = q return np.array([ 1-2*(y*y+z*z), 2*(x*y-z*w), 2*(x*y+z*w), 1-2*(x*x+z*z), 2*(x*z-y*w), 2*(y*z+x*w), ], dtype=np.float32) def calc_heading(q): w,x,y,z = q return float(np.arctan2(2*(x*y + w*z), 1-2*(y*y+z*z))) def heading_quat(q, sign=1.0): a = sign * calc_heading(q) / 2.0 return np.array([np.cos(a), 0, 0, np.sin(a)], dtype=np.float64) heading_quat_inv = lambda q: heading_quat(q, -1.0) def quat_slerp(q0, q1, t): q0 = q0 / (np.linalg.norm(q0)+1e-12); q1 = q1 / (np.linalg.norm(q1)+1e-12) dot = float(np.dot(q0, q1)) if dot < 0: q1, dot = -q1, -dot dot = min(dot, 1.0) if dot > 0.9995: r = q0 + t*(q1-q0); return r/(np.linalg.norm(r)+1e-12) th = np.arccos(dot); st = np.sin(th) return (np.sin((1-t)*th)/st)*q0 + (np.sin(t*th)/st)*q1 def quat_slerp_batch(q0, q1, t): q0 = q0 / (np.linalg.norm(q0,axis=1,keepdims=True)+1e-12) q1 = q1 / (np.linalg.norm(q1,axis=1,keepdims=True)+1e-12) dot = np.sum(q0*q1, axis=1); neg = dot<0 q1=q1.copy(); q1[neg]=-q1[neg]; dot[neg]=-dot[neg]; dot=np.clip(dot,-1,1) lin = dot>0.9995; th=np.arccos(dot); st=np.where(np.sin(th)==0,1,np.sin(th)) c0=np.sin((1-t)*th)/st; c1=np.sin(t*th)/st c0[lin]=1-t[lin]; c1[lin]=t[lin] r = c0[:,None]*q0 + c1[:,None]*q1 return r / (np.linalg.norm(r,axis=1,keepdims=True)+1e-12) # ── Locomotion modes ────────────────────────────────────────────────────────── class LocomotionMode(IntEnum): IDLE=0; SLOW_WALK=1; WALK=2; RUN=3; SQUAT=4; KNEEL_TWO_LEGS=5; KNEEL=6 LYING_FACE_DOWN=7; CRAWLING=8; IDLE_BOXING=9; WALK_BOXING=10 LEFT_PUNCH=11; RIGHT_PUNCH=12; RANDOM_PUNCH=13; ELBOW_CRAWLING=14 LEFT_HOOK=15; RIGHT_HOOK=16; FORWARD_JUMP=17; STEALTH_WALK=18 INJURED_WALK=19; LEDGE_WALKING=20; OBJECT_CARRYING=21; STEALTH_WALK_2=22 HAPPY_DANCE_WALK=23; ZOMBIE_WALK=24; GUN_WALK=25; SCARE_WALK=26 LM = LocomotionMode MOTION_SETS = [ ("Standing", [LM.SLOW_WALK, LM.WALK, LM.RUN, LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK]), ("Squat / Low", [LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.CRAWLING, LM.ELBOW_CRAWLING]), ("Boxing", [LM.IDLE_BOXING, LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK]), ("Styled Walks", [LM.LEDGE_WALKING, LM.OBJECT_CARRYING, LM.STEALTH_WALK_2, LM.HAPPY_DANCE_WALK, LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK]), ] STATIC_MODES = {LM.IDLE, LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.LYING_FACE_DOWN, LM.IDLE_BOXING} STANDING_MODES = {LM.IDLE, LM.SLOW_WALK, LM.WALK, LM.RUN, LM.IDLE_BOXING, LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK, LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK, LM.LEDGE_WALKING, LM.OBJECT_CARRYING, LM.STEALTH_WALK_2, LM.HAPPY_DANCE_WALK, LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK} BOXING_MODES = {LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK} SPEED_RANGES = {LM.SLOW_WALK:(0.2,0.8), LM.WALK:(0.8,1.5), LM.RUN:(1.5,3.0), LM.CRAWLING:(0.4,1.0), LM.ELBOW_CRAWLING:(0.7,1.0)} def clamp_mode_params(ms): m = LM(ms.mode) ms.height = -1.0 if m in STANDING_MODES else max(0.1, min(0.8, ms.height if ms.height>=0 else 0.2)) if m in STATIC_MODES: ms.speed = -1.0 elif m in SPEED_RANGES: lo, hi = SPEED_RANGES[m] ms.speed = max(lo, min(hi, ms.speed if ms.speed>=0 else lo)) elif m in BOXING_MODES: ms.speed = max(0.7, min(1.5, ms.speed if ms.speed>=0 else 0.7)) else: ms.speed = -1.0 def replan_interval(mode): m = LM(mode) if m == LM.RUN: return REPLAN_INTERVAL["running"] if m == LM.CRAWLING: return REPLAN_INTERVAL["crawling"] if m in {LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK}: return REPLAN_INTERVAL["boxing"] return REPLAN_INTERVAL["default"] # ── Movement state ──────────────────────────────────────────────────────────── @dataclass class MovementState: mode: int = 0 speed: float = -1.0 height: float = -1.0 facing_angle: float = 0.0 movement_angle: float = 0.0 has_movement: bool = False motion_set_idx: int = 0 needs_replan: bool = False @property def movement_direction(self): if not self.has_movement: return (0.0, 0.0, 0.0) return (math.cos(self.movement_angle), math.sin(self.movement_angle), 0.0) @property def facing_direction(self): return (math.cos(self.facing_angle), math.sin(self.facing_angle), 0.0) def status_line(self): return (f"[{MOTION_SETS[self.motion_set_idx][0]}] mode={self.mode}({LM(self.mode).name}) " f"spd={'default' if self.speed<0 else f'{self.speed:.1f}'} " f"hgt={'default' if self.height<0 else f'{self.height:.2f}'} " f"facing={math.degrees(self.facing_angle):.0f}° " f"{'moving' if self.has_movement else 'still'}") # ── Encoder / Decoder ───────────────────────────────────────────────────────── class StandingEncoderDecoder: def __init__(self, encoder, decoder): self.encoder, self.decoder = encoder, decoder self.encoder_input = encoder.get_inputs()[0].name self.decoder_input = decoder.get_inputs()[0].name enc_dim = int(encoder.get_inputs()[0].shape[1]) dec_dim = int(decoder.get_inputs()[0].shape[1]) if enc_dim != 1762 or dec_dim != 994: raise RuntimeError(f"Unexpected dims encoder={enc_dim}, decoder={dec_dim}") self.token = np.zeros(TOKEN_DIM, np.float32) self.last_action_mj = np.zeros(29, np.float32) self.h_q_mj = [np.zeros(29, np.float32)] * 10 self.h_dq_mj = [np.zeros(29, np.float32)] * 10 self.h_ang = [np.zeros(3, np.float32)] * 10 self.h_act_mj = [np.zeros(29, np.float32)] * 10 self.h_quat = [np.array([1,0,0,0], np.float32)] * 10 self.init_base_quat = np.array([1,0,0,0], np.float32) self.init_ref_quat = np.array([1,0,0,0], np.float32) self._heading_init = False self.encode_mode = 0 self.vr_3point_local_target = VR_TARGET_DEF.copy() self.vr_3point_local_orn_target = VR_ORN_DEF.copy() self.smpl_joints_10frame_step1 = SMPL_DEF.copy() self.set_zero_reference() def update_history(self, q, dq, ang, quat): quat = quat / (np.linalg.norm(quat)+1e-8) q_mj = _to_mujoco(q); dq_mj = _to_mujoco(dq) self.h_q_mj = [q_mj - DEFAULT_ANGLES_MUJOCO] + self.h_q_mj[:-1] self.h_dq_mj = [dq_mj] + self.h_dq_mj[:-1] self.h_ang = [ang.copy()] + self.h_ang[:-1] self.h_act_mj = [self.last_action_mj.copy()] + self.h_act_mj[:-1] self.h_quat = [quat.copy()] + self.h_quat[:-1] if not self._heading_init: self.init_base_quat = quat.copy(); self._heading_init = True def _heading_quat(self, q): h = calc_heading(q) / 2.0 return np.array([np.cos(h), 0, 0, np.sin(h)], np.float32) def _heading_quat_inv(self, q): h = calc_heading(q) / 2.0 return np.array([np.cos(-h), 0, 0, np.sin(-h)], np.float32) def _anchor_6d(self, base_quat, ref_quat=None): if ref_quat is None: ref_quat = self.init_ref_quat delta = quat_mul(self._heading_quat(self.init_base_quat), self._heading_quat_inv(self.init_ref_quat)) new_ref = quat_mul(delta, ref_quat) return quat_to_6d(quat_mul(quat_conj(base_quat), new_ref)) def set_zero_reference(self): self.motion_joint_positions = [ENCODER_STANDING_REF.copy()] self.motion_joint_velocities = [np.zeros(29, np.float32)] self.motion_body_quats = [np.array([1,0,0,0], np.float32)] self.motion_body_z = [DEFAULT_HEIGHT] self.motion_timesteps = 1 self.freeze_ref_frame = 0 self.init_ref_quat = self.motion_body_quats[0].copy() def build_encoder_obs(self): obs = np.zeros(1762, np.float32) obs[0] = float(self.encode_mode) rf = min(self.freeze_ref_frame, self.motion_timesteps - 1) ref_pos, ref_quat = self.motion_joint_positions[rf], self.motion_body_quats[rf] if self.encode_mode == 0: for f in range(10): obs[4+29*f:4+29*(f+1)] = ref_pos obs[601+6*f:601+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat) elif self.encode_mode == 1: ref_lower = ref_pos[LOWER_BODY_IL] for f in range(10): obs[661+12*f:661+12*(f+1)] = ref_lower obs[901:910] = self.vr_3point_local_target obs[910:922] = self.vr_3point_local_orn_target obs[595:601] = self._anchor_6d(self.h_quat[0], ref_quat) elif self.encode_mode == 2: obs[922:1642] = self.smpl_joints_10frame_step1 for f in range(10): obs[1642+6*f:1642+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat) obs[1702+6*f:1702+6*(f+1)] = ref_pos[WRIST_IL] else: raise RuntimeError(f"Unsupported encoder mode: {self.encode_mode}") return obs def build_decoder_obs(self): obs = np.zeros(994, np.float32); off = 0 obs[off:off+64] = self.token; off += 64 for h, sz in [(list(reversed(self.h_ang)),3), (list(reversed(self.h_q_mj)),29), (list(reversed(self.h_dq_mj)),29), (list(reversed(self.h_act_mj)),29)]: for f in range(10): obs[off:off+sz] = h[f]; off += sz for q in reversed(self.h_quat): obs[off:off+3] = gravity_dir(q); off += 3 assert off == 994, f"Decoder obs mismatch: {off}" return obs def run_encoder(self): return self.encoder.run(None, {self.encoder_input: self.build_encoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32) def step(self, robot_obs, update_encoder, debug=False): jnames = [m.name for m in G1_29_JointIndex] q = np.array([robot_obs.get(f"{n}.q", DEFAULT_ANGLES[m.value]) for m,n in zip(G1_29_JointIndex,jnames)], np.float32) dq = np.array([robot_obs.get(f"{n}.dq", 0.0) for n in jnames], np.float32) quat = np.array([robot_obs.get("imu.quat.w",1), robot_obs.get("imu.quat.x",0), robot_obs.get("imu.quat.y",0), robot_obs.get("imu.quat.z",0)], np.float32) ang = np.array([robot_obs.get(f"imu.gyro.{a}",0) for a in "xyz"], np.float32) self.update_history(q, dq, ang, quat) if update_encoder: self.token = self.run_encoder() action_mj = self.decoder.run(None, {self.decoder_input: self.build_decoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32) self.last_action_mj = action_mj.copy() target = DEFAULT_ANGLES + action_mj[ISAACLAB_TO_MUJOCO] * ACTION_SCALE if debug: delta = target - q print(f"token_norm={np.linalg.norm(self.token):.4f} action_norm={np.linalg.norm(action_mj):.4f} " f"delta_max={np.max(np.abs(delta)):.4f} delta_rms={np.sqrt(np.mean(delta**2)):.4f}") return {f"{m.name}.q": float(target[m.value]) for m in G1_29_JointIndex} def print_input_diagnostics(self): print("\n[Diag] Standing reference checks") names = {0:"g1", 1:"teleop", 2:"smpl"} print(f" encoder mode: {self.encode_mode} ({names.get(self.encode_mode,'unknown')})") print(f" DEFAULT_ANGLES range: [{DEFAULT_ANGLES.min():+.4f}, {DEFAULT_ANGLES.max():+.4f}]") print(f" anchor_6d(identity): {self._anchor_6d(np.array([1,0,0,0],np.float32), np.array([1,0,0,0],np.float32))}") print(f" gravity(identity): {gravity_dir(np.array([1,0,0,0],np.float32))} (expect [0,0,-1])") dec0 = self.build_decoder_obs() print(f" decoder q-delta max: {np.max(np.abs(dec0[94:384])):.6f}") print(f" decoder dq max: {np.max(np.abs(dec0[384:674])):.6f}") # ── Planner motion buffer ───────────────────────────────────────────────────── class PlannerMotion: def __init__(self, max_frames=1500): self.timesteps = 0 self.joint_positions = np.zeros((max_frames, 29), np.float64) self.joint_velocities = np.zeros((max_frames, 29), np.float64) self.body_positions = np.zeros((max_frames, 3), np.float64) self.body_quaternions = np.zeros((max_frames, 4), np.float64) self.body_quaternions[:, 0] = 1.0 # ── Subprocess planner ──────────────────────────────────────────────────────── def _resample_30_to_50(qpos, n30): t50 = int(np.floor(n30 / 30.0 * 50)) f30 = np.arange(t50) / 50.0 * 30.0 f0 = np.floor(f30).astype(int) f1 = np.minimum(f0+1, n30-1) frac, w0 = (f30-f0).astype(np.float64), None w0 = 1.0 - frac jp = (w0[:,None]*qpos[f0,7:36] + frac[:,None]*qpos[f1,7:36])[:,MUJOCO_TO_ISAACLAB] jv = np.zeros_like(jp) if t50 >= 2: jv[:t50-1] = (jp[1:] - jp[:-1]) * 50.0; jv[-1] = jv[-2] return { "timesteps": t50, "joint_positions": jp, "joint_velocities": jv, "body_positions": w0[:,None]*qpos[f0,:3] + frac[:,None]*qpos[f1,:3], "body_quaternions": quat_slerp_batch(qpos[f0,3:7], qpos[f1,3:7], frac), } def _build_planner_inputs(ctx, ms_dict, version, seed): inp = { "context_mujoco_qpos": ctx.astype(np.float32).reshape(1,4,36), "target_vel": np.array([ms_dict["speed"]], np.float32), "mode": np.array([ms_dict["mode"]], np.int64), "movement_direction": np.array(ms_dict["movement_direction"], np.float32).reshape(1,3), "facing_direction": np.array(ms_dict["facing_direction"], np.float32).reshape(1,3), "random_seed": np.array([seed], np.int64), } if version >= 1: allowed = np.zeros((1,K), np.int64); allowed[0,:6] = 1 inp.update({ "height": np.array([ms_dict["height"]], np.float32), "has_specific_target": np.array([[0]], np.int64), "specific_target_positions": np.zeros((1,4,3), np.float32), "specific_target_headings": np.zeros((1,4), np.float32), "allowed_pred_num_tokens": allowed, }) return inp def _planner_worker(path, req_q, res_q, stop_evt, version, seed): so = ort.SessionOptions(); so.log_severity_level = 3 sess = ort.InferenceSession(path, sess_options=so, providers=["CPUExecutionProvider"]) while not stop_evt.is_set(): try: ctx, gf, ms_dict = req_q.get(timeout=0.05) except Exception: continue try: inp = _build_planner_inputs(ctx, ms_dict, version, seed) t0 = time.time() qpos_out, num_pred = sess.run(None, inp) t_inf = time.time() n = int(num_pred.flat[0]) qpos = qpos_out[0,:n] if np.any(np.isnan(qpos)): continue motion = _resample_30_to_50(qpos, n) motion["gen_frame"] = gf print(f"[Planner] inf={1000*(t_inf-t0):.1f}ms total={1000*(time.time()-t0):.1f}ms frames={n}", flush=True) while not res_q.empty(): try: res_q.get_nowait() except Exception: break res_q.put(motion) except Exception as e: print(f"[Planner] Error: {e}", flush=True) # ── SonicPlanner ────────────────────────────────────────────────────────────── class SonicPlanner: def __init__(self, session, planner_path): self.session = session self.planner_path = planner_path self.gen_frame = 0 self.random_seed = INITIAL_RANDOM_SEED self.version = 1 if len(session.get_inputs()) >= 11 else 0 self.motion_50hz = PlannerMotion() self._snapshot = PlannerMotion() self._req_q = self._res_q = self._stop_evt = self._proc = None self._ctrl = None def _build_inputs(self, ctx, ms): return _build_planner_inputs( ctx, {"mode": ms.mode, "speed": ms.speed, "height": ms.height, "movement_direction": list(ms.movement_direction), "facing_direction": list(ms.facing_direction)}, self.version, self.random_seed) @staticmethod def build_initial_context(joint_positions): ctx = np.zeros((4,36), np.float32) for n in range(4): ctx[n,2] = DEFAULT_HEIGHT; ctx[n,3] = 1.0 ctx[n,7:36] = joint_positions.astype(np.float32) return ctx def _context_from_controller(self, current_frame): ctrl = self._ctrl gen_frame = current_frame + MOTION_LOOK_AHEAD_STEPS t_arr = gen_frame/50.0 + np.arange(4)/30.0 f50 = t_arr * 50.0 with ctrl.motion_lock: ts = ctrl.motion_timesteps bp = ctrl.motion_body_pos[:ts].copy() bq = ctrl.motion_body_quats[:ts].copy() jp = ctrl.motion_joint_positions[:ts].copy() f0 = np.minimum(np.floor(f50).astype(int), ts-1) f1 = np.minimum(f0+1, ts-1) frac, w0 = f50-f0, None; w0 = 1.0-frac ctx = np.zeros((4,36), np.float32) ctx[:,0:3] = w0[:,None]*bp[f0] + frac[:,None]*bp[f1] ctx[:,3:7] = quat_slerp_batch(bq[f0], bq[f1], frac) ij = w0[:,None]*jp[f0] + frac[:,None]*jp[f1] ctx[:,7:36] = ij[:,ISAACLAB_TO_MUJOCO] self.gen_frame = gen_frame return ctx def _load_motion_in_place(self, qpos, n30, target=None): if target is None: target = self.motion_50hz r = _resample_30_to_50(qpos, n30) n = r["timesteps"]; target.timesteps = n target.joint_positions[:n] = r["joint_positions"] target.joint_velocities[:n] = r["joint_velocities"] target.body_positions[:n] = r["body_positions"] target.body_quaternions[:n] = r["body_quaternions"] return target def initialize(self, joint_positions, ms): ctx = self.build_initial_context(joint_positions) qpos_out, num_pred = self.session.run(None, self._build_inputs(ctx, ms)) n = int(num_pred.flat[0]); qpos = qpos_out[0,:n] if np.any(np.isnan(qpos)): raise RuntimeError("Planner initial output contains NaN") print(f"[Planner] Init: {n} frames @ 30 Hz") self._load_motion_in_place(qpos, n) print(f"[Planner] Resampled to {self.motion_50hz.timesteps} frames @ 50 Hz") return self.motion_50hz def request_replan(self, cursor, ms): if self._req_q is None: return ctx = self._context_from_controller(cursor) ms_dict = {"mode": ms.mode, "speed": ms.speed, "height": ms.height, "movement_direction": list(ms.movement_direction), "facing_direction": list(ms.facing_direction)} while not self._req_q.empty(): try: self._req_q.get_nowait() except Exception: break self._req_q.put((ctx, self.gen_frame, ms_dict)) def try_get_new_motion(self): if self._res_q is None: return None result = None while not self._res_q.empty(): try: result = self._res_q.get_nowait() except Exception: break if result is None: return None n, gf = result["timesteps"], result["gen_frame"] s = self._snapshot; s.timesteps = n s.joint_positions[:n] = result["joint_positions"] s.joint_velocities[:n] = result["joint_velocities"] s.body_positions[:n] = result["body_positions"] s.body_quaternions[:n] = result["body_quaternions"] return s, gf def start_subprocess(self, controller): self._ctrl = controller self._req_q, self._res_q, self._stop_evt = mp.Queue(), mp.Queue(), mp.Event() self._proc = mp.Process( target=_planner_worker, args=(self.planner_path, self._req_q, self._res_q, self._stop_evt, self.version, self.random_seed), daemon=True) self._proc.start() print(f"[Planner] Background process started (PID={self._proc.pid})") def stop_subprocess(self): if self._stop_evt: self._stop_evt.set() if self._proc: self._proc.join(timeout=3.0) if self._proc.is_alive(): self._proc.terminate() print("[Planner] Background process stopped") for q in (self._req_q, self._res_q): if q: q.close() # ── PlannerController ───────────────────────────────────────────────────────── class PlannerController(StandingEncoderDecoder): def __init__(self, planner, encoder, decoder): super().__init__(encoder, decoder) self.planner = planner self.ref_cursor = 0 self.motion_timesteps = 0 self.motion_joint_positions = np.zeros((1500,29), np.float64) self.motion_joint_velocities = np.zeros((1500,29), np.float64) self.motion_body_quats = np.zeros((1500,4), np.float64); self.motion_body_quats[:,0] = 1.0 self.motion_body_pos = np.zeros((1500,3), np.float64) self.init_ref_quat = np.array([1,0,0,0], np.float64) self.heading_init_base_quat = np.array([1,0,0,0], np.float64) self.delta_heading = 0.0 self.reinit_heading = False self.playing = self.first_motion = False self.motion_lock = threading.Lock() def load_initial_motion(self, motion): with self.motion_lock: n = motion.timesteps self.motion_timesteps = n self.motion_joint_positions[:n] = motion.joint_positions[:n] self.motion_joint_velocities[:n] = motion.joint_velocities[:n] self.motion_body_quats[:n] = motion.body_quaternions[:n] self.motion_body_pos[:n] = motion.body_positions[:n] self.init_ref_quat = motion.body_quaternions[0].copy() self.ref_cursor = 0; self.first_motion = True self.playing = True; self.delta_heading = 0.0 def blend_new_motion(self, new_motion, gen_frame): with self.motion_lock: cur = self.ref_cursor new_len = gen_frame - cur + new_motion.timesteps if new_len <= 0: return f_arr = np.arange(new_len) f_old = np.minimum(f_arr + cur, self.motion_timesteps - 1) f_new = np.clip(f_arr + cur - gen_frame, 0, new_motion.timesteps - 1) blend_start = max(0, gen_frame - cur) w_new = np.clip((f_arr - blend_start) / BLEND_FRAMES if BLEND_FRAMES > 0 else np.ones(new_len), 0.0, 1.0) w_old = 1.0 - w_new self.motion_joint_positions[:new_len] = w_old[:,None]*self.motion_joint_positions[f_old] + w_new[:,None]*new_motion.joint_positions[f_new] self.motion_joint_velocities[:new_len] = w_old[:,None]*self.motion_joint_velocities[f_old] + w_new[:,None]*new_motion.joint_velocities[f_new] self.motion_body_pos[:new_len] = w_old[:,None]*self.motion_body_pos[f_old] + w_new[:,None]*new_motion.body_positions[f_new] self.motion_body_quats[:new_len] = quat_slerp_batch(self.motion_body_quats[f_old], new_motion.body_quaternions[f_new], w_new) self.motion_timesteps = new_len; self.first_motion = False; self.ref_cursor = 0 self.init_ref_quat = self.motion_body_quats[0].copy() def _heading_apply_delta(self): delta = quat_mul(heading_quat(self.heading_init_base_quat).astype(np.float32), heading_quat_inv(self.init_ref_quat).astype(np.float32)) if self.delta_heading: h = self.delta_heading / 2.0 delta = quat_mul(np.array([np.cos(h),0,0,np.sin(h)], np.float32), delta) return delta def _anchor_6d(self, base_quat, ref_quat=None): if ref_quat is None: ref_quat = self.init_ref_quat new_ref = quat_mul(self._heading_apply_delta(), ref_quat.astype(np.float32)) return quat_to_6d(quat_mul(quat_conj(base_quat.astype(np.float32)), new_ref)) def build_encoder_obs(self): obs = np.zeros(1762, np.float32); obs[0] = float(self.encode_mode) with self.motion_lock: for f in range(10): tf = min(self.ref_cursor + f*5 if self.playing else self.ref_cursor, self.motion_timesteps - 1) obs[4+29*f:4+29*(f+1)] = self.motion_joint_positions[tf].astype(np.float32) if self.playing: obs[294+29*f:294+29*(f+1)] = self.motion_joint_velocities[tf].astype(np.float32) obs[601+6*f:601+6*(f+1)] = self._anchor_6d( self.h_quat[0], self.motion_body_quats[tf].astype(np.float32)) return obs def step(self, robot_obs, update_encoder, debug=False): if robot_obs and (self.first_motion or self.reinit_heading): q = robot_obs.get("imu.quaternion") if q is not None: self.heading_init_base_quat = np.array(q, np.float64) with self.motion_lock: rf = min(self.ref_cursor, self.motion_timesteps - 1) self.init_ref_quat = self.motion_body_quats[rf].copy() self.delta_heading = 0.0 self.first_motion = False self.reinit_heading = False print(f"[Heading] init quat: {self.heading_init_base_quat}") return super().step(robot_obs, update_encoder=update_encoder, debug=debug) def advance_cursor(self, wall_dt): if not self.playing: return frames = max(1, round(wall_dt / CONTROL_DT)) with self.motion_lock: self.ref_cursor = min(self.ref_cursor + frames, self.motion_timesteps - 1) # ── Keyboard ────────────────────────────────────────────────────────────────── class RawKeyboard: def __init__(self): self.fd = sys.stdin.fileno() self.old = termios.tcgetattr(self.fd) def __enter__(self): tty.setcbreak(self.fd); return self def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) def get_key(self): return sys.stdin.read(1) if select.select([sys.stdin],[],[],0)[0] else None def process_keyboard(key, ms, controller=None): if key is None: return False if key == '\x1b': return True if key == ' ': ms.mode = LM.IDLE; ms.speed = ms.height = -1.0 ms.has_movement = False; ms.needs_replan = True if controller: controller.playing = False; controller.reinit_heading = True print("\n >> EMERGENCY STOP -> IDLE"); return False if key in ('r','R'): ms.needs_replan = True; print("\n >> Manual replan"); return False if key in ('n','N','p','P'): ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ('n','N') else -1)) % len(MOTION_SETS) name, modes = MOTION_SETS[ms.motion_set_idx] print(f"\n >> Motion set: {name}") [print(f" {i+1}: {m.name}") for i,m in enumerate(modes)] return False if key.isdigit() and key not in ('9','0'): idx = int(key) - 1; modes = MOTION_SETS[ms.motion_set_idx][1] if 0 <= idx < len(modes): ms.mode = modes[idx]; ms.needs_replan = True if controller: controller.playing = True; controller.reinit_heading = True print(f"\n >> Mode: {LM(ms.mode).name} ({ms.mode}) [replanning...]") return False if key == '9': ms.speed = max(0.0, (ms.speed if ms.speed>=0 else 1.0) - 0.1) print(f"\n >> Speed: {ms.speed:.1f}"); return False if key == '0': ms.speed = min(5.0, (ms.speed if ms.speed>=0 else 1.0) + 0.1) print(f"\n >> Speed: {ms.speed:.1f}"); return False if key == '-': ms.height = max(0.2, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) - 0.02) print(f"\n >> Height: {ms.height:.2f}"); return False if key == '=': ms.height = min(1.0, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) + 0.02) print(f"\n >> Height: {ms.height:.2f}"); return False if key.lower() == 'w': ms.movement_angle = ms.facing_angle elif key.lower() == 's': ms.movement_angle = ms.facing_angle + math.pi elif key.lower() == 'a': ms.movement_angle = ms.facing_angle + math.pi/2 elif key.lower() == 'd': ms.movement_angle = ms.facing_angle - math.pi/2 if key.lower() in ('w','s','a','d'): ms.has_movement = ms.needs_replan = True elif key.lower() == 'q': ms.facing_angle += 0.1 if controller: controller.delta_heading += 0.1 print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°") elif key.lower() == 'e': ms.facing_angle -= 0.1 if controller: controller.delta_heading -= 0.1 print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°") return False _joy_prev_active = False def _parse_wireless(wr): """Parse wireless_remote (bytes or int-array) into (lx, ly, rx, ry).""" import struct as _st if not isinstance(wr, (bytes, bytearray)): wr = bytes(wr) if len(wr) < 24: return None lx = _st.unpack("f", wr[4:8])[0] rx = _st.unpack("f", wr[8:12])[0] ry = _st.unpack("f", wr[12:16])[0] ly = _st.unpack("f", wr[20:24])[0] return lx, ly, rx, ry def process_joystick(obs, ms, controller=None): """Joystick mirrors keyboard: left stick=WASD, right stick X=Q/E, right stick Y=height.""" global _joy_prev_active wr = obs.get("wireless_remote") if wr is None: return parsed = _parse_wireless(wr) if parsed is None: return lx, ly, rx, ry = parsed # Dead zone + negate both Y axes (bridge already flips them once) lx = 0.0 if abs(lx) < DEADZONE else lx ly = 0.0 if abs(ly) < DEADZONE else -ly rx = 0.0 if abs(rx) < DEADZONE else rx ry = 0.0 if abs(ry) < DEADZONE else -ry left_active = abs(lx) > 0 or abs(ly) > 0 # Left stick → WASD (movement direction relative to facing) if left_active: ms.movement_angle = ms.facing_angle + math.atan2(-lx, -ly) ms.has_movement = True if not _joy_prev_active: ms.needs_replan = True _joy_prev_active = True elif _joy_prev_active and not (abs(rx) > 0 or abs(ry) > 0): _joy_prev_active = False ms.has_movement = False # Right stick X → Q/E (facing rotation, ~1 rad/s at full deflection) if abs(rx) > 0: delta = -0.02 * rx ms.facing_angle += delta if controller: controller.delta_heading += delta # Right stick Y → -/= (height adjustment, ~0.25/s at full deflection) if abs(ry) > 0: step = -0.005 * ry ms.height = max(0.1, min(1.0, (ms.height if ms.height >= 0 else DEFAULT_HEIGHT) + step)) # ── Main ────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control") parser.add_argument("--ip", type=str, default=None, help="Robot IP for real hardware (e.g. 192.168.123.164). " "Omit for simulation.") args = parser.parse_args() print("=" * 60) print("SONIC planner - full mode control") print(" N/P cycle sets | 1-8 select mode | WASD move") print(" Q/E rotate | 9/0 speed | -/= height") print(" R replan | Space IDLE | Esc quit") if args.ip: print(f" Robot IP: {args.ip}") else: print(" Mode: simulation") print("=" * 60 + "\n") planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx") encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx") decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx") providers = ort.get_available_providers() use_gpu = "CUDAExecutionProvider" in providers gpu_ep = (["CUDAExecutionProvider","CPUExecutionProvider"] if use_gpu else ["CPUExecutionProvider"]) so = ort.SessionOptions(); so.log_severity_level = 3 print(f"[ONNX] enc/dec={'GPU' if use_gpu else 'CPU'}, planner=CPU") planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=["CPUExecutionProvider"]) encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=gpu_ep) decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=gpu_ep) print(f"[Planner] version={'v1+' if len(planner_sess.get_inputs())>=11 else 'v0'}") cfg = UnitreeG1Config() if args.ip: cfg.is_simulation = False cfg.robot_ip = args.ip robot = UnitreeG1(cfg); robot.connect() kp, kd = _kp_kd(); robot.kp = kp.copy(); robot.kd = kd.copy() ms = MovementState() planner = SonicPlanner(planner_sess, planner_path) controller = PlannerController(planner, encoder_sess, decoder_sess) motion = planner.initialize(DEFAULT_ANGLES, ms) controller.load_initial_motion(motion) controller.print_input_diagnostics() planner.start_subprocess(controller) print(f"\nStarting: {MOTION_SETS[0][0]}") [print(f" {i+1}: {m.name}") for i,m in enumerate(MOTION_SETS[0][1])] with RawKeyboard() as kb: try: gc.disable(); gc_timer = 0.0 robot.reset(CONTROL_DT, DEFAULT_ANGLES); time.sleep(1.0) step = 0; last_status = replan_timer = 0.0 loop_t = enc_t = dec_t = obs_t = act_t = [] slow_n = blend_n = 0; stall_src = ""; did_blend = False prev_end = time.time(); t_start = time.time() log_path = "/tmp/sonic_pose_log.csv" jnames = [m.name for m in G1_29_JointIndex] with open(log_path, "w") as log_f: log_f.write("t,step,cursor,ts,blend,mode," + ",".join(f"q{i}" for i in range(29)) + "," + ",".join(f"ref{i}" for i in range(29)) + "," + ",".join(f"act{i}" for i in range(29)) + ",delta_max,action_norm,token_norm\n") while not robot._shutdown_event.is_set(): t0 = time.time() if process_keyboard(kb.get_key(), ms, controller): break obs = robot.get_observation(); t_obs = time.time() obs_t.append(1000*(t_obs - t0)) if not obs: step += 1; prev_end = time.time() time.sleep(max(0.0, CONTROL_DT-(time.time()-t0))); continue process_joystick(obs, ms, controller) clamp_mode_params(ms) is_static = LM(ms.mode) in STATIC_MODES do_req = ms.needs_replan and step > 0 if do_req: ms.needs_replan = False; replan_timer = 0.0 elif not is_static and step > 0 and ms.speed != 0: replan_timer += CONTROL_DT if replan_timer >= replan_interval(ms.mode): do_req = True; replan_timer = 0.0 if do_req: planner.request_replan(controller.ref_cursor, ms) do_enc = (step % ENCODER_UPDATE_EVERY == 0) t_step = time.time() action = controller.step(obs, update_encoder=do_enc, debug=(step % DEBUG_PRINT_EVERY == 0)) step_ms = 1000*(time.time()-t_step) (enc_t if do_enc else dec_t).append(step_ms) t_act = time.time() robot.send_action(action) act_t.append(1000*(time.time()-t_act)) result = planner.try_get_new_motion() t_blend = time.time() if result: controller.blend_new_motion(*result) blend_ms = 1000*(time.time()-t_blend) blend_n += 1; did_blend = True else: blend_ms = 0.0 if step % 5 == 0: t_rel = time.time() - t_start q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames]) a_v = np.array([action.get(f"{n}.q", 0) for n in jnames]) cur, ts = controller.ref_cursor, controller.motion_timesteps q_ref = controller.motion_joint_positions[min(cur,ts-1)] if ts > 0 else np.zeros(29) log_f.write(f"{t_rel:.4f},{step},{cur},{ts},{int(did_blend)},{ms.mode}," + ",".join(f"{v:.6f}" for v in q_r) + "," + ",".join(f"{v:.6f}" for v in q_ref) + "," + ",".join(f"{v:.6f}" for v in a_v) + "," + f"{np.max(np.abs(a_v-q_r)):.6f}," f"{np.linalg.norm(a_v):.6f}," f"{np.linalg.norm(controller.token):.6f}\n") did_blend = False now = time.time(); loop_ms = 1000*(now-t0) wall_dt = now - prev_end; loop_t.append(loop_ms) if loop_ms > 50: stall_src = (f"[STALL] {loop_ms:.0f}ms: " f"obs={obs_t[-1]:.0f} blend={blend_ms:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}") if loop_ms > CONTROL_DT*1500: slow_n += 1 controller.advance_cursor(wall_dt) if now - last_status > 2.0: def _avg(l): return sum(l)/len(l) if l else 0 hz = 1000/_avg(loop_t) if _avg(loop_t) else 0 print(f"\r {ms.status_line()} step={step} ref={controller.ref_cursor}/{controller.motion_timesteps} " f"loop={_avg(loop_t):.1f}ms(max={max(loop_t,default=0):.1f}) hz={hz:.0f} " f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} " f"slow={slow_n} blends={blend_n}", end="", flush=True) if stall_src: print(f"\n {stall_src}"); stall_src = "" last_status = now loop_t=enc_t=dec_t=obs_t=act_t=[]; slow_n=blend_n=0 prev_end = time.time() gc_timer += CONTROL_DT if gc_timer >= 10.0: gc.collect(); gc_timer = 0.0 step += 1 time.sleep(max(0.0, CONTROL_DT-(time.time()-t0))) except KeyboardInterrupt: pass finally: gc.enable() print(f"\n[Log] Saved to {log_path}") planner.stop_subprocess() print("\nStopping...") if robot.is_connected: robot.disconnect() print("Done.") if __name__ == "__main__": main()