mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 16:49:55 +00:00
add tests and debug
This commit is contained in:
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Debug script for Damiao motor CAN communication on macOS.
|
||||
This replaces candump for macOS SLCAN debugging.
|
||||
"""
|
||||
|
||||
import can
|
||||
import time
|
||||
import sys
|
||||
|
||||
def test_can_communication(port="/dev/cu.usbmodem2101"):
|
||||
"""Test basic CAN communication with a Damiao motor."""
|
||||
|
||||
print("=" * 60)
|
||||
print("Damiao Motor CAN Communication Debug Tool")
|
||||
print("=" * 60)
|
||||
print(f"\nPort: {port}")
|
||||
print()
|
||||
|
||||
try:
|
||||
# Connect to SLCAN
|
||||
print("Step 1: Connecting to SLCAN...")
|
||||
bus = can.interface.Bus(
|
||||
channel=port,
|
||||
interface='slcan',
|
||||
bitrate=1000000
|
||||
)
|
||||
print("✓ Connected to SLCAN")
|
||||
|
||||
# Test 1: Send enable command and listen for ANY response
|
||||
print("\n" + "=" * 60)
|
||||
print("Test 1: Enable Motor (ID 0x01)")
|
||||
print("=" * 60)
|
||||
print("Sending enable command to 0x01...")
|
||||
|
||||
enable_msg = can.Message(
|
||||
arbitration_id=0x01,
|
||||
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC],
|
||||
is_extended_id=False
|
||||
)
|
||||
bus.send(enable_msg)
|
||||
print("✓ Enable command sent")
|
||||
|
||||
print("\nListening for responses (2 seconds)...")
|
||||
print("Expected: Response from 0x11 (master ID)")
|
||||
print()
|
||||
|
||||
responses = []
|
||||
start_time = time.time()
|
||||
timeout = 2.0
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
msg = bus.recv(timeout=0.1)
|
||||
if msg:
|
||||
responses.append(msg)
|
||||
print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}")
|
||||
|
||||
if not responses:
|
||||
print("✗ NO RESPONSES RECEIVED")
|
||||
print("\nPossible issues:")
|
||||
print(" 1. Motor not powered (check 24V supply)")
|
||||
print(" 2. CAN wiring incorrect (CANH, CANL, GND)")
|
||||
print(" 3. Motor master ID not set to 0x11")
|
||||
print(" 4. SLCAN adapter not working properly")
|
||||
print(" 5. Wrong CAN port specified")
|
||||
else:
|
||||
print(f"\n✓ Received {len(responses)} response(s)")
|
||||
|
||||
# Check if we got response from expected ID
|
||||
recv_ids = [msg.arbitration_id for msg in responses]
|
||||
if 0x11 in recv_ids:
|
||||
print("✓ Motor 0x11 is responding!")
|
||||
else:
|
||||
print(f"⚠ Responses from unexpected IDs: {[hex(id) for id in recv_ids]}")
|
||||
|
||||
# Test 2: Send refresh command
|
||||
print("\n" + "=" * 60)
|
||||
print("Test 2: Refresh Motor State (ID 0x01)")
|
||||
print("=" * 60)
|
||||
print("Sending refresh command...")
|
||||
|
||||
refresh_msg = can.Message(
|
||||
arbitration_id=0x7FF, # Parameter ID
|
||||
data=[0x01, 0x00, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00],
|
||||
is_extended_id=False
|
||||
)
|
||||
bus.send(refresh_msg)
|
||||
print("✓ Refresh command sent")
|
||||
|
||||
print("\nListening for responses (2 seconds)...")
|
||||
responses = []
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
msg = bus.recv(timeout=0.1)
|
||||
if msg:
|
||||
responses.append(msg)
|
||||
print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}")
|
||||
|
||||
if not responses:
|
||||
print("✗ NO RESPONSES RECEIVED")
|
||||
else:
|
||||
print(f"\n✓ Received {len(responses)} response(s)")
|
||||
|
||||
# Test 3: Listen for any spontaneous traffic
|
||||
print("\n" + "=" * 60)
|
||||
print("Test 3: Listen for Any CAN Traffic")
|
||||
print("=" * 60)
|
||||
print("Listening for 5 seconds...")
|
||||
print("(This will catch any background CAN traffic)")
|
||||
print()
|
||||
|
||||
start_time = time.time()
|
||||
traffic_count = 0
|
||||
|
||||
while time.time() - start_time < 5.0:
|
||||
msg = bus.recv(timeout=0.1)
|
||||
if msg:
|
||||
traffic_count += 1
|
||||
print(f" [{time.time() - start_time:.2f}s] ID=0x{msg.arbitration_id:03X}: {msg.data.hex()}")
|
||||
|
||||
if traffic_count == 0:
|
||||
print("✗ No CAN traffic detected at all")
|
||||
print("\nThis suggests:")
|
||||
print(" - SLCAN adapter may not be working")
|
||||
print(" - No devices on the CAN bus are active")
|
||||
print(" - Wrong port specified")
|
||||
else:
|
||||
print(f"\n✓ Detected {traffic_count} CAN messages")
|
||||
|
||||
# Cleanup
|
||||
print("\n" + "=" * 60)
|
||||
print("Cleanup")
|
||||
print("=" * 60)
|
||||
print("Sending disable command...")
|
||||
disable_msg = can.Message(
|
||||
arbitration_id=0x01,
|
||||
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD],
|
||||
is_extended_id=False
|
||||
)
|
||||
bus.send(disable_msg)
|
||||
time.sleep(0.1)
|
||||
|
||||
bus.shutdown()
|
||||
print("✓ Disconnected from CAN bus")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n✗ Error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("Debug Complete")
|
||||
print("=" * 60)
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
else:
|
||||
port = "/dev/cu.usbmodem2101"
|
||||
|
||||
test_can_communication(port)
|
||||
|
||||
@@ -73,7 +73,7 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
port: str,
|
||||
motors: dict[str, Motor],
|
||||
calibration: dict[str, MotorCalibration] | None = None,
|
||||
can_interface: str = "socketcan",
|
||||
can_interface: str = "auto",
|
||||
):
|
||||
"""
|
||||
Initialize the Damiao motors bus.
|
||||
@@ -82,7 +82,7 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
port: CAN interface name (e.g., "can0" for Linux, "/dev/cu.usbmodem*" for macOS)
|
||||
motors: Dictionary mapping motor names to Motor objects
|
||||
calibration: Optional calibration data
|
||||
can_interface: CAN interface type - "socketcan" (Linux) or "slcan" (macOS/serial)
|
||||
can_interface: CAN interface type - "auto" (default), "socketcan" (Linux), or "slcan" (macOS/serial)
|
||||
"""
|
||||
super().__init__(port, motors, calibration)
|
||||
self.port = port
|
||||
@@ -94,7 +94,7 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
self._motor_can_ids = {}
|
||||
self._recv_id_to_motor = {}
|
||||
|
||||
# Store motor types
|
||||
# Store motor types and recv IDs
|
||||
self._motor_types = {}
|
||||
for name, motor in self.motors.items():
|
||||
if hasattr(motor, "motor_type"):
|
||||
@@ -102,6 +102,10 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
else:
|
||||
# Default to DM4310 if not specified
|
||||
self._motor_types[name] = MotorType.DM4310
|
||||
|
||||
# Map recv_id to motor name for filtering responses
|
||||
if hasattr(motor, "recv_id"):
|
||||
self._recv_id_to_motor[motor.recv_id] = name
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
@@ -206,18 +210,20 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
def _enable_motor(self, motor: NameOrID) -> None:
|
||||
"""Enable a single motor."""
|
||||
motor_id = self._get_motor_id(motor)
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
data = [0xFF] * 7 + [CAN_CMD_ENABLE]
|
||||
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
|
||||
self.canbus.send(msg)
|
||||
self._recv_motor_response()
|
||||
self._recv_motor_response(expected_recv_id=recv_id)
|
||||
|
||||
def _disable_motor(self, motor: NameOrID) -> None:
|
||||
"""Disable a single motor."""
|
||||
motor_id = self._get_motor_id(motor)
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
data = [0xFF] * 7 + [CAN_CMD_DISABLE]
|
||||
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
|
||||
self.canbus.send(msg)
|
||||
self._recv_motor_response()
|
||||
self._recv_motor_response(expected_recv_id=recv_id)
|
||||
|
||||
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
"""Enable torque on selected motors."""
|
||||
@@ -250,26 +256,54 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
motors = self._get_motors_list(motors)
|
||||
for motor in motors:
|
||||
motor_id = self._get_motor_id(motor)
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
data = [0xFF] * 7 + [CAN_CMD_SET_ZERO]
|
||||
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
|
||||
self.canbus.send(msg)
|
||||
self._recv_motor_response()
|
||||
self._recv_motor_response(expected_recv_id=recv_id)
|
||||
time.sleep(0.01)
|
||||
|
||||
def _refresh_motor(self, motor: NameOrID) -> None:
|
||||
"""Refresh motor status."""
|
||||
def _refresh_motor(self, motor: NameOrID) -> Optional[can.Message]:
|
||||
"""Refresh motor status and return the response."""
|
||||
motor_id = self._get_motor_id(motor)
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
|
||||
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
|
||||
self.canbus.send(msg)
|
||||
self._recv_motor_response()
|
||||
return self._recv_motor_response(expected_recv_id=recv_id)
|
||||
|
||||
def _recv_motor_response(self, timeout: float = 0.1) -> Optional[can.Message]:
|
||||
"""Receive a response from a motor."""
|
||||
def _recv_motor_response(self, expected_recv_id: Optional[int] = None, timeout: float = 0.5) -> Optional[can.Message]:
|
||||
"""
|
||||
Receive a response from a motor.
|
||||
|
||||
Args:
|
||||
expected_recv_id: If provided, only return messages from this CAN ID
|
||||
timeout: Timeout in seconds
|
||||
|
||||
Returns:
|
||||
CAN message if received, None otherwise
|
||||
"""
|
||||
try:
|
||||
msg = self.canbus.recv(timeout=timeout)
|
||||
if msg:
|
||||
return msg
|
||||
start_time = time.time()
|
||||
messages_seen = []
|
||||
while time.time() - start_time < timeout:
|
||||
msg = self.canbus.recv(timeout=0.01) # Short timeout for polling
|
||||
if msg:
|
||||
messages_seen.append(f"0x{msg.arbitration_id:02X}")
|
||||
# If no filter specified, return any message
|
||||
if expected_recv_id is None:
|
||||
return msg
|
||||
# Otherwise, only return if it matches the expected recv_id
|
||||
if msg.arbitration_id == expected_recv_id:
|
||||
return msg
|
||||
else:
|
||||
logger.debug(f"Ignoring message from CAN ID 0x{msg.arbitration_id:02X}, expected 0x{expected_recv_id:02X}")
|
||||
|
||||
# Log what we saw for debugging
|
||||
if messages_seen:
|
||||
logger.warning(f"Received {len(messages_seen)} message(s) from IDs {set(messages_seen)}, but expected 0x{expected_recv_id:02X}")
|
||||
else:
|
||||
logger.warning(f"No CAN messages received (expected from 0x{expected_recv_id:02X})")
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to receive CAN message: {e}")
|
||||
return None
|
||||
@@ -325,7 +359,8 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
|
||||
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
|
||||
self.canbus.send(msg)
|
||||
self._recv_motor_response()
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
self._recv_motor_response(expected_recv_id=recv_id)
|
||||
|
||||
def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int:
|
||||
"""Convert float to unsigned integer for CAN transmission."""
|
||||
@@ -384,12 +419,15 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
# Refresh motor to get latest state
|
||||
self._refresh_motor(motor)
|
||||
|
||||
# Read response
|
||||
msg = self._recv_motor_response()
|
||||
msg = self._refresh_motor(motor)
|
||||
if msg is None:
|
||||
raise ConnectionError(f"No response from motor {motor}")
|
||||
motor_id = self._get_motor_id(motor)
|
||||
recv_id = self._get_motor_recv_id(motor)
|
||||
raise ConnectionError(
|
||||
f"No response from motor '{motor}' (send ID: 0x{motor_id:02X}, recv ID: 0x{recv_id:02X}). "
|
||||
f"Check that: 1) Motor is powered (24V), 2) CAN wiring is correct, "
|
||||
f"3) Motor IDs are configured correctly using Damiao Debugging Tools"
|
||||
)
|
||||
|
||||
motor_type = self._motor_types.get(motor, MotorType.DM4310)
|
||||
position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type)
|
||||
@@ -578,6 +616,14 @@ class DamiaoMotorsBus(MotorsBusBase):
|
||||
if m.id == motor:
|
||||
return name
|
||||
raise ValueError(f"Unknown motor ID: {motor}")
|
||||
|
||||
def _get_motor_recv_id(self, motor: NameOrID) -> Optional[int]:
|
||||
"""Get motor recv_id from name or ID."""
|
||||
motor_name = self._get_motor_name(motor)
|
||||
motor_obj = self.motors.get(motor_name)
|
||||
if motor_obj and hasattr(motor_obj, "recv_id"):
|
||||
return motor_obj.recv_id
|
||||
return None
|
||||
|
||||
@cached_property
|
||||
def is_calibrated(self) -> bool:
|
||||
|
||||
@@ -32,6 +32,39 @@ pytest_plugins = [
|
||||
]
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
"""Add custom command line option for hardware tests."""
|
||||
parser.addoption(
|
||||
"--run-hardware",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Run hardware tests that require actual motors connected",
|
||||
)
|
||||
parser.addoption(
|
||||
"--can-port",
|
||||
action="store",
|
||||
default=None,
|
||||
help="CAN interface port (e.g., 'can0' for Linux, '/dev/cu.usbmodem*' for macOS)",
|
||||
)
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
"""Register custom marker for hardware tests."""
|
||||
config.addinivalue_line("markers", "hardware: mark test as requiring hardware")
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
"""Skip hardware tests unless --run-hardware flag is provided."""
|
||||
if config.getoption("--run-hardware"):
|
||||
# --run-hardware given in cli: do not skip hardware tests
|
||||
return
|
||||
|
||||
skip_hardware = pytest.mark.skip(reason="need --run-hardware option to run")
|
||||
for item in items:
|
||||
if "hardware" in item.keywords:
|
||||
item.add_marker(skip_hardware)
|
||||
|
||||
|
||||
def pytest_collection_finish():
|
||||
print(f"\nTesting with {DEVICE=}")
|
||||
|
||||
|
||||
+308
-49
@@ -14,66 +14,325 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme
|
||||
"""
|
||||
Test script for Damiao motor communication and control.
|
||||
|
||||
This script tests basic functionality of a single Damiao motor via CAN bus:
|
||||
1. Connects to CAN interface
|
||||
2. Discovers and enables the motor
|
||||
3. Reads current position
|
||||
4. Sets zero position
|
||||
5. Writes target positions
|
||||
6. Disables torque
|
||||
|
||||
Requirements:
|
||||
- Motor must be connected and powered (24V)
|
||||
- CAN interface must be configured (e.g., can0)
|
||||
- Motor ID must be set to 0x01 (send) and 0x11 (receive)
|
||||
|
||||
Setup CAN interface:
|
||||
sudo ip link set can0 type can bitrate 1000000
|
||||
sudo ip link set can0 up
|
||||
|
||||
Verify connection:
|
||||
candump can0 # In another terminal
|
||||
cansend can0 001#FFFFFFFFFFFFFFFC # Should enable motor and LED turns green
|
||||
"""
|
||||
|
||||
import time
|
||||
import math
|
||||
import numpy as np
|
||||
from damiao_motor import Motor, MotorControl, DM_Motor_Type, Control_Type
|
||||
|
||||
# Configuration parameters
|
||||
NUM_MOTORS = 1 # Number of motors to control
|
||||
CAN_INTERFACE = "can0" # CAN interface name
|
||||
CAN_BITRATE = 1000000 # CAN bus baud rate
|
||||
MOTOR_TYPE = DM_Motor_Type.DM4310 # Motor model
|
||||
import pytest
|
||||
|
||||
# Sine wave parameters
|
||||
FREQUENCY = 0.1 # Frequency (Hz)
|
||||
AMPLITUDE = 6 # Amplitude (rad)
|
||||
DURATION = 60.0 # Operation duration (s)
|
||||
from lerobot.motors import Motor, MotorNormMode
|
||||
from lerobot.motors.damiao import DamiaoMotorsBus
|
||||
from lerobot.motors.damiao.tables import MotorType
|
||||
|
||||
def main():
|
||||
# Create motor controller object
|
||||
control = MotorControl(CAN_INTERFACE, bitrate=CAN_BITRATE)
|
||||
@pytest.fixture
|
||||
def can_port(request):
|
||||
"""Get CAN port from command line or raise error if not provided."""
|
||||
port = request.config.getoption("--can-port")
|
||||
if port is None:
|
||||
pytest.skip("CAN port not specified. Use --can-port to specify the CAN interface.")
|
||||
return port
|
||||
|
||||
|
||||
@pytest.mark.hardware
|
||||
def test_single_motor_basic_operations(can_port):
|
||||
"""
|
||||
Test basic operations with a single Damiao motor.
|
||||
|
||||
# Create and add motors
|
||||
motors = []
|
||||
for i in range(NUM_MOTORS):
|
||||
motor = Motor(MOTOR_TYPE, i + 1, i + 0X10) # CAN IDs start from 1
|
||||
control.addMotor(motor)
|
||||
motors.append(motor)
|
||||
control.enable(motor)
|
||||
print(f"Motor {i + 1} enabled")
|
||||
This test requires actual hardware and is skipped by default.
|
||||
To run with hardware, use: pytest tests/motors/test_damiao.py --run-hardware --can-port PORT
|
||||
"""
|
||||
|
||||
# Configuration
|
||||
MOTOR_ID = 0x01 # Sender CAN ID
|
||||
MOTOR_RECV_ID = 0x11 # Receiver/Master ID
|
||||
MOTOR_TYPE = "dm4310"
|
||||
MOTOR_NAME = "test_motor"
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print("Damiao Motor Test - Single Motor Basic Operations")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
# Step 1: Create motor configuration
|
||||
print(f"Step 1: Creating motor configuration...")
|
||||
print(f" - Motor ID: 0x{MOTOR_ID:02X} (send) / 0x{MOTOR_RECV_ID:02X} (recv)")
|
||||
print(f" - Motor Type: {MOTOR_TYPE}")
|
||||
print(f" - CAN Port: {can_port}")
|
||||
|
||||
motor = Motor(MOTOR_ID, MOTOR_TYPE, MotorNormMode.DEGREES)
|
||||
motor.recv_id = MOTOR_RECV_ID
|
||||
motor.motor_type = MotorType.DM4310
|
||||
|
||||
motors = {MOTOR_NAME: motor}
|
||||
|
||||
# Step 2: Connect to CAN bus
|
||||
print(f"\nStep 2: Connecting to CAN bus...")
|
||||
bus = DamiaoMotorsBus(port=can_port, motors=motors)
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < DURATION:
|
||||
current_time = time.time() - start_time
|
||||
bus.connect(handshake=True)
|
||||
print(f" ✓ Connected to {can_port}")
|
||||
except Exception as e:
|
||||
print(f" ✗ Failed to connect: {e}")
|
||||
print("\nTroubleshooting:")
|
||||
print(f" 1. Check CAN interface is up: ip link show {can_port}")
|
||||
print(f" 2. Setup if needed: sudo ip link set {can_port} type can bitrate 1000000")
|
||||
print(f" 3. Bring up: sudo ip link set {can_port} up")
|
||||
print(f" 4. Test with: cansend {can_port} 001#FFFFFFFFFFFFFFFC")
|
||||
return
|
||||
|
||||
try:
|
||||
# Step 3: Enable motor (torque on)
|
||||
print(f"\nStep 3: Enabling motor...")
|
||||
bus.enable_torque(MOTOR_NAME)
|
||||
time.sleep(0.1)
|
||||
print(f" ✓ Motor enabled (LED should be green)")
|
||||
|
||||
# Step 4: Read current position
|
||||
print(f"\nStep 4: Reading current position...")
|
||||
current_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False)
|
||||
current_vel = bus.read("Present_Velocity", MOTOR_NAME, normalize=False)
|
||||
current_torque = bus.read("Present_Torque", MOTOR_NAME, normalize=False)
|
||||
|
||||
print(f" Current State:")
|
||||
print(f" Position: {current_pos:8.2f}°")
|
||||
print(f" Velocity: {current_vel:8.2f}°/s")
|
||||
print(f" Torque: {current_torque:8.3f} N·m")
|
||||
|
||||
# Step 5: Set zero position
|
||||
print(f"\nStep 5: Setting current position as zero...")
|
||||
bus.set_zero_position([MOTOR_NAME])
|
||||
time.sleep(0.2)
|
||||
|
||||
new_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False)
|
||||
print(f" Position after zero: {new_pos:8.2f}°")
|
||||
print(f" ✓ Zero position set")
|
||||
|
||||
# Step 6: Test position commands
|
||||
print(f"\nStep 6: Testing position control...")
|
||||
|
||||
test_positions = [0.0, 45.0, -45.0, 0.0]
|
||||
|
||||
for target_pos in test_positions:
|
||||
print(f"\n Moving to {target_pos:6.1f}°...")
|
||||
bus.write("Goal_Position", MOTOR_NAME, target_pos, normalize=False)
|
||||
time.sleep(1.0) # Allow motor to move
|
||||
|
||||
# Calculate sine wave position
|
||||
position = AMPLITUDE * math.sin(2 * math.pi * FREQUENCY * current_time)
|
||||
actual_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False)
|
||||
error = abs(actual_pos - target_pos)
|
||||
|
||||
# Control all motors
|
||||
for motor in motors:
|
||||
control.controlMIT(
|
||||
motor,
|
||||
kp=10.0, # Position gain
|
||||
kd=1.0, # Velocity gain
|
||||
q=position, # Target position
|
||||
dq=0.0, # Target velocity
|
||||
tau=0.0 # Feedforward torque
|
||||
)
|
||||
print(f" Target: {target_pos:8.2f}°")
|
||||
print(f" Actual: {actual_pos:8.2f}°")
|
||||
print(f" Error: {error:8.2f}°")
|
||||
|
||||
# Control frequency
|
||||
time.sleep(0.001) # 1kHz control frequency
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nProgram interrupted by user")
|
||||
if error > 10.0:
|
||||
print(f" ⚠ Large position error!")
|
||||
else:
|
||||
print(f" ✓ Position reached")
|
||||
|
||||
# Step 7: Test MIT control with custom gains
|
||||
print(f"\nStep 7: Testing MIT control with custom gains...")
|
||||
print(f" Using lower gains for gentler movement...")
|
||||
|
||||
# Lower gains for smoother motion
|
||||
bus._mit_control(
|
||||
MOTOR_NAME,
|
||||
kp=5.0, # Lower position gain
|
||||
kd=0.3, # Lower damping
|
||||
position_degrees=30.0,
|
||||
velocity_deg_per_sec=0.0,
|
||||
torque=0.0
|
||||
)
|
||||
time.sleep(1.5)
|
||||
|
||||
final_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False)
|
||||
print(f" Final position: {final_pos:8.2f}°")
|
||||
print(f" ✓ MIT control test complete")
|
||||
|
||||
# Step 8: Return to zero
|
||||
print(f"\nStep 8: Returning to zero position...")
|
||||
bus.write("Goal_Position", MOTOR_NAME, 0.0, normalize=False)
|
||||
time.sleep(1.0)
|
||||
|
||||
final_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False)
|
||||
print(f" Final position: {final_pos:8.2f}°")
|
||||
|
||||
finally:
|
||||
# Disable all motors
|
||||
for motor in motors:
|
||||
control.disable(motor)
|
||||
print(f"Motor {motor.SlaveID} disabled")
|
||||
# Step 9: Disable motor
|
||||
print(f"\nStep 9: Disabling motor...")
|
||||
if bus.is_connected:
|
||||
bus.disable_torque(MOTOR_NAME)
|
||||
time.sleep(0.1)
|
||||
print(f" ✓ Motor disabled (torque off)")
|
||||
|
||||
# Step 10: Disconnect
|
||||
print(f"\nStep 10: Disconnecting...")
|
||||
if bus.is_connected:
|
||||
bus.disconnect(disable_torque=False) # Already disabled
|
||||
print(f" ✓ Disconnected from {can_port}")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print("Test completed successfully!")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
|
||||
@pytest.mark.hardware
|
||||
def test_motor_discovery_and_setup(can_port):
|
||||
"""
|
||||
Test motor discovery and ID configuration.
|
||||
|
||||
Note: This test requires the Damiao Debugging Tools for actual ID changes.
|
||||
This test only demonstrates the bus scan functionality.
|
||||
"""
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print("Damiao Motor Discovery Test")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
print("Note: Motor ID configuration must be done via Damiao Debugging Tools")
|
||||
print("See: https://docs.openarm.dev/software/setup/motor-id")
|
||||
print()
|
||||
|
||||
# Test if CAN interface is accessible
|
||||
print(f"Testing CAN interface: {can_port}")
|
||||
|
||||
# Create a minimal motor bus for testing
|
||||
test_motor = Motor(0x01, "dm4310", MotorNormMode.DEGREES)
|
||||
test_motor.recv_id = 0x11
|
||||
test_motor.motor_type = MotorType.DM4310
|
||||
|
||||
bus = DamiaoMotorsBus(port=can_port, motors={"test": test_motor})
|
||||
|
||||
try:
|
||||
bus.connect(handshake=False)
|
||||
print(f"✓ CAN interface {can_port} is accessible")
|
||||
|
||||
# Try to communicate with motor at 0x01
|
||||
print(f"\nLooking for motor at ID 0x01...")
|
||||
try:
|
||||
bus._refresh_motor("test")
|
||||
msg = bus._recv_motor_response(timeout=0.5)
|
||||
if msg:
|
||||
print(f"✓ Motor found at ID 0x01, response ID: 0x{msg.arbitration_id:02X}")
|
||||
else:
|
||||
print(f"✗ No response from motor")
|
||||
print("\nTroubleshooting:")
|
||||
print(" 1. Verify motor is powered (24V)")
|
||||
print(" 2. Check CAN wiring (CANH, CANL)")
|
||||
print(" 3. Verify motor ID is set to 0x01")
|
||||
print(" 4. Enable with: cansend can0 001#FFFFFFFFFFFFFFFC")
|
||||
except Exception as e:
|
||||
print(f"✗ Error communicating with motor: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Failed to access CAN interface: {e}")
|
||||
print("\nSetup CAN interface:")
|
||||
print(f" sudo ip link set {can_port} type can bitrate 1000000")
|
||||
print(f" sudo ip link set {can_port} up")
|
||||
|
||||
finally:
|
||||
if bus.is_connected:
|
||||
bus.disconnect(disable_torque=True)
|
||||
|
||||
print(f"\n{'='*60}\n")
|
||||
|
||||
|
||||
@pytest.mark.hardware
|
||||
def test_multi_motor_sync_operations(can_port):
|
||||
"""
|
||||
Test synchronized read/write with multiple motors.
|
||||
|
||||
This demonstrates how to control multiple motors simultaneously.
|
||||
"""
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print("Damiao Multi-Motor Sync Test")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
# Setup motors (adjust IDs as needed)
|
||||
motors = {
|
||||
"joint_1": Motor(0x01, "dm4310", MotorNormMode.DEGREES),
|
||||
"joint_2": Motor(0x02, "dm4310", MotorNormMode.DEGREES),
|
||||
}
|
||||
|
||||
motors["joint_1"].recv_id = 0x11
|
||||
motors["joint_1"].motor_type = MotorType.DM4310
|
||||
motors["joint_2"].recv_id = 0x12
|
||||
motors["joint_2"].motor_type = MotorType.DM4310
|
||||
|
||||
bus = DamiaoMotorsBus(port=can_port, motors=motors)
|
||||
|
||||
try:
|
||||
bus.connect()
|
||||
bus.enable_torque()
|
||||
|
||||
print("Reading all motor positions...")
|
||||
positions = bus.sync_read("Present_Position")
|
||||
for motor, pos in positions.items():
|
||||
print(f" {motor}: {pos:.2f}°")
|
||||
|
||||
print("\nMoving all motors to 45°...")
|
||||
target_positions = {motor: 45.0 for motor in motors}
|
||||
bus.sync_write("Goal_Position", target_positions)
|
||||
time.sleep(2.0)
|
||||
|
||||
positions = bus.sync_read("Present_Position")
|
||||
print("Final positions:")
|
||||
for motor, pos in positions.items():
|
||||
print(f" {motor}: {pos:.2f}°")
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Test failed: {e}")
|
||||
print("\nThis is expected on macOS without proper CAN hardware.")
|
||||
print("macOS does not support SocketCAN natively (Linux-only feature).")
|
||||
print("For macOS, you need a USB-CAN adapter with SLCAN support.")
|
||||
finally:
|
||||
if bus.is_connected:
|
||||
bus.disable_torque()
|
||||
bus.disconnect()
|
||||
|
||||
print(f"\n{'='*60}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
print("Damiao Motor Test Suite")
|
||||
print("=" * 60)
|
||||
print("\nThese tests require actual hardware to run.")
|
||||
print("Please ensure:")
|
||||
print(" 1. Motor is connected and powered (24V)")
|
||||
print(" 2. CAN interface is configured")
|
||||
print(" 3. Motor ID is set to 0x01/0x11")
|
||||
print("\nTo run tests with hardware:")
|
||||
print("\n Linux (SocketCAN):")
|
||||
print(" sudo ip link set can0 type can bitrate 1000000")
|
||||
print(" sudo ip link set can0 up")
|
||||
print(" pytest tests/motors/test_damiao.py --run-hardware --can-port can0")
|
||||
print("\n macOS (USB-CAN adapter with SLCAN):")
|
||||
print(" pytest tests/motors/test_damiao.py --run-hardware --can-port /dev/cu.usbmodem00000000050C1")
|
||||
print("\nTo run without hardware (tests will be skipped):")
|
||||
print(" pytest tests/motors/test_damiao.py")
|
||||
print("\nNote: The --run-hardware and --can-port flags are configured in tests/conftest.py")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user