From 30c10c1c6e7d9172e6b0bce69fe4b5a98b42e829 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Mon, 27 Oct 2025 16:40:05 +0100 Subject: [PATCH] Add damiao motors and open arm robot --- docs/source/openarms.mdx | 541 +++++++++++ pyproject.toml | 3 + src/lerobot/motors/__init__.py | 9 +- src/lerobot/motors/damiao/damiao.py | 889 ++++++++++-------- src/lerobot/motors/damiao/tables.py | 209 ++++ src/lerobot/motors/dynamixel/dynamixel.py | 4 +- src/lerobot/motors/feetech/feetech.py | 4 +- src/lerobot/motors/motors_bus.py | 100 +- src/lerobot/robots/openarms/__init__.py | 20 + .../openarms/config_openarms_follower.py | 73 ++ .../robots/openarms/openarms_follower.py | 393 ++++++++ .../teleoperators/openarms/__init__.py | 20 + .../openarms/config_openarms_leader.py | 56 ++ .../teleoperators/openarms/openarms_leader.py | 270 ++++++ 14 files changed, 2194 insertions(+), 397 deletions(-) create mode 100644 docs/source/openarms.mdx create mode 100644 src/lerobot/robots/openarms/__init__.py create mode 100644 src/lerobot/robots/openarms/config_openarms_follower.py create mode 100644 src/lerobot/robots/openarms/openarms_follower.py create mode 100644 src/lerobot/teleoperators/openarms/__init__.py create mode 100644 src/lerobot/teleoperators/openarms/config_openarms_leader.py create mode 100644 src/lerobot/teleoperators/openarms/openarms_leader.py diff --git a/docs/source/openarms.mdx b/docs/source/openarms.mdx new file mode 100644 index 000000000..04b3b6926 --- /dev/null +++ b/docs/source/openarms.mdx @@ -0,0 +1,541 @@ +# OpenArms Robot + +OpenArms is a 7 DOF robotic arm with a gripper, designed by [Enactic, Inc.](https://www.enactic.com/) It uses Damiao motors controlled via CAN bus communication and MIT control mode for smooth, precise motion. + +## Hardware Overview + +- **7 DOF per arm** (14 DOF total for dual arm setup) +- **1 gripper per arm** (2 grippers total) +- **Damiao motors** with 4 different types: + - **DM8009** (DM-J8009P-2EC) for shoulders (J1, J2) - high torque + - **DM4340** for shoulder rotation and elbow (J3, J4) + - **DM4310** (DM-J4310-2EC V1.1) for wrist (J5, J6, J7) and gripper (J8) +- **24V power supply** required +- **CAN interface device**: + - **Linux**: Any SocketCAN-compatible adapter + - **macOS**: CANable, PEAK PCAN-USB, or Kvaser USBcan +- Proper CAN wiring (CANH, CANL, 120Ω termination) + + +## Motor Configuration + +Each arm has the following motor configuration based on the [OpenArm setup guide](https://docs.openarm.dev/software/setup/): + +| Joint | Motor | Motor Type | Sender CAN ID | Receiver ID | Description | +|-------|-------|------------|---------------|-------------|-------------| +| J1 | joint_1 | DM8009 | 0x01 | 0x11 | Shoulder pan | +| J2 | joint_2 | DM8009 | 0x02 | 0x12 | Shoulder lift | +| J3 | joint_3 | DM4340 | 0x03 | 0x13 | Shoulder rotation | +| J4 | joint_4 | DM4340 | 0x04 | 0x14 | Elbow flex | +| J5 | joint_5 | DM4310 | 0x05 | 0x15 | Wrist roll | +| J6 | joint_6 | DM4310 | 0x06 | 0x16 | Wrist pitch | +| J7 | joint_7 | DM4310 | 0x07 | 0x17 | Wrist rotation | +| J8 | gripper | DM4310 | 0x08 | 0x18 | Gripper | + +For dual arm setups, the left arm uses IDs 0x09-0x10 for joints 1-8 with the same motor types. + +## Quick Start (macOS) + +If you're on macOS, here's the fastest way to get started: + +```bash +# 1. Install LeRobot with OpenArms dependencies +pip install -e ".[openarms]" + +# 2. Find your USB-CAN adapter +ls /dev/cu.usbmodem* + +# 3. Test communication +python3 -c " +import can +bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000) +print('✓ CAN interface connected') +bus.shutdown() +" +``` + +Then use this configuration: + +```python +from lerobot.robots.openarms import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + +config = OpenArmsFollowerConfig( + port="/dev/cu.usbmodem00000000050C1", # Your adapter + can_interface="auto", # Auto-detects slcan for /dev/* ports + is_dual_arm=True, +) + +robot = OpenArmsFollower(config) +robot.connect() +``` + +## Prerequisites + +### Software Requirements + +**Linux:** +- Ubuntu 22.04/24.04 (or any Linux with SocketCAN) +- Python 3.8+ +- `can-utils` and `iproute2` packages +- LeRobot with OpenArms dependencies + +```bash +# Install system dependencies +sudo apt install can-utils iproute2 + +# Install LeRobot with OpenArms support +pip install -e ".[openarms]" +``` + +**macOS:** +- macOS 12+ (Monterey or later) +- Python 3.8+ +- LeRobot with OpenArms dependencies + +```bash +# Install LeRobot with OpenArms support (includes python-can) +pip install -e ".[openarms]" +``` + +The `openarms` extra installs: +- `python-can>=4.2.0` - CAN bus communication library (supports SocketCAN on Linux and SLCAN on macOS) + +:::tip +If you've already installed LeRobot and want to add OpenArms support, run: +```bash +pip install -e ".[openarms]" +``` +::: + +## Setup Guide + +### Step 1: Motor ID Configuration + +**IMPORTANT**: Before using the robot, motors must be configured with the correct CAN IDs. + +Refer to the [OpenArm Motor ID Configuration Guide](https://docs.openarm.dev/software/setup/motor-id) for detailed instructions using the Damiao Debugging Tools on Windows. + +Key points: +- Each motor needs a unique **Sender CAN ID** (0x01-0x08 for first arm) +- Each motor needs a unique **Receiver/Master ID** (0x11-0x18 for first arm) +- Use the Damiao Debugging Tools to set these IDs +- For dual arm setups, use 0x09-0x10 for the second arm + +### Step 2: Setup CAN Interface + +Configure your CAN interface as described in the [OpenArm CAN Setup Guide](https://docs.openarm.dev/software/setup/can-setup): + +#### Linux (SocketCAN) + +```bash +# Find your CAN interface +ip link show + +# Setup CAN 2.0 at 1 Mbps (standard) +sudo ip link set can0 down +sudo ip link set can0 type can bitrate 1000000 +sudo ip link set can0 up + +# Verify configuration +ip link show can0 +``` + +#### macOS + +macOS doesn't have native SocketCAN support. + +**Use SLCAN (Serial Line CAN)** + +For USB-CAN adapters that support SLCAN protocol (like CANable): + +```bash +# Install python-can if not already installed +pip install python-can + +# The adapter will appear as a serial device +ls /dev/cu.usbmodem* + +# Use with python-can slcan interface +# Example: /dev/cu.usbmodem14201 +``` + +In your code, specify the slcan interface: + +```python +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + +config = OpenArmsFollowerConfig( + port="/dev/cu.usbmodem14201", # Your USB-CAN adapter + can_interface="slcan", # Will auto-detect if set to "auto" +) +``` + +### Step 3: Test Motor Communication + +**On Linux:** + +Test basic communication as described in the [OpenArm Motor Test Guide](https://docs.openarm.dev/software/setup/configure-test): + +```bash +# Terminal 1: Monitor CAN traffic +candump can0 + +# Terminal 2: Enable motor 1 +cansend can0 001#FFFFFFFFFFFFFFFC + +# Expected response on Terminal 1: +# can0 011 [8] XX XX XX XX XX XX XX XX + +# Disable motor 1 +cansend can0 001#FFFFFFFFFFFFFFFD +``` + +**On macOS:** + +Testing is done differently since you'll use serial-based adapters: + +```bash +# Find your USB-CAN adapter +ls /dev/cu.usbmodem* + +# Example output: /dev/cu.usbmodem00000000050C1 + +# Test with Python directly (can-utils don't work on macOS) +python3 -c " +import can +bus = can.interface.Bus(channel='/dev/cu.usbmodem00000000050C1', interface='slcan', bitrate=1000000) +msg = can.Message(arbitration_id=0x01, data=[0xFF]*7+[0xFC]) +bus.send(msg) +response = bus.recv(timeout=1.0) +if response: + print(f'✓ Motor responded: ID 0x{response.arbitration_id:02X}') +else: + print('✗ No response') +bus.shutdown() +" +``` + +## Usage + +### Basic Setup + +**On Linux:** + +```python +from lerobot.robots.openarms import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + +# Configure for dual arm setup +config = OpenArmsFollowerConfig( + port="can0", + can_interface="socketcan", # Or "auto" for auto-detection + id="openarms_dual", + is_dual_arm=True, +) + +robot = OpenArmsFollower(config) +robot.connect() +``` + +**On macOS:** + +```python +from lerobot.robots.openarms import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + +# Find your USB-CAN adapter first +# ls /dev/cu.usbmodem* + +config = OpenArmsFollowerConfig( + port="/dev/cu.usbmodem14201", # Your adapter's serial port + can_interface="slcan", # Or "auto" for auto-detection + id="openarms_dual", + is_dual_arm=True, +) + +robot = OpenArmsFollower(config) +robot.connect() +``` + +### Calibration + +On first use, you'll need to calibrate the robot: + +```python +robot.calibrate() +``` + +The calibration process will: +1. Disable torque on all motors +2. Ask you to position arms in **hanging position with grippers closed** +3. Set this as the zero position +4. Ask you to move each joint through its full range +5. Record min/max positions for each joint +6. Save calibration to file + +### Reading Observations + +The robot provides comprehensive state information: + +```python +observation = robot.get_observation() + +# Observation includes for each motor: +# - {motor_name}.pos: Position in degrees +# - {motor_name}.vel: Velocity in degrees/second +# - {motor_name}.torque: Motor torque +# - {camera_name}: Camera images (if configured) + +print(f"Right arm joint 1 position: {observation['right_joint_1.pos']:.1f}°") +print(f"Right arm joint 1 velocity: {observation['right_joint_1.vel']:.1f}°/s") +print(f"Right arm joint 1 torque: {observation['right_joint_1.torque']:.3f} N·m") +``` + +### Sending Actions + +```python +# Send target positions (in degrees) +action = { + "right_joint_1.pos": 45.0, + "right_joint_2.pos": -30.0, + # ... all joints + "right_gripper.pos": 45.0, # Half-closed +} + +actual_action = robot.send_action(action) +``` + +### Gripper Control + +```python +# Open gripper +robot.open_gripper(arm="right") + +# Close gripper +robot.close_gripper(arm="right") +``` + +## Safety Features + +### 1. Maximum Relative Target + +Limits how far a joint can move in a single command to prevent sudden movements: + +```python +config = OpenArmsFollowerConfig( + port="can0", + # Limit all joints to 10 degrees per command + max_relative_target=10.0, + + # Or set per-motor limits + max_relative_target={ + "right_joint_1": 15.0, # Slower moving joint + "right_joint_2": 10.0, + "right_gripper": 5.0, # Very slow gripper + } +) +``` + +**How it works**: If current position is 50° and you command 80°, with `max_relative_target=10.0`, the robot will only move to 60° in that step. + +### 2. Torque Limits + +Control maximum torque output, especially important for grippers and teleoperation: + +```python +config = OpenArmsFollowerConfig( + port="can0", + # Gripper torque limit (fraction of motor's max torque) + gripper_torque_limit=0.5, # 50% of max torque +) +``` + +Lower torque limits prevent damage when gripping delicate objects. + +### 3. MIT Control Gains + +Control responsiveness and stability via PID-like gains: + +```python +config = OpenArmsFollowerConfig( + port="can0", + position_kp=10.0, # Position gain (higher = more responsive) + position_kd=0.5, # Velocity damping (higher = more damped) +) +``` + +**Guidelines**: +- **For following (robot)**: Higher gains for responsiveness + - `position_kp=10.0`, `position_kd=0.5` +- **For teleoperation (leader)**: Lower gains or disable torque for manual movement + - `manual_control=True` (torque disabled) + +### 4. Velocity Limits + +Velocity limits are enforced by the Damiao motors based on motor type. For DM4310: +- Max velocity: 30 rad/s ≈ 1718°/s + +The motors will automatically limit velocity to safe values. + +## Teleoperation + +### Leader Arm Setup + +The leader arm is moved manually (torque disabled) to generate commands: + +```python +from lerobot.teleoperators.openarms import OpenArmsLeader +from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig + +config = OpenArmsLeaderConfig( + port="can1", # Separate CAN interface for leader + id="openarms_leader", + manual_control=True, # Torque disabled for manual movement + is_dual_arm=True, +) + +leader = OpenArmsLeader(config) +leader.connect() + +# Read current position as action +action = leader.get_action() +# action contains positions for all joints in degrees +``` + +### Safety Considerations for Teleoperation + +1. **Use separate CAN interfaces** for leader and follower to avoid conflicts +2. **Enable max_relative_target** on follower to smooth abrupt movements +3. **Lower torque limits** on follower to prevent damage from tracking errors +4. **Test with one arm** before enabling dual arm teleoperation +5. **Have emergency stop** ready (power switch or CAN disable) + +```python +# Recommended follower config for teleoperation +follower_config = OpenArmsFollowerConfig( + port="can0", + max_relative_target=5.0, # Small steps for smooth following + gripper_torque_limit=0.3, # Low torque for safety + position_kp=5.0, # Lower gains for gentler following + position_kd=0.3, +) +``` + +## Troubleshooting + +### Motors Not Responding + +**Linux:** +1. **Check power supply**: 24V with sufficient current +2. **Verify CAN interface**: `ip link show can0` should show "UP" +3. **Test with cansend**: Follow [motor test guide](https://docs.openarm.dev/software/setup/configure-test) +4. **Check motor IDs**: Use Damiao Debugging Tools to verify IDs +5. **Check termination**: 120Ω resistor should be enabled on CAN interface + +**macOS:** +1. **Check power supply**: 24V with sufficient current +2. **Find adapter**: `ls /dev/cu.usbmodem*` should show your device +3. **Test connection**: Use Python script above to test communication +4. **Check motor IDs**: Use Damiao Debugging Tools on Windows +5. **Verify drivers**: Ensure USB-CAN adapter drivers are installed +6. **Try different baudrate**: Some adapters default to different rates + +### macOS-Specific Issues + +**"No such interface" error:** +```python +# Try auto-detection +config.can_interface = "auto" + +# Or explicitly list available interfaces +import can +print(can.detect_available_configs()) +``` + +**Permission denied on `/dev/cu.*`:** +```bash +# Add user to dialout group (if applicable) +sudo dscl . -append /Groups/_dialout GroupMembership $USER + +# Or run with sudo (not recommended) +sudo python your_script.py +``` + +**Adapter not showing up:** +```bash +# Check USB devices +system_profiler SPUSBDataType + +# Reinstall python-can +pip install --upgrade --force-reinstall python-can +``` + +### Motor Shaking/Unstable + +- **Lower control gains**: Reduce `position_kp` and `position_kd` +- **Check calibration**: Re-run calibration procedure +- **Verify power**: Insufficient current can cause instability +- **Check mechanical**: Loose connections, binding, or damaged components + +### CAN Bus Errors + +**Linux:** +```bash +# Check for errors +ip -s link show can0 + +# Reset CAN interface +sudo ip link set can0 down +sudo ip link set can0 up +``` + +**macOS:** +```bash +# Reconnect USB adapter +# Unplug and replug the USB cable + +# Restart Python script +# The slcan interface auto-reconnects +``` + +### Position Drift + +- **Re-calibrate**: Run calibration procedure +- **Set zero position**: Use `robot.bus.set_zero_position()` with arm in known position +- **Check temperature**: Motors may drift when hot + +## Technical Details + +### Position Units + +All positions are in **degrees**: +- Motor internal representation: radians +- User API: degrees +- Automatic conversion handled by `DamiaoMotorsBus` + +### Control Mode + +OpenArms uses **MIT control mode** which allows simultaneous control of: +- Position (degrees) +- Velocity (degrees/second) +- Torque (N·m) +- Position gain (Kp) +- Velocity damping (Kd) + +### Communication + +- **Protocol**: CAN 2.0 at 1 Mbps (or CAN-FD at 5 Mbps) +- **Frame format**: Standard 11-bit IDs +- **Update rate**: Typically 50-100 Hz depending on motor count +- **Latency**: ~10-20ms per motor command + +## References + +- [OpenArm Official Documentation](https://docs.openarm.dev/) +- [OpenArm Setup Guide](https://docs.openarm.dev/software/setup/) +- [Motor ID Configuration](https://docs.openarm.dev/software/setup/motor-id) +- [CAN Interface Setup](https://docs.openarm.dev/software/setup/can-setup) +- [Motor Communication Test](https://docs.openarm.dev/software/setup/configure-test) +- [Damiao Motor Documentation](https://wiki.seeedstudio.com/damiao_series/) +- [Enactic GitHub](https://github.com/enactic/openarm_can) diff --git a/pyproject.toml b/pyproject.toml index 9458c0127..40c5d6540 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,8 +102,10 @@ grpcio-dep = ["grpcio==1.73.1", "protobuf==6.31.0"] # TODO: Bumb dependency (com # Motors feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"] dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"] +damiao = ["python-can>=4.2.0,<5.0.0"] # Robots +openarms = ["lerobot[damiao]"] gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"] hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"] lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"] @@ -153,6 +155,7 @@ metaworld = ["metaworld==3.0.0"] # All all = [ "lerobot[dynamixel]", + "lerobot[openarms]", "lerobot[gamepad]", "lerobot[hopejr]", "lerobot[lekiwi]", diff --git a/src/lerobot/motors/__init__.py b/src/lerobot/motors/__init__.py index 850ef33d7..19115d030 100644 --- a/src/lerobot/motors/__init__.py +++ b/src/lerobot/motors/__init__.py @@ -14,4 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .motors_bus import Motor, MotorCalibration, MotorNormMode, MotorsBus +from .motors_bus import ( + Motor, + MotorCalibration, + MotorNormMode, + MotorsBus, # Backward compatibility (alias for SerialMotorsBus) + MotorsBusBase, + SerialMotorsBus, +) diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py index 2c22112c0..f3186b661 100644 --- a/src/lerobot/motors/damiao/damiao.py +++ b/src/lerobot/motors/damiao/damiao.py @@ -15,386 +15,361 @@ # TODO(pepijn): add license of: https://github.com/cmjang/DM_Control_Python?tab=MIT-1-ov-file#readme import logging +import time from copy import deepcopy -from enum import Enum +from functools import cached_property +from typing import Dict, List, Optional, Tuple, Union -from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement +import can +import numpy as np + +from lerobot.motors import Motor, MotorCalibration, MotorNormMode, MotorsBusBase +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.utils.utils import enter_pressed, move_cursor_up + +from .tables import ( + AVAILABLE_BAUDRATES, + CAN_CMD_DISABLE, + CAN_CMD_ENABLE, + CAN_CMD_REFRESH, + CAN_CMD_SET_ZERO, + CAN_PARAM_ID, + DEFAULT_BAUDRATE, + DEFAULT_TIMEOUT_MS, + MODEL_RESOLUTION, + MOTOR_LIMIT_PARAMS, + NORMALIZED_DATA, + MotorType, +) logger = logging.getLogger(__name__) -class OperatingMode(Enum): - MIT = 0 +NameOrID = Union[str, int] +Value = Union[int, float] -class DamiaoMotorsBus(MotorsBus): +class DamiaoMotorsBus(MotorsBusBase): """ - The Damiao implementation for a MotorsBus. It relies on the python-can library to communicate with - the motors. For more info, see the python-can documentation: https://python-can.readthedocs.io/en/stable/, seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python - https://wiki.seeedstudio.com/damiao_series/ and DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python + The Damiao implementation for a MotorsBus using CAN bus communication. + + This class uses python-can for CAN bus communication with Damiao motors. + For more info, see: + - python-can documentation: https://python-can.readthedocs.io/en/stable/ + - Seedstudio documentation: https://wiki.seeedstudio.com/damiao_series/ + - DM_Control_Python repo: https://github.com/cmjang/DM_Control_Python """ + # CAN-specific settings + available_baudrates = deepcopy(AVAILABLE_BAUDRATES) + default_baudrate = DEFAULT_BAUDRATE + default_timeout = DEFAULT_TIMEOUT_MS + + # Motor configuration + model_resolution_table = deepcopy(MODEL_RESOLUTION) + normalized_data = deepcopy(NORMALIZED_DATA) + def __init__( self, port: str, + motors: dict[str, Motor], + calibration: dict[str, MotorCalibration] | None = None, + can_interface: str = "socketcan", ): - self.port = port - - def configure_motors(self) -> None: - for motor in self.motors - - - @cached_property - def models(self) -> list[str]: - return [m.model for m in self.motors.values()] - - @cached_property - def ids(self) -> list[int]: - return [m.id for m in self.motors.values()] - - @property - def is_connected(self) -> bool: - """bool: `True` if the underlying serial port is open.""" - return self.port_handler.is_open - - def connect(self, handshake: bool = True) -> None: - """Open the serial port and initialise communication. + """ + Initialize the Damiao motors bus. Args: - handshake (bool, optional): Pings every expected motor and performs additional - integrity checks specific to the implementation. Defaults to `True`. + port: CAN interface name (e.g., "can0" for Linux, "/dev/cu.usbmodem*" for macOS) + motors: Dictionary mapping motor names to Motor objects + calibration: Optional calibration data + can_interface: CAN interface type - "socketcan" (Linux) or "slcan" (macOS/serial) + """ + super().__init__(port, motors, calibration) + self.port = port + self.can_interface = can_interface + self.canbus = None + self._is_connected = False + + # Map motor names to CAN IDs + self._motor_can_ids = {} + self._recv_id_to_motor = {} + + # Store motor types + self._motor_types = {} + for name, motor in self.motors.items(): + if hasattr(motor, "motor_type"): + self._motor_types[name] = motor.motor_type + else: + # Default to DM4310 if not specified + self._motor_types[name] = MotorType.DM4310 - Raises: - DeviceAlreadyConnectedError: The port is already open. - ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed. + @property + def is_connected(self) -> bool: + """Check if the CAN bus is connected.""" + return self._is_connected and self.canbus is not None + + def connect(self, handshake: bool = True) -> None: + """ + Open the CAN bus and initialize communication. + + Args: + handshake: If True, ping all motors to verify they're present """ if self.is_connected: raise DeviceAlreadyConnectedError( - f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice." + f"{self.__class__.__name__}('{self.port}') is already connected." ) - self._connect(handshake) - self.set_timeout() - logger.debug(f"{self.__class__.__name__} connected.") + try: + # Auto-detect interface type based on port name + if self.can_interface == "auto": + if self.port.startswith("/dev/"): + # Serial device (macOS/Windows) + self.can_interface = "slcan" + logger.info(f"Auto-detected slcan interface for port {self.port}") + else: + # Network interface (Linux) + self.can_interface = "socketcan" + logger.info(f"Auto-detected socketcan interface for port {self.port}") + + # Connect to CAN bus + if self.can_interface == "socketcan": + # Linux SocketCAN + self.canbus = can.interface.Bus( + channel=self.port, + interface="socketcan", + bitrate=self.default_baudrate + ) + elif self.can_interface == "slcan": + # Serial Line CAN (macOS, Windows, or USB adapters) + self.canbus = can.interface.Bus( + channel=self.port, + interface="slcan", + bitrate=self.default_baudrate + ) + else: + # Generic interface (vector, pcan, etc.) + self.canbus = can.interface.Bus( + channel=self.port, + interface=self.can_interface, + bitrate=self.default_baudrate + ) + + self._is_connected = True + + if handshake: + self._handshake() + + logger.debug(f"{self.__class__.__name__} connected via {self.can_interface}.") + except Exception as e: + self._is_connected = False + raise ConnectionError(f"Failed to connect to CAN bus: {e}") - - @property - def is_calibrated(self) -> bool: - return self.calibration == self.read_calibration() - - def read_calibration(self) -> dict[str, MotorCalibration]: - offsets = self.sync_read("Homing_Offset", normalize=False) - mins = self.sync_read("Min_Position_Limit", normalize=False) - maxes = self.sync_read("Max_Position_Limit", normalize=False) - drive_modes = self.sync_read("Drive_Mode", normalize=False) - - calibration = {} - for motor, m in self.motors.items(): - calibration[motor] = MotorCalibration( - id=m.id, - drive_mode=drive_modes[motor], - homing_offset=offsets[motor], - range_min=mins[motor], - range_max=maxes[motor], - ) - - return calibration - - def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: - for motor, calibration in calibration_dict.items(): - self.write("Homing_Offset", motor, calibration.homing_offset) - self.write("Min_Position_Limit", motor, calibration.range_min) - self.write("Max_Position_Limit", motor, calibration.range_max) - - if cache: - self.calibration = calibration_dict - - def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: - for motor in self._get_motors_list(motors): - self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry) - - def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: - for motor in self._get_motors_list(motors): - self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry) + def _handshake(self) -> None: + """Verify all motors are present by refreshing their status.""" + for motor_name in self.motors: + self._refresh_motor(motor_name) + time.sleep(0.01) # Small delay between motors def disconnect(self, disable_torque: bool = True) -> None: - """Close the serial port (optionally disabling torque first). + """ + Close the CAN bus connection. Args: - disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before - closing the port. This can prevent damaging motors if they are left applying resisting torque - after disconnect. + disable_torque: If True, disable torque on all motors before disconnecting """ if not self.is_connected: raise DeviceNotConnectedError( - f"{self.__class__.__name__}('{self.port}') is not connected. Try running `{self.__class__.__name__}.connect()` first." + f"{self.__class__.__name__}('{self.port}') is not connected." ) if disable_torque: - self.port_handler.clearPort() - self.port_handler.is_using = False - self.disable_torque(num_retry=5) + try: + self.disable_torque() + except Exception as e: + logger.warning(f"Failed to disable torque during disconnect: {e}") - self.port_handler.closePort() + if self.canbus: + self.canbus.shutdown() + self.canbus = None + self._is_connected = False logger.debug(f"{self.__class__.__name__} disconnected.") - @classmethod - def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]: - """Probe *port* at every supported baud-rate and list responding IDs. + def configure_motors(self) -> None: + """Configure all motors with default settings.""" + # Damiao motors don't require much configuration in MIT mode + # Just ensure they're enabled + for motor in self.motors: + self._enable_motor(motor) + time.sleep(0.01) - Args: - port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``). - *args, **kwargs: Forwarded to the subclass constructor. + def _enable_motor(self, motor: NameOrID) -> None: + """Enable a single motor.""" + motor_id = self._get_motor_id(motor) + data = [0xFF] * 7 + [CAN_CMD_ENABLE] + msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) + self.canbus.send(msg) + self._recv_motor_response() - Returns: - dict[int, list[int]]: Mapping *baud-rate → list of motor IDs* - for every baud-rate that produced at least one response. - """ - bus = cls(port, {}, *args, **kwargs) - bus._connect(handshake=False) - baudrate_ids = {} - for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"): - bus.set_baudrate(baudrate) - ids_models = bus.broadcast_ping() - if ids_models: - tqdm.write(f"Motors found for {baudrate=}: {pformat(ids_models, indent=4)}") - baudrate_ids[baudrate] = list(ids_models) - - bus.port_handler.closePort() - return baudrate_ids - - def setup_motor( - self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None - ) -> None: - """Assign the correct ID and baud-rate to a single motor. - - This helper temporarily switches to the motor's current settings, disables torque, sets the desired - ID, and finally programs the bus' default baud-rate. - - Args: - motor (str): Key of the motor in :pyattr:`motors`. - initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided). - Defaults to None. - initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None. - - Raises: - RuntimeError: The motor could not be found or its model number - does not match the expected one. - ConnectionError: Communication with the motor failed. - """ - if not self.is_connected: - self._connect(handshake=False) - - if initial_baudrate is None: - initial_baudrate, initial_id = self._find_single_motor(motor) - - if initial_id is None: - _, initial_id = self._find_single_motor(motor, initial_baudrate) - - model = self.motors[motor].model - target_id = self.motors[motor].id - self.set_baudrate(initial_baudrate) - self._disable_torque(initial_id, model) - - # Set ID - addr, length = get_address(self.model_ctrl_table, model, "ID") - self._write(addr, length, initial_id, target_id) - - # Set Baudrate - addr, length = get_address(self.model_ctrl_table, model, "Baud_Rate") - baudrate_value = self.model_baudrate_table[model][self.default_baudrate] - self._write(addr, length, target_id, baudrate_value) - - self.set_baudrate(self.default_baudrate) - - @contextmanager - def torque_disabled(self, motors: int | str | list[str] | None = None): - """Context-manager that guarantees torque is re-enabled. - - This helper is useful to temporarily disable torque when configuring motors. - - Examples: - >>> with bus.torque_disabled(): - ... # Safe operations here - ... pass - """ - self.disable_torque(motors) - try: - yield - finally: - self.enable_torque(motors) - - def set_timeout(self, timeout_ms: int | None = None): - """Change the packet timeout used by the SDK. - - Args: - timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls - back to :pyattr:`default_timeout`. - """ - timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout - self.port_handler.setPacketTimeoutMillis(timeout_ms) - - def get_baudrate(self) -> int: - """Return the current baud-rate configured on the port. - - Returns: - int: Baud-rate in bits / second. - """ - return self.port_handler.getBaudRate() - - def set_baudrate(self, baudrate: int) -> None: - """Set a new UART baud-rate on the port. - - Args: - baudrate (int): Desired baud-rate in bits / second. - - Raises: - RuntimeError: The SDK failed to apply the change. - """ - present_bus_baudrate = self.port_handler.getBaudRate() - if present_bus_baudrate != baudrate: - logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.") - self.port_handler.setBaudRate(baudrate) - - if self.port_handler.getBaudRate() != baudrate: - raise RuntimeError("Failed to write bus baud rate.") - - def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None: - """Restore factory calibration for the selected motors. - - Homing offset is set to ``0`` and min/max position limits are set to the full usable range. - The in-memory :pyattr:`calibration` is cleared. - - Args: - motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default) - resets every motor. - """ - if motors is None: - motors = list(self.motors) - elif isinstance(motors, (str | int)): - motors = [motors] - elif not isinstance(motors, list): - raise TypeError(motors) + def _disable_motor(self, motor: NameOrID) -> None: + """Disable a single motor.""" + motor_id = self._get_motor_id(motor) + data = [0xFF] * 7 + [CAN_CMD_DISABLE] + msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) + self.canbus.send(msg) + self._recv_motor_response() + def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + """Enable torque on selected motors.""" + motors = self._get_motors_list(motors) for motor in motors: - model = self._get_motor_model(motor) - max_res = self.model_resolution_table[model] - 1 - self.write("Homing_Offset", motor, 0, normalize=False) - self.write("Min_Position_Limit", motor, 0, normalize=False) - self.write("Max_Position_Limit", motor, max_res, normalize=False) + for _ in range(num_retry + 1): + try: + self._enable_motor(motor) + break + except Exception as e: + if _ == num_retry: + raise e + time.sleep(0.01) - self.calibration = {} + def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + """Disable torque on selected motors.""" + motors = self._get_motors_list(motors) + for motor in motors: + for _ in range(num_retry + 1): + try: + self._disable_motor(motor) + break + except Exception as e: + if _ == num_retry: + raise e + time.sleep(0.01) - def record_ranges_of_motion( - self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True - ) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: - """Interactively record the min/max encoder values of each motor. + def set_zero_position(self, motors: str | list[str] | None = None) -> None: + """Set current position as zero for selected motors.""" + motors = self._get_motors_list(motors) + for motor in motors: + motor_id = self._get_motor_id(motor) + data = [0xFF] * 7 + [CAN_CMD_SET_ZERO] + msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) + self.canbus.send(msg) + self._recv_motor_response() + time.sleep(0.01) - Move the joints by hand (with torque disabled) while the method streams live positions. Press - :kbd:`Enter` to finish. + def _refresh_motor(self, motor: NameOrID) -> None: + """Refresh motor status.""" + motor_id = self._get_motor_id(motor) + data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] + msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) + self.canbus.send(msg) + self._recv_motor_response() + + def _recv_motor_response(self, timeout: float = 0.1) -> Optional[can.Message]: + """Receive a response from a motor.""" + try: + msg = self.canbus.recv(timeout=timeout) + if msg: + return msg + except Exception as e: + logger.debug(f"Failed to receive CAN message: {e}") + return None + + def _mit_control( + self, + motor: NameOrID, + kp: float, + kd: float, + position_degrees: float, + velocity_deg_per_sec: float, + torque: float, + ) -> None: + """ + Send MIT control command to a motor. Args: - motors (NameOrID | list[NameOrID] | None, optional): Motors to record. - Defaults to every motor (`None`). - display_values (bool, optional): When `True` (default) a live table is printed to the console. + motor: Motor name or ID + kp: Position gain + kd: Velocity gain + position_degrees: Target position (degrees) + velocity_deg_per_sec: Target velocity (degrees/s) + torque: Target torque (N·m) + """ + motor_id = self._get_motor_id(motor) + motor_name = self._get_motor_name(motor) + motor_type = self._motor_types.get(motor_name, MotorType.DM4310) + + # Convert degrees to radians for motor control + position_rad = np.radians(position_degrees) + velocity_rad_per_sec = np.radians(velocity_deg_per_sec) + + # Get motor limits + pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type] + + # Encode parameters + kp_uint = self._float_to_uint(kp, 0, 500, 12) + kd_uint = self._float_to_uint(kd, 0, 5, 12) + q_uint = self._float_to_uint(position_rad, -pmax, pmax, 16) + dq_uint = self._float_to_uint(velocity_rad_per_sec, -vmax, vmax, 12) + tau_uint = self._float_to_uint(torque, -tmax, tmax, 12) + + # Pack data + data = [0] * 8 + data[0] = (q_uint >> 8) & 0xFF + data[1] = q_uint & 0xFF + data[2] = dq_uint >> 4 + data[3] = ((dq_uint & 0xF) << 4) | ((kp_uint >> 8) & 0xF) + data[4] = kp_uint & 0xFF + data[5] = kd_uint >> 4 + data[6] = ((kd_uint & 0xF) << 4) | ((tau_uint >> 8) & 0xF) + data[7] = tau_uint & 0xFF + + msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) + self.canbus.send(msg) + self._recv_motor_response() + + def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int: + """Convert float to unsigned integer for CAN transmission.""" + x = max(x_min, min(x_max, x)) # Clamp to range + span = x_max - x_min + data_norm = (x - x_min) / span + return int(data_norm * ((1 << bits) - 1)) + + def _uint_to_float(self, x: int, x_min: float, x_max: float, bits: int) -> float: + """Convert unsigned integer from CAN to float.""" + span = x_max - x_min + data_norm = float(x) / ((1 << bits) - 1) + return data_norm * span + x_min + + def _decode_motor_state(self, data: bytes, motor_type: MotorType) -> Tuple[float, float, float, int, int]: + """ + Decode motor state from CAN data. Returns: - tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the - extreme values observed for each motor. + Tuple of (position_degrees, velocity_deg_per_sec, torque, temp_mos, temp_rotor) """ - if motors is None: - motors = list(self.motors) - elif isinstance(motors, (str | int)): - motors = [motors] - elif not isinstance(motors, list): - raise TypeError(motors) - - start_positions = self.sync_read("Present_Position", motors, normalize=False) - mins = start_positions.copy() - maxes = start_positions.copy() - - user_pressed_enter = False - while not user_pressed_enter: - positions = self.sync_read("Present_Position", motors, normalize=False) - mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()} - maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()} - - if display_values: - print("\n-------------------------------------------") - print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") - for motor in motors: - print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}") - - if enter_pressed(): - user_pressed_enter = True - - if display_values and not user_pressed_enter: - # Move cursor up to overwrite the previous output - move_cursor_up(len(motors) + 3) - - same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]] - if same_min_max: - raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}") - - return mins, maxes - - def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]: - if not self.calibration: - raise RuntimeError(f"{self} has no calibration registered.") - - normalized_values = {} - for id_, val in ids_values.items(): - motor = self._id_to_name(id_) - min_ = self.calibration[motor].range_min - max_ = self.calibration[motor].range_max - drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode - if max_ == min_: - raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") - - bounded_val = min(max_, max(min_, val)) - if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: - norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 - normalized_values[id_] = -norm if drive_mode else norm - elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100: - norm = ((bounded_val - min_) / (max_ - min_)) * 100 - normalized_values[id_] = 100 - norm if drive_mode else norm - elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: - mid = (min_ + max_) / 2 - max_res = self.model_resolution_table[self._id_to_model(id_)] - 1 - normalized_values[id_] = (val - mid) * 360 / max_res - else: - raise NotImplementedError - - return normalized_values - - def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]: - if not self.calibration: - raise RuntimeError(f"{self} has no calibration registered.") - - unnormalized_values = {} - for id_, val in ids_values.items(): - motor = self._id_to_name(id_) - min_ = self.calibration[motor].range_min - max_ = self.calibration[motor].range_max - drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode - if max_ == min_: - raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") - - if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: - val = -val if drive_mode else val - bounded_val = min(100.0, max(-100.0, val)) - unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_) - elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100: - val = 100 - val if drive_mode else val - bounded_val = min(100.0, max(0.0, val)) - unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_) - elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: - mid = (min_ + max_) / 2 - max_res = self.model_resolution_table[self._id_to_model(id_)] - 1 - unnormalized_values[id_] = int((val * max_res / 360) + mid) - else: - raise NotImplementedError - - return unnormalized_values + if len(data) < 8: + raise ValueError("Invalid motor state data") + + # Extract encoded values + q_uint = (data[1] << 8) | data[2] + dq_uint = (data[3] << 4) | (data[4] >> 4) + tau_uint = ((data[4] & 0x0F) << 8) | data[5] + t_mos = data[6] + t_rotor = data[7] + + # Get motor limits + pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type] + + # Decode to physical values (radians) + position_rad = self._uint_to_float(q_uint, -pmax, pmax, 16) + velocity_rad_per_sec = self._uint_to_float(dq_uint, -vmax, vmax, 12) + torque = self._uint_to_float(tau_uint, -tmax, tmax, 12) + + # Convert to degrees + position_degrees = np.degrees(position_rad) + velocity_deg_per_sec = np.degrees(velocity_rad_per_sec) + + return position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor def read( self, @@ -404,69 +379,207 @@ class DamiaoMotorsBus(MotorsBus): normalize: bool = True, num_retry: int = 0, ) -> Value: - """Read a register from a motor. - - Args: - data_name (str): Control-table key (e.g. `"Present_Position"`). - motor (str): Motor name. - normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as - defined by the calibration. - num_retry (int, optional): Retry attempts. Defaults to `0`. - - Returns: - Value: Raw or normalised value depending on *normalize*. - """ + """Read a value from a single motor. Positions are always in degrees.""" if not self.is_connected: - raise DeviceNotConnectedError( - f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." - ) - - id_ = self.motors[motor].id - model = self.motors[motor].model - addr, length = get_address(self.model_ctrl_table, model, data_name) - - err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries." - value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg) - - id_value = self._decode_sign(data_name, {id_: value}) - - if normalize and data_name in self.normalized_data: - id_value = self._normalize(id_value) - - return id_value[id_] + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Refresh motor to get latest state + self._refresh_motor(motor) + + # Read response + msg = self._recv_motor_response() + if msg is None: + raise ConnectionError(f"No response from motor {motor}") + + motor_type = self._motor_types.get(motor, MotorType.DM4310) + position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type) + + # Return requested data (already in degrees for position/velocity) + if data_name == "Present_Position": + value = position_degrees + elif data_name == "Present_Velocity": + value = velocity_deg_per_sec + elif data_name == "Present_Torque": + value = torque + elif data_name == "Temperature_MOS": + value = t_mos + elif data_name == "Temperature_Rotor": + value = t_rotor + else: + raise ValueError(f"Unknown data_name: {data_name}") + + # For Damiao, positions are always in degrees, no normalization needed + # We keep the normalize parameter for compatibility but don't use it + return value def write( - self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0 + self, + data_name: str, + motor: str, + value: Value, + *, + normalize: bool = True, + num_retry: int = 0, ) -> None: - """Write a value to a single motor's register. - - Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which - provides a guarantee that the value was written to the register successfully. In consequence, it is - slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring - motors. - - Args: - data_name (str): Register name. - motor (str): Motor name. - value (Value): Value to write. If *normalize* is `True` the value is first converted to raw - units. - normalize (bool, optional): Enable or disable normalisation. Defaults to `True`. - num_retry (int, optional): Retry attempts. Defaults to `0`. - """ + """Write a value to a single motor. Positions are always in degrees.""" if not self.is_connected: - raise DeviceNotConnectedError( - f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Value is expected to be in degrees for positions + if data_name == "Goal_Position": + # Use MIT control with position in degrees + self._mit_control(motor, 10.0, 0.5, value, 0, 0) + else: + raise ValueError(f"Writing {data_name} not supported in MIT mode") - id_ = self.motors[motor].id - model = self.motors[motor].model - addr, length = get_address(self.model_ctrl_table, model, data_name) + def sync_read( + self, + data_name: str, + motors: str | list[str] | None = None, + *, + normalize: bool = True, + num_retry: int = 0, + ) -> Dict[str, Value]: + """Read the same value from multiple motors simultaneously.""" + motors = self._get_motors_list(motors) + result = {} - if normalize and data_name in self.normalized_data: - value = self._unnormalize({id_: value})[id_] + for motor in motors: + try: + value = self.read(data_name, motor, normalize=normalize, num_retry=num_retry) + result[motor] = value + except Exception as e: + logger.warning(f"Failed to read {data_name} from {motor}: {e}") + result[motor] = 0.0 + + return result - value = self._encode_sign(data_name, {id_: value})[id_] + def sync_write( + self, + data_name: str, + values: Dict[str, Value], + *, + normalize: bool = True, + num_retry: int = 0, + ) -> None: + """Write different values to multiple motors simultaneously. Positions are always in degrees.""" + if data_name == "Goal_Position": + # Use MIT control for position commands (values in degrees) + for motor, value_degrees in values.items(): + # Use reasonable default gains for position control + self._mit_control(motor, 10.0, 0.5, value_degrees, 0, 0) + else: + # Fall back to individual writes + for motor, value in values.items(): + self.write(data_name, motor, value, normalize=normalize, num_retry=num_retry) - err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries." - self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg) + def read_calibration(self) -> dict[str, MotorCalibration]: + """Read calibration data from motors.""" + # Damiao motors don't store calibration internally + # Return existing calibration or empty dict + return self.calibration if self.calibration else {} + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: + """Write calibration data to motors.""" + # Damiao motors don't store calibration internally + # Just cache it in memory + if cache: + self.calibration = calibration_dict + + def record_ranges_of_motion( + self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True + ) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: + """ + Interactively record the min/max values of each motor in degrees. + + Move the joints by hand (with torque disabled) while the method streams live positions. + Press Enter to finish. + """ + if motors is None: + motors = list(self.motors.keys()) + elif isinstance(motors, (str, int)): + motors = [motors] + + # Disable torque for manual movement + self.disable_torque(motors) + time.sleep(0.1) + + # Get initial positions (already in degrees) + start_positions = self.sync_read("Present_Position", motors, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + + print("\nMove joints through their full range of motion. Press ENTER when done.") + user_pressed_enter = False + + while not user_pressed_enter: + positions = self.sync_read("Present_Position", motors, normalize=False) + + for motor in motors: + if motor in positions: + mins[motor] = min(positions[motor], mins.get(motor, positions[motor])) + maxes[motor] = max(positions[motor], maxes.get(motor, positions[motor])) + + if display_values: + print("\n" + "=" * 50) + print(f"{'MOTOR':<20} | {'MIN (deg)':>12} | {'POS (deg)':>12} | {'MAX (deg)':>12}") + print("-" * 50) + for motor in motors: + if motor in positions: + print(f"{motor:<20} | {mins[motor]:>12.1f} | {positions[motor]:>12.1f} | {maxes[motor]:>12.1f}") + + if enter_pressed(): + user_pressed_enter = True + + if display_values and not user_pressed_enter: + # Move cursor up to overwrite the previous output + move_cursor_up(len(motors) + 4) + + time.sleep(0.05) + + # Re-enable torque + self.enable_torque(motors) + + # Validate ranges + for motor in motors: + if motor in mins and motor in maxes: + if abs(maxes[motor] - mins[motor]) < 5.0: # At least 5 degrees of range + raise ValueError(f"Motor {motor} has insufficient range of motion (< 5 degrees)") + + return mins, maxes + + def _get_motors_list(self, motors: str | list[str] | None) -> list[str]: + """Convert motor specification to list of motor names.""" + if motors is None: + return list(self.motors.keys()) + elif isinstance(motors, str): + return [motors] + elif isinstance(motors, list): + return motors + else: + raise TypeError(f"Invalid motors type: {type(motors)}") + + def _get_motor_id(self, motor: NameOrID) -> int: + """Get CAN ID for a motor.""" + if isinstance(motor, str): + if motor in self.motors: + return self.motors[motor].id + else: + raise ValueError(f"Unknown motor: {motor}") + else: + return motor + + def _get_motor_name(self, motor: NameOrID) -> str: + """Get motor name from name or ID.""" + if isinstance(motor, str): + return motor + else: + for name, m in self.motors.items(): + if m.id == motor: + return name + raise ValueError(f"Unknown motor ID: {motor}") + + @cached_property + def is_calibrated(self) -> bool: + """Check if motors are calibrated.""" + return bool(self.calibration) \ No newline at end of file diff --git a/src/lerobot/motors/damiao/tables.py b/src/lerobot/motors/damiao/tables.py index e69de29bb..58c6dfab1 100644 --- a/src/lerobot/motors/damiao/tables.py +++ b/src/lerobot/motors/damiao/tables.py @@ -0,0 +1,209 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration tables for Damiao motors.""" + +from enum import IntEnum +from typing import Dict, List, Tuple + +# Motor type definitions +class MotorType(IntEnum): + DM3507 = 0 + DM4310 = 1 + DM4310_48V = 2 + DM4340 = 3 + DM4340_48V = 4 + DM6006 = 5 + DM8006 = 6 + DM8009 = 7 + DM10010L = 8 + DM10010 = 9 + DMH3510 = 10 + DMH6215 = 11 + DMG6220 = 12 + +# Control modes +class ControlMode(IntEnum): + MIT = 1 + POS_VEL = 2 + VEL = 3 + TORQUE_POS = 4 + +# Motor variable IDs (RID) +class MotorVariable(IntEnum): + UV_VALUE = 0 + KT_VALUE = 1 + OT_VALUE = 2 + OC_VALUE = 3 + ACC = 4 + DEC = 5 + MAX_SPD = 6 + MST_ID = 7 + ESC_ID = 8 + TIMEOUT = 9 + CTRL_MODE = 10 + DAMP = 11 + INERTIA = 12 + HW_VER = 13 + SW_VER = 14 + SN = 15 + NPP = 16 + RS = 17 + LS = 18 + FLUX = 19 + GR = 20 + PMAX = 21 + VMAX = 22 + TMAX = 23 + I_BW = 24 + KP_ASR = 25 + KI_ASR = 26 + KP_APR = 27 + KI_APR = 28 + OV_VALUE = 29 + GREF = 30 + DETA = 31 + V_BW = 32 + IQ_C1 = 33 + VL_C1 = 34 + CAN_BR = 35 + SUB_VER = 36 + U_OFF = 50 + V_OFF = 51 + K1 = 52 + K2 = 53 + M_OFF = 54 + DIR = 55 + P_M = 80 + XOUT = 81 + +# Motor limit parameters [PMAX, VMAX, TMAX] +# PMAX: Maximum position (rad) +# VMAX: Maximum velocity (rad/s) +# TMAX: Maximum torque (N·m) +MOTOR_LIMIT_PARAMS = { + MotorType.DM3507: (12.5, 30, 10), + MotorType.DM4310: (12.5, 30, 10), + MotorType.DM4310_48V: (12.5, 50, 10), + MotorType.DM4340: (12.5, 8, 28), + MotorType.DM4340_48V: (12.5, 10, 28), + MotorType.DM6006: (12.5, 45, 20), + MotorType.DM8006: (12.5, 45, 40), + MotorType.DM8009: (12.5, 45, 54), + MotorType.DM10010L: (12.5, 25, 200), + MotorType.DM10010: (12.5, 20, 200), + MotorType.DMH3510: (12.5, 280, 1), + MotorType.DMH6215: (12.5, 45, 10), + MotorType.DMG6220: (12.5, 45, 10), +} + +# Motor model names +MODEL_NAMES = { + MotorType.DM3507: "dm3507", + MotorType.DM4310: "dm4310", + MotorType.DM4310_48V: "dm4310_48v", + MotorType.DM4340: "dm4340", + MotorType.DM4340_48V: "dm4340_48v", + MotorType.DM6006: "dm6006", + MotorType.DM8006: "dm8006", + MotorType.DM8009: "dm8009", + MotorType.DM10010L: "dm10010l", + MotorType.DM10010: "dm10010", + MotorType.DMH3510: "dmh3510", + MotorType.DMH6215: "dmh6215", + MotorType.DMG6220: "dmg6220", +} + +# Motor resolution table (encoder counts per revolution) +MODEL_RESOLUTION = { + "dm3507": 65536, + "dm4310": 65536, + "dm4310_48v": 65536, + "dm4340": 65536, + "dm4340_48v": 65536, + "dm6006": 65536, + "dm8006": 65536, + "dm8009": 65536, + "dm10010l": 65536, + "dm10010": 65536, + "dmh3510": 65536, + "dmh6215": 65536, + "dmg6220": 65536, +} + +# CAN baudrates supported by Damiao motors +AVAILABLE_BAUDRATES = [ + 125000, # 0: 125 kbps + 200000, # 1: 200 kbps + 250000, # 2: 250 kbps + 500000, # 3: 500 kbps + 1000000, # 4: 1 mbps (default for OpenArms) + 2000000, # 5: 2 mbps + 2500000, # 6: 2.5 mbps + 3200000, # 7: 3.2 mbps + 4000000, # 8: 4 mbps + 5000000, # 9: 5 mbps +] +DEFAULT_BAUDRATE = 1000000 # 1 Mbps is standard for OpenArms + +# Default timeout in milliseconds +DEFAULT_TIMEOUT_MS = 1000 + +# Data that should be normalized +NORMALIZED_DATA = ["Present_Position", "Goal_Position"] + +# OpenArms specific configurations +# Based on: https://docs.openarm.dev/software/setup/configure-test +# OpenArms has 7 DOF per arm (14 total for dual arm) +OPENARMS_ARM_MOTOR_IDS = { + "joint_1": {"send": 0x01, "recv": 0x11}, # J1 - Shoulder pan + "joint_2": {"send": 0x02, "recv": 0x12}, # J2 - Shoulder lift + "joint_3": {"send": 0x03, "recv": 0x13}, # J3 - Elbow flex + "joint_4": {"send": 0x04, "recv": 0x14}, # J4 - Wrist flex + "joint_5": {"send": 0x05, "recv": 0x15}, # J5 - Wrist roll + "joint_6": {"send": 0x06, "recv": 0x16}, # J6 - Wrist pitch + "joint_7": {"send": 0x07, "recv": 0x17}, # J7 - Wrist rotation +} + +OPENARMS_GRIPPER_MOTOR_IDS = { + "gripper": {"send": 0x08, "recv": 0x18}, # J8 - Gripper +} + +# Default motor types for OpenArms +OPENARMS_DEFAULT_MOTOR_TYPES = { + "joint_1": MotorType.DM8009, # Shoulder pan - high torque + "joint_2": MotorType.DM8009, # Shoulder lift - high torque + "joint_3": MotorType.DM4340, # Shoulder rotation + "joint_4": MotorType.DM4340, # Elbow flex + "joint_5": MotorType.DM4310, # Wrist roll + "joint_6": MotorType.DM4310, # Wrist pitch + "joint_7": MotorType.DM4310, # Wrist rotation + "gripper": MotorType.DM4310, # Gripper +} + +# MIT control parameter ranges +MIT_KP_RANGE = (0.0, 500.0) +MIT_KD_RANGE = (0.0, 5.0) + +# CAN frame command IDs +CAN_CMD_ENABLE = 0xFC +CAN_CMD_DISABLE = 0xFD +CAN_CMD_SET_ZERO = 0xFE +CAN_CMD_REFRESH = 0xCC +CAN_CMD_QUERY_PARAM = 0x33 +CAN_CMD_WRITE_PARAM = 0x55 +CAN_CMD_SAVE_PARAM = 0xAA + +# CAN ID for parameter operations +CAN_PARAM_ID = 0x7FF diff --git a/src/lerobot/motors/dynamixel/dynamixel.py b/src/lerobot/motors/dynamixel/dynamixel.py index 01bfcf544..b83c116a4 100644 --- a/src/lerobot/motors/dynamixel/dynamixel.py +++ b/src/lerobot/motors/dynamixel/dynamixel.py @@ -24,7 +24,7 @@ from enum import Enum from lerobot.motors.encoding_utils import decode_twos_complement, encode_twos_complement -from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address +from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address from .tables import ( AVAILABLE_BAUDRATES, MODEL_BAUDRATE_TABLE, @@ -100,7 +100,7 @@ def _split_into_byte_chunks(value: int, length: int) -> list[int]: return data -class DynamixelMotorsBus(MotorsBus): +class DynamixelMotorsBus(SerialMotorsBus): """ The Dynamixel implementation for a MotorsBus. It relies on the python dynamixel sdk to communicate with the motors. For more info, see the Dynamixel SDK Documentation: diff --git a/src/lerobot/motors/feetech/feetech.py b/src/lerobot/motors/feetech/feetech.py index 2ea57af12..f23b68e33 100644 --- a/src/lerobot/motors/feetech/feetech.py +++ b/src/lerobot/motors/feetech/feetech.py @@ -19,7 +19,7 @@ from pprint import pformat from lerobot.motors.encoding_utils import decode_sign_magnitude, encode_sign_magnitude -from ..motors_bus import Motor, MotorCalibration, MotorsBus, NameOrID, Value, get_address +from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address from .tables import ( FIRMWARE_MAJOR_VERSION, FIRMWARE_MINOR_VERSION, @@ -96,7 +96,7 @@ def patch_setPacketTimeout(self, packet_length): # noqa: N802 self.packet_timeout = (self.tx_time_per_byte * packet_length) + (self.tx_time_per_byte * 3.0) + 50 -class FeetechMotorsBus(MotorsBus): +class FeetechMotorsBus(SerialMotorsBus): """ The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk. diff --git a/src/lerobot/motors/motors_bus.py b/src/lerobot/motors/motors_bus.py index 17eaa8063..2080e595d 100644 --- a/src/lerobot/motors/motors_bus.py +++ b/src/lerobot/motors/motors_bus.py @@ -19,6 +19,8 @@ # TODO(aliberts): Add block noqa when feature below is available # https://github.com/astral-sh/ruff/issues/3711 +from __future__ import annotations + import abc import logging from contextlib import contextmanager @@ -41,6 +43,92 @@ Value: TypeAlias = int | float logger = logging.getLogger(__name__) +class MotorsBusBase(abc.ABC): + """ + Base class for all motor bus implementations. + + This is a minimal interface that all motor buses must implement, regardless of their + communication protocol (serial, CAN, etc.). + """ + + def __init__( + self, + port: str, + motors: dict[str, Motor], + calibration: dict[str, MotorCalibration] | None = None, + ): + self.port = port + self.motors = motors + self.calibration = calibration if calibration else {} + + @abc.abstractmethod + def connect(self, handshake: bool = True) -> None: + """Establish connection to the motors.""" + pass + + @abc.abstractmethod + def disconnect(self, disable_torque: bool = True) -> None: + """Disconnect from the motors.""" + pass + + @property + @abc.abstractmethod + def is_connected(self) -> bool: + """Check if connected to the motors.""" + pass + + @abc.abstractmethod + def read(self, data_name: str, motor: str, *, normalize: bool = True, num_retry: int = 0) -> Value: + """Read a value from a single motor.""" + pass + + @abc.abstractmethod + def write( + self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0 + ) -> None: + """Write a value to a single motor.""" + pass + + @abc.abstractmethod + def sync_read( + self, data_name: str, motors: str | list[str] | None = None, *, normalize: bool = True + ) -> dict[str, Value]: + """Read a value from multiple motors.""" + pass + + @abc.abstractmethod + def sync_write( + self, + data_name: str, + values: Value | dict[str, Value], + motors: str | list[str] | None = None, + *, + normalize: bool = True, + ) -> None: + """Write values to multiple motors.""" + pass + + @abc.abstractmethod + def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + """Enable torque on selected motors.""" + pass + + @abc.abstractmethod + def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None: + """Disable torque on selected motors.""" + pass + + @abc.abstractmethod + def read_calibration(self) -> dict[str, MotorCalibration]: + """Read calibration parameters from the motors.""" + pass + + @abc.abstractmethod + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: + """Write calibration parameters to the motors.""" + pass + + def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]: ctrl_table = model_ctrl_table.get(model) if ctrl_table is None: @@ -203,15 +291,15 @@ class GroupSyncWrite(Protocol): def txPacket(self): ... -class MotorsBus(abc.ABC): +class SerialMotorsBus(MotorsBusBase): """ - A MotorsBus allows to efficiently read and write to the attached motors. + A SerialMotorsBus allows to efficiently read and write to motors connected via serial communication. It represents several motors daisy-chained together and connected through a serial port. - There are currently two implementations of this abstract class: + There are currently two implementations of this class: - DynamixelMotorsBus - FeetechMotorsBus - Note: This class may evolve in the future should we add support for other types of bus. + This class is specifically for serial-based motor protocols (Dynamixel, Feetech, etc.). A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)). To find the port, you can run our utility script: @@ -1212,3 +1300,7 @@ class MotorsBus(abc.ABC): for id_, value in ids_values.items(): data = self._serialize_data(value, length) self.sync_writer.addParam(id_, data) + + +# Backward compatibility alias +MotorsBus = SerialMotorsBus diff --git a/src/lerobot/robots/openarms/__init__.py b/src/lerobot/robots/openarms/__init__.py new file mode 100644 index 000000000..9e531526b --- /dev/null +++ b/src/lerobot/robots/openarms/__init__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .config_openarms_follower import OpenArmsFollowerConfig +from .openarms_follower import OpenArmsFollower + +__all__ = ["OpenArmsFollower", "OpenArmsFollowerConfig"] diff --git a/src/lerobot/robots/openarms/config_openarms_follower.py b/src/lerobot/robots/openarms/config_openarms_follower.py new file mode 100644 index 000000000..b8571de5c --- /dev/null +++ b/src/lerobot/robots/openarms/config_openarms_follower.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from lerobot.cameras import CameraConfig +from lerobot.motors.damiao.tables import MotorType + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("openarms_follower") +@dataclass +class OpenArmsFollowerConfig(RobotConfig): + """Configuration for the OpenArms follower robot with Damiao motors.""" + + # CAN interface to connect to + # Linux: "can0", "can1", etc. + # macOS: "/dev/cu.usbmodem*" (serial device) + port: str = "can0" + + # CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect) + can_interface: str = "auto" + + # Whether to disable torque when disconnecting + disable_torque_on_disconnect: bool = True + + # Safety limit for relative target positions + # Set to a positive scalar for all motors, or a dict mapping motor names to limits + max_relative_target: Optional[float | Dict[str, float]] = None + + # Camera configurations + cameras: Dict[str, CameraConfig] = field(default_factory=dict) + + # Motor configuration for OpenArms (7 DOF per arm) + # Maps motor names to (send_can_id, recv_can_id, motor_type) + # Based on: https://docs.openarm.dev/software/setup/configure-test + # OpenArms uses 4 types of motors: + # - DM8009 (DM-J8009P-2EC) for shoulders (high torque) + # - DM4340P and DM4340 for shoulder rotation and elbow + # - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper + motor_config: Dict[str, tuple[int, int, str]] = field(default_factory=lambda: { + "joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009) + "joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009) + "joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340) + "joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340) + "joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310) + "joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310) + "joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310) + "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310) + }) + + # MIT control parameters for position control + position_kp: float = 10.0 # Position gain + position_kd: float = 0.5 # Velocity damping + + # Calibration parameters + calibration_mode: str = "manual" # "manual" or "auto" + zero_position_on_connect: bool = False # Set zero position on connect diff --git a/src/lerobot/robots/openarms/openarms_follower.py b/src/lerobot/robots/openarms/openarms_follower.py new file mode 100644 index 000000000..cf6a758e8 --- /dev/null +++ b/src/lerobot/robots/openarms/openarms_follower.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any, Dict + +import numpy as np +import pinocchio as pin + +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.damiao import DamiaoMotorsBus +from lerobot.motors.damiao.tables import MotorType +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_openarms_follower import OpenArmsFollowerConfig + +logger = logging.getLogger(__name__) + + +class OpenArmsFollower(Robot): + """ + OpenArms Follower Robot which uses CAN bus communication to control 7 DOF arm with a gripper. + The arm uses Damiao motors in MIT control mode. + """ + + config_class = OpenArmsFollowerConfig + name = "openarms_follower" + + def __init__(self, config: OpenArmsFollowerConfig): + super().__init__(config) + self.config = config + + norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors + motors = {} + + # Right arm + for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): + prefixed_name = f"right_{motor_name}" + motor = Motor(send_id, motor_type_str, norm_mode_body) + motor.recv_id = recv_id + motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) + motors[prefixed_name] = motor + + # Left arm (offset IDs by 8) + for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): + prefixed_name = f"left_{motor_name}" + motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body) + motor.recv_id = recv_id + 0x08 + motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) + motors[prefixed_name] = motor + + # Initialize the Damiao motors bus + self.bus = DamiaoMotorsBus( + port=self.config.port, + motors=motors, + calibration=self.calibration, + can_interface=self.config.can_interface, + ) + + # Initialize cameras + self.cameras = make_cameras_from_configs(config.cameras) + + # Initialize Pinocchio robot model for dynamics (optional) + self.pin_robot = None + try: + # Try to load URDF if available + # TODO: Add OpenArms URDF file to repository + self.pin_robot = pin.RobotWrapper.BuildFromURDF("urdf/openarms.urdf", "urdf") + logger.info("Loaded OpenArms URDF for dynamics computation") + except Exception as e: + logger.warning(f"Could not load URDF for dynamics: {e}. Gravity compensation will not be available.") + + @property + def _motors_ft(self) -> Dict[str, type]: + """Motor features for observation and action spaces.""" + features = {} + for motor in self.bus.motors: + features[f"{motor}.pos"] = float + features[f"{motor}.vel"] = float + features[f"{motor}.torque"] = float + return features + + @property + def _cameras_ft(self) -> Dict[str, tuple]: + """Camera features for observation space.""" + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) + for cam in self.cameras + } + + @cached_property + def observation_features(self) -> Dict[str, type | tuple]: + """Combined observation features from motors and cameras.""" + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> Dict[str, type]: + """Action features (motor positions only).""" + return self._motors_ft + + @property + def is_connected(self) -> bool: + """Check if robot is connected.""" + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + """ + Connect to the robot and optionally calibrate. + + We assume that at connection time, the arm is in a safe rest position, + and torque can be safely disabled to run calibration if needed. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + # Connect to CAN bus + self.bus.connect() + + # Run calibration if needed + if not self.is_calibrated and calibrate: + logger.info( + "No calibration found or calibration mismatch. Running calibration..." + ) + self.calibrate() + + # Connect cameras + for cam in self.cameras.values(): + cam.connect() + + # Configure motors + self.configure() + + # Optionally set zero position + if self.config.zero_position_on_connect: + logger.info("Setting current position as zero...") + self.bus.set_zero_position() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + """Check if robot is calibrated.""" + return self.bus.is_calibrated + + def calibrate(self) -> None: + """ + Run calibration procedure for OpenArms robot. + + The calibration procedure: + 1. Disable torque + 2. Ask user to position arm in hanging position with gripper closed + 3. Set this as zero position + 4. Record range of motion for each joint + 5. Save calibration + """ + if self.calibration: + # Ask user whether to use existing calibration + user_input = input( + f"Press ENTER to use existing calibration for {self.id}, " + f"or type 'c' and press ENTER to run new calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Using existing calibration for {self.id}") + self.bus.write_calibration(self.calibration) + return + + logger.info(f"\nRunning calibration for {self}") + + # Disable torque for manual positioning + self.bus.disable_torque() + time.sleep(0.1) + + # Step 1: Set zero position + input( + "\nCalibration Step 1: Zero Position\n" + "Position the arm in the following configuration:\n" + " - Arm hanging straight down\n" + " - Gripper closed\n" + "Press ENTER when ready..." + ) + + # Set current position as zero for all motors + self.bus.set_zero_position() + logger.info("Zero position set.") + + # Step 2: Record range of motion + print( + "\nCalibration Step 2: Range of Motion\n" + "Move each joint through its full range of motion.\n" + "The system will record min/max positions.\n" + "Press ENTER when done..." + ) + + # Record ranges + range_mins, range_maxes = self.bus.record_ranges_of_motion() + + # Create calibration data (ranges are already in degrees) + self.calibration = {} + for motor_name, motor in self.bus.motors.items(): + self.calibration[motor_name] = MotorCalibration( + id=motor.id, + drive_mode=0, # Normal direction + homing_offset=0, # Already set via set_zero_position + range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees + range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees + ) + + # Special handling for gripper range + if "gripper" in self.calibration: + gripper_cal = self.calibration["gripper"] + gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min) + if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees) + # Set default gripper range in degrees + gripper_cal.range_min = 0.0 + gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion + + # Write calibration to motors and save to file + self.bus.write_calibration(self.calibration) + self._save_calibration() + + print(f"\nCalibration complete and saved to {self.calibration_fpath}") + + # Re-enable torque + self.bus.enable_torque() + + def configure(self) -> None: + """Configure motors with appropriate settings.""" + with self.bus.torque_disabled(): + # Configure all motors + self.bus.configure_motors() + + # Set specific parameters for gripper if present + if "gripper" in self.bus.motors: + # Gripper uses lower gains to avoid damage + # These will be applied during MIT control commands + pass # Parameters are set during control commands + + def setup_motors(self) -> None: + raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.") + + def get_observation(self) -> Dict[str, Any]: + """Get current observation from robot including position, velocity, and torque.""" + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read motor positions, velocities, and torques + start = time.perf_counter() + positions = self.bus.sync_read("Present_Position") + velocities = self.bus.sync_read("Present_Velocity") + torques = self.bus.sync_read("Present_Torque") + + for motor in self.bus.motors: + obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0) + obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0) + obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0) + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: Dict[str, Any]) -> Dict[str, Any]: + """ + Send action command to robot. + + The action magnitude may be clipped based on safety limits. + + Args: + action: Dictionary with motor positions + + Returns: + The action actually sent (potentially clipped) + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Extract motor positions from action + goal_pos = { + key.removesuffix(".pos"): val + for key, val in action.items() + if key.endswith(".pos") + } + + # Apply safety limits if configured + if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position") + goal_present_pos = { + key: (g_pos, present_pos[key]) + for key, g_pos in goal_pos.items() + } + goal_pos = ensure_safe_goal_position( + goal_present_pos, + self.config.max_relative_target + ) + + # Prepare MIT control commands for each motor + for motor_name, position_degrees in goal_pos.items(): + # Use different gains for gripper + if motor_name == "gripper": + kp = self.config.position_kp * 0.5 # Lower gain for gripper + kd = self.config.position_kd * 0.5 + else: + kp = self.config.position_kp + kd = self.config.position_kd + + # Send MIT control command (position is in degrees) + self.bus._mit_control( + motor_name, + kp=kp, + kd=kd, + position_degrees=position_degrees, + velocity_deg_per_sec=0.0, + torque=0.0 + ) + + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + """Disconnect from robot.""" + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Disconnect from CAN bus + self.bus.disconnect(self.config.disable_torque_on_disconnect) + + # Disconnect cameras + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") + + def _deg_to_rad(self, deg: Dict[str, float | int]) -> Dict[str, float]: + """Convert degrees to radians for all motors.""" + return {m: np.deg2rad(float(v)) for m, v in deg.items()} + + def _gravity_from_q(self, q_rad: Dict[str, float]) -> Dict[str, float]: + """ + Compute g(q) [N·m] for all joints in the robot. + The order of joints in the URDF matches self.bus.motors. + + Args: + q_rad: Dictionary mapping motor names to positions in radians + + Returns: + Dictionary mapping motor names to gravity torques in N·m + + Raises: + RuntimeError: If URDF model is not loaded + """ + if self.pin_robot is None: + raise RuntimeError( + "Cannot compute gravity: URDF model not loaded. " + "Ensure urdf/openarms.urdf exists and is valid." + ) + + # Build position vector in the order of motors + q = np.zeros(self.pin_robot.model.nq) + for i, motor_name in enumerate(self.bus.motors): + q[i] = q_rad[motor_name] + + # Compute generalized gravity vector + g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q) + + # Map back to motor names + return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)} + diff --git a/src/lerobot/teleoperators/openarms/__init__.py b/src/lerobot/teleoperators/openarms/__init__.py new file mode 100644 index 000000000..d253db062 --- /dev/null +++ b/src/lerobot/teleoperators/openarms/__init__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .config_openarms_leader import OpenArmsLeaderConfig +from .openarms_leader import OpenArmsLeader + +__all__ = ["OpenArmsLeader", "OpenArmsLeaderConfig"] diff --git a/src/lerobot/teleoperators/openarms/config_openarms_leader.py b/src/lerobot/teleoperators/openarms/config_openarms_leader.py new file mode 100644 index 000000000..0960a707d --- /dev/null +++ b/src/lerobot/teleoperators/openarms/config_openarms_leader.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from typing import Dict + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("openarms_leader") +@dataclass +class OpenArmsLeaderConfig(TeleoperatorConfig): + """Configuration for the OpenArms leader/teleoperator with Damiao motors.""" + + # CAN interface to connect to + # Linux: "can0", "can1", etc. + # macOS: "/dev/cu.usbmodem*" (serial device) + port: str = "can0" + + # CAN interface type: "socketcan" (Linux), "slcan" (macOS/serial), or "auto" (auto-detect) + can_interface: str = "auto" + + # Motor configuration for OpenArms (7 DOF per arm) + # Maps motor names to (send_can_id, recv_can_id, motor_type) + # Based on: https://docs.openarm.dev/software/setup/configure-test + # OpenArms uses 4 types of motors: + # - DM8009 (DM-J8009P-2EC) for shoulders (high torque) + # - DM4340P and DM4340 for shoulder rotation and elbow + # - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper + motor_config: Dict[str, tuple[int, int, str]] = field(default_factory=lambda: { + "joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009) + "joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009) + "joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340) + "joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340) + "joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310) + "joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310) + "joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310) + "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310) + }) + + # Torque mode settings for manual control + # When enabled, motors have torque disabled for manual movement + manual_control: bool = True diff --git a/src/lerobot/teleoperators/openarms/openarms_leader.py b/src/lerobot/teleoperators/openarms/openarms_leader.py new file mode 100644 index 000000000..f49fac053 --- /dev/null +++ b/src/lerobot/teleoperators/openarms/openarms_leader.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Dict + +from lerobot.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.damiao import DamiaoMotorsBus +from lerobot.motors.damiao.tables import MotorType +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .config_openarms_leader import OpenArmsLeaderConfig + +logger = logging.getLogger(__name__) + + +class OpenArmsLeader(Teleoperator): + """ + OpenArms Leader/Teleoperator Arm with Damiao motors. + + This teleoperator uses CAN bus communication to read positions from + Damiao motors that are manually moved (torque disabled). + """ + + config_class = OpenArmsLeaderConfig + name = "openarms_leader" + + def __init__(self, config: OpenArmsLeaderConfig): + super().__init__(config) + self.config = config + + + norm_mode_body = MotorNormMode.DEGREES # Always use degrees for Damiao motors + motors = {} + + # Right arm (original IDs) + for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): + prefixed_name = f"right_{motor_name}" + motor = Motor(send_id, motor_type_str, norm_mode_body) + motor.recv_id = recv_id + motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) + motors[prefixed_name] = motor + + # Left arm (offset IDs by 8) + for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items(): + prefixed_name = f"left_{motor_name}" + motor = Motor(send_id + 0x08, motor_type_str, norm_mode_body) + motor.recv_id = recv_id + 0x08 + motor.motor_type = getattr(MotorType, motor_type_str.upper().replace("-", "_")) + motors[prefixed_name] = motor + + # Initialize the Damiao motors bus + self.bus = DamiaoMotorsBus( + port=self.config.port, + motors=motors, + calibration=self.calibration, + can_interface=self.config.can_interface, + ) + + @property + def action_features(self) -> Dict[str, type]: + """Features produced by this teleoperator.""" + features = {} + for motor in self.bus.motors: + features[f"{motor}.pos"] = float + features[f"{motor}.vel"] = float + features[f"{motor}.torque"] = float + return features + + @property + def feedback_features(self) -> Dict[str, type]: + """Feedback features (not implemented for OpenArms).""" + return {} + + @property + def is_connected(self) -> bool: + """Check if teleoperator is connected.""" + return self.bus.is_connected + + def connect(self, calibrate: bool = True) -> None: + """ + Connect to the teleoperator. + + For manual control, we disable torque after connecting so the + arm can be moved by hand. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + # Connect to CAN bus + self.bus.connect() + + # Run calibration if needed + if not self.is_calibrated and calibrate: + logger.info( + "No calibration found or calibration mismatch. Running calibration..." + ) + self.calibrate() + + # Configure for manual control + self.configure() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + """Check if teleoperator is calibrated.""" + return self.bus.is_calibrated + + def calibrate(self) -> None: + """ + Run calibration procedure for OpenArms leader. + + The calibration procedure: + 1. Disable torque (if not already disabled) + 2. Ask user to position arm in zero position (hanging with gripper closed) + 3. Set this as zero position + 4. Record range of motion for each joint + 5. Save calibration + """ + if self.calibration: + # Ask user whether to use existing calibration + user_input = input( + f"Press ENTER to use existing calibration for {self.id}, " + f"or type 'c' and press ENTER to run new calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Using existing calibration for {self.id}") + self.bus.write_calibration(self.calibration) + return + + logger.info(f"\nRunning calibration for {self}") + + # Ensure torque is disabled for manual positioning + self.bus.disable_torque() + time.sleep(0.1) + + # Step 1: Set zero position + input( + "\nCalibration Step 1: Zero Position\n" + "Position the arm in the following configuration:\n" + " - Arm hanging straight down\n" + " - Gripper closed\n" + "Press ENTER when ready..." + ) + + # Set current position as zero for all motors + self.bus.set_zero_position() + logger.info("Zero position set.") + + # Step 2: Record range of motion + print( + "\nCalibration Step 2: Range of Motion\n" + "Move each joint through its full range of motion.\n" + "The system will record min/max positions.\n" + "Press ENTER when done..." + ) + + # Record ranges + range_mins, range_maxes = self.bus.record_ranges_of_motion() + + # Create calibration data (ranges are already in degrees) + self.calibration = {} + for motor_name, motor in self.bus.motors.items(): + self.calibration[motor_name] = MotorCalibration( + id=motor.id, + drive_mode=0, # Normal direction + homing_offset=0, # Already set via set_zero_position + range_min=range_mins.get(motor_name, -180.0), # Default -180 degrees + range_max=range_maxes.get(motor_name, 180.0), # Default +180 degrees + ) + + # Special handling for gripper range + if "gripper" in self.calibration: + gripper_cal = self.calibration["gripper"] + gripper_range = abs(gripper_cal.range_max - gripper_cal.range_min) + if gripper_range < 5.0: # If gripper wasn't moved much (less than 5 degrees) + # Set default gripper range in degrees + gripper_cal.range_min = 0.0 + gripper_cal.range_max = 90.0 # 90 degrees for full gripper motion + + # Write calibration and save to file + self.bus.write_calibration(self.calibration) + self._save_calibration() + + print(f"\nCalibration complete and saved to {self.calibration_fpath}") + + def configure(self) -> None: + """ + Configure motors for manual teleoperation. + + For manual control, we disable torque so the arm can be moved by hand. + """ + if self.config.manual_control: + # Disable torque for manual control + logger.info("Disabling torque for manual control...") + self.bus.disable_torque() + else: + # Configure motors normally + self.bus.configure_motors() + + def setup_motors(self) -> None: + raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.") + + + def get_observation(self) -> Dict[str, Any]: + """Get current observation from robot including position, velocity, and torque.""" + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read motor positions, velocities, and torques + start = time.perf_counter() + positions = self.bus.sync_read("Present_Position") + velocities = self.bus.sync_read("Present_Velocity") + torques = self.bus.sync_read("Present_Torque") + + for motor in self.bus.motors: + obs_dict[f"{motor}.pos"] = positions.get(motor, 0.0) + obs_dict[f"{motor}.vel"] = velocities.get(motor, 0.0) + obs_dict[f"{motor}.torque"] = torques.get(motor, 0.0) + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_feedback(self, feedback: Dict[str, float]) -> None: + raise NotImplementedError("Feedback is not yet implemented for OpenArms leader.") + + def disconnect(self) -> None: + """Disconnect from teleoperator.""" + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # For manual control, ensure torque is disabled before disconnecting + if self.config.manual_control: + try: + self.bus.disable_torque() + except Exception as e: + logger.warning(f"Failed to disable torque during disconnect: {e}") + + # Disconnect from CAN bus + self.bus.disconnect(disable_torque=False) # Already disabled above if needed + + logger.info(f"{self} disconnected.") +