add custom motion loader

This commit is contained in:
Martino Russi
2026-06-17 12:29:36 +02:00
parent 1c118c6359
commit 31d1439e29
2 changed files with 68 additions and 2 deletions
+40 -2
View File
@@ -10,6 +10,7 @@ Keyboard controls:
9 / 0 - decrease / increase speed
- / = - decrease / increase height
R - force replan
M - toggle SMPL motion playback <-> locomotion (needs --motion-file)
Space - emergency stop -> IDLE
Esc - quit
@@ -24,7 +25,9 @@ For teleop integration use --robot.controller=SonicWholeBodyController instead.
"""
import argparse
import faulthandler
import gc
import sys
import time
import numpy as np
@@ -43,6 +46,8 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import (
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1
from motion_loader import SmplMotion
def main():
parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control")
@@ -59,8 +64,21 @@ def main():
help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)")
parser.add_argument("--keyboard-only", action="store_true",
help="Ignore wireless gamepad (terminal keyboard only)")
parser.add_argument("--motion-file", type=str, default=None,
help="Play an SMPL motion clip (.npz) via SONIC whole-body mode "
"(encode_mode=2) instead of locomotion planning.")
parser.add_argument("--no-loop", action="store_true",
help="With --motion-file, play once instead of looping")
args = parser.parse_args()
# Surface native crashes (onnxruntime / mujoco) with a real traceback, and
# avoid losing buffered diagnostics if the process dies mid-loop.
faulthandler.enable()
try:
sys.stdout.reconfigure(line_buffering=True)
except Exception:
pass
print("=" * 60)
print("SONIC planner - full mode control")
print(" N/P cycle sets | 1-8 select mode | WASD move")
@@ -89,6 +107,18 @@ def main():
runtime = SonicRuntime(force_cpu=args.cpu)
controller = runtime.controller
ms = runtime.ms
motion = None
if args.motion_file:
motion = SmplMotion(args.motion_file, loop=not args.no_loop)
controller.smpl_motion = motion # lets 'M' key toggle playback
controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation
dur = motion.num_frames / motion.fps
print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}")
print(f" frames={motion.num_frames} fps={motion.fps:.1f} "
f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2")
print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.")
runtime.controller.print_input_diagnostics()
print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})")
@@ -97,7 +127,7 @@ def main():
"1-6 change mode, 9/0 speed, Esc quit.\n", flush=True)
# Sim hub publishes wireless_remote bytes that can fight terminal WASD.
use_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None)
with RawKeyboard() as kb:
try:
@@ -138,9 +168,17 @@ def main():
time.sleep(max(0.0, CONTROL_DT - (time.time() - t0)))
continue
# SMPL playback only while in whole-body mode; 'M' toggles it.
motion_active = motion is not None and controller.encode_mode == 2
if motion_active:
controller.smpl_joints_10frame_step1 = motion.step()
if motion.done:
print("\n[Motion] clip finished")
break
step_before = runtime.step
t_step = time.time()
action = runtime.tick(obs, use_joystick=use_joystick)
action = runtime.tick(obs, use_joystick=base_joystick and not motion_active)
step_ms = 1000 * (time.time() - t_step)
do_enc = step_before % 5 == 0
(enc_t if do_enc else dec_t).append(step_ms)
@@ -735,6 +735,19 @@ class PlannerController(StandingEncoderDecoder):
def build_encoder_obs(self):
obs = np.zeros(1762, np.float32); obs[0] = float(self.encode_mode)
with self.motion_lock:
if self.encode_mode == 2:
# SMPL whole-body imitation: the 720-dim SMPL window carries the
# target pose; the planner reference frame supplies anchor + wrist.
rf = min(self.ref_cursor, self.motion_timesteps - 1)
ref_pos = self.motion_joint_positions[rf].astype(np.float32)
ref_quat = self.motion_body_quats[rf].astype(np.float32)
anchor = self._anchor_6d(self.h_quat[0], ref_quat)
wrist = ref_pos[WRIST_IL]
obs[922:1642] = self.smpl_joints_10frame_step1
for f in range(10):
obs[1642+6*f:1642+6*(f+1)] = anchor
obs[1702+6*f:1702+6*(f+1)] = wrist
return obs
for f in range(10):
tf = min(self.ref_cursor + f*5 if self.playing else self.ref_cursor,
self.motion_timesteps - 1)
@@ -809,6 +822,21 @@ def process_keyboard(key, ms, controller=None):
print("\n >> EMERGENCY STOP -> IDLE"); return False
if key in ('r','R'):
ms.needs_replan = True; print("\n >> Manual replan"); return False
if key in ('m','M'):
if controller is not None and getattr(controller, "smpl_motion", None) is not None:
if controller.encode_mode == 2:
controller.encode_mode = 0
controller.playing = True; controller.reinit_heading = True
ms.needs_replan = True
print("\n >> Motion OFF -> locomotion (mode 0). WASD to drive.")
else:
controller.encode_mode = 2
controller.reinit_heading = True
controller.smpl_motion.reset()
print("\n >> Motion ON -> SMPL playback (mode 2)")
else:
print("\n >> No motion loaded (start with --motion-file)")
return False
if key in ('n','N','p','P'):
ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ('n','N') else -1)) % len(MOTION_SETS)
name, modes = MOTION_SETS[ms.motion_set_idx]