Files
lerobot/eval_robot/robot_control/robot_hand_brainco.py
T
2025-11-21 14:13:05 +01:00

197 lines
8.1 KiB
Python

from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
import numpy as np
from enum import IntEnum
import threading
import time
from multiprocessing import Process, Array
import logging_mp
logger_mp = logging_mp.get_logger(__name__)
brainco_Num_Motors = 6
kTopicbraincoLeftCommand = "rt/brainco/left/cmd"
kTopicbraincoLeftState = "rt/brainco/left/state"
kTopicbraincoRightCommand = "rt/brainco/right/cmd"
kTopicbraincoRightState = "rt/brainco/right/state"
class Brainco_Controller:
def __init__(
self,
left_hand_array,
right_hand_array,
dual_hand_data_lock=None,
dual_hand_state_array=None,
dual_hand_action_array=None,
fps=100.0,
Unit_Test=False,
simulation_mode=False,
):
logger_mp.info("Initialize Brainco_Controller...")
self.fps = fps
self.hand_sub_ready = False
self.Unit_Test = Unit_Test
self.simulation_mode = simulation_mode
if self.simulation_mode:
ChannelFactoryInitialize(1)
else:
ChannelFactoryInitialize(0)
# initialize handcmd publisher and handstate subscriber
self.LeftHandCmb_publisher = ChannelPublisher(kTopicbraincoLeftCommand, MotorCmds_)
self.LeftHandCmb_publisher.Init()
self.RightHandCmb_publisher = ChannelPublisher(kTopicbraincoRightCommand, MotorCmds_)
self.RightHandCmb_publisher.Init()
self.LeftHandState_subscriber = ChannelSubscriber(kTopicbraincoLeftState, MotorStates_)
self.LeftHandState_subscriber.Init()
self.RightHandState_subscriber = ChannelSubscriber(kTopicbraincoRightState, MotorStates_)
self.RightHandState_subscriber.Init()
# Shared Arrays for hand states
self.left_hand_state_array = Array("d", brainco_Num_Motors, lock=True)
self.right_hand_state_array = Array("d", brainco_Num_Motors, lock=True)
# initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
while not self.hand_sub_ready:
time.sleep(0.1)
logger_mp.warning("[brainco_Controller] Waiting to subscribe dds...")
logger_mp.info("[brainco_Controller] Subscribe dds ok.")
hand_control_process = Process(
target=self.control_process,
args=(
left_hand_array,
right_hand_array,
self.left_hand_state_array,
self.right_hand_state_array,
dual_hand_data_lock,
dual_hand_state_array,
dual_hand_action_array,
),
)
hand_control_process.daemon = True
hand_control_process.start()
logger_mp.info("Initialize brainco_Controller OK!\n")
def _subscribe_hand_state(self):
while True:
left_hand_msg = self.LeftHandState_subscriber.Read()
right_hand_msg = self.RightHandState_subscriber.Read()
self.hand_sub_ready = True
if left_hand_msg is not None and right_hand_msg is not None:
# Update left hand state
for idx, id in enumerate(Brainco_Left_Hand_JointIndex):
self.left_hand_state_array[idx] = left_hand_msg.states[id].q
# Update right hand state
for idx, id in enumerate(Brainco_Right_Hand_JointIndex):
self.right_hand_state_array[idx] = right_hand_msg.states[id].q
time.sleep(0.002)
def ctrl_dual_hand(self, left_q_target, right_q_target):
"""
Set current left, right hand motor state target q
"""
for idx, id in enumerate(Brainco_Left_Hand_JointIndex):
self.left_hand_msg.cmds[id].q = left_q_target[idx]
for idx, id in enumerate(Brainco_Right_Hand_JointIndex):
self.right_hand_msg.cmds[id].q = right_q_target[idx]
self.LeftHandCmb_publisher.Write(self.left_hand_msg)
self.RightHandCmb_publisher.Write(self.right_hand_msg)
# logger_mp.debug("hand ctrl publish ok.")
def control_process(
self,
left_hand_array,
right_hand_array,
left_hand_state_array,
right_hand_state_array,
dual_hand_data_lock=None,
dual_hand_state_array=None,
dual_hand_action_array=None,
):
self.running = True
left_q_target = np.full(brainco_Num_Motors, 0)
right_q_target = np.full(brainco_Num_Motors, 0)
# initialize brainco hand's cmd msg
self.left_hand_msg = MotorCmds_()
self.left_hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Brainco_Left_Hand_JointIndex))]
self.right_hand_msg = MotorCmds_()
self.right_hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Brainco_Right_Hand_JointIndex))]
for idx, id in enumerate(Brainco_Left_Hand_JointIndex):
self.left_hand_msg.cmds[id].q = 0.0
self.left_hand_msg.cmds[id].dq = 1.0
for idx, id in enumerate(Brainco_Right_Hand_JointIndex):
self.right_hand_msg.cmds[id].q = 0.0
self.right_hand_msg.cmds[id].dq = 1.0
try:
while self.running:
start_time = time.time()
# get dual hand state
with left_hand_array.get_lock():
left_hand_mat = np.array(left_hand_array[:]).copy()
with right_hand_array.get_lock():
right_hand_mat = np.array(right_hand_array[:]).copy()
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
action_data = np.concatenate((left_hand_mat, right_hand_mat))
if dual_hand_data_lock is not None:
with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
if dual_hand_state_array and dual_hand_action_array:
with dual_hand_data_lock:
left_q_target = left_hand_mat
right_q_target = right_hand_mat
self.ctrl_dual_hand(left_q_target, right_q_target)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
logger_mp.info("brainco_Controller has been closed.")
# according to the official documentation, https://www.brainco-hz.com/docs/revolimb-hand/product/parameters.html
# the motor sequence is as shown in the table below
# ┌──────┬───────┬────────────┬────────┬────────┬────────┬────────┐
# │ Id │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │
# ├──────┼───────┼────────────┼────────┼────────┼────────┼────────┤
# │Joint │ thumb │ thumb-aux | index │ middle │ ring │ pinky │
# └──────┴───────┴────────────┴────────┴────────┴────────┴────────┘
class Brainco_Right_Hand_JointIndex(IntEnum):
kRightHandThumb = 0
kRightHandThumbAux = 1
kRightHandIndex = 2
kRightHandMiddle = 3
kRightHandRing = 4
kRightHandPinky = 5
class Brainco_Left_Hand_JointIndex(IntEnum):
kLeftHandThumb = 0
kLeftHandThumbAux = 1
kLeftHandIndex = 2
kLeftHandMiddle = 3
kLeftHandRing = 4
kLeftHandPinky = 5