From 56e2360072385606de755c287c556af8a14241fd Mon Sep 17 00:00:00 2001 From: Pepijn <138571049+pkooij@users.noreply.github.com> Date: Mon, 27 Oct 2025 02:11:10 -0700 Subject: [PATCH] add damiao --- src/lerobot/motors/damiao/__init__.py | 18 + src/lerobot/motors/damiao/damiao.py | 472 +++++++++++++ src/lerobot/motors/damiao/seed_damiao.py | 833 +++++++++++++++++++++++ src/lerobot/motors/damiao/tables.py | 0 tests/motors/test_damiao.py | 79 +++ 5 files changed, 1402 insertions(+) create mode 100644 src/lerobot/motors/damiao/__init__.py create mode 100644 src/lerobot/motors/damiao/damiao.py create mode 100644 src/lerobot/motors/damiao/seed_damiao.py create mode 100644 src/lerobot/motors/damiao/tables.py create mode 100644 tests/motors/test_damiao.py diff --git a/src/lerobot/motors/damiao/__init__.py b/src/lerobot/motors/damiao/__init__.py new file mode 100644 index 000000000..8240138cf --- /dev/null +++ b/src/lerobot/motors/damiao/__init__.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .damiao import DamiaoMotorsBus +from .tables import * diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py new file mode 100644 index 000000000..2c22112c0 --- /dev/null +++ b/src/lerobot/motors/damiao/damiao.py @@ -0,0 +1,472 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme + +import logging +from copy import deepcopy +from enum import Enum + +from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement + +logger = logging.getLogger(__name__) + +class OperatingMode(Enum): + MIT = 0 + + +class DamiaoMotorsBus(MotorsBus): + """ + The Damiao implementation for a MotorsBus. It relies on the python-can library to communicate with + the motors. For more info, see the python-can documentation: https://python-can.readthedocs.io/en/stable/, seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python + https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python + """ + + def __init__( + self, + port: str, + ): + self.port = port + + def configure_motors(self) -> None: + for motor in self.motors + + + @cached_property + def models(self) -> list[str]: + return [m.model for m in self.motors.values()] + + @cached_property + def ids(self) -> list[int]: + return [m.id for m in self.motors.values()] + + @property + def is_connected(self) -> bool: + """bool: `True` if the underlying serial port is open.""" + return self.port_handler.is_open + + def connect(self, handshake: bool = True) -> None: + """Open the serial port and initialise communication. + + Args: + handshake (bool, optional): Pings every expected motor and performs additional + integrity checks specific to the implementation. Defaults to `True`. + + Raises: + DeviceAlreadyConnectedError: The port is already open. + ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError( + f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice." + ) + + self._connect(handshake) + self.set_timeout() + logger.debug(f"{self.__class__.__name__} connected.") + + + @property + def is_calibrated(self) -> bool: + return self.calibration == self.read_calibration() + + def read_calibration(self) -> dict[str, MotorCalibration]: + offsets = self.sync_read("Homing_Offset", normalize=False) + mins = self.sync_read("Min_Position_Limit", normalize=False) + maxes = self.sync_read("Max_Position_Limit", normalize=False) + drive_modes = self.sync_read("Drive_Mode", normalize=False) + + calibration = {} + for motor, m in self.motors.items(): + calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=drive_modes[motor], + homing_offset=offsets[motor], + range_min=mins[motor], + range_max=maxes[motor], + ) + + return calibration + + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: + for motor, calibration in calibration_dict.items(): + self.write("Homing_Offset", motor, calibration.homing_offset) + self.write("Min_Position_Limit", motor, calibration.range_min) + self.write("Max_Position_Limit", motor, calibration.range_max) + + if cache: + self.calibration = calibration_dict + + def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + for motor in self._get_motors_list(motors): + self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry) + + def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + for motor in self._get_motors_list(motors): + self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry) + + def disconnect(self, disable_torque: bool = True) -> None: + """Close the serial port (optionally disabling torque first). + + Args: + disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before + closing the port. This can prevent damaging motors if they are left applying resisting torque + after disconnect. + """ + if not self.is_connected: + raise DeviceNotConnectedError( + f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first." + ) + + if disable_torque: + self.port_handler.clearPort() + self.port_handler.is_using = False + self.disable_torque(num_retry=5) + + self.port_handler.closePort() + logger.debug(f"{self.__class__.__name__} disconnected.") + + @classmethod + def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]: + """Probe *port* at every supported baud-rate and list responding IDs. + + Args: + port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``). + *args, **kwargs: Forwarded to the subclass constructor. + + Returns: + dict[int, list[int]]: Mapping *baud-rate → list of motor IDs* + for every baud-rate that produced at least one response. + """ + bus = cls(port, {}, *args, **kwargs) + bus._connect(handshake=False) + baudrate_ids = {} + for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"): + bus.set_baudrate(baudrate) + ids_models = bus.broadcast_ping() + if ids_models: + tqdm.write(f"Motors found for {baudrate=}: {pformat(ids_models, indent=4)}") + baudrate_ids[baudrate] = list(ids_models) + + bus.port_handler.closePort() + return baudrate_ids + + def setup_motor( + self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None + ) -> None: + """Assign the correct ID and baud-rate to a single motor. + + This helper temporarily switches to the motor's current settings, disables torque, sets the desired + ID, and finally programs the bus' default baud-rate. + + Args: + motor (str): Key of the motor in :pyattr:`motors`. + initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided). + Defaults to None. + initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None. + + Raises: + RuntimeError: The motor could not be found or its model number + does not match the expected one. + ConnectionError: Communication with the motor failed. + """ + if not self.is_connected: + self._connect(handshake=False) + + if initial_baudrate is None: + initial_baudrate, initial_id = self._find_single_motor(motor) + + if initial_id is None: + _, initial_id = self._find_single_motor(motor, initial_baudrate) + + model = self.motors[motor].model + target_id = self.motors[motor].id + self.set_baudrate(initial_baudrate) + self._disable_torque(initial_id, model) + + # Set ID + addr, length = get_address(self.model_ctrl_table, model, "ID") + self._write(addr, length, initial_id, target_id) + + # Set Baudrate + addr, length = get_address(self.model_ctrl_table, model, "Baud_Rate") + baudrate_value = self.model_baudrate_table[model][self.default_baudrate] + self._write(addr, length, target_id, baudrate_value) + + self.set_baudrate(self.default_baudrate) + + @contextmanager + def torque_disabled(self, motors: int | str | list[str] | None = None): + """Context-manager that guarantees torque is re-enabled. + + This helper is useful to temporarily disable torque when configuring motors. + + Examples: + >>> with bus.torque_disabled(): + ... # Safe operations here + ... pass + """ + self.disable_torque(motors) + try: + yield + finally: + self.enable_torque(motors) + + def set_timeout(self, timeout_ms: int | None = None): + """Change the packet timeout used by the SDK. + + Args: + timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls + back to :pyattr:`default_timeout`. + """ + timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout + self.port_handler.setPacketTimeoutMillis(timeout_ms) + + def get_baudrate(self) -> int: + """Return the current baud-rate configured on the port. + + Returns: + int: Baud-rate in bits / second. + """ + return self.port_handler.getBaudRate() + + def set_baudrate(self, baudrate: int) -> None: + """Set a new UART baud-rate on the port. + + Args: + baudrate (int): Desired baud-rate in bits / second. + + Raises: + RuntimeError: The SDK failed to apply the change. + """ + present_bus_baudrate = self.port_handler.getBaudRate() + if present_bus_baudrate != baudrate: + logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.") + self.port_handler.setBaudRate(baudrate) + + if self.port_handler.getBaudRate() != baudrate: + raise RuntimeError("Failed to write bus baud rate.") + + def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None: + """Restore factory calibration for the selected motors. + + Homing offset is set to ``0`` and min/max position limits are set to the full usable range. + The in-memory :pyattr:`calibration` is cleared. + + Args: + motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default) + resets every motor. + """ + if motors is None: + motors = list(self.motors) + elif isinstance(motors, (str | int)): + motors = [motors] + elif not isinstance(motors, list): + raise TypeError(motors) + + for motor in motors: + model = self._get_motor_model(motor) + max_res = self.model_resolution_table[model] - 1 + self.write("Homing_Offset", motor, 0, normalize=False) + self.write("Min_Position_Limit", motor, 0, normalize=False) + self.write("Max_Position_Limit", motor, max_res, normalize=False) + + self.calibration = {} + + def record_ranges_of_motion( + self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True + ) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: + """Interactively record the min/max encoder values of each motor. + + Move the joints by hand (with torque disabled) while the method streams live positions. Press + :kbd:`Enter` to finish. + + Args: + motors (NameOrID | list[NameOrID] | None, optional): Motors to record. + Defaults to every motor (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Returns: + tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the + extreme values observed for each motor. + """ + if motors is None: + motors = list(self.motors) + elif isinstance(motors, (str | int)): + motors = [motors] + elif not isinstance(motors, list): + raise TypeError(motors) + + start_positions = self.sync_read("Present_Position", motors, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + + user_pressed_enter = False + while not user_pressed_enter: + positions = self.sync_read("Present_Position", motors, normalize=False) + mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()} + maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for motor in motors: + print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}") + + if enter_pressed(): + user_pressed_enter = True + + if display_values and not user_pressed_enter: + # Move cursor up to overwrite the previous output + move_cursor_up(len(motors) + 3) + + same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]] + if same_min_max: + raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes + + def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]: + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for id_, val in ids_values.items(): + motor = self._id_to_name(id_) + min_ = self.calibration[motor].range_min + max_ = self.calibration[motor].range_max + drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode + if max_ == min_: + raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") + + bounded_val = min(max_, max(min_, val)) + if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[id_] = -norm if drive_mode else norm + elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[id_] = 100 - norm if drive_mode else norm + elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: + mid = (min_ + max_) / 2 + max_res = self.model_resolution_table[self._id_to_model(id_)] - 1 + normalized_values[id_] = (val - mid) * 360 / max_res + else: + raise NotImplementedError + + return normalized_values + + def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]: + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + unnormalized_values = {} + for id_, val in ids_values.items(): + motor = self._id_to_name(id_) + min_ = self.calibration[motor].range_min + max_ = self.calibration[motor].range_max + drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode + if max_ == min_: + raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") + + if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: + val = -val if drive_mode else val + bounded_val = min(100.0, max(-100.0, val)) + unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_) + elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100: + val = 100 - val if drive_mode else val + bounded_val = min(100.0, max(0.0, val)) + unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_) + elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: + mid = (min_ + max_) / 2 + max_res = self.model_resolution_table[self._id_to_model(id_)] - 1 + unnormalized_values[id_] = int((val * max_res / 360) + mid) + else: + raise NotImplementedError + + return unnormalized_values + + def read( + self, + data_name: str, + motor: str, + *, + normalize: bool = True, + num_retry: int = 0, + ) -> Value: + """Read a register from a motor. + + Args: + data_name (str): Control-table key (e.g. `"Present_Position"`). + motor (str): Motor name. + normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as + defined by the calibration. + num_retry (int, optional): Retry attempts. Defaults to `0`. + + Returns: + Value: Raw or normalised value depending on *normalize*. + """ + if not self.is_connected: + raise DeviceNotConnectedError( + f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." + ) + + id_ = self.motors[motor].id + model = self.motors[motor].model + addr, length = get_address(self.model_ctrl_table, model, data_name) + + err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries." + value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg) + + id_value = self._decode_sign(data_name, {id_: value}) + + if normalize and data_name in self.normalized_data: + id_value = self._normalize(id_value) + + return id_value[id_] + + def write( + self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0 + ) -> None: + """Write a value to a single motor's register. + + Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which + provides a guarantee that the value was written to the register successfully. In consequence, it is + slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring + motors. + + Args: + data_name (str): Register name. + motor (str): Motor name. + value (Value): Value to write. If *normalize* is `True` the value is first converted to raw + units. + normalize (bool, optional): Enable or disable normalisation. Defaults to `True`. + num_retry (int, optional): Retry attempts. Defaults to `0`. + """ + if not self.is_connected: + raise DeviceNotConnectedError( + f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." + ) + + id_ = self.motors[motor].id + model = self.motors[motor].model + addr, length = get_address(self.model_ctrl_table, model, data_name) + + if normalize and data_name in self.normalized_data: + value = self._unnormalize({id_: value})[id_] + + value = self._encode_sign(data_name, {id_: value})[id_] + + err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries." + self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg) + diff --git a/src/lerobot/motors/damiao/seed_damiao.py b/src/lerobot/motors/damiao/seed_damiao.py new file mode 100644 index 000000000..70a36fa1f --- /dev/null +++ b/src/lerobot/motors/damiao/seed_damiao.py @@ -0,0 +1,833 @@ +## This is a derivative of the following software. +## https://github.com/cmjang/DM_Control_Python/blob/main/DM_CAN.py + +import can +from time import sleep, time +import numpy as np +from enum import IntEnum +from struct import unpack +from struct import pack + +class Motor: + def __init__(self, MotorType, SlaveID, MasterID): + """ + define Motor object 定义电机对象 + :param MotorType: Motor type 电机类型 + :param SlaveID: CANID 电机ID + :param MasterID: MasterID 主机ID 建议不要设为0 + """ + self.Pd = float(0) + self.Vd = float(0) + self.goal_position = float(0) + self.goal_tau = float(0) + self.state_q = float(0) + self.state_dq = float(0) + self.state_tau = float(0) + self.state_tmos = int(0) + self.state_trotor = int(0) + self.SlaveID = SlaveID + self.MasterID = MasterID + self.MotorType = MotorType + self.isEnable = False + self.NowControlMode = Control_Type.MIT + self.temp_param_dict = {} + + def recv_data(self, q: float, dq: float, tau: float, tmos: int, trotor: int): + self.state_q = q + self.state_dq = dq + self.state_tau = tau + self.state_tmos = tmos + self.state_trotor = trotor + + def getPosition(self): + """ + get the position of the motor 获取电机位置 + :return: the position of the motor 电机位置 + """ + return self.state_q + + def getVelocity(self): + """ + get the velocity of the motor 获取电机速度 + :return: the velocity of the motor 电机速度 + """ + return self.state_dq + + def getTorque(self): + """ + get the torque of the motor 获取电机力矩 + :return: the torque of the motor 电机力矩 + """ + return self.state_tau + + def getParam(self, RID): + """ + get the parameter of the motor 获取电机内部的参数,需要提前读取 + :param RID: DM_variable 电机参数 + :return: the parameter of the motor 电机参数 + """ + if RID in self.temp_param_dict: + return self.temp_param_dict[RID] + else: + return None + + +class MotorControl: + #send_data_frame = np.array( + # [0x55, 0xAA, 0x1e, 0x03, 0x01, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0, 0, 0, 0, 0x00, 0x08, 0x00, + # 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0x00], np.uint8) + # 4310 4310_48 4340 4340_48 + Limit_Param = [[12.5, 30, 10], [12.5, 50, 10], [12.5, 8, 28], [12.5, 10, 28], + # 6006 8006 8009 10010L 10010 + [12.5, 45, 20], [12.5, 45, 40], [12.5, 45, 54], [12.5, 25, 200], [12.5, 20, 200], + # H3510 DMG62150 DMH6220 + [12.5 , 280 , 1],[12.5 , 45 , 10],[12.5 , 45 , 10]] + + def __init__(self, channel: str, bitrate: int = 1000000): + """ + define MotorControl object 定义电机控制对象 + :param serial_device: serial object 串口对象 + """ + #self.serial_ = serial_device + self.motors_map = dict() + self.data_save = bytes() # save data + #if self.serial_.is_open: # open the serial port + # print("Serial port is open") + # serial_device.close() + #self.serial_.open() + self.canbus = can.interface.Bus(channel=channel, interface='socketcan', bitrate=bitrate) + + #print("can is open") + + + def controlMIT(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float): + """ + MIT Control Mode Function 达妙电机MIT控制模式函数 + :param DM_Motor: Motor object 电机对象 + :param kp: kp + :param kd: kd + :param q: position 期望位置 + :param dq: velocity 期望速度 + :param tau: torque 期望力矩 + :return: None + """ + if DM_Motor.SlaveID not in self.motors_map: + print("controlMIT ERROR : Motor ID not found") + return + kp_uint = float_to_uint(kp, 0, 500, 12) + kd_uint = float_to_uint(kd, 0, 5, 12) + MotorType = DM_Motor.MotorType + Q_MAX = self.Limit_Param[MotorType][0] + DQ_MAX = self.Limit_Param[MotorType][1] + TAU_MAX = self.Limit_Param[MotorType][2] + q_uint = float_to_uint(q, -Q_MAX, Q_MAX, 16) + dq_uint = float_to_uint(dq, -DQ_MAX, DQ_MAX, 12) + tau_uint = float_to_uint(tau, -TAU_MAX, TAU_MAX, 12) + data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + data_buf[0] = (q_uint >> 8) & 0xff + data_buf[1] = q_uint & 0xff + data_buf[2] = dq_uint >> 4 + data_buf[3] = ((dq_uint & 0xf) << 4) | ((kp_uint >> 8) & 0xf) + data_buf[4] = kp_uint & 0xff + data_buf[5] = kd_uint >> 4 + data_buf[6] = ((kd_uint & 0xf) << 4) | ((tau_uint >> 8) & 0xf) + data_buf[7] = tau_uint & 0xff + self.__send_data(DM_Motor.SlaveID, data_buf) + self.recv() # receive the data from serial port + + def control_delay(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float, delay: float): + """ + MIT Control Mode Function with delay 达妙电机MIT控制模式函数带延迟 + :param DM_Motor: Motor object 电机对象 + :param kp: kp + :param kd: kd + :param q: position 期望位置 + :param dq: velocity 期望速度 + :param tau: torque 期望力矩 + :param delay: delay time 延迟时间 单位秒 + """ + self.controlMIT(DM_Motor, kp, kd, q, dq, tau) + sleep(delay) + + def control_Pos_Vel(self, Motor, P_desired: float, V_desired: float): + """ + control the motor in position and velocity control mode 电机位置速度控制模式 + :param Motor: Motor object 电机对象 + :param P_desired: desired position 期望位置 + :param V_desired: desired velocity 期望速度 + :return: None + """ + if Motor.SlaveID not in self.motors_map: + print("Control Pos_Vel Error : Motor ID not found") + return + motorid = 0x100 + Motor.SlaveID + data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + P_desired_uint8s = float_to_uint8s(P_desired) + V_desired_uint8s = float_to_uint8s(V_desired) + data_buf[0:4] = P_desired_uint8s + data_buf[4:8] = V_desired_uint8s + self.__send_data(motorid, data_buf) + self.recv() # receive the data from serial port + + def control_Vel(self, Motor, Vel_desired): + """ + control the motor in velocity control mode 电机速度控制模式 + :param Motor: Motor object 电机对象 + :param Vel_desired: desired velocity 期望速度 + """ + if Motor.SlaveID not in self.motors_map: + print("control_VEL ERROR : Motor ID not found") + return + motorid = 0x200 + Motor.SlaveID + data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + Vel_desired_uint8s = float_to_uint8s(Vel_desired) + data_buf[0:4] = Vel_desired_uint8s + self.__send_data(motorid, data_buf) + self.recv() # receive the data from serial port + + def control_pos_force(self, Motor, Pos_des: float, Vel_des, i_des): + """ + control the motor in EMIT control mode 电机力位混合模式 + :param Pos_des: desired position rad 期望位置 单位为rad + :param Vel_des: desired velocity rad/s 期望速度 为放大100倍 + :param i_des: desired current rang 0-10000 期望电流标幺值放大10000倍 + 电流标幺值:实际电流值除以最大电流值,最大电流见上电打印 + """ + if Motor.SlaveID not in self.motors_map: + print("control_pos_vel ERROR : Motor ID not found") + return + motorid = 0x300 + Motor.SlaveID + data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + Pos_desired_uint8s = float_to_uint8s(Pos_des) + data_buf[0:4] = Pos_desired_uint8s + Vel_uint = np.uint16(Vel_des) + ides_uint = np.uint16(i_des) + data_buf[4] = Vel_uint & 0xff + data_buf[5] = Vel_uint >> 8 + data_buf[6] = ides_uint & 0xff + data_buf[7] = ides_uint >> 8 + self.__send_data(motorid, data_buf) + self.recv() # receive the data from serial port + + def enable(self, Motor): + """ + enable motor 使能电机 + 最好在上电后几秒后再使能电机 + :param Motor: Motor object 电机对象 + """ + self.__control_cmd(Motor, np.uint8(0xFC)) + sleep(0.1) + self.recv() # receive the data from serial port + + def enable_old(self, Motor ,ControlMode): + """ + enable motor old firmware 使能电机旧版本固件,这个是为了旧版本电机固件的兼容性 + 可恶的旧版本固件使能需要加上偏移量 + 最好在上电后几秒后再使能电机 + :param Motor: Motor object 电机对象 + """ + data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfc], np.uint8) + enable_id = ((int(ControlMode)-1) << 2) + Motor.SlaveID + self.__send_data(enable_id, data_buf) + sleep(0.1) + self.recv() # receive the data from serial port + + def disable(self, Motor): + """ + disable motor 失能电机 + :param Motor: Motor object 电机对象 + """ + self.__control_cmd(Motor, np.uint8(0xFD)) + sleep(0.1) + self.recv() # receive the data from serial port + + def set_zero_position(self, Motor): + """ + set the zero position of the motor 设置电机0位 + :param Motor: Motor object 电机对象 + """ + self.__control_cmd(Motor, np.uint8(0xFE)) + sleep(0.1) + self.recv() # receive the data from serial port + + def recv(self): + # 把上次没有解析完的剩下的也放进来 + # data_recv = b''.join([self.data_save, self.serial_.read_all()]) + #data_recv = b''.join([self.data_save, self.canbus.recv()]) + + + # packets = self.__extract_packets(data_recv) + # for packet in packets: + # data = packet[7:15] + # CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3] + # CMD = packet[1] + # self.__process_packet(data, CANID, CMD) + + data_recv = self.canbus.recv(0.1) + + if data_recv is not None: + # data = data_recv.data + # err = data[0] >> 12 + # id = data[0] & 0x7f + # pos = (data[1] << 8) + data[2] + # vel = (data[3] << 4) + (data[4] >> 4) + # tau = ((data[4] & 0x0f) << 8) + data[5] + # t_mos = data[6] + # t_rotor = data[7] + # print(hex(id), err, id, pos, vel, tau, goal_tau, t_mos, t_rotor) + # CANID = data_recv.arbitration_id + CANID = data_recv.data[0] + # CMD = data_recv.data[3] + CMD = 0x11 # 飯田:修正の必要あり + self.__process_packet(data_recv.data, CANID, CMD) + + # 飯田:Debug print + # print(hex(CANID),hex(CMD)) + # print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7])) + #return data + + def recv_set_param_data(self): + #data_recv = self.serial_.read_all() + + # packets = self.__extract_packets(data_recv) + # for packet in packets: + # data = packet[7:15] + # CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3] + # CMD = packet[1] + # self.__process_set_param_packet(data, CANID, CMD) + + data_recv = self.canbus.recv(0.1) + + + if data_recv is not None: + data = data_recv.data + CANID = data_recv.arbitration_id + # CANID = data_recv.data[0] + # CMD = data_recv.data[3] + CMD = 0x11 # 飯田:修正の必要あり + self.__process_packet(data, CANID, CMD) + + + # 飯田:Debug print + print(hex(CANID),hex(CMD)) + print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7])) + + def __process_packet(self, data, CANID, CMD): + if CMD == 0x11: + if CANID != 0x00: + if CANID in self.motors_map: + q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2]) + dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4)) + tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5]) + t_mos = data[6] + t_rotor = data[7] + MotorType_recv = self.motors_map[CANID].MotorType + Q_MAX = self.Limit_Param[MotorType_recv][0] + DQ_MAX = self.Limit_Param[MotorType_recv][1] + TAU_MAX = self.Limit_Param[MotorType_recv][2] + recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16) + recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12) + recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12) + self.motors_map[CANID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor) + else: + MasterID=data[0] & 0x0f + if MasterID in self.motors_map: + q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2]) + dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4)) + tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5]) + t_mos = data[6] + t_rotor = data[7] + MotorType_recv = self.motors_map[MasterID].MotorType + Q_MAX = self.Limit_Param[MotorType_recv][0] + DQ_MAX = self.Limit_Param[MotorType_recv][1] + TAU_MAX = self.Limit_Param[MotorType_recv][2] + recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16) + recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12) + recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12) + self.motors_map[MasterID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor) + + + def __process_set_param_packet(self, data, CANID, CMD): + if CMD == 0x11 and (data[2] == 0x33 or data[2] == 0x55): + masterid=CANID + slaveId = ((data[1] << 8) | data[0]) + if CANID==0x00: #防止有人把MasterID设为0稳一手 + masterid=slaveId + + if masterid not in self.motors_map: + if slaveId not in self.motors_map: + return + else: + masterid=slaveId + + RID = data[3] + # 读取参数得到的数据 + if is_in_ranges(RID): + #uint32类型 + num = uint8s_to_uint32(data[4], data[5], data[6], data[7]) + self.motors_map[masterid].temp_param_dict[RID] = num + + else: + #float类型 + num = uint8s_to_float(data[4], data[5], data[6], data[7]) + self.motors_map[masterid].temp_param_dict[RID] = num + + + def addMotor(self, Motor): + """ + add motor to the motor control object 添加电机到电机控制对象 + :param Motor: Motor object 电机对象 + """ + self.motors_map[Motor.SlaveID] = Motor + if Motor.MasterID != 0: + self.motors_map[Motor.MasterID] = Motor + return True + + def __control_cmd(self, Motor, cmd: np.uint8): # 飯田:コマンドは通ります + data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, cmd], np.uint8) + self.__send_data(Motor.SlaveID, data_buf) + + def __send_data(self, motor_id, data): + """ + send data to the motor 发送数据到电机 + :param motor_id: + :param data: + :return: + """ + #self.send_data_frame[13] = motor_id & 0xff + #self.send_data_frame[14] = (motor_id >> 8)& 0xff #id high 8 bits + #self.send_data_frame[21:29] = data + #self.serial_.write(bytes(self.send_data_frame.T)) + + msg =can.Message(is_extended_id=False,arbitration_id=motor_id,data=data,is_remote_frame = False) + self.canbus.send(msg) + + def __read_RID_param(self, Motor, RID): # 飯田:修正の必要あり? + can_id_l = Motor.SlaveID & 0xff #id low 8 bits + can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits + data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x33, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8) + self.__send_data(0x7FF, data_buf) + + + + def __write_motor_param(self, Motor, RID, data): # 飯田:修正の必要あり? + can_id_l = Motor.SlaveID & 0xff #id low 8 bits + can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits + data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x55, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8) + if not is_in_ranges(RID): + # data is float + data_buf[4:8] = float_to_uint8s(data) + else: + # data is int + data_buf[4:8] = data_to_uint8s(int(data)) + self.__send_data(0x7FF, data_buf) + + def switchControlMode(self, Motor, ControlMode): + """ + switch the control mode of the motor 切换电机控制模式 + :param Motor: Motor object 电机对象 + :param ControlMode: Control_Type 电机控制模式 example:MIT:Control_Type.MIT MIT模式 + """ + max_retries = 20 + retry_interval = 0.1 #retry times + RID = 10 + self.__write_motor_param(Motor, RID, np.uint8(ControlMode)) + for _ in range(max_retries): + sleep(retry_interval) + self.recv_set_param_data() + if Motor.SlaveID in self.motors_map: + if RID in self.motors_map[Motor.SlaveID].temp_param_dict: + if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - ControlMode) < 0.1: + return True + else: + return False + return False + + def save_motor_param(self, Motor): + """ + save the all parameter to flash 保存所有电机参数 + :param Motor: Motor object 电机对象 + :return: + """ + can_id_l = Motor.SlaveID & 0xff #id low 8 bits + can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits + data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xAA, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + self.disable(Motor) # before save disable the motor + self.__send_data(0x7FF, data_buf) + sleep(0.001) + + def change_limit_param(self, Motor_Type, PMAX, VMAX, TMAX): + """ + change the PMAX VMAX TMAX of the motor 改变电机的PMAX VMAX TMAX + :param Motor_Type: + :param PMAX: 电机的PMAX + :param VMAX: 电机的VMAX + :param TMAX: 电机的TMAX + :return: + """ + self.Limit_Param[Motor_Type][0] = PMAX + self.Limit_Param[Motor_Type][1] = VMAX + self.Limit_Param[Motor_Type][2] = TMAX + + def refresh_motor_status(self,Motor): + """ + get the motor status 获得电机状态 + """ + can_id_l = Motor.SlaveID & 0xff #id low 8 bits + can_id_h = (Motor.SlaveID >> 8) & 0xff #id high 8 bits + data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8) + self.__send_data(0x7FF, data_buf) + self.recv() # receive the data from serial port + + def change_motor_param(self, Motor, RID, data): + """ + change the RID of the motor 改变电机的参数 + :param Motor: Motor object 电机对象 + :param RID: DM_variable 电机参数 + :param data: 电机参数的值 + :return: True or False ,True means success, False means fail + """ + max_retries = 20 + retry_interval = 0.05 #retry times + + self.__write_motor_param(Motor, RID, data) + for _ in range(max_retries): + self.recv_set_param_data() + if Motor.SlaveID in self.motors_map and RID in self.motors_map[Motor.SlaveID].temp_param_dict: + if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - data) < 0.1: + return True + else: + return False + sleep(retry_interval) + return False + + def read_motor_param(self, Motor, RID): + """ + read only the RID of the motor 读取电机的内部信息例如 版本号等 + :param Motor: Motor object 电机对象 + :param RID: DM_variable 电机参数 + :return: 电机参数的值 + """ + max_retries = 5 + retry_interval = 0.05 #retry times + self.__read_RID_param(Motor, RID) + for _ in range(max_retries): + sleep(retry_interval) + self.recv_set_param_data() + if Motor.SlaveID in self.motors_map: + if RID in self.motors_map[Motor.SlaveID].temp_param_dict: + return self.motors_map[Motor.SlaveID].temp_param_dict[RID] + return None + + # ------------------------------------------------- + # Extract packets from the serial data + def __extract_packets(self, data): + frames = [] + header = 0xAA + tail = 0x55 + frame_length = 16 + i = 0 + remainder_pos = 0 + + while i <= len(data) - frame_length: + if data[i] == header and data[i + frame_length - 1] == tail: + frame = data[i:i + frame_length] + frames.append(frame) + i += frame_length + remainder_pos = i + else: + i += 1 + self.data_save = data[remainder_pos:] + return frames + + +def LIMIT_MIN_MAX(x, min, max): + if x <= min: + x = min + elif x > max: + x = max + + +def float_to_uint(x: float, x_min: float, x_max: float, bits): + LIMIT_MIN_MAX(x, x_min, x_max) + span = x_max - x_min + data_norm = (x - x_min) / span + return np.uint16(data_norm * ((1 << bits) - 1)) + + +def uint_to_float(x: np.uint16, min: float, max: float, bits): + span = max - min + data_norm = float(x) / ((1 << bits) - 1) + temp = data_norm * span + min + return np.float32(temp) + + +def float_to_uint8s(value): + # Pack the float into 4 bytes + packed = pack('f', value) + # Unpack the bytes into four uint8 values + return unpack('4B', packed) + + +def data_to_uint8s(value): + # Check if the value is within the range of uint32 + if isinstance(value, int) and (0 <= value <= 0xFFFFFFFF): + # Pack the uint32 into 4 bytes + packed = pack('I', value) + else: + raise ValueError("Value must be an integer within the range of uint32") + + # Unpack the bytes into four uint8 values + return unpack('4B', packed) + + +def is_in_ranges(number): + """ + check if the number is in the range of uint32 + :param number: + :return: + """ + if (7 <= number <= 10) or (13 <= number <= 16) or (35 <= number <= 36): + return True + return False + + +def uint8s_to_uint32(byte1, byte2, byte3, byte4): + # Pack the four uint8 values into a single uint32 value in little-endian order + packed = pack('<4B', byte1, byte2, byte3, byte4) + # Unpack the packed bytes into a uint32 value + return unpack('