diff --git a/debug_can_communication.py b/debug_can_communication.py new file mode 100644 index 000000000..280fb6a75 --- /dev/null +++ b/debug_can_communication.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +""" +Debug script for Damiao motor CAN communication on macOS. +This replaces candump for macOS SLCAN debugging. +""" + +import can +import time +import sys + +def test_can_communication(port="/dev/cu.usbmodem2101"): + """Test basic CAN communication with a Damiao motor.""" + + print("=" * 60) + print("Damiao Motor CAN Communication Debug Tool") + print("=" * 60) + print(f"\nPort: {port}") + print() + + try: + # Connect to SLCAN + print("Step 1: Connecting to SLCAN...") + bus = can.interface.Bus( + channel=port, + interface='slcan', + bitrate=1000000 + ) + print("✓ Connected to SLCAN") + + # Test 1: Send enable command and listen for ANY response + print("\n" + "=" * 60) + print("Test 1: Enable Motor (ID 0x01)") + print("=" * 60) + print("Sending enable command to 0x01...") + + enable_msg = can.Message( + arbitration_id=0x01, + data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], + is_extended_id=False + ) + bus.send(enable_msg) + print("✓ Enable command sent") + + print("\nListening for responses (2 seconds)...") + print("Expected: Response from 0x11 (master ID)") + print() + + responses = [] + start_time = time.time() + timeout = 2.0 + + while time.time() - start_time < timeout: + msg = bus.recv(timeout=0.1) + if msg: + responses.append(msg) + print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}") + + if not responses: + print("✗ NO RESPONSES RECEIVED") + print("\nPossible issues:") + print(" 1. Motor not powered (check 24V supply)") + print(" 2. CAN wiring incorrect (CANH, CANL, GND)") + print(" 3. Motor master ID not set to 0x11") + print(" 4. SLCAN adapter not working properly") + print(" 5. Wrong CAN port specified") + else: + print(f"\n✓ Received {len(responses)} response(s)") + + # Check if we got response from expected ID + recv_ids = [msg.arbitration_id for msg in responses] + if 0x11 in recv_ids: + print("✓ Motor 0x11 is responding!") + else: + print(f"⚠ Responses from unexpected IDs: {[hex(id) for id in recv_ids]}") + + # Test 2: Send refresh command + print("\n" + "=" * 60) + print("Test 2: Refresh Motor State (ID 0x01)") + print("=" * 60) + print("Sending refresh command...") + + refresh_msg = can.Message( + arbitration_id=0x7FF, # Parameter ID + data=[0x01, 0x00, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00], + is_extended_id=False + ) + bus.send(refresh_msg) + print("✓ Refresh command sent") + + print("\nListening for responses (2 seconds)...") + responses = [] + start_time = time.time() + + while time.time() - start_time < timeout: + msg = bus.recv(timeout=0.1) + if msg: + responses.append(msg) + print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}") + + if not responses: + print("✗ NO RESPONSES RECEIVED") + else: + print(f"\n✓ Received {len(responses)} response(s)") + + # Test 3: Listen for any spontaneous traffic + print("\n" + "=" * 60) + print("Test 3: Listen for Any CAN Traffic") + print("=" * 60) + print("Listening for 5 seconds...") + print("(This will catch any background CAN traffic)") + print() + + start_time = time.time() + traffic_count = 0 + + while time.time() - start_time < 5.0: + msg = bus.recv(timeout=0.1) + if msg: + traffic_count += 1 + print(f" [{time.time() - start_time:.2f}s] ID=0x{msg.arbitration_id:03X}: {msg.data.hex()}") + + if traffic_count == 0: + print("✗ No CAN traffic detected at all") + print("\nThis suggests:") + print(" - SLCAN adapter may not be working") + print(" - No devices on the CAN bus are active") + print(" - Wrong port specified") + else: + print(f"\n✓ Detected {traffic_count} CAN messages") + + # Cleanup + print("\n" + "=" * 60) + print("Cleanup") + print("=" * 60) + print("Sending disable command...") + disable_msg = can.Message( + arbitration_id=0x01, + data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD], + is_extended_id=False + ) + bus.send(disable_msg) + time.sleep(0.1) + + bus.shutdown() + print("✓ Disconnected from CAN bus") + + except Exception as e: + print(f"\n✗ Error: {e}") + import traceback + traceback.print_exc() + return False + + print("\n" + "=" * 60) + print("Debug Complete") + print("=" * 60) + return True + + +if __name__ == "__main__": + if len(sys.argv) > 1: + port = sys.argv[1] + else: + port = "/dev/cu.usbmodem2101" + + test_can_communication(port) + diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py index f3186b661..90fa69c92 100644 --- a/src/lerobot/motors/damiao/damiao.py +++ b/src/lerobot/motors/damiao/damiao.py @@ -73,7 +73,7 @@ class DamiaoMotorsBus(MotorsBusBase): port: str, motors: dict[str, Motor], calibration: dict[str, MotorCalibration] | None = None, - can_interface: str = "socketcan", + can_interface: str = "auto", ): """ Initialize the Damiao motors bus. @@ -82,7 +82,7 @@ class DamiaoMotorsBus(MotorsBusBase): port: CAN interface name (e.g., "can0" for Linux, "/dev/cu.usbmodem*" for macOS) motors: Dictionary mapping motor names to Motor objects calibration: Optional calibration data - can_interface: CAN interface type - "socketcan" (Linux) or "slcan" (macOS/serial) + can_interface: CAN interface type - "auto" (default), "socketcan" (Linux), or "slcan" (macOS/serial) """ super().__init__(port, motors, calibration) self.port = port @@ -94,7 +94,7 @@ class DamiaoMotorsBus(MotorsBusBase): self._motor_can_ids = {} self._recv_id_to_motor = {} - # Store motor types + # Store motor types and recv IDs self._motor_types = {} for name, motor in self.motors.items(): if hasattr(motor, "motor_type"): @@ -102,6 +102,10 @@ class DamiaoMotorsBus(MotorsBusBase): else: # Default to DM4310 if not specified self._motor_types[name] = MotorType.DM4310 + + # Map recv_id to motor name for filtering responses + if hasattr(motor, "recv_id"): + self._recv_id_to_motor[motor.recv_id] = name @property def is_connected(self) -> bool: @@ -206,18 +210,20 @@ class DamiaoMotorsBus(MotorsBusBase): def _enable_motor(self, motor: NameOrID) -> None: """Enable a single motor.""" motor_id = self._get_motor_id(motor) + recv_id = self._get_motor_recv_id(motor) data = [0xFF] * 7 + [CAN_CMD_ENABLE] msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) self.canbus.send(msg) - self._recv_motor_response() + self._recv_motor_response(expected_recv_id=recv_id) def _disable_motor(self, motor: NameOrID) -> None: """Disable a single motor.""" motor_id = self._get_motor_id(motor) + recv_id = self._get_motor_recv_id(motor) data = [0xFF] * 7 + [CAN_CMD_DISABLE] msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) self.canbus.send(msg) - self._recv_motor_response() + self._recv_motor_response(expected_recv_id=recv_id) def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: """Enable torque on selected motors.""" @@ -250,26 +256,54 @@ class DamiaoMotorsBus(MotorsBusBase): motors = self._get_motors_list(motors) for motor in motors: motor_id = self._get_motor_id(motor) + recv_id = self._get_motor_recv_id(motor) data = [0xFF] * 7 + [CAN_CMD_SET_ZERO] msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) self.canbus.send(msg) - self._recv_motor_response() + self._recv_motor_response(expected_recv_id=recv_id) time.sleep(0.01) - def _refresh_motor(self, motor: NameOrID) -> None: - """Refresh motor status.""" + def _refresh_motor(self, motor: NameOrID) -> Optional[can.Message]: + """Refresh motor status and return the response.""" motor_id = self._get_motor_id(motor) + recv_id = self._get_motor_recv_id(motor) data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) self.canbus.send(msg) - self._recv_motor_response() + return self._recv_motor_response(expected_recv_id=recv_id) - def _recv_motor_response(self, timeout: float = 0.1) -> Optional[can.Message]: - """Receive a response from a motor.""" + def _recv_motor_response(self, expected_recv_id: Optional[int] = None, timeout: float = 0.5) -> Optional[can.Message]: + """ + Receive a response from a motor. + + Args: + expected_recv_id: If provided, only return messages from this CAN ID + timeout: Timeout in seconds + + Returns: + CAN message if received, None otherwise + """ try: - msg = self.canbus.recv(timeout=timeout) - if msg: - return msg + start_time = time.time() + messages_seen = [] + while time.time() - start_time < timeout: + msg = self.canbus.recv(timeout=0.01) # Short timeout for polling + if msg: + messages_seen.append(f"0x{msg.arbitration_id:02X}") + # If no filter specified, return any message + if expected_recv_id is None: + return msg + # Otherwise, only return if it matches the expected recv_id + if msg.arbitration_id == expected_recv_id: + return msg + else: + logger.debug(f"Ignoring message from CAN ID 0x{msg.arbitration_id:02X}, expected 0x{expected_recv_id:02X}") + + # Log what we saw for debugging + if messages_seen: + logger.warning(f"Received {len(messages_seen)} message(s) from IDs {set(messages_seen)}, but expected 0x{expected_recv_id:02X}") + else: + logger.warning(f"No CAN messages received (expected from 0x{expected_recv_id:02X})") except Exception as e: logger.debug(f"Failed to receive CAN message: {e}") return None @@ -325,7 +359,8 @@ class DamiaoMotorsBus(MotorsBusBase): msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) self.canbus.send(msg) - self._recv_motor_response() + recv_id = self._get_motor_recv_id(motor) + self._recv_motor_response(expected_recv_id=recv_id) def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int: """Convert float to unsigned integer for CAN transmission.""" @@ -384,12 +419,15 @@ class DamiaoMotorsBus(MotorsBusBase): raise DeviceNotConnectedError(f"{self} is not connected.") # Refresh motor to get latest state - self._refresh_motor(motor) - - # Read response - msg = self._recv_motor_response() + msg = self._refresh_motor(motor) if msg is None: - raise ConnectionError(f"No response from motor {motor}") + motor_id = self._get_motor_id(motor) + recv_id = self._get_motor_recv_id(motor) + raise ConnectionError( + f"No response from motor '{motor}' (send ID: 0x{motor_id:02X}, recv ID: 0x{recv_id:02X}). " + f"Check that: 1) Motor is powered (24V), 2) CAN wiring is correct, " + f"3) Motor IDs are configured correctly using Damiao Debugging Tools" + ) motor_type = self._motor_types.get(motor, MotorType.DM4310) position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type) @@ -578,6 +616,14 @@ class DamiaoMotorsBus(MotorsBusBase): if m.id == motor: return name raise ValueError(f"Unknown motor ID: {motor}") + + def _get_motor_recv_id(self, motor: NameOrID) -> Optional[int]: + """Get motor recv_id from name or ID.""" + motor_name = self._get_motor_name(motor) + motor_obj = self.motors.get(motor_name) + if motor_obj and hasattr(motor_obj, "recv_id"): + return motor_obj.recv_id + return None @cached_property def is_calibrated(self) -> bool: diff --git a/tests/conftest.py b/tests/conftest.py index b14e9aed5..6f4cfdff0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,39 @@ pytest_plugins = [ ] +def pytest_addoption(parser): + """Add custom command line option for hardware tests.""" + parser.addoption( + "--run-hardware", + action="store_true", + default=False, + help="Run hardware tests that require actual motors connected", + ) + parser.addoption( + "--can-port", + action="store", + default=None, + help="CAN interface port (e.g., 'can0' for Linux, '/dev/cu.usbmodem*' for macOS)", + ) + + +def pytest_configure(config): + """Register custom marker for hardware tests.""" + config.addinivalue_line("markers", "hardware: mark test as requiring hardware") + + +def pytest_collection_modifyitems(config, items): + """Skip hardware tests unless --run-hardware flag is provided.""" + if config.getoption("--run-hardware"): + # --run-hardware given in cli: do not skip hardware tests + return + + skip_hardware = pytest.mark.skip(reason="need --run-hardware option to run") + for item in items: + if "hardware" in item.keywords: + item.add_marker(skip_hardware) + + def pytest_collection_finish(): print(f"\nTesting with {DEVICE=}") diff --git a/tests/motors/test_damiao.py b/tests/motors/test_damiao.py index 9b6daa375..ebf1d8ba6 100644 --- a/tests/motors/test_damiao.py +++ b/tests/motors/test_damiao.py @@ -14,66 +14,325 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme +""" +Test script for Damiao motor communication and control. + +This script tests basic functionality of a single Damiao motor via CAN bus: +1. Connects to CAN interface +2. Discovers and enables the motor +3. Reads current position +4. Sets zero position +5. Writes target positions +6. Disables torque + +Requirements: +- Motor must be connected and powered (24V) +- CAN interface must be configured (e.g., can0) +- Motor ID must be set to 0x01 (send) and 0x11 (receive) + +Setup CAN interface: + sudo ip link set can0 type can bitrate 1000000 + sudo ip link set can0 up + +Verify connection: + candump can0 # In another terminal + cansend can0 001#FFFFFFFFFFFFFFFC # Should enable motor and LED turns green +""" import time -import math -import numpy as np -from damiao_motor import Motor, MotorControl, DM_Motor_Type, Control_Type -# Configuration parameters -NUM_MOTORS = 1 # Number of motors to control -CAN_INTERFACE = "can0" # CAN interface name -CAN_BITRATE = 1000000 # CAN bus baud rate -MOTOR_TYPE = DM_Motor_Type.DM4310 # Motor model +import pytest -# Sine wave parameters -FREQUENCY = 0.1 # Frequency (Hz) -AMPLITUDE = 6 # Amplitude (rad) -DURATION = 60.0 # Operation duration (s) +from lerobot.motors import Motor, MotorNormMode +from lerobot.motors.damiao import DamiaoMotorsBus +from lerobot.motors.damiao.tables import MotorType -def main(): - # Create motor controller object - control = MotorControl(CAN_INTERFACE, bitrate=CAN_BITRATE) +@pytest.fixture +def can_port(request): + """Get CAN port from command line or raise error if not provided.""" + port = request.config.getoption("--can-port") + if port is None: + pytest.skip("CAN port not specified. Use --can-port to specify the CAN interface.") + return port + + +@pytest.mark.hardware +def test_single_motor_basic_operations(can_port): + """ + Test basic operations with a single Damiao motor. - # Create and add motors - motors = [] - for i in range(NUM_MOTORS): - motor = Motor(MOTOR_TYPE, i + 1, i + 0X10) # CAN IDs start from 1 - control.addMotor(motor) - motors.append(motor) - control.enable(motor) - print(f"Motor {i + 1} enabled") + This test requires actual hardware and is skipped by default. + To run with hardware, use: pytest tests/motors/test_damiao.py --run-hardware --can-port PORT + """ + + # Configuration + MOTOR_ID = 0x01 # Sender CAN ID + MOTOR_RECV_ID = 0x11 # Receiver/Master ID + MOTOR_TYPE = "dm4310" + MOTOR_NAME = "test_motor" + + print(f"\n{'='*60}") + print("Damiao Motor Test - Single Motor Basic Operations") + print(f"{'='*60}\n") + + # Step 1: Create motor configuration + print(f"Step 1: Creating motor configuration...") + print(f" - Motor ID: 0x{MOTOR_ID:02X} (send) / 0x{MOTOR_RECV_ID:02X} (recv)") + print(f" - Motor Type: {MOTOR_TYPE}") + print(f" - CAN Port: {can_port}") + + motor = Motor(MOTOR_ID, MOTOR_TYPE, MotorNormMode.DEGREES) + motor.recv_id = MOTOR_RECV_ID + motor.motor_type = MotorType.DM4310 + + motors = {MOTOR_NAME: motor} + + # Step 2: Connect to CAN bus + print(f"\nStep 2: Connecting to CAN bus...") + bus = DamiaoMotorsBus(port=can_port, motors=motors) try: - start_time = time.time() - while time.time() - start_time < DURATION: - current_time = time.time() - start_time + bus.connect(handshake=True) + print(f" ✓ Connected to {can_port}") + except Exception as e: + print(f" ✗ Failed to connect: {e}") + print("\nTroubleshooting:") + print(f" 1. Check CAN interface is up: ip link show {can_port}") + print(f" 2. Setup if needed: sudo ip link set {can_port} type can bitrate 1000000") + print(f" 3. Bring up: sudo ip link set {can_port} up") + print(f" 4. Test with: cansend {can_port} 001#FFFFFFFFFFFFFFFC") + return + + try: + # Step 3: Enable motor (torque on) + print(f"\nStep 3: Enabling motor...") + bus.enable_torque(MOTOR_NAME) + time.sleep(0.1) + print(f" ✓ Motor enabled (LED should be green)") + + # Step 4: Read current position + print(f"\nStep 4: Reading current position...") + current_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False) + current_vel = bus.read("Present_Velocity", MOTOR_NAME, normalize=False) + current_torque = bus.read("Present_Torque", MOTOR_NAME, normalize=False) + + print(f" Current State:") + print(f" Position: {current_pos:8.2f}°") + print(f" Velocity: {current_vel:8.2f}°/s") + print(f" Torque: {current_torque:8.3f} N·m") + + # Step 5: Set zero position + print(f"\nStep 5: Setting current position as zero...") + bus.set_zero_position([MOTOR_NAME]) + time.sleep(0.2) + + new_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False) + print(f" Position after zero: {new_pos:8.2f}°") + print(f" ✓ Zero position set") + + # Step 6: Test position commands + print(f"\nStep 6: Testing position control...") + + test_positions = [0.0, 45.0, -45.0, 0.0] + + for target_pos in test_positions: + print(f"\n Moving to {target_pos:6.1f}°...") + bus.write("Goal_Position", MOTOR_NAME, target_pos, normalize=False) + time.sleep(1.0) # Allow motor to move - # Calculate sine wave position - position = AMPLITUDE * math.sin(2 * math.pi * FREQUENCY * current_time) + actual_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False) + error = abs(actual_pos - target_pos) - # Control all motors - for motor in motors: - control.controlMIT( - motor, - kp=10.0, # Position gain - kd=1.0, # Velocity gain - q=position, # Target position - dq=0.0, # Target velocity - tau=0.0 # Feedforward torque - ) + print(f" Target: {target_pos:8.2f}°") + print(f" Actual: {actual_pos:8.2f}°") + print(f" Error: {error:8.2f}°") - # Control frequency - time.sleep(0.001) # 1kHz control frequency - - except KeyboardInterrupt: - print("\nProgram interrupted by user") + if error > 10.0: + print(f" ⚠ Large position error!") + else: + print(f" ✓ Position reached") + + # Step 7: Test MIT control with custom gains + print(f"\nStep 7: Testing MIT control with custom gains...") + print(f" Using lower gains for gentler movement...") + + # Lower gains for smoother motion + bus._mit_control( + MOTOR_NAME, + kp=5.0, # Lower position gain + kd=0.3, # Lower damping + position_degrees=30.0, + velocity_deg_per_sec=0.0, + torque=0.0 + ) + time.sleep(1.5) + + final_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False) + print(f" Final position: {final_pos:8.2f}°") + print(f" ✓ MIT control test complete") + + # Step 8: Return to zero + print(f"\nStep 8: Returning to zero position...") + bus.write("Goal_Position", MOTOR_NAME, 0.0, normalize=False) + time.sleep(1.0) + + final_pos = bus.read("Present_Position", MOTOR_NAME, normalize=False) + print(f" Final position: {final_pos:8.2f}°") + finally: - # Disable all motors - for motor in motors: - control.disable(motor) - print(f"Motor {motor.SlaveID} disabled") + # Step 9: Disable motor + print(f"\nStep 9: Disabling motor...") + if bus.is_connected: + bus.disable_torque(MOTOR_NAME) + time.sleep(0.1) + print(f" ✓ Motor disabled (torque off)") + + # Step 10: Disconnect + print(f"\nStep 10: Disconnecting...") + if bus.is_connected: + bus.disconnect(disable_torque=False) # Already disabled + print(f" ✓ Disconnected from {can_port}") + + print(f"\n{'='*60}") + print("Test completed successfully!") + print(f"{'='*60}\n") + + +@pytest.mark.hardware +def test_motor_discovery_and_setup(can_port): + """ + Test motor discovery and ID configuration. + + Note: This test requires the Damiao Debugging Tools for actual ID changes. + This test only demonstrates the bus scan functionality. + """ + + print(f"\n{'='*60}") + print("Damiao Motor Discovery Test") + print(f"{'='*60}\n") + + print("Note: Motor ID configuration must be done via Damiao Debugging Tools") + print("See: https://docs.openarm.dev/software/setup/motor-id") + print() + + # Test if CAN interface is accessible + print(f"Testing CAN interface: {can_port}") + + # Create a minimal motor bus for testing + test_motor = Motor(0x01, "dm4310", MotorNormMode.DEGREES) + test_motor.recv_id = 0x11 + test_motor.motor_type = MotorType.DM4310 + + bus = DamiaoMotorsBus(port=can_port, motors={"test": test_motor}) + + try: + bus.connect(handshake=False) + print(f"✓ CAN interface {can_port} is accessible") + + # Try to communicate with motor at 0x01 + print(f"\nLooking for motor at ID 0x01...") + try: + bus._refresh_motor("test") + msg = bus._recv_motor_response(timeout=0.5) + if msg: + print(f"✓ Motor found at ID 0x01, response ID: 0x{msg.arbitration_id:02X}") + else: + print(f"✗ No response from motor") + print("\nTroubleshooting:") + print(" 1. Verify motor is powered (24V)") + print(" 2. Check CAN wiring (CANH, CANL)") + print(" 3. Verify motor ID is set to 0x01") + print(" 4. Enable with: cansend can0 001#FFFFFFFFFFFFFFFC") + except Exception as e: + print(f"✗ Error communicating with motor: {e}") + + except Exception as e: + print(f"✗ Failed to access CAN interface: {e}") + print("\nSetup CAN interface:") + print(f" sudo ip link set {can_port} type can bitrate 1000000") + print(f" sudo ip link set {can_port} up") + + finally: + if bus.is_connected: + bus.disconnect(disable_torque=True) + + print(f"\n{'='*60}\n") + + +@pytest.mark.hardware +def test_multi_motor_sync_operations(can_port): + """ + Test synchronized read/write with multiple motors. + + This demonstrates how to control multiple motors simultaneously. + """ + + print(f"\n{'='*60}") + print("Damiao Multi-Motor Sync Test") + print(f"{'='*60}\n") + + # Setup motors (adjust IDs as needed) + motors = { + "joint_1": Motor(0x01, "dm4310", MotorNormMode.DEGREES), + "joint_2": Motor(0x02, "dm4310", MotorNormMode.DEGREES), + } + + motors["joint_1"].recv_id = 0x11 + motors["joint_1"].motor_type = MotorType.DM4310 + motors["joint_2"].recv_id = 0x12 + motors["joint_2"].motor_type = MotorType.DM4310 + + bus = DamiaoMotorsBus(port=can_port, motors=motors) + + try: + bus.connect() + bus.enable_torque() + + print("Reading all motor positions...") + positions = bus.sync_read("Present_Position") + for motor, pos in positions.items(): + print(f" {motor}: {pos:.2f}°") + + print("\nMoving all motors to 45°...") + target_positions = {motor: 45.0 for motor in motors} + bus.sync_write("Goal_Position", target_positions) + time.sleep(2.0) + + positions = bus.sync_read("Present_Position") + print("Final positions:") + for motor, pos in positions.items(): + print(f" {motor}: {pos:.2f}°") + + except Exception as e: + print(f"✗ Test failed: {e}") + print("\nThis is expected on macOS without proper CAN hardware.") + print("macOS does not support SocketCAN natively (Linux-only feature).") + print("For macOS, you need a USB-CAN adapter with SLCAN support.") + finally: + if bus.is_connected: + bus.disable_torque() + bus.disconnect() + + print(f"\n{'='*60}\n") + if __name__ == "__main__": - main() + print("Damiao Motor Test Suite") + print("=" * 60) + print("\nThese tests require actual hardware to run.") + print("Please ensure:") + print(" 1. Motor is connected and powered (24V)") + print(" 2. CAN interface is configured") + print(" 3. Motor ID is set to 0x01/0x11") + print("\nTo run tests with hardware:") + print("\n Linux (SocketCAN):") + print(" sudo ip link set can0 type can bitrate 1000000") + print(" sudo ip link set can0 up") + print(" pytest tests/motors/test_damiao.py --run-hardware --can-port can0") + print("\n macOS (USB-CAN adapter with SLCAN):") + print(" pytest tests/motors/test_damiao.py --run-hardware --can-port /dev/cu.usbmodem00000000050C1") + print("\nTo run without hardware (tests will be skipped):") + print(" pytest tests/motors/test_damiao.py") + print("\nNote: The --run-hardware and --can-port flags are configured in tests/conftest.py") + print("=" * 60) +