Compare commits

...

10 Commits

Author SHA1 Message Date
Martino Russi f7afeb9cb2 feat(unitree_g1): add mjlab BeyondMimic motion-imitation controller
Generalized full-body controller that deploys any mjlab BeyondMimic
motion-tracking policy (ONNX, trained without state estimation) together
with its reference clip. Gains/action-scale/default pose are read from the
ONNX metadata and the trajectory from motion.npz, so a new motion only
needs its (onnx, npz) pair -- selectable via constructor args or the
MJLAB_ONNX_PATH / MJLAB_MOTION_PATH env vars.

Key fix: the policy's anchor body is torso_link while base_ang_vel comes
from the pelvis IMU. The sim publishes the pelvis as imu_state, so the
torso orientation for motion_anchor_ori_b is reconstructed from the pelvis
quaternion + the three waist joints. Without this the anchor used the
pelvis orientation and any dynamic motion (e.g. spin kick) toppled.

Registered as MjlabMotionImitationController in make_locomotion_controller
and bundles the double-spin-kick policy + clip as the default.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-27 16:57:21 +02:00
Martino Russi 3b6de2fdf8 fix(unitree_g1): fix typo flagged by spellchecker in motion_loader docstring 2026-06-26 13:46:33 +02:00
Martino Russi 744f3667c0 fix(unitree_g1): silence bandit findings in SONIC example/pipeline 2026-06-26 13:40:53 +02:00
Martino Russi fdde436776 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-26 13:35:28 +02:00
Martino Russi 5c683c65c6 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-25 14:38:48 +02:00
Martino Russi dfbc25c58f fix(unitree_g1): satisfy ruff lint/format and address review comments 2026-06-25 14:37:44 +02:00
Martino Russi 804c76bcc2 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-25 13:41:04 +02:00
Martino Russi e6afa69be9 add motion loader 2026-06-17 12:31:08 +02:00
Martino Russi 31d1439e29 add custom motion loader 2026-06-17 12:29:36 +02:00
Martino Russi 1c118c6359 feat(unitree_g1): add SONIC whole-body controller
Move GrootLocomotionController and HolosomaLocomotionController into a new
controllers/ subpackage and add the SONIC whole-body controller
(sonic_pipeline.py, sonic_whole_body.py) plus the examples/unitree_g1/sonic.py
standalone script. UnitreeG1 now honors a controller's kp/kd, calls
controller.shutdown() on disconnect, and skips arm publishing for full_body
controllers.
2026-06-16 17:12:20 +02:00
27 changed files with 2343 additions and 7 deletions
+170
View File
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Load an SMPL motion clip and expose it in SONIC's encoder format.
SONIC's whole-body tracking mode (``encode_mode == 2``) consumes a flat
720-vector ``smpl_joints_10frame_step1`` = 10 consecutive frames x 24 SMPL
joints x 3 (xyz) at 50 Hz.
IMPORTANT - frame convention: the encoder expects each frame's joints with the
body's *root orientation removed* (per-frame canonical), exactly like the live
deploy stream's ``smpl_joints_local`` (see ``process_smpl_joints`` in the GEAR
PICO teleop and ``smpl_joints_multi_future_local`` in training). The reference
``smpl_filtered`` clips instead store **world-frame** joints (heading retained),
so feeding them raw makes the robot move but track poorly / never face-forward.
This loader therefore canonicalizes on load using the clip's per-frame root
orientation (``pose_aa[:, :3]``):
A = Rx(+90deg) * rotvec(pose_aa[:, :3]) # y-up -> z-up root quat
local = base120 * A^-1 * joints # remove root orient
with ``base120 = quat(0.5,0.5,0.5,0.5)`` (SMPL base rotation). This reproduces
the deployed transform (verified: per-frame hip-heading std -> 0).
Clip is read from a numpy ``.npz``. Expected keys:
smpl_joints : (T, 24, 3) float32 -- world-frame joint positions, 50 fps
pose_aa : (T, 72) float32 -- SMPL axis-angle (root = [:, :3])
transl : (T, 3) float32 -- global root translation (optional)
fps : scalar
Example:
python examples/unitree_g1/motion_loader.py \
--motion examples/unitree_g1/motions/walk_forward.npz
"""
import argparse
import numpy as np
WINDOW = 10 # frames per encoder window (smpl_joints_10frame_step1)
N_JOINTS = 24
JOINT_DIM = 3
SMPL_OBS_DIM = WINDOW * N_JOINTS * JOINT_DIM # 720
def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np.ndarray:
"""Remove per-frame root orientation -> SONIC ``smpl_joints_local`` format.
Args:
smpl_joints: (T, 24, 3) world-frame (z-up) SMPL joint positions.
root_aa: (T, 3) SMPL global-orient axis-angle (y-up convention).
Returns:
(T, 24, 3) per-frame root-orientation-removed joints.
"""
from scipy.spatial.transform import Rotation
rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult)
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32)
class SmplMotion:
"""A single SMPL clip with SONIC-format windowing."""
def __init__(self, path: str, loop: bool = True, canonicalize: bool = True):
data = np.load(path)
smpl_joints = data["smpl_joints"].astype(np.float32) # (T, 24, 3)
self.pose_aa = data["pose_aa"].astype(np.float32) if "pose_aa" in data.files else None
self.transl = data["transl"].astype(np.float32) if "transl" in data.files else None
self.fps = float(data["fps"]) if "fps" in data.files else 50.0
self.loop = loop
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM):
raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}")
# Reference clips store world-frame joints; the encoder wants per-frame
# root-orientation-removed joints. Canonicalize when we have the root pose.
self.canonicalized = False
if canonicalize and self.pose_aa is not None:
smpl_joints = canonicalize_smpl_joints(smpl_joints, self.pose_aa[:, :3])
self.canonicalized = True
self.smpl_joints = smpl_joints
self.num_frames = self.smpl_joints.shape[0]
self._cursor = 0
def window(self, start: int) -> np.ndarray:
"""Return the 720-vector for the 10-frame window beginning at ``start``.
Frames are laid out oldest->newest, joint-major within a frame:
[f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz].
"""
idx = np.arange(start, start + WINDOW)
idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1)
return self.smpl_joints[idx].reshape(-1).astype(np.float32)
def reset(self):
self._cursor = 0
def step(self) -> np.ndarray:
"""Advance one frame and return the current 720-vector window."""
w = self.window(self._cursor)
self._cursor += 1
if self.loop:
self._cursor %= self.num_frames
return w
@property
def done(self) -> bool:
return (not self.loop) and (self._cursor + WINDOW >= self.num_frames)
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--motion", required=True, help="Path to motion .npz")
parser.add_argument("--no-loop", action="store_true")
parser.add_argument(
"--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)"
)
args = parser.parse_args()
m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon)
duration = m.num_frames / m.fps
print(f"Loaded '{args.motion}'")
print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s")
print(
f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
f"transl={None if m.transl is None else m.transl.shape}"
)
# Sanity: after canonicalization the per-frame body heading should be fixed.
j = m.smpl_joints
v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal
a = np.arctan2(v[:, 1], v[:, 0])
rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0)
circ_std = np.degrees(np.sqrt(-2 * np.log(rlen)))
print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)")
w0 = m.window(0)
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}")
assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]"
# Simulate a few control ticks.
print(" stepping 5 ticks:")
for t in range(5):
w = m.step()
print(f" t={t} cursor={m._cursor} window_norm={np.linalg.norm(w):.2f}")
print("OK: motion loads and yields SONIC-format 720-vec windows.")
if __name__ == "__main__":
main()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+88
View File
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convert a GEAR-SONIC / BONES-SEED ``smpl_filtered`` clip (.pkl) to .npz.
The reference clips are zlib-compressed joblib pickles holding a dict with
``pose_aa`` (T, 72), ``transl`` (T, 3), ``smpl_joints`` (T, 24, 3), ``fps``.
``motion_loader.SmplMotion`` consumes the .npz form so the runtime needs no
joblib dependency. Canonicalization (root-orientation removal) happens at load
time in ``motion_loader``, so this converter just repackages the raw arrays.
Run this in an environment that has ``joblib`` (e.g. the sonic teleop venv):
python examples/unitree_g1/pkl_to_npz.py \
--pkl sample_data/smpl_filtered/walk_forward_amateur_001__A001.pkl \
--out examples/unitree_g1/motions/walk_forward.npz
"""
import argparse
from pathlib import Path
import numpy as np
def load_pkl(path: str) -> dict:
try:
import joblib
return joblib.load(path)
except Exception:
# joblib clips are zlib-compressed pickles; fall back to manual inflate.
import contextlib
import pickle # nosec B403 - loads trusted local SMPL clips authored by the user
import zlib
with open(path, "rb") as f:
raw = f.read()
with contextlib.suppress(zlib.error):
raw = zlib.decompress(raw)
return pickle.loads(raw) # nosec B301 - local, user-provided motion files only
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--pkl", required=True, help="Input smpl_filtered .pkl")
parser.add_argument("--out", required=True, help="Output .npz path")
args = parser.parse_args()
d = load_pkl(args.pkl)
if not isinstance(d, dict) or "smpl_joints" not in d:
raise ValueError(f"Unexpected pkl structure; keys={list(d) if isinstance(d, dict) else type(d)}")
smpl_joints = np.asarray(d["smpl_joints"], np.float32)
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (24, 3):
raise ValueError(f"smpl_joints must be (T,24,3), got {smpl_joints.shape}")
out = {"smpl_joints": smpl_joints, "fps": np.float32(d.get("fps", 50.0))}
if "pose_aa" in d:
out["pose_aa"] = np.asarray(d["pose_aa"], np.float32)
else:
print("[warn] no pose_aa -> loader cannot canonicalize (will feed raw)")
if "transl" in d:
out["transl"] = np.asarray(d["transl"], np.float32)
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(args.out, **out)
dur = smpl_joints.shape[0] / float(out["fps"])
print(f"Wrote {args.out}")
print(
f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}"
)
if __name__ == "__main__":
main()
+294
View File
@@ -0,0 +1,294 @@
#!/usr/bin/env python
"""
SONIC planner with full mode control.
Keyboard controls:
N / P - next / previous motion set
1-8 - select mode within current set
WASD - movement direction
Q / E - rotate facing left / right
9 / 0 - decrease / increase speed
- / = - decrease / increase height
R - force replan
M - toggle SMPL motion playback <-> locomotion (needs --motion-file)
Space - emergency stop -> IDLE
Esc - quit
Gamepad controls (Unitree wireless controller):
Left stick Y - speed (forward = fast, back = stop)
Left stick X - movement direction (offset from facing)
Right stick X - facing direction (incremental rotation)
Right stick Y - height (up = tall 0.8m, down = low 0.1m)
Buttons - unused (mode selection is keyboard-only)
For teleop integration use --robot.controller=SonicWholeBodyController instead.
"""
import argparse
import contextlib
import faulthandler
import gc
import os
import sys
import tempfile
import time
import numpy as np
from motion_loader import SmplMotion
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
CONTROL_DT,
DEFAULT_ANGLES,
LM,
MOTION_SETS,
RawKeyboard,
compute_kp_kd,
drain_keyboard,
)
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
def main():
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
parser.add_argument(
"--ip",
type=str,
default=None,
help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.",
)
parser.add_argument(
"--log-csv",
action="store_true",
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)",
)
parser.add_argument(
"--cpu",
action="store_true",
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)",
)
parser.add_argument(
"--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)"
)
parser.add_argument(
"--gamepad",
action="store_true",
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)",
)
parser.add_argument(
"--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)"
)
parser.add_argument(
"--motion-file",
type=str,
default=None,
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
"(encode_mode=2) instead of locomotion planning.",
)
parser.add_argument(
"--no-loop", action="store_true", help="With --motion-file, play once instead of looping"
)
args = parser.parse_args()
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
# avoid losing buffered diagnostics if the process dies mid-loop.
faulthandler.enable()
with contextlib.suppress(Exception):
sys.stdout.reconfigure(line_buffering=True)
print("=" * 60)
print("SONIC planner - full mode control")
print(" N/P cycle sets | 1-8 select mode | WASD move")
print(" Q/E rotate | 9/0 speed | -/= height")
print(" R replan | Space IDLE | Esc quit")
if args.ip:
print(f" Robot IP: {args.ip}")
else:
print(" Mode: simulation")
print("=" * 60 + "\n")
cfg = UnitreeG1Config(controller=None) # full-body SONIC; standalone loop owns publish
if args.ip:
cfg.is_simulation = False
cfg.robot_ip = args.ip
else:
cfg.is_simulation = True
if args.headless:
print("[Note] --headless ignored: sim uses stock UnitreeG1 + hub env")
robot = UnitreeG1(cfg)
robot.connect()
kp, kd = compute_kp_kd()
robot.kp = kp.copy()
robot.kd = kd.copy()
runtime = SonicRuntime(force_cpu=args.cpu)
controller = runtime.controller
ms = runtime.ms
motion = None
if args.motion_file:
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
controller.smpl_motion = motion # lets 'M' key toggle playback
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
dur = motion.num_frames / motion.fps
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
print(
f" frames={motion.num_frames} fps={motion.fps:.1f} "
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2"
)
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
runtime.controller.print_input_diagnostics()
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
[print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
print(
"\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n",
flush=True,
)
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
with RawKeyboard() as kb:
try:
gc.disable()
gc_timer = 0.0
robot.reset(CONTROL_DT, DEFAULT_ANGLES)
time.sleep(1.0)
last_status = time.time() - 2.1
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
stall_src = ""
did_blend = False
t_start = time.time()
log_path = os.path.join(tempfile.gettempdir(), "sonic_pose_log.csv")
jnames = [m.name for m in G1_29_JointIndex]
log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115
if log_ctx:
log_ctx.write(
"t,step,cursor,ts,blend,mode,"
+ ",".join(f"q{i}" for i in range(29))
+ ","
+ ",".join(f"ref{i}" for i in range(29))
+ ","
+ ",".join(f"act{i}" for i in range(29))
+ ",delta_max,action_norm,token_norm\n"
)
try:
while not robot._shutdown_event.is_set():
t0 = time.time()
if drain_keyboard(kb, ms, controller):
break
obs = robot.get_observation()
t_obs = time.time()
obs_t.append(1000 * (t_obs - t0))
if not obs:
runtime.tick({}, use_joystick=False)
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
continue
# SMPL playback only while in whole-body mode; 'M' toggles it.
motion_active = motion is not None and controller.encode_mode == 2
if motion_active:
controller.smpl_joints_10frame_step1 = motion.step()
if motion.done:
print("\n[Motion] clip finished")
break
step_before = runtime.step
t_step = time.time()
action = runtime.tick(obs, use_joystick=base_joystick and not motion_active)
step_ms = 1000 * (time.time() - t_step)
do_enc = step_before % 5 == 0
(enc_t if do_enc else dec_t).append(step_ms)
t_act = time.time()
robot.send_action(action)
act_t.append(1000 * (time.time() - t_act))
if log_ctx and runtime.step % 5 == 0:
t_rel = time.time() - t_start
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
cur, ts = controller.ref_cursor, controller.motion_timesteps
q_ref = (
controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
)
log_ctx.write(
f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode},"
+ ",".join(f"{v:.6f}" for v in q_r)
+ ","
+ ",".join(f"{v:.6f}" for v in q_ref)
+ ","
+ ",".join(f"{v:.6f}" for v in a_v)
+ ","
+ f"{np.max(np.abs(a_v - q_r)):.6f},"
f"{np.linalg.norm(a_v):.6f},"
f"{np.linalg.norm(controller.token):.6f}\n"
)
did_blend = False
now = time.time()
loop_ms = 1000 * (now - t0)
if loop_ms > 50:
stall_src = (
f"[STALL] {loop_ms:.0f}ms: "
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}"
)
if loop_ms > CONTROL_DT * 1500:
slow_n += 1
if now - last_status > 2.0:
def _avg(lst):
return sum(lst) / len(lst) if lst else 0
hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0
print(
f"\r {ms.status_line()} step={runtime.step} "
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
f"slow={slow_n} blends={blend_n}",
end="",
flush=True,
)
if stall_src:
print(f"\n {stall_src}")
stall_src = ""
last_status = now
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
gc_timer += CONTROL_DT
if gc_timer >= 10.0:
gc.collect()
gc_timer = 0.0
loop_t.append(loop_ms)
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
finally:
if log_ctx:
log_ctx.close()
except KeyboardInterrupt:
pass
finally:
gc.enable()
if args.log_csv:
print(f"\n[Log] Saved to {log_path}")
runtime.shutdown()
print("\nStopping...")
if robot.is_connected:
robot.disconnect()
print("Done.")
if __name__ == "__main__":
main()
@@ -68,6 +68,6 @@ class UnitreeG1Config(RobotConfig):
# Compensates for gravity on the unitree's arms using the arm ik solver
gravity_compensation: bool = False
# Lower-body controller class name, e.g. "GrootLocomotionController" or
# "HolosomaLocomotionController". None disables it.
# Locomotion controller class name, e.g. "GrootLocomotionController",
# "HolosomaLocomotionController", or "SonicWholeBodyController". None disables it.
controller: str | None = None
@@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unitree G1 locomotion controllers (Groot, Holosoma, SONIC, mjlab motion imitation)."""
__all__ = [
"GrootLocomotionController",
"HolosomaLocomotionController",
"SonicWholeBodyController",
"SonicRuntime",
"MjlabMotionImitationController",
]
@@ -21,7 +21,7 @@ import numpy as np
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from .g1_utils import (
from lerobot.robots.unitree_g1.g1_utils import (
REMOTE_AXES,
REMOTE_BUTTONS,
G1_29_JointIndex,
@@ -22,7 +22,7 @@ import onnx
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from .g1_utils import (
from lerobot.robots.unitree_g1.g1_utils import (
REMOTE_AXES,
G1_29_JointArmIndex,
G1_29_JointIndex,
@@ -0,0 +1,296 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""mjlab BeyondMimic motion-imitation controller for the Unitree G1.
Deploys any mjlab BeyondMimic motion-tracking policy (trained without state
estimation and exported to ONNX) together with its reference clip. The pipeline
is motion-agnostic: point it at a different ``(onnx, motion.npz)`` pair and it
will track that clip. The bundled default is the "double spin kick".
The exported policy is a pure function ``actions = policy(obs)`` of a 154-dim
observation. The observation terms (in concatenation order) are::
command (58) ref joint_pos[t] (29) + ref joint_vel[t] (29)
motion_anchor_ori_b (6) first two columns of R(q_robot_torso^-1 * q_ref_torso[t])
base_ang_vel (3) pelvis IMU angular velocity (body frame)
joint_pos (29) q - default_q
joint_vel (29) dq
actions (29) previous raw policy output
The reference trajectory (``command`` + reference torso quaternion) is read from
``motion.npz``. Joint order matches ``G1_29_JointIndex`` exactly, so no remapping
is needed. Per-joint action scale and PD gains are read from the ONNX metadata so
the controller always stays consistent with the exported policy.
Note: the policy's anchor body is ``torso_link`` while ``base_ang_vel`` comes from
the pelvis IMU (mjlab's sensors live on ``imu_in_pelvis``). This sim publishes the
pelvis as ``imu_state``, so the torso orientation used for ``motion_anchor_ori_b``
is reconstructed from the pelvis quaternion + the three waist joints.
Override the deployed policy/clip without code changes via env vars:
MJLAB_ONNX_PATH=/path/to/policy.onnx
MJLAB_MOTION_PATH=/path/to/motion.npz
"""
import logging
import os
import numpy as np
import onnxruntime as ort
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
logger = logging.getLogger(__name__)
_HERE = os.path.dirname(os.path.abspath(__file__))
# Policies + reference clips live in examples/unitree_g1/motions/.
_MOTIONS_DIR = os.path.normpath(
os.path.join(_HERE, "..", "..", "..", "..", "..", "examples", "unitree_g1", "motions")
)
# Default bundled policy + clip (the double spin kick). Override with env vars or
# the onnx_path / motion_path constructor args to deploy any other mjlab policy.
DEFAULT_ONNX_PATH = os.environ.get(
"MJLAB_ONNX_PATH", os.path.join(_MOTIONS_DIR, "2026-06-26_14-03-33.onnx")
)
DEFAULT_MOTION_PATH = os.environ.get("MJLAB_MOTION_PATH", os.path.join(_MOTIONS_DIR, "motion.npz"))
CONTROL_DT = 0.02 # 50 Hz, matches mjlab decimation=4 * timestep=0.005
# Index of ``torso_link`` in the full 30-body mjlab G1 body ordering (the layout
# stored in motion.npz body_*_w arrays).
TORSO_BODY_INDEX = 15
# Hold the first motion frame for a short settle window before playing the clip,
# so the robot can reach the start pose before the motion begins.
START_HOLD_STEPS = 50
def _parse_floats(meta: dict, key: str) -> np.ndarray:
return np.array([float(x) for x in meta[key].split(",")], dtype=np.float32)
def _quat_inv(q: np.ndarray) -> np.ndarray:
"""Inverse of a unit quaternion (w, x, y, z)."""
q = q / (np.linalg.norm(q) + 1e-8)
return np.array([q[0], -q[1], -q[2], -q[3]], dtype=np.float32)
def _yaw_quat(q: np.ndarray) -> np.ndarray:
"""Yaw-only component of a quaternion (w, x, y, z)."""
w, x, y, z = q
yaw = np.arctan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))
return np.array([np.cos(yaw / 2.0), 0.0, 0.0, np.sin(yaw / 2.0)], dtype=np.float32)
def _quat_mul(a: np.ndarray, b: np.ndarray) -> np.ndarray:
w1, x1, y1, z1 = a
w2, x2, y2, z2 = b
return np.array(
[
w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2,
w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2,
w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2,
w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2,
],
dtype=np.float32,
)
def _pelvis_to_torso_quat(
pelvis_quat: np.ndarray, waist_yaw: float, waist_roll: float, waist_pitch: float
) -> np.ndarray:
"""Torso-link world orientation from the pelvis quaternion + waist joints.
The kinematic chain is pelvis -> waist_yaw(z) -> waist_roll(x) ->
torso_link/waist_pitch(y), and every intermediate body frame is identity, so::
q_torso = q_pelvis (x) Rz(waist_yaw) (x) Rx(waist_roll) (x) Ry(waist_pitch)
"""
hy, hr, hp = waist_yaw / 2.0, waist_roll / 2.0, waist_pitch / 2.0
qz = np.array([np.cos(hy), 0.0, 0.0, np.sin(hy)], dtype=np.float32)
qx = np.array([np.cos(hr), np.sin(hr), 0.0, 0.0], dtype=np.float32)
qy = np.array([np.cos(hp), 0.0, np.sin(hp), 0.0], dtype=np.float32)
return _quat_mul(_quat_mul(_quat_mul(pelvis_quat, qz), qx), qy)
def _ori_6d(q_rel: np.ndarray) -> np.ndarray:
"""First two columns of the rotation matrix of ``q_rel`` (w, x, y, z).
Matches mjlab ``matrix_from_quat(...)[..., :2].reshape(-1)``: row-major
flatten of the first two columns -> [R00, R01, R10, R11, R20, R21].
"""
q = q_rel / (np.linalg.norm(q_rel) + 1e-8)
w, x, y, z = q
two = 2.0
return np.array(
[
1.0 - two * (y * y + z * z), # R00
two * (x * y - z * w), # R01
two * (x * y + z * w), # R10
1.0 - two * (x * x + z * z), # R11
two * (x * z - y * w), # R20
two * (y * z + x * w), # R21
],
dtype=np.float32,
)
class MjlabMotionImitationController:
"""Full-body mjlab BeyondMimic motion-imitation controller for UnitreeG1.
Motion-agnostic: pass any exported ``(onnx_path, motion_path)`` pair (or set
the ``MJLAB_ONNX_PATH`` / ``MJLAB_MOTION_PATH`` env vars) to deploy a policy
trained on a different reference clip.
"""
control_dt = CONTROL_DT
full_body = True
def __init__(
self,
onnx_path: str = DEFAULT_ONNX_PATH,
motion_path: str = DEFAULT_MOTION_PATH,
imu_is_pelvis: bool = True,
):
logger.info("Loading mjlab motion-imitation controller (%s)...", os.path.basename(onnx_path))
if not os.path.exists(onnx_path):
raise FileNotFoundError(f"mjlab ONNX policy not found: {onnx_path}")
if not os.path.exists(motion_path):
raise FileNotFoundError(f"mjlab motion file not found: {motion_path}")
so = ort.SessionOptions()
so.log_severity_level = 3
self.session = ort.InferenceSession(onnx_path, sess_options=so, providers=["CPUExecutionProvider"])
meta = dict(self.session.get_modelmeta().custom_metadata_map)
self.default_q = _parse_floats(meta, "default_joint_pos")
self.action_scale = _parse_floats(meta, "action_scale")
self.kp = _parse_floats(meta, "joint_stiffness")
self.kd = _parse_floats(meta, "joint_damping")
if not (len(self.default_q) == len(self.action_scale) == len(self.kp) == len(self.kd) == 29):
raise ValueError("mjlab policy metadata must define 29 values per joint array")
# ONNX input names: obs (float32 [1, 154]) + time_step ([1, 1]).
self._inputs = {i.name: i for i in self.session.get_inputs()}
self._obs_name = next(n for n in self._inputs if n != "time_step")
self._ts_name = "time_step" if "time_step" in self._inputs else None
self._ts_dtype = (
np.int64 if (self._ts_name and "int" in self._inputs[self._ts_name].type) else np.float32
)
motion = np.load(motion_path)
self.ref_joint_pos = motion["joint_pos"].astype(np.float32) # (T, 29)
self.ref_joint_vel = motion["joint_vel"].astype(np.float32) # (T, 29)
self.ref_torso_quat = motion["body_quat_w"][:, TORSO_BODY_INDEX, :].astype(np.float32) # (T, 4)
self.n_frames = int(self.ref_joint_pos.shape[0])
# The MuJoCo sim publishes the PELVIS (floating base) as imu_state, but the
# policy's anchor body is torso_link. When True, reconstruct the torso
# orientation from the pelvis quat + waist joints. On hardware where the IMU
# already reports the torso, set this False.
self.imu_is_pelvis = imu_is_pelvis
self.step = 0
self.last_action = np.zeros(29, dtype=np.float32)
# Yaw alignment between the reference clip and the robot's actual heading,
# captured lazily on the first run_step after a reset.
self._align_quat: np.ndarray | None = None
logger.info("mjlab motion-imitation ready: %d frames @ %dHz", self.n_frames, int(1.0 / CONTROL_DT))
def reset(self) -> None:
self.step = 0
self.last_action[:] = 0.0
self._align_quat = None
def _motion_index(self) -> int:
"""Hold frame 0 during the settle window, then advance and clamp at the end."""
idx = self.step - START_HOLD_STEPS
if idx < 0:
idx = 0
return int(min(idx, self.n_frames - 1))
def run_step(self, action: dict, lowstate) -> dict:
if lowstate is None:
return {}
t = self._motion_index()
# Robot joint state (native G1_29 order == policy order).
q = np.empty(29, dtype=np.float32)
dq = np.empty(29, dtype=np.float32)
for motor in G1_29_JointIndex:
i = motor.value
q[i] = lowstate.motor_state[i].q
dq[i] = lowstate.motor_state[i].dq
# base_ang_vel is the pelvis gyro (mjlab's IMU sensors live on imu_in_pelvis),
# so the raw gyro is already correct. The anchor orientation, however, is the
# torso_link (anchor_body); reconstruct it from the pelvis quat + waist joints
# since imu_state reports the pelvis in this sim.
pelvis_quat = np.array(lowstate.imu_state.quaternion, dtype=np.float32) # (w, x, y, z)
gyro = np.array(lowstate.imu_state.gyroscope, dtype=np.float32)
if self.imu_is_pelvis:
quat = _pelvis_to_torso_quat(
pelvis_quat,
float(q[G1_29_JointIndex.kWaistYaw.value]),
float(q[G1_29_JointIndex.kWaistRoll.value]),
float(q[G1_29_JointIndex.kWaistPitch.value]),
)
else:
quat = pelvis_quat
# Heading-align the reference clip to the robot's actual yaw at start.
# The anchor_ori_b term is NOT yaw-invariant, so a clip that starts at a
# nonzero world yaw would make the policy see a huge heading error at t=0
# and spin. Yaw-only, so true roll/pitch tracking is preserved.
if self._align_quat is None:
self._align_quat = _quat_mul(_yaw_quat(quat), _quat_inv(_yaw_quat(self.ref_torso_quat[0])))
ref_torso = _quat_mul(self._align_quat, self.ref_torso_quat[t])
# motion_anchor_ori_b = first-2-cols of R(q_robot_torso^-1 * q_ref_torso[t]).
q_rel = _quat_mul(_quat_inv(quat), ref_torso)
anchor_ori_b = _ori_6d(q_rel)
obs = np.concatenate(
[
self.ref_joint_pos[t], # command: ref joint pos (29)
self.ref_joint_vel[t], # command: ref joint vel (29)
anchor_ori_b, # motion_anchor_ori_b (6)
gyro, # base_ang_vel (3)
q - self.default_q, # joint_pos (29)
dq, # joint_vel (29)
self.last_action, # actions (29)
]
).astype(np.float32)
feeds: dict[str, np.ndarray] = {self._obs_name: obs[None, :]}
if self._ts_name is not None:
feeds[self._ts_name] = np.array([[t]], dtype=self._ts_dtype)
raw_action = self.session.run(["actions"], feeds)[0].reshape(-1).astype(np.float32)
self.last_action = raw_action
target_q = self.default_q + raw_action * self.action_scale
# Advance the motion cursor: hold frame 0 for the settle window, play the
# clip, then clamp at the final frame (see _motion_index).
self.step += 1
return {f"{motor.name}.q": float(target_q[motor.value]) for motor in G1_29_JointIndex}
def shutdown(self) -> None:
pass
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,151 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SONIC full-body controller for Unitree G1."""
import logging
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
CONTROL_DT,
DEBUG_PRINT_EVERY,
DEFAULT_ANGLES,
ENCODER_UPDATE_EVERY,
LM,
MOTION_SETS,
MovementState,
PlannerController,
SonicPlanner,
_ort_providers,
_snapshot_ms,
clamp_mode_params,
compute_kp_kd,
lowstate_to_obs,
process_joystick,
should_replan_request,
)
logger = logging.getLogger(__name__)
class SonicRuntime:
"""Shared SONIC control loop state (standalone demo + locomotion controller)."""
def __init__(self, force_cpu: bool = False):
planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx")
encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx")
decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx")
providers = _ort_providers(force_cpu=force_cpu)
self.use_gpu = providers[0] == "CUDAExecutionProvider"
so = ort.SessionOptions()
so.log_severity_level = 3
planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=providers)
encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=providers)
decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=providers)
self.kp, self.kd = compute_kp_kd()
self.ms = MovementState()
self.planner = SonicPlanner(planner_sess, planner_path)
self.controller = PlannerController(self.planner, encoder_sess, decoder_sess)
motion = self.planner.initialize(DEFAULT_ANGLES, self.ms)
self.controller.load_initial_motion(motion)
self.planner.start_subprocess(self.controller, use_gpu=self.use_gpu)
self.step = 0
self.replan_timer = 0.0
self.last_ms = _snapshot_ms(self.ms)
@property
def pipeline(self):
return self.controller
def tick(self, obs: dict, *, debug: bool | None = None, use_joystick: bool = True) -> dict:
if not obs:
self.step += 1
return {}
if use_joystick:
process_joystick(obs, self.ms, self.controller)
clamp_mode_params(self.ms)
if self.step > 0:
self.replan_timer += CONTROL_DT
if should_replan_request(self.ms, self.last_ms, self.replan_timer, self.step):
self.planner.request_replan(self.controller.ref_cursor, self.ms)
self.replan_timer = 0.0
self.ms.needs_replan = False
self.last_ms = _snapshot_ms(self.ms)
do_enc = self.step % ENCODER_UPDATE_EVERY == 0
if debug is None:
debug = self.step % DEBUG_PRINT_EVERY == 0
action = self.controller.step(obs, update_encoder=do_enc, debug=debug)
result = self.planner.try_get_new_motion()
if result:
self.controller.blend_new_motion(*result)
self.controller.advance_cursor()
self.step += 1
return action
def reset(self):
self.ms = MovementState()
self.controller.reinit_heading = True
self.controller.playing = True
self.step = 0
self.replan_timer = 0.0
self.last_ms = _snapshot_ms(self.ms)
def shutdown(self):
self.planner.stop_subprocess()
class SonicWholeBodyController:
"""Full-body SONIC controller for UnitreeG1's background controller thread."""
control_dt = CONTROL_DT
full_body = True
def __init__(self, force_cpu: bool = False):
logger.info("Loading SONIC whole-body controller...")
self._runtime = SonicRuntime(force_cpu=force_cpu)
self.kp = self._runtime.kp
self.kd = self._runtime.kd
self.controller = self._runtime.controller
self.ms = self._runtime.ms
logger.info(
"SONIC ready: %s (default mode: %s)",
MOTION_SETS[0][0],
LM(self.ms.mode).name,
)
def run_step(self, action: dict, lowstate) -> dict:
if lowstate is None:
return {}
obs = lowstate_to_obs(lowstate)
return self._runtime.tick(obs, debug=False)
def reset(self):
self._runtime.reset()
def shutdown(self):
self._runtime.shutdown()
+4 -2
View File
@@ -68,8 +68,10 @@ def make_locomotion_controller(name: str | None):
if name is None:
return None
controllers = {
"GrootLocomotionController": "lerobot.robots.unitree_g1.gr00t_locomotion",
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.holosoma_locomotion",
"GrootLocomotionController": "lerobot.robots.unitree_g1.controllers.gr00t_locomotion",
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.controllers.holosoma_locomotion",
"SonicWholeBodyController": "lerobot.robots.unitree_g1.controllers.sonic_whole_body",
"MjlabMotionImitationController": "lerobot.robots.unitree_g1.controllers.mjlab_motion_imitation",
}
module_path = controllers.get(name)
if module_path is None:
+9 -1
View File
@@ -338,6 +338,9 @@ class UnitreeG1(Robot):
self.kp = np.array(self.config.kp, dtype=np.float32)
self.kd = np.array(self.config.kd, dtype=np.float32)
if self.controller is not None and hasattr(self.controller, "kp"):
self.kp = np.array(self.controller.kp, dtype=np.float32)
self.kd = np.array(self.controller.kd, dtype=np.float32)
for joint in G1_29_JointIndex:
self.msg.motor_cmd[joint].mode = 1
@@ -374,6 +377,9 @@ class UnitreeG1(Robot):
# Signal thread to stop and unblock any waits
self._shutdown_event.set()
if self.controller is not None and hasattr(self.controller, "shutdown"):
self.controller.shutdown()
# Wait for subscribe thread to finish
if self.subscribe_thread is not None:
self.subscribe_thread.join(timeout=2.0)
@@ -465,9 +471,11 @@ class UnitreeG1(Robot):
def send_action(self, action: RobotAction) -> RobotAction:
action_to_publish = action
if self.controller is not None:
self._update_controller_action(action)
if getattr(self.controller, "full_body", False):
return action
# Controller thread owns legs/waist. Here we only update joystick inputs
# and publish arm targets from the teleoperator.
self._update_controller_action(action)
arm_prefixes = tuple(j.name for j in G1_29_JointArmIndex)
action_to_publish = {
key: value