feat(robots): add Unitree G1 humanoid support with ZMQ bridge

- Use JSON + base64 serialization for secure communication instead of pickle
- Add documentation section
- Rename robot_server to run_g1_server
- Add dependecies to pyproject.toml
This commit is contained in:
Michel Aractingi
2025-11-27 17:32:24 +01:00
parent 53c678c29f
commit 414df3ef5b
9 changed files with 462 additions and 166 deletions
+2
View File
@@ -79,6 +79,8 @@
title: Hope Jr
- local: reachy2
title: Reachy 2
- local: unitree_g1
title: Unitree G1
title: "Robots"
- sections:
- local: phone_teleop
+29 -19
View File
@@ -5,6 +5,7 @@ This guide covers the complete setup process for the Unitree G1 humanoid robot,
## 🤖 About the Unitree G1
The Unitree G1 humanoid comes in two flavors: 29-DOF and 23-DOF humanoid robot capable of whole-body control, manipulation, and locomotion. In this first PR we introduce:
- **Low-level motor control** via DDS (Data Distribution Service)
- **ZMQ socket bridge** for remote communication over WiFi, allowing one to deploy policies remotely instead of over ethernet or directly on the Orin
- **GR00T locomotion policiey** for bipedal walking and balance
@@ -58,7 +59,9 @@ sudo systemctl restart NetworkManager
```
### Step 2: Enable Internet Forwarding
**On your laptop:**
```bash
# Enable IP forwarding
sudo sysctl -w net.ipv4.ip_forward=1
@@ -70,6 +73,7 @@ sudo iptables -A FORWARD -i enp131s0 -o wlp132s0f0 -j ACCEPT
```
**On the robot:**
```bash
# Add laptop as default gateway
sudo ip route del default 2>/dev/null || true
@@ -117,10 +121,8 @@ The robot server introduced here acts as a DDS-to-ZMQ bridge, allowing your one
From your laptop, copy the robot server script:
```bash
# Copy the server script
scp src/lerobot/robots/unitree_g1/robot_server.py unitree@172.18.129.215:~/robot_server.py
# Copy required dependencies if needed
# Copy the server script and its dependencies
scp src/lerobot/robots/unitree_g1/run_g1_server.py unitree@172.18.129.215:~/run_g1_server.py
scp src/lerobot/robots/unitree_g1/g1_utils.py unitree@172.18.129.215:~/g1_utils.py
```
@@ -135,25 +137,23 @@ ssh unitree@172.18.129.215
sudo apt update
sudo apt install -y build-essential python3-dev python3-pip
# Install Python packages
pip3 install pyzmq pickle5
# Install Unitree SDK (if not already installed)
git clone https://github.com/unitreerobotics/unitree_sdk2_python.git
cd unitree_sdk2_python
pip3 install -e .
# Install Python packages (pyzmq and Unitree SDK)
pip3 install pyzmq
pip3 install git+https://github.com/unitreerobotics/unitree_sdk2_python.git
```
NOTE: unitreesdk requires dds v 10.2 to be installed
> **Note**: The Unitree SDK requires CycloneDDS v0.10.2 to be installed. See the [Unitree SDK documentation](https://github.com/unitreerobotics/unitree_sdk2_python) for details.
### Step 3: Run the Robot Server
On the robot:
```bash
python3 ~/robot_server.py
python3 ~/run_g1_server.py
```
You should see output like:
```
Robot server listening on:
Commands: tcp://*:6000 (PULL)
@@ -169,7 +169,17 @@ DDS initialized, forwarding started...
With the robot server running, you can now control the robot from your laptop.
### Step 1: Update Robot IP in Config
### Step 1: Install LeRobot with Unitree G1 Support (on your laptop)
```bash
# Install lerobot with unitree_g1 extras
pip install 'lerobot[unitree_g1]'
# Or if installing from source:
pip install -e '.[unitree_g1]'
```
### Step 2: Update Robot IP in Config
Edit the config file to match your robot's WiFi IP:
@@ -178,7 +188,7 @@ Edit the config file to match your robot's WiFi IP:
robot_ip: str = "172.18.129.215" # Your robot's WiFi IP
```
### Step 2: Run the Locomotion Policy
### Step 3: Run the Locomotion Policy
```bash
# Run GR00T locomotion controller (downloads policies from HuggingFace)
@@ -189,6 +199,7 @@ python examples/unitree_g1/gr00t_locomotion.py
```
The script will:
1. Download Balance and Walk policies from the Hub (cached locally after first run)
2. Connect to the robot server over WiFi/ZMQ
3. Initialize the robot and locomotion controller
@@ -197,6 +208,7 @@ The script will:
6. Accept commands from the wireless remote controller
**Expected output:**
```
INFO - Loading GR00T Balance policy...
INFO - Loading GR00T Walk policy...
@@ -209,7 +221,7 @@ INFO - Locomotion controller running in background thread
INFO - Press Ctrl+C to stop
```
### Step 3: Control with Remote
### Step 4: Control with Remote
- **Left stick**: Forward/backward and left/right movement
- **Right stick**: Rotation
@@ -229,6 +241,4 @@ To stop, press `Ctrl+C` in the terminal.
---
*Last updated: November 2025*
_Last updated: November 2025_
+4
View File
@@ -107,6 +107,10 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
unitree_g1 = [
"pyzmq>=26.2.1,<28.0.0",
"unitree_sdk2py @ git+https://github.com/unitreerobotics/unitree_sdk2_python.git",
]
reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
intelrealsense = [
+40
View File
@@ -2,6 +2,46 @@ from enum import IntEnum
# ruff: noqa: N801, N815
# =============================================================================
# Serialization Keys for ZMQ Communication Protocol
# =============================================================================
# These constants define the JSON keys used for robot state/command serialization.
# They are shared between robot_server.py (robot-side) and unitree_sdk2_socket.py (client-side).
# Top-level message keys
TOPIC = "topic"
DATA = "data"
# Motor state keys
MOTOR_STATE = "motor_state"
MOTOR_Q = "q" # position
MOTOR_DQ = "dq" # velocity
MOTOR_TAU_EST = "tau_est" # estimated torque
MOTOR_TEMPERATURE = "temperature"
# Motor command keys
MOTOR_CMD = "motor_cmd"
MOTOR_MODE = "mode"
MOTOR_KP = "kp"
MOTOR_KD = "kd"
MOTOR_TAU = "tau"
# IMU state keys
IMU_STATE = "imu_state"
IMU_QUATERNION = "quaternion"
IMU_GYROSCOPE = "gyroscope"
IMU_ACCELEROMETER = "accelerometer"
IMU_RPY = "rpy"
IMU_TEMPERATURE = "temperature"
# Other state keys
WIRELESS_REMOTE = "wireless_remote"
MODE_MACHINE = "mode_machine"
MODE_PR = "mode_pr"
# Number of motors
NUM_MOTORS = 35
class G1_29_JointArmIndex(IntEnum):
# Left arm
@@ -1,121 +0,0 @@
#!/usr/bin/env python3
import contextlib
import pickle
import threading
import time
import zmq
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as hg_LowCmd, LowState_ as hg_LowState
from unitree_sdk2py.utils.crc import CRC
# DDS topic names follow Unitree SDK naming conventions
# ruff: noqa: N816
kTopicLowCommand_Debug = "rt/lowcmd" # action to robot
kTopicLowState = "rt/lowstate" # observation from robot
LOWCMD_PORT = 6000
LOWSTATE_PORT = 6001
def state_forward_loop(
lowstate_sub, lowstate_sock, state_period: float
): # read observation from DDS and send to server
last_state_time = 0.0
while True:
# read from DDS
msg = lowstate_sub.Read()
if msg is None:
continue
now = time.time()
# optional downsampling (if robot dds rate > state_period)
if now - last_state_time >= state_period:
payload = pickle.dumps((kTopicLowState, msg), protocol=pickle.HIGHEST_PROTOCOL)
# if no subscribers / tx buffer full, just drop
with contextlib.suppress(zmq.Again):
lowstate_sock.send(payload, zmq.NOBLOCK)
last_state_time = now
def cmd_forward_loop(lowcmd_sock, lowcmd_pub_debug, crc: CRC): # send action to robot
while True:
payload = lowcmd_sock.recv()
topic, cmd = pickle.loads(payload)
# recompute crc just in case
cmd.crc = crc.Crc(cmd)
if topic == kTopicLowCommand_Debug:
lowcmd_pub_debug.Write(cmd)
else:
pass
def main():
# initialize DDS
ChannelFactoryInitialize(0)
# stop all active publishers on the robot
msc = MotionSwitcherClient()
msc.SetTimeout(5.0)
msc.Init()
status, result = msc.CheckMode()
while result is not None and "name" in result and result["name"]:
msc.ReleaseMode()
status, result = msc.CheckMode()
time.sleep(1.0)
crc = CRC()
# initialize DDS publisher
lowcmd_pub_debug = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
lowcmd_pub_debug.Init()
# initialize DDS subscriber
lowstate_sub = ChannelSubscriber(kTopicLowState, hg_LowState)
lowstate_sub.Init()
# initialize ZMQ
ctx = zmq.Context.instance()
# send action to robot
lowcmd_sock = ctx.socket(zmq.PULL)
lowcmd_sock.bind(f"tcp://0.0.0.0:{LOWCMD_PORT}")
# send observation to server
lowstate_sock = ctx.socket(zmq.PUB)
lowstate_sock.bind(f"tcp://0.0.0.0:{LOWSTATE_PORT}")
state_period = 0.002 # ~500 hz
# start observation forwarding thread
t_state = threading.Thread(
target=state_forward_loop,
args=(lowstate_sub, lowstate_sock, state_period),
daemon=True,
)
t_state.start()
# start action forwarding thread
t_cmd = threading.Thread(
target=cmd_forward_loop,
args=(lowcmd_sock, lowcmd_pub_debug, crc),
daemon=True,
)
t_cmd.start()
print("bridge running (lowstate -> zmq, lowcmd -> dds)")
# keep main thread alive so daemon threads dont exit
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print("shutting down bridge...")
if __name__ == "__main__":
main()
@@ -0,0 +1,233 @@
#!/usr/bin/env python3
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DDS-to-ZMQ bridge server for Unitree G1 robot.
This server runs on the robot and forwards:
- Robot state (LowState) from DDS to ZMQ (for remote clients)
- Robot commands (LowCmd) from ZMQ to DDS (from remote clients)
Uses JSON for secure serialization instead of pickle.
"""
import base64
import contextlib
import json
import threading
import time
from typing import Any
import zmq
from g1_utils import (
DATA,
IMU_ACCELEROMETER,
IMU_GYROSCOPE,
IMU_QUATERNION,
IMU_RPY,
IMU_STATE,
IMU_TEMPERATURE,
MODE_MACHINE,
MODE_PR,
MOTOR_CMD,
MOTOR_DQ,
MOTOR_KD,
MOTOR_KP,
MOTOR_MODE,
MOTOR_Q,
MOTOR_STATE,
MOTOR_TAU,
MOTOR_TAU_EST,
MOTOR_TEMPERATURE,
NUM_MOTORS,
TOPIC,
WIRELESS_REMOTE,
)
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as hg_LowCmd, LowState_ as hg_LowState
from unitree_sdk2py.utils.crc import CRC
# DDS topic names follow Unitree SDK naming conventions
# ruff: noqa: N816
kTopicLowCommand_Debug = "rt/lowcmd" # action to robot
kTopicLowState = "rt/lowstate" # observation from robot
LOWCMD_PORT = 6000
LOWSTATE_PORT = 6001
def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]:
"""Convert LowState SDK message to a JSON-serializable dictionary."""
motor_states = []
for i in range(NUM_MOTORS):
motor_states.append(
{
MOTOR_Q: msg.motor_state[i].q,
MOTOR_DQ: msg.motor_state[i].dq,
MOTOR_TAU_EST: msg.motor_state[i].tau_est,
MOTOR_TEMPERATURE: msg.motor_state[i].temperature,
}
)
return {
MOTOR_STATE: motor_states,
IMU_STATE: {
IMU_QUATERNION: list(msg.imu_state.quaternion),
IMU_GYROSCOPE: list(msg.imu_state.gyroscope),
IMU_ACCELEROMETER: list(msg.imu_state.accelerometer),
IMU_RPY: list(msg.imu_state.rpy),
IMU_TEMPERATURE: msg.imu_state.temperature,
},
# Encode bytes as base64 for JSON compatibility
WIRELESS_REMOTE: base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"),
MODE_MACHINE: msg.mode_machine,
}
def dict_to_lowcmd(data: dict[str, Any]) -> hg_LowCmd:
"""Convert dictionary back to LowCmd SDK message."""
cmd = unitree_hg_msg_dds__LowCmd_()
cmd.mode_pr = data.get(MODE_PR, 0)
cmd.mode_machine = data.get(MODE_MACHINE, 0)
for i, motor_data in enumerate(data.get(MOTOR_CMD, [])):
cmd.motor_cmd[i].mode = motor_data.get(MOTOR_MODE, 0)
cmd.motor_cmd[i].q = motor_data.get(MOTOR_Q, 0.0)
cmd.motor_cmd[i].dq = motor_data.get(MOTOR_DQ, 0.0)
cmd.motor_cmd[i].kp = motor_data.get(MOTOR_KP, 0.0)
cmd.motor_cmd[i].kd = motor_data.get(MOTOR_KD, 0.0)
cmd.motor_cmd[i].tau = motor_data.get(MOTOR_TAU, 0.0)
return cmd
def state_forward_loop(
lowstate_sub: ChannelSubscriber,
lowstate_sock: zmq.Socket,
state_period: float,
) -> None:
"""Read observation from DDS and forward to ZMQ clients."""
last_state_time = 0.0
while True:
# read from DDS
msg = lowstate_sub.Read()
if msg is None:
continue
now = time.time()
# optional downsampling (if robot dds rate > state_period)
if now - last_state_time >= state_period:
# Convert to dict and serialize with JSON
state_dict = lowstate_to_dict(msg)
payload = json.dumps({TOPIC: kTopicLowState, DATA: state_dict}).encode("utf-8")
# if no subscribers / tx buffer full, just drop
with contextlib.suppress(zmq.Again):
lowstate_sock.send(payload, zmq.NOBLOCK)
last_state_time = now
def cmd_forward_loop(
lowcmd_sock: zmq.Socket,
lowcmd_pub_debug: ChannelPublisher,
crc: CRC,
) -> None:
"""Receive commands from ZMQ and forward to DDS."""
while True:
payload = lowcmd_sock.recv()
msg_dict = json.loads(payload.decode("utf-8"))
topic = msg_dict.get(TOPIC, "")
cmd_data = msg_dict.get(DATA, {})
# Reconstruct LowCmd object from dict
cmd = dict_to_lowcmd(cmd_data)
# recompute crc
cmd.crc = crc.Crc(cmd)
if topic == kTopicLowCommand_Debug:
lowcmd_pub_debug.Write(cmd)
def main() -> None:
"""Main entry point for the robot server bridge."""
# initialize DDS
ChannelFactoryInitialize(0)
# stop all active publishers on the robot
msc = MotionSwitcherClient()
msc.SetTimeout(5.0)
msc.Init()
status, result = msc.CheckMode()
while result is not None and "name" in result and result["name"]:
msc.ReleaseMode()
status, result = msc.CheckMode()
time.sleep(1.0)
crc = CRC()
# initialize DDS publisher
lowcmd_pub_debug = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
lowcmd_pub_debug.Init()
# initialize DDS subscriber
lowstate_sub = ChannelSubscriber(kTopicLowState, hg_LowState)
lowstate_sub.Init()
# initialize ZMQ
ctx = zmq.Context.instance()
# receive commands from remote client
lowcmd_sock = ctx.socket(zmq.PULL)
lowcmd_sock.bind(f"tcp://0.0.0.0:{LOWCMD_PORT}")
# publish state to remote clients
lowstate_sock = ctx.socket(zmq.PUB)
lowstate_sock.bind(f"tcp://0.0.0.0:{LOWSTATE_PORT}")
state_period = 0.002 # ~500 hz
# start observation forwarding thread
t_state = threading.Thread(
target=state_forward_loop,
args=(lowstate_sub, lowstate_sock, state_period),
daemon=True,
)
t_state.start()
# start action forwarding thread
t_cmd = threading.Thread(
target=cmd_forward_loop,
args=(lowcmd_sock, lowcmd_pub_debug, crc),
daemon=True,
)
t_cmd.start()
print("bridge running (lowstate -> zmq, lowcmd -> dds)")
# keep main thread alive so daemon threads don't exit
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print("shutting down bridge...")
if __name__ == "__main__":
main()
@@ -1,57 +1,185 @@
import pickle
"""
ZMQ socket wrapper that mimics the Unitree SDK Channel interface.
This module provides a drop-in replacement for the Unitree SDK's DDS-based
ChannelPublisher and ChannelSubscriber, using ZMQ sockets instead. This allows
remote communication with the robot over WiFi via the robot_server bridge.
Uses JSON for secure serialization instead of pickle.
"""
import base64
import json
from typing import Any
import zmq
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.g1_utils import (
DATA,
IMU_ACCELEROMETER,
IMU_GYROSCOPE,
IMU_QUATERNION,
IMU_RPY,
IMU_STATE,
IMU_TEMPERATURE,
MODE_MACHINE,
MODE_PR,
MOTOR_CMD,
MOTOR_DQ,
MOTOR_KD,
MOTOR_KP,
MOTOR_MODE,
MOTOR_Q,
MOTOR_STATE,
MOTOR_TAU,
MOTOR_TAU_EST,
MOTOR_TEMPERATURE,
TOPIC,
WIRELESS_REMOTE,
)
_ctx = None
_lowcmd_sock = None
_lowstate_sock = None
_ctx: zmq.Context | None = None
_lowcmd_sock: zmq.Socket | None = None
_lowstate_sock: zmq.Socket | None = None
LOWCMD_PORT = 6000
LOWSTATE_PORT = 6001
# DDS topic names follow Unitree SDK naming conventions
# ruff: noqa: N816
kTopicLowCommand_Debug = "rt/lowcmd"
def ChannelFactoryInitialize(*args, **kwargs): # noqa: N802
class LowStateMsg:
"""
Wrapper class that mimics the Unitree SDK LowState_ message structure.
Reconstructs the message from deserialized JSON data to maintain
compatibility with existing code that expects SDK message objects.
"""
class MotorState:
"""Motor state data for a single joint."""
def __init__(self, data: dict[str, Any]) -> None:
self.q: float = data.get(MOTOR_Q, 0.0)
self.dq: float = data.get(MOTOR_DQ, 0.0)
self.tau_est: float = data.get(MOTOR_TAU_EST, 0.0)
self.temperature: float = data.get(MOTOR_TEMPERATURE, 0.0)
class IMUState:
"""IMU sensor data."""
def __init__(self, data: dict[str, Any]) -> None:
self.quaternion: list[float] = data.get(IMU_QUATERNION, [1.0, 0.0, 0.0, 0.0])
self.gyroscope: list[float] = data.get(IMU_GYROSCOPE, [0.0, 0.0, 0.0])
self.accelerometer: list[float] = data.get(IMU_ACCELEROMETER, [0.0, 0.0, 0.0])
self.rpy: list[float] = data.get(IMU_RPY, [0.0, 0.0, 0.0])
self.temperature: float = data.get(IMU_TEMPERATURE, 0.0)
def __init__(self, data: dict[str, Any]) -> None:
"""Initialize from deserialized JSON data."""
self.motor_state = [self.MotorState(m) for m in data.get(MOTOR_STATE, [])]
self.imu_state = self.IMUState(data.get(IMU_STATE, {}))
# Decode base64-encoded wireless_remote bytes
wireless_b64 = data.get(WIRELESS_REMOTE, "")
self.wireless_remote: bytes = base64.b64decode(wireless_b64) if wireless_b64 else b""
self.mode_machine: int = data.get(MODE_MACHINE, 0)
def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]:
"""Convert LowCmd message to a JSON-serializable dictionary."""
motor_cmds = []
# Iterate over all motor commands in the message
for i in range(len(msg.motor_cmd)):
motor_cmds.append(
{
MOTOR_MODE: msg.motor_cmd[i].mode,
MOTOR_Q: msg.motor_cmd[i].q,
MOTOR_DQ: msg.motor_cmd[i].dq,
MOTOR_KP: msg.motor_cmd[i].kp,
MOTOR_KD: msg.motor_cmd[i].kd,
MOTOR_TAU: msg.motor_cmd[i].tau,
}
)
return {
TOPIC: topic,
DATA: {
MODE_PR: msg.mode_pr,
MODE_MACHINE: msg.mode_machine,
MOTOR_CMD: motor_cmds,
},
}
def ChannelFactoryInitialize(*args: Any, **kwargs: Any) -> None: # noqa: N802
"""
Initialize ZMQ sockets for robot communication.
This function mimics the Unitree SDK's ChannelFactoryInitialize but uses
ZMQ sockets to connect to the robot server bridge instead of DDS.
"""
global _ctx, _lowcmd_sock, _lowstate_sock
# read socket config
config = UnitreeG1Config()
robot_ip = config.robot_ip
_ctx = zmq.Context.instance()
ctx = zmq.Context.instance()
_ctx = ctx
# lowcmd: robot action
_lowcmd_sock = _ctx.socket(zmq.PUSH)
_lowcmd_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message
_lowcmd_sock.connect(f"tcp://{robot_ip}:{LOWCMD_PORT}")
# lowcmd: send robot commands
lowcmd_sock = ctx.socket(zmq.PUSH)
lowcmd_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message
lowcmd_sock.connect(f"tcp://{robot_ip}:{LOWCMD_PORT}")
_lowcmd_sock = lowcmd_sock
# lowstate: robot observation
_lowstate_sock = _ctx.socket(zmq.SUB)
_lowstate_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message
_lowstate_sock.connect(f"tcp://{robot_ip}:{LOWSTATE_PORT}")
_lowstate_sock.setsockopt_string(zmq.SUBSCRIBE, "")
# lowstate: receive robot observations
lowstate_sock = ctx.socket(zmq.SUB)
lowstate_sock.setsockopt(zmq.CONFLATE, 1) # keep only last message
lowstate_sock.connect(f"tcp://{robot_ip}:{LOWSTATE_PORT}")
lowstate_sock.setsockopt_string(zmq.SUBSCRIBE, "")
_lowstate_sock = lowstate_sock
class ChannelPublisher: # send action to robot
def __init__(self, topic, msg_type):
class ChannelPublisher:
"""ZMQ-based publisher that sends commands to the robot server."""
def __init__(self, topic: str, msg_type: type) -> None:
self.topic = topic
self.msg_type = msg_type
def Init(self): # noqa: N802
def Init(self) -> None: # noqa: N802
"""Initialize the publisher (no-op for ZMQ)."""
pass
def Write(self, msg): # noqa: N802
_lowcmd_sock.send(pickle.dumps((self.topic, msg)))
def Write(self, msg: Any) -> None: # noqa: N802
"""Serialize and send a command message to the robot."""
if _lowcmd_sock is None:
raise RuntimeError("ChannelFactoryInitialize must be called first")
payload = json.dumps(lowcmd_to_dict(self.topic, msg)).encode("utf-8")
_lowcmd_sock.send(payload)
class ChannelSubscriber: # read observation from robot
def __init__(self, topic, msg_type):
class ChannelSubscriber:
"""ZMQ-based subscriber that receives state from the robot server."""
def __init__(self, topic: str, msg_type: type) -> None:
self.topic = topic
self.msg_type = msg_type
def Init(self): # noqa: N802
def Init(self) -> None: # noqa: N802
"""Initialize the subscriber (no-op for ZMQ)."""
pass
def Read(self): # noqa: N802
topic, msg = pickle.loads(_lowstate_sock.recv())
return msg
def Read(self) -> LowStateMsg: # noqa: N802
"""Receive and deserialize a state message from the robot."""
if _lowstate_sock is None:
raise RuntimeError("ChannelFactoryInitialize must be called first")
payload = _lowstate_sock.recv()
msg_dict = json.loads(payload.decode("utf-8"))
return LowStateMsg(msg_dict.get(DATA, {}))