diff --git a/examples/unitree_g1/sonic.py b/examples/unitree_g1/sonic.py index e6216cf07..2dfad1fd7 100644 --- a/examples/unitree_g1/sonic.py +++ b/examples/unitree_g1/sonic.py @@ -10,6 +10,7 @@ Keyboard controls: 9 / 0 - decrease / increase speed - / = - decrease / increase height R - force replan + M - toggle SMPL motion playback <-> locomotion (needs --motion-file) Space - emergency stop -> IDLE Esc - quit @@ -24,7 +25,9 @@ For teleop integration use --robot.controller=SonicWholeBodyController instead. """ import argparse +import faulthandler import gc +import sys import time import numpy as np @@ -43,6 +46,8 @@ from lerobot.robots.unitree_g1.controllers.sonic_pipeline import ( from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex from lerobot.robots.unitree_g1.unitree_g1 import UnitreeG1 +from motion_loader import SmplMotion + def main(): parser = argparse.ArgumentParser(description="SONIC planner with keyboard + gamepad control") @@ -59,8 +64,21 @@ def main(): help="Read Unitree wireless gamepad in sim (default: keyboard-only in sim)") parser.add_argument("--keyboard-only", action="store_true", help="Ignore wireless gamepad (terminal keyboard only)") + parser.add_argument("--motion-file", type=str, default=None, + help="Play an SMPL motion clip (.npz) via SONIC whole-body mode " + "(encode_mode=2) instead of locomotion planning.") + parser.add_argument("--no-loop", action="store_true", + help="With --motion-file, play once instead of looping") args = parser.parse_args() + # Surface native crashes (onnxruntime / mujoco) with a real traceback, and + # avoid losing buffered diagnostics if the process dies mid-loop. + faulthandler.enable() + try: + sys.stdout.reconfigure(line_buffering=True) + except Exception: + pass + print("=" * 60) print("SONIC planner - full mode control") print(" N/P cycle sets | 1-8 select mode | WASD move") @@ -89,6 +107,18 @@ def main(): runtime = SonicRuntime(force_cpu=args.cpu) controller = runtime.controller ms = runtime.ms + + motion = None + if args.motion_file: + motion = SmplMotion(args.motion_file, loop=not args.no_loop) + controller.smpl_motion = motion # lets 'M' key toggle playback + controller.encode_mode = 2 # start in SONIC whole-body SMPL imitation + dur = motion.num_frames / motion.fps + print(f"\n[Motion] SMPL whole-body playback: {args.motion_file}") + print(f" frames={motion.num_frames} fps={motion.fps:.1f} " + f"duration={dur:.1f}s loop={not args.no_loop} encode_mode=2") + print(" Press 'M' to toggle SMPL playback <-> locomotion at runtime.") + runtime.controller.print_input_diagnostics() print(f"\nStarting: {MOTION_SETS[0][0]} (default mode: {LM(ms.mode).name})") @@ -97,7 +127,7 @@ def main(): "1-6 change mode, 9/0 speed, Esc quit.\n", flush=True) # Sim hub publishes wireless_remote bytes that can fight terminal WASD. - use_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None) + base_joystick = not args.keyboard_only and (args.gamepad or args.ip is not None) with RawKeyboard() as kb: try: @@ -138,9 +168,17 @@ def main(): time.sleep(max(0.0, CONTROL_DT - (time.time() - t0))) continue + # SMPL playback only while in whole-body mode; 'M' toggles it. + motion_active = motion is not None and controller.encode_mode == 2 + if motion_active: + controller.smpl_joints_10frame_step1 = motion.step() + if motion.done: + print("\n[Motion] clip finished") + break + step_before = runtime.step t_step = time.time() - action = runtime.tick(obs, use_joystick=use_joystick) + action = runtime.tick(obs, use_joystick=base_joystick and not motion_active) step_ms = 1000 * (time.time() - t_step) do_enc = step_before % 5 == 0 (enc_t if do_enc else dec_t).append(step_ms) diff --git a/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py b/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py index e8c595c8b..3392cb168 100644 --- a/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py +++ b/src/lerobot/robots/unitree_g1/controllers/sonic_pipeline.py @@ -735,6 +735,19 @@ class PlannerController(StandingEncoderDecoder): def build_encoder_obs(self): obs = np.zeros(1762, np.float32); obs[0] = float(self.encode_mode) with self.motion_lock: + if self.encode_mode == 2: + # SMPL whole-body imitation: the 720-dim SMPL window carries the + # target pose; the planner reference frame supplies anchor + wrist. + rf = min(self.ref_cursor, self.motion_timesteps - 1) + ref_pos = self.motion_joint_positions[rf].astype(np.float32) + ref_quat = self.motion_body_quats[rf].astype(np.float32) + anchor = self._anchor_6d(self.h_quat[0], ref_quat) + wrist = ref_pos[WRIST_IL] + obs[922:1642] = self.smpl_joints_10frame_step1 + for f in range(10): + obs[1642+6*f:1642+6*(f+1)] = anchor + obs[1702+6*f:1702+6*(f+1)] = wrist + return obs for f in range(10): tf = min(self.ref_cursor + f*5 if self.playing else self.ref_cursor, self.motion_timesteps - 1) @@ -809,6 +822,21 @@ def process_keyboard(key, ms, controller=None): print("\n >> EMERGENCY STOP -> IDLE"); return False if key in ('r','R'): ms.needs_replan = True; print("\n >> Manual replan"); return False + if key in ('m','M'): + if controller is not None and getattr(controller, "smpl_motion", None) is not None: + if controller.encode_mode == 2: + controller.encode_mode = 0 + controller.playing = True; controller.reinit_heading = True + ms.needs_replan = True + print("\n >> Motion OFF -> locomotion (mode 0). WASD to drive.") + else: + controller.encode_mode = 2 + controller.reinit_heading = True + controller.smpl_motion.reset() + print("\n >> Motion ON -> SMPL playback (mode 2)") + else: + print("\n >> No motion loaded (start with --motion-file)") + return False if key in ('n','N','p','P'): ms.motion_set_idx = (ms.motion_set_idx + (1 if key in ('n','N') else -1)) % len(MOTION_SETS) name, modes = MOTION_SETS[ms.motion_set_idx]