mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-30 22:57:00 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3b6de2fdf8 | |||
| 744f3667c0 | |||
| fdde436776 | |||
| 5c683c65c6 | |||
| dfbc25c58f | |||
| 804c76bcc2 | |||
| e6afa69be9 | |||
| 31d1439e29 | |||
| 1c118c6359 |
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Load an SMPL motion clip and expose it in SONIC's encoder format.
|
||||
|
||||
SONIC's whole-body tracking mode (``encode_mode == 2``) consumes a flat
|
||||
720-vector ``smpl_joints_10frame_step1`` = 10 consecutive frames x 24 SMPL
|
||||
joints x 3 (xyz) at 50 Hz.
|
||||
|
||||
IMPORTANT - frame convention: the encoder expects each frame's joints with the
|
||||
body's *root orientation removed* (per-frame canonical), exactly like the live
|
||||
deploy stream's ``smpl_joints_local`` (see ``process_smpl_joints`` in the GEAR
|
||||
PICO teleop and ``smpl_joints_multi_future_local`` in training). The reference
|
||||
``smpl_filtered`` clips instead store **world-frame** joints (heading retained),
|
||||
so feeding them raw makes the robot move but track poorly / never face-forward.
|
||||
This loader therefore canonicalizes on load using the clip's per-frame root
|
||||
orientation (``pose_aa[:, :3]``):
|
||||
|
||||
A = Rx(+90deg) * rotvec(pose_aa[:, :3]) # y-up -> z-up root quat
|
||||
local = base120 * A^-1 * joints # remove root orient
|
||||
|
||||
with ``base120 = quat(0.5,0.5,0.5,0.5)`` (SMPL base rotation). This reproduces
|
||||
the deployed transform (verified: per-frame hip-heading std -> 0).
|
||||
|
||||
Clip is read from a numpy ``.npz``. Expected keys:
|
||||
smpl_joints : (T, 24, 3) float32 -- world-frame joint positions, 50 fps
|
||||
pose_aa : (T, 72) float32 -- SMPL axis-angle (root = [:, :3])
|
||||
transl : (T, 3) float32 -- global root translation (optional)
|
||||
fps : scalar
|
||||
|
||||
Example:
|
||||
python examples/unitree_g1/motion_loader.py \
|
||||
--motion examples/unitree_g1/motions/walk_forward.npz
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
import numpy as np
|
||||
|
||||
WINDOW = 10 # frames per encoder window (smpl_joints_10frame_step1)
|
||||
N_JOINTS = 24
|
||||
JOINT_DIM = 3
|
||||
SMPL_OBS_DIM = WINDOW * N_JOINTS * JOINT_DIM # 720
|
||||
|
||||
|
||||
def canonicalize_smpl_joints(smpl_joints: np.ndarray, root_aa: np.ndarray) -> np.ndarray:
|
||||
"""Remove per-frame root orientation -> SONIC ``smpl_joints_local`` format.
|
||||
|
||||
Args:
|
||||
smpl_joints: (T, 24, 3) world-frame (z-up) SMPL joint positions.
|
||||
root_aa: (T, 3) SMPL global-orient axis-angle (y-up convention).
|
||||
|
||||
Returns:
|
||||
(T, 24, 3) per-frame root-orientation-removed joints.
|
||||
"""
|
||||
from scipy.spatial.transform import Rotation
|
||||
|
||||
rx90 = Rotation.from_euler("x", 90, degrees=True) # smpl_root_ytoz_up
|
||||
base120 = Rotation.from_quat([0.5, 0.5, 0.5, 0.5]) # remove_smpl_base_rot
|
||||
a = rx90 * Rotation.from_rotvec(root_aa) # z-up root quat (left-mult)
|
||||
b_inv = base120 * a.inv() # inv(remove_smpl_base_rot(a))
|
||||
return np.einsum("tij,tkj->tki", b_inv.as_matrix(), smpl_joints).astype(np.float32)
|
||||
|
||||
|
||||
class SmplMotion:
|
||||
"""A single SMPL clip with SONIC-format windowing."""
|
||||
|
||||
def __init__(self, path: str, loop: bool = True, canonicalize: bool = True):
|
||||
data = np.load(path)
|
||||
smpl_joints = data["smpl_joints"].astype(np.float32) # (T, 24, 3)
|
||||
self.pose_aa = data["pose_aa"].astype(np.float32) if "pose_aa" in data.files else None
|
||||
self.transl = data["transl"].astype(np.float32) if "transl" in data.files else None
|
||||
self.fps = float(data["fps"]) if "fps" in data.files else 50.0
|
||||
self.loop = loop
|
||||
|
||||
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (N_JOINTS, JOINT_DIM):
|
||||
raise ValueError(f"Expected smpl_joints (T, {N_JOINTS}, {JOINT_DIM}), got {smpl_joints.shape}")
|
||||
|
||||
# Reference clips store world-frame joints; the encoder wants per-frame
|
||||
# root-orientation-removed joints. Canonicalize when we have the root pose.
|
||||
self.canonicalized = False
|
||||
if canonicalize and self.pose_aa is not None:
|
||||
smpl_joints = canonicalize_smpl_joints(smpl_joints, self.pose_aa[:, :3])
|
||||
self.canonicalized = True
|
||||
self.smpl_joints = smpl_joints
|
||||
|
||||
self.num_frames = self.smpl_joints.shape[0]
|
||||
self._cursor = 0
|
||||
|
||||
def window(self, start: int) -> np.ndarray:
|
||||
"""Return the 720-vector for the 10-frame window beginning at ``start``.
|
||||
|
||||
Frames are laid out oldest->newest, joint-major within a frame:
|
||||
[f0_j0_xyz, f0_j1_xyz, ..., f9_j23_xyz].
|
||||
"""
|
||||
idx = np.arange(start, start + WINDOW)
|
||||
idx = np.mod(idx, self.num_frames) if self.loop else np.clip(idx, 0, self.num_frames - 1)
|
||||
return self.smpl_joints[idx].reshape(-1).astype(np.float32)
|
||||
|
||||
def reset(self):
|
||||
self._cursor = 0
|
||||
|
||||
def step(self) -> np.ndarray:
|
||||
"""Advance one frame and return the current 720-vector window."""
|
||||
w = self.window(self._cursor)
|
||||
self._cursor += 1
|
||||
if self.loop:
|
||||
self._cursor %= self.num_frames
|
||||
return w
|
||||
|
||||
@property
|
||||
def done(self) -> bool:
|
||||
return (not self.loop) and (self._cursor + WINDOW >= self.num_frames)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--motion", required=True, help="Path to motion .npz")
|
||||
parser.add_argument("--no-loop", action="store_true")
|
||||
parser.add_argument(
|
||||
"--no-canon", action="store_true", help="Skip canonicalization (feed raw stored joints)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
m = SmplMotion(args.motion, loop=not args.no_loop, canonicalize=not args.no_canon)
|
||||
duration = m.num_frames / m.fps
|
||||
print(f"Loaded '{args.motion}'")
|
||||
print(f" frames={m.num_frames} fps={m.fps:.1f} duration={duration:.1f}s")
|
||||
print(
|
||||
f" smpl_joints={m.smpl_joints.shape} canonicalized={m.canonicalized} "
|
||||
f"pose_aa={None if m.pose_aa is None else m.pose_aa.shape} "
|
||||
f"transl={None if m.transl is None else m.transl.shape}"
|
||||
)
|
||||
|
||||
# Sanity: after canonicalization the per-frame body heading should be fixed.
|
||||
j = m.smpl_joints
|
||||
v = j[:, 2, :2] - j[:, 1, :2] # R_hip - L_hip, horizontal
|
||||
a = np.arctan2(v[:, 1], v[:, 0])
|
||||
rlen = np.clip(np.hypot(np.cos(a).mean(), np.sin(a).mean()), 1e-9, 1.0)
|
||||
circ_std = np.degrees(np.sqrt(-2 * np.log(rlen)))
|
||||
print(f" hip-heading circ-std={circ_std:.1f} deg (~0 => orientation removed; large => world-frame)")
|
||||
|
||||
w0 = m.window(0)
|
||||
print(f" window(0): shape={w0.shape} (expected {SMPL_OBS_DIM}) min={w0.min():.3f} max={w0.max():.3f}")
|
||||
assert w0.shape == (SMPL_OBS_DIM,), "window must be 720-dim for obs[922:1642]"
|
||||
|
||||
# Simulate a few control ticks.
|
||||
print(" stepping 5 ticks:")
|
||||
for t in range(5):
|
||||
w = m.step()
|
||||
print(f" t={t} cursor={m._cursor} window_norm={np.linalg.norm(w):.2f}")
|
||||
|
||||
print("OK: motion loads and yields SONIC-format 720-vec windows.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Convert a GEAR-SONIC / BONES-SEED ``smpl_filtered`` clip (.pkl) to .npz.
|
||||
|
||||
The reference clips are zlib-compressed joblib pickles holding a dict with
|
||||
``pose_aa`` (T, 72), ``transl`` (T, 3), ``smpl_joints`` (T, 24, 3), ``fps``.
|
||||
``motion_loader.SmplMotion`` consumes the .npz form so the runtime needs no
|
||||
joblib dependency. Canonicalization (root-orientation removal) happens at load
|
||||
time in ``motion_loader``, so this converter just repackages the raw arrays.
|
||||
|
||||
Run this in an environment that has ``joblib`` (e.g. the sonic teleop venv):
|
||||
|
||||
python examples/unitree_g1/pkl_to_npz.py \
|
||||
--pkl sample_data/smpl_filtered/walk_forward_amateur_001__A001.pkl \
|
||||
--out examples/unitree_g1/motions/walk_forward.npz
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def load_pkl(path: str) -> dict:
|
||||
try:
|
||||
import joblib
|
||||
|
||||
return joblib.load(path)
|
||||
except Exception:
|
||||
# joblib clips are zlib-compressed pickles; fall back to manual inflate.
|
||||
import contextlib
|
||||
import pickle # nosec B403 - loads trusted local SMPL clips authored by the user
|
||||
import zlib
|
||||
|
||||
with open(path, "rb") as f:
|
||||
raw = f.read()
|
||||
with contextlib.suppress(zlib.error):
|
||||
raw = zlib.decompress(raw)
|
||||
return pickle.loads(raw) # nosec B301 - local, user-provided motion files only
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--pkl", required=True, help="Input smpl_filtered .pkl")
|
||||
parser.add_argument("--out", required=True, help="Output .npz path")
|
||||
args = parser.parse_args()
|
||||
|
||||
d = load_pkl(args.pkl)
|
||||
if not isinstance(d, dict) or "smpl_joints" not in d:
|
||||
raise ValueError(f"Unexpected pkl structure; keys={list(d) if isinstance(d, dict) else type(d)}")
|
||||
|
||||
smpl_joints = np.asarray(d["smpl_joints"], np.float32)
|
||||
if smpl_joints.ndim != 3 or smpl_joints.shape[1:] != (24, 3):
|
||||
raise ValueError(f"smpl_joints must be (T,24,3), got {smpl_joints.shape}")
|
||||
|
||||
out = {"smpl_joints": smpl_joints, "fps": np.float32(d.get("fps", 50.0))}
|
||||
if "pose_aa" in d:
|
||||
out["pose_aa"] = np.asarray(d["pose_aa"], np.float32)
|
||||
else:
|
||||
print("[warn] no pose_aa -> loader cannot canonicalize (will feed raw)")
|
||||
if "transl" in d:
|
||||
out["transl"] = np.asarray(d["transl"], np.float32)
|
||||
|
||||
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
|
||||
np.savez_compressed(args.out, **out)
|
||||
dur = smpl_joints.shape[0] / float(out["fps"])
|
||||
print(f"Wrote {args.out}")
|
||||
print(
|
||||
f" frames={smpl_joints.shape[0]} fps={float(out['fps']):.1f} duration={dur:.1f}s keys={sorted(out)}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,294 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
SONIC planner with full mode control.
|
||||
|
||||
Keyboard controls:
|
||||
N / P - next / previous motion set
|
||||
1-8 - select mode within current set
|
||||
WASD - movement direction
|
||||
Q / E - rotate facing left / right
|
||||
9 / 0 - decrease / increase speed
|
||||
- / = - decrease / increase height
|
||||
R - force replan
|
||||
M - toggle SMPL motion playback <-> locomotion (needs --motion-file)
|
||||
Space - emergency stop -> IDLE
|
||||
Esc - quit
|
||||
|
||||
Gamepad controls (Unitree wireless controller):
|
||||
Left stick Y - speed (forward = fast, back = stop)
|
||||
Left stick X - movement direction (offset from facing)
|
||||
Right stick X - facing direction (incremental rotation)
|
||||
Right stick Y - height (up = tall 0.8m, down = low 0.1m)
|
||||
Buttons - unused (mode selection is keyboard-only)
|
||||
|
||||
For teleop integration use --robot.controller=SonicWholeBodyController instead.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import faulthandler
|
||||
import gc
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from motion_loader import SmplMotion
|
||||
|
||||
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
|
||||
CONTROL_DT,
|
||||
DEFAULT_ANGLES,
|
||||
LM,
|
||||
MOTION_SETS,
|
||||
RawKeyboard,
|
||||
compute_kp_kd,
|
||||
drain_keyboard,
|
||||
)
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_whole_body import SonicRuntime
|
||||
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
|
||||
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
|
||||
parser.add_argument(
|
||||
"--ip",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Robot IP for real hardware (e.g. 192.168.123.164). Omit for simulation.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--log-csv",
|
||||
action="store_true",
|
||||
help="Write /tmp/sonic_pose_log.csv (disabled by default for teleop perf)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpu",
|
||||
action="store_true",
|
||||
help="Force CPU ONNX Runtime (skip CUDA even if onnxruntime-gpu is installed)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--headless", action="store_true", help="Ignored for sim (stock UnitreeG1 uses hub MuJoCo defaults)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gamepad",
|
||||
action="store_true",
|
||||
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--motion-file",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
|
||||
"(encode_mode=2) instead of locomotion planning.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-loop", action="store_true", help="With --motion-file, play once instead of looping"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
|
||||
# avoid losing buffered diagnostics if the process dies mid-loop.
|
||||
faulthandler.enable()
|
||||
with contextlib.suppress(Exception):
|
||||
sys.stdout.reconfigure(line_buffering=True)
|
||||
|
||||
print("=" * 60)
|
||||
print("SONIC planner - full mode control")
|
||||
print(" N/P cycle sets | 1-8 select mode | WASD move")
|
||||
print(" Q/E rotate | 9/0 speed | -/= height")
|
||||
print(" R replan | Space IDLE | Esc quit")
|
||||
if args.ip:
|
||||
print(f" Robot IP: {args.ip}")
|
||||
else:
|
||||
print(" Mode: simulation")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
cfg = UnitreeG1Config(controller=None) # full-body SONIC; standalone loop owns publish
|
||||
if args.ip:
|
||||
cfg.is_simulation = False
|
||||
cfg.robot_ip = args.ip
|
||||
else:
|
||||
cfg.is_simulation = True
|
||||
if args.headless:
|
||||
print("[Note] --headless ignored: sim uses stock UnitreeG1 + hub env")
|
||||
robot = UnitreeG1(cfg)
|
||||
robot.connect()
|
||||
kp, kd = compute_kp_kd()
|
||||
robot.kp = kp.copy()
|
||||
robot.kd = kd.copy()
|
||||
|
||||
runtime = SonicRuntime(force_cpu=args.cpu)
|
||||
controller = runtime.controller
|
||||
ms = runtime.ms
|
||||
|
||||
motion = None
|
||||
if args.motion_file:
|
||||
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
|
||||
controller.smpl_motion = motion # lets 'M' key toggle playback
|
||||
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
|
||||
dur = motion.num_frames / motion.fps
|
||||
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
|
||||
print(
|
||||
f" frames={motion.num_frames} fps={motion.fps:.1f} "
|
||||
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2"
|
||||
)
|
||||
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
|
||||
|
||||
runtime.controller.print_input_diagnostics()
|
||||
|
||||
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
|
||||
[print(f" {i + 1}: {m.name}") for i, m in enumerate(MOTION_SETS[0][1])]
|
||||
print(
|
||||
"\n[Ready] Click THIS terminal, then W/A/S/D to move. 1-6 change mode, 9/0 speed, Esc quit.\n",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
|
||||
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
|
||||
|
||||
with RawKeyboard() as kb:
|
||||
try:
|
||||
gc.disable()
|
||||
gc_timer = 0.0
|
||||
robot.reset(CONTROL_DT, DEFAULT_ANGLES)
|
||||
time.sleep(1.0)
|
||||
|
||||
last_status = time.time() - 2.1
|
||||
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
|
||||
slow_n = blend_n = 0
|
||||
stall_src = ""
|
||||
did_blend = False
|
||||
t_start = time.time()
|
||||
|
||||
log_path = os.path.join(tempfile.gettempdir(), "sonic_pose_log.csv")
|
||||
jnames = [m.name for m in G1_29_JointIndex]
|
||||
log_ctx = open(log_path, "w") if args.log_csv else None # noqa: SIM115
|
||||
if log_ctx:
|
||||
log_ctx.write(
|
||||
"t,step,cursor,ts,blend,mode,"
|
||||
+ ",".join(f"q{i}" for i in range(29))
|
||||
+ ","
|
||||
+ ",".join(f"ref{i}" for i in range(29))
|
||||
+ ","
|
||||
+ ",".join(f"act{i}" for i in range(29))
|
||||
+ ",delta_max,action_norm,token_norm\n"
|
||||
)
|
||||
|
||||
try:
|
||||
while not robot._shutdown_event.is_set():
|
||||
t0 = time.time()
|
||||
if drain_keyboard(kb, ms, controller):
|
||||
break
|
||||
|
||||
obs = robot.get_observation()
|
||||
t_obs = time.time()
|
||||
obs_t.append(1000 * (t_obs - t0))
|
||||
if not obs:
|
||||
runtime.tick({}, use_joystick=False)
|
||||
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
|
||||
continue
|
||||
|
||||
# SMPL playback only while in whole-body mode; 'M' toggles it.
|
||||
motion_active = motion is not None and controller.encode_mode == 2
|
||||
if motion_active:
|
||||
controller.smpl_joints_10frame_step1 = motion.step()
|
||||
if motion.done:
|
||||
print("\n[Motion] clip finished")
|
||||
break
|
||||
|
||||
step_before = runtime.step
|
||||
t_step = time.time()
|
||||
action = runtime.tick(obs, use_joystick=base_joystick and not motion_active)
|
||||
step_ms = 1000 * (time.time() - t_step)
|
||||
do_enc = step_before % 5 == 0
|
||||
(enc_t if do_enc else dec_t).append(step_ms)
|
||||
|
||||
t_act = time.time()
|
||||
robot.send_action(action)
|
||||
act_t.append(1000 * (time.time() - t_act))
|
||||
|
||||
if log_ctx and runtime.step % 5 == 0:
|
||||
t_rel = time.time() - t_start
|
||||
q_r = np.array([obs.get(f"{n}.q", 0) for n in jnames])
|
||||
a_v = np.array([action.get(f"{n}.q", 0) for n in jnames])
|
||||
cur, ts = controller.ref_cursor, controller.motion_timesteps
|
||||
q_ref = (
|
||||
controller.motion_joint_positions[min(cur, ts - 1)] if ts > 0 else np.zeros(29)
|
||||
)
|
||||
log_ctx.write(
|
||||
f"{t_rel:.4f},{runtime.step},{cur},{ts},{int(did_blend)},{ms.mode},"
|
||||
+ ",".join(f"{v:.6f}" for v in q_r)
|
||||
+ ","
|
||||
+ ",".join(f"{v:.6f}" for v in q_ref)
|
||||
+ ","
|
||||
+ ",".join(f"{v:.6f}" for v in a_v)
|
||||
+ ","
|
||||
+ f"{np.max(np.abs(a_v - q_r)):.6f},"
|
||||
f"{np.linalg.norm(a_v):.6f},"
|
||||
f"{np.linalg.norm(controller.token):.6f}\n"
|
||||
)
|
||||
did_blend = False
|
||||
|
||||
now = time.time()
|
||||
loop_ms = 1000 * (now - t0)
|
||||
if loop_ms > 50:
|
||||
stall_src = (
|
||||
f"[STALL] {loop_ms:.0f}ms: "
|
||||
f"obs={obs_t[-1]:.0f} step={step_ms:.0f} act={act_t[-1]:.0f}"
|
||||
)
|
||||
if loop_ms > CONTROL_DT * 1500:
|
||||
slow_n += 1
|
||||
|
||||
if now - last_status > 2.0:
|
||||
|
||||
def _avg(lst):
|
||||
return sum(lst) / len(lst) if lst else 0
|
||||
|
||||
hz = 1000 / _avg(loop_t) if _avg(loop_t) else 0
|
||||
print(
|
||||
f"\r {ms.status_line()} step={runtime.step} "
|
||||
f"ref={controller.ref_cursor}/{controller.motion_timesteps} "
|
||||
f"loop={_avg(loop_t):.1f}ms(max={max(loop_t, default=0):.1f}) hz={hz:.0f} "
|
||||
f"enc={_avg(enc_t):.1f} dec={_avg(dec_t):.1f} obs={_avg(obs_t):.1f} "
|
||||
f"slow={slow_n} blends={blend_n}",
|
||||
end="",
|
||||
flush=True,
|
||||
)
|
||||
if stall_src:
|
||||
print(f"\n {stall_src}")
|
||||
stall_src = ""
|
||||
last_status = now
|
||||
loop_t, enc_t, dec_t, obs_t, act_t = [], [], [], [], []
|
||||
slow_n = blend_n = 0
|
||||
|
||||
gc_timer += CONTROL_DT
|
||||
if gc_timer >= 10.0:
|
||||
gc.collect()
|
||||
gc_timer = 0.0
|
||||
loop_t.append(loop_ms)
|
||||
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
|
||||
finally:
|
||||
if log_ctx:
|
||||
log_ctx.close()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
gc.enable()
|
||||
if args.log_csv:
|
||||
print(f"\n[Log] Saved to {log_path}")
|
||||
runtime.shutdown()
|
||||
print("\nStopping...")
|
||||
if robot.is_connected:
|
||||
robot.disconnect()
|
||||
print("Done.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -68,6 +68,6 @@ class UnitreeG1Config(RobotConfig):
|
||||
# Compensates for gravity on the unitree's arms using the arm ik solver
|
||||
gravity_compensation: bool = False
|
||||
|
||||
# Lower-body controller class name, e.g. "GrootLocomotionController" or
|
||||
# "HolosomaLocomotionController". None disables it.
|
||||
# Locomotion controller class name, e.g. "GrootLocomotionController",
|
||||
# "HolosomaLocomotionController", or "SonicWholeBodyController". None disables it.
|
||||
controller: str | None = None
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Unitree G1 locomotion controllers (Groot, Holosoma, SONIC)."""
|
||||
|
||||
__all__ = [
|
||||
"GrootLocomotionController",
|
||||
"HolosomaLocomotionController",
|
||||
"SonicWholeBodyController",
|
||||
"SonicRuntime",
|
||||
]
|
||||
+1
-1
@@ -21,7 +21,7 @@ import numpy as np
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from .g1_utils import (
|
||||
from lerobot.robots.unitree_g1.g1_utils import (
|
||||
REMOTE_AXES,
|
||||
REMOTE_BUTTONS,
|
||||
G1_29_JointIndex,
|
||||
+1
-1
@@ -22,7 +22,7 @@ import onnx
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from .g1_utils import (
|
||||
from lerobot.robots.unitree_g1.g1_utils import (
|
||||
REMOTE_AXES,
|
||||
G1_29_JointArmIndex,
|
||||
G1_29_JointIndex,
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""SONIC full-body controller for Unitree G1."""
|
||||
|
||||
import logging
|
||||
|
||||
import onnxruntime as ort
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
|
||||
CONTROL_DT,
|
||||
DEBUG_PRINT_EVERY,
|
||||
DEFAULT_ANGLES,
|
||||
ENCODER_UPDATE_EVERY,
|
||||
LM,
|
||||
MOTION_SETS,
|
||||
MovementState,
|
||||
PlannerController,
|
||||
SonicPlanner,
|
||||
_ort_providers,
|
||||
_snapshot_ms,
|
||||
clamp_mode_params,
|
||||
compute_kp_kd,
|
||||
lowstate_to_obs,
|
||||
process_joystick,
|
||||
should_replan_request,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SonicRuntime:
|
||||
"""Shared SONIC control loop state (standalone demo + locomotion controller)."""
|
||||
|
||||
def __init__(self, force_cpu: bool = False):
|
||||
planner_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="planner_sonic.onnx")
|
||||
encoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_encoder.onnx")
|
||||
decoder_path = hf_hub_download(repo_id="nvidia/GEAR-SONIC", filename="model_decoder.onnx")
|
||||
|
||||
providers = _ort_providers(force_cpu=force_cpu)
|
||||
self.use_gpu = providers[0] == "CUDAExecutionProvider"
|
||||
so = ort.SessionOptions()
|
||||
so.log_severity_level = 3
|
||||
|
||||
planner_sess = ort.InferenceSession(planner_path, sess_options=so, providers=providers)
|
||||
encoder_sess = ort.InferenceSession(encoder_path, sess_options=so, providers=providers)
|
||||
decoder_sess = ort.InferenceSession(decoder_path, sess_options=so, providers=providers)
|
||||
|
||||
self.kp, self.kd = compute_kp_kd()
|
||||
self.ms = MovementState()
|
||||
self.planner = SonicPlanner(planner_sess, planner_path)
|
||||
self.controller = PlannerController(self.planner, encoder_sess, decoder_sess)
|
||||
|
||||
motion = self.planner.initialize(DEFAULT_ANGLES, self.ms)
|
||||
self.controller.load_initial_motion(motion)
|
||||
self.planner.start_subprocess(self.controller, use_gpu=self.use_gpu)
|
||||
|
||||
self.step = 0
|
||||
self.replan_timer = 0.0
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
@property
|
||||
def pipeline(self):
|
||||
return self.controller
|
||||
|
||||
def tick(self, obs: dict, *, debug: bool | None = None, use_joystick: bool = True) -> dict:
|
||||
if not obs:
|
||||
self.step += 1
|
||||
return {}
|
||||
|
||||
if use_joystick:
|
||||
process_joystick(obs, self.ms, self.controller)
|
||||
clamp_mode_params(self.ms)
|
||||
|
||||
if self.step > 0:
|
||||
self.replan_timer += CONTROL_DT
|
||||
if should_replan_request(self.ms, self.last_ms, self.replan_timer, self.step):
|
||||
self.planner.request_replan(self.controller.ref_cursor, self.ms)
|
||||
self.replan_timer = 0.0
|
||||
self.ms.needs_replan = False
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
do_enc = self.step % ENCODER_UPDATE_EVERY == 0
|
||||
if debug is None:
|
||||
debug = self.step % DEBUG_PRINT_EVERY == 0
|
||||
action = self.controller.step(obs, update_encoder=do_enc, debug=debug)
|
||||
|
||||
result = self.planner.try_get_new_motion()
|
||||
if result:
|
||||
self.controller.blend_new_motion(*result)
|
||||
|
||||
self.controller.advance_cursor()
|
||||
self.step += 1
|
||||
return action
|
||||
|
||||
def reset(self):
|
||||
self.ms = MovementState()
|
||||
self.controller.reinit_heading = True
|
||||
self.controller.playing = True
|
||||
self.step = 0
|
||||
self.replan_timer = 0.0
|
||||
self.last_ms = _snapshot_ms(self.ms)
|
||||
|
||||
def shutdown(self):
|
||||
self.planner.stop_subprocess()
|
||||
|
||||
|
||||
class SonicWholeBodyController:
|
||||
"""Full-body SONIC controller for UnitreeG1's background controller thread."""
|
||||
|
||||
control_dt = CONTROL_DT
|
||||
full_body = True
|
||||
|
||||
def __init__(self, force_cpu: bool = False):
|
||||
logger.info("Loading SONIC whole-body controller...")
|
||||
self._runtime = SonicRuntime(force_cpu=force_cpu)
|
||||
self.kp = self._runtime.kp
|
||||
self.kd = self._runtime.kd
|
||||
self.controller = self._runtime.controller
|
||||
self.ms = self._runtime.ms
|
||||
logger.info(
|
||||
"SONIC ready: %s (default mode: %s)",
|
||||
MOTION_SETS[0][0],
|
||||
LM(self.ms.mode).name,
|
||||
)
|
||||
|
||||
def run_step(self, action: dict, lowstate) -> dict:
|
||||
if lowstate is None:
|
||||
return {}
|
||||
obs = lowstate_to_obs(lowstate)
|
||||
return self._runtime.tick(obs, debug=False)
|
||||
|
||||
def reset(self):
|
||||
self._runtime.reset()
|
||||
|
||||
def shutdown(self):
|
||||
self._runtime.shutdown()
|
||||
@@ -68,8 +68,9 @@ def make_locomotion_controller(name: str | None):
|
||||
if name is None:
|
||||
return None
|
||||
controllers = {
|
||||
"GrootLocomotionController": "lerobot.robots.unitree_g1.gr00t_locomotion",
|
||||
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.holosoma_locomotion",
|
||||
"GrootLocomotionController": "lerobot.robots.unitree_g1.controllers.gr00t_locomotion",
|
||||
"HolosomaLocomotionController": "lerobot.robots.unitree_g1.controllers.holosoma_locomotion",
|
||||
"SonicWholeBodyController": "lerobot.robots.unitree_g1.controllers.sonic_whole_body",
|
||||
}
|
||||
module_path = controllers.get(name)
|
||||
if module_path is None:
|
||||
|
||||
@@ -338,6 +338,9 @@ class UnitreeG1(Robot):
|
||||
|
||||
self.kp = np.array(self.config.kp, dtype=np.float32)
|
||||
self.kd = np.array(self.config.kd, dtype=np.float32)
|
||||
if self.controller is not None and hasattr(self.controller, "kp"):
|
||||
self.kp = np.array(self.controller.kp, dtype=np.float32)
|
||||
self.kd = np.array(self.controller.kd, dtype=np.float32)
|
||||
|
||||
for joint in G1_29_JointIndex:
|
||||
self.msg.motor_cmd[joint].mode = 1
|
||||
@@ -374,6 +377,9 @@ class UnitreeG1(Robot):
|
||||
# Signal thread to stop and unblock any waits
|
||||
self._shutdown_event.set()
|
||||
|
||||
if self.controller is not None and hasattr(self.controller, "shutdown"):
|
||||
self.controller.shutdown()
|
||||
|
||||
# Wait for subscribe thread to finish
|
||||
if self.subscribe_thread is not None:
|
||||
self.subscribe_thread.join(timeout=2.0)
|
||||
@@ -465,9 +471,11 @@ class UnitreeG1(Robot):
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
action_to_publish = action
|
||||
if self.controller is not None:
|
||||
self._update_controller_action(action)
|
||||
if getattr(self.controller, "full_body", False):
|
||||
return action
|
||||
# Controller thread owns legs/waist. Here we only update joystick inputs
|
||||
# and publish arm targets from the teleoperator.
|
||||
self._update_controller_action(action)
|
||||
arm_prefixes = tuple(j.name for j in G1_29_JointArmIndex)
|
||||
action_to_publish = {
|
||||
key: value
|
||||
|
||||
Reference in New Issue
Block a user