Add damiao motors and open arm robot

This commit is contained in:
Pepijn
2025-10-27 16:40:05 +01:00
committed by Michel Aractingi
parent 56e2360072
commit 30c10c1c6e
14 changed files with 2194 additions and 397 deletions
+541
View File
@@ -0,0 +1,541 @@
# OpenArms Robot
OpenArms is a 7 DOF robotic arm with a gripper, designed by [Enactic, Inc.](https://www.enactic.com/) It uses Damiao motors controlled via CAN bus communication and MIT control mode for smooth, precise motion.
## Hardware Overview
- **7 DOF per arm** (14 DOF total for dual arm setup)
- **1 gripper per arm** (2 grippers total)
- **Damiao motors** with 4 different types:
- **DM8009** (DM-J8009P-2EC) for shoulders (J1, J2) - high torque
- **DM4340** for shoulder rotation and elbow (J3, J4)
- **DM4310** (DM-J4310-2EC V1.1) for wrist (J5, J6, J7) and gripper (J8)
- **24V power supply** required
- **CAN interface device**:
- **Linux**: Any SocketCAN-compatible adapter
- **macOS**: CANable, PEAK PCAN-USB, or Kvaser USBcan
- Proper CAN wiring (CANH, CANL, 120Ω termination)
## Motor Configuration
Each arm has the following motor configuration based on the [OpenArm setup guide](https://docs.openarm.dev/software/setup/):
| Joint | Motor | Motor Type | Sender CAN ID | Receiver ID | Description |
|-------|-------|------------|---------------|-------------|-------------|
| J1 | joint_1 | DM8009 | 0x01 | 0x11 | Shoulder pan |
| J2 | joint_2 | DM8009 | 0x02 | 0x12 | Shoulder lift |
| J3 | joint_3 | DM4340 | 0x03 | 0x13 | Shoulder rotation |
| J4 | joint_4 | DM4340 | 0x04 | 0x14 | Elbow flex |
| J5 | joint_5 | DM4310 | 0x05 | 0x15 | Wrist roll |
| J6 | joint_6 | DM4310 | 0x06 | 0x16 | Wrist pitch |
| J7 | joint_7 | DM4310 | 0x07 | 0x17 | Wrist rotation |
| J8 | gripper | DM4310 | 0x08 | 0x18 | Gripper |
For dual arm setups, the left arm uses IDs 0x09-0x10 for joints 1-8 with the same motor types.
## Quick Start (macOS)
If you're on macOS, here's the fastest way to get started:
```bash
# 1. Install LeRobot with OpenArms dependencies
pip install -e ".[openarms]"
# 2. Find your USB-CAN adapter
ls /dev/cu.usbmodem*
# 3. Test communication
python3 -c "
import can
bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000)
print('✓ CAN interface connected')
bus.shutdown()
"
```
Then use this configuration:
```python
from lerobot.robots.openarms import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem00000000050C1", # Your adapter
can_interface="auto", # Auto-detects slcan for /dev/* ports
is_dual_arm=True,
)
robot = OpenArmsFollower(config)
robot.connect()
```
## Prerequisites
### Software Requirements
**Linux:**
- Ubuntu 22.04/24.04 (or any Linux with SocketCAN)
- Python 3.8+
- `can-utils` and `iproute2` packages
- LeRobot with OpenArms dependencies
```bash
# Install system dependencies
sudo apt install can-utils iproute2
# Install LeRobot with OpenArms support
pip install -e ".[openarms]"
```
**macOS:**
- macOS 12+ (Monterey or later)
- Python 3.8+
- LeRobot with OpenArms dependencies
```bash
# Install LeRobot with OpenArms support (includes python-can)
pip install -e ".[openarms]"
```
The `openarms` extra installs:
- `python-can>=4.2.0` - CAN bus communication library (supports SocketCAN on Linux and SLCAN on macOS)
:::tip
If you've already installed LeRobot and want to add OpenArms support, run:
```bash
pip install -e ".[openarms]"
```
:::
## Setup Guide
### Step 1: Motor ID Configuration
**IMPORTANT**: Before using the robot, motors must be configured with the correct CAN IDs.
Refer to the [OpenArm Motor ID Configuration Guide](https://docs.openarm.dev/software/setup/motor-id) for detailed instructions using the Damiao Debugging Tools on Windows.
Key points:
- Each motor needs a unique **Sender CAN ID** (0x01-0x08 for first arm)
- Each motor needs a unique **Receiver/Master ID** (0x11-0x18 for first arm)
- Use the Damiao Debugging Tools to set these IDs
- For dual arm setups, use 0x09-0x10 for the second arm
### Step 2: Setup CAN Interface
Configure your CAN interface as described in the [OpenArm CAN Setup Guide](https://docs.openarm.dev/software/setup/can-setup):
#### Linux (SocketCAN)
```bash
# Find your CAN interface
ip link show
# Setup CAN 2.0 at 1 Mbps (standard)
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
# Verify configuration
ip link show can0
```
#### macOS
macOS doesn't have native SocketCAN support.
**Use SLCAN (Serial Line CAN)**
For USB-CAN adapters that support SLCAN protocol (like CANable):
```bash
# Install python-can if not already installed
pip install python-can
# The adapter will appear as a serial device
ls /dev/cu.usbmodem*
# Use with python-can slcan interface
# Example: /dev/cu.usbmodem14201
```
In your code, specify the slcan interface:
```python
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem14201", # Your USB-CAN adapter
can_interface="slcan", # Will auto-detect if set to "auto"
)
```
### Step 3: Test Motor Communication
**On Linux:**
Test basic communication as described in the [OpenArm Motor Test Guide](https://docs.openarm.dev/software/setup/configure-test):
```bash
# Terminal 1: Monitor CAN traffic
candump can0
# Terminal 2: Enable motor 1
cansend can0 001#FFFFFFFFFFFFFFFC
# Expected response on Terminal 1:
# can0 011 [8] XX XX XX XX XX XX XX XX
# Disable motor 1
cansend can0 001#FFFFFFFFFFFFFFFD
```
**On macOS:**
Testing is done differently since you'll use serial-based adapters:
```bash
# Find your USB-CAN adapter
ls /dev/cu.usbmodem*
# Example output: /dev/cu.usbmodem00000000050C1
# Test with Python directly (can-utils don't work on macOS)
python3 -c "
import can
bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000)
msg = can.Message(arbitration_id=0x01, data=[0xFF]*7+[0xFC])
bus.send(msg)
response = bus.recv(timeout=1.0)
if response:
print(f'✓ Motor responded: ID 0x{response.arbitration_id:02X}')
else:
print('✗ No response')
bus.shutdown()
"
```
## Usage
### Basic Setup
**On Linux:**
```python
from lerobot.robots.openarms import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
# Configure for dual arm setup
config = OpenArmsFollowerConfig(
port="can0",
can_interface="socketcan", # Or "auto" for auto-detection
id="openarms_dual",
is_dual_arm=True,
)
robot = OpenArmsFollower(config)
robot.connect()
```
**On macOS:**
```python
from lerobot.robots.openarms import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
# Find your USB-CAN adapter first
# ls /dev/cu.usbmodem*
config = OpenArmsFollowerConfig(
port="/dev/cu.usbmodem14201", # Your adapter's serial port
can_interface="slcan", # Or "auto" for auto-detection
id="openarms_dual",
is_dual_arm=True,
)
robot = OpenArmsFollower(config)
robot.connect()
```
### Calibration
On first use, you'll need to calibrate the robot:
```python
robot.calibrate()
```
The calibration process will:
1. Disable torque on all motors
2. Ask you to position arms in **hanging position with grippers closed**
3. Set this as the zero position
4. Ask you to move each joint through its full range
5. Record min/max positions for each joint
6. Save calibration to file
### Reading Observations
The robot provides comprehensive state information:
```python
observation = robot.get_observation()
# Observation includes for each motor:
# - {motor_name}.pos: Position in degrees
# - {motor_name}.vel: Velocity in degrees/second
# - {motor_name}.torque: Motor torque
# - {camera_name}: Camera images (if configured)
print(f"Right arm joint 1 position: {observation['right_joint_1.pos']:.1f}°")
print(f"Right arm joint 1 velocity: {observation['right_joint_1.vel']:.1f}°/s")
print(f"Right arm joint 1 torque: {observation['right_joint_1.torque']:.3f} N·m")
```
### Sending Actions
```python
# Send target positions (in degrees)
action = {
"right_joint_1.pos": 45.0,
"right_joint_2.pos": -30.0,
# ... all joints
"right_gripper.pos": 45.0, # Half-closed
}
actual_action = robot.send_action(action)
```
### Gripper Control
```python
# Open gripper
robot.open_gripper(arm="right")
# Close gripper
robot.close_gripper(arm="right")
```
## Safety Features
### 1. Maximum Relative Target
Limits how far a joint can move in a single command to prevent sudden movements:
```python
config = OpenArmsFollowerConfig(
port="can0",
# Limit all joints to 10 degrees per command
max_relative_target=10.0,
# Or set per-motor limits
max_relative_target={
"right_joint_1": 15.0, # Slower moving joint
"right_joint_2": 10.0,
"right_gripper": 5.0, # Very slow gripper
}
)
```
**How it works**: If current position is 50° and you command 80°, with `max_relative_target=10.0`, the robot will only move to 60° in that step.
### 2. Torque Limits
Control maximum torque output, especially important for grippers and teleoperation:
```python
config = OpenArmsFollowerConfig(
port="can0",
# Gripper torque limit (fraction of motor's max torque)
gripper_torque_limit=0.5, # 50% of max torque
)
```
Lower torque limits prevent damage when gripping delicate objects.
### 3. MIT Control Gains
Control responsiveness and stability via PID-like gains:
```python
config = OpenArmsFollowerConfig(
port="can0",
position_kp=10.0, # Position gain (higher = more responsive)
position_kd=0.5, # Velocity damping (higher = more damped)
)
```
**Guidelines**:
- **For following (robot)**: Higher gains for responsiveness
- `position_kp=10.0`, `position_kd=0.5`
- **For teleoperation (leader)**: Lower gains or disable torque for manual movement
- `manual_control=True` (torque disabled)
### 4. Velocity Limits
Velocity limits are enforced by the Damiao motors based on motor type. For DM4310:
- Max velocity: 30 rad/s ≈ 1718°/s
The motors will automatically limit velocity to safe values.
## Teleoperation
### Leader Arm Setup
The leader arm is moved manually (torque disabled) to generate commands:
```python
from lerobot.teleoperators.openarms import OpenArmsLeader
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
config = OpenArmsLeaderConfig(
port="can1", # Separate CAN interface for leader
id="openarms_leader",
manual_control=True, # Torque disabled for manual movement
is_dual_arm=True,
)
leader = OpenArmsLeader(config)
leader.connect()
# Read current position as action
action = leader.get_action()
# action contains positions for all joints in degrees
```
### Safety Considerations for Teleoperation
1. **Use separate CAN interfaces** for leader and follower to avoid conflicts
2. **Enable max_relative_target** on follower to smooth abrupt movements
3. **Lower torque limits** on follower to prevent damage from tracking errors
4. **Test with one arm** before enabling dual arm teleoperation
5. **Have emergency stop** ready (power switch or CAN disable)
```python
# Recommended follower config for teleoperation
follower_config = OpenArmsFollowerConfig(
port="can0",
max_relative_target=5.0, # Small steps for smooth following
gripper_torque_limit=0.3, # Low torque for safety
position_kp=5.0, # Lower gains for gentler following
position_kd=0.3,
)
```
## Troubleshooting
### Motors Not Responding
**Linux:**
1. **Check power supply**: 24V with sufficient current
2. **Verify CAN interface**: `ip link show can0` should show "UP"
3. **Test with cansend**: Follow [motor test guide](https://docs.openarm.dev/software/setup/configure-test)
4. **Check motor IDs**: Use Damiao Debugging Tools to verify IDs
5. **Check termination**: 120Ω resistor should be enabled on CAN interface
**macOS:**
1. **Check power supply**: 24V with sufficient current
2. **Find adapter**: `ls /dev/cu.usbmodem*` should show your device
3. **Test connection**: Use Python script above to test communication
4. **Check motor IDs**: Use Damiao Debugging Tools on Windows
5. **Verify drivers**: Ensure USB-CAN adapter drivers are installed
6. **Try different baudrate**: Some adapters default to different rates
### macOS-Specific Issues
**"No such interface" error:**
```python
# Try auto-detection
config.can_interface = "auto"
# Or explicitly list available interfaces
import can
print(can.detect_available_configs())
```
**Permission denied on `/dev/cu.*`:**
```bash
# Add user to dialout group (if applicable)
sudo dscl . -append /Groups/_dialout GroupMembership $USER
# Or run with sudo (not recommended)
sudo python your_script.py
```
**Adapter not showing up:**
```bash
# Check USB devices
system_profiler SPUSBDataType
# Reinstall python-can
pip install --upgrade --force-reinstall python-can
```
### Motor Shaking/Unstable
- **Lower control gains**: Reduce `position_kp` and `position_kd`
- **Check calibration**: Re-run calibration procedure
- **Verify power**: Insufficient current can cause instability
- **Check mechanical**: Loose connections, binding, or damaged components
### CAN Bus Errors
**Linux:**
```bash
# Check for errors
ip -s link show can0
# Reset CAN interface
sudo ip link set can0 down
sudo ip link set can0 up
```
**macOS:**
```bash
# Reconnect USB adapter
# Unplug and replug the USB cable
# Restart Python script
# The slcan interface auto-reconnects
```
### Position Drift
- **Re-calibrate**: Run calibration procedure
- **Set zero position**: Use `robot.bus.set_zero_position()` with arm in known position
- **Check temperature**: Motors may drift when hot
## Technical Details
### Position Units
All positions are in **degrees**:
- Motor internal representation: radians
- User API: degrees
- Automatic conversion handled by `DamiaoMotorsBus`
### Control Mode
OpenArms uses **MIT control mode** which allows simultaneous control of:
- Position (degrees)
- Velocity (degrees/second)
- Torque (N·m)
- Position gain (Kp)
- Velocity damping (Kd)
### Communication
- **Protocol**: CAN 2.0 at 1 Mbps (or CAN-FD at 5 Mbps)
- **Frame format**: Standard 11-bit IDs
- **Update rate**: Typically 50-100 Hz depending on motor count
- **Latency**: ~10-20ms per motor command
## References
- [OpenArm Official Documentation](https://docs.openarm.dev/)
- [OpenArm Setup Guide](https://docs.openarm.dev/software/setup/)
- [Motor ID Configuration](https://docs.openarm.dev/software/setup/motor-id)
- [CAN Interface Setup](https://docs.openarm.dev/software/setup/can-setup)
- [Motor Communication Test](https://docs.openarm.dev/software/setup/configure-test)
- [Damiao Motor Documentation](https://wiki.seeedstudio.com/damiao_series/)
- [Enactic GitHub](https://github.com/enactic/openarm_can)
+3
View File
@@ -102,8 +102,10 @@ grpcio-dep = ["grpcio==1.73.1", "protobuf==6.31.0"] # TODO: Bumb dependency (com
# Motors
feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"]
dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"]
# Robots
openarms = ["lerobot[damiao]"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
@@ -153,6 +155,7 @@ metaworld = ["metaworld==3.0.0"]
# All
all = [
"lerobot[dynamixel]",
"lerobot[openarms]",
"lerobot[gamepad]",
"lerobot[hopejr]",
"lerobot[lekiwi]",
+8 -1
View File
@@ -14,4 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .motors_bus import Motor, MotorCalibration, MotorNormMode, MotorsBus
from .motors_bus import (
Motor,
MotorCalibration,
MotorNormMode,
MotorsBus, # Backward compatibility (alias for SerialMotorsBus)
MotorsBusBase,
SerialMotorsBus,
)
+479 -366
View File
@@ -15,386 +15,361 @@
# TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme
import logging
import time
from copy import deepcopy
from enum import Enum
from functools import cached_property
from typing import Dict, List, Optional, Tuple, Union
from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement
import can
import numpy as np
from lerobot.motors import Motor, MotorCalibration, MotorNormMode, MotorsBusBase
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.utils import enter_pressed, move_cursor_up
from .tables import (
AVAILABLE_BAUDRATES,
CAN_CMD_DISABLE,
CAN_CMD_ENABLE,
CAN_CMD_REFRESH,
CAN_CMD_SET_ZERO,
CAN_PARAM_ID,
DEFAULT_BAUDRATE,
DEFAULT_TIMEOUT_MS,
MODEL_RESOLUTION,
MOTOR_LIMIT_PARAMS,
NORMALIZED_DATA,
MotorType,
)
logger = logging.getLogger(__name__)
class OperatingMode(Enum):
MIT = 0
NameOrID = Union[str, int]
Value = Union[int, float]
class DamiaoMotorsBus(MotorsBus):
class DamiaoMotorsBus(MotorsBusBase):
"""
The Damiao implementation for a MotorsBus. It relies on the python-can library to communicate with
the motors. For more info, see the python-can documentation: https://python-can.readthedocs.io/en/stable/, seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python
https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python
The Damiao implementation for a MotorsBus using CAN bus communication.
This class uses python-can for CAN bus communication with Damiao motors.
For more info, see:
- python-can documentation: https://python-can.readthedocs.io/en/stable/
- Seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/
- DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python
"""
# CAN-specific settings
available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
default_baudrate = DEFAULT_BAUDRATE
default_timeout = DEFAULT_TIMEOUT_MS
# Motor configuration
model_resolution_table = deepcopy(MODEL_RESOLUTION)
normalized_data = deepcopy(NORMALIZED_DATA)
def __init__(
self,
port: str,
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
can_interface: str = "socketcan",
):
"""
Initialize the Damiao motors bus.
Args:
port: CAN interface name (e.g., "can0" for Linux, "/dev/cu.usbmodem*" for macOS)
motors: Dictionary mapping motor names to Motor objects
calibration: Optional calibration data
can_interface: CAN interface type - "socketcan" (Linux) or "slcan" (macOS/serial)
"""
super().__init__(port, motors, calibration)
self.port = port
self.can_interface = can_interface
self.canbus = None
self._is_connected = False
def configure_motors(self) -> None:
for motor in self.motors
# Map motor names to CAN IDs
self._motor_can_ids = {}
self._recv_id_to_motor = {}
@cached_property
def models(self) -> list[str]:
return [m.model for m in self.motors.values()]
@cached_property
def ids(self) -> list[int]:
return [m.id for m in self.motors.values()]
# Store motor types
self._motor_types = {}
for name, motor in self.motors.items():
if hasattr(motor, "motor_type"):
self._motor_types[name] = motor.motor_type
else:
# Default to DM4310 if not specified
self._motor_types[name] = MotorType.DM4310
@property
def is_connected(self) -> bool:
"""bool: `True` if the underlying serial port is open."""
return self.port_handler.is_open
"""Check if the CAN bus is connected."""
return self._is_connected and self.canbus is not None
def connect(self, handshake: bool = True) -> None:
"""Open the serial port and initialise communication.
"""
Open the CAN bus and initialize communication.
Args:
handshake (bool, optional): Pings every expected motor and performs additional
integrity checks specific to the implementation. Defaults to `True`.
Raises:
DeviceAlreadyConnectedError: The port is already open.
ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed.
handshake: If True, ping all motors to verify they're present
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
f"{self.__class__.__name__}('{self.port}') is already connected."
)
self._connect(handshake)
self.set_timeout()
logger.debug(f"{self.__class__.__name__} connected.")
try:
# Auto-detect interface type based on port name
if self.can_interface == "auto":
if self.port.startswith("/dev/"):
# Serial device (macOS/Windows)
self.can_interface = "slcan"
logger.info(f"Auto-detected slcan interface for port {self.port}")
else:
# Network interface (Linux)
self.can_interface = "socketcan"
logger.info(f"Auto-detected socketcan interface for port {self.port}")
@property
def is_calibrated(self) -> bool:
return self.calibration == self.read_calibration()
def read_calibration(self) -> dict[str, MotorCalibration]:
offsets = self.sync_read("Homing_Offset", normalize=False)
mins = self.sync_read("Min_Position_Limit", normalize=False)
maxes = self.sync_read("Max_Position_Limit", normalize=False)
drive_modes = self.sync_read("Drive_Mode", normalize=False)
calibration = {}
for motor, m in self.motors.items():
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
# Connect to CAN bus
if self.can_interface == "socketcan":
# Linux SocketCAN
self.canbus = can.interface.Bus(
channel=self.port,
interface="socketcan",
bitrate=self.default_baudrate
)
elif self.can_interface == "slcan":
# Serial Line CAN (macOS, Windows, or USB adapters)
self.canbus = can.interface.Bus(
channel=self.port,
interface="slcan",
bitrate=self.default_baudrate
)
else:
# Generic interface (vector, pcan, etc.)
self.canbus = can.interface.Bus(
channel=self.port,
interface=self.can_interface,
bitrate=self.default_baudrate
)
return calibration
self._is_connected = True
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
for motor, calibration in calibration_dict.items():
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
if handshake:
self._handshake()
if cache:
self.calibration = calibration_dict
logger.debug(f"{self.__class__.__name__} connected via {self.can_interface}.")
except Exception as e:
self._is_connected = False
raise ConnectionError(f"Failed to connect to CAN bus: {e}")
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
def _handshake(self) -> None:
"""Verify all motors are present by refreshing their status."""
for motor_name in self.motors:
self._refresh_motor(motor_name)
time.sleep(0.01) # Small delay between motors
def disconnect(self, disable_torque: bool = True) -> None:
"""Close the serial port (optionally disabling torque first).
"""
Close the CAN bus connection.
Args:
disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before
closing the port. This can prevent damaging motors if they are left applying resisting torque
after disconnect.
disable_torque: If True, disable torque on all motors before disconnecting
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first."
f"{self.__class__.__name__}('{self.port}') is not connected."
)
if disable_torque:
self.port_handler.clearPort()
self.port_handler.is_using = False
self.disable_torque(num_retry=5)
try:
self.disable_torque()
except Exception as e:
logger.warning(f"Failed to disable torque during disconnect: {e}")
self.port_handler.closePort()
if self.canbus:
self.canbus.shutdown()
self.canbus = None
self._is_connected = False
logger.debug(f"{self.__class__.__name__} disconnected.")
@classmethod
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
"""Probe *port* at every supported baud-rate and list responding IDs.
def configure_motors(self) -> None:
"""Configure all motors with default settings."""
# Damiao motors don't require much configuration in MIT mode
# Just ensure they're enabled
for motor in self.motors:
self._enable_motor(motor)
time.sleep(0.01)
Args:
port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``).
*args, **kwargs: Forwarded to the subclass constructor.
def _enable_motor(self, motor: NameOrID) -> None:
"""Enable a single motor."""
motor_id = self._get_motor_id(motor)
data = [0xFF] * 7 + [CAN_CMD_ENABLE]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
self.canbus.send(msg)
self._recv_motor_response()
Returns:
dict[int, list[int]]: Mapping *baud-rate → list of motor IDs*
for every baud-rate that produced at least one response.
"""
bus = cls(port, {}, *args, **kwargs)
bus._connect(handshake=False)
baudrate_ids = {}
for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"):
bus.set_baudrate(baudrate)
ids_models = bus.broadcast_ping()
if ids_models:
tqdm.write(f"Motors found for {baudrate=}: {pformat(ids_models, indent=4)}")
baudrate_ids[baudrate] = list(ids_models)
def _disable_motor(self, motor: NameOrID) -> None:
"""Disable a single motor."""
motor_id = self._get_motor_id(motor)
data = [0xFF] * 7 + [CAN_CMD_DISABLE]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
self.canbus.send(msg)
self._recv_motor_response()
bus.port_handler.closePort()
return baudrate_ids
def setup_motor(
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
) -> None:
"""Assign the correct ID and baud-rate to a single motor.
This helper temporarily switches to the motor's current settings, disables torque, sets the desired
ID, and finally programs the bus' default baud-rate.
Args:
motor (str): Key of the motor in :pyattr:`motors`.
initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided).
Defaults to None.
initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None.
Raises:
RuntimeError: The motor could not be found or its model number
does not match the expected one.
ConnectionError: Communication with the motor failed.
"""
if not self.is_connected:
self._connect(handshake=False)
if initial_baudrate is None:
initial_baudrate, initial_id = self._find_single_motor(motor)
if initial_id is None:
_, initial_id = self._find_single_motor(motor, initial_baudrate)
model = self.motors[motor].model
target_id = self.motors[motor].id
self.set_baudrate(initial_baudrate)
self._disable_torque(initial_id, model)
# Set ID
addr, length = get_address(self.model_ctrl_table, model, "ID")
self._write(addr, length, initial_id, target_id)
# Set Baudrate
addr, length = get_address(self.model_ctrl_table, model, "Baud_Rate")
baudrate_value = self.model_baudrate_table[model][self.default_baudrate]
self._write(addr, length, target_id, baudrate_value)
self.set_baudrate(self.default_baudrate)
@contextmanager
def torque_disabled(self, motors: int | str | list[str] | None = None):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
Examples:
>>> with bus.torque_disabled():
... # Safe operations here
... pass
"""
self.disable_torque(motors)
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
"""Enable torque on selected motors."""
motors = self._get_motors_list(motors)
for motor in motors:
for _ in range(num_retry + 1):
try:
yield
finally:
self.enable_torque(motors)
self._enable_motor(motor)
break
except Exception as e:
if _ == num_retry:
raise e
time.sleep(0.01)
def set_timeout(self, timeout_ms: int | None = None):
"""Change the packet timeout used by the SDK.
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
"""Disable torque on selected motors."""
motors = self._get_motors_list(motors)
for motor in motors:
for _ in range(num_retry + 1):
try:
self._disable_motor(motor)
break
except Exception as e:
if _ == num_retry:
raise e
time.sleep(0.01)
def set_zero_position(self, motors: str | list[str] | None = None) -> None:
"""Set current position as zero for selected motors."""
motors = self._get_motors_list(motors)
for motor in motors:
motor_id = self._get_motor_id(motor)
data = [0xFF] * 7 + [CAN_CMD_SET_ZERO]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
self.canbus.send(msg)
self._recv_motor_response()
time.sleep(0.01)
def _refresh_motor(self, motor: NameOrID) -> None:
"""Refresh motor status."""
motor_id = self._get_motor_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
self.canbus.send(msg)
self._recv_motor_response()
def _recv_motor_response(self, timeout: float = 0.1) -> Optional[can.Message]:
"""Receive a response from a motor."""
try:
msg = self.canbus.recv(timeout=timeout)
if msg:
return msg
except Exception as e:
logger.debug(f"Failed to receive CAN message: {e}")
return None
def _mit_control(
self,
motor: NameOrID,
kp: float,
kd: float,
position_degrees: float,
velocity_deg_per_sec: float,
torque: float,
) -> None:
"""
Send MIT control command to a motor.
Args:
timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls
back to :pyattr:`default_timeout`.
motor: Motor name or ID
kp: Position gain
kd: Velocity gain
position_degrees: Target position (degrees)
velocity_deg_per_sec: Target velocity (degrees/s)
torque: Target torque (N·m)
"""
timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
self.port_handler.setPacketTimeoutMillis(timeout_ms)
motor_id = self._get_motor_id(motor)
motor_name = self._get_motor_name(motor)
motor_type = self._motor_types.get(motor_name, MotorType.DM4310)
def get_baudrate(self) -> int:
"""Return the current baud-rate configured on the port.
# Convert degrees to radians for motor control
position_rad = np.radians(position_degrees)
velocity_rad_per_sec = np.radians(velocity_deg_per_sec)
# Get motor limits
pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type]
# Encode parameters
kp_uint = self._float_to_uint(kp, 0, 500, 12)
kd_uint = self._float_to_uint(kd, 0, 5, 12)
q_uint = self._float_to_uint(position_rad, -pmax, pmax, 16)
dq_uint = self._float_to_uint(velocity_rad_per_sec, -vmax, vmax, 12)
tau_uint = self._float_to_uint(torque, -tmax, tmax, 12)
# Pack data
data = [0] * 8
data[0] = (q_uint >> 8) & 0xFF
data[1] = q_uint & 0xFF
data[2] = dq_uint >> 4
data[3] = ((dq_uint & 0xF) << 4) | ((kp_uint >> 8) & 0xF)
data[4] = kp_uint & 0xFF
data[5] = kd_uint >> 4
data[6] = ((kd_uint & 0xF) << 4) | ((tau_uint >> 8) & 0xF)
data[7] = tau_uint & 0xFF
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
self.canbus.send(msg)
self._recv_motor_response()
def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int:
"""Convert float to unsigned integer for CAN transmission."""
x = max(x_min, min(x_max, x)) # Clamp to range
span = x_max - x_min
data_norm = (x - x_min) / span
return int(data_norm * ((1 << bits) - 1))
def _uint_to_float(self, x: int, x_min: float, x_max: float, bits: int) -> float:
"""Convert unsigned integer from CAN to float."""
span = x_max - x_min
data_norm = float(x) / ((1 << bits) - 1)
return data_norm * span + x_min
def _decode_motor_state(self, data: bytes, motor_type: MotorType) -> Tuple[float, float, float, int, int]:
"""
Decode motor state from CAN data.
Returns:
int: Baud-rate in bits / second.
Tuple of (position_degrees, velocity_deg_per_sec, torque, temp_mos, temp_rotor)
"""
return self.port_handler.getBaudRate()
if len(data) < 8:
raise ValueError("Invalid motor state data")
def set_baudrate(self, baudrate: int) -> None:
"""Set a new UART baud-rate on the port.
# Extract encoded values
q_uint = (data[1] << 8) | data[2]
dq_uint = (data[3] << 4) | (data[4] >> 4)
tau_uint = ((data[4] & 0x0F) << 8) | data[5]
t_mos = data[6]
t_rotor = data[7]
Args:
baudrate (int): Desired baud-rate in bits / second.
# Get motor limits
pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type]
Raises:
RuntimeError: The SDK failed to apply the change.
"""
present_bus_baudrate = self.port_handler.getBaudRate()
if present_bus_baudrate != baudrate:
logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
self.port_handler.setBaudRate(baudrate)
# Decode to physical values (radians)
position_rad = self._uint_to_float(q_uint, -pmax, pmax, 16)
velocity_rad_per_sec = self._uint_to_float(dq_uint, -vmax, vmax, 12)
torque = self._uint_to_float(tau_uint, -tmax, tmax, 12)
if self.port_handler.getBaudRate() != baudrate:
raise RuntimeError("Failed to write bus baud rate.")
# Convert to degrees
position_degrees = np.degrees(position_rad)
velocity_deg_per_sec = np.degrees(velocity_rad_per_sec)
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
"""Restore factory calibration for the selected motors.
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
The in-memory :pyattr:`calibration` is cleared.
Args:
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
resets every motor.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
for motor in motors:
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
self.write("Homing_Offset", motor, 0, normalize=False)
self.write("Min_Position_Limit", motor, 0, normalize=False)
self.write("Max_Position_Limit", motor, max_res, normalize=False)
self.calibration = {}
def record_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""Interactively record the min/max encoder values of each motor.
Move the joints by hand (with torque disabled) while the method streams live positions. Press
:kbd:`Enter` to finish.
Args:
motors (NameOrID | list[NameOrID] | None, optional): Motors to record.
Defaults to every motor (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Returns:
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
extreme values observed for each motor.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
start_positions = self.sync_read("Present_Position", motors, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
user_pressed_enter = False
while not user_pressed_enter:
positions = self.sync_read("Present_Position", motors, normalize=False)
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for motor in motors:
print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}")
if enter_pressed():
user_pressed_enter = True
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(motors) + 3)
same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]]
if same_min_max:
raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}")
return mins, maxes
def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
normalized_values = {}
for id_, val in ids_values.items():
motor = self._id_to_name(id_)
min_ = self.calibration[motor].range_min
max_ = self.calibration[motor].range_max
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
if max_ == min_:
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
bounded_val = min(max_, max(min_, val))
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[id_] = -norm if drive_mode else norm
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[id_] = 100 - norm if drive_mode else norm
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
mid = (min_ + max_) / 2
max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
normalized_values[id_] = (val - mid) * 360 / max_res
else:
raise NotImplementedError
return normalized_values
def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
unnormalized_values = {}
for id_, val in ids_values.items():
motor = self._id_to_name(id_)
min_ = self.calibration[motor].range_min
max_ = self.calibration[motor].range_max
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
if max_ == min_:
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
val = -val if drive_mode else val
bounded_val = min(100.0, max(-100.0, val))
unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_)
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
val = 100 - val if drive_mode else val
bounded_val = min(100.0, max(0.0, val))
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
mid = (min_ + max_) / 2
max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
unnormalized_values[id_] = int((val * max_res / 360) + mid)
else:
raise NotImplementedError
return unnormalized_values
return position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor
def read(
self,
@@ -404,69 +379,207 @@ class DamiaoMotorsBus(MotorsBus):
normalize: bool = True,
num_retry: int = 0,
) -> Value:
"""Read a register from a motor.
Args:
data_name (str): Control-table key (e.g. `"Present_Position"`).
motor (str): Motor name.
normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as
defined by the calibration.
num_retry (int, optional): Retry attempts. Defaults to `0`.
Returns:
Value: Raw or normalised value depending on *normalize*.
"""
"""Read a value from a single motor. Positions are always in degrees."""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
id_ = self.motors[motor].id
model = self.motors[motor].model
addr, length = get_address(self.model_ctrl_table, model, data_name)
# Refresh motor to get latest state
self._refresh_motor(motor)
err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
# Read response
msg = self._recv_motor_response()
if msg is None:
raise ConnectionError(f"No response from motor {motor}")
id_value = self._decode_sign(data_name, {id_: value})
motor_type = self._motor_types.get(motor, MotorType.DM4310)
position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type)
if normalize and data_name in self.normalized_data:
id_value = self._normalize(id_value)
# Return requested data (already in degrees for position/velocity)
if data_name == "Present_Position":
value = position_degrees
elif data_name == "Present_Velocity":
value = velocity_deg_per_sec
elif data_name == "Present_Torque":
value = torque
elif data_name == "Temperature_MOS":
value = t_mos
elif data_name == "Temperature_Rotor":
value = t_rotor
else:
raise ValueError(f"Unknown data_name: {data_name}")
return id_value[id_]
# For Damiao, positions are always in degrees, no normalization needed
# We keep the normalize parameter for compatibility but don't use it
return value
def write(
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
self,
data_name: str,
motor: str,
value: Value,
*,
normalize: bool = True,
num_retry: int = 0,
) -> None:
"""Write a value to a single motor's register.
Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which
provides a guarantee that the value was written to the register successfully. In consequence, it is
slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring
motors.
Args:
data_name (str): Register name.
motor (str): Motor name.
value (Value): Value to write. If *normalize* is `True` the value is first converted to raw
units.
normalize (bool, optional): Enable or disable normalisation. Defaults to `True`.
num_retry (int, optional): Retry attempts. Defaults to `0`.
"""
"""Write a value to a single motor. Positions are always in degrees."""
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
id_ = self.motors[motor].id
model = self.motors[motor].model
addr, length = get_address(self.model_ctrl_table, model, data_name)
# Value is expected to be in degrees for positions
if data_name == "Goal_Position":
# Use MIT control with position in degrees
self._mit_control(motor, 10.0, 0.5, value, 0, 0)
else:
raise ValueError(f"Writing {data_name} not supported in MIT mode")
if normalize and data_name in self.normalized_data:
value = self._unnormalize({id_: value})[id_]
def sync_read(
self,
data_name: str,
motors: str | list[str] | None = None,
*,
normalize: bool = True,
num_retry: int = 0,
) -> Dict[str, Value]:
"""Read the same value from multiple motors simultaneously."""
motors = self._get_motors_list(motors)
result = {}
value = self._encode_sign(data_name, {id_: value})[id_]
for motor in motors:
try:
value = self.read(data_name, motor, normalize=normalize, num_retry=num_retry)
result[motor] = value
except Exception as e:
logger.warning(f"Failed to read {data_name} from {motor}: {e}")
result[motor] = 0.0
err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
return result
def sync_write(
self,
data_name: str,
values: Dict[str, Value],
*,
normalize: bool = True,
num_retry: int = 0,
) -> None:
"""Write different values to multiple motors simultaneously. Positions are always in degrees."""
if data_name == "Goal_Position":
# Use MIT control for position commands (values in degrees)
for motor, value_degrees in values.items():
# Use reasonable default gains for position control
self._mit_control(motor, 10.0, 0.5, value_degrees, 0, 0)
else:
# Fall back to individual writes
for motor, value in values.items():
self.write(data_name, motor, value, normalize=normalize, num_retry=num_retry)
def read_calibration(self) -> dict[str, MotorCalibration]:
"""Read calibration data from motors."""
# Damiao motors don't store calibration internally
# Return existing calibration or empty dict
return self.calibration if self.calibration else {}
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
"""Write calibration data to motors."""
# Damiao motors don't store calibration internally
# Just cache it in memory
if cache:
self.calibration = calibration_dict
def record_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""
Interactively record the min/max values of each motor in degrees.
Move the joints by hand (with torque disabled) while the method streams live positions.
Press Enter to finish.
"""
if motors is None:
motors = list(self.motors.keys())
elif isinstance(motors, (str, int)):
motors = [motors]
# Disable torque for manual movement
self.disable_torque(motors)
time.sleep(0.1)
# Get initial positions (already in degrees)
start_positions = self.sync_read("Present_Position", motors, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
print("\nMove joints through their full range of motion. Press ENTER when done.")
user_pressed_enter = False
while not user_pressed_enter:
positions = self.sync_read("Present_Position", motors, normalize=False)
for motor in motors:
if motor in positions:
mins[motor] = min(positions[motor], mins.get(motor, positions[motor]))
maxes[motor] = max(positions[motor], maxes.get(motor, positions[motor]))
if display_values:
print("\n" + "=" * 50)
print(f"{'MOTOR':<20} | {'MIN (deg)':>12} | {'POS (deg)':>12} | {'MAX (deg)':>12}")
print("-" * 50)
for motor in motors:
if motor in positions:
print(f"{motor:<20} | {mins[motor]:>12.1f} | {positions[motor]:>12.1f} | {maxes[motor]:>12.1f}")
if enter_pressed():
user_pressed_enter = True
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(motors) + 4)
time.sleep(0.05)
# Re-enable torque
self.enable_torque(motors)
# Validate ranges
for motor in motors:
if motor in mins and motor in maxes:
if abs(maxes[motor] - mins[motor]) < 5.0: # At least 5 degrees of range
raise ValueError(f"Motor {motor} has insufficient range of motion (< 5 degrees)")
return mins, maxes
def _get_motors_list(self, motors: str | list[str] | None) -> list[str]:
"""Convert motor specification to list of motor names."""
if motors is None:
return list(self.motors.keys())
elif isinstance(motors, str):
return [motors]
elif isinstance(motors, list):
return motors
else:
raise TypeError(f"Invalid motors type: {type(motors)}")
def _get_motor_id(self, motor: NameOrID) -> int:
"""Get CAN ID for a motor."""
if isinstance(motor, str):
if motor in self.motors:
return self.motors[motor].id
else:
raise ValueError(f"Unknown motor: {motor}")
else:
return motor
def _get_motor_name(self, motor: NameOrID) -> str:
"""Get motor name from name or ID."""
if isinstance(motor, str):
return motor
else:
for name, m in self.motors.items():
if m.id == motor:
return name
raise ValueError(f"Unknown motor ID: {motor}")
@cached_property
def is_calibrated(self) -> bool:
"""Check if motors are calibrated."""
return bool(self.calibration)
+209
View File
@@ -0,0 +1,209 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configuration tables for Damiao motors."""
from enum import IntEnum
from typing import Dict, List, Tuple
# Motor type definitions
class MotorType(IntEnum):
DM3507 = 0
DM4310 = 1
DM4310_48V = 2
DM4340 = 3
DM4340_48V = 4
DM6006 = 5
DM8006 = 6
DM8009 = 7
DM10010L = 8
DM10010 = 9
DMH3510 = 10
DMH6215 = 11
DMG6220 = 12
# Control modes
class ControlMode(IntEnum):
MIT = 1
POS_VEL = 2
VEL = 3
TORQUE_POS = 4
# Motor variable IDs (RID)
class MotorVariable(IntEnum):
UV_VALUE = 0
KT_VALUE = 1
OT_VALUE = 2
OC_VALUE = 3
ACC = 4
DEC = 5
MAX_SPD = 6
MST_ID = 7
ESC_ID = 8
TIMEOUT = 9
CTRL_MODE = 10
DAMP = 11
INERTIA = 12
HW_VER = 13
SW_VER = 14
SN = 15
NPP = 16
RS = 17
LS = 18
FLUX = 19
GR = 20
PMAX = 21
VMAX = 22
TMAX = 23
I_BW = 24
KP_ASR = 25
KI_ASR = 26
KP_APR = 27
KI_APR = 28
OV_VALUE = 29
GREF = 30
DETA = 31
V_BW = 32
IQ_C1 = 33
VL_C1 = 34
CAN_BR = 35
SUB_VER = 36
U_OFF = 50
V_OFF = 51
K1 = 52
K2 = 53
M_OFF = 54
DIR = 55
P_M = 80
XOUT = 81
# Motor limit parameters [PMAX, VMAX, TMAX]
# PMAX: Maximum position (rad)
# VMAX: Maximum velocity (rad/s)
# TMAX: Maximum torque (N·m)
MOTOR_LIMIT_PARAMS = {
MotorType.DM3507: (12.5, 30, 10),
MotorType.DM4310: (12.5, 30, 10),
MotorType.DM4310_48V: (12.5, 50, 10),
MotorType.DM4340: (12.5, 8, 28),
MotorType.DM4340_48V: (12.5, 10, 28),
MotorType.DM6006: (12.5, 45, 20),
MotorType.DM8006: (12.5, 45, 40),
MotorType.DM8009: (12.5, 45, 54),
MotorType.DM10010L: (12.5, 25, 200),
MotorType.DM10010: (12.5, 20, 200),
MotorType.DMH3510: (12.5, 280, 1),
MotorType.DMH6215: (12.5, 45, 10),
MotorType.DMG6220: (12.5, 45, 10),
}
# Motor model names
MODEL_NAMES = {
MotorType.DM3507: "dm3507",
MotorType.DM4310: "dm4310",
MotorType.DM4310_48V: "dm4310_48v",
MotorType.DM4340: "dm4340",
MotorType.DM4340_48V: "dm4340_48v",
MotorType.DM6006: "dm6006",
MotorType.DM8006: "dm8006",
MotorType.DM8009: "dm8009",
MotorType.DM10010L: "dm10010l",
MotorType.DM10010: "dm10010",
MotorType.DMH3510: "dmh3510",
MotorType.DMH6215: "dmh6215",
MotorType.DMG6220: "dmg6220",
}
# Motor resolution table (encoder counts per revolution)
MODEL_RESOLUTION = {
"dm3507": 65536,
"dm4310": 65536,
"dm4310_48v": 65536,
"dm4340": 65536,
"dm4340_48v": 65536,
"dm6006": 65536,
"dm8006": 65536,
"dm8009": 65536,
"dm10010l": 65536,
"dm10010": 65536,
"dmh3510": 65536,
"dmh6215": 65536,
"dmg6220": 65536,
}
# CAN baudrates supported by Damiao motors
AVAILABLE_BAUDRATES = [
125000, # 0: 125 kbps
200000, # 1: 200 kbps
250000, # 2: 250 kbps
500000, # 3: 500 kbps
1000000, # 4: 1 mbps (default for OpenArms)
2000000, # 5: 2 mbps
2500000, # 6: 2.5 mbps
3200000, # 7: 3.2 mbps
4000000, # 8: 4 mbps
5000000, # 9: 5 mbps
]
DEFAULT_BAUDRATE = 1000000 # 1 Mbps is standard for OpenArms
# Default timeout in milliseconds
DEFAULT_TIMEOUT_MS = 1000
# Data that should be normalized
NORMALIZED_DATA = ["Present_Position", "Goal_Position"]
# OpenArms specific configurations
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms has 7 DOF per arm (14 total for dual arm)
OPENARMS_ARM_MOTOR_IDS = {
"joint_1": {"send": 0x01, "recv": 0x11}, # J1 - Shoulder pan
"joint_2": {"send": 0x02, "recv": 0x12}, # J2 - Shoulder lift
"joint_3": {"send": 0x03, "recv": 0x13}, # J3 - Elbow flex
"joint_4": {"send": 0x04, "recv": 0x14}, # J4 - Wrist flex
"joint_5": {"send": 0x05, "recv": 0x15}, # J5 - Wrist roll
"joint_6": {"send": 0x06, "recv": 0x16}, # J6 - Wrist pitch
"joint_7": {"send": 0x07, "recv": 0x17}, # J7 - Wrist rotation
}
OPENARMS_GRIPPER_MOTOR_IDS = {
"gripper": {"send": 0x08, "recv": 0x18}, # J8 - Gripper
}
# Default motor types for OpenArms
OPENARMS_DEFAULT_MOTOR_TYPES = {
"joint_1": MotorType.DM8009, # Shoulder pan - high torque
"joint_2": MotorType.DM8009, # Shoulder lift - high torque
"joint_3": MotorType.DM4340, # Shoulder rotation
"joint_4": MotorType.DM4340, # Elbow flex
"joint_5": MotorType.DM4310, # Wrist roll
"joint_6": MotorType.DM4310, # Wrist pitch
"joint_7": MotorType.DM4310, # Wrist rotation
"gripper": MotorType.DM4310, # Gripper
}
# MIT control parameter ranges
MIT_KP_RANGE = (0.0, 500.0)
MIT_KD_RANGE = (0.0, 5.0)
# CAN frame command IDs
CAN_CMD_ENABLE = 0xFC
CAN_CMD_DISABLE = 0xFD
CAN_CMD_SET_ZERO = 0xFE
CAN_CMD_REFRESH = 0xCC
CAN_CMD_QUERY_PARAM = 0x33
CAN_CMD_WRITE_PARAM = 0x55
CAN_CMD_SAVE_PARAM = 0xAA
# CAN ID for parameter operations
CAN_PARAM_ID = 0x7FF
+2 -2
View File
@@ -24,7 +24,7 @@ from enum import Enum
from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address
from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address
from .tables import (
AVAILABLE_BAUDRATES,
MODEL_BAUDRATE_TABLE,
@@ -100,7 +100,7 @@ def _split_into_byte_chunks(value: int, length: int) -> list[int]:
return data
class DynamixelMotorsBus(MotorsBus):
class DynamixelMotorsBus(SerialMotorsBus):
"""
The Dynamixel implementation for a MotorsBus. It relies on the python dynamixel sdk to communicate with
the motors. For more info, see the Dynamixel SDK Documentation:
+2 -2
View File
@@ -19,7 +19,7 @@ from pprint import pformat
from lerobot.motors.encoding_utils import decode_sign_magnitude, encode_sign_magnitude
from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address
from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address
from .tables import (
FIRMWARE_MAJOR_VERSION,
FIRMWARE_MINOR_VERSION,
@@ -96,7 +96,7 @@ def patch_setPacketTimeout(self, packet_length): # noqa: N802
self.packet_timeout = (self.tx_time_per_byte * packet_length) + (self.tx_time_per_byte * 3.0) + 50
class FeetechMotorsBus(MotorsBus):
class FeetechMotorsBus(SerialMotorsBus):
"""
The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
+96 -4
View File
@@ -19,6 +19,8 @@
# TODO(aliberts): Add block noqa when feature below is available
# https://github.com/astral-sh/ruff/issues/3711
from __future__ import annotations
import abc
import logging
from contextlib import contextmanager
@@ -41,6 +43,92 @@ Value: TypeAlias = int | float
logger = logging.getLogger(__name__)
class MotorsBusBase(abc.ABC):
"""
Base class for all motor bus implementations.
This is a minimal interface that all motor buses must implement, regardless of their
communication protocol (serial, CAN, etc.).
"""
def __init__(
self,
port: str,
motors: dict[str, Motor],
calibration: dict[str, MotorCalibration] | None = None,
):
self.port = port
self.motors = motors
self.calibration = calibration if calibration else {}
@abc.abstractmethod
def connect(self, handshake: bool = True) -> None:
"""Establish connection to the motors."""
pass
@abc.abstractmethod
def disconnect(self, disable_torque: bool = True) -> None:
"""Disconnect from the motors."""
pass
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""Check if connected to the motors."""
pass
@abc.abstractmethod
def read(self, data_name: str, motor: str, *, normalize: bool = True, num_retry: int = 0) -> Value:
"""Read a value from a single motor."""
pass
@abc.abstractmethod
def write(
self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
) -> None:
"""Write a value to a single motor."""
pass
@abc.abstractmethod
def sync_read(
self, data_name: str, motors: str | list[str] | None = None, *, normalize: bool = True
) -> dict[str, Value]:
"""Read a value from multiple motors."""
pass
@abc.abstractmethod
def sync_write(
self,
data_name: str,
values: Value | dict[str, Value],
motors: str | list[str] | None = None,
*,
normalize: bool = True,
) -> None:
"""Write values to multiple motors."""
pass
@abc.abstractmethod
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
"""Enable torque on selected motors."""
pass
@abc.abstractmethod
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
"""Disable torque on selected motors."""
pass
@abc.abstractmethod
def read_calibration(self) -> dict[str, MotorCalibration]:
"""Read calibration parameters from the motors."""
pass
@abc.abstractmethod
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
"""Write calibration parameters to the motors."""
pass
def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]:
ctrl_table = model_ctrl_table.get(model)
if ctrl_table is None:
@@ -203,15 +291,15 @@ class GroupSyncWrite(Protocol):
def txPacket(self): ...
class MotorsBus(abc.ABC):
class SerialMotorsBus(MotorsBusBase):
"""
A MotorsBus allows to efficiently read and write to the attached motors.
A SerialMotorsBus allows to efficiently read and write to motors connected via serial communication.
It represents several motors daisy-chained together and connected through a serial port.
There are currently two implementations of this abstract class:
There are currently two implementations of this class:
- DynamixelMotorsBus
- FeetechMotorsBus
Note: This class may evolve in the future should we add support for other types of bus.
This class is specifically for serial-based motor protocols (Dynamixel, Feetech, etc.).
A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
To find the port, you can run our utility script:
@@ -1212,3 +1300,7 @@ class MotorsBus(abc.ABC):
for id_, value in ids_values.items():
data = self._serialize_data(value, length)
self.sync_writer.addParam(id_, data)
# Backward compatibility alias
MotorsBus = SerialMotorsBus
+20
View File
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarms_follower import OpenArmsFollowerConfig
from .openarms_follower import OpenArmsFollower
__all__ = ["OpenArmsFollower", "OpenArmsFollowerConfig"]
@@ -0,0 +1,73 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from lerobot.cameras import CameraConfig
from lerobot.motors.damiao.tables import MotorType
from ..config import RobotConfig
@RobotConfig.register_subclass("openarms_follower")
@dataclass
class OpenArmsFollowerConfig(RobotConfig):
"""Configuration for the OpenArms follower robot with Damiao motors."""
# CAN interface to connect to
# Linux: "can0", "can1", etc.
# macOS: "/dev/cu.usbmodem*" (serial device)
port: str = "can0"
# CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect)
can_interface: str = "auto"
# Whether to disable torque when disconnecting
disable_torque_on_disconnect: bool = True
# Safety limit for relative target positions
# Set to a positive scalar for all motors, or a dict mapping motor names to limits
max_relative_target: Optional[float | Dict[str, float]] = None
# Camera configurations
cameras: Dict[str, CameraConfig] = field(default_factory=dict)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: Dict[str, tuple[int, int, str]] = field(default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
})
# MIT control parameters for position control
position_kp: float = 10.0 # Position gain
position_kd: float = 0.5 # Velocity damping
# Calibration parameters
calibration_mode: str = "manual" # "manual" or "auto"
zero_position_on_connect: bool = False # Set zero position on connect
@@ -0,0 +1,393 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any, Dict
import numpy as np
import pinocchio as pin
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.motors.damiao.tables import MotorType
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_openarms_follower import OpenArmsFollowerConfig
logger = logging.getLogger(__name__)
class OpenArmsFollower(Robot):
"""
OpenArms Follower Robot which uses CAN bus communication to control 7 DOF arm with a gripper.
The arm uses Damiao motors in MIT control mode.
"""
config_class = OpenArmsFollowerConfig
name = "openarms_follower"
def __init__(self, config: OpenArmsFollowerConfig):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
motors = {}
# Right arm
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"right_{motor_name}"
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
# Left arm (offset IDs by 8)
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"left_{motor_name}"
motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body)
motor.recv_id = recv_id + 0x08
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
# Initialize the Damiao motors bus
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
)
# Initialize cameras
self.cameras = make_cameras_from_configs(config.cameras)
# Initialize Pinocchio robot model for dynamics (optional)
self.pin_robot = None
try:
# Try to load URDF if available
# TODO: Add OpenArms URDF file to repository
self.pin_robot = pin.RobotWrapper.BuildFromURDF("urdf/openarms.urdf", "urdf")
logger.info("Loaded OpenArms URDF for dynamics computation")
except Exception as e:
logger.warning(f"Could not load URDF for dynamics: {e}. Gravity compensation will not be available.")
@property
def _motors_ft(self) -> Dict[str, type]:
"""Motor features for observation and action spaces."""
features = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
def _cameras_ft(self) -> Dict[str, tuple]:
"""Camera features for observation space."""
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3)
for cam in self.cameras
}
@cached_property
def observation_features(self) -> Dict[str, type | tuple]:
"""Combined observation features from motors and cameras."""
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> Dict[str, type]:
"""Action features (motor positions only)."""
return self._motors_ft
@property
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
We assume that at connection time, the arm is in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"No calibration found or calibration mismatch. Running calibration..."
)
self.calibrate()
# Connect cameras
for cam in self.cameras.values():
cam.connect()
# Configure motors
self.configure()
# Optionally set zero position
if self.config.zero_position_on_connect:
logger.info("Setting current position as zero...")
self.bus.set_zero_position()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if robot is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms robot.
The calibration procedure:
1. Disable torque
2. Ask user to position arm in hanging position with gripper closed
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Ask user whether to use existing calibration
user_input = input(
f"Press ENTER to use existing calibration for {self.id}, "
f"or type 'c' and press ENTER to run new calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Using existing calibration for {self.id}")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
# Disable torque for manual positioning
self.bus.disable_torque()
time.sleep(0.1)
# Step 1: Set zero position
input(
"\nCalibration Step 1: Zero Position\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Zero position set.")
# Step 2: Record range of motion
print(
"\nCalibration Step 2: Range of Motion\n"
"Move each joint through its full range of motion.\n"
"The system will record min/max positions.\n"
"Press ENTER when done..."
)
# Record ranges
range_mins, range_maxes = self.bus.record_ranges_of_motion()
# Create calibration data (ranges are already in degrees)
self.calibration = {}
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0, # Normal direction
homing_offset=0, # Already set via set_zero_position
range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees
range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees
)
# Special handling for gripper range
if "gripper" in self.calibration:
gripper_cal = self.calibration["gripper"]
gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min)
if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees)
# Set default gripper range in degrees
gripper_cal.range_min = 0.0
gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion
# Write calibration to motors and save to file
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
# Re-enable torque
self.bus.enable_torque()
def configure(self) -> None:
"""Configure motors with appropriate settings."""
with self.bus.torque_disabled():
# Configure all motors
self.bus.configure_motors()
# Set specific parameters for gripper if present
if "gripper" in self.bus.motors:
# Gripper uses lower gains to avoid damage
# These will be applied during MIT control commands
pass # Parameters are set during control commands
def setup_motors(self) -> None:
raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.")
def get_observation(self) -> Dict[str, Any]:
"""Get current observation from robot including position, velocity, and torque."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read motor positions, velocities, and torques
start = time.perf_counter()
positions = self.bus.sync_read("Present_Position")
velocities = self.bus.sync_read("Present_Velocity")
torques = self.bus.sync_read("Present_Torque")
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0)
obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0)
obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: Dict[str, Any]) -> Dict[str, Any]:
"""
Send action command to robot.
The action magnitude may be clipped based on safety limits.
Args:
action: Dictionary with motor positions
Returns:
The action actually sent (potentially clipped)
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Extract motor positions from action
goal_pos = {
key.removesuffix(".pos"): val
for key, val in action.items()
if key.endswith(".pos")
}
# Apply safety limits if configured
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {
key: (g_pos, present_pos[key])
for key, g_pos in goal_pos.items()
}
goal_pos = ensure_safe_goal_position(
goal_present_pos,
self.config.max_relative_target
)
# Prepare MIT control commands for each motor
for motor_name, position_degrees in goal_pos.items():
# Use different gains for gripper
if motor_name == "gripper":
kp = self.config.position_kp * 0.5 # Lower gain for gripper
kd = self.config.position_kd * 0.5
else:
kp = self.config.position_kp
kd = self.config.position_kd
# Send MIT control command (position is in degrees)
self.bus._mit_control(
motor_name,
kp=kp,
kd=kd,
position_degrees=position_degrees,
velocity_deg_per_sec=0.0,
torque=0.0
)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect from CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
# Disconnect cameras
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")
def _deg_to_rad(self, deg: Dict[str, float | int]) -> Dict[str, float]:
"""Convert degrees to radians for all motors."""
return {m: np.deg2rad(float(v)) for m, v in deg.items()}
def _gravity_from_q(self, q_rad: Dict[str, float]) -> Dict[str, float]:
"""
Compute g(q) [N·m] for all joints in the robot.
The order of joints in the URDF matches self.bus.motors.
Args:
q_rad: Dictionary mapping motor names to positions in radians
Returns:
Dictionary mapping motor names to gravity torques in N·m
Raises:
RuntimeError: If URDF model is not loaded
"""
if self.pin_robot is None:
raise RuntimeError(
"Cannot compute gravity: URDF model not loaded. "
"Ensure urdf/openarms.urdf exists and is valid."
)
# Build position vector in the order of motors
q = np.zeros(self.pin_robot.model.nq)
for i, motor_name in enumerate(self.bus.motors):
q[i] = q_rad[motor_name]
# Compute generalized gravity vector
g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q)
# Map back to motor names
return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)}
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarms_leader import OpenArmsLeaderConfig
from .openarms_leader import OpenArmsLeader
__all__ = ["OpenArmsLeader", "OpenArmsLeaderConfig"]
@@ -0,0 +1,56 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Dict
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("openarms_leader")
@dataclass
class OpenArmsLeaderConfig(TeleoperatorConfig):
"""Configuration for the OpenArms leader/teleoperator with Damiao motors."""
# CAN interface to connect to
# Linux: "can0", "can1", etc.
# macOS: "/dev/cu.usbmodem*" (serial device)
port: str = "can0"
# CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect)
can_interface: str = "auto"
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: Dict[str, tuple[int, int, str]] = field(default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
})
# Torque mode settings for manual control
# When enabled, motors have torque disabled for manual movement
manual_control: bool = True
@@ -0,0 +1,270 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Dict
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.motors.damiao.tables import MotorType
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_openarms_leader import OpenArmsLeaderConfig
logger = logging.getLogger(__name__)
class OpenArmsLeader(Teleoperator):
"""
OpenArms Leader/Teleoperator Arm with Damiao motors.
This teleoperator uses CAN bus communication to read positions from
Damiao motors that are manually moved (torque disabled).
"""
config_class = OpenArmsLeaderConfig
name = "openarms_leader"
def __init__(self, config: OpenArmsLeaderConfig):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors
motors = {}
# Right arm (original IDs)
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"right_{motor_name}"
motor = Motor(send_id, motor_type_str, norm_mode_body)
motor.recv_id = recv_id
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
# Left arm (offset IDs by 8)
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
prefixed_name = f"left_{motor_name}"
motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body)
motor.recv_id = recv_id + 0x08
motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_"))
motors[prefixed_name] = motor
# Initialize the Damiao motors bus
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
)
@property
def action_features(self) -> Dict[str, type]:
"""Features produced by this teleoperator."""
features = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
def feedback_features(self) -> Dict[str, type]:
"""Feedback features (not implemented for OpenArms)."""
return {}
@property
def is_connected(self) -> bool:
"""Check if teleoperator is connected."""
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the teleoperator.
For manual control, we disable torque after connecting so the
arm can be moved by hand.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"No calibration found or calibration mismatch. Running calibration..."
)
self.calibrate()
# Configure for manual control
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if teleoperator is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms leader.
The calibration procedure:
1. Disable torque (if not already disabled)
2. Ask user to position arm in zero position (hanging with gripper closed)
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Ask user whether to use existing calibration
user_input = input(
f"Press ENTER to use existing calibration for {self.id}, "
f"or type 'c' and press ENTER to run new calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Using existing calibration for {self.id}")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
# Ensure torque is disabled for manual positioning
self.bus.disable_torque()
time.sleep(0.1)
# Step 1: Set zero position
input(
"\nCalibration Step 1: Zero Position\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Zero position set.")
# Step 2: Record range of motion
print(
"\nCalibration Step 2: Range of Motion\n"
"Move each joint through its full range of motion.\n"
"The system will record min/max positions.\n"
"Press ENTER when done..."
)
# Record ranges
range_mins, range_maxes = self.bus.record_ranges_of_motion()
# Create calibration data (ranges are already in degrees)
self.calibration = {}
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0, # Normal direction
homing_offset=0, # Already set via set_zero_position
range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees
range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees
)
# Special handling for gripper range
if "gripper" in self.calibration:
gripper_cal = self.calibration["gripper"]
gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min)
if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees)
# Set default gripper range in degrees
gripper_cal.range_min = 0.0
gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion
# Write calibration and save to file
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
def configure(self) -> None:
"""
Configure motors for manual teleoperation.
For manual control, we disable torque so the arm can be moved by hand.
"""
if self.config.manual_control:
# Disable torque for manual control
logger.info("Disabling torque for manual control...")
self.bus.disable_torque()
else:
# Configure motors normally
self.bus.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.")
def get_observation(self) -> Dict[str, Any]:
"""Get current observation from robot including position, velocity, and torque."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read motor positions, velocities, and torques
start = time.perf_counter()
positions = self.bus.sync_read("Present_Position")
velocities = self.bus.sync_read("Present_Velocity")
torques = self.bus.sync_read("Present_Torque")
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0)
obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0)
obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_feedback(self, feedback: Dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArms leader.")
def disconnect(self) -> None:
"""Disconnect from teleoperator."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# For manual control, ensure torque is disabled before disconnecting
if self.config.manual_control:
try:
self.bus.disable_torque()
except Exception as e:
logger.warning(f"Failed to disable torque during disconnect: {e}")
# Disconnect from CAN bus
self.bus.disconnect(disable_torque=False) # Already disabled above if needed
logger.info(f"{self} disconnected.")