mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-25 13:40:00 +00:00
add damiao
This commit is contained in:
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from .damiao import DamiaoMotorsBus
|
||||
from .tables import *
|
||||
@@ -0,0 +1,472 @@
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme
|
||||
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from enum import Enum
|
||||
|
||||
from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class OperatingMode(Enum):
|
||||
MIT = 0
|
||||
|
||||
|
||||
class DamiaoMotorsBus(MotorsBus):
|
||||
"""
|
||||
The Damiao implementation for a MotorsBus. It relies on the python-can library to communicate with
|
||||
the motors. For more info, see the python-can documentation: https://python-can.readthedocs.io/en/stable/, seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python
|
||||
https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
port: str,
|
||||
):
|
||||
self.port = port
|
||||
|
||||
def configure_motors(self) -> None:
|
||||
for motor in self.motors
|
||||
|
||||
|
||||
@cached_property
|
||||
def models(self) -> list[str]:
|
||||
return [m.model for m in self.motors.values()]
|
||||
|
||||
@cached_property
|
||||
def ids(self) -> list[int]:
|
||||
return [m.id for m in self.motors.values()]
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""bool: `True` if the underlying serial port is open."""
|
||||
return self.port_handler.is_open
|
||||
|
||||
def connect(self, handshake: bool = True) -> None:
|
||||
"""Open the serial port and initialise communication.
|
||||
|
||||
Args:
|
||||
handshake (bool, optional): Pings every expected motor and performs additional
|
||||
integrity checks specific to the implementation. Defaults to `True`.
|
||||
|
||||
Raises:
|
||||
DeviceAlreadyConnectedError: The port is already open.
|
||||
ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed.
|
||||
"""
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
|
||||
)
|
||||
|
||||
self._connect(handshake)
|
||||
self.set_timeout()
|
||||
logger.debug(f"{self.__class__.__name__} connected.")
|
||||
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.calibration == self.read_calibration()
|
||||
|
||||
def read_calibration(self) -> dict[str, MotorCalibration]:
|
||||
offsets = self.sync_read("Homing_Offset", normalize=False)
|
||||
mins = self.sync_read("Min_Position_Limit", normalize=False)
|
||||
maxes = self.sync_read("Max_Position_Limit", normalize=False)
|
||||
drive_modes = self.sync_read("Drive_Mode", normalize=False)
|
||||
|
||||
calibration = {}
|
||||
for motor, m in self.motors.items():
|
||||
calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=drive_modes[motor],
|
||||
homing_offset=offsets[motor],
|
||||
range_min=mins[motor],
|
||||
range_max=maxes[motor],
|
||||
)
|
||||
|
||||
return calibration
|
||||
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
|
||||
for motor, calibration in calibration_dict.items():
|
||||
self.write("Homing_Offset", motor, calibration.homing_offset)
|
||||
self.write("Min_Position_Limit", motor, calibration.range_min)
|
||||
self.write("Max_Position_Limit", motor, calibration.range_max)
|
||||
|
||||
if cache:
|
||||
self.calibration = calibration_dict
|
||||
|
||||
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
|
||||
|
||||
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
|
||||
|
||||
def disconnect(self, disable_torque: bool = True) -> None:
|
||||
"""Close the serial port (optionally disabling torque first).
|
||||
|
||||
Args:
|
||||
disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before
|
||||
closing the port. This can prevent damaging motors if they are left applying resisting torque
|
||||
after disconnect.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
|
||||
)
|
||||
|
||||
if disable_torque:
|
||||
self.port_handler.clearPort()
|
||||
self.port_handler.is_using = False
|
||||
self.disable_torque(num_retry=5)
|
||||
|
||||
self.port_handler.closePort()
|
||||
logger.debug(f"{self.__class__.__name__} disconnected.")
|
||||
|
||||
@classmethod
|
||||
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
|
||||
"""Probe *port* at every supported baud-rate and list responding IDs.
|
||||
|
||||
Args:
|
||||
port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``).
|
||||
*args, **kwargs: Forwarded to the subclass constructor.
|
||||
|
||||
Returns:
|
||||
dict[int, list[int]]: Mapping *baud-rate → list of motor IDs*
|
||||
for every baud-rate that produced at least one response.
|
||||
"""
|
||||
bus = cls(port, {}, *args, **kwargs)
|
||||
bus._connect(handshake=False)
|
||||
baudrate_ids = {}
|
||||
for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"):
|
||||
bus.set_baudrate(baudrate)
|
||||
ids_models = bus.broadcast_ping()
|
||||
if ids_models:
|
||||
tqdm.write(f"Motors found for {baudrate=}: {pformat(ids_models, indent=4)}")
|
||||
baudrate_ids[baudrate] = list(ids_models)
|
||||
|
||||
bus.port_handler.closePort()
|
||||
return baudrate_ids
|
||||
|
||||
def setup_motor(
|
||||
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
|
||||
) -> None:
|
||||
"""Assign the correct ID and baud-rate to a single motor.
|
||||
|
||||
This helper temporarily switches to the motor's current settings, disables torque, sets the desired
|
||||
ID, and finally programs the bus' default baud-rate.
|
||||
|
||||
Args:
|
||||
motor (str): Key of the motor in :pyattr:`motors`.
|
||||
initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided).
|
||||
Defaults to None.
|
||||
initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None.
|
||||
|
||||
Raises:
|
||||
RuntimeError: The motor could not be found or its model number
|
||||
does not match the expected one.
|
||||
ConnectionError: Communication with the motor failed.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
self._connect(handshake=False)
|
||||
|
||||
if initial_baudrate is None:
|
||||
initial_baudrate, initial_id = self._find_single_motor(motor)
|
||||
|
||||
if initial_id is None:
|
||||
_, initial_id = self._find_single_motor(motor, initial_baudrate)
|
||||
|
||||
model = self.motors[motor].model
|
||||
target_id = self.motors[motor].id
|
||||
self.set_baudrate(initial_baudrate)
|
||||
self._disable_torque(initial_id, model)
|
||||
|
||||
# Set ID
|
||||
addr, length = get_address(self.model_ctrl_table, model, "ID")
|
||||
self._write(addr, length, initial_id, target_id)
|
||||
|
||||
# Set Baudrate
|
||||
addr, length = get_address(self.model_ctrl_table, model, "Baud_Rate")
|
||||
baudrate_value = self.model_baudrate_table[model][self.default_baudrate]
|
||||
self._write(addr, length, target_id, baudrate_value)
|
||||
|
||||
self.set_baudrate(self.default_baudrate)
|
||||
|
||||
@contextmanager
|
||||
def torque_disabled(self, motors: int | str | list[str] | None = None):
|
||||
"""Context-manager that guarantees torque is re-enabled.
|
||||
|
||||
This helper is useful to temporarily disable torque when configuring motors.
|
||||
|
||||
Examples:
|
||||
>>> with bus.torque_disabled():
|
||||
... # Safe operations here
|
||||
... pass
|
||||
"""
|
||||
self.disable_torque(motors)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.enable_torque(motors)
|
||||
|
||||
def set_timeout(self, timeout_ms: int | None = None):
|
||||
"""Change the packet timeout used by the SDK.
|
||||
|
||||
Args:
|
||||
timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls
|
||||
back to :pyattr:`default_timeout`.
|
||||
"""
|
||||
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
|
||||
self.port_handler.setPacketTimeoutMillis(timeout_ms)
|
||||
|
||||
def get_baudrate(self) -> int:
|
||||
"""Return the current baud-rate configured on the port.
|
||||
|
||||
Returns:
|
||||
int: Baud-rate in bits / second.
|
||||
"""
|
||||
return self.port_handler.getBaudRate()
|
||||
|
||||
def set_baudrate(self, baudrate: int) -> None:
|
||||
"""Set a new UART baud-rate on the port.
|
||||
|
||||
Args:
|
||||
baudrate (int): Desired baud-rate in bits / second.
|
||||
|
||||
Raises:
|
||||
RuntimeError: The SDK failed to apply the change.
|
||||
"""
|
||||
present_bus_baudrate = self.port_handler.getBaudRate()
|
||||
if present_bus_baudrate != baudrate:
|
||||
logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
|
||||
self.port_handler.setBaudRate(baudrate)
|
||||
|
||||
if self.port_handler.getBaudRate() != baudrate:
|
||||
raise RuntimeError("Failed to write bus baud rate.")
|
||||
|
||||
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
||||
"""Restore factory calibration for the selected motors.
|
||||
|
||||
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
|
||||
The in-memory :pyattr:`calibration` is cleared.
|
||||
|
||||
Args:
|
||||
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
|
||||
resets every motor.
|
||||
"""
|
||||
if motors is None:
|
||||
motors = list(self.motors)
|
||||
elif isinstance(motors, (str | int)):
|
||||
motors = [motors]
|
||||
elif not isinstance(motors, list):
|
||||
raise TypeError(motors)
|
||||
|
||||
for motor in motors:
|
||||
model = self._get_motor_model(motor)
|
||||
max_res = self.model_resolution_table[model] - 1
|
||||
self.write("Homing_Offset", motor, 0, normalize=False)
|
||||
self.write("Min_Position_Limit", motor, 0, normalize=False)
|
||||
self.write("Max_Position_Limit", motor, max_res, normalize=False)
|
||||
|
||||
self.calibration = {}
|
||||
|
||||
def record_ranges_of_motion(
|
||||
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
|
||||
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
|
||||
"""Interactively record the min/max encoder values of each motor.
|
||||
|
||||
Move the joints by hand (with torque disabled) while the method streams live positions. Press
|
||||
:kbd:`Enter` to finish.
|
||||
|
||||
Args:
|
||||
motors (NameOrID | list[NameOrID] | None, optional): Motors to record.
|
||||
Defaults to every motor (`None`).
|
||||
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||
|
||||
Returns:
|
||||
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
|
||||
extreme values observed for each motor.
|
||||
"""
|
||||
if motors is None:
|
||||
motors = list(self.motors)
|
||||
elif isinstance(motors, (str | int)):
|
||||
motors = [motors]
|
||||
elif not isinstance(motors, list):
|
||||
raise TypeError(motors)
|
||||
|
||||
start_positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||
mins = start_positions.copy()
|
||||
maxes = start_positions.copy()
|
||||
|
||||
user_pressed_enter = False
|
||||
while not user_pressed_enter:
|
||||
positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
|
||||
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
|
||||
|
||||
if display_values:
|
||||
print("\n-------------------------------------------")
|
||||
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||
for motor in motors:
|
||||
print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}")
|
||||
|
||||
if enter_pressed():
|
||||
user_pressed_enter = True
|
||||
|
||||
if display_values and not user_pressed_enter:
|
||||
# Move cursor up to overwrite the previous output
|
||||
move_cursor_up(len(motors) + 3)
|
||||
|
||||
same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]]
|
||||
if same_min_max:
|
||||
raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}")
|
||||
|
||||
return mins, maxes
|
||||
|
||||
def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
normalized_values = {}
|
||||
for id_, val in ids_values.items():
|
||||
motor = self._id_to_name(id_)
|
||||
min_ = self.calibration[motor].range_min
|
||||
max_ = self.calibration[motor].range_max
|
||||
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
|
||||
if max_ == min_:
|
||||
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
|
||||
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[id_] = -norm if drive_mode else norm
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[id_] = 100 - norm if drive_mode else norm
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
|
||||
mid = (min_ + max_) / 2
|
||||
max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
|
||||
normalized_values[id_] = (val - mid) * 360 / max_res
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
return normalized_values
|
||||
|
||||
def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
unnormalized_values = {}
|
||||
for id_, val in ids_values.items():
|
||||
motor = self._id_to_name(id_)
|
||||
min_ = self.calibration[motor].range_min
|
||||
max_ = self.calibration[motor].range_max
|
||||
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
|
||||
if max_ == min_:
|
||||
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
|
||||
|
||||
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
|
||||
val = -val if drive_mode else val
|
||||
bounded_val = min(100.0, max(-100.0, val))
|
||||
unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_)
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
|
||||
val = 100 - val if drive_mode else val
|
||||
bounded_val = min(100.0, max(0.0, val))
|
||||
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
|
||||
mid = (min_ + max_) / 2
|
||||
max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
|
||||
unnormalized_values[id_] = int((val * max_res / 360) + mid)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
return unnormalized_values
|
||||
|
||||
def read(
|
||||
self,
|
||||
data_name: str,
|
||||
motor: str,
|
||||
*,
|
||||
normalize: bool = True,
|
||||
num_retry: int = 0,
|
||||
) -> Value:
|
||||
"""Read a register from a motor.
|
||||
|
||||
Args:
|
||||
data_name (str): Control-table key (e.g. `"Present_Position"`).
|
||||
motor (str): Motor name.
|
||||
normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as
|
||||
defined by the calibration.
|
||||
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||
|
||||
Returns:
|
||||
Value: Raw or normalised value depending on *normalize*.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||
)
|
||||
|
||||
id_ = self.motors[motor].id
|
||||
model = self.motors[motor].model
|
||||
addr, length = get_address(self.model_ctrl_table, model, data_name)
|
||||
|
||||
err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
|
||||
value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
|
||||
|
||||
id_value = self._decode_sign(data_name, {id_: value})
|
||||
|
||||
if normalize and data_name in self.normalized_data:
|
||||
id_value = self._normalize(id_value)
|
||||
|
||||
return id_value[id_]
|
||||
|
||||
def write(
|
||||
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
|
||||
) -> None:
|
||||
"""Write a value to a single motor's register.
|
||||
|
||||
Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which
|
||||
provides a guarantee that the value was written to the register successfully. In consequence, it is
|
||||
slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring
|
||||
motors.
|
||||
|
||||
Args:
|
||||
data_name (str): Register name.
|
||||
motor (str): Motor name.
|
||||
value (Value): Value to write. If *normalize* is `True` the value is first converted to raw
|
||||
units.
|
||||
normalize (bool, optional): Enable or disable normalisation. Defaults to `True`.
|
||||
num_retry (int, optional): Retry attempts. Defaults to `0`.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||
)
|
||||
|
||||
id_ = self.motors[motor].id
|
||||
model = self.motors[motor].model
|
||||
addr, length = get_address(self.model_ctrl_table, model, data_name)
|
||||
|
||||
if normalize and data_name in self.normalized_data:
|
||||
value = self._unnormalize({id_: value})[id_]
|
||||
|
||||
value = self._encode_sign(data_name, {id_: value})[id_]
|
||||
|
||||
err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
|
||||
self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
|
||||
|
||||
@@ -0,0 +1,833 @@
|
||||
## This is a derivative of the following software.
|
||||
## https://github.com/cmjang/DM_Control_Python/blob/main/DM_CAN.py
|
||||
|
||||
import can
|
||||
from time import sleep, time
|
||||
import numpy as np
|
||||
from enum import IntEnum
|
||||
from struct import unpack
|
||||
from struct import pack
|
||||
|
||||
class Motor:
|
||||
def __init__(self, MotorType, SlaveID, MasterID):
|
||||
"""
|
||||
define Motor object 定义电机对象
|
||||
:param MotorType: Motor type 电机类型
|
||||
:param SlaveID: CANID 电机ID
|
||||
:param MasterID: MasterID 主机ID 建议不要设为0
|
||||
"""
|
||||
self.Pd = float(0)
|
||||
self.Vd = float(0)
|
||||
self.goal_position = float(0)
|
||||
self.goal_tau = float(0)
|
||||
self.state_q = float(0)
|
||||
self.state_dq = float(0)
|
||||
self.state_tau = float(0)
|
||||
self.state_tmos = int(0)
|
||||
self.state_trotor = int(0)
|
||||
self.SlaveID = SlaveID
|
||||
self.MasterID = MasterID
|
||||
self.MotorType = MotorType
|
||||
self.isEnable = False
|
||||
self.NowControlMode = Control_Type.MIT
|
||||
self.temp_param_dict = {}
|
||||
|
||||
def recv_data(self, q: float, dq: float, tau: float, tmos: int, trotor: int):
|
||||
self.state_q = q
|
||||
self.state_dq = dq
|
||||
self.state_tau = tau
|
||||
self.state_tmos = tmos
|
||||
self.state_trotor = trotor
|
||||
|
||||
def getPosition(self):
|
||||
"""
|
||||
get the position of the motor 获取电机位置
|
||||
:return: the position of the motor 电机位置
|
||||
"""
|
||||
return self.state_q
|
||||
|
||||
def getVelocity(self):
|
||||
"""
|
||||
get the velocity of the motor 获取电机速度
|
||||
:return: the velocity of the motor 电机速度
|
||||
"""
|
||||
return self.state_dq
|
||||
|
||||
def getTorque(self):
|
||||
"""
|
||||
get the torque of the motor 获取电机力矩
|
||||
:return: the torque of the motor 电机力矩
|
||||
"""
|
||||
return self.state_tau
|
||||
|
||||
def getParam(self, RID):
|
||||
"""
|
||||
get the parameter of the motor 获取电机内部的参数,需要提前读取
|
||||
:param RID: DM_variable 电机参数
|
||||
:return: the parameter of the motor 电机参数
|
||||
"""
|
||||
if RID in self.temp_param_dict:
|
||||
return self.temp_param_dict[RID]
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class MotorControl:
|
||||
#send_data_frame = np.array(
|
||||
# [0x55, 0xAA, 0x1e, 0x03, 0x01, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0, 0, 0, 0, 0x00, 0x08, 0x00,
|
||||
# 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0x00], np.uint8)
|
||||
# 4310 4310_48 4340 4340_48
|
||||
Limit_Param = [[12.5, 30, 10], [12.5, 50, 10], [12.5, 8, 28], [12.5, 10, 28],
|
||||
# 6006 8006 8009 10010L 10010
|
||||
[12.5, 45, 20], [12.5, 45, 40], [12.5, 45, 54], [12.5, 25, 200], [12.5, 20, 200],
|
||||
# H3510 DMG62150 DMH6220
|
||||
[12.5 , 280 , 1],[12.5 , 45 , 10],[12.5 , 45 , 10]]
|
||||
|
||||
def __init__(self, channel: str, bitrate: int = 1000000):
|
||||
"""
|
||||
define MotorControl object 定义电机控制对象
|
||||
:param serial_device: serial object 串口对象
|
||||
"""
|
||||
#self.serial_ = serial_device
|
||||
self.motors_map = dict()
|
||||
self.data_save = bytes() # save data
|
||||
#if self.serial_.is_open: # open the serial port
|
||||
# print("Serial port is open")
|
||||
# serial_device.close()
|
||||
#self.serial_.open()
|
||||
self.canbus = can.interface.Bus(channel=channel, interface='socketcan', bitrate=bitrate)
|
||||
|
||||
#print("can is open")
|
||||
|
||||
|
||||
def controlMIT(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float):
|
||||
"""
|
||||
MIT Control Mode Function 达妙电机MIT控制模式函数
|
||||
:param DM_Motor: Motor object 电机对象
|
||||
:param kp: kp
|
||||
:param kd: kd
|
||||
:param q: position 期望位置
|
||||
:param dq: velocity 期望速度
|
||||
:param tau: torque 期望力矩
|
||||
:return: None
|
||||
"""
|
||||
if DM_Motor.SlaveID not in self.motors_map:
|
||||
print("controlMIT ERROR : Motor ID not found")
|
||||
return
|
||||
kp_uint = float_to_uint(kp, 0, 500, 12)
|
||||
kd_uint = float_to_uint(kd, 0, 5, 12)
|
||||
MotorType = DM_Motor.MotorType
|
||||
Q_MAX = self.Limit_Param[MotorType][0]
|
||||
DQ_MAX = self.Limit_Param[MotorType][1]
|
||||
TAU_MAX = self.Limit_Param[MotorType][2]
|
||||
q_uint = float_to_uint(q, -Q_MAX, Q_MAX, 16)
|
||||
dq_uint = float_to_uint(dq, -DQ_MAX, DQ_MAX, 12)
|
||||
tau_uint = float_to_uint(tau, -TAU_MAX, TAU_MAX, 12)
|
||||
data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
data_buf[0] = (q_uint >> 8) & 0xff
|
||||
data_buf[1] = q_uint & 0xff
|
||||
data_buf[2] = dq_uint >> 4
|
||||
data_buf[3] = ((dq_uint & 0xf) << 4) | ((kp_uint >> 8) & 0xf)
|
||||
data_buf[4] = kp_uint & 0xff
|
||||
data_buf[5] = kd_uint >> 4
|
||||
data_buf[6] = ((kd_uint & 0xf) << 4) | ((tau_uint >> 8) & 0xf)
|
||||
data_buf[7] = tau_uint & 0xff
|
||||
self.__send_data(DM_Motor.SlaveID, data_buf)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def control_delay(self, DM_Motor, kp: float, kd: float, q: float, dq: float, tau: float, delay: float):
|
||||
"""
|
||||
MIT Control Mode Function with delay 达妙电机MIT控制模式函数带延迟
|
||||
:param DM_Motor: Motor object 电机对象
|
||||
:param kp: kp
|
||||
:param kd: kd
|
||||
:param q: position 期望位置
|
||||
:param dq: velocity 期望速度
|
||||
:param tau: torque 期望力矩
|
||||
:param delay: delay time 延迟时间 单位秒
|
||||
"""
|
||||
self.controlMIT(DM_Motor, kp, kd, q, dq, tau)
|
||||
sleep(delay)
|
||||
|
||||
def control_Pos_Vel(self, Motor, P_desired: float, V_desired: float):
|
||||
"""
|
||||
control the motor in position and velocity control mode 电机位置速度控制模式
|
||||
:param Motor: Motor object 电机对象
|
||||
:param P_desired: desired position 期望位置
|
||||
:param V_desired: desired velocity 期望速度
|
||||
:return: None
|
||||
"""
|
||||
if Motor.SlaveID not in self.motors_map:
|
||||
print("Control Pos_Vel Error : Motor ID not found")
|
||||
return
|
||||
motorid = 0x100 + Motor.SlaveID
|
||||
data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
P_desired_uint8s = float_to_uint8s(P_desired)
|
||||
V_desired_uint8s = float_to_uint8s(V_desired)
|
||||
data_buf[0:4] = P_desired_uint8s
|
||||
data_buf[4:8] = V_desired_uint8s
|
||||
self.__send_data(motorid, data_buf)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def control_Vel(self, Motor, Vel_desired):
|
||||
"""
|
||||
control the motor in velocity control mode 电机速度控制模式
|
||||
:param Motor: Motor object 电机对象
|
||||
:param Vel_desired: desired velocity 期望速度
|
||||
"""
|
||||
if Motor.SlaveID not in self.motors_map:
|
||||
print("control_VEL ERROR : Motor ID not found")
|
||||
return
|
||||
motorid = 0x200 + Motor.SlaveID
|
||||
data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
Vel_desired_uint8s = float_to_uint8s(Vel_desired)
|
||||
data_buf[0:4] = Vel_desired_uint8s
|
||||
self.__send_data(motorid, data_buf)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def control_pos_force(self, Motor, Pos_des: float, Vel_des, i_des):
|
||||
"""
|
||||
control the motor in EMIT control mode 电机力位混合模式
|
||||
:param Pos_des: desired position rad 期望位置 单位为rad
|
||||
:param Vel_des: desired velocity rad/s 期望速度 为放大100倍
|
||||
:param i_des: desired current rang 0-10000 期望电流标幺值放大10000倍
|
||||
电流标幺值:实际电流值除以最大电流值,最大电流见上电打印
|
||||
"""
|
||||
if Motor.SlaveID not in self.motors_map:
|
||||
print("control_pos_vel ERROR : Motor ID not found")
|
||||
return
|
||||
motorid = 0x300 + Motor.SlaveID
|
||||
data_buf = np.array([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
Pos_desired_uint8s = float_to_uint8s(Pos_des)
|
||||
data_buf[0:4] = Pos_desired_uint8s
|
||||
Vel_uint = np.uint16(Vel_des)
|
||||
ides_uint = np.uint16(i_des)
|
||||
data_buf[4] = Vel_uint & 0xff
|
||||
data_buf[5] = Vel_uint >> 8
|
||||
data_buf[6] = ides_uint & 0xff
|
||||
data_buf[7] = ides_uint >> 8
|
||||
self.__send_data(motorid, data_buf)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def enable(self, Motor):
|
||||
"""
|
||||
enable motor 使能电机
|
||||
最好在上电后几秒后再使能电机
|
||||
:param Motor: Motor object 电机对象
|
||||
"""
|
||||
self.__control_cmd(Motor, np.uint8(0xFC))
|
||||
sleep(0.1)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def enable_old(self, Motor ,ControlMode):
|
||||
"""
|
||||
enable motor old firmware 使能电机旧版本固件,这个是为了旧版本电机固件的兼容性
|
||||
可恶的旧版本固件使能需要加上偏移量
|
||||
最好在上电后几秒后再使能电机
|
||||
:param Motor: Motor object 电机对象
|
||||
"""
|
||||
data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfc], np.uint8)
|
||||
enable_id = ((int(ControlMode)-1) << 2) + Motor.SlaveID
|
||||
self.__send_data(enable_id, data_buf)
|
||||
sleep(0.1)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def disable(self, Motor):
|
||||
"""
|
||||
disable motor 失能电机
|
||||
:param Motor: Motor object 电机对象
|
||||
"""
|
||||
self.__control_cmd(Motor, np.uint8(0xFD))
|
||||
sleep(0.1)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def set_zero_position(self, Motor):
|
||||
"""
|
||||
set the zero position of the motor 设置电机0位
|
||||
:param Motor: Motor object 电机对象
|
||||
"""
|
||||
self.__control_cmd(Motor, np.uint8(0xFE))
|
||||
sleep(0.1)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def recv(self):
|
||||
# 把上次没有解析完的剩下的也放进来
|
||||
# data_recv = b''.join([self.data_save, self.serial_.read_all()])
|
||||
#data_recv = b''.join([self.data_save, self.canbus.recv()])
|
||||
|
||||
|
||||
# packets = self.__extract_packets(data_recv)
|
||||
# for packet in packets:
|
||||
# data = packet[7:15]
|
||||
# CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3]
|
||||
# CMD = packet[1]
|
||||
# self.__process_packet(data, CANID, CMD)
|
||||
|
||||
data_recv = self.canbus.recv(0.1)
|
||||
|
||||
if data_recv is not None:
|
||||
# data = data_recv.data
|
||||
# err = data[0] >> 12
|
||||
# id = data[0] & 0x7f
|
||||
# pos = (data[1] << 8) + data[2]
|
||||
# vel = (data[3] << 4) + (data[4] >> 4)
|
||||
# tau = ((data[4] & 0x0f) << 8) + data[5]
|
||||
# t_mos = data[6]
|
||||
# t_rotor = data[7]
|
||||
# print(hex(id), err, id, pos, vel, tau, goal_tau, t_mos, t_rotor)
|
||||
# CANID = data_recv.arbitration_id
|
||||
CANID = data_recv.data[0]
|
||||
# CMD = data_recv.data[3]
|
||||
CMD = 0x11 # 飯田:修正の必要あり
|
||||
self.__process_packet(data_recv.data, CANID, CMD)
|
||||
|
||||
# 飯田:Debug print
|
||||
# print(hex(CANID),hex(CMD))
|
||||
# print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7]))
|
||||
#return data
|
||||
|
||||
def recv_set_param_data(self):
|
||||
#data_recv = self.serial_.read_all()
|
||||
|
||||
# packets = self.__extract_packets(data_recv)
|
||||
# for packet in packets:
|
||||
# data = packet[7:15]
|
||||
# CANID = (packet[6] << 24) | (packet[5] << 16) | (packet[4] << 8) | packet[3]
|
||||
# CMD = packet[1]
|
||||
# self.__process_set_param_packet(data, CANID, CMD)
|
||||
|
||||
data_recv = self.canbus.recv(0.1)
|
||||
|
||||
|
||||
if data_recv is not None:
|
||||
data = data_recv.data
|
||||
CANID = data_recv.arbitration_id
|
||||
# CANID = data_recv.data[0]
|
||||
# CMD = data_recv.data[3]
|
||||
CMD = 0x11 # 飯田:修正の必要あり
|
||||
self.__process_packet(data, CANID, CMD)
|
||||
|
||||
|
||||
# 飯田:Debug print
|
||||
print(hex(CANID),hex(CMD))
|
||||
print(hex(data_recv.data[0]),hex(data_recv.data[1]),hex(data_recv.data[2]),hex(data_recv.data[3]),hex(data_recv.data[4]),hex(data_recv.data[5]),hex(data_recv.data[6]),hex(data_recv.data[7]))
|
||||
|
||||
def __process_packet(self, data, CANID, CMD):
|
||||
if CMD == 0x11:
|
||||
if CANID != 0x00:
|
||||
if CANID in self.motors_map:
|
||||
q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2])
|
||||
dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4))
|
||||
tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5])
|
||||
t_mos = data[6]
|
||||
t_rotor = data[7]
|
||||
MotorType_recv = self.motors_map[CANID].MotorType
|
||||
Q_MAX = self.Limit_Param[MotorType_recv][0]
|
||||
DQ_MAX = self.Limit_Param[MotorType_recv][1]
|
||||
TAU_MAX = self.Limit_Param[MotorType_recv][2]
|
||||
recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16)
|
||||
recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12)
|
||||
recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12)
|
||||
self.motors_map[CANID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor)
|
||||
else:
|
||||
MasterID=data[0] & 0x0f
|
||||
if MasterID in self.motors_map:
|
||||
q_uint = np.uint16((np.uint16(data[1]) << 8) | data[2])
|
||||
dq_uint = np.uint16((np.uint16(data[3]) << 4) | (data[4] >> 4))
|
||||
tau_uint = np.uint16(((data[4] & 0xf) << 8) | data[5])
|
||||
t_mos = data[6]
|
||||
t_rotor = data[7]
|
||||
MotorType_recv = self.motors_map[MasterID].MotorType
|
||||
Q_MAX = self.Limit_Param[MotorType_recv][0]
|
||||
DQ_MAX = self.Limit_Param[MotorType_recv][1]
|
||||
TAU_MAX = self.Limit_Param[MotorType_recv][2]
|
||||
recv_q = uint_to_float(q_uint, -Q_MAX, Q_MAX, 16)
|
||||
recv_dq = uint_to_float(dq_uint, -DQ_MAX, DQ_MAX, 12)
|
||||
recv_tau = uint_to_float(tau_uint, -TAU_MAX, TAU_MAX, 12)
|
||||
self.motors_map[MasterID].recv_data(recv_q, recv_dq, recv_tau, t_mos, t_rotor)
|
||||
|
||||
|
||||
def __process_set_param_packet(self, data, CANID, CMD):
|
||||
if CMD == 0x11 and (data[2] == 0x33 or data[2] == 0x55):
|
||||
masterid=CANID
|
||||
slaveId = ((data[1] << 8) | data[0])
|
||||
if CANID==0x00: #防止有人把MasterID设为0稳一手
|
||||
masterid=slaveId
|
||||
|
||||
if masterid not in self.motors_map:
|
||||
if slaveId not in self.motors_map:
|
||||
return
|
||||
else:
|
||||
masterid=slaveId
|
||||
|
||||
RID = data[3]
|
||||
# 读取参数得到的数据
|
||||
if is_in_ranges(RID):
|
||||
#uint32类型
|
||||
num = uint8s_to_uint32(data[4], data[5], data[6], data[7])
|
||||
self.motors_map[masterid].temp_param_dict[RID] = num
|
||||
|
||||
else:
|
||||
#float类型
|
||||
num = uint8s_to_float(data[4], data[5], data[6], data[7])
|
||||
self.motors_map[masterid].temp_param_dict[RID] = num
|
||||
|
||||
|
||||
def addMotor(self, Motor):
|
||||
"""
|
||||
add motor to the motor control object 添加电机到电机控制对象
|
||||
:param Motor: Motor object 电机对象
|
||||
"""
|
||||
self.motors_map[Motor.SlaveID] = Motor
|
||||
if Motor.MasterID != 0:
|
||||
self.motors_map[Motor.MasterID] = Motor
|
||||
return True
|
||||
|
||||
def __control_cmd(self, Motor, cmd: np.uint8): # 飯田:コマンドは通ります
|
||||
data_buf = np.array([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, cmd], np.uint8)
|
||||
self.__send_data(Motor.SlaveID, data_buf)
|
||||
|
||||
def __send_data(self, motor_id, data):
|
||||
"""
|
||||
send data to the motor 发送数据到电机
|
||||
:param motor_id:
|
||||
:param data:
|
||||
:return:
|
||||
"""
|
||||
#self.send_data_frame[13] = motor_id & 0xff
|
||||
#self.send_data_frame[14] = (motor_id >> 8)& 0xff #id high 8 bits
|
||||
#self.send_data_frame[21:29] = data
|
||||
#self.serial_.write(bytes(self.send_data_frame.T))
|
||||
|
||||
msg =can.Message(is_extended_id=False,arbitration_id=motor_id,data=data,is_remote_frame = False)
|
||||
self.canbus.send(msg)
|
||||
|
||||
def __read_RID_param(self, Motor, RID): # 飯田:修正の必要あり?
|
||||
can_id_l = Motor.SlaveID & 0xff #id low 8 bits
|
||||
can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits
|
||||
data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x33, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
self.__send_data(0x7FF, data_buf)
|
||||
|
||||
|
||||
|
||||
def __write_motor_param(self, Motor, RID, data): # 飯田:修正の必要あり?
|
||||
can_id_l = Motor.SlaveID & 0xff #id low 8 bits
|
||||
can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits
|
||||
data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0x55, np.uint8(RID), 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
if not is_in_ranges(RID):
|
||||
# data is float
|
||||
data_buf[4:8] = float_to_uint8s(data)
|
||||
else:
|
||||
# data is int
|
||||
data_buf[4:8] = data_to_uint8s(int(data))
|
||||
self.__send_data(0x7FF, data_buf)
|
||||
|
||||
def switchControlMode(self, Motor, ControlMode):
|
||||
"""
|
||||
switch the control mode of the motor 切换电机控制模式
|
||||
:param Motor: Motor object 电机对象
|
||||
:param ControlMode: Control_Type 电机控制模式 example:MIT:Control_Type.MIT MIT模式
|
||||
"""
|
||||
max_retries = 20
|
||||
retry_interval = 0.1 #retry times
|
||||
RID = 10
|
||||
self.__write_motor_param(Motor, RID, np.uint8(ControlMode))
|
||||
for _ in range(max_retries):
|
||||
sleep(retry_interval)
|
||||
self.recv_set_param_data()
|
||||
if Motor.SlaveID in self.motors_map:
|
||||
if RID in self.motors_map[Motor.SlaveID].temp_param_dict:
|
||||
if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - ControlMode) < 0.1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return False
|
||||
|
||||
def save_motor_param(self, Motor):
|
||||
"""
|
||||
save the all parameter to flash 保存所有电机参数
|
||||
:param Motor: Motor object 电机对象
|
||||
:return:
|
||||
"""
|
||||
can_id_l = Motor.SlaveID & 0xff #id low 8 bits
|
||||
can_id_h = (Motor.SlaveID >> 8)& 0xff #id high 8 bits
|
||||
data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xAA, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
self.disable(Motor) # before save disable the motor
|
||||
self.__send_data(0x7FF, data_buf)
|
||||
sleep(0.001)
|
||||
|
||||
def change_limit_param(self, Motor_Type, PMAX, VMAX, TMAX):
|
||||
"""
|
||||
change the PMAX VMAX TMAX of the motor 改变电机的PMAX VMAX TMAX
|
||||
:param Motor_Type:
|
||||
:param PMAX: 电机的PMAX
|
||||
:param VMAX: 电机的VMAX
|
||||
:param TMAX: 电机的TMAX
|
||||
:return:
|
||||
"""
|
||||
self.Limit_Param[Motor_Type][0] = PMAX
|
||||
self.Limit_Param[Motor_Type][1] = VMAX
|
||||
self.Limit_Param[Motor_Type][2] = TMAX
|
||||
|
||||
def refresh_motor_status(self,Motor):
|
||||
"""
|
||||
get the motor status 获得电机状态
|
||||
"""
|
||||
can_id_l = Motor.SlaveID & 0xff #id low 8 bits
|
||||
can_id_h = (Motor.SlaveID >> 8) & 0xff #id high 8 bits
|
||||
data_buf = np.array([np.uint8(can_id_l), np.uint8(can_id_h), 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00], np.uint8)
|
||||
self.__send_data(0x7FF, data_buf)
|
||||
self.recv() # receive the data from serial port
|
||||
|
||||
def change_motor_param(self, Motor, RID, data):
|
||||
"""
|
||||
change the RID of the motor 改变电机的参数
|
||||
:param Motor: Motor object 电机对象
|
||||
:param RID: DM_variable 电机参数
|
||||
:param data: 电机参数的值
|
||||
:return: True or False ,True means success, False means fail
|
||||
"""
|
||||
max_retries = 20
|
||||
retry_interval = 0.05 #retry times
|
||||
|
||||
self.__write_motor_param(Motor, RID, data)
|
||||
for _ in range(max_retries):
|
||||
self.recv_set_param_data()
|
||||
if Motor.SlaveID in self.motors_map and RID in self.motors_map[Motor.SlaveID].temp_param_dict:
|
||||
if abs(self.motors_map[Motor.SlaveID].temp_param_dict[RID] - data) < 0.1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
sleep(retry_interval)
|
||||
return False
|
||||
|
||||
def read_motor_param(self, Motor, RID):
|
||||
"""
|
||||
read only the RID of the motor 读取电机的内部信息例如 版本号等
|
||||
:param Motor: Motor object 电机对象
|
||||
:param RID: DM_variable 电机参数
|
||||
:return: 电机参数的值
|
||||
"""
|
||||
max_retries = 5
|
||||
retry_interval = 0.05 #retry times
|
||||
self.__read_RID_param(Motor, RID)
|
||||
for _ in range(max_retries):
|
||||
sleep(retry_interval)
|
||||
self.recv_set_param_data()
|
||||
if Motor.SlaveID in self.motors_map:
|
||||
if RID in self.motors_map[Motor.SlaveID].temp_param_dict:
|
||||
return self.motors_map[Motor.SlaveID].temp_param_dict[RID]
|
||||
return None
|
||||
|
||||
# -------------------------------------------------
|
||||
# Extract packets from the serial data
|
||||
def __extract_packets(self, data):
|
||||
frames = []
|
||||
header = 0xAA
|
||||
tail = 0x55
|
||||
frame_length = 16
|
||||
i = 0
|
||||
remainder_pos = 0
|
||||
|
||||
while i <= len(data) - frame_length:
|
||||
if data[i] == header and data[i + frame_length - 1] == tail:
|
||||
frame = data[i:i + frame_length]
|
||||
frames.append(frame)
|
||||
i += frame_length
|
||||
remainder_pos = i
|
||||
else:
|
||||
i += 1
|
||||
self.data_save = data[remainder_pos:]
|
||||
return frames
|
||||
|
||||
|
||||
def LIMIT_MIN_MAX(x, min, max):
|
||||
if x <= min:
|
||||
x = min
|
||||
elif x > max:
|
||||
x = max
|
||||
|
||||
|
||||
def float_to_uint(x: float, x_min: float, x_max: float, bits):
|
||||
LIMIT_MIN_MAX(x, x_min, x_max)
|
||||
span = x_max - x_min
|
||||
data_norm = (x - x_min) / span
|
||||
return np.uint16(data_norm * ((1 << bits) - 1))
|
||||
|
||||
|
||||
def uint_to_float(x: np.uint16, min: float, max: float, bits):
|
||||
span = max - min
|
||||
data_norm = float(x) / ((1 << bits) - 1)
|
||||
temp = data_norm * span + min
|
||||
return np.float32(temp)
|
||||
|
||||
|
||||
def float_to_uint8s(value):
|
||||
# Pack the float into 4 bytes
|
||||
packed = pack('f', value)
|
||||
# Unpack the bytes into four uint8 values
|
||||
return unpack('4B', packed)
|
||||
|
||||
|
||||
def data_to_uint8s(value):
|
||||
# Check if the value is within the range of uint32
|
||||
if isinstance(value, int) and (0 <= value <= 0xFFFFFFFF):
|
||||
# Pack the uint32 into 4 bytes
|
||||
packed = pack('I', value)
|
||||
else:
|
||||
raise ValueError("Value must be an integer within the range of uint32")
|
||||
|
||||
# Unpack the bytes into four uint8 values
|
||||
return unpack('4B', packed)
|
||||
|
||||
|
||||
def is_in_ranges(number):
|
||||
"""
|
||||
check if the number is in the range of uint32
|
||||
:param number:
|
||||
:return:
|
||||
"""
|
||||
if (7 <= number <= 10) or (13 <= number <= 16) or (35 <= number <= 36):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def uint8s_to_uint32(byte1, byte2, byte3, byte4):
|
||||
# Pack the four uint8 values into a single uint32 value in little-endian order
|
||||
packed = pack('<4B', byte1, byte2, byte3, byte4)
|
||||
# Unpack the packed bytes into a uint32 value
|
||||
return unpack('<I', packed)[0]
|
||||
|
||||
|
||||
def uint8s_to_float(byte1, byte2, byte3, byte4):
|
||||
# Pack the four uint8 values into a single float value in little-endian order
|
||||
packed = pack('<4B', byte1, byte2, byte3, byte4)
|
||||
# Unpack the packed bytes into a float value
|
||||
return unpack('<f', packed)[0]
|
||||
|
||||
|
||||
def print_hex(data):
|
||||
hex_values = [f'{byte:02X}' for byte in data]
|
||||
print(' '.join(hex_values))
|
||||
|
||||
|
||||
def get_enum_by_index(index, enum_class):
|
||||
try:
|
||||
return enum_class(index)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
class DM_Motor_Type(IntEnum):
|
||||
DM4310 = 0
|
||||
DM4310_48V = 1
|
||||
DM4340 = 2
|
||||
DM4340_48V = 3
|
||||
DM6006 = 4
|
||||
DM8006 = 5
|
||||
DM8009 = 6
|
||||
DM10010L = 7
|
||||
DM10010 = 8
|
||||
DMH3510 = 9
|
||||
DMH6215 = 10
|
||||
DMG6220 = 11
|
||||
|
||||
|
||||
class DM_variable(IntEnum):
|
||||
UV_Value = 0
|
||||
KT_Value = 1
|
||||
OT_Value = 2
|
||||
OC_Value = 3
|
||||
ACC = 4
|
||||
DEC = 5
|
||||
MAX_SPD = 6
|
||||
MST_ID = 7
|
||||
ESC_ID = 8
|
||||
TIMEOUT = 9
|
||||
CTRL_MODE = 10
|
||||
Damp = 11
|
||||
Inertia = 12
|
||||
hw_ver = 13
|
||||
sw_ver = 14
|
||||
SN = 15
|
||||
NPP = 16
|
||||
Rs = 17
|
||||
LS = 18
|
||||
Flux = 19
|
||||
Gr = 20
|
||||
PMAX = 21
|
||||
VMAX = 22
|
||||
TMAX = 23
|
||||
I_BW = 24
|
||||
KP_ASR = 25
|
||||
KI_ASR = 26
|
||||
KP_APR = 27
|
||||
KI_APR = 28
|
||||
OV_Value = 29
|
||||
GREF = 30
|
||||
Deta = 31
|
||||
V_BW = 32
|
||||
IQ_c1 = 33
|
||||
VL_c1 = 34
|
||||
can_br = 35
|
||||
sub_ver = 36
|
||||
u_off = 50
|
||||
v_off = 51
|
||||
k1 = 52
|
||||
k2 = 53
|
||||
m_off = 54
|
||||
dir = 55
|
||||
p_m = 80
|
||||
xout = 81
|
||||
|
||||
|
||||
class Control_Type(IntEnum):
|
||||
MIT = 1
|
||||
POS_VEL = 2
|
||||
VEL = 3
|
||||
Torque_Pos = 4
|
||||
|
||||
class DamiaoPort:
|
||||
def __init__(self, device, types, can_ids, master_ids, motor_with_torque, control_mode=Control_Type.MIT):
|
||||
self.device = device
|
||||
self.types = types
|
||||
self.can_ids = can_ids
|
||||
self.master_ids = master_ids
|
||||
self.control = MotorControl(self.device, bitrate=4000000)
|
||||
self.motors = [Motor(type, can_id, master_id) for type, can_id, master_id in zip(types, can_ids, master_ids)]
|
||||
self.stat_data = []
|
||||
self.stat_time = []
|
||||
for motor in self.motors:
|
||||
self.control.addMotor(motor)
|
||||
self.control.enable(motor)
|
||||
|
||||
def get_present_status(self):
|
||||
self.stat_time.append(time())
|
||||
stat = [[
|
||||
motor.goal_position,
|
||||
motor.goal_tau,
|
||||
motor.getPosition(),
|
||||
motor.getVelocity(),
|
||||
motor.getTorque(),
|
||||
motor.state_tmos,
|
||||
motor.state_trotor,
|
||||
] for motor in self.motors]
|
||||
self.stat_data.append(stat)
|
||||
|
||||
return stat
|
||||
|
||||
def save_status(self, filename):
|
||||
np.savez(filename, np.array(self.stat_time), np.array(self.stat_data))
|
||||
|
||||
def disable(self):
|
||||
for motor in self.motors:
|
||||
self.control.disable(motor)
|
||||
|
||||
def shutdown(self):
|
||||
for motor in self.motors:
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, 0)
|
||||
self.control.canbus.shutdown()
|
||||
|
||||
def set_zero_position(self):
|
||||
for motor in self.motors:
|
||||
self.control.disable(motor)
|
||||
sleep(1)
|
||||
for motor in self.motors:
|
||||
self.control.set_zero_position(motor)
|
||||
sleep(1)
|
||||
for motor in self.motors:
|
||||
self.control.enable(motor)
|
||||
return 0
|
||||
|
||||
async def move_towards(self, goal_positions, kps, kds):
|
||||
for motor, goal_position, kp, kd in zip(self.motors, goal_positions, kps, kds):
|
||||
delta = goal_position - motor.getPosition()
|
||||
v = motor.getVelocity()
|
||||
tau = kp * delta - kd * v
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = tau
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, tau)
|
||||
await asyncio.sleep(0.00003)
|
||||
|
||||
def move_regressor_sync(self, regs, search_range, search_step, goal_positions, kps, kds):
|
||||
TORQUE_SCALER=30
|
||||
if len(self.stat_data) == 0:
|
||||
return self.move_towards_sync(goal_positions, kps, kds)
|
||||
for motor, reg, goal_position, kp, kd, stat in zip(
|
||||
self.motors, regs, goal_positions, kps, kds, self.stat_data[-1]):
|
||||
pos = motor.getPosition()
|
||||
vel = motor.getVelocity()
|
||||
delta = goal_position - pos
|
||||
goal_tau = kp * delta - kd * vel
|
||||
_goal_pos, _goal_tau, _pos, _vel, _tau = stat
|
||||
x = np.array([[_pos, _vel, _tau, _goal_pos, _goal_tau],
|
||||
[pos, vel, motor.getTorque(), goal_position, goal_tau]])
|
||||
x /= np.array([[np.pi, 10, TORQUE_SCALER, np.pi, TORQUE_SCALER]])
|
||||
xs = []
|
||||
for tau in np.linspace(goal_tau/TORQUE_SCALER - search_range,
|
||||
goal_tau/TORQUE_SCALER + search_range,
|
||||
num=search_step):
|
||||
x_ = x.copy()
|
||||
x_[0,4] = tau
|
||||
xs.append(x_.flatten())
|
||||
h = reg.predict(xs)
|
||||
diff = h - goal_position
|
||||
tau = TORQUE_SCALER * xs[np.argmin(diff ** 2)][4]
|
||||
goal_tau = tau
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = goal_tau
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, goal_tau)
|
||||
sleep(0.00003)
|
||||
|
||||
def move_towards_sync(self, goal_positions, kps, kds):
|
||||
for motor, goal_position, kp, kd in zip(self.motors, goal_positions, kps, kds):
|
||||
delta = goal_position - motor.getPosition()
|
||||
v = motor.getVelocity()
|
||||
tau = kp * delta - kd * v
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = tau
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, tau)
|
||||
# sleep(0.00003)
|
||||
|
||||
def set_goal_torque_sync(self, goal_taus):
|
||||
for motor, goal_tau in zip(self.motors, goal_taus):
|
||||
motor.goal_position = 0
|
||||
motor.goal_tau = goal_tau
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, motor.goal_tau)
|
||||
sleep(0.00003)
|
||||
|
||||
def move_torque_sync(self, taus):
|
||||
for motor,tau in zip(self.motors, taus):
|
||||
motor.goal_position = 0
|
||||
motor.goal_tau = tau
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, motor.goal_tau)
|
||||
sleep(0.00003)
|
||||
|
||||
def keep_torque_sync(self):
|
||||
for motor in self.motors:
|
||||
self.control.controlMIT(motor, 0, 0, 0, 0, motor.goal_tau)
|
||||
sleep(0.00003)
|
||||
|
||||
async def set_goal_positions(self, goal_positions, kps):
|
||||
for motor, goal_position, kp in zip(self.motors, goal_positions, kps):
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = 0
|
||||
self.control.controlMIT(motor, kp, 1.2, goal_position, 0, 0)
|
||||
await asyncio.sleep(0.00003)
|
||||
|
||||
def set_goal_positions_sync(self, goal_positions, kps, kds):
|
||||
for motor, goal_position, kp, kd in zip(self.motors, goal_positions, kps, kds):
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = 0
|
||||
self.control.controlMIT(motor, kp, kd, goal_position, 0, 0)
|
||||
sleep(0.00003)
|
||||
|
||||
def set_goal_posvel(self, goal_positions):
|
||||
for motor, goal_position in zip(self.motors, goal_positions):
|
||||
motor.goal_position = goal_position
|
||||
motor.goal_tau = 0
|
||||
self.control.control_pos_force(motor, goal_position, 1, 1)
|
||||
|
||||
def controlMIT(self, motor, kp, kd, q, dq, tau):
|
||||
self.control.controlMIT(self.motors[motor], kp, kd, q, dq, tau)
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme
|
||||
|
||||
import time
|
||||
import math
|
||||
import numpy as np
|
||||
from damiao_motor import Motor, MotorControl, DM_Motor_Type, Control_Type
|
||||
|
||||
# Configuration parameters
|
||||
NUM_MOTORS = 1 # Number of motors to control
|
||||
CAN_INTERFACE = "can0" # CAN interface name
|
||||
CAN_BITRATE = 1000000 # CAN bus baud rate
|
||||
MOTOR_TYPE = DM_Motor_Type.DM4310 # Motor model
|
||||
|
||||
# Sine wave parameters
|
||||
FREQUENCY = 0.1 # Frequency (Hz)
|
||||
AMPLITUDE = 6 # Amplitude (rad)
|
||||
DURATION = 60.0 # Operation duration (s)
|
||||
|
||||
def main():
|
||||
# Create motor controller object
|
||||
control = MotorControl(CAN_INTERFACE, bitrate=CAN_BITRATE)
|
||||
|
||||
# Create and add motors
|
||||
motors = []
|
||||
for i in range(NUM_MOTORS):
|
||||
motor = Motor(MOTOR_TYPE, i + 1, i + 0X10) # CAN IDs start from 1
|
||||
control.addMotor(motor)
|
||||
motors.append(motor)
|
||||
control.enable(motor)
|
||||
print(f"Motor {i + 1} enabled")
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < DURATION:
|
||||
current_time = time.time() - start_time
|
||||
|
||||
# Calculate sine wave position
|
||||
position = AMPLITUDE * math.sin(2 * math.pi * FREQUENCY * current_time)
|
||||
|
||||
# Control all motors
|
||||
for motor in motors:
|
||||
control.controlMIT(
|
||||
motor,
|
||||
kp=10.0, # Position gain
|
||||
kd=1.0, # Velocity gain
|
||||
q=position, # Target position
|
||||
dq=0.0, # Target velocity
|
||||
tau=0.0 # Feedforward torque
|
||||
)
|
||||
|
||||
# Control frequency
|
||||
time.sleep(0.001) # 1kHz control frequency
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nProgram interrupted by user")
|
||||
finally:
|
||||
# Disable all motors
|
||||
for motor in motors:
|
||||
control.disable(motor)
|
||||
print(f"Motor {motor.SlaveID} disabled")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user