diff --git a/examples/unitree_g1/motion_loader.py b/examples/unitree_g1/motion_loader.py index c4c7e6267..05e867cdc 100644 --- a/examples/unitree_g1/motion_loader.py +++ b/examples/unitree_g1/motion_loader.py @@ -66,12 +66,12 @@ def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np Returns: (T, 24, 3) per-frame root-orientation-removed joints. """ - from scipy.spatial.transform import Rotation as R + from scipy.spatial.transform import Rotation - rx90 = R.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up - base120 = R.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot - a = rx90 * R.from_rotvec(root_aa) # z-up root quat (left-mult) - b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a)) + rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up + base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot + a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult) + b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a)) return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32) @@ -87,9 +87,7 @@ class SmplMotion: self.loop = loop if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM): - raise ValueError( - f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}" - ) + raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}") # Reference clips store world-frame joints; the encoder wants per-frame # root-orientation-removed joints. Canonicalize when we have the root pose. @@ -109,10 +107,7 @@ class SmplMotion: [f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz]. """ idx = np.arange(start, start + WINDOW) - if self.loop: - idx = np.mod(idx, self.num_frames) - else: - idx = np.clip(idx, 0, self.num_frames - 1) + idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1) return self.smpl_joints[idx].reshape(-1).astype(np.float32) def reset(self): @@ -135,30 +130,31 @@ def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--motion", required=True, help="Path to motion .npz") parser.add_argument("--no-loop", action="store_true") - parser.add_argument("--no-canon", action="store_true", - help="Skip canonicalization (feed raw stored joints)") + parser.add_argument( + "--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)" + ) args = parser.parse_args() m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon) duration = m.num_frames / m.fps print(f"Loaded '{args.motion}'") print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s") - print(f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} " - f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} " - f"transl={None if m.transl is None else m.transl.shape}") + print( + f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} " + f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} " + f"transl={None if m.transl is None else m.transl.shape}" + ) # Sanity: after canonicalization the per-frame body heading should be fixed. j = m.smpl_joints - v = (j[:, 2, :2] - j[:, 1, :2]) # R_hip - L_hip, horizontal + v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal a = np.arctan2(v[:, 1], v[:, 0]) rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0) circ_std = np.degrees(np.sqrt(-2 * np.log(rlen))) - print(f" hip-heading circ-std={circ_std:.1f} deg " - f"(~0 => orientation removed; large => world-frame)") + print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)") w0 = m.window(0) - print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) " - f"min={w0.min():.3f} max={w0.max():.3f}") + print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}") assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]" # Simulate a few control ticks. diff --git a/examples/unitree_g1/pkl_to_npz.py b/examples/unitree_g1/pkl_to_npz.py index b7de3a69a..d6ad3e98e 100644 --- a/examples/unitree_g1/pkl_to_npz.py +++ b/examples/unitree_g1/pkl_to_npz.py @@ -42,15 +42,14 @@ def load_pkl(path: str) -> dict: return joblib.load(path) except Exception: # joblib clips are zlib-compressed pickles; fall back to manual inflate. + import contextlib import pickle import zlib with open(path, "rb") as f: raw = f.read() - try: + with contextlib.suppress(zlib.error): raw = zlib.decompress(raw) - except zlib.error: - pass return pickle.loads(raw) @@ -80,8 +79,9 @@ def main(): np.savez_compressed(args.out, **out) dur = smpl_joints.shape[0] / float(out["fps"]) print(f"Wrote {args.out}") - print(f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s " - f"keys={sorted(out)}") + print( + f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}" + ) if __name__ == "__main__": diff --git a/examples/unitree_g1/sonic.py b/examples/unitree_g1/sonic.py index 2dfad1fd7..00b5644da 100644 --- a/examples/unitree_g1/sonic.py +++ b/examples/unitree_g1/sonic.py @@ -25,15 +25,16 @@ For teleop integration use --robot.controller=SonicWholeBodyController instead. """ import argparse +import contextlib import faulthandler import gc import sys import time import numpy as np +from motion_loader import SmplMotion from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config -from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime from lerobot.robots.unitree_g1.controllers.sonic_pipeline import ( CONTROL_DT, DEFAULT_ANGLES, @@ -43,41 +44,57 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import ( compute_kp_kd, drain_keyboard, ) +from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1 -from motion_loader import SmplMotion - def main(): parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control") - parser.add_argument("--ip", type=str, default=None, - help="Robot IP for real hardware (e.g. 192.168.123.164). " - "Omit for simulation.") - parser.add_argument("--log-csv", action="store_true", - help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)") - parser.add_argument("--cpu", action="store_true", - help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)") - parser.add_argument("--headless", action="store_true", - help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)") - parser.add_argument("--gamepad", action="store_true", - help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)") - parser.add_argument("--keyboard-only", action="store_true", - help="Ignore wireless gamepad (terminal keyboard only)") - parser.add_argument("--motion-file", type=str, default=None, - help="Play an SMPL motion clip (.npz) via SONIC whole-body mode " - "(encode_mode=2) instead of locomotion planning.") - parser.add_argument("--no-loop", action="store_true", - help="With --motion-file, play once instead of looping") + parser.add_argument( + "--ip", + type=str, + default=None, + help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.", + ) + parser.add_argument( + "--log-csv", + action="store_true", + help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)", + ) + parser.add_argument( + "--cpu", + action="store_true", + help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)", + ) + parser.add_argument( + "--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)" + ) + parser.add_argument( + "--gamepad", + action="store_true", + help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)", + ) + parser.add_argument( + "--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)" + ) + parser.add_argument( + "--motion-file", + type=str, + default=None, + help="Play an SMPL motion clip (.npz) via SONIC whole-body mode " + "(encode_mode=2) instead of locomotion planning.", + ) + parser.add_argument( + "--no-loop", action="store_true", help="With --motion-file, play once instead of looping" + ) args = parser.parse_args() # Surface native crashes (onnxruntime / mujoco) with a real traceback, and # avoid losing buffered diagnostics if the process dies mid-loop. faulthandler.enable() - try: + with contextlib.suppress(Exception): sys.stdout.reconfigure(line_buffering=True) - except Exception: - pass print("=" * 60) print("SONIC planner - full mode control") @@ -111,20 +128,24 @@ def main(): motion = None if args.motion_file: motion = SmplMotion(args.motion_file, loop=not args.no_loop) - controller.smpl_motion = motion # lets 'M' key toggle playback - controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation + controller.smpl_motion = motion # lets 'M' key toggle playback + controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation dur = motion.num_frames / motion.fps print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}") - print(f" frames={motion.num_frames} fps={motion.fps:.1f} " - f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2") + print( + f" frames={motion.num_frames} fps={motion.fps:.1f} " + f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2" + ) print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.") runtime.controller.print_input_diagnostics() print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})") - [print(f" {i+1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])] - print("\n[Ready] Click THIS terminal, then W/A/S/D to move. " - "1-6 change mode, 9/0 speed, Esc quit.\n", flush=True) + [print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])] + print( + "\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n", + flush=True, + ) # Sim hub publishes wireless_remote bytes that can fight terminal WASD. base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None) @@ -137,22 +158,25 @@ def main(): time.sleep(1.0) last_status = time.time() - 2.1 - loop_t = enc_t = dec_t = obs_t = act_t = [] + loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], [] slow_n = blend_n = 0 stall_src = "" did_blend = False - prev_end = time.time() t_start = time.time() log_path = "/tmp/sonic_pose_log.csv" jnames = [m.name for m in G1_29_JointIndex] - log_ctx = open(log_path, "w") if args.log_csv else None + log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115 if log_ctx: - log_ctx.write("t,step,cursor,ts,blend,mode," + - ",".join(f"q{i}" for i in range(29)) + "," + - ",".join(f"ref{i}" for i in range(29)) + "," + - ",".join(f"act{i}" for i in range(29)) + - ",delta_max,action_norm,token_norm\n") + log_ctx.write( + "t,step,cursor,ts,blend,mode," + + ",".join(f"q{i}" for i in range(29)) + + "," + + ",".join(f"ref{i}" for i in range(29)) + + "," + + ",".join(f"act{i}" for i in range(29)) + + ",delta_max,action_norm,token_norm\n" + ) try: while not robot._shutdown_event.is_set(): @@ -192,42 +216,55 @@ def main(): q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames]) a_v = np.array([action.get(f"{n}.q", 0) for n in jnames]) cur, ts = controller.ref_cursor, controller.motion_timesteps - q_ref = controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29) - log_ctx.write(f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode}," + - ",".join(f"{v:.6f}" for v in q_r) + "," + - ",".join(f"{v:.6f}" for v in q_ref) + "," + - ",".join(f"{v:.6f}" for v in a_v) + "," + - f"{np.max(np.abs(a_v - q_r)):.6f}," - f"{np.linalg.norm(a_v):.6f}," - f"{np.linalg.norm(controller.token):.6f}\n") + q_ref = ( + controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29) + ) + log_ctx.write( + f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode}," + + ",".join(f"{v:.6f}" for v in q_r) + + "," + + ",".join(f"{v:.6f}" for v in q_ref) + + "," + + ",".join(f"{v:.6f}" for v in a_v) + + "," + + f"{np.max(np.abs(a_v - q_r)):.6f}," + f"{np.linalg.norm(a_v):.6f}," + f"{np.linalg.norm(controller.token):.6f}\n" + ) did_blend = False now = time.time() loop_ms = 1000 * (now - t0) if loop_ms > 50: - stall_src = (f"[STALL] {loop_ms:.0f}ms: " - f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}") + stall_src = ( + f"[STALL] {loop_ms:.0f}ms: " + f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}" + ) if loop_ms > CONTROL_DT * 1500: slow_n += 1 if now - last_status > 2.0: + def _avg(lst): return sum(lst) / len(lst) if lst else 0 hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0 - print(f"\r {ms.status_line()} step={runtime.step} " - f"ref={controller.ref_cursor}/{controller.motion_timesteps} " - f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} " - f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} " - f"slow={slow_n} blends={blend_n}", end="", flush=True) + print( + f"\r {ms.status_line()} step={runtime.step} " + f"ref={controller.ref_cursor}/{controller.motion_timesteps} " + f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} " + f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} " + f"slow={slow_n} blends={blend_n}", + end="", + flush=True, + ) if stall_src: print(f"\n {stall_src}") stall_src = "" last_status = now - loop_t = enc_t = dec_t = obs_t = act_t = [] + loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], [] slow_n = blend_n = 0 - prev_end = time.time() gc_timer += CONTROL_DT if gc_timer >= 10.0: gc.collect() diff --git a/src/lerobot/robots/unitree_g1/controllers/__init__.py b/src/lerobot/robots/unitree_g1/controllers/__init__.py index 8b7f3ded6..53625c634 100644 --- a/src/lerobot/robots/unitree_g1/controllers/__init__.py +++ b/src/lerobot/robots/unitree_g1/controllers/__init__.py @@ -1,3 +1,19 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """Unitree G1 locomotion controllers (Groot, Holosoma, SONIC).""" __all__ = [ diff --git a/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py b/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py index 3392cb168..314ca0352 100644 --- a/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py +++ b/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py @@ -1,9 +1,24 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """SONIC planner pipeline: ONNX enc/dec/planner, movement state, and input helpers.""" import math import queue import select -import struct import sys import termios import threading @@ -19,76 +34,180 @@ from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex # ── Constants ──────────────────────────────────────────────────────────────── -DEFAULT_ANGLES = np.array([ - -0.312, 0.0, 0.0, 0.669, -0.363, 0.0, - -0.312, 0.0, 0.0, 0.669, -0.363, 0.0, - 0.0, 0.0, 0.0, - 0.2, 0.2, 0.0, 0.6, 0.0, 0.0, 0.0, - 0.2, -0.2, 0.0, 0.6, 0.0, 0.0, 0.0, -], dtype=np.float32) +DEFAULT_ANGLES = np.array( + [ + -0.312, + 0.0, + 0.0, + 0.669, + -0.363, + 0.0, + -0.312, + 0.0, + 0.0, + 0.669, + -0.363, + 0.0, + 0.0, + 0.0, + 0.0, + 0.2, + 0.2, + 0.0, + 0.6, + 0.0, + 0.0, + 0.0, + 0.2, + -0.2, + 0.0, + 0.6, + 0.0, + 0.0, + 0.0, + ], + dtype=np.float32, +) NATURAL_FREQ = 10.0 * 2.0 * np.pi ARMATURE = {"5020": 0.003609725, "7520_14": 0.010177520, "7520_22": 0.025101925, "4010": 0.00425} -EFFORT = {"5020": 25.0, "7520_14": 88.0, "7520_22": 139.0, "4010": 5.0} +EFFORT = {"5020": 25.0, "7520_14": 88.0, "7520_22": 139.0, "4010": 5.0} + def _action_scale(k): return 0.25 * EFFORT[k] / (ARMATURE[k] * NATURAL_FREQ**2) -_J = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \ - ["7520_14","5020","5020"] + \ - ["5020","5020","5020","5020","5020","4010","4010"] * 2 + +_J = ( + ["7520_22", "7520_22", "7520_14", "7520_22", "5020", "5020"] * 2 + + ["7520_14", "5020", "5020"] + + ["5020", "5020", "5020", "5020", "5020", "4010", "4010"] * 2 +) ACTION_SCALE = np.array([_action_scale(k) for k in _J], dtype=np.float32) -CONTROL_DT = 0.02 -DEFAULT_HEIGHT = 0.788740 -TOKEN_DIM = 64 +CONTROL_DT = 0.02 +DEFAULT_HEIGHT = 0.788740 +TOKEN_DIM = 64 ENCODER_UPDATE_EVERY = 5 -DEBUG_PRINT_EVERY = 100 +DEBUG_PRINT_EVERY = 100 MOTION_LOOK_AHEAD_STEPS = 2 -INITIAL_RANDOM_SEED = 1234 +INITIAL_RANDOM_SEED = 1234 MIN_TOKENS, MAX_TOKENS = 6, 16 K = MAX_TOKENS - MIN_TOKENS + 1 -DEADZONE = 0.05 -BLEND_FRAMES = 8 +DEADZONE = 0.05 +BLEND_FRAMES = 8 -REPLAN_INTERVAL = { - "running": 0.1, "crawling": 0.2, "boxing": 1.0, "default": 1.0 -} +REPLAN_INTERVAL = {"running": 0.1, "crawling": 0.2, "boxing": 1.0, "default": 1.0} -ISAACLAB_TO_MUJOCO = np.array([ - 0, 3, 6, 9, 13, 17, 1, 4, 7, 10, 14, 18, 2, 5, 8, - 11, 15, 19, 21, 23, 25, 27, 12, 16, 20, 22, 24, 26, 28 -], dtype=np.int32) +ISAACLAB_TO_MUJOCO = np.array( + [ + 0, + 3, + 6, + 9, + 13, + 17, + 1, + 4, + 7, + 10, + 14, + 18, + 2, + 5, + 8, + 11, + 15, + 19, + 21, + 23, + 25, + 27, + 12, + 16, + 20, + 22, + 24, + 26, + 28, + ], + dtype=np.int32, +) -MUJOCO_TO_ISAACLAB = np.array([ - 0, 6, 12, 1, 7, 13, 2, 8, 14, 3, 9, 15, 22, 4, 10, - 16, 23, 5, 11, 17, 24, 18, 25, 19, 26, 20, 27, 21, 28 -], dtype=np.int32) +MUJOCO_TO_ISAACLAB = np.array( + [ + 0, + 6, + 12, + 1, + 7, + 13, + 2, + 8, + 14, + 3, + 9, + 15, + 22, + 4, + 10, + 16, + 23, + 5, + 11, + 17, + 24, + 18, + 25, + 19, + 26, + 20, + 27, + 21, + 28, + ], + dtype=np.int32, +) + + +def _to_mujoco(a): + return a[MUJOCO_TO_ISAACLAB] + + +def _to_runtime(a): + r = np.zeros(29, np.float32) + r[MUJOCO_TO_ISAACLAB] = a + return r -def _to_mujoco(a): return a[MUJOCO_TO_ISAACLAB] -def _to_runtime(a): r = np.zeros(29, np.float32); r[MUJOCO_TO_ISAACLAB] = a; return r DEFAULT_ANGLES_MUJOCO = _to_mujoco(DEFAULT_ANGLES) -ENCODER_STANDING_REF = DEFAULT_ANGLES.copy() +ENCODER_STANDING_REF = DEFAULT_ANGLES.copy() -LOWER_BODY_IL = np.array([0,3,6,9,13,17,1,4,7,10,14,18], dtype=np.int32) -WRIST_IL = np.array([23,24,25,26,27,28], dtype=np.int32) -VR_TARGET_DEF = np.zeros(9, dtype=np.float32) -VR_ORN_DEF = np.array([1,0,0,0,1,0,0,0,1,0,0,0], dtype=np.float32) -SMPL_DEF = np.zeros(720, dtype=np.float32) +LOWER_BODY_IL = np.array([0, 3, 6, 9, 13, 17, 1, 4, 7, 10, 14, 18], dtype=np.int32) +WRIST_IL = np.array([23, 24, 25, 26, 27, 28], dtype=np.int32) +VR_TARGET_DEF = np.zeros(9, dtype=np.float32) +VR_ORN_DEF = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0], dtype=np.float32) +SMPL_DEF = np.zeros(720, dtype=np.float32) # ── PD gains ───────────────────────────────────────────────────────────────── + def compute_kp_kd(): - s = lambda k: ARMATURE[k] * NATURAL_FREQ**2 - d = lambda k: 2.0 * 2.0 * ARMATURE[k] * NATURAL_FREQ - _kp_keys = ["7520_22","7520_22","7520_14","7520_22","5020","5020"] * 2 + \ - ["7520_14","5020","5020"] + \ - ["5020","5020","5020","5020","5020","4010","4010"] * 2 + def s(k): + return ARMATURE[k] * NATURAL_FREQ**2 + + def d(k): + return 2.0 * 2.0 * ARMATURE[k] * NATURAL_FREQ + + _kp_keys = ( + ["7520_22", "7520_22", "7520_14", "7520_22", "5020", "5020"] * 2 + + ["7520_14", "5020", "5020"] + + ["5020", "5020", "5020", "5020", "5020", "4010", "4010"] * 2 + ) _kd_keys = _kp_keys - _double = {4,5,10,11,13,14} # ankle + waist indices with factor 2 - kp = np.array([2*s(k) if i in _double else s(k) for i,k in enumerate(_kp_keys)], dtype=np.float32) - kd = np.array([2*d(k) if i in _double else d(k) for i,k in enumerate(_kd_keys)], dtype=np.float32) + _double = {4, 5, 10, 11, 13, 14} # ankle + waist indices with factor 2 + kp = np.array([2 * s(k) if i in _double else s(k) for i, k in enumerate(_kp_keys)], dtype=np.float32) + kd = np.array([2 * d(k) if i in _double else d(k) for i, k in enumerate(_kd_keys)], dtype=np.float32) return kp, kd @@ -119,111 +238,213 @@ def lowstate_to_obs(lowstate) -> dict: # ── Quaternion helpers ──────────────────────────────────────────────────────── + def quat_conj(q): return np.array([q[0], -q[1], -q[2], -q[3]], dtype=np.float32) + def quat_mul(q1, q2): - w1,x1,y1,z1 = q1; w2,x2,y2,z2 = q2 - return np.array([ - w1*w2 - x1*x2 - y1*y2 - z1*z2, - w1*x2 + x1*w2 + y1*z2 - z1*y2, - w1*y2 - x1*z2 + y1*w2 + z1*x2, - w1*z2 + x1*y2 - y1*x2 + z1*w2, - ], dtype=np.float32) + w1, x1, y1, z1 = q1 + w2, x2, y2, z2 = q2 + return np.array( + [ + w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2, + w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2, + w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2, + w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2, + ], + dtype=np.float32, + ) + def gravity_dir(q): q = q / (np.linalg.norm(q) + 1e-8) qv = np.array([0, 0, 0, -1], dtype=np.float32) return quat_mul(quat_mul(quat_conj(q), qv), q)[1:] + def quat_to_6d(q): - w,x,y,z = q - return np.array([ - 1-2*(y*y+z*z), 2*(x*y-z*w), - 2*(x*y+z*w), 1-2*(x*x+z*z), - 2*(x*z-y*w), 2*(y*z+x*w), - ], dtype=np.float32) + w, x, y, z = q + return np.array( + [ + 1 - 2 * (y * y + z * z), + 2 * (x * y - z * w), + 2 * (x * y + z * w), + 1 - 2 * (x * x + z * z), + 2 * (x * z - y * w), + 2 * (y * z + x * w), + ], + dtype=np.float32, + ) + def calc_heading(q): - w,x,y,z = q - return float(np.arctan2(2*(x*y + w*z), 1-2*(y*y+z*z))) + w, x, y, z = q + return float(np.arctan2(2 * (x * y + w * z), 1 - 2 * (y * y + z * z))) + def heading_quat(q, sign=1.0): a = sign * calc_heading(q) / 2.0 return np.array([np.cos(a), 0, 0, np.sin(a)], dtype=np.float64) -heading_quat_inv = lambda q: heading_quat(q, -1.0) + +def heading_quat_inv(q): + return heading_quat(q, -1.0) + def quat_slerp(q0, q1, t): - q0 = q0 / (np.linalg.norm(q0)+1e-12); q1 = q1 / (np.linalg.norm(q1)+1e-12) + q0 = q0 / (np.linalg.norm(q0) + 1e-12) + q1 = q1 / (np.linalg.norm(q1) + 1e-12) dot = float(np.dot(q0, q1)) - if dot < 0: q1, dot = -q1, -dot + if dot < 0: + q1, dot = -q1, -dot dot = min(dot, 1.0) if dot > 0.9995: - r = q0 + t*(q1-q0); return r/(np.linalg.norm(r)+1e-12) - th = np.arccos(dot); st = np.sin(th) - return (np.sin((1-t)*th)/st)*q0 + (np.sin(t*th)/st)*q1 + r = q0 + t * (q1 - q0) + return r / (np.linalg.norm(r) + 1e-12) + th = np.arccos(dot) + st = np.sin(th) + return (np.sin((1 - t) * th) / st) * q0 + (np.sin(t * th) / st) * q1 + def quat_slerp_batch(q0, q1, t): - q0 = q0 / (np.linalg.norm(q0,axis=1,keepdims=True)+1e-12) - q1 = q1 / (np.linalg.norm(q1,axis=1,keepdims=True)+1e-12) - dot = np.sum(q0*q1, axis=1); neg = dot<0 - q1=q1.copy(); q1[neg]=-q1[neg]; dot[neg]=-dot[neg]; dot=np.clip(dot,-1,1) - lin = dot>0.9995; th=np.arccos(dot); st=np.where(np.sin(th)==0,1,np.sin(th)) - c0=np.sin((1-t)*th)/st; c1=np.sin(t*th)/st - c0[lin]=1-t[lin]; c1[lin]=t[lin] - r = c0[:,None]*q0 + c1[:,None]*q1 - return r / (np.linalg.norm(r,axis=1,keepdims=True)+1e-12) + q0 = q0 / (np.linalg.norm(q0, axis=1, keepdims=True) + 1e-12) + q1 = q1 / (np.linalg.norm(q1, axis=1, keepdims=True) + 1e-12) + dot = np.sum(q0 * q1, axis=1) + neg = dot < 0 + q1 = q1.copy() + q1[neg] = -q1[neg] + dot[neg] = -dot[neg] + dot = np.clip(dot, -1, 1) + lin = dot > 0.9995 + th = np.arccos(dot) + st = np.where(np.sin(th) == 0, 1, np.sin(th)) + c0 = np.sin((1 - t) * th) / st + c1 = np.sin(t * th) / st + c0[lin] = 1 - t[lin] + c1[lin] = t[lin] + r = c0[:, None] * q0 + c1[:, None] * q1 + return r / (np.linalg.norm(r, axis=1, keepdims=True) + 1e-12) + # ── Locomotion modes ────────────────────────────────────────────────────────── + class LocomotionMode(IntEnum): - IDLE=0; SLOW_WALK=1; WALK=2; RUN=3; SQUAT=4; KNEEL_TWO_LEGS=5; KNEEL=6 - LYING_FACE_DOWN=7; CRAWLING=8; IDLE_BOXING=9; WALK_BOXING=10 - LEFT_PUNCH=11; RIGHT_PUNCH=12; RANDOM_PUNCH=13; ELBOW_CRAWLING=14 - LEFT_HOOK=15; RIGHT_HOOK=16; FORWARD_JUMP=17; STEALTH_WALK=18 - INJURED_WALK=19; LEDGE_WALKING=20; OBJECT_CARRYING=21; STEALTH_WALK_2=22 - HAPPY_DANCE_WALK=23; ZOMBIE_WALK=24; GUN_WALK=25; SCARE_WALK=26 + IDLE = 0 + SLOW_WALK = 1 + WALK = 2 + RUN = 3 + SQUAT = 4 + KNEEL_TWO_LEGS = 5 + KNEEL = 6 + LYING_FACE_DOWN = 7 + CRAWLING = 8 + IDLE_BOXING = 9 + WALK_BOXING = 10 + LEFT_PUNCH = 11 + RIGHT_PUNCH = 12 + RANDOM_PUNCH = 13 + ELBOW_CRAWLING = 14 + LEFT_HOOK = 15 + RIGHT_HOOK = 16 + FORWARD_JUMP = 17 + STEALTH_WALK = 18 + INJURED_WALK = 19 + LEDGE_WALKING = 20 + OBJECT_CARRYING = 21 + STEALTH_WALK_2 = 22 + HAPPY_DANCE_WALK = 23 + ZOMBIE_WALK = 24 + GUN_WALK = 25 + SCARE_WALK = 26 + LM = LocomotionMode MOTION_SETS = [ - ("Standing", [LM.SLOW_WALK, LM.WALK, LM.RUN, LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK]), - ("Squat / Low", [LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.CRAWLING, LM.ELBOW_CRAWLING]), - ("Boxing", [LM.IDLE_BOXING, LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, - LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK]), - ("Styled Walks", [LM.LEDGE_WALKING, LM.OBJECT_CARRYING, LM.STEALTH_WALK_2, - LM.HAPPY_DANCE_WALK, LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK]), + ("Standing", [LM.SLOW_WALK, LM.WALK, LM.RUN, LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK]), + ("Squat / Low", [LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.CRAWLING, LM.ELBOW_CRAWLING]), + ( + "Boxing", + [ + LM.IDLE_BOXING, + LM.WALK_BOXING, + LM.LEFT_PUNCH, + LM.RIGHT_PUNCH, + LM.RANDOM_PUNCH, + LM.LEFT_HOOK, + LM.RIGHT_HOOK, + ], + ), + ( + "Styled Walks", + [ + LM.LEDGE_WALKING, + LM.OBJECT_CARRYING, + LM.STEALTH_WALK_2, + LM.HAPPY_DANCE_WALK, + LM.ZOMBIE_WALK, + LM.GUN_WALK, + LM.SCARE_WALK, + ], + ), ] -STATIC_MODES = {LM.IDLE, LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.LYING_FACE_DOWN, LM.IDLE_BOXING} -STANDING_MODES = {LM.IDLE, LM.SLOW_WALK, LM.WALK, LM.RUN, LM.IDLE_BOXING, LM.WALK_BOXING, - LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK, - LM.FORWARD_JUMP, LM.STEALTH_WALK, LM.INJURED_WALK, LM.LEDGE_WALKING, - LM.OBJECT_CARRYING, LM.STEALTH_WALK_2, LM.HAPPY_DANCE_WALK, - LM.ZOMBIE_WALK, LM.GUN_WALK, LM.SCARE_WALK} -BOXING_MODES = {LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, - LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK} -SPEED_RANGES = {LM.SLOW_WALK:(0.2,0.8), LM.WALK:(0.8,1.5), LM.RUN:(1.5,3.0), - LM.CRAWLING:(0.4,1.0), LM.ELBOW_CRAWLING:(0.7,1.0)} +STATIC_MODES = {LM.IDLE, LM.SQUAT, LM.KNEEL_TWO_LEGS, LM.KNEEL, LM.LYING_FACE_DOWN, LM.IDLE_BOXING} +STANDING_MODES = { + LM.IDLE, + LM.SLOW_WALK, + LM.WALK, + LM.RUN, + LM.IDLE_BOXING, + LM.WALK_BOXING, + LM.LEFT_PUNCH, + LM.RIGHT_PUNCH, + LM.RANDOM_PUNCH, + LM.LEFT_HOOK, + LM.RIGHT_HOOK, + LM.FORWARD_JUMP, + LM.STEALTH_WALK, + LM.INJURED_WALK, + LM.LEDGE_WALKING, + LM.OBJECT_CARRYING, + LM.STEALTH_WALK_2, + LM.HAPPY_DANCE_WALK, + LM.ZOMBIE_WALK, + LM.GUN_WALK, + LM.SCARE_WALK, +} +BOXING_MODES = {LM.WALK_BOXING, LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK} +SPEED_RANGES = { + LM.SLOW_WALK: (0.2, 0.8), + LM.WALK: (0.8, 1.5), + LM.RUN: (1.5, 3.0), + LM.CRAWLING: (0.4, 1.0), + LM.ELBOW_CRAWLING: (0.7, 1.0), +} + def clamp_mode_params(ms): m = LM(ms.mode) - ms.height = -1.0 if m in STANDING_MODES else max(0.1, min(0.8, ms.height if ms.height>=0 else 0.2)) + ms.height = -1.0 if m in STANDING_MODES else max(0.1, min(0.8, ms.height if ms.height >= 0 else 0.2)) if m in STATIC_MODES: ms.speed = -1.0 elif m in SPEED_RANGES: lo, hi = SPEED_RANGES[m] - ms.speed = max(lo, min(hi, ms.speed if ms.speed>=0 else lo)) + ms.speed = max(lo, min(hi, ms.speed if ms.speed >= 0 else lo)) elif m in BOXING_MODES: - ms.speed = max(0.7, min(1.5, ms.speed if ms.speed>=0 else 0.7)) + ms.speed = max(0.7, min(1.5, ms.speed if ms.speed >= 0 else 0.7)) else: ms.speed = -1.0 + def replan_interval(mode): m = LM(mode) - if m == LM.RUN: return REPLAN_INTERVAL["running"] - if m == LM.CRAWLING: return REPLAN_INTERVAL["crawling"] + if m == LM.RUN: + return REPLAN_INTERVAL["running"] + if m == LM.CRAWLING: + return REPLAN_INTERVAL["crawling"] if m in {LM.LEFT_PUNCH, LM.RIGHT_PUNCH, LM.RANDOM_PUNCH, LM.LEFT_HOOK, LM.RIGHT_HOOK}: return REPLAN_INTERVAL["boxing"] return REPLAN_INTERVAL["default"] @@ -236,22 +457,26 @@ def _ort_providers(force_cpu: bool = False) -> list[str]: return ["CUDAExecutionProvider", "CPUExecutionProvider"] return ["CPUExecutionProvider"] + # ── Movement state ──────────────────────────────────────────────────────────── + @dataclass class MovementState: - mode: int = LM.SLOW_WALK # not IDLE — walking modes respond to WASD - speed: float = -1.0 - height: float = -1.0 + mode: int = LM.SLOW_WALK # not IDLE — walking modes respond to WASD + speed: float = -1.0 + height: float = -1.0 facing_angle: float = 0.0 movement_angle: float = 0.0 has_movement: bool = False motion_set_idx: int = 0 needs_replan: bool = False + joy_prev_active: bool = False # tracks right-stick activity across joystick polls @property def movement_direction(self): - if not self.has_movement: return (0.0, 0.0, 0.0) + if not self.has_movement: + return (0.0, 0.0, 0.0) return (math.cos(self.movement_angle), math.sin(self.movement_angle), 0.0) @property @@ -259,11 +484,13 @@ class MovementState: return (math.cos(self.facing_angle), math.sin(self.facing_angle), 0.0) def status_line(self): - return (f"[{MOTION_SETS[self.motion_set_idx][0]}] mode={self.mode}({LM(self.mode).name}) " - f"spd={'default' if self.speed<0 else f'{self.speed:.1f}'} " - f"hgt={'default' if self.height<0 else f'{self.height:.2f}'} " - f"facing={math.degrees(self.facing_angle):.0f}° " - f"{'moving' if self.has_movement else 'still'}") + return ( + f"[{MOTION_SETS[self.motion_set_idx][0]}] mode={self.mode}({LM(self.mode).name}) " + f"spd={'default' if self.speed < 0 else f'{self.speed:.1f}'} " + f"hgt={'default' if self.height < 0 else f'{self.height:.2f}'} " + f"facing={math.degrees(self.facing_angle):.0f}° " + f"{'moving' if self.has_movement else 'still'}" + ) @dataclass @@ -296,47 +523,49 @@ def should_replan_request(ms: MovementState, last: MovementSnapshot, replan_time if mode_changed or facing_changed or height_changed: return True time_to_replan = replan_timer >= replan_interval(ms.mode) - if not is_static and (speed_changed or dir_changed or (time_to_replan and ms.speed != 0)): - return True - return False + return not is_static and (speed_changed or dir_changed or (time_to_replan and ms.speed != 0)) + # ── Encoder / Decoder ───────────────────────────────────────────────────────── + class StandingEncoderDecoder: def __init__(self, encoder, decoder): self.encoder, self.decoder = encoder, decoder - self.encoder_input = encoder.get_inputs()[0].name - self.decoder_input = decoder.get_inputs()[0].name + self.encoder_input = encoder.get_inputs()[0].name + self.decoder_input = decoder.get_inputs()[0].name enc_dim = int(encoder.get_inputs()[0].shape[1]) dec_dim = int(decoder.get_inputs()[0].shape[1]) if enc_dim != 1762 or dec_dim != 994: raise RuntimeError(f"Unexpected dims encoder={enc_dim}, decoder={dec_dim}") - self.token = np.zeros(TOKEN_DIM, np.float32) - self.last_action_mj = np.zeros(29, np.float32) - self.h_q_mj = [np.zeros(29, np.float32)] * 10 - self.h_dq_mj = [np.zeros(29, np.float32)] * 10 - self.h_ang = [np.zeros(3, np.float32)] * 10 + self.token = np.zeros(TOKEN_DIM, np.float32) + self.last_action_mj = np.zeros(29, np.float32) + self.h_q_mj = [np.zeros(29, np.float32)] * 10 + self.h_dq_mj = [np.zeros(29, np.float32)] * 10 + self.h_ang = [np.zeros(3, np.float32)] * 10 self.h_act_mj = [np.zeros(29, np.float32)] * 10 - self.h_quat = [np.array([1,0,0,0], np.float32)] * 10 - self.init_base_quat = np.array([1,0,0,0], np.float32) - self.init_ref_quat = np.array([1,0,0,0], np.float32) - self._heading_init = False - self.encode_mode = 0 - self.vr_3point_local_target = VR_TARGET_DEF.copy() + self.h_quat = [np.array([1, 0, 0, 0], np.float32)] * 10 + self.init_base_quat = np.array([1, 0, 0, 0], np.float32) + self.init_ref_quat = np.array([1, 0, 0, 0], np.float32) + self._heading_init = False + self.encode_mode = 0 + self.vr_3point_local_target = VR_TARGET_DEF.copy() self.vr_3point_local_orn_target = VR_ORN_DEF.copy() self.smpl_joints_10frame_step1 = SMPL_DEF.copy() self.set_zero_reference() def update_history(self, q, dq, ang, quat): - quat = quat / (np.linalg.norm(quat)+1e-8) - q_mj = _to_mujoco(q); dq_mj = _to_mujoco(dq) - self.h_q_mj = [q_mj - DEFAULT_ANGLES_MUJOCO] + self.h_q_mj[:-1] - self.h_dq_mj = [dq_mj] + self.h_dq_mj[:-1] - self.h_ang = [ang.copy()] + self.h_ang[:-1] + quat = quat / (np.linalg.norm(quat) + 1e-8) + q_mj = _to_mujoco(q) + dq_mj = _to_mujoco(dq) + self.h_q_mj = [q_mj - DEFAULT_ANGLES_MUJOCO] + self.h_q_mj[:-1] + self.h_dq_mj = [dq_mj] + self.h_dq_mj[:-1] + self.h_ang = [ang.copy()] + self.h_ang[:-1] self.h_act_mj = [self.last_action_mj.copy()] + self.h_act_mj[:-1] - self.h_quat = [quat.copy()] + self.h_quat[:-1] + self.h_quat = [quat.copy()] + self.h_quat[:-1] if not self._heading_init: - self.init_base_quat = quat.copy(); self._heading_init = True + self.init_base_quat = quat.copy() + self._heading_init = True def _heading_quat(self, q): h = calc_heading(q) / 2.0 @@ -347,19 +576,20 @@ class StandingEncoderDecoder: return np.array([np.cos(-h), 0, 0, np.sin(-h)], np.float32) def _anchor_6d(self, base_quat, ref_quat=None): - if ref_quat is None: ref_quat = self.init_ref_quat + if ref_quat is None: + ref_quat = self.init_ref_quat delta = quat_mul(self._heading_quat(self.init_base_quat), self._heading_quat_inv(self.init_ref_quat)) new_ref = quat_mul(delta, ref_quat) return quat_to_6d(quat_mul(quat_conj(base_quat), new_ref)) def set_zero_reference(self): - self.motion_joint_positions = [ENCODER_STANDING_REF.copy()] + self.motion_joint_positions = [ENCODER_STANDING_REF.copy()] self.motion_joint_velocities = [np.zeros(29, np.float32)] - self.motion_body_quats = [np.array([1,0,0,0], np.float32)] - self.motion_body_z = [DEFAULT_HEIGHT] - self.motion_timesteps = 1 - self.freeze_ref_frame = 0 - self.init_ref_quat = self.motion_body_quats[0].copy() + self.motion_body_quats = [np.array([1, 0, 0, 0], np.float32)] + self.motion_body_z = [DEFAULT_HEIGHT] + self.motion_timesteps = 1 + self.freeze_ref_frame = 0 + self.init_ref_quat = self.motion_body_quats[0].copy() def build_encoder_obs(self): obs = np.zeros(1762, np.float32) @@ -368,167 +598,229 @@ class StandingEncoderDecoder: ref_pos, ref_quat = self.motion_joint_positions[rf], self.motion_body_quats[rf] if self.encode_mode == 0: for f in range(10): - obs[4+29*f:4+29*(f+1)] = ref_pos - obs[601+6*f:601+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat) + obs[4 + 29 * f : 4 + 29 * (f + 1)] = ref_pos + obs[601 + 6 * f : 601 + 6 * (f + 1)] = self._anchor_6d(self.h_quat[0], ref_quat) elif self.encode_mode == 1: ref_lower = ref_pos[LOWER_BODY_IL] for f in range(10): - obs[661+12*f:661+12*(f+1)] = ref_lower + obs[661 + 12 * f : 661 + 12 * (f + 1)] = ref_lower obs[901:910] = self.vr_3point_local_target obs[910:922] = self.vr_3point_local_orn_target - obs[595:601] = self._anchor_6d(self.h_quat[0], ref_quat) + obs[595:601] = self._anchor_6d(self.h_quat[0], ref_quat) elif self.encode_mode == 2: obs[922:1642] = self.smpl_joints_10frame_step1 for f in range(10): - obs[1642+6*f:1642+6*(f+1)] = self._anchor_6d(self.h_quat[0], ref_quat) - obs[1702+6*f:1702+6*(f+1)] = ref_pos[WRIST_IL] + obs[1642 + 6 * f : 1642 + 6 * (f + 1)] = self._anchor_6d(self.h_quat[0], ref_quat) + obs[1702 + 6 * f : 1702 + 6 * (f + 1)] = ref_pos[WRIST_IL] else: raise RuntimeError(f"Unsupported encoder mode: {self.encode_mode}") return obs def build_decoder_obs(self): - obs = np.zeros(994, np.float32); off = 0 - obs[off:off+64] = self.token; off += 64 - for h, sz in [(list(reversed(self.h_ang)),3), (list(reversed(self.h_q_mj)),29), - (list(reversed(self.h_dq_mj)),29), (list(reversed(self.h_act_mj)),29)]: - for f in range(10): obs[off:off+sz] = h[f]; off += sz + obs = np.zeros(994, np.float32) + off = 0 + obs[off : off + 64] = self.token + off += 64 + for h, sz in [ + (list(reversed(self.h_ang)), 3), + (list(reversed(self.h_q_mj)), 29), + (list(reversed(self.h_dq_mj)), 29), + (list(reversed(self.h_act_mj)), 29), + ]: + for f in range(10): + obs[off : off + sz] = h[f] + off += sz for q in reversed(self.h_quat): - obs[off:off+3] = gravity_dir(q); off += 3 + obs[off : off + 3] = gravity_dir(q) + off += 3 assert off == 994, f"Decoder obs mismatch: {off}" return obs def run_encoder(self): - return self.encoder.run(None, {self.encoder_input: self.build_encoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32) + return ( + self.encoder.run(None, {self.encoder_input: self.build_encoder_obs().reshape(1, -1)})[0] + .squeeze() + .astype(np.float32) + ) def step(self, robot_obs, update_encoder, debug=False): jnames = [m.name for m in G1_29_JointIndex] - q = np.array([robot_obs.get(f"{n}.q", DEFAULT_ANGLES[m.value]) for m,n in zip(G1_29_JointIndex,jnames)], np.float32) - dq = np.array([robot_obs.get(f"{n}.dq", 0.0) for n in jnames], np.float32) - quat = np.array([robot_obs.get("imu.quat.w",1), robot_obs.get("imu.quat.x",0), - robot_obs.get("imu.quat.y",0), robot_obs.get("imu.quat.z",0)], np.float32) - ang = np.array([robot_obs.get(f"imu.gyro.{a}",0) for a in "xyz"], np.float32) + q = np.array( + [ + robot_obs.get(f"{n}.q", DEFAULT_ANGLES[m.value]) + for m, n in zip(G1_29_JointIndex, jnames, strict=False) + ], + np.float32, + ) + dq = np.array([robot_obs.get(f"{n}.dq", 0.0) for n in jnames], np.float32) + quat = np.array( + [ + robot_obs.get("imu.quat.w", 1), + robot_obs.get("imu.quat.x", 0), + robot_obs.get("imu.quat.y", 0), + robot_obs.get("imu.quat.z", 0), + ], + np.float32, + ) + ang = np.array([robot_obs.get(f"imu.gyro.{a}", 0) for a in "xyz"], np.float32) self.update_history(q, dq, ang, quat) - if update_encoder: self.token = self.run_encoder() - action_mj = self.decoder.run(None, {self.decoder_input: self.build_decoder_obs().reshape(1,-1)})[0].squeeze().astype(np.float32) + if update_encoder: + self.token = self.run_encoder() + action_mj = ( + self.decoder.run(None, {self.decoder_input: self.build_decoder_obs().reshape(1, -1)})[0] + .squeeze() + .astype(np.float32) + ) self.last_action_mj = action_mj.copy() target = DEFAULT_ANGLES + action_mj[ISAACLAB_TO_MUJOCO] * ACTION_SCALE if debug: delta = target - q - print(f"token_norm={np.linalg.norm(self.token):.4f} action_norm={np.linalg.norm(action_mj):.4f} " - f"delta_max={np.max(np.abs(delta)):.4f} delta_rms={np.sqrt(np.mean(delta**2)):.4f}") + print( + f"token_norm={np.linalg.norm(self.token):.4f} action_norm={np.linalg.norm(action_mj):.4f} " + f"delta_max={np.max(np.abs(delta)):.4f} delta_rms={np.sqrt(np.mean(delta**2)):.4f}" + ) return {f"{m.name}.q": float(target[m.value]) for m in G1_29_JointIndex} def print_input_diagnostics(self): print("\n[Diag] Standing reference checks") - names = {0:"g1", 1:"teleop", 2:"smpl"} - print(f" encoder mode: {self.encode_mode} ({names.get(self.encode_mode,'unknown')})") + names = {0: "g1", 1: "teleop", 2: "smpl"} + print(f" encoder mode: {self.encode_mode} ({names.get(self.encode_mode, 'unknown')})") print(f" DEFAULT_ANGLES range: [{DEFAULT_ANGLES.min():+.4f}, {DEFAULT_ANGLES.max():+.4f}]") - print(f" anchor_6d(identity): {self._anchor_6d(np.array([1,0,0,0],np.float32), np.array([1,0,0,0],np.float32))}") - print(f" gravity(identity): {gravity_dir(np.array([1,0,0,0],np.float32))} (expect [0,0,-1])") + print( + f" anchor_6d(identity): {self._anchor_6d(np.array([1, 0, 0, 0], np.float32), np.array([1, 0, 0, 0], np.float32))}" + ) + print(f" gravity(identity): {gravity_dir(np.array([1, 0, 0, 0], np.float32))} (expect [0,0,-1])") dec0 = self.build_decoder_obs() print(f" decoder q-delta max: {np.max(np.abs(dec0[94:384])):.6f}") print(f" decoder dq max: {np.max(np.abs(dec0[384:674])):.6f}") + # ── Planner motion buffer ───────────────────────────────────────────────────── + class PlannerMotion: def __init__(self, max_frames=1500): - self.timesteps = 0 - self.joint_positions = np.zeros((max_frames, 29), np.float64) - self.joint_velocities = np.zeros((max_frames, 29), np.float64) - self.body_positions = np.zeros((max_frames, 3), np.float64) - self.body_quaternions = np.zeros((max_frames, 4), np.float64) + self.timesteps = 0 + self.joint_positions = np.zeros((max_frames, 29), np.float64) + self.joint_velocities = np.zeros((max_frames, 29), np.float64) + self.body_positions = np.zeros((max_frames, 3), np.float64) + self.body_quaternions = np.zeros((max_frames, 4), np.float64) self.body_quaternions[:, 0] = 1.0 + # ── Subprocess planner ──────────────────────────────────────────────────────── + def _resample_30_to_50(qpos, n30): t50 = int(np.floor(n30 / 30.0 * 50)) f30 = np.arange(t50) / 50.0 * 30.0 - f0 = np.floor(f30).astype(int) - f1 = np.minimum(f0+1, n30-1) - frac, w0 = (f30-f0).astype(np.float64), None + f0 = np.floor(f30).astype(int) + f1 = np.minimum(f0 + 1, n30 - 1) + frac, w0 = (f30 - f0).astype(np.float64), None w0 = 1.0 - frac - jp = (w0[:,None]*qpos[f0,7:36] + frac[:,None]*qpos[f1,7:36])[:,MUJOCO_TO_ISAACLAB] + jp = (w0[:, None] * qpos[f0, 7:36] + frac[:, None] * qpos[f1, 7:36])[:, MUJOCO_TO_ISAACLAB] jv = np.zeros_like(jp) - if t50 >= 2: jv[:t50-1] = (jp[1:] - jp[:-1]) * 50.0; jv[-1] = jv[-2] + if t50 >= 2: + jv[: t50 - 1] = (jp[1:] - jp[:-1]) * 50.0 + jv[-1] = jv[-2] return { "timesteps": t50, - "joint_positions": jp, + "joint_positions": jp, "joint_velocities": jv, - "body_positions": w0[:,None]*qpos[f0,:3] + frac[:,None]*qpos[f1,:3], - "body_quaternions": quat_slerp_batch(qpos[f0,3:7], qpos[f1,3:7], frac), + "body_positions": w0[:, None] * qpos[f0, :3] + frac[:, None] * qpos[f1, :3], + "body_quaternions": quat_slerp_batch(qpos[f0, 3:7], qpos[f1, 3:7], frac), } + def _build_planner_inputs(ctx, ms_dict, version, seed): inp = { - "context_mujoco_qpos": ctx.astype(np.float32).reshape(1,4,36), - "target_vel": np.array([ms_dict["speed"]], np.float32), - "mode": np.array([ms_dict["mode"]], np.int64), - "movement_direction": np.array(ms_dict["movement_direction"], np.float32).reshape(1,3), - "facing_direction": np.array(ms_dict["facing_direction"], np.float32).reshape(1,3), - "random_seed": np.array([seed], np.int64), + "context_mujoco_qpos": ctx.astype(np.float32).reshape(1, 4, 36), + "target_vel": np.array([ms_dict["speed"]], np.float32), + "mode": np.array([ms_dict["mode"]], np.int64), + "movement_direction": np.array(ms_dict["movement_direction"], np.float32).reshape(1, 3), + "facing_direction": np.array(ms_dict["facing_direction"], np.float32).reshape(1, 3), + "random_seed": np.array([seed], np.int64), } if version >= 1: # TensorRT deploy: allow 9–11 prediction tokens only (indices 3–5 for MIN_TOKENS=6). allowed = np.zeros((1, K), np.int64) if K >= 6: allowed[0, 3:6] = 1 - inp.update({ - "height": np.array([ms_dict["height"]], np.float32), - "has_specific_target": np.array([[0]], np.int64), - "specific_target_positions": np.zeros((1,4,3), np.float32), - "specific_target_headings": np.zeros((1,4), np.float32), - "allowed_pred_num_tokens": allowed, - }) + inp.update( + { + "height": np.array([ms_dict["height"]], np.float32), + "has_specific_target": np.array([[0]], np.int64), + "specific_target_positions": np.zeros((1, 4, 3), np.float32), + "specific_target_headings": np.zeros((1, 4), np.float32), + "allowed_pred_num_tokens": allowed, + } + ) return inp + def _planner_worker(path, req_q, res_q, stop_evt, version, seed, use_gpu): - so = ort.SessionOptions(); so.log_severity_level = 3 + so = ort.SessionOptions() + so.log_severity_level = 3 providers = _ort_providers(force_cpu=not use_gpu) sess = ort.InferenceSession(path, sess_options=so, providers=providers) while not stop_evt.is_set(): - try: ctx, gf, ms_dict = req_q.get(timeout=0.05) - except Exception: continue + try: + ctx, gf, ms_dict = req_q.get(timeout=0.05) + except Exception: + continue try: inp = _build_planner_inputs(ctx, ms_dict, version, seed) t0 = time.time() qpos_out, num_pred = sess.run(None, inp) t_inf = time.time() n = int(num_pred.flat[0]) - qpos = qpos_out[0,:n] - if np.any(np.isnan(qpos)): continue + qpos = qpos_out[0, :n] + if np.any(np.isnan(qpos)): + continue motion = _resample_30_to_50(qpos, n) motion["gen_frame"] = gf - print(f"[Planner] inf={1000*(t_inf-t0):.1f}ms total={1000*(time.time()-t0):.1f}ms frames={n}", flush=True) + print( + f"[Planner] inf={1000 * (t_inf - t0):.1f}ms total={1000 * (time.time() - t0):.1f}ms frames={n}", + flush=True, + ) while not res_q.empty(): - try: res_q.get_nowait() - except queue.Empty: break + try: + res_q.get_nowait() + except queue.Empty: + break res_q.put(motion) except Exception as e: print(f"[Planner] Error: {e}", flush=True) + # ── SonicPlanner ────────────────────────────────────────────────────────────── + class SonicPlanner: def __init__(self, session, planner_path): - self.session = session + self.session = session self.planner_path = planner_path - self.gen_frame = 0 - self.random_seed = INITIAL_RANDOM_SEED - self.version = 1 if len(session.get_inputs()) >= 11 else 0 - self.motion_50hz = PlannerMotion() - self._snapshot = PlannerMotion() + self.gen_frame = 0 + self.random_seed = INITIAL_RANDOM_SEED + self.version = 1 if len(session.get_inputs()) >= 11 else 0 + self.motion_50hz = PlannerMotion() + self._snapshot = PlannerMotion() self._req_q = self._res_q = self._stop_evt = self._planner_thread = None self._ctrl = None def _build_inputs(self, ctx, ms): return _build_planner_inputs( ctx, - {"mode": ms.mode, "speed": ms.speed, "height": ms.height, - "movement_direction": list(ms.movement_direction), - "facing_direction": list(ms.facing_direction)}, - self.version, self.random_seed) + { + "mode": ms.mode, + "speed": ms.speed, + "height": ms.height, + "movement_direction": list(ms.movement_direction), + "facing_direction": list(ms.facing_direction), + }, + self.version, + self.random_seed, + ) @staticmethod def build_initial_context(joint_positions): @@ -563,48 +855,64 @@ class SonicPlanner: return ctx def _load_motion_in_place(self, qpos, n30, target=None): - if target is None: target = self.motion_50hz + if target is None: + target = self.motion_50hz r = _resample_30_to_50(qpos, n30) - n = r["timesteps"]; target.timesteps = n - target.joint_positions[:n] = r["joint_positions"] + n = r["timesteps"] + target.timesteps = n + target.joint_positions[:n] = r["joint_positions"] target.joint_velocities[:n] = r["joint_velocities"] - target.body_positions[:n] = r["body_positions"] + target.body_positions[:n] = r["body_positions"] target.body_quaternions[:n] = r["body_quaternions"] return target def initialize(self, joint_positions, ms): ctx = self.build_initial_context(joint_positions) qpos_out, num_pred = self.session.run(None, self._build_inputs(ctx, ms)) - n = int(num_pred.flat[0]); qpos = qpos_out[0,:n] - if np.any(np.isnan(qpos)): raise RuntimeError("Planner initial output contains NaN") + n = int(num_pred.flat[0]) + qpos = qpos_out[0, :n] + if np.any(np.isnan(qpos)): + raise RuntimeError("Planner initial output contains NaN") print(f"[Planner] Init: {n} frames @ 30 Hz") self._load_motion_in_place(qpos, n) print(f"[Planner] Resampled to {self.motion_50hz.timesteps} frames @ 50 Hz") return self.motion_50hz def request_replan(self, cursor, ms): - if self._req_q is None: return + if self._req_q is None: + return ctx = self._context_from_controller(cursor) - ms_dict = {"mode": ms.mode, "speed": ms.speed, "height": ms.height, - "movement_direction": list(ms.movement_direction), - "facing_direction": list(ms.facing_direction)} + ms_dict = { + "mode": ms.mode, + "speed": ms.speed, + "height": ms.height, + "movement_direction": list(ms.movement_direction), + "facing_direction": list(ms.facing_direction), + } while not self._req_q.empty(): - try: self._req_q.get_nowait() - except queue.Empty: break + try: + self._req_q.get_nowait() + except queue.Empty: + break self._req_q.put((ctx, self.gen_frame, ms_dict)) def try_get_new_motion(self): - if self._res_q is None: return None + if self._res_q is None: + return None result = None while not self._res_q.empty(): - try: result = self._res_q.get_nowait() - except queue.Empty: break - if result is None: return None + try: + result = self._res_q.get_nowait() + except queue.Empty: + break + if result is None: + return None n, gf = result["timesteps"], result["gen_frame"] - s = self._snapshot; s.timesteps = n - s.joint_positions[:n] = result["joint_positions"] + s = self._snapshot + s.timesteps = n + s.joint_positions[:n] = result["joint_positions"] s.joint_velocities[:n] = result["joint_velocities"] - s.body_positions[:n] = result["body_positions"] + s.body_positions[:n] = result["body_positions"] s.body_quaternions[:n] = result["body_quaternions"] return s, gf @@ -616,8 +924,15 @@ class SonicPlanner: self._stop_evt = threading.Event() self._planner_thread = threading.Thread( target=_planner_worker, - args=(self.planner_path, self._req_q, self._res_q, - self._stop_evt, self.version, self.random_seed, use_gpu), + args=( + self.planner_path, + self._req_q, + self._res_q, + self._stop_evt, + self.version, + self.random_seed, + use_gpu, + ), daemon=True, name="sonic-planner", ) @@ -633,20 +948,23 @@ class SonicPlanner: self._planner_thread = None self._req_q = self._res_q = self._stop_evt = None + # ── PlannerController ───────────────────────────────────────────────────────── + class PlannerController(StandingEncoderDecoder): def __init__(self, planner, encoder, decoder): super().__init__(encoder, decoder) self.planner = planner - self.ref_cursor = 0 + self.ref_cursor = 0 self.motion_timesteps = 0 - self.motion_joint_positions = np.zeros((1500,29), np.float64) - self.motion_joint_velocities = np.zeros((1500,29), np.float64) - self.motion_body_quats = np.zeros((1500,4), np.float64); self.motion_body_quats[:,0] = 1.0 - self.motion_body_pos = np.zeros((1500,3), np.float64) - self.init_ref_quat = np.array([1,0,0,0], np.float64) - self.heading_init_base_quat = np.array([1,0,0,0], np.float64) + self.motion_joint_positions = np.zeros((1500, 29), np.float64) + self.motion_joint_velocities = np.zeros((1500, 29), np.float64) + self.motion_body_quats = np.zeros((1500, 4), np.float64) + self.motion_body_quats[:, 0] = 1.0 + self.motion_body_pos = np.zeros((1500, 3), np.float64) + self.init_ref_quat = np.array([1, 0, 0, 0], np.float64) + self.heading_init_base_quat = np.array([1, 0, 0, 0], np.float64) self.delta_heading = 0.0 self.reinit_heading = False self.playing = self.first_motion = False @@ -656,13 +974,15 @@ class PlannerController(StandingEncoderDecoder): with self.motion_lock: n = motion.timesteps self.motion_timesteps = n - self.motion_joint_positions[:n] = motion.joint_positions[:n] + self.motion_joint_positions[:n] = motion.joint_positions[:n] self.motion_joint_velocities[:n] = motion.joint_velocities[:n] - self.motion_body_quats[:n] = motion.body_quaternions[:n] - self.motion_body_pos[:n] = motion.body_positions[:n] + self.motion_body_quats[:n] = motion.body_quaternions[:n] + self.motion_body_pos[:n] = motion.body_positions[:n] self.init_ref_quat = motion.body_quaternions[0].copy() - self.ref_cursor = 0; self.first_motion = True - self.playing = True; self.delta_heading = 0.0 + self.ref_cursor = 0 + self.first_motion = True + self.playing = True + self.delta_heading = 0.0 def blend_new_motion(self, new_motion, gen_frame): """Blend like C++ CurrentFrameAdvancement: 8-frame cross-fade, then copy tail.""" @@ -692,16 +1012,13 @@ class PlannerController(StandingEncoderDecoder): w_new = min(1.0, max(0.0, (f - blend_start) / BLEND_FRAMES)) w_old = 1.0 - w_new self.motion_joint_positions[f] = ( - w_old * self.motion_joint_positions[f_old] - + w_new * new_motion.joint_positions[f_new] + w_old * self.motion_joint_positions[f_old] + w_new * new_motion.joint_positions[f_new] ) self.motion_joint_velocities[f] = ( - w_old * self.motion_joint_velocities[f_old] - + w_new * new_motion.joint_velocities[f_new] + w_old * self.motion_joint_velocities[f_old] + w_new * new_motion.joint_velocities[f_new] ) self.motion_body_pos[f] = ( - w_old * self.motion_body_pos[f_old] - + w_new * new_motion.body_positions[f_new] + w_old * self.motion_body_pos[f_old] + w_new * new_motion.body_positions[f_new] ) self.motion_body_quats[f] = quat_slerp( self.motion_body_quats[f_old], new_motion.body_quaternions[f_new], w_new @@ -720,52 +1037,65 @@ class PlannerController(StandingEncoderDecoder): self.init_ref_quat = self.motion_body_quats[0].copy() def _heading_apply_delta(self): - delta = quat_mul(heading_quat(self.heading_init_base_quat).astype(np.float32), - heading_quat_inv(self.init_ref_quat).astype(np.float32)) + delta = quat_mul( + heading_quat(self.heading_init_base_quat).astype(np.float32), + heading_quat_inv(self.init_ref_quat).astype(np.float32), + ) if self.delta_heading: h = self.delta_heading / 2.0 - delta = quat_mul(np.array([np.cos(h),0,0,np.sin(h)], np.float32), delta) + delta = quat_mul(np.array([np.cos(h), 0, 0, np.sin(h)], np.float32), delta) return delta def _anchor_6d(self, base_quat, ref_quat=None): - if ref_quat is None: ref_quat = self.init_ref_quat + if ref_quat is None: + ref_quat = self.init_ref_quat new_ref = quat_mul(self._heading_apply_delta(), ref_quat.astype(np.float32)) return quat_to_6d(quat_mul(quat_conj(base_quat.astype(np.float32)), new_ref)) def build_encoder_obs(self): - obs = np.zeros(1762, np.float32); obs[0] = float(self.encode_mode) + obs = np.zeros(1762, np.float32) + obs[0] = float(self.encode_mode) with self.motion_lock: if self.encode_mode == 2: # SMPL whole-body imitation: the 720-dim SMPL window carries the # target pose; the planner reference frame supplies anchor + wrist. rf = min(self.ref_cursor, self.motion_timesteps - 1) - ref_pos = self.motion_joint_positions[rf].astype(np.float32) + ref_pos = self.motion_joint_positions[rf].astype(np.float32) ref_quat = self.motion_body_quats[rf].astype(np.float32) anchor = self._anchor_6d(self.h_quat[0], ref_quat) - wrist = ref_pos[WRIST_IL] + wrist = ref_pos[WRIST_IL] obs[922:1642] = self.smpl_joints_10frame_step1 for f in range(10): - obs[1642+6*f:1642+6*(f+1)] = anchor - obs[1702+6*f:1702+6*(f+1)] = wrist + obs[1642 + 6 * f : 1642 + 6 * (f + 1)] = anchor + obs[1702 + 6 * f : 1702 + 6 * (f + 1)] = wrist return obs for f in range(10): - tf = min(self.ref_cursor + f*5 if self.playing else self.ref_cursor, - self.motion_timesteps - 1) - obs[4+29*f:4+29*(f+1)] = self.motion_joint_positions[tf].astype(np.float32) + tf = min( + self.ref_cursor + f * 5 if self.playing else self.ref_cursor, self.motion_timesteps - 1 + ) + obs[4 + 29 * f : 4 + 29 * (f + 1)] = self.motion_joint_positions[tf].astype(np.float32) if self.playing: - obs[294+29*f:294+29*(f+1)] = self.motion_joint_velocities[tf].astype(np.float32) - obs[601+6*f:601+6*(f+1)] = self._anchor_6d( - self.h_quat[0], self.motion_body_quats[tf].astype(np.float32)) + obs[294 + 29 * f : 294 + 29 * (f + 1)] = self.motion_joint_velocities[tf].astype( + np.float32 + ) + obs[601 + 6 * f : 601 + 6 * (f + 1)] = self._anchor_6d( + self.h_quat[0], self.motion_body_quats[tf].astype(np.float32) + ) return obs def step(self, robot_obs, update_encoder, debug=False): if robot_obs and (self.first_motion or self.reinit_heading): q = None if "imu.quat.w" in robot_obs: - q = np.array([ - robot_obs["imu.quat.w"], robot_obs["imu.quat.x"], - robot_obs["imu.quat.y"], robot_obs["imu.quat.z"], - ], np.float64) + q = np.array( + [ + robot_obs["imu.quat.w"], + robot_obs["imu.quat.x"], + robot_obs["imu.quat.y"], + robot_obs["imu.quat.z"], + ], + np.float64, + ) else: q = robot_obs.get("imu.quaternion") if q is not None: @@ -789,16 +1119,24 @@ class PlannerController(StandingEncoderDecoder): if self.motion_timesteps > 0: self.ref_cursor = min(self.ref_cursor + 1, self.motion_timesteps - 1) + # ── Keyboard ────────────────────────────────────────────────────────────────── + class RawKeyboard: def __init__(self): self.fd = sys.stdin.fileno() self.old = termios.tcgetattr(self.fd) - def __enter__(self): tty.setcbreak(self.fd); return self - def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) + + def __enter__(self): + tty.setcbreak(self.fd) + return self + + def __exit__(self, *_): + termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) + def get_key(self): - return sys.stdin.read(1) if select.select([sys.stdin],[],[],0)[0] else None + return sys.stdin.read(1) if select.select([sys.stdin], [], [], 0)[0] else None def drain_keyboard(kb, ms, controller=None) -> bool: @@ -812,21 +1150,32 @@ def drain_keyboard(kb, ms, controller=None) -> bool: quit_requested = True return quit_requested + def process_keyboard(key, ms, controller=None): - if key is None: return False - if key == '\x1b': return True - if key == ' ': - ms.mode = LM.IDLE; ms.speed = ms.height = -1.0 - ms.has_movement = False; ms.needs_replan = True - if controller: controller.playing = False; controller.reinit_heading = True - print("\n >> EMERGENCY STOP -> IDLE"); return False - if key in ('r','R'): - ms.needs_replan = True; print("\n >> Manual replan"); return False - if key in ('m','M'): + if key is None: + return False + if key == "\x1b": + return True + if key == " ": + ms.mode = LM.IDLE + ms.speed = ms.height = -1.0 + ms.has_movement = False + ms.needs_replan = True + if controller: + controller.playing = False + controller.reinit_heading = True + print("\n >> EMERGENCY STOP -> IDLE") + return False + if key in ("r", "R"): + ms.needs_replan = True + print("\n >> Manual replan") + return False + if key in ("m", "M"): if controller is not None and getattr(controller, "smpl_motion", None) is not None: if controller.encode_mode == 2: controller.encode_mode = 0 - controller.playing = True; controller.reinit_heading = True + controller.playing = True + controller.reinit_heading = True ms.needs_replan = True print("\n >> Motion OFF -> locomotion (mode 0). WASD to drive.") else: @@ -837,56 +1186,69 @@ def process_keyboard(key, ms, controller=None): else: print("\n >> No motion loaded (start with --motion-file)") return False - if key in ('n','N','p','P'): - ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ('n','N') else -1)) % len(MOTION_SETS) + if key in ("n", "N", "p", "P"): + ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ("n", "N") else -1)) % len(MOTION_SETS) name, modes = MOTION_SETS[ms.motion_set_idx] print(f"\n >> Motion set: {name}") - [print(f" {i+1}: {m.name}") for i,m in enumerate(modes)] + [print(f" {i + 1}: {m.name}") for i, m in enumerate(modes)] return False - if key.isdigit() and key not in ('9','0'): - idx = int(key) - 1; modes = MOTION_SETS[ms.motion_set_idx][1] + if key.isdigit() and key not in ("9", "0"): + idx = int(key) - 1 + modes = MOTION_SETS[ms.motion_set_idx][1] if 0 <= idx < len(modes): - ms.mode = modes[idx]; ms.needs_replan = True - if controller: controller.playing = True; controller.reinit_heading = True + ms.mode = modes[idx] + ms.needs_replan = True + if controller: + controller.playing = True + controller.reinit_heading = True print(f"\n >> Mode: {LM(ms.mode).name} ({ms.mode}) [replanning...]") return False - if key == '9': - ms.speed = max(0.0, (ms.speed if ms.speed>=0 else 1.0) - 0.1) - print(f"\n >> Speed: {ms.speed:.1f}"); return False - if key == '0': - ms.speed = min(5.0, (ms.speed if ms.speed>=0 else 1.0) + 0.1) - print(f"\n >> Speed: {ms.speed:.1f}"); return False - if key == '-': - ms.height = max(0.2, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) - 0.02) - print(f"\n >> Height: {ms.height:.2f}"); return False - if key == '=': - ms.height = min(1.0, (ms.height if ms.height>=0 else DEFAULT_HEIGHT) + 0.02) - print(f"\n >> Height: {ms.height:.2f}"); return False - if key.lower() == 'w': ms.movement_angle = ms.facing_angle - elif key.lower() == 's': ms.movement_angle = ms.facing_angle + math.pi - elif key.lower() == 'a': ms.movement_angle = ms.facing_angle + math.pi/2 - elif key.lower() == 'd': ms.movement_angle = ms.facing_angle - math.pi/2 - if key.lower() in ('w','s','a','d'): + if key == "9": + ms.speed = max(0.0, (ms.speed if ms.speed >= 0 else 1.0) - 0.1) + print(f"\n >> Speed: {ms.speed:.1f}") + return False + if key == "0": + ms.speed = min(5.0, (ms.speed if ms.speed >= 0 else 1.0) + 0.1) + print(f"\n >> Speed: {ms.speed:.1f}") + return False + if key == "-": + ms.height = max(0.2, (ms.height if ms.height >= 0 else DEFAULT_HEIGHT) - 0.02) + print(f"\n >> Height: {ms.height:.2f}") + return False + if key == "=": + ms.height = min(1.0, (ms.height if ms.height >= 0 else DEFAULT_HEIGHT) + 0.02) + print(f"\n >> Height: {ms.height:.2f}") + return False + if key.lower() == "w": + ms.movement_angle = ms.facing_angle + elif key.lower() == "s": + ms.movement_angle = ms.facing_angle + math.pi + elif key.lower() == "a": + ms.movement_angle = ms.facing_angle + math.pi / 2 + elif key.lower() == "d": + ms.movement_angle = ms.facing_angle - math.pi / 2 + if key.lower() in ("w", "s", "a", "d"): ms.has_movement = ms.needs_replan = True if controller: controller.playing = True print(f"\n >> Move {key.upper()} (replanning...)") - elif key.lower() == 'q': + elif key.lower() == "q": ms.facing_angle += 0.1 - if controller: controller.delta_heading += 0.1 + if controller: + controller.delta_heading += 0.1 print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°") - elif key.lower() == 'e': + elif key.lower() == "e": ms.facing_angle -= 0.1 - if controller: controller.delta_heading -= 0.1 + if controller: + controller.delta_heading -= 0.1 print(f"\n >> Facing: {math.degrees(ms.facing_angle):.0f}°") return False -_joy_prev_active = False - def _parse_wireless(wr): """Parse wireless_remote (bytes or int-array) into (lx, ly, rx, ry).""" import struct as _st + if not isinstance(wr, (bytes, bytearray)): wr = bytes(wr) if len(wr) < 24: @@ -900,7 +1262,6 @@ def _parse_wireless(wr): def process_joystick(obs, ms, controller=None): """Joystick mirrors keyboard: left stick=WASD, right stick X=Q/E, right stick Y=height.""" - global _joy_prev_active wr = obs.get("wireless_remote") if wr is None: return @@ -921,11 +1282,11 @@ def process_joystick(obs, ms, controller=None): if left_active: ms.movement_angle = ms.facing_angle + math.atan2(-lx, -ly) ms.has_movement = True - if not _joy_prev_active: + if not ms.joy_prev_active: ms.needs_replan = True - _joy_prev_active = True - elif _joy_prev_active and not (abs(rx) > 0 or abs(ry) > 0): - _joy_prev_active = False + ms.joy_prev_active = True + elif ms.joy_prev_active and not (abs(rx) > 0 or abs(ry) > 0): + ms.joy_prev_active = False ms.has_movement = False # Right stick X → Q/E (facing rotation, ~1 rad/s at full deflection) diff --git a/src/lerobot/robots/unitree_g1/controllers/sonic_whole_body.py b/src/lerobot/robots/unitree_g1/controllers/sonic_whole_body.py index 7fba49b27..56958c742 100644 --- a/src/lerobot/robots/unitree_g1/controllers/sonic_whole_body.py +++ b/src/lerobot/robots/unitree_g1/controllers/sonic_whole_body.py @@ -18,7 +18,6 @@ import logging -import numpy as np import onnxruntime as ort from huggingface_hub import hf_hub_download @@ -32,13 +31,13 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import ( MovementState, PlannerController, SonicPlanner, + _ort_providers, + _snapshot_ms, clamp_mode_params, compute_kp_kd, lowstate_to_obs, process_joystick, should_replan_request, - _ort_providers, - _snapshot_ms, ) logger = logging.getLogger(__name__)