pos teleop

This commit is contained in:
croissant
2025-10-31 10:01:41 +01:00
committed by Michel Aractingi
parent b011643dc9
commit fa6a2fb9b7
10 changed files with 1095 additions and 614 deletions
-166
View File
@@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""
Debug script for Damiao motor CAN communication on macOS.
This replaces candump for macOS SLCAN debugging.
"""
import can
import time
import sys
def test_can_communication(port="/dev/cu.usbmodem2101"):
"""Test basic CAN communication with a Damiao motor."""
print("=" * 60)
print("Damiao Motor CAN Communication Debug Tool")
print("=" * 60)
print(f"\nPort: {port}")
print()
try:
# Connect to SLCAN
print("Step 1: Connecting to SLCAN...")
bus = can.interface.Bus(
channel=port,
interface='slcan',
bitrate=1000000
)
print("✓ Connected to SLCAN")
# Test 1: Send enable command and listen for ANY response
print("\n" + "=" * 60)
print("Test 1: Enable Motor (ID 0x01)")
print("=" * 60)
print("Sending enable command to 0x01...")
enable_msg = can.Message(
arbitration_id=0x01,
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC],
is_extended_id=False
)
bus.send(enable_msg)
print("✓ Enable command sent")
print("\nListening for responses (2 seconds)...")
print("Expected: Response from 0x11 (master ID)")
print()
responses = []
start_time = time.time()
timeout = 2.0
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
responses.append(msg)
print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}")
if not responses:
print("✗ NO RESPONSES RECEIVED")
print("\nPossible issues:")
print(" 1. Motor not powered (check 24V supply)")
print(" 2. CAN wiring incorrect (CANH, CANL, GND)")
print(" 3. Motor master ID not set to 0x11")
print(" 4. SLCAN adapter not working properly")
print(" 5. Wrong CAN port specified")
else:
print(f"\n✓ Received {len(responses)} response(s)")
# Check if we got response from expected ID
recv_ids = [msg.arbitration_id for msg in responses]
if 0x11 in recv_ids:
print("✓ Motor 0x11 is responding!")
else:
print(f"⚠ Responses from unexpected IDs: {[hex(id) for id in recv_ids]}")
# Test 2: Send refresh command
print("\n" + "=" * 60)
print("Test 2: Refresh Motor State (ID 0x01)")
print("=" * 60)
print("Sending refresh command...")
refresh_msg = can.Message(
arbitration_id=0x7FF, # Parameter ID
data=[0x01, 0x00, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00],
is_extended_id=False
)
bus.send(refresh_msg)
print("✓ Refresh command sent")
print("\nListening for responses (2 seconds)...")
responses = []
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
responses.append(msg)
print(f" → Response from 0x{msg.arbitration_id:02X}: {msg.data.hex()}")
if not responses:
print("✗ NO RESPONSES RECEIVED")
else:
print(f"\n✓ Received {len(responses)} response(s)")
# Test 3: Listen for any spontaneous traffic
print("\n" + "=" * 60)
print("Test 3: Listen for Any CAN Traffic")
print("=" * 60)
print("Listening for 5 seconds...")
print("(This will catch any background CAN traffic)")
print()
start_time = time.time()
traffic_count = 0
while time.time() - start_time < 5.0:
msg = bus.recv(timeout=0.1)
if msg:
traffic_count += 1
print(f" [{time.time() - start_time:.2f}s] ID=0x{msg.arbitration_id:03X}: {msg.data.hex()}")
if traffic_count == 0:
print("✗ No CAN traffic detected at all")
print("\nThis suggests:")
print(" - SLCAN adapter may not be working")
print(" - No devices on the CAN bus are active")
print(" - Wrong port specified")
else:
print(f"\n✓ Detected {traffic_count} CAN messages")
# Cleanup
print("\n" + "=" * 60)
print("Cleanup")
print("=" * 60)
print("Sending disable command...")
disable_msg = can.Message(
arbitration_id=0x01,
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD],
is_extended_id=False
)
bus.send(disable_msg)
time.sleep(0.1)
bus.shutdown()
print("✓ Disconnected from CAN bus")
except Exception as e:
print(f"\n✗ Error: {e}")
import traceback
traceback.print_exc()
return False
print("\n" + "=" * 60)
print("Debug Complete")
print("=" * 60)
return True
if __name__ == "__main__":
if len(sys.argv) > 1:
port = sys.argv[1]
else:
port = "/dev/cu.usbmodem2101"
test_can_communication(port)
+20 -233
View File
@@ -34,51 +34,7 @@ Each arm has the following motor configuration based on the [OpenArm setup guide
For dual arm setups, the left arm uses IDs 0x09-0x10 for joints 1-8 with the same motor types.
## Quick Start (macOS)
If you're on macOS, here's the fastest way to get started:
```bash
# 1. Install LeRobot with OpenArms dependencies
pip install -e ".[openarms]"
# 2. Find your USB-CAN adapter
ls /dev/cu.usbmodem*
# 3. Test communication
python3 -c "
import can
bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000)
print('✓ CAN interface connected')
bus.shutdown()
"
```
Then use this configuration:
```python
from lerobot.robots.openarms import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem00000000050C1", # Your adapter
can_interface="auto", # Auto-detects slcan for /dev/* ports
is_dual_arm=True,
)
robot = OpenArmsFollower(config)
robot.connect()
```
## Prerequisites
### Software Requirements
**Linux:**
- Ubuntu 22.04/24.04 (or any Linux with SocketCAN)
- Python 3.8+
- `can-utils` and `iproute2` packages
- LeRobot with OpenArms dependencies
## Quick Start
```bash
# Install system dependencies
@@ -88,26 +44,6 @@ sudo apt install can-utils iproute2
pip install -e ".[openarms]"
```
**macOS:**
- macOS 12+ (Monterey or later)
- Python 3.8+
- LeRobot with OpenArms dependencies
```bash
# Install LeRobot with OpenArms support (includes python-can)
pip install -e ".[openarms]"
```
The `openarms` extra installs:
- `python-can>=4.2.0` - CAN bus communication library (supports SocketCAN on Linux and SLCAN on macOS)
:::tip
If you've already installed LeRobot and want to add OpenArms support, run:
```bash
pip install -e ".[openarms]"
```
:::
## Setup Guide
### Step 1: Motor ID Configuration
@@ -117,10 +53,9 @@ pip install -e ".[openarms]"
Refer to the [OpenArm Motor ID Configuration Guide](https://docs.openarm.dev/software/setup/motor-id) for detailed instructions using the Damiao Debugging Tools on Windows.
Key points:
- Each motor needs a unique **Sender CAN ID** (0x01-0x08 for first arm)
- Each motor needs a unique **Receiver/Master ID** (0x11-0x18 for first arm)
- Each motor needs a unique **Sender CAN ID** (0x01-0x08)
- Each motor needs a unique **Receiver/Master ID** (0x11-0x18)
- Use the Damiao Debugging Tools to set these IDs
- For dual arm setups, use 0x09-0x10 for the second arm
### Step 2: Setup CAN Interface
@@ -132,95 +67,39 @@ Configure your CAN interface as described in the [OpenArm CAN Setup Guide](https
# Find your CAN interface
ip link show
# Setup CAN 2.0 at 1 Mbps (standard)
# Configure can0, 1, 2, 3
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
sudo ip link set can1 down
sudo ip link set can1 type can bitrate 1000000
sudo ip link set can1 up
sudo ip link set can2 down
sudo ip link set can2 type can bitrate 1000000
sudo ip link set can2 up
sudo ip link set can3 down
sudo ip link set can3 type can bitrate 1000000
sudo ip link set can3 up
# Verify configuration
ip link show can0
```
#### macOS
or run:
macOS doesn't have native SocketCAN support.
`examples/openarms/setup_can.sh`
**Use SLCAN (Serial Line CAN)**
### Testing canbus and motor connection
For USB-CAN adapters that support SLCAN protocol (like CANable):
```bash
# Install python-can if not already installed
pip install python-can
# The adapter will appear as a serial device
ls /dev/cu.usbmodem*
# Use with python-can slcan interface
# Example: /dev/cu.usbmodem14201
```
In your code, specify the slcan interface:
```python
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem14201", # Your USB-CAN adapter
can_interface="slcan", # Will auto-detect if set to "auto"
)
```
### Step 3: Test Motor Communication
**On Linux:**
Test basic communication as described in the [OpenArm Motor Test Guide](https://docs.openarm.dev/software/setup/configure-test):
```bash
# Terminal 1: Monitor CAN traffic
candump can0
# Terminal 2: Enable motor 1
cansend can0 001#FFFFFFFFFFFFFFFC
# Expected response on Terminal 1:
# can0 011 [8] XX XX XX XX XX XX XX XX
# Disable motor 1
cansend can0 001#FFFFFFFFFFFFFFFD
```
**On macOS:**
Testing is done differently since you'll use serial-based adapters:
```bash
# Find your USB-CAN adapter
ls /dev/cu.usbmodem*
# Example output: /dev/cu.usbmodem00000000050C1
# Test with Python directly (can-utils don't work on macOS)
python3 -c "
import can
bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000)
msg = can.Message(arbitration_id=0x01, data=[0xFF]*7+[0xFC])
bus.send(msg)
response = bus.recv(timeout=1.0)
if response:
print(f'✓ Motor responded: ID 0x{response.arbitration_id:02X}')
else:
print('✗ No response')
bus.shutdown()
"
```
Please run this script to check if all motors can be found and to find your can-fd speed: `python examples/openarms/debug_can_communication.py`
## Usage
### Basic Setup
**On Linux:**
```python
from lerobot.robots.openarms import OpenArmsFollower
@@ -238,26 +117,6 @@ robot = OpenArmsFollower(config)
robot.connect()
```
**On macOS:**
```python
from lerobot.robots.openarms import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
# Find your USB-CAN adapter first
# ls /dev/cu.usbmodem*
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem14201", # Your adapter's serial port
can_interface="slcan", # Or "auto" for auto-detection
id="openarms_dual",
is_dual_arm=True,
)
robot = OpenArmsFollower(config)
robot.connect()
```
### Calibration
On first use, you'll need to calibrate the robot:
@@ -424,53 +283,6 @@ follower_config = OpenArmsFollowerConfig(
## Troubleshooting
### Motors Not Responding
**Linux:**
1. **Check power supply**: 24V with sufficient current
2. **Verify CAN interface**: `ip link show can0` should show "UP"
3. **Test with cansend**: Follow [motor test guide](https://docs.openarm.dev/software/setup/configure-test)
4. **Check motor IDs**: Use Damiao Debugging Tools to verify IDs
5. **Check termination**: 120Ω resistor should be enabled on CAN interface
**macOS:**
1. **Check power supply**: 24V with sufficient current
2. **Find adapter**: `ls /dev/cu.usbmodem*` should show your device
3. **Test connection**: Use Python script above to test communication
4. **Check motor IDs**: Use Damiao Debugging Tools on Windows
5. **Verify drivers**: Ensure USB-CAN adapter drivers are installed
6. **Try different baudrate**: Some adapters default to different rates
### macOS-Specific Issues
**"No such interface" error:**
```python
# Try auto-detection
config.can_interface = "auto"
# Or explicitly list available interfaces
import can
print(can.detect_available_configs())
```
**Permission denied on `/dev/cu.*`:**
```bash
# Add user to dialout group (if applicable)
sudo dscl . -append /Groups/_dialout GroupMembership $USER
# Or run with sudo (not recommended)
sudo python your_script.py
```
**Adapter not showing up:**
```bash
# Check USB devices
system_profiler SPUSBDataType
# Reinstall python-can
pip install --upgrade --force-reinstall python-can
```
### Motor Shaking/Unstable
- **Lower control gains**: Reduce `position_kp` and `position_kd`
@@ -480,7 +292,6 @@ pip install --upgrade --force-reinstall python-can
### CAN Bus Errors
**Linux:**
```bash
# Check for errors
ip -s link show can0
@@ -490,30 +301,6 @@ sudo ip link set can0 down
sudo ip link set can0 up
```
**macOS:**
```bash
# Reconnect USB adapter
# Unplug and replug the USB cable
# Restart Python script
# The slcan interface auto-reconnects
```
### Position Drift
- **Re-calibrate**: Run calibration procedure
- **Set zero position**: Use `robot.bus.set_zero_position()` with arm in known position
- **Check temperature**: Motors may drift when hot
## Technical Details
### Position Units
All positions are in **degrees**:
- Motor internal representation: radians
- User API: degrees
- Automatic conversion handled by `DamiaoMotorsBus`
### Control Mode
OpenArms uses **MIT control mode** which allows simultaneous control of:
@@ -0,0 +1,416 @@
#!/usr/bin/env python3
"""
Comprehensive debug script for OpenArms CAN FD communication.
Tests all 4 CAN interfaces with CAN FD support.
"""
import can
import time
import sys
import subprocess
def check_can_interface(port):
"""Check if CAN interface is UP and configured."""
try:
result = subprocess.run(['ip', 'link', 'show', port],
capture_output=True, text=True)
if result.returncode != 0:
return False, "Interface not found", None
output = result.stdout
if 'UP' not in output:
return False, "Interface is DOWN", None
# Check if CAN FD is enabled
is_fd = 'fd on' in output.lower() or 'canfd' in output.lower()
return True, "Interface is UP", is_fd
except FileNotFoundError:
return None, "Cannot check (ip command not found)", None
def test_motor_on_interface(bus, motor_id, timeout=2.0, use_fd=False):
"""
Test a single motor and return all responses.
Returns:
list of (arbitration_id, data) tuples for all responses received
"""
# Send enable command
enable_msg = can.Message(
arbitration_id=motor_id,
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC],
is_extended_id=False,
is_fd=use_fd
)
try:
bus.send(enable_msg)
except Exception as e:
return None, f"Send error: {e}"
# Listen for responses
responses = []
start_time = time.time()
while time.time() - start_time < timeout:
msg = bus.recv(timeout=0.1)
if msg:
responses.append((msg.arbitration_id, msg.data, msg.is_fd if hasattr(msg, 'is_fd') else False))
# Send disable command
disable_msg = can.Message(
arbitration_id=motor_id,
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD],
is_extended_id=False,
is_fd=use_fd
)
try:
bus.send(disable_msg)
except:
pass
return responses, None
def test_interface(port, interface_type="socketcan", use_can_fd=True):
"""Test all 8 motors on a single CAN interface."""
results = {
'interface': port,
'status': None,
'is_fd': use_can_fd,
'motors': {}
}
# Check interface status
status_ok, status_msg, interface_has_fd = check_can_interface(port)
if interface_has_fd is not None:
results['interface_fd_enabled'] = interface_has_fd
if use_can_fd and not interface_has_fd:
status_msg += " (CAN FD NOT enabled on interface!)"
elif interface_has_fd:
status_msg += " (CAN FD enabled)"
results['status'] = status_msg
if status_ok is False:
return results
# Try to connect
try:
if use_can_fd:
print(f" Connecting to {port} with CAN FD (1 Mbps / 5 Mbps)...")
bus = can.interface.Bus(
channel=port,
interface=interface_type,
bitrate=1000000,
data_bitrate=5000000,
fd=True
)
else:
print(f" Connecting to {port} with CAN 2.0 (1 Mbps)...")
bus = can.interface.Bus(
channel=port,
interface=interface_type,
bitrate=1000000
)
except Exception as e:
results['status'] = f"Connection failed: {e}"
return results
try:
# Clear any pending messages
while bus.recv(timeout=0.01):
pass
# Test each motor (0x01 to 0x08)
for motor_id in range(0x01, 0x09):
responses, error = test_motor_on_interface(bus, motor_id, timeout=1.0, use_fd=use_can_fd)
if error:
results['motors'][motor_id] = {'error': error}
elif responses:
results['motors'][motor_id] = {
'found': True,
'responses': responses
}
else:
results['motors'][motor_id] = {
'found': False,
'responses': []
}
time.sleep(0.05) # Small delay between motors
finally:
bus.shutdown()
return results
def print_results(all_results):
"""Print formatted results for all interfaces."""
print("SUMMARY - Motors Found on Each Interface")
motor_names = {
0x01: "joint_1 (Shoulder pan)",
0x02: "joint_2 (Shoulder lift)",
0x03: "joint_3 (Shoulder rotation)",
0x04: "joint_4 (Elbow flex)",
0x05: "joint_5 (Wrist roll)",
0x06: "joint_6 (Wrist pitch)",
0x07: "joint_7 (Wrist rotation)",
0x08: "gripper",
}
total_found = 0
for result in all_results:
interface = result['interface']
status = result['status']
print(f"{interface}: {status}")
if result.get('is_fd'):
print(f" Mode: CAN FD")
else:
print(f" Mode: CAN 2.0")
if 'Connection failed' in status or 'DOWN' in status:
print(f" ⚠ Cannot test {interface}")
continue
motors_found = 0
for motor_id in range(0x01, 0x09):
motor_data = result['motors'].get(motor_id, {})
motor_name = motor_names.get(motor_id, "Unknown")
if motor_data.get('error'):
print(f" Motor 0x{motor_id:02X} ({motor_name}): ✗ {motor_data['error']}")
elif motor_data.get('found'):
motors_found += 1
total_found += 1
responses = motor_data['responses']
print(f" Motor 0x{motor_id:02X} ({motor_name}): ✓ FOUND")
for resp_id, data, is_fd in responses:
data_hex = data.hex()
fd_flag = " [FD]" if is_fd else " [2.0]"
print(f" → Response from 0x{resp_id:02X}{fd_flag}: {data_hex}")
else:
print(f" Motor 0x{motor_id:02X} ({motor_name}): ✗ No response")
print(f"\n Summary: {motors_found}/8 motors found on {interface}")
# Overall summary
print("OVERALL SUMMARY")
print(f"Total motors found across all interfaces: {total_found}")
# Analyze configuration
print("DIAGNOSIS")
for result in all_results:
interface = result['interface']
motors_found = sum(1 for m in result['motors'].values() if m.get('found'))
if motors_found == 0:
print(f"\n{interface}: NO MOTORS FOUND")
print(" Possible issues:")
print(" 1. CAN FD mode mismatch (interface vs motor configuration)")
print(" 2. Missing 120Ω termination resistors at BOTH cable ends")
print(" 3. Motor timeout parameter set incorrectly (should NOT be 0)")
print(" 4. CANH/CANL wiring issue")
print(" 5. Cable too long (>40m for CAN FD at 5Mbps)")
# Check FD mismatch
if result.get('is_fd') and not result.get('interface_fd_enabled'):
print(" ⚠️ CRITICAL: Trying CAN FD but interface NOT configured for FD!")
print(f" Fix: sudo ip link set {interface} type can bitrate 1000000 dbitrate 5000000 fd on")
elif motors_found < 8:
print(f"\n{interface}: Only {motors_found}/8 motors responding")
print(" Check power and connections for missing motors")
else:
print(f"\n{interface}: All 8 motors responding correctly!")
# Check for unexpected response IDs
print("RESPONSE ID ANALYSIS")
for result in all_results:
interface = result['interface']
unexpected = []
for motor_id, motor_data in result['motors'].items():
if motor_data.get('found'):
expected_id = motor_id + 0x10
actual_ids = [resp[0] for resp in motor_data['responses']]
if expected_id not in actual_ids:
unexpected.append((motor_id, actual_ids))
if unexpected:
print(f"\n{interface}: Unexpected response IDs detected")
for motor_id, actual_ids in unexpected:
expected_id = motor_id + 0x10
print(f" Motor 0x{motor_id:02X}: Expected 0x{expected_id:02X}, "
f"got {[f'0x{id:02X}' for id in actual_ids]}")
print(" → Motor Master IDs need reconfiguration")
else:
motors_found = sum(1 for m in result['motors'].values() if m.get('found'))
if motors_found > 0:
print(f"\n{interface}: All responding motors use correct IDs")
def test_communication_speed(interface, motor_id, num_iterations=100):
"""
Test communication speed with a motor.
Returns:
tuple: (hz, avg_latency_ms) or (None, None) if test failed
"""
try:
# Connect to interface
bus = can.interface.Bus(
channel=interface,
interface="socketcan",
bitrate=1000000,
data_bitrate=5000000,
fd=True
)
# Send refresh commands and measure round-trip time
latencies = []
successful = 0
for _ in range(num_iterations):
start = time.perf_counter()
# Send enable command (lightweight operation)
enable_msg = can.Message(
arbitration_id=motor_id,
data=[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC],
is_extended_id=False,
is_fd=True
)
bus.send(enable_msg)
# Wait for response
msg = bus.recv(timeout=0.1)
if msg:
latency = (time.perf_counter() - start) * 1000 # Convert to ms
latencies.append(latency)
successful += 1
bus.shutdown()
if successful > 0:
avg_latency = sum(latencies) / len(latencies)
hz = 1000.0 / avg_latency if avg_latency > 0 else 0
return hz, avg_latency
return None, None
except Exception as e:
print(f" Speed test error: {e}")
return None, None
def main():
"""Main function to test all CAN interfaces with CAN FD."""
print("\nThis will test all 4 CAN interfaces (can0-can3) with CAN FD")
print("Testing motors 0x01-0x08 on each interface")
print()
print("Make sure:")
print(" ✓ Motors are powered (24V)")
print(" ✓ CAN interfaces configured with FD mode:")
print(" ./examples/openarms/setup_can.sh")
print(" ✓ Motor 'timeout' parameter NOT set to 0 (use Damiao tools)")
print(" ✓ CAN wiring includes 120Ω termination at BOTH ends")
print()
input("Press ENTER to start testing...")
# Test all 4 interfaces with CAN FD
all_results = []
for i in range(4):
interface = f"can{i}"
print(f"Testing {interface}...")
result = test_interface(interface, use_can_fd=True)
all_results.append(result)
# Quick status
if 'Connection failed' in result['status'] or 'DOWN' in result['status']:
print(f"{interface}: {result['status']}")
else:
motors_found = sum(1 for m in result['motors'].values() if m.get('found'))
print(f" {interface}: {motors_found}/8 motors found")
time.sleep(0.2)
# Print detailed results
print_results(all_results)
print("Testing Complete!")
all_found = sum(sum(1 for m in r['motors'].values() if m.get('found')) for r in all_results)
if all_found == 0:
print("\n⚠️ CRITICAL: No motors found on any interface!")
print("\nTop issues to check:")
print(" 1. Motor 'timeout' parameter (use Damiao tools to set > 0)")
print(" 2. CAN FD not enabled (run ./examples/openarms/setup_can.sh)")
print(" 3. Missing termination resistors")
print("\nTry:")
print(" a) Check motor parameters with Damiao Debugging Tools")
print(" b) Verify CAN FD is enabled: ip -d link show can0 | grep fd")
print(" c) Run setup script: ./examples/openarms/setup_can.sh")
else:
# Run speed test on interfaces with motors
print("COMMUNICATION SPEED TEST")
print("\nTesting maximum communication frequency...")
for result in all_results:
interface = result['interface']
# Find first responding motor
responding_motor = None
for motor_id, motor_data in result['motors'].items():
if motor_data.get('found'):
responding_motor = motor_id
break
if responding_motor:
print(f"\n{interface}: Testing with motor 0x{responding_motor:02X}...")
hz, latency = test_communication_speed(interface, responding_motor, num_iterations=100)
if hz:
print(f" ✓ Max frequency: {hz:.1f} Hz")
print(f" ✓ Avg latency: {latency:.2f} ms")
print(f" ✓ Commands per second: ~{int(hz)}")
else:
print(f" ✗ Speed test failed")
else:
print(f"\n{interface}: No motors found, skipping speed test")
print()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\nTesting interrupted by user.")
sys.exit(1)
except Exception as e:
print(f"\nUnexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
+73
View File
@@ -0,0 +1,73 @@
#!/bin/bash
# Setup all OpenArms CAN interfaces with CAN FD
set -e
echo "=========================================="
echo "OpenArms CAN FD Interface Setup"
echo "=========================================="
echo ""
echo "Mode: CAN FD"
echo " - Nominal bitrate: 1 Mbps"
echo " - Data bitrate: 5 Mbps"
echo ""
echo "Configuring interfaces can0, can1, can2, can3..."
echo ""
# Configure each CAN interface with CAN FD
for i in 0 1 2 3; do
interface="can$i"
# Check if interface exists
if ! ip link show "$interface" &> /dev/null; then
echo "$interface: Not found, skipping"
continue
fi
# Bring down interface
sudo ip link set "$interface" down 2>/dev/null
# Configure CAN FD mode
sudo ip link set "$interface" type can \
bitrate 1000000 \
dbitrate 5000000 \
fd on
# Bring up interface
sudo ip link set "$interface" up
# Verify configuration
if ip link show "$interface" | grep -q "UP"; then
echo "$interface: Configured and UP"
else
echo "$interface: Failed to bring UP"
fi
done
echo ""
echo "=========================================="
echo "Verification"
echo "=========================================="
echo ""
# Show detailed status for each interface
for i in 0 1 2 3; do
interface="can$i"
if ip link show "$interface" &> /dev/null; then
echo "$interface:"
# Show key parameters
ip -d link show "$interface" | grep -E "can|state|bitrate|dbitrate" | head -3
echo ""
fi
done
echo "=========================================="
echo "Setup Complete!"
echo "=========================================="
echo ""
echo "All interfaces configured for CAN FD mode"
echo ""
echo "Next steps:"
echo " 1. Test motors: python debug_can_communication.py"
echo " 2. Run teleoperation: python examples/openarms/teleop.py"
echo ""
+148
View File
@@ -0,0 +1,148 @@
"""
OpenArms Teleoperation Example - Full Dual Arms
This script demonstrates teleoperation of OpenArms follower robot using an OpenArms leader arm.
It first calibrates both devices, then enters a teleoperation loop for both arms.
"""
import time
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
follower_config = OpenArmsFollowerConfig(
port_left="can0", # CAN interface for follower left arm
port_right="can1", # CAN interface for follower right arm
can_interface="socketcan", # Linux SocketCAN
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=5.0, # Safety limit
)
leader_config = OpenArmsLeaderConfig(
port_left="can2", # CAN interface for leader left arm
port_right="can3", # CAN interface for leader right arm
can_interface="socketcan", # Linux SocketCAN
id="openarms_leader",
manual_control=True, # Enable manual control (torque disabled)
)
print("=" * 60)
print("OpenArms Teleoperation - Full Dual Arms")
print("=" * 60)
# Initialize devices
print("\n[1/4] Initializing devices...")
follower = OpenArmsFollower(follower_config)
leader = OpenArmsLeader(leader_config)
# Connect and calibrate follower
print("\n[2/4] Connecting and calibrating follower robot...")
print("Note: If you have existing calibration, just press ENTER to use it.")
follower.connect(calibrate=True)
# Connect and calibrate leader
print("\n[3/4] Connecting and calibrating leader arm...")
print("Note: The leader arm will have torque disabled for manual control.")
leader.connect(calibrate=True)
# Wait for user to be ready
print("\n[4/4] Ready for teleoperation!")
print("\nBoth arms will be controlled (16 motors total):")
print(" RIGHT ARM: joints 1-7 + gripper")
print(" LEFT ARM: joints 1-7 + gripper")
print("\nPress ENTER to start teleoperation...")
input()
print("\nTeleoperation started! Move both leader arms.")
print("Press Ctrl+C to stop.\n")
# All joints for both arms (16 motors total)
all_joints = [
# Right arm
"right_joint_1",
"right_joint_2",
"right_joint_3",
"right_joint_4",
"right_joint_5",
"right_joint_6",
"right_joint_7",
"right_gripper",
# Left arm
"left_joint_1",
"left_joint_2",
"left_joint_3",
"left_joint_4",
"left_joint_5",
"left_joint_6",
"left_joint_7",
"left_gripper",
]
# Performance monitoring
loop_times = []
start_time = time.perf_counter()
last_print_time = start_time
try:
while True:
loop_start = time.perf_counter()
# Get action from leader
leader_action = leader.get_action()
# Filter to only position data for all joints (both arms)
joint_action = {}
for joint in all_joints:
pos_key = f"{joint}.pos"
if pos_key in leader_action:
joint_action[pos_key] = leader_action[pos_key]
# Send action to follower (both arms)
if joint_action:
follower.send_action(joint_action)
# Measure loop time
loop_end = time.perf_counter()
loop_time = loop_end - loop_start
loop_times.append(loop_time)
# Print stats every 2 seconds
if loop_end - last_print_time >= 2.0:
if loop_times:
avg_time = sum(loop_times) / len(loop_times)
current_hz = 1.0 / avg_time if avg_time > 0 else 0
min_time = min(loop_times)
max_time = max(loop_times)
max_hz = 1.0 / min_time if min_time > 0 else 0
min_hz = 1.0 / max_time if max_time > 0 else 0
print(f"[Hz Stats] Avg: {current_hz:.1f} Hz | "
f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | "
f"Avg loop time: {avg_time*1000:.1f} ms")
# Reset for next measurement window
loop_times = []
last_print_time = loop_end
except KeyboardInterrupt:
print("\n\nStopping teleoperation...")
finally:
# Disconnect devices
print("Disconnecting devices...")
try:
follower.disconnect()
except Exception as e:
print(f"Error disconnecting follower: {e}")
try:
leader.disconnect()
except Exception as e:
print(f"Error disconnecting leader: {e}")
print("Done!")
+62 -12
View File
@@ -16,6 +16,7 @@
import logging
import time
from contextlib import contextmanager
from copy import deepcopy
from functools import cached_property
from typing import Dict, List, Optional, Tuple, Union
@@ -74,6 +75,9 @@ class DamiaoMotorsBus(MotorsBusBase):
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
can_interface: str = "auto",
use_can_fd: bool = True,
bitrate: int = 1000000,
data_bitrate: int | None = 5000000,
):
"""
Initialize the Damiao motors bus.
@@ -83,10 +87,16 @@ class DamiaoMotorsBus(MotorsBusBase):
motors: Dictionary mapping motor names to Motor objects
calibration: Optional calibration data
can_interface: CAN interface type - "auto" (default), "socketcan" (Linux), or "slcan" (macOS/serial)
use_can_fd: Whether to use CAN FD mode (default: True for OpenArms)
bitrate: Nominal bitrate in bps (default: 1000000 = 1 Mbps)
data_bitrate: Data bitrate for CAN FD in bps (default: 5000000 = 5 Mbps), ignored if use_can_fd is False
"""
super().__init__(port, motors, calibration)
self.port = port
self.can_interface = can_interface
self.use_can_fd = use_can_fd
self.bitrate = bitrate
self.data_bitrate = data_bitrate
self.canbus = None
self._is_connected = False
@@ -138,26 +148,48 @@ class DamiaoMotorsBus(MotorsBusBase):
# Connect to CAN bus
if self.can_interface == "socketcan":
# Linux SocketCAN
self.canbus = can.interface.Bus(
channel=self.port,
interface="socketcan",
bitrate=self.default_baudrate
)
# Linux SocketCAN with CAN FD support
if self.use_can_fd and self.data_bitrate is not None:
self.canbus = can.interface.Bus(
channel=self.port,
interface="socketcan",
bitrate=self.bitrate,
data_bitrate=self.data_bitrate,
fd=True
)
logger.info(f"Connected to {self.port} with CAN FD (bitrate={self.bitrate}, data_bitrate={self.data_bitrate})")
else:
self.canbus = can.interface.Bus(
channel=self.port,
interface="socketcan",
bitrate=self.bitrate
)
logger.info(f"Connected to {self.port} with CAN 2.0 (bitrate={self.bitrate})")
elif self.can_interface == "slcan":
# Serial Line CAN (macOS, Windows, or USB adapters)
# Note: SLCAN typically doesn't support CAN FD
self.canbus = can.interface.Bus(
channel=self.port,
interface="slcan",
bitrate=self.default_baudrate
bitrate=self.bitrate
)
logger.info(f"Connected to {self.port} with SLCAN (bitrate={self.bitrate})")
else:
# Generic interface (vector, pcan, etc.)
self.canbus = can.interface.Bus(
channel=self.port,
interface=self.can_interface,
bitrate=self.default_baudrate
)
if self.use_can_fd and self.data_bitrate is not None:
self.canbus = can.interface.Bus(
channel=self.port,
interface=self.can_interface,
bitrate=self.bitrate,
data_bitrate=self.data_bitrate,
fd=True
)
else:
self.canbus = can.interface.Bus(
channel=self.port,
interface=self.can_interface,
bitrate=self.bitrate
)
self._is_connected = True
@@ -251,6 +283,24 @@ class DamiaoMotorsBus(MotorsBusBase):
raise e
time.sleep(0.01)
@contextmanager
def torque_disabled(self, motors: str | list[str] | None = None):
"""
Context manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
Examples:
>>> with bus.torque_disabled():
... # Safe operations here with torque disabled
... pass
"""
self.disable_torque(motors)
try:
yield
finally:
self.enable_torque(motors)
def set_zero_position(self, motors: str | list[str] | None = None) -> None:
"""Set current position as zero for selected motors."""
motors = self._get_motors_list(motors)
@@ -15,7 +15,7 @@
# limitations under the License.
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from typing import Dict, Optional
from lerobot.cameras import CameraConfig
from lerobot.motors.damiao.tables import MotorType
@@ -28,13 +28,21 @@ from ..config import RobotConfig
class OpenArmsFollowerConfig(RobotConfig):
"""Configuration for the OpenArms follower robot with Damiao motors."""
# CAN interface to connect to
# CAN interfaces - one per arm
# Right arm CAN interface (e.g., "can0")
# Left arm CAN interface (e.g., "can1")
# Linux: "can0", "can1", etc.
# macOS: "/dev/cu.usbmodem*" (serial device)
port: str = "can0"
port_right: str = "can0" # CAN interface for right arm
port_left: str = "can1" # CAN interface for left arm
# CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect)
can_interface: str = "auto"
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Whether to disable torque when disconnecting
disable_torque_on_disconnect: bool = True
@@ -64,9 +72,10 @@ class OpenArmsFollowerConfig(RobotConfig):
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
})
# MIT control parameters for position control
position_kp: float = 10.0 # Position gain
position_kd: float = 0.5 # Velocity damping
# MIT control parameters for position control (per motor)
# Values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0])
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
# Calibration parameters
calibration_mode: str = "manual" # "manual" or "auto"
+228 -112
View File
@@ -48,31 +48,44 @@ class OpenArmsFollower(Robot):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
motors = {}
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
# Right arm
# Right arm motors (on port_right)
# Each arm uses the same CAN IDs since they're on separate buses
motors_right = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"right_{motor_name}"
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
motors_right[motor_name] = motor
# Left arm (offset IDs by 8)
# Left arm motors (on port_left, same IDs as right since separate bus)
motors_left = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"left_{motor_name}"
motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body)
motor.recv_id = recv_id + 0x08
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
motors_left[motor_name] = motor
# Initialize the Damiao motors bus
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
# Initialize separate Damiao motors buses (one per arm) with CAN FD support
self.bus_right = DamiaoMotorsBus(
port=self.config.port_right,
motors=motors_right,
calibration={k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")},
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
self.bus_left = DamiaoMotorsBus(
port=self.config.port_left,
motors=motors_left,
calibration={k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")},
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
# Initialize cameras
@@ -92,10 +105,16 @@ class OpenArmsFollower(Robot):
def _motors_ft(self) -> Dict[str, type]:
"""Motor features for observation and action spaces."""
features = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
# Right arm motors
for motor in self.bus_right.motors:
features[f"right_{motor}.pos"] = float
features[f"right_{motor}.vel"] = float
features[f"right_{motor}.torque"] = float
# Left arm motors
for motor in self.bus_left.motors:
features[f"left_{motor}.pos"] = float
features[f"left_{motor}.vel"] = float
features[f"left_{motor}.torque"] = float
return features
@property
@@ -119,20 +138,25 @@ class OpenArmsFollower(Robot):
@property
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
return (self.bus_right.is_connected and
self.bus_left.is_connected and
all(cam.is_connected for cam in self.cameras.values()))
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
We assume that at connection time, the arm is in a safe rest position,
We assume that at connection time, the arms are in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
self.bus.connect()
# Connect to both CAN buses
logger.info(f"Connecting right arm on {self.config.port_right}...")
self.bus_right.connect()
logger.info(f"Connecting left arm on {self.config.port_left}...")
self.bus_left.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
@@ -151,14 +175,15 @@ class OpenArmsFollower(Robot):
# Optionally set zero position
if self.config.zero_position_on_connect:
logger.info("Setting current position as zero...")
self.bus.set_zero_position()
self.bus_right.set_zero_position()
self.bus_left.set_zero_position()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if robot is calibrated."""
return self.bus.is_calibrated
return self.bus_right.is_calibrated and self.bus_left.is_calibrated
def calibrate(self) -> None:
"""
@@ -166,7 +191,7 @@ class OpenArmsFollower(Robot):
The calibration procedure:
1. Disable torque
2. Ask user to position arm in hanging position with gripper closed
2. Ask user to position arms in hanging position with grippers closed
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
@@ -179,18 +204,32 @@ class OpenArmsFollower(Robot):
)
if user_input.strip().lower() != "c":
logger.info(f"Using existing calibration for {self.id}")
self.bus.write_calibration(self.calibration)
# Split calibration for each bus
cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")}
cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")}
self.bus_right.write_calibration(cal_right)
self.bus_left.write_calibration(cal_left)
return
logger.info(f"\nRunning calibration for {self}")
# Calibrate each arm separately
self._calibrate_arm("right", self.bus_right)
self._calibrate_arm("left", self.bus_left)
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
def _calibrate_arm(self, arm_name: str, bus: DamiaoMotorsBus) -> None:
"""Calibrate a single arm."""
logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===")
# Disable torque for manual positioning
self.bus.disable_torque()
bus.disable_torque()
time.sleep(0.1)
# Step 1: Set zero position
input(
"\nCalibration Step 1: Zero Position\n"
f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
@@ -198,60 +237,51 @@ class OpenArmsFollower(Robot):
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Zero position set.")
bus.set_zero_position()
logger.info(f"{arm_name.capitalize()} arm zero position set.")
# Step 2: Record range of motion
# Automatically set range to -90° to +90° for all joints
print(
"\nCalibration Step 2: Range of Motion\n"
"Move each joint through its full range of motion.\n"
"The system will record min/max positions.\n"
"Press ENTER when done..."
f"\nAutomatically setting range: -90° to +90° for all joints"
)
# Record ranges
range_mins, range_maxes = self.bus.record_ranges_of_motion()
# Create calibration data (ranges are already in degrees)
self.calibration = {}
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
# Create calibration data with fixed ranges
if self.calibration is None:
self.calibration = {}
for motor_name, motor in bus.motors.items():
# Prefix motor name with arm name for storage
prefixed_name = f"{arm_name}_{motor_name}"
# Use -90 to +90 for all joints and gripper (integers required)
self.calibration[prefixed_name] = MotorCalibration(
id=motor.id,
drive_mode=0, # Normal direction
homing_offset=0, # Already set via set_zero_position
range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees
range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees
range_min=-90, # -90 degrees (integer)
range_max=90, # +90 degrees (integer)
)
logger.info(f" {prefixed_name}: range set to [-90°, +90°]")
# Special handling for gripper range
if "gripper" in self.calibration:
gripper_cal = self.calibration["gripper"]
gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min)
if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees)
# Set default gripper range in degrees
gripper_cal.range_min = 0.0
gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion
# Write calibration to motors and save to file
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
# Write calibration to this arm's motors
cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")}
bus.write_calibration(cal_for_bus)
# Re-enable torque
self.bus.enable_torque()
bus.enable_torque()
# Save calibration after each arm
self._save_calibration()
def configure(self) -> None:
"""Configure motors with appropriate settings."""
with self.bus.torque_disabled():
# Configure all motors
self.bus.configure_motors()
# Set specific parameters for gripper if present
if "gripper" in self.bus.motors:
# Gripper uses lower gains to avoid damage
# These will be applied during MIT control commands
pass # Parameters are set during control commands
# Configure right arm
with self.bus_right.torque_disabled():
self.bus_right.configure_motors()
# Configure left arm
with self.bus_left.torque_disabled():
self.bus_left.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.")
@@ -263,16 +293,26 @@ class OpenArmsFollower(Robot):
obs_dict = {}
# Read motor positions, velocities, and torques
# Read motor positions, velocities, and torques from right arm
start = time.perf_counter()
positions = self.bus.sync_read("Present_Position")
velocities = self.bus.sync_read("Present_Velocity")
torques = self.bus.sync_read("Present_Torque")
positions_right = self.bus_right.sync_read("Present_Position")
velocities_right = self.bus_right.sync_read("Present_Velocity")
torques_right = self.bus_right.sync_read("Present_Torque")
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0)
obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0)
obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0)
for motor in self.bus_right.motors:
obs_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0)
obs_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0)
obs_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0)
# Read motor positions, velocities, and torques from left arm
positions_left = self.bus_left.sync_read("Present_Position")
velocities_left = self.bus_left.sync_read("Present_Velocity")
torques_left = self.bus_left.sync_read("Present_Torque")
for motor in self.bus_left.motors:
obs_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0)
obs_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0)
obs_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -293,7 +333,7 @@ class OpenArmsFollower(Robot):
The action magnitude may be clipped based on safety limits.
Args:
action: Dictionary with motor positions
action: Dictionary with motor positions (e.g., "right_joint_1.pos", "left_joint_2.pos")
Returns:
The action actually sent (potentially clipped)
@@ -301,37 +341,69 @@ class OpenArmsFollower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Extract motor positions from action
goal_pos = {
key.removesuffix(".pos"): val
for key, val in action.items()
if key.endswith(".pos")
}
# Extract motor positions from action and split by arm
goal_pos_right = {}
goal_pos_left = {}
for key, val in action.items():
if key.endswith(".pos"):
motor_name = key.removesuffix(".pos")
if motor_name.startswith("right_"):
# Remove "right_" prefix for bus access
goal_pos_right[motor_name.removeprefix("right_")] = val
elif motor_name.startswith("left_"):
# Remove "left_" prefix for bus access
goal_pos_left[motor_name.removeprefix("left_")] = val
# Apply safety limits if configured
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {
key: (g_pos, present_pos[key])
for key, g_pos in goal_pos.items()
}
goal_pos = ensure_safe_goal_position(
goal_present_pos,
self.config.max_relative_target
)
# Get current positions
present_pos_right = self.bus_right.sync_read("Present_Position")
present_pos_left = self.bus_left.sync_read("Present_Position")
# Apply safety limits to right arm
if goal_pos_right:
goal_present_pos_right = {
key: (g_pos, present_pos_right.get(key, 0.0))
for key, g_pos in goal_pos_right.items()
}
goal_pos_right = ensure_safe_goal_position(
goal_present_pos_right,
self.config.max_relative_target
)
# Apply safety limits to left arm
if goal_pos_left:
goal_present_pos_left = {
key: (g_pos, present_pos_left.get(key, 0.0))
for key, g_pos in goal_pos_left.items()
}
goal_pos_left = ensure_safe_goal_position(
goal_present_pos_left,
self.config.max_relative_target
)
# Prepare MIT control commands for each motor
for motor_name, position_degrees in goal_pos.items():
# Use different gains for gripper
if motor_name == "gripper":
kp = self.config.position_kp * 0.5 # Lower gain for gripper
kd = self.config.position_kd * 0.5
else:
kp = self.config.position_kp
kd = self.config.position_kd
# Motor name to index mapping for gains
motor_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
# Send MIT control commands to right arm
for motor_name, position_degrees in goal_pos_right.items():
# Get per-motor gains from config
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx]
kd = self.config.position_kd[idx]
# Send MIT control command (position is in degrees)
self.bus._mit_control(
self.bus_right._mit_control(
motor_name,
kp=kp,
kd=kd,
@@ -340,15 +412,39 @@ class OpenArmsFollower(Robot):
torque=0.0
)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
# Send MIT control commands to left arm
for motor_name, position_degrees in goal_pos_left.items():
# Get per-motor gains from config
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx]
kd = self.config.position_kd[idx]
# Send MIT control command (position is in degrees)
self.bus_left._mit_control(
motor_name,
kp=kp,
kd=kd,
position_degrees=position_degrees,
velocity_deg_per_sec=0.0,
torque=0.0
)
# Return the actions that were actually sent
result = {}
for motor, val in goal_pos_right.items():
result[f"right_{motor}.pos"] = val
for motor, val in goal_pos_left.items():
result[f"left_{motor}.pos"] = val
return result
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect from CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
# Disconnect from CAN buses
self.bus_right.disconnect(self.config.disable_torque_on_disconnect)
self.bus_left.disconnect(self.config.disable_torque_on_disconnect)
# Disconnect cameras
for cam in self.cameras.values():
@@ -363,10 +459,10 @@ class OpenArmsFollower(Robot):
def _gravity_from_q(self, q_rad: Dict[str, float]) -> Dict[str, float]:
"""
Compute g(q) [N·m] for all joints in the robot.
The order of joints in the URDF matches self.bus.motors.
The order of joints in the URDF matches the concatenated motor lists (right then left).
Args:
q_rad: Dictionary mapping motor names to positions in radians
q_rad: Dictionary mapping motor names (with arm prefix) to positions in radians
Returns:
Dictionary mapping motor names to gravity torques in N·m
@@ -380,14 +476,34 @@ class OpenArmsFollower(Robot):
"Ensure urdf/openarms.urdf exists and is valid."
)
# Build position vector in the order of motors
# Build position vector in the order of motors (right arm, then left arm)
q = np.zeros(self.pin_robot.model.nq)
for i, motor_name in enumerate(self.bus.motors):
q[i] = q_rad[motor_name]
idx = 0
# Right arm motors
for motor_name in self.bus_right.motors:
full_name = f"right_{motor_name}"
q[idx] = q_rad.get(full_name, 0.0)
idx += 1
# Left arm motors
for motor_name in self.bus_left.motors:
full_name = f"left_{motor_name}"
q[idx] = q_rad.get(full_name, 0.0)
idx += 1
# Compute generalized gravity vector
g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q)
# Map back to motor names
return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)}
result = {}
idx = 0
for motor_name in self.bus_right.motors:
result[f"right_{motor_name}"] = float(g[idx])
idx += 1
for motor_name in self.bus_left.motors:
result[f"left_{motor_name}"] = float(g[idx])
idx += 1
return result
@@ -25,13 +25,21 @@ from ..config import TeleoperatorConfig
class OpenArmsLeaderConfig(TeleoperatorConfig):
"""Configuration for the OpenArms leader/teleoperator with Damiao motors."""
# CAN interface to connect to
# CAN interfaces - one per arm
# Right arm CAN interface (e.g., "can2")
# Left arm CAN interface (e.g., "can3")
# Linux: "can0", "can1", etc.
# macOS: "/dev/cu.usbmodem*" (serial device)
port: str = "can0"
port_right: str = "can2" # CAN interface for right arm
port_left: str = "can3" # CAN interface for left arm
# CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect)
can_interface: str = "auto"
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
@@ -16,7 +16,7 @@
import logging
import time
from typing import Dict
from typing import Any, Dict
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
@@ -44,42 +44,60 @@ class OpenArmsLeader(Teleoperator):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
motors = {}
# Right arm (original IDs)
# Right arm motors (on port_right)
# Each arm uses the same CAN IDs since they're on separate buses
motors_right = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"right_{motor_name}"
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
motors_right[motor_name] = motor
# Left arm (offset IDs by 8)
# Left arm motors (on port_left, same IDs as right since separate bus)
motors_left = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"left_{motor_name}"
motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body)
motor.recv_id = recv_id + 0x08
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
# Initialize the Damiao motors bus
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
motors_left[motor_name] = motor
# Initialize separate Damiao motors buses (one per arm) with CAN FD support
self.bus_right = DamiaoMotorsBus(
port=self.config.port_right,
motors=motors_right,
calibration={k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")},
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
self.bus_left = DamiaoMotorsBus(
port=self.config.port_left,
motors=motors_left,
calibration={k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")},
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
@property
def action_features(self) -> Dict[str, type]:
"""Features produced by this teleoperator."""
features = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
# Right arm motors
for motor in self.bus_right.motors:
features[f"right_{motor}.pos"] = float
features[f"right_{motor}.vel"] = float
features[f"right_{motor}.torque"] = float
# Left arm motors
for motor in self.bus_left.motors:
features[f"left_{motor}.pos"] = float
features[f"left_{motor}.vel"] = float
features[f"left_{motor}.torque"] = float
return features
@property
@@ -90,7 +108,7 @@ class OpenArmsLeader(Teleoperator):
@property
def is_connected(self) -> bool:
"""Check if teleoperator is connected."""
return self.bus.is_connected
return self.bus_right.is_connected and self.bus_left.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
@@ -102,8 +120,11 @@ class OpenArmsLeader(Teleoperator):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
self.bus.connect()
# Connect to CAN buses
logger.info(f"Connecting right arm on {self.config.port_right}...")
self.bus_right.connect()
logger.info(f"Connecting left arm on {self.config.port_left}...")
self.bus_left.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
@@ -120,7 +141,7 @@ class OpenArmsLeader(Teleoperator):
@property
def is_calibrated(self) -> bool:
"""Check if teleoperator is calibrated."""
return self.bus.is_calibrated
return self.bus_right.is_calibrated and self.bus_left.is_calibrated
def calibrate(self) -> None:
"""
@@ -141,18 +162,32 @@ class OpenArmsLeader(Teleoperator):
)
if user_input.strip().lower() != "c":
logger.info(f"Using existing calibration for {self.id}")
self.bus.write_calibration(self.calibration)
# Split calibration for each bus
cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")}
cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")}
self.bus_right.write_calibration(cal_right)
self.bus_left.write_calibration(cal_left)
return
logger.info(f"\nRunning calibration for {self}")
# Calibrate each arm separately
self._calibrate_arm("right", self.bus_right)
self._calibrate_arm("left", self.bus_left)
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
def _calibrate_arm(self, arm_name: str, bus: DamiaoMotorsBus) -> None:
"""Calibrate a single arm."""
logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===")
# Ensure torque is disabled for manual positioning
self.bus.disable_torque()
bus.disable_torque()
time.sleep(0.1)
# Step 1: Set zero position
input(
"\nCalibration Step 1: Zero Position\n"
f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
@@ -160,45 +195,38 @@ class OpenArmsLeader(Teleoperator):
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Zero position set.")
bus.set_zero_position()
logger.info(f"{arm_name.capitalize()} arm zero position set.")
# Step 2: Record range of motion
# Automatically set range to -90° to +90° for all joints
print(
"\nCalibration Step 2: Range of Motion\n"
"Move each joint through its full range of motion.\n"
"The system will record min/max positions.\n"
"Press ENTER when done..."
f"\nAutomatically setting range: -90° to +90° for all joints"
)
# Record ranges
range_mins, range_maxes = self.bus.record_ranges_of_motion()
# Create calibration data (ranges are already in degrees)
self.calibration = {}
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
# Create calibration data with fixed ranges
if self.calibration is None:
self.calibration = {}
for motor_name, motor in bus.motors.items():
# Prefix motor name with arm name for storage
prefixed_name = f"{arm_name}_{motor_name}"
# Use -90 to +90 for all joints and gripper (integers required)
self.calibration[prefixed_name] = MotorCalibration(
id=motor.id,
drive_mode=0, # Normal direction
homing_offset=0, # Already set via set_zero_position
range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees
range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees
range_min=-90, # -90 degrees (integer)
range_max=90, # +90 degrees (integer)
)
logger.info(f" {prefixed_name}: range set to [-90°, +90°]")
# Special handling for gripper range
if "gripper" in self.calibration:
gripper_cal = self.calibration["gripper"]
gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min)
if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees)
# Set default gripper range in degrees
gripper_cal.range_min = 0.0
gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion
# Write calibration to this arm's motors
cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")}
bus.write_calibration(cal_for_bus)
# Write calibration and save to file
self.bus.write_calibration(self.calibration)
# Save calibration after each arm
self._save_calibration()
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
def configure(self) -> None:
"""
@@ -209,44 +237,54 @@ class OpenArmsLeader(Teleoperator):
if self.config.manual_control:
# Disable torque for manual control
logger.info("Disabling torque for manual control...")
self.bus.disable_torque()
self.bus_right.disable_torque()
self.bus_left.disable_torque()
else:
# Configure motors normally
self.bus.configure_motors()
self.bus_right.configure_motors()
self.bus_left.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.")
def get_observation(self) -> Dict[str, Any]:
"""Get current observation from robot including position, velocity, and torque."""
def get_action(self) -> Dict[str, Any]:
"""
Get current action from the leader arm.
This is the main method for teleoperators - it reads the current state
of the leader arm and returns it as an action that can be sent to a follower.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
action_dict = {}
# Read motor positions, velocities, and torques
# Read motor positions, velocities, and torques from right arm
start = time.perf_counter()
positions = self.bus.sync_read("Present_Position")
velocities = self.bus.sync_read("Present_Velocity")
torques = self.bus.sync_read("Present_Torque")
positions_right = self.bus_right.sync_read("Present_Position")
velocities_right = self.bus_right.sync_read("Present_Velocity")
torques_right = self.bus_right.sync_read("Present_Torque")
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0)
obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0)
obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0)
for motor in self.bus_right.motors:
action_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0)
action_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0)
action_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0)
# Read motor positions, velocities, and torques from left arm
positions_left = self.bus_left.sync_read("Present_Position")
velocities_left = self.bus_left.sync_read("Present_Velocity")
torques_left = self.bus_left.sync_read("Present_Torque")
for motor in self.bus_left.motors:
action_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0)
action_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0)
action_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
return action_dict
def send_feedback(self, feedback: Dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArms leader.")
@@ -259,12 +297,14 @@ class OpenArmsLeader(Teleoperator):
# For manual control, ensure torque is disabled before disconnecting
if self.config.manual_control:
try:
self.bus.disable_torque()
self.bus_right.disable_torque()
self.bus_left.disable_torque()
except Exception as e:
logger.warning(f"Failed to disable torque during disconnect: {e}")
# Disconnect from CAN bus
self.bus.disconnect(disable_torque=False) # Already disabled above if needed
# Disconnect from CAN buses
self.bus_right.disconnect(disable_torque=False) # Already disabled above if needed
self.bus_left.disconnect(disable_torque=False)
logger.info(f"{self} disconnected.")