mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-28 05:37:16 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f7afeb9cb2 | |||
| 3b6de2fdf8 | |||
| 744f3667c0 | |||
| fdde436776 | |||
| 5c683c65c6 | |||
| dfbc25c58f | |||
| 804c76bcc2 | |||
| e6afa69be9 | |||
| 31d1439e29 | |||
| 1c118c6359 |
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Load an SMPL motion clip and expose it in SONIC's encoder format.
|
||||
|
||||
SONIC's whole-body tracking mode (``encode_mode == 2``) consumes a flat
|
||||
720-vector ``smpl_joints_10frame_step1`` = 10 consecutive frames x 24 SMPL
|
||||
joints x 3 (xyz) at 50 Hz.
|
||||
|
||||
IMPORTANT - frame convention: the encoder expects each frame's joints with the
|
||||
body's *root orientation removed* (per-frame canonical), exactly like the live
|
||||
deploy stream's ``smpl_joints_local`` (see ``process_smpl_joints`` in the GEAR
|
||||
PICO teleop and ``smpl_joints_multi_future_local`` in training). The reference
|
||||
``smpl_filtered`` clips instead store **world-frame** joints (heading retained),
|
||||
so feeding them raw makes the robot move but track poorly / never face-forward.
|
||||
This loader therefore canonicalizes on load using the clip's per-frame root
|
||||
orientation (``pose_aa[:, :3]``):
|
||||
|
||||
A = Rx(+90deg) * rotvec(pose_aa[:, :3]) # y-up -> z-up root quat
|
||||
local = base120 * A^-1 * joints # remove root orient
|
||||
|
||||
with ``base120 = quat(0.5,0.5,0.5,0.5)`` (SMPL base rotation). This reproduces
|
||||
the deployed transform (verified: per-frame hip-heading std -> 0).
|
||||
|
||||
Clip is read from a numpy ``.npz``. Expected keys:
|
||||
smpl_joints : (T, 24, 3) float32 -- world-frame joint positions, 50 fps
|
||||
pose_aa : (T, 72) float32 -- SMPL axis-angle (root = [:, :3])
|
||||
transl : (T, 3) float32 -- global root translation (optional)
|
||||
fps : scalar
|
||||
|
||||
Example:
|
||||
python examples/unitree_g1/motion_loader.py \
|
||||
--motion examples/unitree_g1/motions/walk_forward.npz
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
import numpy as np
|
||||
|
||||
WINDOW = 10 # frames per encoder window (smpl_joints_10frame_step1)
|
||||
N_JOINTS = 24
|
||||
JOINT_DIM = 3
|
||||
SMPL_OBS_DIM = WINDOW * N_JOINTS * JOINT_DIM # 720
|
||||
|
||||
|
||||
def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np.ndarray:
|
||||
"""Remove per-frame root orientation -> SONIC ``smpl_joints_local`` format.
|
||||
|
||||
Args:
|
||||
smpl_joints: (T, 24, 3) world-frame (z-up) SMPL joint positions.
|
||||
root_aa: (T, 3) SMPL global-orient axis-angle (y-up convention).
|
||||
|
||||
Returns:
|
||||
(T, 24, 3) per-frame root-orientation-removed joints.
|
||||
"""
|
||||
from scipy.spatial.transform import Rotation
|
||||
|
||||
rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
|
||||
base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
|
||||
a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult)
|
||||
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
|
||||
return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32)
|
||||
|
||||
|
||||
class SmplMotion:
|
||||
"""A single SMPL clip with SONIC-format windowing."""
|
||||
|
||||
def __init__(self, path: str, loop: bool = True, canonicalize: bool = True):
|
||||
data = np.load(path)
|
||||
smpl_joints = data["smpl_joints"].astype(np.float32) # (T, 24, 3)
|
||||
self.pose_aa = data["pose_aa"].astype(np.float32) if "pose_aa" in data.files else None
|
||||
self.transl = data["transl"].astype(np.float32) if "transl" in data.files else None
|
||||
self.fps = float(data["fps"]) if "fps" in data.files else 50.0
|
||||
self.loop = loop
|
||||
|
||||
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM):
|
||||
raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}")
|
||||
|
||||
# Reference clips store world-frame joints; the encoder wants per-frame
|
||||
# root-orientation-removed joints. Canonicalize when we have the root pose.
|
||||
self.canonicalized = False
|
||||
if canonicalize and self.pose_aa is not None:
|
||||
smpl_joints = canonicalize_smpl_joints(smpl_joints, self.pose_aa[:, :3])
|
||||
self.canonicalized = True
|
||||
self.smpl_joints = smpl_joints
|
||||
|
||||
self.num_frames = self.smpl_joints.shape[0]
|
||||
self._cursor = 0
|
||||
|
||||
def window(self, start: int) -> np.ndarray:
|
||||
"""Return the 720-vector for the 10-frame window beginning at ``start``.
|
||||
|
||||
Frames are laid out oldest->newest, joint-major within a frame:
|
||||
[f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz].
|
||||
"""
|
||||
idx = np.arange(start, start + WINDOW)
|
||||
idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1)
|
||||
return self.smpl_joints[idx].reshape(-1).astype(np.float32)
|
||||
|
||||
def reset(self):
|
||||
self._cursor = 0
|
||||
|
||||
def step(self) -> np.ndarray:
|
||||
"""Advance one frame and return the current 720-vector window."""
|
||||
w = self.window(self._cursor)
|
||||
self._cursor += 1
|
||||
if self.loop:
|
||||
self._cursor %= self.num_frames
|
||||
return w
|
||||
|
||||
@property
|
||||
def done(self) -> bool:
|
||||
return (not self.loop) and (self._cursor + WINDOW >= self.num_frames)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--motion", required=True, help="Path to motion .npz")
|
||||
parser.add_argument("--no-loop", action="store_true")
|
||||
parser.add_argument(
|
||||
"--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon)
|
||||
duration = m.num_frames / m.fps
|
||||
print(f"Loaded '{args.motion}'")
|
||||
print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s")
|
||||
print(
|
||||
f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
|
||||
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
|
||||
f"transl={None if m.transl is None else m.transl.shape}"
|
||||
)
|
||||
|
||||
# Sanity: after canonicalization the per-frame body heading should be fixed.
|
||||
j = m.smpl_joints
|
||||
v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal
|
||||
a = np.arctan2(v[:, 1], v[:, 0])
|
||||
rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0)
|
||||
circ_std = np.degrees(np.sqrt(-2 * np.log(rlen)))
|
||||
print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)")
|
||||
|
||||
w0 = m.window(0)
|
||||
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}")
|
||||
assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]"
|
||||
|
||||
# Simulate a few control ticks.
|
||||
print(" stepping 5 ticks:")
|
||||
for t in range(5):
|
||||
w = m.step()
|
||||
print(f" t={t} cursor={m._cursor} window_norm={np.linalg.norm(w):.2f}")
|
||||
|
||||
print("OK: motion loads and yields SONIC-format 720-vec windows.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Convert a GEAR-SONIC / BONES-SEED ``smpl_filtered`` clip (.pkl) to .npz.
|
||||
|
||||
The reference clips are zlib-compressed joblib pickles holding a dict with
|
||||
``pose_aa`` (T, 72), ``transl`` (T, 3), ``smpl_joints`` (T, 24, 3), ``fps``.
|
||||
``motion_loader.SmplMotion`` consumes the .npz form so the runtime needs no
|
||||
joblib dependency. Canonicalization (root-orientation removal) happens at load
|
||||
time in ``motion_loader``, so this converter just repackages the raw arrays.
|
||||
|
||||
Run this in an environment that has ``joblib`` (e.g. the sonic teleop venv):
|
||||
|
||||
python examples/unitree_g1/pkl_to_npz.py \
|
||||
--pkl sample_data/smpl_filtered/walk_forward_amateur_001__A001.pkl \
|
||||
--out examples/unitree_g1/motions/walk_forward.npz
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def load_pkl(path: str) -> dict:
|
||||
try:
|
||||
import joblib
|
||||
|
||||
return joblib.load(path)
|
||||
except Exception:
|
||||
# joblib clips are zlib-compressed pickles; fall back to manual inflate.
|
||||
import contextlib
|
||||
import pickle # nosec B403 - loads trusted local SMPL clips authored by the user
|
||||
import zlib
|
||||
|
||||
with open(path, "rb") as f:
|
||||
raw = f.read()
|
||||
with contextlib.suppress(zlib.error):
|
||||
raw = zlib.decompress(raw)
|
||||
return pickle.loads(raw) # nosec B301 - local, user-provided motion files only
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--pkl", required=True, help="Input smpl_filtered .pkl")
|
||||
parser.add_argument("--out", required=True, help="Output .npz path")
|
||||
args = parser.parse_args()
|
||||
|
||||
d = load_pkl(args.pkl)
|
||||
if not isinstance(d, dict) or "smpl_joints" not in d:
|
||||
raise ValueError(f"Unexpected pkl structure; keys={list(d) if isinstance(d, dict) else type(d)}")
|
||||
|
||||
smpl_joints = np.asarray(d["smpl_joints"], np.float32)
|
||||
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (24, 3):
|
||||
raise ValueError(f"smpl_joints must be (T,24,3), got {smpl_joints.shape}")
|
||||
|
||||
out = {"smpl_joints": smpl_joints, "fps": np.float32(d.get("fps", 50.0))}
|
||||
if "pose_aa" in d:
|
||||
out["pose_aa"] = np.asarray(d["pose_aa"], np.float32)
|
||||
else:
|
||||
print("[warn] no pose_aa -> loader cannot canonicalize (will feed raw)")
|
||||
if "transl" in d:
|
||||
out["transl"] = np.asarray(d["transl"], np.float32)
|
||||
|
||||
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
|
||||
np.savez_compressed(args.out, **out)
|
||||
dur = smpl_joints.shape[0] / float(out["fps"])
|
||||
print(f"Wrote {args.out}")
|
||||
print(
|
||||
f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,294 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
SONIC planner with full mode control.
|
||||
|
||||
Keyboard controls:
|
||||
N / P - next / previous motion set
|
||||
1-8 - select mode within current set
|
||||
WASD - movement direction
|
||||
Q / E - rotate facing left / right
|
||||
9 / 0 - decrease / increase speed
|
||||
- / = - decrease / increase height
|
||||
R - force replan
|
||||
M - toggle SMPL motion playback <-> locomotion (needs --motion-file)
|
||||
Space - emergency stop -> IDLE
|
||||
Esc - quit
|
||||
|
||||
Gamepad controls (Unitree wireless controller):
|
||||
Left stick Y - speed (forward = fast, back = stop)
|
||||
Left stick X - movement direction (offset from facing)
|
||||
Right stick X - facing direction (incremental rotation)
|
||||
Right stick Y - height (up = tall 0.8m, down = low 0.1m)
|
||||
Buttons - unused (mode selection is keyboard-only)
|
||||
|
||||
For teleop integration use --robot.controller=SonicWholeBodyController instead.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import faulthandler
|
||||
import gc
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from motion_loader import SmplMotion
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
|
||||
CONTROL_DT,
|
||||
DEFAULT_ANGLES,
|
||||
LM,
|
||||
MOTION_SETS,
|
||||
RawKeyboard,
|
||||
compute_kp_kd,
|
||||
drain_keyboard,
|
||||
)
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
|
||||
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
|
||||
parser.add_argument(
|
||||
"--ip",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--log-csv",
|
||||
action="store_true",
|
||||
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpu",
|
||||
action="store_true",
|
||||
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gamepad",
|
||||
action="store_true",
|
||||
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--motion-file",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
|
||||
"(encode_mode=2) instead of locomotion planning.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-loop", action="store_true", help="With --motion-file, play once instead of looping"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
|
||||
# avoid losing buffered diagnostics if the process dies mid-loop.
|
||||
faulthandler.enable()
|
||||
with contextlib.suppress(Exception):
|
||||
sys.stdout.reconfigure(line_buffering=True)
|
||||
|
||||
print("=" * 60)
|
||||
print("SONIC planner - full mode control")
|
||||
print(" N/P cycle sets | 1-8 select mode | WASD move")
|
||||
print(" Q/E rotate | 9/0 speed | -/= height")
|
||||
print(" R replan | Space IDLE | Esc quit")
|
||||
if args.ip:
|
||||
print(f" Robot IP: {args.ip}")
|
||||
else:
|
||||
print(" Mode: simulation")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
cfg = UnitreeG1Config(controller=None) # full-body SONIC; standalone loop owns publish
|
||||
if args.ip:
|
||||
cfg.is_simulation = False
|
||||
cfg.robot_ip = args.ip
|
||||
else:
|
||||
cfg.is_simulation = True
|
||||
if args.headless:
|
||||
print("[Note] --headless ignored: sim uses stock UnitreeG1 + hub env")
|
||||
robot = UnitreeG1(cfg)
|
||||
robot.connect()
|
||||
kp, kd = compute_kp_kd()
|
||||
robot.kp = kp.copy()
|
||||
robot.kd = kd.copy()
|
||||
|
||||
runtime = SonicRuntime(force_cpu=args.cpu)
|
||||
controller = runtime.controller
|
||||
ms = runtime.ms
|
||||
|
||||
motion = None
|
||||
if args.motion_file:
|
||||
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
|
||||
controller.smpl_motion = motion # lets 'M' key toggle playback
|
||||
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
|
||||
dur = motion.num_frames / motion.fps
|
||||
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
|
||||
print(
|
||||
f" frames={motion.num_frames} fps={motion.fps:.1f} "
|
||||
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2"
|
||||
)
|
||||
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
|
||||
|
||||
runtime.controller.print_input_diagnostics()
|
||||
|
||||
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
|
||||
[print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
|
||||
print(
|
||||
"\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
|
||||
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
|
||||
|
||||
with RawKeyboard() as kb:
|
||||
try:
|
||||
gc.disable()
|
||||
gc_timer = 0.0
|
||||
robot.reset(CONTROL_DT, DEFAULT_ANGLES)
|
||||
time.sleep(1.0)
|
||||
|
||||
last_status = time.time() - 2.1
|
||||
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
|
||||
slow_n = blend_n = 0
|
||||
stall_src = ""
|
||||
did_blend = False
|
||||
t_start = time.time()
|
||||
|
||||
log_path = os.path.join(tempfile.gettempdir(), "sonic_pose_log.csv")
|
||||
jnames = [m.name for m in G1_29_JointIndex]
|
||||
log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115
|
||||
if log_ctx:
|
||||
log_ctx.write(
|
||||
"t,step,cursor,ts,blend,mode,"
|
||||
+ ",".join(f"q{i}" for i in range(29))
|
||||
+ ","
|
||||
+ ",".join(f"ref{i}" for i in range(29))
|
||||
+ ","
|
||||
+ ",".join(f"act{i}" for i in range(29))
|
||||
+ ",delta_max,action_norm,token_norm\n"
|
||||
)
|
||||
|
||||
try:
|
||||
while not robot._shutdown_event.is_set():
|
||||
t0 = time.time()
|
||||
if drain_keyboard(kb, ms, controller):
|
||||
break
|
||||
|
||||
obs = robot.get_observation()
|
||||
t_obs = time.time()
|
||||
obs_t.append(1000 * (t_obs - t0))
|
||||
if not obs:
|
||||
runtime.tick({}, use_joystick=False)
|
||||
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
|
||||
continue
|
||||
|
||||
# SMPL playback only while in whole-body mode; 'M' toggles it.
|
||||
motion_active = motion is not None and controller.encode_mode == 2
|
||||
if motion_active:
|
||||
controller.smpl_joints_10frame_step1 = motion.step()
|
||||
if motion.done:
|
||||
print("\n[Motion] clip finished")
|
||||
break
|
||||
|
||||
step_before = runtime.step
|
||||
t_step = time.time()
|
||||
action = runtime.tick(obs, use_joystick=base_joystick and not motion_active)
|
||||
step_ms = 1000 * (time.time() - t_step)
|
||||
do_enc = step_before % 5 == 0
|
||||
(enc_t if do_enc else dec_t).append(step_ms)
|
||||
|
||||
t_act = time.time()
|
||||
robot.send_action(action)
|
||||
act_t.append(1000 * (time.time() - t_act))
|
||||
|
||||
if log_ctx and runtime.step % 5 == 0:
|
||||
t_rel = time.time() - t_start
|
||||
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
|
||||
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
|
||||
cur, ts = controller.ref_cursor, controller.motion_timesteps
|
||||
q_ref = (
|
||||
controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
|
||||
)
|
||||
log_ctx.write(
|
||||
f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode},"
|
||||
+ ",".join(f"{v:.6f}" for v in q_r)
|
||||
+ ","
|
||||
+ ",".join(f"{v:.6f}" for v in q_ref)
|
||||
+ ","
|
||||
+ ",".join(f"{v:.6f}" for v in a_v)
|
||||
+ ","
|
||||
+ f"{np.max(np.abs(a_v - q_r)):.6f},"
|
||||
f"{np.linalg.norm(a_v):.6f},"
|
||||
f"{np.linalg.norm(controller.token):.6f}\n"
|
||||
)
|
||||
did_blend = False
|
||||
|
||||
now = time.time()
|
||||
loop_ms = 1000 * (now - t0)
|
||||
if loop_ms > 50:
|
||||
stall_src = (
|
||||
f"[STALL] {loop_ms:.0f}ms: "
|
||||
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}"
|
||||
)
|
||||
if loop_ms > CONTROL_DT * 1500:
|
||||
slow_n += 1
|
||||
|
||||
if now - last_status > 2.0:
|
||||
|
||||
def _avg(lst):
|
||||
return sum(lst) / len(lst) if lst else 0
|
||||
|
||||
hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0
|
||||
print(
|
||||
f"\r {ms.status_line()} step={runtime.step} "
|
||||
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
|
||||
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
|
||||
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
|
||||
f"slow={slow_n} blends={blend_n}",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
if stall_src:
|
||||
print(f"\n {stall_src}")
|
||||
stall_src = ""
|
||||
last_status = now
|
||||
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
|
||||
slow_n = blend_n = 0
|
||||
|
||||
gc_timer += CONTROL_DT
|
||||
if gc_timer >= 10.0:
|
||||
gc.collect()
|
||||
gc_timer = 0.0
|
||||
loop_t.append(loop_ms)
|
||||
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
|
||||
finally:
|
||||
if log_ctx:
|
||||
log_ctx.close()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
gc.enable()
|
||||
if args.log_csv:
|
||||
print(f"\n[Log] Saved to {log_path}")
|
||||
runtime.shutdown()
|
||||
print("\nStopping...")
|
||||
if robot.is_connected:
|
||||
robot.disconnect()
|
||||
print("Done.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -68,6 +68,6 @@ class UnitreeG1Config(RobotConfig):
|
||||
# Compensates for gravity on the unitree's arms using the arm ik solver
|
||||
gravity_compensation: bool = False
|
||||
|
||||
# Lower-body controller class name, e.g. "GrootLocomotionController" or
|
||||
# "HolosomaLocomotionController". None disables it.
|
||||
# Locomotion controller class name, e.g. "GrootLocomotionController",
|
||||
# "HolosomaLocomotionController", or "SonicWholeBodyController". None disables it.
|
||||
controller: str | None = None
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Unitree G1 locomotion controllers (Groot, Holosoma, SONIC, mjlab motion imitation)."""
|
||||
|
||||
__all__ = [
|
||||
"GrootLocomotionController",
|
||||
"HolosomaLocomotionController",
|
||||
"SonicWholeBodyController",
|
||||
"SonicRuntime",
|
||||
"MjlabMotionImitationController",
|
||||
]
|
||||
+1
-1
@@ -21,7 +21,7 @@ import numpy as np
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from .g1_utils import (
|
||||
from lerobot.robots.unitree_g1.g1_utils import (
|
||||
REMOTE_AXES,
|
||||
REMOTE_BUTTONS,
|
||||
G1_29_JointIndex,
|
||||
+1
-1
@@ -22,7 +22,7 @@ import onnx
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from .g1_utils import (
|
||||
from lerobot.robots.unitree_g1.g1_utils import (
|
||||
REMOTE_AXES,
|
||||
G1_29_JointArmIndex,
|
||||
G1_29_JointIndex,
|
||||
@@ -0,0 +1,296 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""mjlab BeyondMimic motion-imitation controller for the Unitree G1.
|
||||
|
||||
Deploys any mjlab BeyondMimic motion-tracking policy (trained without state
|
||||
estimation and exported to ONNX) together with its reference clip. The pipeline
|
||||
is motion-agnostic: point it at a different ``(onnx, motion.npz)`` pair and it
|
||||
will track that clip. The bundled default is the "double spin kick".
|
||||
|
||||
The exported policy is a pure function ``actions = policy(obs)`` of a 154-dim
|
||||
observation. The observation terms (in concatenation order) are::
|
||||
|
||||
command (58) ref joint_pos[t] (29) + ref joint_vel[t] (29)
|
||||
motion_anchor_ori_b (6) first two columns of R(q_robot_torso^-1 * q_ref_torso[t])
|
||||
base_ang_vel (3) pelvis IMU angular velocity (body frame)
|
||||
joint_pos (29) q - default_q
|
||||
joint_vel (29) dq
|
||||
actions (29) previous raw policy output
|
||||
|
||||
The reference trajectory (``command`` + reference torso quaternion) is read from
|
||||
``motion.npz``. Joint order matches ``G1_29_JointIndex`` exactly, so no remapping
|
||||
is needed. Per-joint action scale and PD gains are read from the ONNX metadata so
|
||||
the controller always stays consistent with the exported policy.
|
||||
|
||||
Note: the policy's anchor body is ``torso_link`` while ``base_ang_vel`` comes from
|
||||
the pelvis IMU (mjlab's sensors live on ``imu_in_pelvis``). This sim publishes the
|
||||
pelvis as ``imu_state``, so the torso orientation used for ``motion_anchor_ori_b``
|
||||
is reconstructed from the pelvis quaternion + the three waist joints.
|
||||
|
||||
Override the deployed policy/clip without code changes via env vars:
|
||||
MJLAB_ONNX_PATH=/path/to/policy.onnx
|
||||
MJLAB_MOTION_PATH=/path/to/motion.npz
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import onnxruntime as ort
|
||||
|
||||
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_HERE = os.path.dirname(os.path.abspath(__file__))
|
||||
# Policies + reference clips live in examples/unitree_g1/motions/.
|
||||
_MOTIONS_DIR = os.path.normpath(
|
||||
os.path.join(_HERE, "..", "..", "..", "..", "..", "examples", "unitree_g1", "motions")
|
||||
)
|
||||
# Default bundled policy + clip (the double spin kick). Override with env vars or
|
||||
# the onnx_path / motion_path constructor args to deploy any other mjlab policy.
|
||||
DEFAULT_ONNX_PATH = os.environ.get(
|
||||
"MJLAB_ONNX_PATH", os.path.join(_MOTIONS_DIR, "2026-06-26_14-03-33.onnx")
|
||||
)
|
||||
DEFAULT_MOTION_PATH = os.environ.get("MJLAB_MOTION_PATH", os.path.join(_MOTIONS_DIR, "motion.npz"))
|
||||
|
||||
CONTROL_DT = 0.02 # 50 Hz, matches mjlab decimation=4 * timestep=0.005
|
||||
|
||||
# Index of ``torso_link`` in the full 30-body mjlab G1 body ordering (the layout
|
||||
# stored in motion.npz body_*_w arrays).
|
||||
TORSO_BODY_INDEX = 15
|
||||
|
||||
# Hold the first motion frame for a short settle window before playing the clip,
|
||||
# so the robot can reach the start pose before the motion begins.
|
||||
START_HOLD_STEPS = 50
|
||||
|
||||
|
||||
def _parse_floats(meta: dict, key: str) -> np.ndarray:
|
||||
return np.array([float(x) for x in meta[key].split(",")], dtype=np.float32)
|
||||
|
||||
|
||||
def _quat_inv(q: np.ndarray) -> np.ndarray:
|
||||
"""Inverse of a unit quaternion (w, x, y, z)."""
|
||||
q = q / (np.linalg.norm(q) + 1e-8)
|
||||
return np.array([q[0], -q[1], -q[2], -q[3]], dtype=np.float32)
|
||||
|
||||
|
||||
def _yaw_quat(q: np.ndarray) -> np.ndarray:
|
||||
"""Yaw-only component of a quaternion (w, x, y, z)."""
|
||||
w, x, y, z = q
|
||||
yaw = np.arctan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))
|
||||
return np.array([np.cos(yaw / 2.0), 0.0, 0.0, np.sin(yaw / 2.0)], dtype=np.float32)
|
||||
|
||||
|
||||
def _quat_mul(a: np.ndarray, b: np.ndarray) -> np.ndarray:
|
||||
w1, x1, y1, z1 = a
|
||||
w2, x2, y2, z2 = b
|
||||
return np.array(
|
||||
[
|
||||
w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2,
|
||||
w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2,
|
||||
w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2,
|
||||
w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2,
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
|
||||
def _pelvis_to_torso_quat(
|
||||
pelvis_quat: np.ndarray, waist_yaw: float, waist_roll: float, waist_pitch: float
|
||||
) -> np.ndarray:
|
||||
"""Torso-link world orientation from the pelvis quaternion + waist joints.
|
||||
|
||||
The kinematic chain is pelvis -> waist_yaw(z) -> waist_roll(x) ->
|
||||
torso_link/waist_pitch(y), and every intermediate body frame is identity, so::
|
||||
|
||||
q_torso = q_pelvis (x) Rz(waist_yaw) (x) Rx(waist_roll) (x) Ry(waist_pitch)
|
||||
"""
|
||||
hy, hr, hp = waist_yaw / 2.0, waist_roll / 2.0, waist_pitch / 2.0
|
||||
qz = np.array([np.cos(hy), 0.0, 0.0, np.sin(hy)], dtype=np.float32)
|
||||
qx = np.array([np.cos(hr), np.sin(hr), 0.0, 0.0], dtype=np.float32)
|
||||
qy = np.array([np.cos(hp), 0.0, np.sin(hp), 0.0], dtype=np.float32)
|
||||
return _quat_mul(_quat_mul(_quat_mul(pelvis_quat, qz), qx), qy)
|
||||
|
||||
|
||||
def _ori_6d(q_rel: np.ndarray) -> np.ndarray:
|
||||
"""First two columns of the rotation matrix of ``q_rel`` (w, x, y, z).
|
||||
|
||||
Matches mjlab ``matrix_from_quat(...)[..., :2].reshape(-1)``: row-major
|
||||
flatten of the first two columns -> [R00, R01, R10, R11, R20, R21].
|
||||
"""
|
||||
q = q_rel / (np.linalg.norm(q_rel) + 1e-8)
|
||||
w, x, y, z = q
|
||||
two = 2.0
|
||||
return np.array(
|
||||
[
|
||||
1.0 - two * (y * y + z * z), # R00
|
||||
two * (x * y - z * w), # R01
|
||||
two * (x * y + z * w), # R10
|
||||
1.0 - two * (x * x + z * z), # R11
|
||||
two * (x * z - y * w), # R20
|
||||
two * (y * z + x * w), # R21
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
|
||||
class MjlabMotionImitationController:
|
||||
"""Full-body mjlab BeyondMimic motion-imitation controller for UnitreeG1.
|
||||
|
||||
Motion-agnostic: pass any exported ``(onnx_path, motion_path)`` pair (or set
|
||||
the ``MJLAB_ONNX_PATH`` / ``MJLAB_MOTION_PATH`` env vars) to deploy a policy
|
||||
trained on a different reference clip.
|
||||
"""
|
||||
|
||||
control_dt = CONTROL_DT
|
||||
full_body = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
onnx_path: str = DEFAULT_ONNX_PATH,
|
||||
motion_path: str = DEFAULT_MOTION_PATH,
|
||||
imu_is_pelvis: bool = True,
|
||||
):
|
||||
logger.info("Loading mjlab motion-imitation controller (%s)...", os.path.basename(onnx_path))
|
||||
if not os.path.exists(onnx_path):
|
||||
raise FileNotFoundError(f"mjlab ONNX policy not found: {onnx_path}")
|
||||
if not os.path.exists(motion_path):
|
||||
raise FileNotFoundError(f"mjlab motion file not found: {motion_path}")
|
||||
|
||||
so = ort.SessionOptions()
|
||||
so.log_severity_level = 3
|
||||
self.session = ort.InferenceSession(onnx_path, sess_options=so, providers=["CPUExecutionProvider"])
|
||||
|
||||
meta = dict(self.session.get_modelmeta().custom_metadata_map)
|
||||
|
||||
self.default_q = _parse_floats(meta, "default_joint_pos")
|
||||
self.action_scale = _parse_floats(meta, "action_scale")
|
||||
self.kp = _parse_floats(meta, "joint_stiffness")
|
||||
self.kd = _parse_floats(meta, "joint_damping")
|
||||
if not (len(self.default_q) == len(self.action_scale) == len(self.kp) == len(self.kd) == 29):
|
||||
raise ValueError("mjlab policy metadata must define 29 values per joint array")
|
||||
|
||||
# ONNX input names: obs (float32 [1, 154]) + time_step ([1, 1]).
|
||||
self._inputs = {i.name: i for i in self.session.get_inputs()}
|
||||
self._obs_name = next(n for n in self._inputs if n != "time_step")
|
||||
self._ts_name = "time_step" if "time_step" in self._inputs else None
|
||||
self._ts_dtype = (
|
||||
np.int64 if (self._ts_name and "int" in self._inputs[self._ts_name].type) else np.float32
|
||||
)
|
||||
|
||||
motion = np.load(motion_path)
|
||||
self.ref_joint_pos = motion["joint_pos"].astype(np.float32) # (T, 29)
|
||||
self.ref_joint_vel = motion["joint_vel"].astype(np.float32) # (T, 29)
|
||||
self.ref_torso_quat = motion["body_quat_w"][:, TORSO_BODY_INDEX, :].astype(np.float32) # (T, 4)
|
||||
self.n_frames = int(self.ref_joint_pos.shape[0])
|
||||
|
||||
# The MuJoCo sim publishes the PELVIS (floating base) as imu_state, but the
|
||||
# policy's anchor body is torso_link. When True, reconstruct the torso
|
||||
# orientation from the pelvis quat + waist joints. On hardware where the IMU
|
||||
# already reports the torso, set this False.
|
||||
self.imu_is_pelvis = imu_is_pelvis
|
||||
|
||||
self.step = 0
|
||||
self.last_action = np.zeros(29, dtype=np.float32)
|
||||
# Yaw alignment between the reference clip and the robot's actual heading,
|
||||
# captured lazily on the first run_step after a reset.
|
||||
self._align_quat: np.ndarray | None = None
|
||||
logger.info("mjlab motion-imitation ready: %d frames @ %dHz", self.n_frames, int(1.0 / CONTROL_DT))
|
||||
|
||||
def reset(self) -> None:
|
||||
self.step = 0
|
||||
self.last_action[:] = 0.0
|
||||
self._align_quat = None
|
||||
|
||||
def _motion_index(self) -> int:
|
||||
"""Hold frame 0 during the settle window, then advance and clamp at the end."""
|
||||
idx = self.step - START_HOLD_STEPS
|
||||
if idx < 0:
|
||||
idx = 0
|
||||
return int(min(idx, self.n_frames - 1))
|
||||
|
||||
def run_step(self, action: dict, lowstate) -> dict:
|
||||
if lowstate is None:
|
||||
return {}
|
||||
|
||||
t = self._motion_index()
|
||||
|
||||
# Robot joint state (native G1_29 order == policy order).
|
||||
q = np.empty(29, dtype=np.float32)
|
||||
dq = np.empty(29, dtype=np.float32)
|
||||
for motor in G1_29_JointIndex:
|
||||
i = motor.value
|
||||
q[i] = lowstate.motor_state[i].q
|
||||
dq[i] = lowstate.motor_state[i].dq
|
||||
|
||||
# base_ang_vel is the pelvis gyro (mjlab's IMU sensors live on imu_in_pelvis),
|
||||
# so the raw gyro is already correct. The anchor orientation, however, is the
|
||||
# torso_link (anchor_body); reconstruct it from the pelvis quat + waist joints
|
||||
# since imu_state reports the pelvis in this sim.
|
||||
pelvis_quat = np.array(lowstate.imu_state.quaternion, dtype=np.float32) # (w, x, y, z)
|
||||
gyro = np.array(lowstate.imu_state.gyroscope, dtype=np.float32)
|
||||
if self.imu_is_pelvis:
|
||||
quat = _pelvis_to_torso_quat(
|
||||
pelvis_quat,
|
||||
float(q[G1_29_JointIndex.kWaistYaw.value]),
|
||||
float(q[G1_29_JointIndex.kWaistRoll.value]),
|
||||
float(q[G1_29_JointIndex.kWaistPitch.value]),
|
||||
)
|
||||
else:
|
||||
quat = pelvis_quat
|
||||
|
||||
# Heading-align the reference clip to the robot's actual yaw at start.
|
||||
# The anchor_ori_b term is NOT yaw-invariant, so a clip that starts at a
|
||||
# nonzero world yaw would make the policy see a huge heading error at t=0
|
||||
# and spin. Yaw-only, so true roll/pitch tracking is preserved.
|
||||
if self._align_quat is None:
|
||||
self._align_quat = _quat_mul(_yaw_quat(quat), _quat_inv(_yaw_quat(self.ref_torso_quat[0])))
|
||||
ref_torso = _quat_mul(self._align_quat, self.ref_torso_quat[t])
|
||||
|
||||
# motion_anchor_ori_b = first-2-cols of R(q_robot_torso^-1 * q_ref_torso[t]).
|
||||
q_rel = _quat_mul(_quat_inv(quat), ref_torso)
|
||||
anchor_ori_b = _ori_6d(q_rel)
|
||||
|
||||
obs = np.concatenate(
|
||||
[
|
||||
self.ref_joint_pos[t], # command: ref joint pos (29)
|
||||
self.ref_joint_vel[t], # command: ref joint vel (29)
|
||||
anchor_ori_b, # motion_anchor_ori_b (6)
|
||||
gyro, # base_ang_vel (3)
|
||||
q - self.default_q, # joint_pos (29)
|
||||
dq, # joint_vel (29)
|
||||
self.last_action, # actions (29)
|
||||
]
|
||||
).astype(np.float32)
|
||||
|
||||
feeds: dict[str, np.ndarray] = {self._obs_name: obs[None, :]}
|
||||
if self._ts_name is not None:
|
||||
feeds[self._ts_name] = np.array([[t]], dtype=self._ts_dtype)
|
||||
raw_action = self.session.run(["actions"], feeds)[0].reshape(-1).astype(np.float32)
|
||||
self.last_action = raw_action
|
||||
|
||||
target_q = self.default_q + raw_action * self.action_scale
|
||||
|
||||
# Advance the motion cursor: hold frame 0 for the settle window, play the
|
||||
# clip, then clamp at the final frame (see _motion_index).
|
||||
self.step += 1
|
||||
|
||||
return {f"{motor.name}.q": float(target_q[motor.value]) for motor in G1_29_JointIndex}
|
||||
|
||||
def shutdown(self) -> None:
|
||||
pass
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""SONIC full-body controller for Unitree G1."""
|
||||
|
||||
import logging
|
||||
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
|
||||
CONTROL_DT,
|
||||
DEBUG_PRINT_EVERY,
|
||||
DEFAULT_ANGLES,
|
||||
ENCODER_UPDATE_EVERY,
|
||||
LM,
|
||||
MOTION_SETS,
|
||||
MovementState,
|
||||
PlannerController,
|
||||
SonicPlanner,
|
||||
_ort_providers,
|
||||
_snapshot_ms,
|
||||
clamp_mode_params,
|
||||
compute_kp_kd,
|
||||
lowstate_to_obs,
|
||||
process_joystick,
|
||||
should_replan_request,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SonicRuntime:
|
||||
"""Shared SONIC control loop state (standalone demo + locomotion controller)."""
|
||||
|
||||
def __init__(self, force_cpu: bool = False):
|
||||
planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx")
|
||||
encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx")
|
||||
decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx")
|
||||
|
||||
providers = _ort_providers(force_cpu=force_cpu)
|
||||
self.use_gpu = providers[0] == "CUDAExecutionProvider"
|
||||
so = ort.SessionOptions()
|
||||
so.log_severity_level = 3
|
||||
|
||||
planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=providers)
|
||||
encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=providers)
|
||||
decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=providers)
|
||||
|
||||
self.kp, self.kd = compute_kp_kd()
|
||||
self.ms = MovementState()
|
||||
self.planner = SonicPlanner(planner_sess, planner_path)
|
||||
self.controller = PlannerController(self.planner, encoder_sess, decoder_sess)
|
||||
|
||||
motion = self.planner.initialize(DEFAULT_ANGLES, self.ms)
|
||||
self.controller.load_initial_motion(motion)
|
||||
self.planner.start_subprocess(self.controller, use_gpu=self.use_gpu)
|
||||
|
||||
self.step = 0
|
||||
self.replan_timer = 0.0
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
@property
|
||||
def pipeline(self):
|
||||
return self.controller
|
||||
|
||||
def tick(self, obs: dict, *, debug: bool | None = None, use_joystick: bool = True) -> dict:
|
||||
if not obs:
|
||||
self.step += 1
|
||||
return {}
|
||||
|
||||
if use_joystick:
|
||||
process_joystick(obs, self.ms, self.controller)
|
||||
clamp_mode_params(self.ms)
|
||||
|
||||
if self.step > 0:
|
||||
self.replan_timer += CONTROL_DT
|
||||
if should_replan_request(self.ms, self.last_ms, self.replan_timer, self.step):
|
||||
self.planner.request_replan(self.controller.ref_cursor, self.ms)
|
||||
self.replan_timer = 0.0
|
||||
self.ms.needs_replan = False
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
do_enc = self.step % ENCODER_UPDATE_EVERY == 0
|
||||
if debug is None:
|
||||
debug = self.step % DEBUG_PRINT_EVERY == 0
|
||||
action = self.controller.step(obs, update_encoder=do_enc, debug=debug)
|
||||
|
||||
result = self.planner.try_get_new_motion()
|
||||
if result:
|
||||
self.controller.blend_new_motion(*result)
|
||||
|
||||
self.controller.advance_cursor()
|
||||
self.step += 1
|
||||
return action
|
||||
|
||||
def reset(self):
|
||||
self.ms = MovementState()
|
||||
self.controller.reinit_heading = True
|
||||
self.controller.playing = True
|
||||
self.step = 0
|
||||
self.replan_timer = 0.0
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
def shutdown(self):
|
||||
self.planner.stop_subprocess()
|
||||
|
||||
|
||||
class SonicWholeBodyController:
|
||||
"""Full-body SONIC controller for UnitreeG1's background controller thread."""
|
||||
|
||||
control_dt = CONTROL_DT
|
||||
full_body = True
|
||||
|
||||
def __init__(self, force_cpu: bool = False):
|
||||
logger.info("Loading SONIC whole-body controller...")
|
||||
self._runtime = SonicRuntime(force_cpu=force_cpu)
|
||||
self.kp = self._runtime.kp
|
||||
self.kd = self._runtime.kd
|
||||
self.controller = self._runtime.controller
|
||||
self.ms = self._runtime.ms
|
||||
logger.info(
|
||||
"SONIC ready: %s (default mode: %s)",
|
||||
MOTION_SETS[0][0],
|
||||
LM(self.ms.mode).name,
|
||||
)
|
||||
|
||||
def run_step(self, action: dict, lowstate) -> dict:
|
||||
if lowstate is None:
|
||||
return {}
|
||||
obs = lowstate_to_obs(lowstate)
|
||||
return self._runtime.tick(obs, debug=False)
|
||||
|
||||
def reset(self):
|
||||
self._runtime.reset()
|
||||
|
||||
def shutdown(self):
|
||||
self._runtime.shutdown()
|
||||
@@ -68,8 +68,10 @@ def make_locomotion_controller(name: str | None):
|
||||
if name is None:
|
||||
return None
|
||||
controllers = {
|
||||
"GrootLocomotionController": "lerobot.robots.unitree_g1.gr00t_locomotion",
|
||||
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.holosoma_locomotion",
|
||||
"GrootLocomotionController": "lerobot.robots.unitree_g1.controllers.gr00t_locomotion",
|
||||
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.controllers.holosoma_locomotion",
|
||||
"SonicWholeBodyController": "lerobot.robots.unitree_g1.controllers.sonic_whole_body",
|
||||
"MjlabMotionImitationController": "lerobot.robots.unitree_g1.controllers.mjlab_motion_imitation",
|
||||
}
|
||||
module_path = controllers.get(name)
|
||||
if module_path is None:
|
||||
|
||||
@@ -338,6 +338,9 @@ class UnitreeG1(Robot):
|
||||
|
||||
self.kp = np.array(self.config.kp, dtype=np.float32)
|
||||
self.kd = np.array(self.config.kd, dtype=np.float32)
|
||||
if self.controller is not None and hasattr(self.controller, "kp"):
|
||||
self.kp = np.array(self.controller.kp, dtype=np.float32)
|
||||
self.kd = np.array(self.controller.kd, dtype=np.float32)
|
||||
|
||||
for joint in G1_29_JointIndex:
|
||||
self.msg.motor_cmd[joint].mode = 1
|
||||
@@ -374,6 +377,9 @@ class UnitreeG1(Robot):
|
||||
# Signal thread to stop and unblock any waits
|
||||
self._shutdown_event.set()
|
||||
|
||||
if self.controller is not None and hasattr(self.controller, "shutdown"):
|
||||
self.controller.shutdown()
|
||||
|
||||
# Wait for subscribe thread to finish
|
||||
if self.subscribe_thread is not None:
|
||||
self.subscribe_thread.join(timeout=2.0)
|
||||
@@ -465,9 +471,11 @@ class UnitreeG1(Robot):
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
action_to_publish = action
|
||||
if self.controller is not None:
|
||||
self._update_controller_action(action)
|
||||
if getattr(self.controller, "full_body", False):
|
||||
return action
|
||||
# Controller thread owns legs/waist. Here we only update joystick inputs
|
||||
# and publish arm targets from the teleoperator.
|
||||
self._update_controller_action(action)
|
||||
arm_prefixes = tuple(j.name for j in G1_29_JointArmIndex)
|
||||
action_to_publish = {
|
||||
key: value
|
||||
|
||||
Reference in New Issue
Block a user