diff --git a/examples/openarms/evaluate.py b/examples/openarms/evaluate.py index 86677067e..f0a42df15 100644 --- a/examples/openarms/evaluate.py +++ b/examples/openarms/evaluate.py @@ -28,6 +28,7 @@ import time from pathlib import Path from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig +from lerobot.configs.policies import PreTrainedConfig from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features from lerobot.datasets.utils import combine_feature_dicts @@ -43,9 +44,9 @@ from lerobot.utils.utils import log_say from lerobot.utils.visualization_utils import init_rerun -HF_MODEL_ID = "/" # TODO: Replace with your trained model -HF_EVAL_DATASET_ID = "/" # TODO: Replace with your eval dataset name -TASK_DESCRIPTION = "Your task description here" # TODO: Replace with your task, this should match!! +HF_MODEL_ID = "lerobot-data-collection/two-folds-act" # TODO: Replace with your trained model +HF_EVAL_DATASET_ID = "lerobot-data-collection/eval-two-folds-act-50k-9" # TODO: Replace with your eval dataset name +TASK_DESCRIPTION = "two-folds-dataset" # TODO: Replace with your task, this should match!! NUM_EPISODES = 5 FPS = 30 @@ -64,8 +65,8 @@ LEADER_RIGHT_PORT = "can1" # Camera configuration CAMERA_CONFIG = { "left_wrist": OpenCVCameraConfig(index_or_path="/dev/video0", width=640, height=480, fps=FPS), - "right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=FPS), - "base": OpenCVCameraConfig(index_or_path="/dev/video4", width=640, height=480, fps=FPS), + "right_wrist": OpenCVCameraConfig(index_or_path="/dev/video4", width=640, height=480, fps=FPS), + "base": OpenCVCameraConfig(index_or_path="/dev/video7", width=640, height=480, fps=FPS), } def main(): @@ -121,9 +122,6 @@ def main(): else: print(f"Leader connected but gravity compensation unavailable (no URDF)") - - policy = make_policy.from_pretrained(HF_MODEL_ID) - # Build default processors for action and observation teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors() @@ -171,6 +169,11 @@ def main(): image_writer_threads=12, ) + # Load policy config from pretrained model and create policy using factory + policy_config = PreTrainedConfig.from_pretrained(HF_MODEL_ID) + policy_config.pretrained_path = HF_MODEL_ID + policy = make_policy(policy_config, ds_meta=dataset.meta) + preprocessor, postprocessor = make_pre_post_processors( policy_cfg=policy.config, pretrained_path=HF_MODEL_ID, diff --git a/examples/openarms_web_interface/web_record_server.py b/examples/openarms_web_interface/web_record_server.py index 75b243715..27c338c10 100644 --- a/examples/openarms_web_interface/web_record_server.py +++ b/examples/openarms_web_interface/web_record_server.py @@ -189,7 +189,7 @@ def initialize_robots_only(config: RobotSetupConfig): can_interface="socketcan", id="openarms_follower", disable_torque_on_disconnect=True, - max_relative_target=10.0, + max_relative_target=30.0, cameras=camera_config, ) diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py index 1d9e6eb7f..bb6353f02 100644 --- a/src/lerobot/motors/damiao/damiao.py +++ b/src/lerobot/motors/damiao/damiao.py @@ -377,7 +377,7 @@ class DamiaoMotorsBus(MotorsBusBase): try: while len(responses) < len(expected_recv_ids) and (time.time() - start_time) < timeout: - msg = self.canbus.recv(timeout=0.0001) # 100us poll timeout + msg = self.canbus.recv(timeout=0.0002) # 200us poll timeout (increased from 100us for better reliability) if msg and msg.arbitration_id in expected_set: responses[msg.arbitration_id] = msg if len(responses) == len(expected_recv_ids): @@ -633,7 +633,7 @@ class DamiaoMotorsBus(MotorsBusBase): # Step 2: Collect all responses at once (batch receive) expected_recv_ids = [self._get_motor_recv_id(motor) for motor in motors] - responses = self._recv_all_responses(expected_recv_ids, timeout=0.003) # 3ms total timeout + responses = self._recv_all_responses(expected_recv_ids, timeout=0.01) # 10ms total timeout # Step 3: Parse responses for motor in motors: @@ -688,16 +688,17 @@ class DamiaoMotorsBus(MotorsBusBase): motors = self._get_motors_list(motors) result = {} - # Step 1: Send refresh commands to ALL motors first (no waiting) + # Step 1: Send refresh commands to ALL motors first (with small delays to reduce bus congestion) for motor in motors: motor_id = self._get_motor_id(motor) data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) self.canbus.send(msg) + time.sleep(0.0001) # 100us delay between commands to reduce bus congestion # Step 2: Collect all responses at once (batch receive) expected_recv_ids = [self._get_motor_recv_id(motor) for motor in motors] - responses = self._recv_all_responses(expected_recv_ids, timeout=0.003) # 3ms total timeout + responses = self._recv_all_responses(expected_recv_ids, timeout=0.015) # 15ms timeout (increased for reliability) # Step 3: Parse responses and extract ALL state values for motor in motors: @@ -774,10 +775,11 @@ class DamiaoMotorsBus(MotorsBusBase): msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) self.canbus.send(msg) + time.sleep(0.0001) # 100us delay between commands to reduce bus congestion # Step 2: Collect all responses at once expected_recv_ids = [self._get_motor_recv_id(motor) for motor in values.keys()] - self._recv_all_responses(expected_recv_ids, timeout=0.002) # 2ms timeout + self._recv_all_responses(expected_recv_ids, timeout=0.015) # 15ms timeout (increased for reliability) else: # Fall back to individual writes for other data types for motor, value in values.items(): diff --git a/src/lerobot/motors/damiao/seed_damiao.py b/src/lerobot/motors/damiao/seed_damiao.py deleted file mode 100644 index 70a36fa1f..000000000 --- a/src/lerobot/motors/damiao/seed_damiao.py +++ /dev/null @@ -1,833 +0,0 @@ -## This is a derivative of the following software. -## https://github.com/cmjang/DM_Control_Python/blob/main/DM_CAN.py - -import can -from time import sleep, time -import numpy as np -from enum import IntEnum -from struct import unpack -from struct import pack - -class Motor: - def __init__(self, MotorType, SlaveID, MasterID): - """ - define Motor object 定义电机对象 - :param MotorType: Motor type 电机类型 - :param SlaveID: CANID 电机ID - :param MasterID: MasterID 主机ID 建议不要设为0 - """ - self.Pd = float(0) - self.Vd = float(0) - self.goal_position = float(0) - self.goal_tau = float(0) - self.state_q = float(0) - self.state_dq = float(0) - self.state_tau = float(0) - self.state_tmos = int(0) - self.state_trotor = int(0) - self.SlaveID = SlaveID - self.MasterID = MasterID - self.MotorType = MotorType - self.isEnable = False - self.NowControlMode = Control_Type.MIT - self.temp_param_dict = {} - - def recv_data(self, q: float, dq: float, tau: float, tmos: int, trotor: int): - self.state_q = q - self.state_dq = dq - self.state_tau = tau - self.state_tmos = tmos - self.state_trotor = trotor - - def getPosition(self): - """ - get the position of the motor 获取电机位置 - :return: the position of the motor 电机位置 - """ - return self.state_q - - def getVelocity(self): - """ - get the velocity of the motor 获取电机速度 - :return: the velocity of the motor 电机速度 - """ - return self.state_dq - - def getTorque(self): - """ - get the torque of the motor 获取电机力矩 - :return: the torque of the motor 电机力矩 - """ - return self.state_tau - - def getParam(self, RID): - """ - get the parameter of the motor 获取电机内部的参数,需要提前读取 - :param RID: DM_variable 电机参数 - :return: the parameter of the motor 电机参数 - """ - if RID in self.temp_param_dict: - return self.temp_param_dict[RID] - else: - return None - - -class MotorControl: - #send_data_frame = np.array( - # [0x55, 0xAA, 0x1e, 0x03, 0x01, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0, 0, 0, 0, 0x00, 0x08, 0x00, - # 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0x00], np.uint8) - # 4310 4310_48 4340 4340_48 - Limit_Param = [[12.5, 30, 10], [12.5, 50, 10], [12.5, 8, 28], [12.5, 10, 28], - # 6006 8006 8009 10010L 10010 - [12.5, 45, 20], [12.5, 45, 40], [12.5, 45, 54], [12.5, 25, 200], [12.5, 20, 200], - # H3510 DMG62150 DMH6220 - [12.5 , 280 , 1],[12.5 , 45 , 10],[12.5 , 45 , 10]] - - def __init__(self, channel: str, bitrate: int = 1000000): - """ - define MotorControl object 定义电机控制对象 - :param serial_device: serial object 串口对象 - """ - #self.serial_ = serial_device - self.motors_map = dict() - self.data_save = bytes() # save data - #if self.serial_.is_open: # open the serial port - # print("Serial port is open") - # serial_device.close() - #self.serial_.open() - self.canbus = can.interface.Bus(channel=channel, interface='socketcan', bitrate=bitrate) - - #print("can is open") - - - def controlMIT(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float): - """ - MIT Control Mode Function 达妙电机MIT控制模式函数 - :param DM_Motor: Motor object 电机对象 - :param kp: kp - :param kd: kd - :param q: position 期望位置 - :param dq: velocity 期望速度 - :param tau: torque 期望力矩 - :return: None - """ - if DM_Motor.SlaveID not in self.motors_map: - print("controlMIT ERROR : Motor ID not found") - return - kp_uint = float_to_uint(kp, 0, 500, 12) - kd_uint = float_to_uint(kd, 0, 5, 12) - MotorType = DM_Motor.MotorType - Q_MAX = self.Limit_Param[MotorType][0] - DQ_MAX = self.Limit_Param[MotorType][1] - TAU_MAX = self.Limit_Param[MotorType][2] - q_uint = float_to_uint(q, -Q_MAX, Q_MAX, 16) - dq_uint = float_to_uint(dq, -DQ_MAX, DQ_MAX, 12) - tau_uint = float_to_uint(tau, -TAU_MAX, TAU_MAX, 12) - data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - data_buf[0] = (q_uint >> 8) & 0xff - data_buf[1] = q_uint & 0xff - data_buf[2] = dq_uint >> 4 - data_buf[3] = ((dq_uint & 0xf) << 4) | ((kp_uint >> 8) & 0xf) - data_buf[4] = kp_uint & 0xff - data_buf[5] = kd_uint >> 4 - data_buf[6] = ((kd_uint & 0xf) << 4) | ((tau_uint >> 8) & 0xf) - data_buf[7] = tau_uint & 0xff - self.__send_data(DM_Motor.SlaveID, data_buf) - self.recv() # receive the data from serial port - - def control_delay(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float, delay: float): - """ - MIT Control Mode Function with delay 达妙电机MIT控制模式函数带延迟 - :param DM_Motor: Motor object 电机对象 - :param kp: kp - :param kd: kd - :param q: position 期望位置 - :param dq: velocity 期望速度 - :param tau: torque 期望力矩 - :param delay: delay time 延迟时间 单位秒 - """ - self.controlMIT(DM_Motor, kp, kd, q, dq, tau) - sleep(delay) - - def control_Pos_Vel(self, Motor, P_desired: float, V_desired: float): - """ - control the motor in position and velocity control mode 电机位置速度控制模式 - :param Motor: Motor object 电机对象 - :param P_desired: desired position 期望位置 - :param V_desired: desired velocity 期望速度 - :return: None - """ - if Motor.SlaveID not in self.motors_map: - print("Control Pos_Vel Error : Motor ID not found") - return - motorid = 0x100 + Motor.SlaveID - data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - P_desired_uint8s = float_to_uint8s(P_desired) - V_desired_uint8s = float_to_uint8s(V_desired) - data_buf[0:4] = P_desired_uint8s - data_buf[4:8] = V_desired_uint8s - self.__send_data(motorid, data_buf) - self.recv() # receive the data from serial port - - def control_Vel(self, Motor, Vel_desired): - """ - control the motor in velocity control mode 电机速度控制模式 - :param Motor: Motor object 电机对象 - :param Vel_desired: desired velocity 期望速度 - """ - if Motor.SlaveID not in self.motors_map: - print("control_VEL ERROR : Motor ID not found") - return - motorid = 0x200 + Motor.SlaveID - data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - Vel_desired_uint8s = float_to_uint8s(Vel_desired) - data_buf[0:4] = Vel_desired_uint8s - self.__send_data(motorid, data_buf) - self.recv() # receive the data from serial port - - def control_pos_force(self, Motor, Pos_des: float, Vel_des, i_des): - """ - control the motor in EMIT control mode 电机力位混合模式 - :param Pos_des: desired position rad 期望位置 单位为rad - :param Vel_des: desired velocity rad/s 期望速度 为放大100倍 - :param i_des: desired current rang 0-10000 期望电流标幺值放大10000倍 - 电流标幺值:实际电流值除以最大电流值,最大电流见上电打印 - """ - if Motor.SlaveID not in self.motors_map: - print("control_pos_vel ERROR : Motor ID not found") - return - motorid = 0x300 + Motor.SlaveID - data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - Pos_desired_uint8s = float_to_uint8s(Pos_des) - data_buf[0:4] = Pos_desired_uint8s - Vel_uint = np.uint16(Vel_des) - ides_uint = np.uint16(i_des) - data_buf[4] = Vel_uint & 0xff - data_buf[5] = Vel_uint >> 8 - data_buf[6] = ides_uint & 0xff - data_buf[7] = ides_uint >> 8 - self.__send_data(motorid, data_buf) - self.recv() # receive the data from serial port - - def enable(self, Motor): - """ - enable motor 使能电机 - 最好在上电后几秒后再使能电机 - :param Motor: Motor object 电机对象 - """ - self.__control_cmd(Motor, np.uint8(0xFC)) - sleep(0.1) - self.recv() # receive the data from serial port - - def enable_old(self, Motor ,ControlMode): - """ - enable motor old firmware 使能电机旧版本固件,这个是为了旧版本电机固件的兼容性 - 可恶的旧版本固件使能需要加上偏移量 - 最好在上电后几秒后再使能电机 - :param Motor: Motor object 电机对象 - """ - data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfc], np.uint8) - enable_id = ((int(ControlMode)-1) << 2) + Motor.SlaveID - self.__send_data(enable_id, data_buf) - sleep(0.1) - self.recv() # receive the data from serial port - - def disable(self, Motor): - """ - disable motor 失能电机 - :param Motor: Motor object 电机对象 - """ - self.__control_cmd(Motor, np.uint8(0xFD)) - sleep(0.1) - self.recv() # receive the data from serial port - - def set_zero_position(self, Motor): - """ - set the zero position of the motor 设置电机0位 - :param Motor: Motor object 电机对象 - """ - self.__control_cmd(Motor, np.uint8(0xFE)) - sleep(0.1) - self.recv() # receive the data from serial port - - def recv(self): - # 把上次没有解析完的剩下的也放进来 - # data_recv = b''.join([self.data_save, self.serial_.read_all()]) - #data_recv = b''.join([self.data_save, self.canbus.recv()]) - - - # packets = self.__extract_packets(data_recv) - # for packet in packets: - # data = packet[7:15] - # CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3] - # CMD = packet[1] - # self.__process_packet(data, CANID, CMD) - - data_recv = self.canbus.recv(0.1) - - if data_recv is not None: - # data = data_recv.data - # err = data[0] >> 12 - # id = data[0] & 0x7f - # pos = (data[1] << 8) + data[2] - # vel = (data[3] << 4) + (data[4] >> 4) - # tau = ((data[4] & 0x0f) << 8) + data[5] - # t_mos = data[6] - # t_rotor = data[7] - # print(hex(id), err, id, pos, vel, tau, goal_tau, t_mos, t_rotor) - # CANID = data_recv.arbitration_id - CANID = data_recv.data[0] - # CMD = data_recv.data[3] - CMD = 0x11 # 飯田:修正の必要あり - self.__process_packet(data_recv.data, CANID, CMD) - - # 飯田:Debug print - # print(hex(CANID),hex(CMD)) - # print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7])) - #return data - - def recv_set_param_data(self): - #data_recv = self.serial_.read_all() - - # packets = self.__extract_packets(data_recv) - # for packet in packets: - # data = packet[7:15] - # CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3] - # CMD = packet[1] - # self.__process_set_param_packet(data, CANID, CMD) - - data_recv = self.canbus.recv(0.1) - - - if data_recv is not None: - data = data_recv.data - CANID = data_recv.arbitration_id - # CANID = data_recv.data[0] - # CMD = data_recv.data[3] - CMD = 0x11 # 飯田:修正の必要あり - self.__process_packet(data, CANID, CMD) - - - # 飯田:Debug print - print(hex(CANID),hex(CMD)) - print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7])) - - def __process_packet(self, data, CANID, CMD): - if CMD == 0x11: - if CANID != 0x00: - if CANID in self.motors_map: - q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2]) - dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4)) - tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5]) - t_mos = data[6] - t_rotor = data[7] - MotorType_recv = self.motors_map[CANID].MotorType - Q_MAX = self.Limit_Param[MotorType_recv][0] - DQ_MAX = self.Limit_Param[MotorType_recv][1] - TAU_MAX = self.Limit_Param[MotorType_recv][2] - recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16) - recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12) - recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12) - self.motors_map[CANID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor) - else: - MasterID=data[0] & 0x0f - if MasterID in self.motors_map: - q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2]) - dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4)) - tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5]) - t_mos = data[6] - t_rotor = data[7] - MotorType_recv = self.motors_map[MasterID].MotorType - Q_MAX = self.Limit_Param[MotorType_recv][0] - DQ_MAX = self.Limit_Param[MotorType_recv][1] - TAU_MAX = self.Limit_Param[MotorType_recv][2] - recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16) - recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12) - recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12) - self.motors_map[MasterID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor) - - - def __process_set_param_packet(self, data, CANID, CMD): - if CMD == 0x11 and (data[2] == 0x33 or data[2] == 0x55): - masterid=CANID - slaveId = ((data[1] << 8) | data[0]) - if CANID==0x00: #防止有人把MasterID设为0稳一手 - masterid=slaveId - - if masterid not in self.motors_map: - if slaveId not in self.motors_map: - return - else: - masterid=slaveId - - RID = data[3] - # 读取参数得到的数据 - if is_in_ranges(RID): - #uint32类型 - num = uint8s_to_uint32(data[4], data[5], data[6], data[7]) - self.motors_map[masterid].temp_param_dict[RID] = num - - else: - #float类型 - num = uint8s_to_float(data[4], data[5], data[6], data[7]) - self.motors_map[masterid].temp_param_dict[RID] = num - - - def addMotor(self, Motor): - """ - add motor to the motor control object 添加电机到电机控制对象 - :param Motor: Motor object 电机对象 - """ - self.motors_map[Motor.SlaveID] = Motor - if Motor.MasterID != 0: - self.motors_map[Motor.MasterID] = Motor - return True - - def __control_cmd(self, Motor, cmd: np.uint8): # 飯田:コマンドは通ります - data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, cmd], np.uint8) - self.__send_data(Motor.SlaveID, data_buf) - - def __send_data(self, motor_id, data): - """ - send data to the motor 发送数据到电机 - :param motor_id: - :param data: - :return: - """ - #self.send_data_frame[13] = motor_id & 0xff - #self.send_data_frame[14] = (motor_id >> 8)& 0xff #id high 8 bits - #self.send_data_frame[21:29] = data - #self.serial_.write(bytes(self.send_data_frame.T)) - - msg =can.Message(is_extended_id=False,arbitration_id=motor_id,data=data,is_remote_frame = False) - self.canbus.send(msg) - - def __read_RID_param(self, Motor, RID): # 飯田:修正の必要あり? - can_id_l = Motor.SlaveID & 0xff #id low 8 bits - can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits - data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x33, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8) - self.__send_data(0x7FF, data_buf) - - - - def __write_motor_param(self, Motor, RID, data): # 飯田:修正の必要あり? - can_id_l = Motor.SlaveID & 0xff #id low 8 bits - can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits - data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x55, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8) - if not is_in_ranges(RID): - # data is float - data_buf[4:8] = float_to_uint8s(data) - else: - # data is int - data_buf[4:8] = data_to_uint8s(int(data)) - self.__send_data(0x7FF, data_buf) - - def switchControlMode(self, Motor, ControlMode): - """ - switch the control mode of the motor 切换电机控制模式 - :param Motor: Motor object 电机对象 - :param ControlMode: Control_Type 电机控制模式 example:MIT:Control_Type.MIT MIT模式 - """ - max_retries = 20 - retry_interval = 0.1 #retry times - RID = 10 - self.__write_motor_param(Motor, RID, np.uint8(ControlMode)) - for _ in range(max_retries): - sleep(retry_interval) - self.recv_set_param_data() - if Motor.SlaveID in self.motors_map: - if RID in self.motors_map[Motor.SlaveID].temp_param_dict: - if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - ControlMode) < 0.1: - return True - else: - return False - return False - - def save_motor_param(self, Motor): - """ - save the all parameter to flash 保存所有电机参数 - :param Motor: Motor object 电机对象 - :return: - """ - can_id_l = Motor.SlaveID & 0xff #id low 8 bits - can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits - data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xAA, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - self.disable(Motor) # before save disable the motor - self.__send_data(0x7FF, data_buf) - sleep(0.001) - - def change_limit_param(self, Motor_Type, PMAX, VMAX, TMAX): - """ - change the PMAX VMAX TMAX of the motor 改变电机的PMAX VMAX TMAX - :param Motor_Type: - :param PMAX: 电机的PMAX - :param VMAX: 电机的VMAX - :param TMAX: 电机的TMAX - :return: - """ - self.Limit_Param[Motor_Type][0] = PMAX - self.Limit_Param[Motor_Type][1] = VMAX - self.Limit_Param[Motor_Type][2] = TMAX - - def refresh_motor_status(self,Motor): - """ - get the motor status 获得电机状态 - """ - can_id_l = Motor.SlaveID & 0xff #id low 8 bits - can_id_h = (Motor.SlaveID >> 8) & 0xff #id high 8 bits - data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) - self.__send_data(0x7FF, data_buf) - self.recv() # receive the data from serial port - - def change_motor_param(self, Motor, RID, data): - """ - change the RID of the motor 改变电机的参数 - :param Motor: Motor object 电机对象 - :param RID: DM_variable 电机参数 - :param data: 电机参数的值 - :return: True or False ,True means success, False means fail - """ - max_retries = 20 - retry_interval = 0.05 #retry times - - self.__write_motor_param(Motor, RID, data) - for _ in range(max_retries): - self.recv_set_param_data() - if Motor.SlaveID in self.motors_map and RID in self.motors_map[Motor.SlaveID].temp_param_dict: - if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - data) < 0.1: - return True - else: - return False - sleep(retry_interval) - return False - - def read_motor_param(self, Motor, RID): - """ - read only the RID of the motor 读取电机的内部信息例如 版本号等 - :param Motor: Motor object 电机对象 - :param RID: DM_variable 电机参数 - :return: 电机参数的值 - """ - max_retries = 5 - retry_interval = 0.05 #retry times - self.__read_RID_param(Motor, RID) - for _ in range(max_retries): - sleep(retry_interval) - self.recv_set_param_data() - if Motor.SlaveID in self.motors_map: - if RID in self.motors_map[Motor.SlaveID].temp_param_dict: - return self.motors_map[Motor.SlaveID].temp_param_dict[RID] - return None - - # ------------------------------------------------- - # Extract packets from the serial data - def __extract_packets(self, data): - frames = [] - header = 0xAA - tail = 0x55 - frame_length = 16 - i = 0 - remainder_pos = 0 - - while i <= len(data) - frame_length: - if data[i] == header and data[i + frame_length - 1] == tail: - frame = data[i:i + frame_length] - frames.append(frame) - i += frame_length - remainder_pos = i - else: - i += 1 - self.data_save = data[remainder_pos:] - return frames - - -def LIMIT_MIN_MAX(x, min, max): - if x <= min: - x = min - elif x > max: - x = max - - -def float_to_uint(x: float, x_min: float, x_max: float, bits): - LIMIT_MIN_MAX(x, x_min, x_max) - span = x_max - x_min - data_norm = (x - x_min) / span - return np.uint16(data_norm * ((1 << bits) - 1)) - - -def uint_to_float(x: np.uint16, min: float, max: float, bits): - span = max - min - data_norm = float(x) / ((1 << bits) - 1) - temp = data_norm * span + min - return np.float32(temp) - - -def float_to_uint8s(value): - # Pack the float into 4 bytes - packed = pack('f', value) - # Unpack the bytes into four uint8 values - return unpack('4B', packed) - - -def data_to_uint8s(value): - # Check if the value is within the range of uint32 - if isinstance(value, int) and (0 <= value <= 0xFFFFFFFF): - # Pack the uint32 into 4 bytes - packed = pack('I', value) - else: - raise ValueError("Value must be an integer within the range of uint32") - - # Unpack the bytes into four uint8 values - return unpack('4B', packed) - - -def is_in_ranges(number): - """ - check if the number is in the range of uint32 - :param number: - :return: - """ - if (7 <= number <= 10) or (13 <= number <= 16) or (35 <= number <= 36): - return True - return False - - -def uint8s_to_uint32(byte1, byte2, byte3, byte4): - # Pack the four uint8 values into a single uint32 value in little-endian order - packed = pack('<4B', byte1, byte2, byte3, byte4) - # Unpack the packed bytes into a uint32 value - return unpack('