From a7391e82c7bd6b97bc897e00334d6a5e55c5ac31 Mon Sep 17 00:00:00 2001 From: croissant Date: Fri, 31 Oct 2025 10:01:41 +0100 Subject: [PATCH] pos teleop --- debug_can_communication.py | 166 ------- docs/source/openarms.mdx | 253 +---------- examples/openarms/debug_can_communication.py | 416 ++++++++++++++++++ examples/openarms/setup_can.sh | 73 +++ examples/openarms/teleop.py | 148 +++++++ src/lerobot/motors/damiao/damiao.py | 74 +++- .../openarms/config_openarms_follower.py | 23 +- .../robots/openarms/openarms_follower.py | 340 +++++++++----- .../openarms/config_openarms_leader.py | 14 +- .../teleoperators/openarms/openarms_leader.py | 202 +++++---- 10 files changed, 1095 insertions(+), 614 deletions(-) create mode 100644 examples/openarms/debug_can_communication.py create mode 100755 examples/openarms/setup_can.sh create mode 100644 examples/openarms/teleop.py diff --git a/debug_can_communication.py b/debug_can_communication.py index 280fb6a75..e69de29bb 100644 --- a/debug_can_communication.py +++ b/debug_can_communication.py @@ -1,166 +0,0 @@ -#!/usr/bin/env python3 -""" -Debug script for Damiao motor CAN communication on macOS. -This replaces candump for macOS SLCAN debugging. -""" - -import can -import time -import sys - -def test_can_communication(port="/dev/cu.usbmodem2101"): - """Test basic CAN communication with a Damiao motor.""" - - print("=" * 60) - print("Damiao Motor CAN Communication Debug Tool") - print("=" * 60) - print(f"\nPort: {port}") - print() - - try: - # Connect to SLCAN - print("Step 1: Connecting to SLCAN...") - bus = can.interface.Bus( - channel=port, - interface='slcan', - bitrate=1000000 - ) - print("✓ Connected to SLCAN") - - # Test 1: Send enable command and listen for ANY response - print("\n" + "=" * 60) - print("Test 1: Enable Motor (ID 0x01)") - print("=" * 60) - print("Sending enable command to 0x01...") - - enable_msg = can.Message( - arbitration_id=0x01, - data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], - is_extended_id=False - ) - bus.send(enable_msg) - print("✓ Enable command sent") - - print("\nListening for responses (2 seconds)...") - print("Expected: Response from 0x11 (master ID)") - print() - - responses = [] - start_time = time.time() - timeout = 2.0 - - while time.time() - start_time < timeout: - msg = bus.recv(timeout=0.1) - if msg: - responses.append(msg) - print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}") - - if not responses: - print("✗ NO RESPONSES RECEIVED") - print("\nPossible issues:") - print(" 1. Motor not powered (check 24V supply)") - print(" 2. CAN wiring incorrect (CANH, CANL, GND)") - print(" 3. Motor master ID not set to 0x11") - print(" 4. SLCAN adapter not working properly") - print(" 5. Wrong CAN port specified") - else: - print(f"\n✓ Received {len(responses)} response(s)") - - # Check if we got response from expected ID - recv_ids = [msg.arbitration_id for msg in responses] - if 0x11 in recv_ids: - print("✓ Motor 0x11 is responding!") - else: - print(f"⚠ Responses from unexpected IDs: {[hex(id) for id in recv_ids]}") - - # Test 2: Send refresh command - print("\n" + "=" * 60) - print("Test 2: Refresh Motor State (ID 0x01)") - print("=" * 60) - print("Sending refresh command...") - - refresh_msg = can.Message( - arbitration_id=0x7FF, # Parameter ID - data=[0x01, 0x00, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00], - is_extended_id=False - ) - bus.send(refresh_msg) - print("✓ Refresh command sent") - - print("\nListening for responses (2 seconds)...") - responses = [] - start_time = time.time() - - while time.time() - start_time < timeout: - msg = bus.recv(timeout=0.1) - if msg: - responses.append(msg) - print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}") - - if not responses: - print("✗ NO RESPONSES RECEIVED") - else: - print(f"\n✓ Received {len(responses)} response(s)") - - # Test 3: Listen for any spontaneous traffic - print("\n" + "=" * 60) - print("Test 3: Listen for Any CAN Traffic") - print("=" * 60) - print("Listening for 5 seconds...") - print("(This will catch any background CAN traffic)") - print() - - start_time = time.time() - traffic_count = 0 - - while time.time() - start_time < 5.0: - msg = bus.recv(timeout=0.1) - if msg: - traffic_count += 1 - print(f" [{time.time() - start_time:.2f}s] ID=0x{msg.arbitration_id:03X}: {msg.data.hex()}") - - if traffic_count == 0: - print("✗ No CAN traffic detected at all") - print("\nThis suggests:") - print(" - SLCAN adapter may not be working") - print(" - No devices on the CAN bus are active") - print(" - Wrong port specified") - else: - print(f"\n✓ Detected {traffic_count} CAN messages") - - # Cleanup - print("\n" + "=" * 60) - print("Cleanup") - print("=" * 60) - print("Sending disable command...") - disable_msg = can.Message( - arbitration_id=0x01, - data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD], - is_extended_id=False - ) - bus.send(disable_msg) - time.sleep(0.1) - - bus.shutdown() - print("✓ Disconnected from CAN bus") - - except Exception as e: - print(f"\n✗ Error: {e}") - import traceback - traceback.print_exc() - return False - - print("\n" + "=" * 60) - print("Debug Complete") - print("=" * 60) - return True - - -if __name__ == "__main__": - if len(sys.argv) > 1: - port = sys.argv[1] - else: - port = "/dev/cu.usbmodem2101" - - test_can_communication(port) - diff --git a/docs/source/openarms.mdx b/docs/source/openarms.mdx index 04b3b6926..96eefcf1f 100644 --- a/docs/source/openarms.mdx +++ b/docs/source/openarms.mdx @@ -34,51 +34,7 @@ Each arm has the following motor configuration based on the [OpenArm setup guide For dual arm setups, the left arm uses IDs 0x09-0x10 for joints 1-8 with the same motor types. -## Quick Start (macOS) - -If you're on macOS, here's the fastest way to get started: - -```bash -# 1. Install LeRobot with OpenArms dependencies -pip install -e ".[openarms]" - -# 2. Find your USB-CAN adapter -ls /dev/cu.usbmodem* - -# 3. Test communication -python3 -c " -import can -bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000) -print('✓ CAN interface connected') -bus.shutdown() -" -``` - -Then use this configuration: - -```python -from lerobot.robots.openarms import OpenArmsFollower -from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig - -config = OpenArmsFollowerConfig( - port="/dev/cu.usbmodem00000000050C1", # Your adapter - can_interface="auto", # Auto-detects slcan for /dev/* ports - is_dual_arm=True, -) - -robot = OpenArmsFollower(config) -robot.connect() -``` - -## Prerequisites - -### Software Requirements - -**Linux:** -- Ubuntu 22.04/24.04 (or any Linux with SocketCAN) -- Python 3.8+ -- `can-utils` and `iproute2` packages -- LeRobot with OpenArms dependencies +## Quick Start ```bash # Install system dependencies @@ -88,26 +44,6 @@ sudo apt install can-utils iproute2 pip install -e ".[openarms]" ``` -**macOS:** -- macOS 12+ (Monterey or later) -- Python 3.8+ -- LeRobot with OpenArms dependencies - -```bash -# Install LeRobot with OpenArms support (includes python-can) -pip install -e ".[openarms]" -``` - -The `openarms` extra installs: -- `python-can>=4.2.0` - CAN bus communication library (supports SocketCAN on Linux and SLCAN on macOS) - -:::tip -If you've already installed LeRobot and want to add OpenArms support, run: -```bash -pip install -e ".[openarms]" -``` -::: - ## Setup Guide ### Step 1: Motor ID Configuration @@ -117,10 +53,9 @@ pip install -e ".[openarms]" Refer to the [OpenArm Motor ID Configuration Guide](https://docs.openarm.dev/software/setup/motor-id) for detailed instructions using the Damiao Debugging Tools on Windows. Key points: -- Each motor needs a unique **Sender CAN ID** (0x01-0x08 for first arm) -- Each motor needs a unique **Receiver/Master ID** (0x11-0x18 for first arm) +- Each motor needs a unique **Sender CAN ID** (0x01-0x08) +- Each motor needs a unique **Receiver/Master ID** (0x11-0x18) - Use the Damiao Debugging Tools to set these IDs -- For dual arm setups, use 0x09-0x10 for the second arm ### Step 2: Setup CAN Interface @@ -132,95 +67,39 @@ Configure your CAN interface as described in the [OpenArm CAN Setup Guide](https # Find your CAN interface ip link show -# Setup CAN 2.0 at 1 Mbps (standard) +# Configure can0, 1, 2, 3 sudo ip link set can0 down sudo ip link set can0 type can bitrate 1000000 sudo ip link set can0 up +sudo ip link set can1 down +sudo ip link set can1 type can bitrate 1000000 +sudo ip link set can1 up + +sudo ip link set can2 down +sudo ip link set can2 type can bitrate 1000000 +sudo ip link set can2 up + +sudo ip link set can3 down +sudo ip link set can3 type can bitrate 1000000 +sudo ip link set can3 up + # Verify configuration ip link show can0 ``` -#### macOS +or run: -macOS doesn't have native SocketCAN support. +`examples/openarms/setup_can.sh` -**Use SLCAN (Serial Line CAN)** +### Testing canbus and motor connection -For USB-CAN adapters that support SLCAN protocol (like CANable): - -```bash -# Install python-can if not already installed -pip install python-can - -# The adapter will appear as a serial device -ls /dev/cu.usbmodem* - -# Use with python-can slcan interface -# Example: /dev/cu.usbmodem14201 -``` - -In your code, specify the slcan interface: - -```python -from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig - -config = OpenArmsFollowerConfig( - port="/dev/cu.usbmodem14201", # Your USB-CAN adapter - can_interface="slcan", # Will auto-detect if set to "auto" -) -``` - -### Step 3: Test Motor Communication - -**On Linux:** - -Test basic communication as described in the [OpenArm Motor Test Guide](https://docs.openarm.dev/software/setup/configure-test): - -```bash -# Terminal 1: Monitor CAN traffic -candump can0 - -# Terminal 2: Enable motor 1 -cansend can0 001#FFFFFFFFFFFFFFFC - -# Expected response on Terminal 1: -# can0 011 [8] XX XX XX XX XX XX XX XX - -# Disable motor 1 -cansend can0 001#FFFFFFFFFFFFFFFD -``` - -**On macOS:** - -Testing is done differently since you'll use serial-based adapters: - -```bash -# Find your USB-CAN adapter -ls /dev/cu.usbmodem* - -# Example output: /dev/cu.usbmodem00000000050C1 - -# Test with Python directly (can-utils don't work on macOS) -python3 -c " -import can -bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000) -msg = can.Message(arbitration_id=0x01, data=[0xFF]*7+[0xFC]) -bus.send(msg) -response = bus.recv(timeout=1.0) -if response: - print(f'✓ Motor responded: ID 0x{response.arbitration_id:02X}') -else: - print('✗ No response') -bus.shutdown() -" -``` +Please run this script to check if all motors can be found and to find your can-fd speed: `python examples/openarms/debug_can_communication.py` ## Usage ### Basic Setup -**On Linux:** ```python from lerobot.robots.openarms import OpenArmsFollower @@ -238,26 +117,6 @@ robot = OpenArmsFollower(config) robot.connect() ``` -**On macOS:** - -```python -from lerobot.robots.openarms import OpenArmsFollower -from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig - -# Find your USB-CAN adapter first -# ls /dev/cu.usbmodem* - -config = OpenArmsFollowerConfig( - port="/dev/cu.usbmodem14201", # Your adapter's serial port - can_interface="slcan", # Or "auto" for auto-detection - id="openarms_dual", - is_dual_arm=True, -) - -robot = OpenArmsFollower(config) -robot.connect() -``` - ### Calibration On first use, you'll need to calibrate the robot: @@ -424,53 +283,6 @@ follower_config = OpenArmsFollowerConfig( ## Troubleshooting -### Motors Not Responding - -**Linux:** -1. **Check power supply**: 24V with sufficient current -2. **Verify CAN interface**: `ip link show can0` should show "UP" -3. **Test with cansend**: Follow [motor test guide](https://docs.openarm.dev/software/setup/configure-test) -4. **Check motor IDs**: Use Damiao Debugging Tools to verify IDs -5. **Check termination**: 120Ω resistor should be enabled on CAN interface - -**macOS:** -1. **Check power supply**: 24V with sufficient current -2. **Find adapter**: `ls /dev/cu.usbmodem*` should show your device -3. **Test connection**: Use Python script above to test communication -4. **Check motor IDs**: Use Damiao Debugging Tools on Windows -5. **Verify drivers**: Ensure USB-CAN adapter drivers are installed -6. **Try different baudrate**: Some adapters default to different rates - -### macOS-Specific Issues - -**"No such interface" error:** -```python -# Try auto-detection -config.can_interface = "auto" - -# Or explicitly list available interfaces -import can -print(can.detect_available_configs()) -``` - -**Permission denied on `/dev/cu.*`:** -```bash -# Add user to dialout group (if applicable) -sudo dscl . -append /Groups/_dialout GroupMembership $USER - -# Or run with sudo (not recommended) -sudo python your_script.py -``` - -**Adapter not showing up:** -```bash -# Check USB devices -system_profiler SPUSBDataType - -# Reinstall python-can -pip install --upgrade --force-reinstall python-can -``` - ### Motor Shaking/Unstable - **Lower control gains**: Reduce `position_kp` and `position_kd` @@ -480,7 +292,6 @@ pip install --upgrade --force-reinstall python-can ### CAN Bus Errors -**Linux:** ```bash # Check for errors ip -s link show can0 @@ -490,30 +301,6 @@ sudo ip link set can0 down sudo ip link set can0 up ``` -**macOS:** -```bash -# Reconnect USB adapter -# Unplug and replug the USB cable - -# Restart Python script -# The slcan interface auto-reconnects -``` - -### Position Drift - -- **Re-calibrate**: Run calibration procedure -- **Set zero position**: Use `robot.bus.set_zero_position()` with arm in known position -- **Check temperature**: Motors may drift when hot - -## Technical Details - -### Position Units - -All positions are in **degrees**: -- Motor internal representation: radians -- User API: degrees -- Automatic conversion handled by `DamiaoMotorsBus` - ### Control Mode OpenArms uses **MIT control mode** which allows simultaneous control of: diff --git a/examples/openarms/debug_can_communication.py b/examples/openarms/debug_can_communication.py new file mode 100644 index 000000000..67c197d00 --- /dev/null +++ b/examples/openarms/debug_can_communication.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python3 +""" +Comprehensive debug script for OpenArms CAN FD communication. +Tests all 4 CAN interfaces with CAN FD support. +""" + +import can +import time +import sys +import subprocess + +def check_can_interface(port): + """Check if CAN interface is UP and configured.""" + try: + result = subprocess.run(['ip', 'link', 'show', port], + capture_output=True, text=True) + if result.returncode != 0: + return False, "Interface not found", None + + output = result.stdout + if 'UP' not in output: + return False, "Interface is DOWN", None + + # Check if CAN FD is enabled + is_fd = 'fd on' in output.lower() or 'canfd' in output.lower() + + return True, "Interface is UP", is_fd + except FileNotFoundError: + return None, "Cannot check (ip command not found)", None + + +def test_motor_on_interface(bus, motor_id, timeout=2.0, use_fd=False): + """ + Test a single motor and return all responses. + + Returns: + list of (arbitration_id, data) tuples for all responses received + """ + # Send enable command + enable_msg = can.Message( + arbitration_id=motor_id, + data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], + is_extended_id=False, + is_fd=use_fd + ) + + try: + bus.send(enable_msg) + except Exception as e: + return None, f"Send error: {e}" + + # Listen for responses + responses = [] + start_time = time.time() + + while time.time() - start_time < timeout: + msg = bus.recv(timeout=0.1) + if msg: + responses.append((msg.arbitration_id, msg.data, msg.is_fd if hasattr(msg, 'is_fd') else False)) + + # Send disable command + disable_msg = can.Message( + arbitration_id=motor_id, + data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD], + is_extended_id=False, + is_fd=use_fd + ) + try: + bus.send(disable_msg) + except: + pass + + return responses, None + + +def test_interface(port, interface_type="socketcan", use_can_fd=True): + """Test all 8 motors on a single CAN interface.""" + + results = { + 'interface': port, + 'status': None, + 'is_fd': use_can_fd, + 'motors': {} + } + + # Check interface status + status_ok, status_msg, interface_has_fd = check_can_interface(port) + + if interface_has_fd is not None: + results['interface_fd_enabled'] = interface_has_fd + if use_can_fd and not interface_has_fd: + status_msg += " (CAN FD NOT enabled on interface!)" + elif interface_has_fd: + status_msg += " (CAN FD enabled)" + + results['status'] = status_msg + + if status_ok is False: + return results + + # Try to connect + try: + if use_can_fd: + print(f" Connecting to {port} with CAN FD (1 Mbps / 5 Mbps)...") + bus = can.interface.Bus( + channel=port, + interface=interface_type, + bitrate=1000000, + data_bitrate=5000000, + fd=True + ) + else: + print(f" Connecting to {port} with CAN 2.0 (1 Mbps)...") + bus = can.interface.Bus( + channel=port, + interface=interface_type, + bitrate=1000000 + ) + except Exception as e: + results['status'] = f"Connection failed: {e}" + return results + + try: + # Clear any pending messages + while bus.recv(timeout=0.01): + pass + + # Test each motor (0x01 to 0x08) + for motor_id in range(0x01, 0x09): + responses, error = test_motor_on_interface(bus, motor_id, timeout=1.0, use_fd=use_can_fd) + + if error: + results['motors'][motor_id] = {'error': error} + elif responses: + results['motors'][motor_id] = { + 'found': True, + 'responses': responses + } + else: + results['motors'][motor_id] = { + 'found': False, + 'responses': [] + } + + time.sleep(0.05) # Small delay between motors + + finally: + bus.shutdown() + + return results + + +def print_results(all_results): + """Print formatted results for all interfaces.""" + + print("SUMMARY - Motors Found on Each Interface") + + motor_names = { + 0x01: "joint_1 (Shoulder pan)", + 0x02: "joint_2 (Shoulder lift)", + 0x03: "joint_3 (Shoulder rotation)", + 0x04: "joint_4 (Elbow flex)", + 0x05: "joint_5 (Wrist roll)", + 0x06: "joint_6 (Wrist pitch)", + 0x07: "joint_7 (Wrist rotation)", + 0x08: "gripper", + } + + total_found = 0 + + for result in all_results: + interface = result['interface'] + status = result['status'] + + print(f"{interface}: {status}") + if result.get('is_fd'): + print(f" Mode: CAN FD") + else: + print(f" Mode: CAN 2.0") + + if 'Connection failed' in status or 'DOWN' in status: + print(f" ⚠ Cannot test {interface}") + continue + + motors_found = 0 + + for motor_id in range(0x01, 0x09): + motor_data = result['motors'].get(motor_id, {}) + motor_name = motor_names.get(motor_id, "Unknown") + + if motor_data.get('error'): + print(f" Motor 0x{motor_id:02X} ({motor_name}): ✗ {motor_data['error']}") + elif motor_data.get('found'): + motors_found += 1 + total_found += 1 + responses = motor_data['responses'] + print(f" Motor 0x{motor_id:02X} ({motor_name}): ✓ FOUND") + + for resp_id, data, is_fd in responses: + data_hex = data.hex() + fd_flag = " [FD]" if is_fd else " [2.0]" + print(f" → Response from 0x{resp_id:02X}{fd_flag}: {data_hex}") + else: + print(f" Motor 0x{motor_id:02X} ({motor_name}): ✗ No response") + + print(f"\n Summary: {motors_found}/8 motors found on {interface}") + + # Overall summary + print("OVERALL SUMMARY") + print(f"Total motors found across all interfaces: {total_found}") + + # Analyze configuration + print("DIAGNOSIS") + + for result in all_results: + interface = result['interface'] + motors_found = sum(1 for m in result['motors'].values() if m.get('found')) + + if motors_found == 0: + print(f"\n⚠ {interface}: NO MOTORS FOUND") + print(" Possible issues:") + print(" 1. CAN FD mode mismatch (interface vs motor configuration)") + print(" 2. Missing 120Ω termination resistors at BOTH cable ends") + print(" 3. Motor timeout parameter set incorrectly (should NOT be 0)") + print(" 4. CANH/CANL wiring issue") + print(" 5. Cable too long (>40m for CAN FD at 5Mbps)") + + # Check FD mismatch + if result.get('is_fd') and not result.get('interface_fd_enabled'): + print(" ⚠️ CRITICAL: Trying CAN FD but interface NOT configured for FD!") + print(f" Fix: sudo ip link set {interface} type can bitrate 1000000 dbitrate 5000000 fd on") + + elif motors_found < 8: + print(f"\n⚠ {interface}: Only {motors_found}/8 motors responding") + print(" Check power and connections for missing motors") + else: + print(f"\n✓ {interface}: All 8 motors responding correctly!") + + # Check for unexpected response IDs + print("RESPONSE ID ANALYSIS") + + for result in all_results: + interface = result['interface'] + unexpected = [] + + for motor_id, motor_data in result['motors'].items(): + if motor_data.get('found'): + expected_id = motor_id + 0x10 + actual_ids = [resp[0] for resp in motor_data['responses']] + + if expected_id not in actual_ids: + unexpected.append((motor_id, actual_ids)) + + if unexpected: + print(f"\n⚠ {interface}: Unexpected response IDs detected") + for motor_id, actual_ids in unexpected: + expected_id = motor_id + 0x10 + print(f" Motor 0x{motor_id:02X}: Expected 0x{expected_id:02X}, " + f"got {[f'0x{id:02X}' for id in actual_ids]}") + print(" → Motor Master IDs need reconfiguration") + else: + motors_found = sum(1 for m in result['motors'].values() if m.get('found')) + if motors_found > 0: + print(f"\n✓ {interface}: All responding motors use correct IDs") + + +def test_communication_speed(interface, motor_id, num_iterations=100): + """ + Test communication speed with a motor. + + Returns: + tuple: (hz, avg_latency_ms) or (None, None) if test failed + """ + try: + # Connect to interface + bus = can.interface.Bus( + channel=interface, + interface="socketcan", + bitrate=1000000, + data_bitrate=5000000, + fd=True + ) + + # Send refresh commands and measure round-trip time + latencies = [] + successful = 0 + + for _ in range(num_iterations): + start = time.perf_counter() + + # Send enable command (lightweight operation) + enable_msg = can.Message( + arbitration_id=motor_id, + data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], + is_extended_id=False, + is_fd=True + ) + bus.send(enable_msg) + + # Wait for response + msg = bus.recv(timeout=0.1) + + if msg: + latency = (time.perf_counter() - start) * 1000 # Convert to ms + latencies.append(latency) + successful += 1 + + bus.shutdown() + + if successful > 0: + avg_latency = sum(latencies) / len(latencies) + hz = 1000.0 / avg_latency if avg_latency > 0 else 0 + return hz, avg_latency + + return None, None + + except Exception as e: + print(f" Speed test error: {e}") + return None, None + + +def main(): + """Main function to test all CAN interfaces with CAN FD.""" + + print("\nThis will test all 4 CAN interfaces (can0-can3) with CAN FD") + print("Testing motors 0x01-0x08 on each interface") + print() + print("Make sure:") + print(" ✓ Motors are powered (24V)") + print(" ✓ CAN interfaces configured with FD mode:") + print(" ./examples/openarms/setup_can.sh") + print(" ✓ Motor 'timeout' parameter NOT set to 0 (use Damiao tools)") + print(" ✓ CAN wiring includes 120Ω termination at BOTH ends") + print() + + input("Press ENTER to start testing...") + + # Test all 4 interfaces with CAN FD + all_results = [] + + for i in range(4): + interface = f"can{i}" + print(f"Testing {interface}...") + + result = test_interface(interface, use_can_fd=True) + all_results.append(result) + + # Quick status + if 'Connection failed' in result['status'] or 'DOWN' in result['status']: + print(f" ⚠ {interface}: {result['status']}") + else: + motors_found = sum(1 for m in result['motors'].values() if m.get('found')) + print(f" {interface}: {motors_found}/8 motors found") + + time.sleep(0.2) + + # Print detailed results + print_results(all_results) + + print("Testing Complete!") + + all_found = sum(sum(1 for m in r['motors'].values() if m.get('found')) for r in all_results) + + if all_found == 0: + print("\n⚠️ CRITICAL: No motors found on any interface!") + print("\nTop issues to check:") + print(" 1. Motor 'timeout' parameter (use Damiao tools to set > 0)") + print(" 2. CAN FD not enabled (run ./examples/openarms/setup_can.sh)") + print(" 3. Missing termination resistors") + print("\nTry:") + print(" a) Check motor parameters with Damiao Debugging Tools") + print(" b) Verify CAN FD is enabled: ip -d link show can0 | grep fd") + print(" c) Run setup script: ./examples/openarms/setup_can.sh") + else: + # Run speed test on interfaces with motors + print("COMMUNICATION SPEED TEST") + print("\nTesting maximum communication frequency...") + + for result in all_results: + interface = result['interface'] + + # Find first responding motor + responding_motor = None + for motor_id, motor_data in result['motors'].items(): + if motor_data.get('found'): + responding_motor = motor_id + break + + if responding_motor: + print(f"\n{interface}: Testing with motor 0x{responding_motor:02X}...") + hz, latency = test_communication_speed(interface, responding_motor, num_iterations=100) + + if hz: + print(f" ✓ Max frequency: {hz:.1f} Hz") + print(f" ✓ Avg latency: {latency:.2f} ms") + print(f" ✓ Commands per second: ~{int(hz)}") + else: + print(f" ✗ Speed test failed") + else: + print(f"\n{interface}: No motors found, skipping speed test") + + print() + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nTesting interrupted by user.") + sys.exit(1) + except Exception as e: + print(f"\nUnexpected error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + diff --git a/examples/openarms/setup_can.sh b/examples/openarms/setup_can.sh new file mode 100755 index 000000000..542b7353a --- /dev/null +++ b/examples/openarms/setup_can.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Setup all OpenArms CAN interfaces with CAN FD + +set -e + +echo "==========================================" +echo "OpenArms CAN FD Interface Setup" +echo "==========================================" +echo "" +echo "Mode: CAN FD" +echo " - Nominal bitrate: 1 Mbps" +echo " - Data bitrate: 5 Mbps" +echo "" +echo "Configuring interfaces can0, can1, can2, can3..." +echo "" + +# Configure each CAN interface with CAN FD +for i in 0 1 2 3; do + interface="can$i" + + # Check if interface exists + if ! ip link show "$interface" &> /dev/null; then + echo "⚠ $interface: Not found, skipping" + continue + fi + + # Bring down interface + sudo ip link set "$interface" down 2>/dev/null + + # Configure CAN FD mode + sudo ip link set "$interface" type can \ + bitrate 1000000 \ + dbitrate 5000000 \ + fd on + + # Bring up interface + sudo ip link set "$interface" up + + # Verify configuration + if ip link show "$interface" | grep -q "UP"; then + echo "✓ $interface: Configured and UP" + else + echo "✗ $interface: Failed to bring UP" + fi +done + +echo "" +echo "==========================================" +echo "Verification" +echo "==========================================" +echo "" + +# Show detailed status for each interface +for i in 0 1 2 3; do + interface="can$i" + if ip link show "$interface" &> /dev/null; then + echo "$interface:" + # Show key parameters + ip -d link show "$interface" | grep -E "can|state|bitrate|dbitrate" | head -3 + echo "" + fi +done + +echo "==========================================" +echo "Setup Complete!" +echo "==========================================" +echo "" +echo "All interfaces configured for CAN FD mode" +echo "" +echo "Next steps:" +echo " 1. Test motors: python debug_can_communication.py" +echo " 2. Run teleoperation: python examples/openarms/teleop.py" +echo "" \ No newline at end of file diff --git a/examples/openarms/teleop.py b/examples/openarms/teleop.py new file mode 100644 index 000000000..d67491e1b --- /dev/null +++ b/examples/openarms/teleop.py @@ -0,0 +1,148 @@ +""" +OpenArms Teleoperation Example - Full Dual Arms + +This script demonstrates teleoperation of OpenArms follower robot using an OpenArms leader arm. +It first calibrates both devices, then enters a teleoperation loop for both arms. +""" + +import time + +from lerobot.robots.openarms.openarms_follower import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig +from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader +from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig + + +follower_config = OpenArmsFollowerConfig( + port_left="can0", # CAN interface for follower left arm + port_right="can1", # CAN interface for follower right arm + can_interface="socketcan", # Linux SocketCAN + id="openarms_follower", + disable_torque_on_disconnect=True, + max_relative_target=5.0, # Safety limit +) + + +leader_config = OpenArmsLeaderConfig( + port_left="can2", # CAN interface for leader left arm + port_right="can3", # CAN interface for leader right arm + can_interface="socketcan", # Linux SocketCAN + id="openarms_leader", + manual_control=True, # Enable manual control (torque disabled) +) + +print("=" * 60) +print("OpenArms Teleoperation - Full Dual Arms") +print("=" * 60) + +# Initialize devices +print("\n[1/4] Initializing devices...") +follower = OpenArmsFollower(follower_config) +leader = OpenArmsLeader(leader_config) + +# Connect and calibrate follower +print("\n[2/4] Connecting and calibrating follower robot...") +print("Note: If you have existing calibration, just press ENTER to use it.") +follower.connect(calibrate=True) + +# Connect and calibrate leader +print("\n[3/4] Connecting and calibrating leader arm...") +print("Note: The leader arm will have torque disabled for manual control.") +leader.connect(calibrate=True) + +# Wait for user to be ready +print("\n[4/4] Ready for teleoperation!") +print("\nBoth arms will be controlled (16 motors total):") +print(" RIGHT ARM: joints 1-7 + gripper") +print(" LEFT ARM: joints 1-7 + gripper") + +print("\nPress ENTER to start teleoperation...") +input() + +print("\nTeleoperation started! Move both leader arms.") +print("Press Ctrl+C to stop.\n") + +# All joints for both arms (16 motors total) +all_joints = [ + # Right arm + "right_joint_1", + "right_joint_2", + "right_joint_3", + "right_joint_4", + "right_joint_5", + "right_joint_6", + "right_joint_7", + "right_gripper", + # Left arm + "left_joint_1", + "left_joint_2", + "left_joint_3", + "left_joint_4", + "left_joint_5", + "left_joint_6", + "left_joint_7", + "left_gripper", +] + +# Performance monitoring +loop_times = [] +start_time = time.perf_counter() +last_print_time = start_time + +try: + while True: + loop_start = time.perf_counter() + + # Get action from leader + leader_action = leader.get_action() + + # Filter to only position data for all joints (both arms) + joint_action = {} + for joint in all_joints: + pos_key = f"{joint}.pos" + if pos_key in leader_action: + joint_action[pos_key] = leader_action[pos_key] + + # Send action to follower (both arms) + if joint_action: + follower.send_action(joint_action) + + # Measure loop time + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + # Print stats every 2 seconds + if loop_end - last_print_time >= 2.0: + if loop_times: + avg_time = sum(loop_times) / len(loop_times) + current_hz = 1.0 / avg_time if avg_time > 0 else 0 + min_time = min(loop_times) + max_time = max(loop_times) + max_hz = 1.0 / min_time if min_time > 0 else 0 + min_hz = 1.0 / max_time if max_time > 0 else 0 + + print(f"[Hz Stats] Avg: {current_hz:.1f} Hz | " + f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | " + f"Avg loop time: {avg_time*1000:.1f} ms") + + # Reset for next measurement window + loop_times = [] + last_print_time = loop_end + +except KeyboardInterrupt: + print("\n\nStopping teleoperation...") +finally: + # Disconnect devices + print("Disconnecting devices...") + try: + follower.disconnect() + except Exception as e: + print(f"Error disconnecting follower: {e}") + + try: + leader.disconnect() + except Exception as e: + print(f"Error disconnecting leader: {e}") + + print("Done!") \ No newline at end of file diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py index 90fa69c92..4fbfde470 100644 --- a/src/lerobot/motors/damiao/damiao.py +++ b/src/lerobot/motors/damiao/damiao.py @@ -16,6 +16,7 @@ import logging import time +from contextlib import contextmanager from copy import deepcopy from functools import cached_property from typing import Dict, List, Optional, Tuple, Union @@ -74,6 +75,9 @@ class DamiaoMotorsBus(MotorsBusBase): motors: dict[str, Motor], calibration: dict[str, MotorCalibration] | None = None, can_interface: str = "auto", + use_can_fd: bool = True, + bitrate: int = 1000000, + data_bitrate: int | None = 5000000, ): """ Initialize the Damiao motors bus. @@ -83,10 +87,16 @@ class DamiaoMotorsBus(MotorsBusBase): motors: Dictionary mapping motor names to Motor objects calibration: Optional calibration data can_interface: CAN interface type - "auto" (default), "socketcan" (Linux), or "slcan" (macOS/serial) + use_can_fd: Whether to use CAN FD mode (default: True for OpenArms) + bitrate: Nominal bitrate in bps (default: 1000000 = 1 Mbps) + data_bitrate: Data bitrate for CAN FD in bps (default: 5000000 = 5 Mbps), ignored if use_can_fd is False """ super().__init__(port, motors, calibration) self.port = port self.can_interface = can_interface + self.use_can_fd = use_can_fd + self.bitrate = bitrate + self.data_bitrate = data_bitrate self.canbus = None self._is_connected = False @@ -138,26 +148,48 @@ class DamiaoMotorsBus(MotorsBusBase): # Connect to CAN bus if self.can_interface == "socketcan": - # Linux SocketCAN - self.canbus = can.interface.Bus( - channel=self.port, - interface="socketcan", - bitrate=self.default_baudrate - ) + # Linux SocketCAN with CAN FD support + if self.use_can_fd and self.data_bitrate is not None: + self.canbus = can.interface.Bus( + channel=self.port, + interface="socketcan", + bitrate=self.bitrate, + data_bitrate=self.data_bitrate, + fd=True + ) + logger.info(f"Connected to {self.port} with CAN FD (bitrate={self.bitrate}, data_bitrate={self.data_bitrate})") + else: + self.canbus = can.interface.Bus( + channel=self.port, + interface="socketcan", + bitrate=self.bitrate + ) + logger.info(f"Connected to {self.port} with CAN 2.0 (bitrate={self.bitrate})") elif self.can_interface == "slcan": # Serial Line CAN (macOS, Windows, or USB adapters) + # Note: SLCAN typically doesn't support CAN FD self.canbus = can.interface.Bus( channel=self.port, interface="slcan", - bitrate=self.default_baudrate + bitrate=self.bitrate ) + logger.info(f"Connected to {self.port} with SLCAN (bitrate={self.bitrate})") else: # Generic interface (vector, pcan, etc.) - self.canbus = can.interface.Bus( - channel=self.port, - interface=self.can_interface, - bitrate=self.default_baudrate - ) + if self.use_can_fd and self.data_bitrate is not None: + self.canbus = can.interface.Bus( + channel=self.port, + interface=self.can_interface, + bitrate=self.bitrate, + data_bitrate=self.data_bitrate, + fd=True + ) + else: + self.canbus = can.interface.Bus( + channel=self.port, + interface=self.can_interface, + bitrate=self.bitrate + ) self._is_connected = True @@ -251,6 +283,24 @@ class DamiaoMotorsBus(MotorsBusBase): raise e time.sleep(0.01) + @contextmanager + def torque_disabled(self, motors: str | list[str] | None = None): + """ + Context manager that guarantees torque is re-enabled. + + This helper is useful to temporarily disable torque when configuring motors. + + Examples: + >>> with bus.torque_disabled(): + ... # Safe operations here with torque disabled + ... pass + """ + self.disable_torque(motors) + try: + yield + finally: + self.enable_torque(motors) + def set_zero_position(self, motors: str | list[str] | None = None) -> None: """Set current position as zero for selected motors.""" motors = self._get_motors_list(motors) diff --git a/src/lerobot/robots/openarms/config_openarms_follower.py b/src/lerobot/robots/openarms/config_openarms_follower.py index b8571de5c..daeb0e43b 100644 --- a/src/lerobot/robots/openarms/config_openarms_follower.py +++ b/src/lerobot/robots/openarms/config_openarms_follower.py @@ -15,7 +15,7 @@ # limitations under the License. from dataclasses import dataclass, field -from typing import Dict, List, Optional +from typing import Dict, Optional from lerobot.cameras import CameraConfig from lerobot.motors.damiao.tables import MotorType @@ -28,13 +28,21 @@ from ..config import RobotConfig class OpenArmsFollowerConfig(RobotConfig): """Configuration for the OpenArms follower robot with Damiao motors.""" - # CAN interface to connect to + # CAN interfaces - one per arm + # Right arm CAN interface (e.g., "can0") + # Left arm CAN interface (e.g., "can1") # Linux: "can0", "can1", etc. # macOS: "/dev/cu.usbmodem*" (serial device) - port: str = "can0" + port_right: str = "can0" # CAN interface for right arm + port_left: str = "can1" # CAN interface for left arm # CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect) - can_interface: str = "auto" + can_interface: str = "socketcan" + + # CAN FD settings (OpenArms uses CAN FD by default) + use_can_fd: bool = True + can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps) + can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps) # Whether to disable torque when disconnecting disable_torque_on_disconnect: bool = True @@ -64,9 +72,10 @@ class OpenArmsFollowerConfig(RobotConfig): "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310) }) - # MIT control parameters for position control - position_kp: float = 10.0 # Position gain - position_kd: float = 0.5 # Velocity damping + # MIT control parameters for position control (per motor) + # Values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] + position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]) + position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2]) # Calibration parameters calibration_mode: str = "manual" # "manual" or "auto" diff --git a/src/lerobot/robots/openarms/openarms_follower.py b/src/lerobot/robots/openarms/openarms_follower.py index cf6a758e8..d65cd98c5 100644 --- a/src/lerobot/robots/openarms/openarms_follower.py +++ b/src/lerobot/robots/openarms/openarms_follower.py @@ -48,31 +48,44 @@ class OpenArmsFollower(Robot): super().__init__(config) self.config = config - norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors - motors = {} + norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors - # Right arm + # Right arm motors (on port_right) + # Each arm uses the same CAN IDs since they're on separate buses + motors_right = {} for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): - prefixed_name = f"right_{motor_name}" motor = Motor(send_id, motor_type_str, norm_mode_body) motor.recv_id = recv_id motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) - motors[prefixed_name] = motor + motors_right[motor_name] = motor - # Left arm (offset IDs by 8) + # Left arm motors (on port_left, same IDs as right since separate bus) + motors_left = {} for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): - prefixed_name = f"left_{motor_name}" - motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body) - motor.recv_id = recv_id + 0x08 + motor = Motor(send_id, motor_type_str, norm_mode_body) + motor.recv_id = recv_id motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) - motors[prefixed_name] = motor + motors_left[motor_name] = motor - # Initialize the Damiao motors bus - self.bus = DamiaoMotorsBus( - port=self.config.port, - motors=motors, - calibration=self.calibration, + # Initialize separate Damiao motors buses (one per arm) with CAN FD support + self.bus_right = DamiaoMotorsBus( + port=self.config.port_right, + motors=motors_right, + calibration={k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")}, can_interface=self.config.can_interface, + use_can_fd=self.config.use_can_fd, + bitrate=self.config.can_bitrate, + data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None, + ) + + self.bus_left = DamiaoMotorsBus( + port=self.config.port_left, + motors=motors_left, + calibration={k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")}, + can_interface=self.config.can_interface, + use_can_fd=self.config.use_can_fd, + bitrate=self.config.can_bitrate, + data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None, ) # Initialize cameras @@ -92,10 +105,16 @@ class OpenArmsFollower(Robot): def _motors_ft(self) -> Dict[str, type]: """Motor features for observation and action spaces.""" features = {} - for motor in self.bus.motors: - features[f"{motor}.pos"] = float - features[f"{motor}.vel"] = float - features[f"{motor}.torque"] = float + # Right arm motors + for motor in self.bus_right.motors: + features[f"right_{motor}.pos"] = float + features[f"right_{motor}.vel"] = float + features[f"right_{motor}.torque"] = float + # Left arm motors + for motor in self.bus_left.motors: + features[f"left_{motor}.pos"] = float + features[f"left_{motor}.vel"] = float + features[f"left_{motor}.torque"] = float return features @property @@ -119,20 +138,25 @@ class OpenArmsFollower(Robot): @property def is_connected(self) -> bool: """Check if robot is connected.""" - return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + return (self.bus_right.is_connected and + self.bus_left.is_connected and + all(cam.is_connected for cam in self.cameras.values())) def connect(self, calibrate: bool = True) -> None: """ Connect to the robot and optionally calibrate. - We assume that at connection time, the arm is in a safe rest position, + We assume that at connection time, the arms are in a safe rest position, and torque can be safely disabled to run calibration if needed. """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - # Connect to CAN bus - self.bus.connect() + # Connect to both CAN buses + logger.info(f"Connecting right arm on {self.config.port_right}...") + self.bus_right.connect() + logger.info(f"Connecting left arm on {self.config.port_left}...") + self.bus_left.connect() # Run calibration if needed if not self.is_calibrated and calibrate: @@ -151,14 +175,15 @@ class OpenArmsFollower(Robot): # Optionally set zero position if self.config.zero_position_on_connect: logger.info("Setting current position as zero...") - self.bus.set_zero_position() + self.bus_right.set_zero_position() + self.bus_left.set_zero_position() logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: """Check if robot is calibrated.""" - return self.bus.is_calibrated + return self.bus_right.is_calibrated and self.bus_left.is_calibrated def calibrate(self) -> None: """ @@ -166,7 +191,7 @@ class OpenArmsFollower(Robot): The calibration procedure: 1. Disable torque - 2. Ask user to position arm in hanging position with gripper closed + 2. Ask user to position arms in hanging position with grippers closed 3. Set this as zero position 4. Record range of motion for each joint 5. Save calibration @@ -179,18 +204,32 @@ class OpenArmsFollower(Robot): ) if user_input.strip().lower() != "c": logger.info(f"Using existing calibration for {self.id}") - self.bus.write_calibration(self.calibration) + # Split calibration for each bus + cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")} + cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")} + self.bus_right.write_calibration(cal_right) + self.bus_left.write_calibration(cal_left) return logger.info(f"\nRunning calibration for {self}") + # Calibrate each arm separately + self._calibrate_arm("right", self.bus_right) + self._calibrate_arm("left", self.bus_left) + + print(f"\nCalibration complete and saved to {self.calibration_fpath}") + + def _calibrate_arm(self, arm_name: str, bus: DamiaoMotorsBus) -> None: + """Calibrate a single arm.""" + logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===") + # Disable torque for manual positioning - self.bus.disable_torque() + bus.disable_torque() time.sleep(0.1) # Step 1: Set zero position input( - "\nCalibration Step 1: Zero Position\n" + f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n" "Position the arm in the following configuration:\n" " - Arm hanging straight down\n" " - Gripper closed\n" @@ -198,60 +237,51 @@ class OpenArmsFollower(Robot): ) # Set current position as zero for all motors - self.bus.set_zero_position() - logger.info("Zero position set.") + bus.set_zero_position() + logger.info(f"{arm_name.capitalize()} arm zero position set.") - # Step 2: Record range of motion + # Automatically set range to -90° to +90° for all joints print( - "\nCalibration Step 2: Range of Motion\n" - "Move each joint through its full range of motion.\n" - "The system will record min/max positions.\n" - "Press ENTER when done..." + f"\nAutomatically setting range: -90° to +90° for all joints" ) - # Record ranges - range_mins, range_maxes = self.bus.record_ranges_of_motion() - - # Create calibration data (ranges are already in degrees) - self.calibration = {} - for motor_name, motor in self.bus.motors.items(): - self.calibration[motor_name] = MotorCalibration( + # Create calibration data with fixed ranges + if self.calibration is None: + self.calibration = {} + + for motor_name, motor in bus.motors.items(): + # Prefix motor name with arm name for storage + prefixed_name = f"{arm_name}_{motor_name}" + + # Use -90 to +90 for all joints and gripper (integers required) + self.calibration[prefixed_name] = MotorCalibration( id=motor.id, drive_mode=0, # Normal direction homing_offset=0, # Already set via set_zero_position - range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees - range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees + range_min=-90, # -90 degrees (integer) + range_max=90, # +90 degrees (integer) ) + logger.info(f" {prefixed_name}: range set to [-90°, +90°]") - # Special handling for gripper range - if "gripper" in self.calibration: - gripper_cal = self.calibration["gripper"] - gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min) - if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees) - # Set default gripper range in degrees - gripper_cal.range_min = 0.0 - gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion - - # Write calibration to motors and save to file - self.bus.write_calibration(self.calibration) - self._save_calibration() - - print(f"\nCalibration complete and saved to {self.calibration_fpath}") + # Write calibration to this arm's motors + cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")} + bus.write_calibration(cal_for_bus) # Re-enable torque - self.bus.enable_torque() + bus.enable_torque() + + # Save calibration after each arm + self._save_calibration() def configure(self) -> None: """Configure motors with appropriate settings.""" - with self.bus.torque_disabled(): - # Configure all motors - self.bus.configure_motors() - - # Set specific parameters for gripper if present - if "gripper" in self.bus.motors: - # Gripper uses lower gains to avoid damage - # These will be applied during MIT control commands - pass # Parameters are set during control commands + # Configure right arm + with self.bus_right.torque_disabled(): + self.bus_right.configure_motors() + + # Configure left arm + with self.bus_left.torque_disabled(): + self.bus_left.configure_motors() def setup_motors(self) -> None: raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.") @@ -263,16 +293,26 @@ class OpenArmsFollower(Robot): obs_dict = {} - # Read motor positions, velocities, and torques + # Read motor positions, velocities, and torques from right arm start = time.perf_counter() - positions = self.bus.sync_read("Present_Position") - velocities = self.bus.sync_read("Present_Velocity") - torques = self.bus.sync_read("Present_Torque") + positions_right = self.bus_right.sync_read("Present_Position") + velocities_right = self.bus_right.sync_read("Present_Velocity") + torques_right = self.bus_right.sync_read("Present_Torque") - for motor in self.bus.motors: - obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0) - obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0) - obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0) + for motor in self.bus_right.motors: + obs_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0) + obs_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0) + obs_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0) + + # Read motor positions, velocities, and torques from left arm + positions_left = self.bus_left.sync_read("Present_Position") + velocities_left = self.bus_left.sync_read("Present_Velocity") + torques_left = self.bus_left.sync_read("Present_Torque") + + for motor in self.bus_left.motors: + obs_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0) + obs_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0) + obs_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0) dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -293,7 +333,7 @@ class OpenArmsFollower(Robot): The action magnitude may be clipped based on safety limits. Args: - action: Dictionary with motor positions + action: Dictionary with motor positions (e.g., "right_joint_1.pos", "left_joint_2.pos") Returns: The action actually sent (potentially clipped) @@ -301,37 +341,69 @@ class OpenArmsFollower(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - # Extract motor positions from action - goal_pos = { - key.removesuffix(".pos"): val - for key, val in action.items() - if key.endswith(".pos") - } + # Extract motor positions from action and split by arm + goal_pos_right = {} + goal_pos_left = {} + + for key, val in action.items(): + if key.endswith(".pos"): + motor_name = key.removesuffix(".pos") + if motor_name.startswith("right_"): + # Remove "right_" prefix for bus access + goal_pos_right[motor_name.removeprefix("right_")] = val + elif motor_name.startswith("left_"): + # Remove "left_" prefix for bus access + goal_pos_left[motor_name.removeprefix("left_")] = val # Apply safety limits if configured if self.config.max_relative_target is not None: - present_pos = self.bus.sync_read("Present_Position") - goal_present_pos = { - key: (g_pos, present_pos[key]) - for key, g_pos in goal_pos.items() - } - goal_pos = ensure_safe_goal_position( - goal_present_pos, - self.config.max_relative_target - ) + # Get current positions + present_pos_right = self.bus_right.sync_read("Present_Position") + present_pos_left = self.bus_left.sync_read("Present_Position") + + # Apply safety limits to right arm + if goal_pos_right: + goal_present_pos_right = { + key: (g_pos, present_pos_right.get(key, 0.0)) + for key, g_pos in goal_pos_right.items() + } + goal_pos_right = ensure_safe_goal_position( + goal_present_pos_right, + self.config.max_relative_target + ) + + # Apply safety limits to left arm + if goal_pos_left: + goal_present_pos_left = { + key: (g_pos, present_pos_left.get(key, 0.0)) + for key, g_pos in goal_pos_left.items() + } + goal_pos_left = ensure_safe_goal_position( + goal_present_pos_left, + self.config.max_relative_target + ) - # Prepare MIT control commands for each motor - for motor_name, position_degrees in goal_pos.items(): - # Use different gains for gripper - if motor_name == "gripper": - kp = self.config.position_kp * 0.5 # Lower gain for gripper - kd = self.config.position_kd * 0.5 - else: - kp = self.config.position_kp - kd = self.config.position_kd + # Motor name to index mapping for gains + motor_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + # Send MIT control commands to right arm + for motor_name, position_degrees in goal_pos_right.items(): + # Get per-motor gains from config + idx = motor_index.get(motor_name, 0) + kp = self.config.position_kp[idx] + kd = self.config.position_kd[idx] # Send MIT control command (position is in degrees) - self.bus._mit_control( + self.bus_right._mit_control( motor_name, kp=kp, kd=kd, @@ -340,15 +412,39 @@ class OpenArmsFollower(Robot): torque=0.0 ) - return {f"{motor}.pos": val for motor, val in goal_pos.items()} + # Send MIT control commands to left arm + for motor_name, position_degrees in goal_pos_left.items(): + # Get per-motor gains from config + idx = motor_index.get(motor_name, 0) + kp = self.config.position_kp[idx] + kd = self.config.position_kd[idx] + + # Send MIT control command (position is in degrees) + self.bus_left._mit_control( + motor_name, + kp=kp, + kd=kd, + position_degrees=position_degrees, + velocity_deg_per_sec=0.0, + torque=0.0 + ) + + # Return the actions that were actually sent + result = {} + for motor, val in goal_pos_right.items(): + result[f"right_{motor}.pos"] = val + for motor, val in goal_pos_left.items(): + result[f"left_{motor}.pos"] = val + return result def disconnect(self): """Disconnect from robot.""" if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - # Disconnect from CAN bus - self.bus.disconnect(self.config.disable_torque_on_disconnect) + # Disconnect from CAN buses + self.bus_right.disconnect(self.config.disable_torque_on_disconnect) + self.bus_left.disconnect(self.config.disable_torque_on_disconnect) # Disconnect cameras for cam in self.cameras.values(): @@ -363,10 +459,10 @@ class OpenArmsFollower(Robot): def _gravity_from_q(self, q_rad: Dict[str, float]) -> Dict[str, float]: """ Compute g(q) [N·m] for all joints in the robot. - The order of joints in the URDF matches self.bus.motors. + The order of joints in the URDF matches the concatenated motor lists (right then left). Args: - q_rad: Dictionary mapping motor names to positions in radians + q_rad: Dictionary mapping motor names (with arm prefix) to positions in radians Returns: Dictionary mapping motor names to gravity torques in N·m @@ -380,14 +476,34 @@ class OpenArmsFollower(Robot): "Ensure urdf/openarms.urdf exists and is valid." ) - # Build position vector in the order of motors + # Build position vector in the order of motors (right arm, then left arm) q = np.zeros(self.pin_robot.model.nq) - for i, motor_name in enumerate(self.bus.motors): - q[i] = q_rad[motor_name] + idx = 0 + + # Right arm motors + for motor_name in self.bus_right.motors: + full_name = f"right_{motor_name}" + q[idx] = q_rad.get(full_name, 0.0) + idx += 1 + + # Left arm motors + for motor_name in self.bus_left.motors: + full_name = f"left_{motor_name}" + q[idx] = q_rad.get(full_name, 0.0) + idx += 1 # Compute generalized gravity vector g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q) # Map back to motor names - return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)} + result = {} + idx = 0 + for motor_name in self.bus_right.motors: + result[f"right_{motor_name}"] = float(g[idx]) + idx += 1 + for motor_name in self.bus_left.motors: + result[f"left_{motor_name}"] = float(g[idx]) + idx += 1 + + return result diff --git a/src/lerobot/teleoperators/openarms/config_openarms_leader.py b/src/lerobot/teleoperators/openarms/config_openarms_leader.py index 0960a707d..c0f02a9e2 100644 --- a/src/lerobot/teleoperators/openarms/config_openarms_leader.py +++ b/src/lerobot/teleoperators/openarms/config_openarms_leader.py @@ -25,13 +25,21 @@ from ..config import TeleoperatorConfig class OpenArmsLeaderConfig(TeleoperatorConfig): """Configuration for the OpenArms leader/teleoperator with Damiao motors.""" - # CAN interface to connect to + # CAN interfaces - one per arm + # Right arm CAN interface (e.g., "can2") + # Left arm CAN interface (e.g., "can3") # Linux: "can0", "can1", etc. # macOS: "/dev/cu.usbmodem*" (serial device) - port: str = "can0" + port_right: str = "can2" # CAN interface for right arm + port_left: str = "can3" # CAN interface for left arm # CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect) - can_interface: str = "auto" + can_interface: str = "socketcan" + + # CAN FD settings (OpenArms uses CAN FD by default) + use_can_fd: bool = True + can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps) + can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps) # Motor configuration for OpenArms (7 DOF per arm) # Maps motor names to (send_can_id, recv_can_id, motor_type) diff --git a/src/lerobot/teleoperators/openarms/openarms_leader.py b/src/lerobot/teleoperators/openarms/openarms_leader.py index f49fac053..ec712ecbc 100644 --- a/src/lerobot/teleoperators/openarms/openarms_leader.py +++ b/src/lerobot/teleoperators/openarms/openarms_leader.py @@ -16,7 +16,7 @@ import logging import time -from typing import Dict +from typing import Any, Dict from lerobot.motors import Motor, MotorCalibration, MotorNormMode from lerobot.motors.damiao import DamiaoMotorsBus @@ -44,42 +44,60 @@ class OpenArmsLeader(Teleoperator): super().__init__(config) self.config = config + norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors - norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors - motors = {} - - # Right arm (original IDs) + # Right arm motors (on port_right) + # Each arm uses the same CAN IDs since they're on separate buses + motors_right = {} for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): - prefixed_name = f"right_{motor_name}" motor = Motor(send_id, motor_type_str, norm_mode_body) motor.recv_id = recv_id motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) - motors[prefixed_name] = motor + motors_right[motor_name] = motor - # Left arm (offset IDs by 8) + # Left arm motors (on port_left, same IDs as right since separate bus) + motors_left = {} for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): - prefixed_name = f"left_{motor_name}" - motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body) - motor.recv_id = recv_id + 0x08 + motor = Motor(send_id, motor_type_str, norm_mode_body) + motor.recv_id = recv_id motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) - motors[prefixed_name] = motor - - # Initialize the Damiao motors bus - self.bus = DamiaoMotorsBus( - port=self.config.port, - motors=motors, - calibration=self.calibration, + motors_left[motor_name] = motor + + # Initialize separate Damiao motors buses (one per arm) with CAN FD support + self.bus_right = DamiaoMotorsBus( + port=self.config.port_right, + motors=motors_right, + calibration={k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")}, can_interface=self.config.can_interface, + use_can_fd=self.config.use_can_fd, + bitrate=self.config.can_bitrate, + data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None, + ) + + self.bus_left = DamiaoMotorsBus( + port=self.config.port_left, + motors=motors_left, + calibration={k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")}, + can_interface=self.config.can_interface, + use_can_fd=self.config.use_can_fd, + bitrate=self.config.can_bitrate, + data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None, ) @property def action_features(self) -> Dict[str, type]: """Features produced by this teleoperator.""" features = {} - for motor in self.bus.motors: - features[f"{motor}.pos"] = float - features[f"{motor}.vel"] = float - features[f"{motor}.torque"] = float + # Right arm motors + for motor in self.bus_right.motors: + features[f"right_{motor}.pos"] = float + features[f"right_{motor}.vel"] = float + features[f"right_{motor}.torque"] = float + # Left arm motors + for motor in self.bus_left.motors: + features[f"left_{motor}.pos"] = float + features[f"left_{motor}.vel"] = float + features[f"left_{motor}.torque"] = float return features @property @@ -90,7 +108,7 @@ class OpenArmsLeader(Teleoperator): @property def is_connected(self) -> bool: """Check if teleoperator is connected.""" - return self.bus.is_connected + return self.bus_right.is_connected and self.bus_left.is_connected def connect(self, calibrate: bool = True) -> None: """ @@ -102,8 +120,11 @@ class OpenArmsLeader(Teleoperator): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - # Connect to CAN bus - self.bus.connect() + # Connect to CAN buses + logger.info(f"Connecting right arm on {self.config.port_right}...") + self.bus_right.connect() + logger.info(f"Connecting left arm on {self.config.port_left}...") + self.bus_left.connect() # Run calibration if needed if not self.is_calibrated and calibrate: @@ -120,7 +141,7 @@ class OpenArmsLeader(Teleoperator): @property def is_calibrated(self) -> bool: """Check if teleoperator is calibrated.""" - return self.bus.is_calibrated + return self.bus_right.is_calibrated and self.bus_left.is_calibrated def calibrate(self) -> None: """ @@ -141,18 +162,32 @@ class OpenArmsLeader(Teleoperator): ) if user_input.strip().lower() != "c": logger.info(f"Using existing calibration for {self.id}") - self.bus.write_calibration(self.calibration) + # Split calibration for each bus + cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")} + cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")} + self.bus_right.write_calibration(cal_right) + self.bus_left.write_calibration(cal_left) return logger.info(f"\nRunning calibration for {self}") + # Calibrate each arm separately + self._calibrate_arm("right", self.bus_right) + self._calibrate_arm("left", self.bus_left) + + print(f"\nCalibration complete and saved to {self.calibration_fpath}") + + def _calibrate_arm(self, arm_name: str, bus: DamiaoMotorsBus) -> None: + """Calibrate a single arm.""" + logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===") + # Ensure torque is disabled for manual positioning - self.bus.disable_torque() + bus.disable_torque() time.sleep(0.1) # Step 1: Set zero position input( - "\nCalibration Step 1: Zero Position\n" + f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n" "Position the arm in the following configuration:\n" " - Arm hanging straight down\n" " - Gripper closed\n" @@ -160,45 +195,38 @@ class OpenArmsLeader(Teleoperator): ) # Set current position as zero for all motors - self.bus.set_zero_position() - logger.info("Zero position set.") + bus.set_zero_position() + logger.info(f"{arm_name.capitalize()} arm zero position set.") - # Step 2: Record range of motion + # Automatically set range to -90° to +90° for all joints print( - "\nCalibration Step 2: Range of Motion\n" - "Move each joint through its full range of motion.\n" - "The system will record min/max positions.\n" - "Press ENTER when done..." + f"\nAutomatically setting range: -90° to +90° for all joints" ) - # Record ranges - range_mins, range_maxes = self.bus.record_ranges_of_motion() - - # Create calibration data (ranges are already in degrees) - self.calibration = {} - for motor_name, motor in self.bus.motors.items(): - self.calibration[motor_name] = MotorCalibration( + # Create calibration data with fixed ranges + if self.calibration is None: + self.calibration = {} + + for motor_name, motor in bus.motors.items(): + # Prefix motor name with arm name for storage + prefixed_name = f"{arm_name}_{motor_name}" + + # Use -90 to +90 for all joints and gripper (integers required) + self.calibration[prefixed_name] = MotorCalibration( id=motor.id, drive_mode=0, # Normal direction homing_offset=0, # Already set via set_zero_position - range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees - range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees + range_min=-90, # -90 degrees (integer) + range_max=90, # +90 degrees (integer) ) + logger.info(f" {prefixed_name}: range set to [-90°, +90°]") - # Special handling for gripper range - if "gripper" in self.calibration: - gripper_cal = self.calibration["gripper"] - gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min) - if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees) - # Set default gripper range in degrees - gripper_cal.range_min = 0.0 - gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion + # Write calibration to this arm's motors + cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")} + bus.write_calibration(cal_for_bus) - # Write calibration and save to file - self.bus.write_calibration(self.calibration) + # Save calibration after each arm self._save_calibration() - - print(f"\nCalibration complete and saved to {self.calibration_fpath}") def configure(self) -> None: """ @@ -209,44 +237,54 @@ class OpenArmsLeader(Teleoperator): if self.config.manual_control: # Disable torque for manual control logger.info("Disabling torque for manual control...") - self.bus.disable_torque() + self.bus_right.disable_torque() + self.bus_left.disable_torque() else: # Configure motors normally - self.bus.configure_motors() + self.bus_right.configure_motors() + self.bus_left.configure_motors() def setup_motors(self) -> None: raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.") - def get_observation(self) -> Dict[str, Any]: - """Get current observation from robot including position, velocity, and torque.""" + def get_action(self) -> Dict[str, Any]: + """ + Get current action from the leader arm. + + This is the main method for teleoperators - it reads the current state + of the leader arm and returns it as an action that can be sent to a follower. + """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - obs_dict = {} + action_dict = {} - # Read motor positions, velocities, and torques + # Read motor positions, velocities, and torques from right arm start = time.perf_counter() - positions = self.bus.sync_read("Present_Position") - velocities = self.bus.sync_read("Present_Velocity") - torques = self.bus.sync_read("Present_Torque") + positions_right = self.bus_right.sync_read("Present_Position") + velocities_right = self.bus_right.sync_read("Present_Velocity") + torques_right = self.bus_right.sync_read("Present_Torque") - for motor in self.bus.motors: - obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0) - obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0) - obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0) + for motor in self.bus_right.motors: + action_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0) + action_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0) + action_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0) + + # Read motor positions, velocities, and torques from left arm + positions_left = self.bus_left.sync_read("Present_Position") + velocities_left = self.bus_left.sync_read("Present_Velocity") + torques_left = self.bus_left.sync_read("Present_Torque") + + for motor in self.bus_left.motors: + action_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0) + action_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0) + action_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0) dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") - # Capture images from cameras - for cam_key, cam in self.cameras.items(): - start = time.perf_counter() - obs_dict[cam_key] = cam.async_read() - dt_ms = (time.perf_counter() - start) * 1e3 - logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") - - return obs_dict + return action_dict def send_feedback(self, feedback: Dict[str, float]) -> None: raise NotImplementedError("Feedback is not yet implemented for OpenArms leader.") @@ -259,12 +297,14 @@ class OpenArmsLeader(Teleoperator): # For manual control, ensure torque is disabled before disconnecting if self.config.manual_control: try: - self.bus.disable_torque() + self.bus_right.disable_torque() + self.bus_left.disable_torque() except Exception as e: logger.warning(f"Failed to disable torque during disconnect: {e}") - # Disconnect from CAN bus - self.bus.disconnect(disable_torque=False) # Already disabled above if needed + # Disconnect from CAN buses + self.bus_right.disconnect(disable_torque=False) # Already disabled above if needed + self.bus_left.disconnect(disable_torque=False) logger.info(f"{self} disconnected.")