diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 2f9715ce1..a0b7f4bb7 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -79,6 +79,8 @@ title: Hope Jr - local: reachy2 title: Reachy 2 + - local: unitree_g1 + title: Unitree G1 title: "Robots" - sections: - local: phone_teleop diff --git a/docs/source/unitree_g1.mdx b/docs/source/unitree_g1.mdx index 8414c2098..cb9e307ec 100644 --- a/docs/source/unitree_g1.mdx +++ b/docs/source/unitree_g1.mdx @@ -5,6 +5,7 @@ This guide covers the complete setup process for the Unitree G1 humanoid robot, ## 🤖 About the Unitree G1 The Unitree G1 humanoid comes in two flavors: 29-DOF and 23-DOF humanoid robot capable of whole-body control, manipulation, and locomotion. In this first PR we introduce: + - **Low-level motor control** via DDS (Data Distribution Service) - **ZMQ socket bridge** for remote communication over WiFi, allowing one to deploy policies remotely instead of over ethernet or directly on the Orin - **GR00T locomotion policiey** for bipedal walking and balance @@ -58,7 +59,9 @@ sudo systemctl restart NetworkManager ``` ### Step 2: Enable Internet Forwarding + **On your laptop:** + ```bash # Enable IP forwarding sudo sysctl -w net.ipv4.ip_forward=1 @@ -70,6 +73,7 @@ sudo iptables -A FORWARD -i enp131s0 -o wlp132s0f0 -j ACCEPT ``` **On the robot:** + ```bash # Add laptop as default gateway sudo ip route del default 2>/dev/null || true @@ -117,10 +121,8 @@ The robot server introduced here acts as a DDS-to-ZMQ bridge, allowing your one From your laptop, copy the robot server script: ```bash -# Copy the server script -scp src/lerobot/robots/unitree_g1/robot_server.py unitree@172.18.129.215:~/robot_server.py - -# Copy required dependencies if needed +# Copy the server script and its dependencies +scp src/lerobot/robots/unitree_g1/run_g1_server.py unitree@172.18.129.215:~/run_g1_server.py scp src/lerobot/robots/unitree_g1/g1_utils.py unitree@172.18.129.215:~/g1_utils.py ``` @@ -135,25 +137,23 @@ ssh unitree@172.18.129.215 sudo apt update sudo apt install -y build-essential python3-dev python3-pip -# Install Python packages -pip3 install pyzmq pickle5 - -# Install Unitree SDK (if not already installed) -git clone https://github.com/unitreerobotics/unitree_sdk2_python.git -cd unitree_sdk2_python -pip3 install -e . +# Install Python packages (pyzmq and Unitree SDK) +pip3 install pyzmq +pip3 install git+https://github.com/unitreerobotics/unitree_sdk2_python.git ``` -NOTE: unitreesdk requires dds v 10.2 to be installed + +> **Note**: The Unitree SDK requires CycloneDDS v0.10.2 to be installed. See the [Unitree SDK documentation](https://github.com/unitreerobotics/unitree_sdk2_python) for details. ### Step 3: Run the Robot Server On the robot: ```bash -python3 ~/robot_server.py +python3 ~/run_g1_server.py ``` You should see output like: + ``` Robot server listening on: Commands: tcp://*:6000 (PULL) @@ -169,7 +169,17 @@ DDS initialized, forwarding started... With the robot server running, you can now control the robot from your laptop. -### Step 1: Update Robot IP in Config +### Step 1: Install LeRobot with Unitree G1 Support (on your laptop) + +```bash +# Install lerobot with unitree_g1 extras +pip install 'lerobot[unitree_g1]' + +# Or if installing from source: +pip install -e '.[unitree_g1]' +``` + +### Step 2: Update Robot IP in Config Edit the config file to match your robot's WiFi IP: @@ -178,7 +188,7 @@ Edit the config file to match your robot's WiFi IP: robot_ip: str = "172.18.129.215" # Your robot's WiFi IP ``` -### Step 2: Run the Locomotion Policy +### Step 3: Run the Locomotion Policy ```bash # Run GR00T locomotion controller (downloads policies from HuggingFace) @@ -189,6 +199,7 @@ python examples/unitree_g1/gr00t_locomotion.py ``` The script will: + 1. Download Balance and Walk policies from the Hub (cached locally after first run) 2. Connect to the robot server over WiFi/ZMQ 3. Initialize the robot and locomotion controller @@ -197,6 +208,7 @@ The script will: 6. Accept commands from the wireless remote controller **Expected output:** + ``` INFO - Loading GR00T Balance policy... INFO - Loading GR00T Walk policy... @@ -209,7 +221,7 @@ INFO - Locomotion controller running in background thread INFO - Press Ctrl+C to stop ``` -### Step 3: Control with Remote +### Step 4: Control with Remote - **Left stick**: Forward/backward and left/right movement - **Right stick**: Rotation @@ -229,6 +241,4 @@ To stop, press `Ctrl+C` in the terminal. --- - -*Last updated: November 2025* - +_Last updated: November 2025_ diff --git a/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Balance.onnx b/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Balance.onnx deleted file mode 100644 index 45161cb20..000000000 Binary files a/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Balance.onnx and /dev/null differ diff --git a/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Walk.onnx b/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Walk.onnx deleted file mode 100644 index ef5107939..000000000 Binary files a/examples/unitree_g1/locomotion/GR00T-WholeBodyControl-Walk.onnx and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index 0b53457a1..642cd2807 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,10 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"] gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"] hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"] lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"] +unitree_g1 = [ + "pyzmq>=26.2.1,<28.0.0", + "unitree_sdk2py @ git+https://github.com/unitreerobotics/unitree_sdk2_python.git", +] reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"] kinematics = ["lerobot[placo-dep]"] intelrealsense = [ diff --git a/src/lerobot/robots/unitree_g1/g1_utils.py b/src/lerobot/robots/unitree_g1/g1_utils.py index 39f488225..d42dbbb1c 100644 --- a/src/lerobot/robots/unitree_g1/g1_utils.py +++ b/src/lerobot/robots/unitree_g1/g1_utils.py @@ -2,6 +2,46 @@ from enum import IntEnum # ruff: noqa: N801, N815 +# ============================================================================= +# Serialization Keys for ZMQ Communication Protocol +# ============================================================================= +# These constants define the JSON keys used for robot state/command serialization. +# They are shared between robot_server.py (robot-side) and unitree_sdk2_socket.py (client-side). + +# Top-level message keys +TOPIC = "topic" +DATA = "data" + +# Motor state keys +MOTOR_STATE = "motor_state" +MOTOR_Q = "q" # position +MOTOR_DQ = "dq" # velocity +MOTOR_TAU_EST = "tau_est" # estimated torque +MOTOR_TEMPERATURE = "temperature" + +# Motor command keys +MOTOR_CMD = "motor_cmd" +MOTOR_MODE = "mode" +MOTOR_KP = "kp" +MOTOR_KD = "kd" +MOTOR_TAU = "tau" + +# IMU state keys +IMU_STATE = "imu_state" +IMU_QUATERNION = "quaternion" +IMU_GYROSCOPE = "gyroscope" +IMU_ACCELEROMETER = "accelerometer" +IMU_RPY = "rpy" +IMU_TEMPERATURE = "temperature" + +# Other state keys +WIRELESS_REMOTE = "wireless_remote" +MODE_MACHINE = "mode_machine" +MODE_PR = "mode_pr" + +# Number of motors +NUM_MOTORS = 35 + class G1_29_JointArmIndex(IntEnum): # Left arm diff --git a/src/lerobot/robots/unitree_g1/robot_server.py b/src/lerobot/robots/unitree_g1/robot_server.py deleted file mode 100644 index c58853748..000000000 --- a/src/lerobot/robots/unitree_g1/robot_server.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 -import contextlib -import pickle -import threading -import time - -import zmq -from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient -from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber -from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as hg_LowCmd, LowState_ as hg_LowState -from unitree_sdk2py.utils.crc import CRC - -# DDS topic names follow Unitree SDK naming conventions -# ruff: noqa: N816 -kTopicLowCommand_Debug = "rt/lowcmd" # action to robot -kTopicLowState = "rt/lowstate" # observation from robot - -LOWCMD_PORT = 6000 -LOWSTATE_PORT = 6001 - - -def state_forward_loop( - lowstate_sub, lowstate_sock, state_period: float -): # read observation from DDS and send to server - last_state_time = 0.0 - - while True: - # read from DDS - msg = lowstate_sub.Read() - if msg is None: - continue - - now = time.time() - # optional downsampling (if robot dds rate > state_period) - if now - last_state_time >= state_period: - payload = pickle.dumps((kTopicLowState, msg), protocol=pickle.HIGHEST_PROTOCOL) - # if no subscribers / tx buffer full, just drop - with contextlib.suppress(zmq.Again): - lowstate_sock.send(payload, zmq.NOBLOCK) - last_state_time = now - - -def cmd_forward_loop(lowcmd_sock, lowcmd_pub_debug, crc: CRC): # send action to robot - while True: - payload = lowcmd_sock.recv() - topic, cmd = pickle.loads(payload) - - # recompute crc just in case - cmd.crc = crc.Crc(cmd) - - if topic == kTopicLowCommand_Debug: - lowcmd_pub_debug.Write(cmd) - else: - pass - - -def main(): - # initialize DDS - ChannelFactoryInitialize(0) - - # stop all active publishers on the robot - msc = MotionSwitcherClient() - msc.SetTimeout(5.0) - msc.Init() - - status, result = msc.CheckMode() - while result is not None and "name" in result and result["name"]: - msc.ReleaseMode() - status, result = msc.CheckMode() - time.sleep(1.0) - - crc = CRC() - - # initialize DDS publisher - lowcmd_pub_debug = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) - lowcmd_pub_debug.Init() - - # initialize DDS subscriber - lowstate_sub = ChannelSubscriber(kTopicLowState, hg_LowState) - lowstate_sub.Init() - - # initialize ZMQ - ctx = zmq.Context.instance() - - # send action to robot - lowcmd_sock = ctx.socket(zmq.PULL) - lowcmd_sock.bind(f"tcp://0.0.0.0:{LOWCMD_PORT}") - - # send observation to server - lowstate_sock = ctx.socket(zmq.PUB) - lowstate_sock.bind(f"tcp://0.0.0.0:{LOWSTATE_PORT}") - - state_period = 0.002 # ~500 hz - - # start observation forwarding thread - t_state = threading.Thread( - target=state_forward_loop, - args=(lowstate_sub, lowstate_sock, state_period), - daemon=True, - ) - t_state.start() - - # start action forwarding thread - t_cmd = threading.Thread( - target=cmd_forward_loop, - args=(lowcmd_sock, lowcmd_pub_debug, crc), - daemon=True, - ) - t_cmd.start() - - print("bridge running (lowstate -> zmq, lowcmd -> dds)") - # keep main thread alive so daemon threads don’t exit - try: - while True: - time.sleep(1.0) - except KeyboardInterrupt: - print("shutting down bridge...") - - -if __name__ == "__main__": - main() diff --git a/src/lerobot/robots/unitree_g1/run_g1_server.py b/src/lerobot/robots/unitree_g1/run_g1_server.py new file mode 100644 index 000000000..0498fa899 --- /dev/null +++ b/src/lerobot/robots/unitree_g1/run_g1_server.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +DDS-to-ZMQ bridge server for Unitree G1 robot. + +This server runs on the robot and forwards: +- Robot state (LowState) from DDS to ZMQ (for remote clients) +- Robot commands (LowCmd) from ZMQ to DDS (from remote clients) + +Uses JSON for secure serialization instead of pickle. +""" + +import base64 +import contextlib +import json +import threading +import time +from typing import Any + +import zmq +from g1_utils import ( + DATA, + IMU_ACCELEROMETER, + IMU_GYROSCOPE, + IMU_QUATERNION, + IMU_RPY, + IMU_STATE, + IMU_TEMPERATURE, + MODE_MACHINE, + MODE_PR, + MOTOR_CMD, + MOTOR_DQ, + MOTOR_KD, + MOTOR_KP, + MOTOR_MODE, + MOTOR_Q, + MOTOR_STATE, + MOTOR_TAU, + MOTOR_TAU_EST, + MOTOR_TEMPERATURE, + NUM_MOTORS, + TOPIC, + WIRELESS_REMOTE, +) +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber +from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as hg_LowCmd, LowState_ as hg_LowState +from unitree_sdk2py.utils.crc import CRC + +# DDS topic names follow Unitree SDK naming conventions +# ruff: noqa: N816 +kTopicLowCommand_Debug = "rt/lowcmd" # action to robot +kTopicLowState = "rt/lowstate" # observation from robot + +LOWCMD_PORT = 6000 +LOWSTATE_PORT = 6001 + + +def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]: + """Convert LowState SDK message to a JSON-serializable dictionary.""" + motor_states = [] + for i in range(NUM_MOTORS): + motor_states.append( + { + MOTOR_Q: msg.motor_state[i].q, + MOTOR_DQ: msg.motor_state[i].dq, + MOTOR_TAU_EST: msg.motor_state[i].tau_est, + MOTOR_TEMPERATURE: msg.motor_state[i].temperature, + } + ) + + return { + MOTOR_STATE: motor_states, + IMU_STATE: { + IMU_QUATERNION: list(msg.imu_state.quaternion), + IMU_GYROSCOPE: list(msg.imu_state.gyroscope), + IMU_ACCELEROMETER: list(msg.imu_state.accelerometer), + IMU_RPY: list(msg.imu_state.rpy), + IMU_TEMPERATURE: msg.imu_state.temperature, + }, + # Encode bytes as base64 for JSON compatibility + WIRELESS_REMOTE: base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"), + MODE_MACHINE: msg.mode_machine, + } + + +def dict_to_lowcmd(data: dict[str, Any]) -> hg_LowCmd: + """Convert dictionary back to LowCmd SDK message.""" + cmd = unitree_hg_msg_dds__LowCmd_() + cmd.mode_pr = data.get(MODE_PR, 0) + cmd.mode_machine = data.get(MODE_MACHINE, 0) + + for i, motor_data in enumerate(data.get(MOTOR_CMD, [])): + cmd.motor_cmd[i].mode = motor_data.get(MOTOR_MODE, 0) + cmd.motor_cmd[i].q = motor_data.get(MOTOR_Q, 0.0) + cmd.motor_cmd[i].dq = motor_data.get(MOTOR_DQ, 0.0) + cmd.motor_cmd[i].kp = motor_data.get(MOTOR_KP, 0.0) + cmd.motor_cmd[i].kd = motor_data.get(MOTOR_KD, 0.0) + cmd.motor_cmd[i].tau = motor_data.get(MOTOR_TAU, 0.0) + + return cmd + + +def state_forward_loop( + lowstate_sub: ChannelSubscriber, + lowstate_sock: zmq.Socket, + state_period: float, +) -> None: + """Read observation from DDS and forward to ZMQ clients.""" + last_state_time = 0.0 + + while True: + # read from DDS + msg = lowstate_sub.Read() + if msg is None: + continue + + now = time.time() + # optional downsampling (if robot dds rate > state_period) + if now - last_state_time >= state_period: + # Convert to dict and serialize with JSON + state_dict = lowstate_to_dict(msg) + payload = json.dumps({TOPIC: kTopicLowState, DATA: state_dict}).encode("utf-8") + # if no subscribers / tx buffer full, just drop + with contextlib.suppress(zmq.Again): + lowstate_sock.send(payload, zmq.NOBLOCK) + last_state_time = now + + +def cmd_forward_loop( + lowcmd_sock: zmq.Socket, + lowcmd_pub_debug: ChannelPublisher, + crc: CRC, +) -> None: + """Receive commands from ZMQ and forward to DDS.""" + while True: + payload = lowcmd_sock.recv() + msg_dict = json.loads(payload.decode("utf-8")) + + topic = msg_dict.get(TOPIC, "") + cmd_data = msg_dict.get(DATA, {}) + + # Reconstruct LowCmd object from dict + cmd = dict_to_lowcmd(cmd_data) + + # recompute crc + cmd.crc = crc.Crc(cmd) + + if topic == kTopicLowCommand_Debug: + lowcmd_pub_debug.Write(cmd) + + +def main() -> None: + """Main entry point for the robot server bridge.""" + # initialize DDS + ChannelFactoryInitialize(0) + + # stop all active publishers on the robot + msc = MotionSwitcherClient() + msc.SetTimeout(5.0) + msc.Init() + + status, result = msc.CheckMode() + while result is not None and "name" in result and result["name"]: + msc.ReleaseMode() + status, result = msc.CheckMode() + time.sleep(1.0) + + crc = CRC() + + # initialize DDS publisher + lowcmd_pub_debug = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd) + lowcmd_pub_debug.Init() + + # initialize DDS subscriber + lowstate_sub = ChannelSubscriber(kTopicLowState, hg_LowState) + lowstate_sub.Init() + + # initialize ZMQ + ctx = zmq.Context.instance() + + # receive commands from remote client + lowcmd_sock = ctx.socket(zmq.PULL) + lowcmd_sock.bind(f"tcp://0.0.0.0:{LOWCMD_PORT}") + + # publish state to remote clients + lowstate_sock = ctx.socket(zmq.PUB) + lowstate_sock.bind(f"tcp://0.0.0.0:{LOWSTATE_PORT}") + + state_period = 0.002 # ~500 hz + + # start observation forwarding thread + t_state = threading.Thread( + target=state_forward_loop, + args=(lowstate_sub, lowstate_sock, state_period), + daemon=True, + ) + t_state.start() + + # start action forwarding thread + t_cmd = threading.Thread( + target=cmd_forward_loop, + args=(lowcmd_sock, lowcmd_pub_debug, crc), + daemon=True, + ) + t_cmd.start() + + print("bridge running (lowstate -> zmq, lowcmd -> dds)") + # keep main thread alive so daemon threads don't exit + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + print("shutting down bridge...") + + +if __name__ == "__main__": + main() diff --git a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py index 183765c2d..876096c2e 100644 --- a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py +++ b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py @@ -1,57 +1,185 @@ -import pickle +""" +ZMQ socket wrapper that mimics the Unitree SDK Channel interface. + +This module provides a drop-in replacement for the Unitree SDK's DDS-based +ChannelPublisher and ChannelSubscriber, using ZMQ sockets instead. This allows +remote communication with the robot over WiFi via the robot_server bridge. + +Uses JSON for secure serialization instead of pickle. +""" + +import base64 +import json +from typing import Any import zmq from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config +from lerobot.robots.unitree_g1.g1_utils import ( + DATA, + IMU_ACCELEROMETER, + IMU_GYROSCOPE, + IMU_QUATERNION, + IMU_RPY, + IMU_STATE, + IMU_TEMPERATURE, + MODE_MACHINE, + MODE_PR, + MOTOR_CMD, + MOTOR_DQ, + MOTOR_KD, + MOTOR_KP, + MOTOR_MODE, + MOTOR_Q, + MOTOR_STATE, + MOTOR_TAU, + MOTOR_TAU_EST, + MOTOR_TEMPERATURE, + TOPIC, + WIRELESS_REMOTE, +) -_ctx = None -_lowcmd_sock = None -_lowstate_sock = None +_ctx: zmq.Context | None = None +_lowcmd_sock: zmq.Socket | None = None +_lowstate_sock: zmq.Socket | None = None LOWCMD_PORT = 6000 LOWSTATE_PORT = 6001 +# DDS topic names follow Unitree SDK naming conventions +# ruff: noqa: N816 +kTopicLowCommand_Debug = "rt/lowcmd" -def ChannelFactoryInitialize(*args, **kwargs): # noqa: N802 + +class LowStateMsg: + """ + Wrapper class that mimics the Unitree SDK LowState_ message structure. + + Reconstructs the message from deserialized JSON data to maintain + compatibility with existing code that expects SDK message objects. + """ + + class MotorState: + """Motor state data for a single joint.""" + + def __init__(self, data: dict[str, Any]) -> None: + self.q: float = data.get(MOTOR_Q, 0.0) + self.dq: float = data.get(MOTOR_DQ, 0.0) + self.tau_est: float = data.get(MOTOR_TAU_EST, 0.0) + self.temperature: float = data.get(MOTOR_TEMPERATURE, 0.0) + + class IMUState: + """IMU sensor data.""" + + def __init__(self, data: dict[str, Any]) -> None: + self.quaternion: list[float] = data.get(IMU_QUATERNION, [1.0, 0.0, 0.0, 0.0]) + self.gyroscope: list[float] = data.get(IMU_GYROSCOPE, [0.0, 0.0, 0.0]) + self.accelerometer: list[float] = data.get(IMU_ACCELEROMETER, [0.0, 0.0, 0.0]) + self.rpy: list[float] = data.get(IMU_RPY, [0.0, 0.0, 0.0]) + self.temperature: float = data.get(IMU_TEMPERATURE, 0.0) + + def __init__(self, data: dict[str, Any]) -> None: + """Initialize from deserialized JSON data.""" + self.motor_state = [self.MotorState(m) for m in data.get(MOTOR_STATE, [])] + self.imu_state = self.IMUState(data.get(IMU_STATE, {})) + # Decode base64-encoded wireless_remote bytes + wireless_b64 = data.get(WIRELESS_REMOTE, "") + self.wireless_remote: bytes = base64.b64decode(wireless_b64) if wireless_b64 else b"" + self.mode_machine: int = data.get(MODE_MACHINE, 0) + + +def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]: + """Convert LowCmd message to a JSON-serializable dictionary.""" + motor_cmds = [] + # Iterate over all motor commands in the message + for i in range(len(msg.motor_cmd)): + motor_cmds.append( + { + MOTOR_MODE: msg.motor_cmd[i].mode, + MOTOR_Q: msg.motor_cmd[i].q, + MOTOR_DQ: msg.motor_cmd[i].dq, + MOTOR_KP: msg.motor_cmd[i].kp, + MOTOR_KD: msg.motor_cmd[i].kd, + MOTOR_TAU: msg.motor_cmd[i].tau, + } + ) + + return { + TOPIC: topic, + DATA: { + MODE_PR: msg.mode_pr, + MODE_MACHINE: msg.mode_machine, + MOTOR_CMD: motor_cmds, + }, + } + + +def ChannelFactoryInitialize(*args: Any, **kwargs: Any) -> None: # noqa: N802 + """ + Initialize ZMQ sockets for robot communication. + + This function mimics the Unitree SDK's ChannelFactoryInitialize but uses + ZMQ sockets to connect to the robot server bridge instead of DDS. + """ global _ctx, _lowcmd_sock, _lowstate_sock + # read socket config config = UnitreeG1Config() robot_ip = config.robot_ip - _ctx = zmq.Context.instance() + ctx = zmq.Context.instance() + _ctx = ctx - # lowcmd: robot action - _lowcmd_sock = _ctx.socket(zmq.PUSH) - _lowcmd_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message - _lowcmd_sock.connect(f"tcp://{robot_ip}:{LOWCMD_PORT}") + # lowcmd: send robot commands + lowcmd_sock = ctx.socket(zmq.PUSH) + lowcmd_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message + lowcmd_sock.connect(f"tcp://{robot_ip}:{LOWCMD_PORT}") + _lowcmd_sock = lowcmd_sock - # lowstate: robot observation - _lowstate_sock = _ctx.socket(zmq.SUB) - _lowstate_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message - _lowstate_sock.connect(f"tcp://{robot_ip}:{LOWSTATE_PORT}") - _lowstate_sock.setsockopt_string(zmq.SUBSCRIBE, "") + # lowstate: receive robot observations + lowstate_sock = ctx.socket(zmq.SUB) + lowstate_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message + lowstate_sock.connect(f"tcp://{robot_ip}:{LOWSTATE_PORT}") + lowstate_sock.setsockopt_string(zmq.SUBSCRIBE, "") + _lowstate_sock = lowstate_sock -class ChannelPublisher: # send action to robot - def __init__(self, topic, msg_type): +class ChannelPublisher: + """ZMQ-based publisher that sends commands to the robot server.""" + + def __init__(self, topic: str, msg_type: type) -> None: self.topic = topic self.msg_type = msg_type - def Init(self): # noqa: N802 + def Init(self) -> None: # noqa: N802 + """Initialize the publisher (no-op for ZMQ).""" pass - def Write(self, msg): # noqa: N802 - _lowcmd_sock.send(pickle.dumps((self.topic, msg))) + def Write(self, msg: Any) -> None: # noqa: N802 + """Serialize and send a command message to the robot.""" + if _lowcmd_sock is None: + raise RuntimeError("ChannelFactoryInitialize must be called first") + + payload = json.dumps(lowcmd_to_dict(self.topic, msg)).encode("utf-8") + _lowcmd_sock.send(payload) -class ChannelSubscriber: # read observation from robot - def __init__(self, topic, msg_type): +class ChannelSubscriber: + """ZMQ-based subscriber that receives state from the robot server.""" + + def __init__(self, topic: str, msg_type: type) -> None: self.topic = topic self.msg_type = msg_type - def Init(self): # noqa: N802 + def Init(self) -> None: # noqa: N802 + """Initialize the subscriber (no-op for ZMQ).""" pass - def Read(self): # noqa: N802 - topic, msg = pickle.loads(_lowstate_sock.recv()) - return msg + def Read(self) -> LowStateMsg: # noqa: N802 + """Receive and deserialize a state message from the robot.""" + if _lowstate_sock is None: + raise RuntimeError("ChannelFactoryInitialize must be called first") + + payload = _lowstate_sock.recv() + msg_dict = json.loads(payload.decode("utf-8")) + return LowStateMsg(msg_dict.get(DATA, {}))