Compare commits

...

9 Commits

Author SHA1 Message Date
Martino Russi 3b6de2fdf8 fix(unitree_g1): fix typo flagged by spellchecker in motion_loader docstring 2026-06-26 13:46:33 +02:00
Martino Russi 744f3667c0 fix(unitree_g1): silence bandit findings in SONIC example/pipeline 2026-06-26 13:40:53 +02:00
Martino Russi fdde436776 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-26 13:35:28 +02:00
Martino Russi 5c683c65c6 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-25 14:38:48 +02:00
Martino Russi dfbc25c58f fix(unitree_g1): satisfy ruff lint/format and address review comments 2026-06-25 14:37:44 +02:00
Martino Russi 804c76bcc2 Merge branch 'main' into feat/unitree_g1_sonic_rebased 2026-06-25 13:41:04 +02:00
Martino Russi e6afa69be9 add motion loader 2026-06-17 12:31:08 +02:00
Martino Russi 31d1439e29 add custom motion loader 2026-06-17 12:29:36 +02:00
Martino Russi 1c118c6359 feat(unitree_g1): add SONIC whole-body controller
Move GrootLocomotionController and HolosomaLocomotionController into a new
controllers/ subpackage and add the SONIC whole-body controller
(sonic_pipeline.py, sonic_whole_body.py) plus the examples/unitree_g1/sonic.py
standalone script. UnitreeG1 now honors a controller's kp/kd, calls
controller.shutdown() on disconnect, and skips arm publishing for full_body
controllers.
2026-06-16 17:12:20 +02:00
24 changed files with 2045 additions and 7 deletions
+170
View File
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Load an SMPL motion clip and expose it in SONIC's encoder format.
SONIC's whole-body tracking mode (``encode_mode == 2``) consumes a flat
720-vector ``smpl_joints_10frame_step1`` = 10 consecutive frames x 24 SMPL
joints x 3 (xyz) at 50 Hz.
IMPORTANT - frame convention: the encoder expects each frame's joints with the
body's *root orientation removed* (per-frame canonical), exactly like the live
deploy stream's ``smpl_joints_local`` (see ``process_smpl_joints`` in the GEAR
PICO teleop and ``smpl_joints_multi_future_local`` in training). The reference
``smpl_filtered`` clips instead store **world-frame** joints (heading retained),
so feeding them raw makes the robot move but track poorly / never face-forward.
This loader therefore canonicalizes on load using the clip's per-frame root
orientation (``pose_aa[:, :3]``):
A = Rx(+90deg) * rotvec(pose_aa[:, :3]) # y-up -> z-up root quat
local = base120 * A^-1 * joints # remove root orient
with ``base120 = quat(0.5,0.5,0.5,0.5)`` (SMPL base rotation). This reproduces
the deployed transform (verified: per-frame hip-heading std -> 0).
Clip is read from a numpy ``.npz``. Expected keys:
smpl_joints : (T, 24, 3) float32 -- world-frame joint positions, 50 fps
pose_aa : (T, 72) float32 -- SMPL axis-angle (root = [:, :3])
transl : (T, 3) float32 -- global root translation (optional)
fps : scalar
Example:
python examples/unitree_g1/motion_loader.py \
--motion examples/unitree_g1/motions/walk_forward.npz
"""
import argparse
import numpy as np
WINDOW = 10 # frames per encoder window (smpl_joints_10frame_step1)
N_JOINTS = 24
JOINT_DIM = 3
SMPL_OBS_DIM = WINDOW * N_JOINTS * JOINT_DIM # 720
def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np.ndarray:
"""Remove per-frame root orientation -> SONIC ``smpl_joints_local`` format.
Args:
smpl_joints: (T, 24, 3) world-frame (z-up) SMPL joint positions.
root_aa: (T, 3) SMPL global-orient axis-angle (y-up convention).
Returns:
(T, 24, 3) per-frame root-orientation-removed joints.
"""
from scipy.spatial.transform import Rotation
rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult)
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32)
class SmplMotion:
"""A single SMPL clip with SONIC-format windowing."""
def __init__(self, path: str, loop: bool = True, canonicalize: bool = True):
data = np.load(path)
smpl_joints = data["smpl_joints"].astype(np.float32) # (T, 24, 3)
self.pose_aa = data["pose_aa"].astype(np.float32) if "pose_aa" in data.files else None
self.transl = data["transl"].astype(np.float32) if "transl" in data.files else None
self.fps = float(data["fps"]) if "fps" in data.files else 50.0
self.loop = loop
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM):
raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}")
# Reference clips store world-frame joints; the encoder wants per-frame
# root-orientation-removed joints. Canonicalize when we have the root pose.
self.canonicalized = False
if canonicalize and self.pose_aa is not None:
smpl_joints = canonicalize_smpl_joints(smpl_joints, self.pose_aa[:, :3])
self.canonicalized = True
self.smpl_joints = smpl_joints
self.num_frames = self.smpl_joints.shape[0]
self._cursor = 0
def window(self, start: int) -> np.ndarray:
"""Return the 720-vector for the 10-frame window beginning at ``start``.
Frames are laid out oldest->newest, joint-major within a frame:
[f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz].
"""
idx = np.arange(start, start + WINDOW)
idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1)
return self.smpl_joints[idx].reshape(-1).astype(np.float32)
def reset(self):
self._cursor = 0
def step(self) -> np.ndarray:
"""Advance one frame and return the current 720-vector window."""
w = self.window(self._cursor)
self._cursor += 1
if self.loop:
self._cursor %= self.num_frames
return w
@property
def done(self) -> bool:
return (not self.loop) and (self._cursor + WINDOW >= self.num_frames)
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--motion", required=True, help="Path to motion .npz")
parser.add_argument("--no-loop", action="store_true")
parser.add_argument(
"--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)"
)
args = parser.parse_args()
m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon)
duration = m.num_frames / m.fps
print(f"Loaded '{args.motion}'")
print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s")
print(
f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
f"transl={None if m.transl is None else m.transl.shape}"
)
# Sanity: after canonicalization the per-frame body heading should be fixed.
j = m.smpl_joints
v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal
a = np.arctan2(v[:, 1], v[:, 0])
rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0)
circ_std = np.degrees(np.sqrt(-2 * np.log(rlen)))
print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)")
w0 = m.window(0)
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}")
assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]"
# Simulate a few control ticks.
print(" stepping 5 ticks:")
for t in range(5):
w = m.step()
print(f" t={t} cursor={m._cursor} window_norm={np.linalg.norm(w):.2f}")
print("OK: motion loads and yields SONIC-format 720-vec windows.")
if __name__ == "__main__":
main()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+88
View File
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convert a GEAR-SONIC / BONES-SEED ``smpl_filtered`` clip (.pkl) to .npz.
The reference clips are zlib-compressed joblib pickles holding a dict with
``pose_aa`` (T, 72), ``transl`` (T, 3), ``smpl_joints`` (T, 24, 3), ``fps``.
``motion_loader.SmplMotion`` consumes the .npz form so the runtime needs no
joblib dependency. Canonicalization (root-orientation removal) happens at load
time in ``motion_loader``, so this converter just repackages the raw arrays.
Run this in an environment that has ``joblib`` (e.g. the sonic teleop venv):
python examples/unitree_g1/pkl_to_npz.py \
--pkl sample_data/smpl_filtered/walk_forward_amateur_001__A001.pkl \
--out examples/unitree_g1/motions/walk_forward.npz
"""
import argparse
from pathlib import Path
import numpy as np
def load_pkl(path: str) -> dict:
try:
import joblib
return joblib.load(path)
except Exception:
# joblib clips are zlib-compressed pickles; fall back to manual inflate.
import contextlib
import pickle # nosec B403 - loads trusted local SMPL clips authored by the user
import zlib
with open(path, "rb") as f:
raw = f.read()
with contextlib.suppress(zlib.error):
raw = zlib.decompress(raw)
return pickle.loads(raw) # nosec B301 - local, user-provided motion files only
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--pkl", required=True, help="Input smpl_filtered .pkl")
parser.add_argument("--out", required=True, help="Output .npz path")
args = parser.parse_args()
d = load_pkl(args.pkl)
if not isinstance(d, dict) or "smpl_joints" not in d:
raise ValueError(f"Unexpected pkl structure; keys={list(d) if isinstance(d, dict) else type(d)}")
smpl_joints = np.asarray(d["smpl_joints"], np.float32)
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (24, 3):
raise ValueError(f"smpl_joints must be (T,24,3), got {smpl_joints.shape}")
out = {"smpl_joints": smpl_joints, "fps": np.float32(d.get("fps", 50.0))}
if "pose_aa" in d:
out["pose_aa"] = np.asarray(d["pose_aa"], np.float32)
else:
print("[warn] no pose_aa -> loader cannot canonicalize (will feed raw)")
if "transl" in d:
out["transl"] = np.asarray(d["transl"], np.float32)
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(args.out, **out)
dur = smpl_joints.shape[0] / float(out["fps"])
print(f"Wrote {args.out}")
print(
f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}"
)
if __name__ == "__main__":
main()
+294
View File
@@ -0,0 +1,294 @@
#!/usr/bin/env python
"""
SONIC planner with full mode control.
Keyboard controls:
N / P - next / previous motion set
1-8 - select mode within current set
WASD - movement direction
Q / E - rotate facing left / right
9 / 0 - decrease / increase speed
- / = - decrease / increase height
R - force replan
M - toggle SMPL motion playback <-> locomotion (needs --motion-file)
Space - emergency stop -> IDLE
Esc - quit
Gamepad controls (Unitree wireless controller):
Left stick Y - speed (forward = fast, back = stop)
Left stick X - movement direction (offset from facing)
Right stick X - facing direction (incremental rotation)
Right stick Y - height (up = tall 0.8m, down = low 0.1m)
Buttons - unused (mode selection is keyboard-only)
For teleop integration use --robot.controller=SonicWholeBodyController instead.
"""
import argparse
import contextlib
import faulthandler
import gc
import os
import sys
import tempfile
import time
import numpy as np
from motion_loader import SmplMotion
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
CONTROL_DT,
DEFAULT_ANGLES,
LM,
MOTION_SETS,
RawKeyboard,
compute_kp_kd,
drain_keyboard,
)
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
def main():
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
parser.add_argument(
"--ip",
type=str,
default=None,
help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.",
)
parser.add_argument(
"--log-csv",
action="store_true",
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)",
)
parser.add_argument(
"--cpu",
action="store_true",
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)",
)
parser.add_argument(
"--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)"
)
parser.add_argument(
"--gamepad",
action="store_true",
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)",
)
parser.add_argument(
"--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)"
)
parser.add_argument(
"--motion-file",
type=str,
default=None,
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
"(encode_mode=2) instead of locomotion planning.",
)
parser.add_argument(
"--no-loop", action="store_true", help="With --motion-file, play once instead of looping"
)
args = parser.parse_args()
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
# avoid losing buffered diagnostics if the process dies mid-loop.
faulthandler.enable()
with contextlib.suppress(Exception):
sys.stdout.reconfigure(line_buffering=True)
print("=" * 60)
print("SONIC planner - full mode control")
print(" N/P cycle sets | 1-8 select mode | WASD move")
print(" Q/E rotate | 9/0 speed | -/= height")
print(" R replan | Space IDLE | Esc quit")
if args.ip:
print(f" Robot IP: {args.ip}")
else:
print(" Mode: simulation")
print("=" * 60 + "\n")
cfg = UnitreeG1Config(controller=None) # full-body SONIC; standalone loop owns publish
if args.ip:
cfg.is_simulation = False
cfg.robot_ip = args.ip
else:
cfg.is_simulation = True
if args.headless:
print("[Note] --headless ignored: sim uses stock UnitreeG1 + hub env")
robot = UnitreeG1(cfg)
robot.connect()
kp, kd = compute_kp_kd()
robot.kp = kp.copy()
robot.kd = kd.copy()
runtime = SonicRuntime(force_cpu=args.cpu)
controller = runtime.controller
ms = runtime.ms
motion = None
if args.motion_file:
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
controller.smpl_motion = motion # lets 'M' key toggle playback
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
dur = motion.num_frames / motion.fps
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
print(
f" frames={motion.num_frames} fps={motion.fps:.1f} "
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2"
)
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
runtime.controller.print_input_diagnostics()
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
[print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
print(
"\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n",
flush=True,
)
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
with RawKeyboard() as kb:
try:
gc.disable()
gc_timer = 0.0
robot.reset(CONTROL_DT, DEFAULT_ANGLES)
time.sleep(1.0)
last_status = time.time() - 2.1
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
stall_src = ""
did_blend = False
t_start = time.time()
log_path = os.path.join(tempfile.gettempdir(), "sonic_pose_log.csv")
jnames = [m.name for m in G1_29_JointIndex]
log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115
if log_ctx:
log_ctx.write(
"t,step,cursor,ts,blend,mode,"
+ ",".join(f"q{i}" for i in range(29))
+ ","
+ ",".join(f"ref{i}" for i in range(29))
+ ","
+ ",".join(f"act{i}" for i in range(29))
+ ",delta_max,action_norm,token_norm\n"
)
try:
while not robot._shutdown_event.is_set():
t0 = time.time()
if drain_keyboard(kb, ms, controller):
break
obs = robot.get_observation()
t_obs = time.time()
obs_t.append(1000 * (t_obs - t0))
if not obs:
runtime.tick({}, use_joystick=False)
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
continue
# SMPL playback only while in whole-body mode; 'M' toggles it.
motion_active = motion is not None and controller.encode_mode == 2
if motion_active:
controller.smpl_joints_10frame_step1 = motion.step()
if motion.done:
print("\n[Motion] clip finished")
break
step_before = runtime.step
t_step = time.time()
action = runtime.tick(obs, use_joystick=base_joystick and not motion_active)
step_ms = 1000 * (time.time() - t_step)
do_enc = step_before % 5 == 0
(enc_t if do_enc else dec_t).append(step_ms)
t_act = time.time()
robot.send_action(action)
act_t.append(1000 * (time.time() - t_act))
if log_ctx and runtime.step % 5 == 0:
t_rel = time.time() - t_start
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
cur, ts = controller.ref_cursor, controller.motion_timesteps
q_ref = (
controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
)
log_ctx.write(
f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode},"
+ ",".join(f"{v:.6f}" for v in q_r)
+ ","
+ ",".join(f"{v:.6f}" for v in q_ref)
+ ","
+ ",".join(f"{v:.6f}" for v in a_v)
+ ","
+ f"{np.max(np.abs(a_v - q_r)):.6f},"
f"{np.linalg.norm(a_v):.6f},"
f"{np.linalg.norm(controller.token):.6f}\n"
)
did_blend = False
now = time.time()
loop_ms = 1000 * (now - t0)
if loop_ms > 50:
stall_src = (
f"[STALL] {loop_ms:.0f}ms: "
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}"
)
if loop_ms > CONTROL_DT * 1500:
slow_n += 1
if now - last_status > 2.0:
def _avg(lst):
return sum(lst) / len(lst) if lst else 0
hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0
print(
f"\r {ms.status_line()} step={runtime.step} "
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
f"slow={slow_n} blends={blend_n}",
end="",
flush=True,
)
if stall_src:
print(f"\n {stall_src}")
stall_src = ""
last_status = now
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
slow_n = blend_n = 0
gc_timer += CONTROL_DT
if gc_timer >= 10.0:
gc.collect()
gc_timer = 0.0
loop_t.append(loop_ms)
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
finally:
if log_ctx:
log_ctx.close()
except KeyboardInterrupt:
pass
finally:
gc.enable()
if args.log_csv:
print(f"\n[Log] Saved to {log_path}")
runtime.shutdown()
print("\nStopping...")
if robot.is_connected:
robot.disconnect()
print("Done.")
if __name__ == "__main__":
main()
@@ -68,6 +68,6 @@ class UnitreeG1Config(RobotConfig):
# Compensates for gravity on the unitree's arms using the arm ik solver
gravity_compensation: bool = False
# Lower-body controller class name, e.g. "GrootLocomotionController" or
# "HolosomaLocomotionController". None disables it.
# Locomotion controller class name, e.g. "GrootLocomotionController",
# "HolosomaLocomotionController", or "SonicWholeBodyController". None disables it.
controller: str | None = None
@@ -0,0 +1,24 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unitree G1 locomotion controllers (Groot, Holosoma, SONIC)."""
__all__ = [
"GrootLocomotionController",
"HolosomaLocomotionController",
"SonicWholeBodyController",
"SonicRuntime",
]
@@ -21,7 +21,7 @@ import numpy as np
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from .g1_utils import (
from lerobot.robots.unitree_g1.g1_utils import (
REMOTE_AXES,
REMOTE_BUTTONS,
G1_29_JointIndex,
@@ -22,7 +22,7 @@ import onnx
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from .g1_utils import (
from lerobot.robots.unitree_g1.g1_utils import (
REMOTE_AXES,
G1_29_JointArmIndex,
G1_29_JointIndex,
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,151 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SONIC full-body controller for Unitree G1."""
import logging
import onnxruntime as ort
from huggingface_hub import hf_hub_download
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
CONTROL_DT,
DEBUG_PRINT_EVERY,
DEFAULT_ANGLES,
ENCODER_UPDATE_EVERY,
LM,
MOTION_SETS,
MovementState,
PlannerController,
SonicPlanner,
_ort_providers,
_snapshot_ms,
clamp_mode_params,
compute_kp_kd,
lowstate_to_obs,
process_joystick,
should_replan_request,
)
logger = logging.getLogger(__name__)
class SonicRuntime:
"""Shared SONIC control loop state (standalone demo + locomotion controller)."""
def __init__(self, force_cpu: bool = False):
planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx")
encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx")
decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx")
providers = _ort_providers(force_cpu=force_cpu)
self.use_gpu = providers[0] == "CUDAExecutionProvider"
so = ort.SessionOptions()
so.log_severity_level = 3
planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=providers)
encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=providers)
decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=providers)
self.kp, self.kd = compute_kp_kd()
self.ms = MovementState()
self.planner = SonicPlanner(planner_sess, planner_path)
self.controller = PlannerController(self.planner, encoder_sess, decoder_sess)
motion = self.planner.initialize(DEFAULT_ANGLES, self.ms)
self.controller.load_initial_motion(motion)
self.planner.start_subprocess(self.controller, use_gpu=self.use_gpu)
self.step = 0
self.replan_timer = 0.0
self.last_ms = _snapshot_ms(self.ms)
@property
def pipeline(self):
return self.controller
def tick(self, obs: dict, *, debug: bool | None = None, use_joystick: bool = True) -> dict:
if not obs:
self.step += 1
return {}
if use_joystick:
process_joystick(obs, self.ms, self.controller)
clamp_mode_params(self.ms)
if self.step > 0:
self.replan_timer += CONTROL_DT
if should_replan_request(self.ms, self.last_ms, self.replan_timer, self.step):
self.planner.request_replan(self.controller.ref_cursor, self.ms)
self.replan_timer = 0.0
self.ms.needs_replan = False
self.last_ms = _snapshot_ms(self.ms)
do_enc = self.step % ENCODER_UPDATE_EVERY == 0
if debug is None:
debug = self.step % DEBUG_PRINT_EVERY == 0
action = self.controller.step(obs, update_encoder=do_enc, debug=debug)
result = self.planner.try_get_new_motion()
if result:
self.controller.blend_new_motion(*result)
self.controller.advance_cursor()
self.step += 1
return action
def reset(self):
self.ms = MovementState()
self.controller.reinit_heading = True
self.controller.playing = True
self.step = 0
self.replan_timer = 0.0
self.last_ms = _snapshot_ms(self.ms)
def shutdown(self):
self.planner.stop_subprocess()
class SonicWholeBodyController:
"""Full-body SONIC controller for UnitreeG1's background controller thread."""
control_dt = CONTROL_DT
full_body = True
def __init__(self, force_cpu: bool = False):
logger.info("Loading SONIC whole-body controller...")
self._runtime = SonicRuntime(force_cpu=force_cpu)
self.kp = self._runtime.kp
self.kd = self._runtime.kd
self.controller = self._runtime.controller
self.ms = self._runtime.ms
logger.info(
"SONIC ready: %s (default mode: %s)",
MOTION_SETS[0][0],
LM(self.ms.mode).name,
)
def run_step(self, action: dict, lowstate) -> dict:
if lowstate is None:
return {}
obs = lowstate_to_obs(lowstate)
return self._runtime.tick(obs, debug=False)
def reset(self):
self._runtime.reset()
def shutdown(self):
self._runtime.shutdown()
+3 -2
View File
@@ -68,8 +68,9 @@ def make_locomotion_controller(name: str | None):
if name is None:
return None
controllers = {
"GrootLocomotionController": "lerobot.robots.unitree_g1.gr00t_locomotion",
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.holosoma_locomotion",
"GrootLocomotionController": "lerobot.robots.unitree_g1.controllers.gr00t_locomotion",
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.controllers.holosoma_locomotion",
"SonicWholeBodyController": "lerobot.robots.unitree_g1.controllers.sonic_whole_body",
}
module_path = controllers.get(name)
if module_path is None:
+9 -1
View File
@@ -338,6 +338,9 @@ class UnitreeG1(Robot):
self.kp = np.array(self.config.kp, dtype=np.float32)
self.kd = np.array(self.config.kd, dtype=np.float32)
if self.controller is not None and hasattr(self.controller, "kp"):
self.kp = np.array(self.controller.kp, dtype=np.float32)
self.kd = np.array(self.controller.kd, dtype=np.float32)
for joint in G1_29_JointIndex:
self.msg.motor_cmd[joint].mode = 1
@@ -374,6 +377,9 @@ class UnitreeG1(Robot):
# Signal thread to stop and unblock any waits
self._shutdown_event.set()
if self.controller is not None and hasattr(self.controller, "shutdown"):
self.controller.shutdown()
# Wait for subscribe thread to finish
if self.subscribe_thread is not None:
self.subscribe_thread.join(timeout=2.0)
@@ -465,9 +471,11 @@ class UnitreeG1(Robot):
def send_action(self, action: RobotAction) -> RobotAction:
action_to_publish = action
if self.controller is not None:
self._update_controller_action(action)
if getattr(self.controller, "full_body", False):
return action
# Controller thread owns legs/waist. Here we only update joystick inputs
# and publish arm targets from the teleoperator.
self._update_controller_action(action)
arm_prefixes = tuple(j.name for j in G1_29_JointArmIndex)
action_to_publish = {
key: value