fix(unitree_g1): satisfy ruff lint/format and address review comments

This commit is contained in:
Martino Russi
2026-06-25 14:36:05 +02:00
parent 804c76bcc2
commit dfbc25c58f
6 changed files with 814 additions and 405 deletions
+18 -22
View File
@@ -66,12 +66,12 @@ def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np
Returns:
(T, 24, 3) per-frame root-orientation-removed joints.
"""
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Rotation
rx90 = R.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
base120 = R.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
a = rx90 * R.from_rotvec(root_aa) # z-up root quat (left-mult)
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult)
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32)
@@ -87,9 +87,7 @@ class SmplMotion:
self.loop = loop
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM):
raise ValueError(
f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}"
)
raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}")
# Reference clips store world-frame joints; the encoder wants per-frame
# root-orientation-removed joints. Canonicalize when we have the root pose.
@@ -109,10 +107,7 @@ class SmplMotion:
[f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz].
"""
idx = np.arange(start, start + WINDOW)
if self.loop:
idx = np.mod(idx, self.num_frames)
else:
idx = np.clip(idx, 0, self.num_frames - 1)
idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1)
return self.smpl_joints[idx].reshape(-1).astype(np.float32)
def reset(self):
@@ -135,30 +130,31 @@ def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--motion", required=True, help="Path to motion .npz")
parser.add_argument("--no-loop", action="store_true")
parser.add_argument("--no-canon", action="store_true",
help="Skip canonicalization (feed raw stored joints)")
parser.add_argument(
"--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)"
)
args = parser.parse_args()
m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon)
duration = m.num_frames / m.fps
print(f"Loaded '{args.motion}'")
print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s")
print(f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
f"transl={None if m.transl is None else m.transl.shape}")
print(
f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
f"transl={None if m.transl is None else m.transl.shape}"
)
# Sanity: after canonicalization the per-frame body heading should be fixed.
j = m.smpl_joints
v = (j[:, 2, :2] - j[:, 1, :2]) # R_hip - L_hip, horizontal
v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal
a = np.arctan2(v[:, 1], v[:, 0])
rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0)
circ_std = np.degrees(np.sqrt(-2 * np.log(rlen)))
print(f" hip-heading circ-std={circ_std:.1f} deg "
f"(~0 => orientation removed; large => world-frame)")
print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)")
w0 = m.window(0)
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) "
f"min={w0.min():.3f} max={w0.max():.3f}")
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}")
assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]"
# Simulate a few control ticks.
+5 -5
View File
@@ -42,15 +42,14 @@ def load_pkl(path: str) -> dict:
return joblib.load(path)
except Exception:
# joblib clips are zlib-compressed pickles; fall back to manual inflate.
import contextlib
import pickle
import zlib
with open(path, "rb") as f:
raw = f.read()
try:
with contextlib.suppress(zlib.error):
raw = zlib.decompress(raw)
except zlib.error:
pass
return pickle.loads(raw)
@@ -80,8 +79,9 @@ def main():
np.savez_compressed(args.out, **out)
dur = smpl_joints.shape[0] / float(out["fps"])
print(f"Wrote {args.out}")
print(f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s "
f"keys={sorted(out)}")
print(
f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}"
)
if __name__ == "__main__":
+93 -56
View File
@@ -25,15 +25,16 @@ For teleop integration use --robot.controller=SonicWholeBodyController instead.
"""
import argparse
import contextlib
import faulthandler
import gc
import sys
import time
import numpy as np
from motion_loader import SmplMotion
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
CONTROL_DT,
DEFAULT_ANGLES,
@@ -43,41 +44,57 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
compute_kp_kd,
drain_keyboard,
)
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
from motion_loader import SmplMotion
def main():
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
parser.add_argument("--ip", type=str, default=None,
help="Robot IP for real hardware (e.g. 192.168.123.164). "
"Omit for simulation.")
parser.add_argument("--log-csv", action="store_true",
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)")
parser.add_argument("--cpu", action="store_true",
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)")
parser.add_argument("--headless", action="store_true",
help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)")
parser.add_argument("--gamepad", action="store_true",
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)")
parser.add_argument("--keyboard-only", action="store_true",
help="Ignore wireless gamepad (terminal keyboard only)")
parser.add_argument("--motion-file", type=str, default=None,
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
"(encode_mode=2) instead of locomotion planning.")
parser.add_argument("--no-loop", action="store_true",
help="With --motion-file, play once instead of looping")
parser.add_argument(
"--ip",
type=str,
default=None,
help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.",
)
parser.add_argument(
"--log-csv",
action="store_true",
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)",
)
parser.add_argument(
"--cpu",
action="store_true",
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)",
)
parser.add_argument(
"--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)"
)
parser.add_argument(
"--gamepad",
action="store_true",
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)",
)
parser.add_argument(
"--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)"
)
parser.add_argument(
"--motion-file",
type=str,
default=None,
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
"(encode_mode=2) instead of locomotion planning.",
)
parser.add_argument(
"--no-loop", action="store_true", help="With --motion-file, play once instead of looping"
)
args = parser.parse_args()
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
# avoid losing buffered diagnostics if the process dies mid-loop.
faulthandler.enable()
try:
with contextlib.suppress(Exception):
sys.stdout.reconfigure(line_buffering=True)
except Exception:
pass
print("=" * 60)
print("SONIC planner - full mode control")
@@ -111,20 +128,24 @@ def main():
motion = None
if args.motion_file:
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
controller.smpl_motion = motion # lets 'M' key toggle playback
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
controller.smpl_motion = motion # lets 'M' key toggle playback
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
dur = motion.num_frames / motion.fps
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
print(f" frames={motion.num_frames} fps={motion.fps:.1f} "
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2")
print(
f" frames={motion.num_frames} fps={motion.fps:.1f} "
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2"
)
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
runtime.controller.print_input_diagnostics()
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
[print(f" {i+1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
print("\n[Ready] Click THIS terminal, then W/A/S/D to move. "
"1-6 change mode, 9/0 speed, Esc quit.\n", flush=True)
[print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
print(
"\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n",
flush=True,
)
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
@@ -137,22 +158,25 @@ def main():
time.sleep(1.0)
last_status = time.time() - 2.1
loop_t = enc_t = dec_t = obs_t = act_t = []
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
stall_src = ""
did_blend = False
prev_end = time.time()
t_start = time.time()
log_path = "/tmp/sonic_pose_log.csv"
jnames = [m.name for m in G1_29_JointIndex]
log_ctx = open(log_path, "w") if args.log_csv else None
log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115
if log_ctx:
log_ctx.write("t,step,cursor,ts,blend,mode," +
",".join(f"q{i}" for i in range(29)) + "," +
",".join(f"ref{i}" for i in range(29)) + "," +
",".join(f"act{i}" for i in range(29)) +
",delta_max,action_norm,token_norm\n")
log_ctx.write(
"t,step,cursor,ts,blend,mode,"
+ ",".join(f"q{i}" for i in range(29))
+ ","
+ ",".join(f"ref{i}" for i in range(29))
+ ","
+ ",".join(f"act{i}" for i in range(29))
+ ",delta_max,action_norm,token_norm\n"
)
try:
while not robot._shutdown_event.is_set():
@@ -192,42 +216,55 @@ def main():
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
cur, ts = controller.ref_cursor, controller.motion_timesteps
q_ref = controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
log_ctx.write(f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode}," +
",".join(f"{v:.6f}" for v in q_r) + "," +
",".join(f"{v:.6f}" for v in q_ref) + "," +
",".join(f"{v:.6f}" for v in a_v) + "," +
f"{np.max(np.abs(a_v - q_r)):.6f},"
f"{np.linalg.norm(a_v):.6f},"
f"{np.linalg.norm(controller.token):.6f}\n")
q_ref = (
controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
)
log_ctx.write(
f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode},"
+ ",".join(f"{v:.6f}" for v in q_r)
+ ","
+ ",".join(f"{v:.6f}" for v in q_ref)
+ ","
+ ",".join(f"{v:.6f}" for v in a_v)
+ ","
+ f"{np.max(np.abs(a_v - q_r)):.6f},"
f"{np.linalg.norm(a_v):.6f},"
f"{np.linalg.norm(controller.token):.6f}\n"
)
did_blend = False
now = time.time()
loop_ms = 1000 * (now - t0)
if loop_ms > 50:
stall_src = (f"[STALL] {loop_ms:.0f}ms: "
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}")
stall_src = (
f"[STALL] {loop_ms:.0f}ms: "
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}"
)
if loop_ms > CONTROL_DT * 1500:
slow_n += 1
if now - last_status > 2.0:
def _avg(lst):
return sum(lst) / len(lst) if lst else 0
hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0
print(f"\r {ms.status_line()} step={runtime.step} "
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
f"slow={slow_n} blends={blend_n}", end="", flush=True)
print(
f"\r {ms.status_line()} step={runtime.step} "
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
f"slow={slow_n} blends={blend_n}",
end="",
flush=True,
)
if stall_src:
print(f"\n {stall_src}")
stall_src = ""
last_status = now
loop_t = enc_t = dec_t = obs_t = act_t = []
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
prev_end = time.time()
gc_timer += CONTROL_DT
if gc_timer >= 10.0:
gc.collect()
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unitree G1 locomotion controllers (Groot, Holosoma, SONIC)."""
__all__ = [
File diff suppressed because it is too large Load Diff
@@ -18,7 +18,6 @@
import logging
import numpy as np
import onnxruntime as ort
from huggingface_hub import hf_hub_download
@@ -32,13 +31,13 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
MovementState,
PlannerController,
SonicPlanner,
_ort_providers,
_snapshot_ms,
clamp_mode_params,
compute_kp_kd,
lowstate_to_obs,
process_joystick,
should_replan_request,
_ort_providers,
_snapshot_ms,
)
logger = logging.getLogger(__name__)