diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml
index f86dd11c7..eb97117af 100644
--- a/docs/source/_toctree.yml
+++ b/docs/source/_toctree.yml
@@ -101,6 +101,8 @@
title: Earth Rover Mini
- local: omx
title: OMX
+ - local: openarm
+ title: OpenArm
title: "Robots"
- sections:
- local: phone_teleop
diff --git a/docs/source/openarm.mdx b/docs/source/openarm.mdx
new file mode 100644
index 000000000..661808749
--- /dev/null
+++ b/docs/source/openarm.mdx
@@ -0,0 +1,258 @@
+# OpenArm
+
+[OpenArm](https://openarm.dev) is an open-source 7DOF humanoid arm designed for physical AI research and deployment.
+
+To get your OpenArm, assembled or DIY, and join the global community, browse verified and certified manufacturers worldwide at [openarm.dev](https://openarm.dev).
+
+## What's Unique?
+
+- **Human-Scale Design**: OpenArm is designed with human-like proportions, scaled for a person around 160-165cm tall. This provides an optimal balance between practical reach and manageable inertia for safe, responsive operation.
+
+- **Safety-First Architecture**: Built with QDD backdrivable motors and high compliance, OpenArm prioritizes safe human-robot interaction while maintaining practical payload capabilities (6.0kg peak / 4.1kg nominal) for real-world tasks.
+
+- **Built for Durability**: Critical structural components use aluminum and stainless steel construction, ensuring robust performance for repetitive data collection and continuous research use.
+
+- **Fully Accessible & Buildable**: Every component, from CNC parts and 3D-printed casings to electrical wiring is designed to be purchasable and buildable by individual researchers and labs, with complete fabrication data provided.
+
+- **Practical & Affordable**: At $6,500 USD for a complete bimanual system, OpenArm delivers research-grade capabilities at a fraction of traditional humanoid robot costs.
+
+## Platform Requirements
+
+
+ **Linux Only**: OpenArm currently only works on Linux. The CAN bus USB adapter
+ does not have macOS drivers and has not been tested on Windows.
+
+
+## Safety Guide
+
+Before operating OpenArm, please read the [official safety guide](https://docs.openarm.dev/getting-started/safety-guide). Key points:
+
+- **Secure installation**: Fasten the arm to a flat, stable surface with screws or clamps
+- **Safe distance**: Keep body parts and objects outside the range of motion during operation
+- **Protective equipment**: Always wear safety goggles; use additional PPE as needed
+- **Payload limits**: Do not exceed specified payload limits (6.0kg peak / 4.1kg nominal per arm)
+- **Emergency stop**: Know the location and operation of the emergency stop device
+- **Regular inspection**: Check for loose screws, damaged mechanical limits, unusual noises, and wiring damage
+
+## Hardware Setup
+
+Follow the official [OpenArm hardware documentation](https://docs.openarm.dev) for:
+
+- Bill of materials and sourcing
+- 3D printing instructions
+- Mechanical assembly
+- Electrical wiring
+
+The hardware repositories are available at [github.com/enactic/openarm](https://github.com/enactic/openarm).
+
+## CAN Bus Setup
+
+OpenArm uses CAN bus communication with Damiao motors. Once you have the CAN bus USB adapter plugged into your Linux PC, follow the [Damiao Motors and CAN Bus guide](./damiao) to configure the interface.
+
+Quick setup:
+
+```bash
+# Setup CAN interfaces
+lerobot-setup-can --mode=setup --interfaces=can0,can1
+
+# Test motor communication
+lerobot-setup-can --mode=test --interfaces=can0,can1
+```
+
+## Install LeRobot 🤗
+
+Follow our [Installation Guide](./installation), then install the Damiao motor support:
+
+```bash
+pip install -e ".[damiao]"
+```
+
+## Usage
+
+### Follower Arm (Robot)
+
+
+
+
+```bash
+lerobot-calibrate \
+ --robot.type=openarm_follower \
+ --robot.port=can0 \
+ --robot.side=right \
+ --robot.id=my_openarm_follower
+```
+
+
+
+
+```python
+from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
+
+config = OpenArmFollowerConfig(
+ port="can0",
+ side="right", # or "left" for left arm
+ id="my_openarm_follower",
+)
+
+follower = OpenArmFollower(config)
+follower.connect()
+
+# Read current state
+obs = follower.get_observation()
+print(obs)
+
+# Send action (position in degrees)
+action = {
+ "joint_1.pos": 0.0,
+ "joint_2.pos": 0.0,
+ "joint_3.pos": 0.0,
+ "joint_4.pos": 45.0,
+ "joint_5.pos": 0.0,
+ "joint_6.pos": 0.0,
+ "joint_7.pos": 0.0,
+ "gripper.pos": 0.0,
+}
+follower.send_action(action)
+
+follower.disconnect()
+```
+
+
+
+
+### Leader Arm (Teleoperator)
+
+The leader arm is used for teleoperation - manually moving it to control the follower arm.
+
+
+
+
+```bash
+lerobot-calibrate \
+ --teleop.type=openarm_leader \
+ --teleop.port=can1 \
+ --teleop.id=my_openarm_leader
+```
+
+
+
+
+```python
+from lerobot.teleoperators.openarm_leader import OpenArmLeader, OpenArmLeaderConfig
+
+config = OpenArmLeaderConfig(
+ port="can1",
+ id="my_openarm_leader",
+ manual_control=True, # Disable torque for manual movement
+)
+
+leader = OpenArmLeader(config)
+leader.connect()
+
+# Read current position (as action to send to follower)
+action = leader.get_action()
+print(action)
+
+leader.disconnect()
+```
+
+
+
+
+### Teleoperation
+
+To teleoperate OpenArm with leader-follower control:
+
+```bash
+lerobot-teleoperate \
+ --robot.type=openarm_follower \
+ --robot.port=can0 \
+ --robot.side=right \
+ --robot.id=my_follower \
+ --teleop.type=openarm_leader \
+ --teleop.port=can1 \
+ --teleop.id=my_leader
+```
+
+### Recording Data
+
+To record a dataset during teleoperation:
+
+```bash
+lerobot-record \
+ --robot.type=openarm_follower \
+ --robot.port=can0 \
+ --robot.side=right \
+ --robot.id=my_follower \
+ --teleop.type=openarm_leader \
+ --teleop.port=can1 \
+ --teleop.id=my_leader \
+ --repo-id=my_hf_username/my_openarm_dataset \
+ --fps=30 \
+ --num-episodes=10
+```
+
+## Configuration Options
+
+### Follower Configuration
+
+| Parameter | Default | Description |
+| --------------------- | --------- | ---------------------------------------------------------- |
+| `port` | - | CAN interface (e.g., `can0`) |
+| `side` | `None` | Arm side: `"left"`, `"right"`, or `None` for custom limits |
+| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
+| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
+| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
+| `max_relative_target` | `None` | Safety limit for relative target positions |
+| `position_kp` | Per-joint | Position control proportional gains |
+| `position_kd` | Per-joint | Position control derivative gains |
+
+### Leader Configuration
+
+| Parameter | Default | Description |
+| ------------------ | --------- | ----------------------------------- |
+| `port` | - | CAN interface (e.g., `can1`) |
+| `manual_control` | `True` | Disable torque for manual movement |
+| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
+| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
+| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
+
+## Motor Configuration
+
+OpenArm uses Damiao motors with the following default configuration:
+
+| Joint | Motor Type | Send ID | Recv ID |
+| --------------------------- | ---------- | ------- | ------- |
+| joint_1 (Shoulder pan) | DM8009 | 0x01 | 0x11 |
+| joint_2 (Shoulder lift) | DM8009 | 0x02 | 0x12 |
+| joint_3 (Shoulder rotation) | DM4340 | 0x03 | 0x13 |
+| joint_4 (Elbow flex) | DM4340 | 0x04 | 0x14 |
+| joint_5 (Wrist roll) | DM4310 | 0x05 | 0x15 |
+| joint_6 (Wrist pitch) | DM4310 | 0x06 | 0x16 |
+| joint_7 (Wrist rotation) | DM4310 | 0x07 | 0x17 |
+| gripper | DM4310 | 0x08 | 0x18 |
+
+## Troubleshooting
+
+### No Response from Motors
+
+1. Check power supply connections
+2. Verify CAN wiring (CAN-H, CAN-L, GND)
+3. Run diagnostics: `lerobot-setup-can --mode=test --interfaces=can0`
+4. See the [Damiao troubleshooting guide](./damiao#troubleshooting) for more details
+
+### CAN Interface Not Found
+
+Ensure the CAN interface is configured:
+
+```bash
+ip link show can0
+```
+
+## Resources
+
+- [OpenArm Website](https://openarm.dev)
+- [OpenArm Documentation](https://docs.openarm.dev)
+- [OpenArm GitHub](https://github.com/enactic/openarm)
+- [Safety Guide](https://docs.openarm.dev/getting-started/safety-guide)
+- [Damiao Motors and CAN Bus](./damiao)
diff --git a/pyproject.toml b/pyproject.toml
index 27126f855..ea2dfb4a2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -105,6 +105,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"]
# Robots
+openarms = ["lerobot[damiao]"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py
index dd0213fc3..c79f8d17e 100644
--- a/src/lerobot/motors/damiao/damiao.py
+++ b/src/lerobot/motors/damiao/damiao.py
@@ -28,8 +28,11 @@ from lerobot.utils.import_utils import _can_available
if TYPE_CHECKING or _can_available:
import can
else:
- can.Message = object
- can.interface = None
+
+ class can: # noqa: N801
+ Message = object
+ interface = None
+
import numpy as np
@@ -206,11 +209,31 @@ class DamiaoMotorsBus(MotorsBusBase):
Raises ConnectionError if any motor fails to respond.
"""
logger.info("Starting handshake with motors...")
- missing_motors = []
+ # Drain any pending messages
+ while self.canbus.recv(timeout=0.01):
+ pass
+
+ missing_motors = []
for motor_name in self.motors:
- msg = self._refresh_motor(motor_name)
- if msg is None:
+ motor_id = self._get_motor_id(motor_name)
+ recv_id = self._get_motor_recv_id(motor_name)
+
+ # Send enable command
+ data = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, CAN_CMD_ENABLE]
+ msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
+ self.canbus.send(msg)
+
+ # Wait for response with longer timeout
+ response = None
+ start_time = time.time()
+ while time.time() - start_time < 0.1:
+ response = self.canbus.recv(timeout=0.1)
+ if response and response.arbitration_id == recv_id:
+ break
+ response = None
+
+ if response is None:
missing_motors.append(motor_name)
else:
self._process_response(motor_name, msg)
@@ -259,7 +282,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_name = self._get_motor_name(motor)
recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte]
- msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
+ msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg)
@@ -317,7 +340,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_id = self._get_motor_id(motor)
recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
- msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
+ msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id)
@@ -439,7 +462,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
- msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
+ msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
recv_id = self._get_motor_recv_id(motor)
@@ -472,7 +495,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
- msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
+ msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
recv_id_to_motor[self._get_motor_recv_id(motor)] = motor_name
@@ -637,10 +660,10 @@ class DamiaoMotorsBus(MotorsBusBase):
for motor in motors:
motor_id = self._get_motor_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
- msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
+ msg = can.Message(
+ arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd
+ )
self.canbus.send(msg)
- # Small delay to reduce bus congestion if necessary, though removed in sync_read previously
- # precise_sleep(PRECISE_SLEEP_SEC)
# Collect responses
expected_recv_ids = [self._get_motor_recv_id(m) for m in motors]
@@ -676,7 +699,9 @@ class DamiaoMotorsBus(MotorsBusBase):
kd = self._gains[motor]["kd"]
data = self._encode_mit_packet(motor_type, kp, kd, float(value_degrees), 0.0, 0.0)
- msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
+ msg = can.Message(
+ arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd
+ )
self.canbus.send(msg)
precise_sleep(PRECISE_TIMEOUT_SEC)
diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py
index f0dbac9c3..6d44ed8cb 100644
--- a/src/lerobot/processor/hil_processor.py
+++ b/src/lerobot/processor/hil_processor.py
@@ -18,16 +18,18 @@
import math
import time
from dataclasses import dataclass
-from typing import Any, Protocol, TypeVar, runtime_checkable
+from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable
import numpy as np
import torch
import torchvision.transforms.functional as F # noqa: N812
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
-from lerobot.teleoperators.teleoperator import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents
+if TYPE_CHECKING:
+ from lerobot.teleoperators.teleoperator import Teleoperator
+
from .core import EnvTransition, PolicyAction, TransitionKey
from .pipeline import (
ComplementaryDataProcessorStep,
@@ -69,10 +71,10 @@ class HasTeleopEvents(Protocol):
# Type variable constrained to Teleoperator subclasses that also implement events
-TeleopWithEvents = TypeVar("TeleopWithEvents", bound=Teleoperator)
+TeleopWithEvents = TypeVar("TeleopWithEvents", bound="Teleoperator")
-def _check_teleop_with_events(teleop: Teleoperator) -> None:
+def _check_teleop_with_events(teleop: "Teleoperator") -> None:
"""
Runtime check that a teleoperator implements the `HasTeleopEvents` protocol.
@@ -103,7 +105,7 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
teleop_device: The teleoperator instance to get the action from.
"""
- teleop_device: Teleoperator
+ teleop_device: "Teleoperator"
def complementary_data(self, complementary_data: dict) -> dict:
"""
diff --git a/src/lerobot/robots/openarm_follower/__init__.py b/src/lerobot/robots/openarm_follower/__init__.py
new file mode 100644
index 000000000..1eb0d9fc7
--- /dev/null
+++ b/src/lerobot/robots/openarm_follower/__init__.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .config_openarm_follower import OpenArmFollowerConfig
+from .openarm_follower import OpenArmFollower
+
+__all__ = ["OpenArmFollower", "OpenArmFollowerConfig"]
diff --git a/src/lerobot/robots/openarm_follower/config_openarm_follower.py b/src/lerobot/robots/openarm_follower/config_openarm_follower.py
new file mode 100644
index 000000000..af95b6395
--- /dev/null
+++ b/src/lerobot/robots/openarm_follower/config_openarm_follower.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass, field
+
+from lerobot.cameras import CameraConfig
+
+from ..config import RobotConfig
+
+LEFT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
+ "joint_1": (-75.0, 75.0),
+ "joint_2": (-90.0, 9.0),
+ "joint_3": (-85.0, 85.0),
+ "joint_4": (0.0, 135.0),
+ "joint_5": (-85.0, 85.0),
+ "joint_6": (-40.0, 40.0),
+ "joint_7": (-80.0, 80.0),
+ "gripper": (-65.0, 0.0),
+}
+
+RIGHT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
+ "joint_1": (-75.0, 75.0),
+ "joint_2": (-9.0, 90.0),
+ "joint_3": (-85.0, 85.0),
+ "joint_4": (0.0, 135.0),
+ "joint_5": (-85.0, 85.0),
+ "joint_6": (-40.0, 40.0),
+ "joint_7": (-80.0, 80.0),
+ "gripper": (-65.0, 0.0),
+}
+
+
+@RobotConfig.register_subclass("openarm_follower")
+@dataclass
+class OpenArmFollowerConfig(RobotConfig):
+ """Configuration for the OpenArms follower robot with Damiao motors."""
+
+ # CAN interfaces - one per arm
+ # arm CAN interface (e.g., "can1")
+ # Linux: "can0", "can1", etc.
+ port: str
+
+ # side of the arm: "left" or "right". If "None" default values will be used
+ side: str | None = None
+
+ # CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
+ can_interface: str = "socketcan"
+
+ # CAN FD settings (OpenArms uses CAN FD by default)
+ use_can_fd: bool = True
+ can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
+ can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
+
+ # Whether to disable torque when disconnecting
+ disable_torque_on_disconnect: bool = True
+
+ # Safety limit for relative target positions
+ # Set to a positive scalar for all motors, or a dict mapping motor names to limits
+ max_relative_target: float | dict[str, float] | None = None
+
+ # Camera configurations
+ cameras: dict[str, CameraConfig] = field(default_factory=dict)
+
+ # Motor configuration for OpenArms (7 DOF per arm)
+ # Maps motor names to (send_can_id, recv_can_id, motor_type)
+ # Based on: https://docs.openarm.dev/software/setup/configure-test
+ # OpenArms uses 4 types of motors:
+ # - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
+ # - DM4340P and DM4340 for shoulder rotation and elbow
+ # - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
+ motor_config: dict[str, tuple[int, int, str]] = field(
+ default_factory=lambda: {
+ "joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
+ "joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
+ "joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
+ "joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
+ "joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
+ "joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
+ "joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
+ "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
+ }
+ )
+
+ # MIT control parameters for position control (used in send_action)
+ # List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
+ position_kp: list[float] = field(
+ default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0]
+ )
+ position_kd: list[float] = field(default_factory=lambda: [5.0, 5.0, 3.0, 5.0, 0.3, 0.3, 0.3, 0.3])
+
+ # Values for joint limits. Can be overridden via CLI (for custom values) or by setting config.side to either 'left' or 'right'.
+ # If config.side is left set to None and no CLI values are passed, the default joint limit values are small for safety.
+ joint_limits: dict[str, tuple[float, float]] = field(
+ default_factory=lambda: {
+ "joint_1": (-5.0, 5.0),
+ "joint_2": (-5.0, 5.0),
+ "joint_3": (-5.0, 5.0),
+ "joint_4": (0.0, 5.0),
+ "joint_5": (-5.0, 5.0),
+ "joint_6": (-5.0, 5.0),
+ "joint_7": (-5.0, 5.0),
+ "gripper": (-5.0, 0.0),
+ }
+ )
diff --git a/src/lerobot/robots/openarm_follower/openarm_follower.py b/src/lerobot/robots/openarm_follower/openarm_follower.py
new file mode 100644
index 000000000..c221afd10
--- /dev/null
+++ b/src/lerobot/robots/openarm_follower/openarm_follower.py
@@ -0,0 +1,348 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from functools import cached_property
+from typing import Any
+
+from lerobot.cameras.utils import make_cameras_from_configs
+from lerobot.motors import Motor, MotorCalibration, MotorNormMode
+from lerobot.motors.damiao import DamiaoMotorsBus
+from lerobot.processor import RobotAction, RobotObservation
+from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
+
+from ..robot import Robot
+from ..utils import ensure_safe_goal_position
+from .config_openarm_follower import (
+ LEFT_DEFAULT_JOINTS_LIMITS,
+ RIGHT_DEFAULT_JOINTS_LIMITS,
+ OpenArmFollowerConfig,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class OpenArmFollower(Robot):
+ """
+ OpenArms Follower Robot which uses CAN bus communication to control 7 DOF arm with a gripper.
+ The arm uses Damiao motors in MIT control mode.
+ """
+
+ config_class = OpenArmFollowerConfig
+ name = "openarm_follower"
+
+ def __init__(self, config: OpenArmFollowerConfig):
+ super().__init__(config)
+ self.config = config
+
+ # Arm motors
+ motors: dict[str, Motor] = {}
+ for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
+ motor = Motor(
+ send_id, motor_type_str, MotorNormMode.DEGREES
+ ) # Always use degrees for Damiao motors
+ motor.recv_id = recv_id
+ motor.motor_type_str = motor_type_str
+ motors[motor_name] = motor
+
+ self.bus = DamiaoMotorsBus(
+ port=self.config.port,
+ motors=motors,
+ calibration=self.calibration,
+ can_interface=self.config.can_interface,
+ use_can_fd=self.config.use_can_fd,
+ bitrate=self.config.can_bitrate,
+ data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
+ )
+
+ if config.side is not None:
+ if config.side == "left":
+ config.joint_limits = LEFT_DEFAULT_JOINTS_LIMITS
+ elif config.side == "right":
+ config.joint_limits = RIGHT_DEFAULT_JOINTS_LIMITS
+ else:
+ raise ValueError(
+ "config.side must be either 'left', 'right' (for default values) or 'None' (for CLI values)"
+ )
+ else:
+ logger.info(
+ "Set config.side to either 'left' or 'right' to use pre-configured values for joint limits."
+ )
+ logger.info(f"Values used for joint limits: {config.joint_limits}.")
+
+ # Initialize cameras
+ self.cameras = make_cameras_from_configs(config.cameras)
+
+ @property
+ def _motors_ft(self) -> dict[str, type]:
+ """Motor features for observation and action spaces."""
+ features: dict[str, type] = {}
+ for motor in self.bus.motors:
+ features[f"{motor}.pos"] = float
+ features[f"{motor}.vel"] = float # Add this
+ features[f"{motor}.torque"] = float # Add this
+ return features
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
+ """Camera features for observation space."""
+ return {
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
+ }
+
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ """Combined observation features from motors and cameras."""
+ return {**self._motors_ft, **self._cameras_ft}
+
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ """Action features."""
+ return self._motors_ft
+
+ @property
+ def is_connected(self) -> bool:
+ """Check if robot is connected."""
+ return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
+
+ def connect(self, calibrate: bool = True) -> None:
+ """
+ Connect to the robot and optionally calibrate.
+
+ We assume that at connection time, the arms are in a safe rest position,
+ and torque can be safely disabled to run calibration if needed.
+ """
+ if self.is_connected:
+ raise DeviceAlreadyConnectedError(f"{self} already connected")
+
+ # Connect to CAN bus
+ logger.info(f"Connecting arm on {self.config.port}...")
+ self.bus.connect()
+
+ # Run calibration if needed
+ if not self.is_calibrated and calibrate:
+ logger.info(
+ "Mismatch between calibration values in the motor and the calibration file or no calibration file found"
+ )
+ self.calibrate()
+
+ for cam in self.cameras.values():
+ cam.connect()
+
+ self.configure()
+
+ if self.is_calibrated:
+ self.bus.set_zero_position()
+
+ self.bus.enable_torque()
+
+ logger.info(f"{self} connected.")
+
+ @property
+ def is_calibrated(self) -> bool:
+ """Check if robot is calibrated."""
+ return self.bus.is_calibrated
+
+ def calibrate(self) -> None:
+ """
+ Run calibration procedure for OpenArms robot.
+
+ The calibration procedure:
+ 1. Disable torque
+ 2. Ask user to position arms in hanging position with grippers closed
+ 3. Set this as zero position
+ 4. Record range of motion for each joint
+ 5. Save calibration
+ """
+ if self.calibration:
+ # Calibration file exists, ask user whether to use it or run new calibration
+ user_input = input(
+ f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
+ )
+ if user_input.strip().lower() != "c":
+ logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
+ self.bus.write_calibration(self.calibration)
+ return
+
+ logger.info(f"\nRunning calibration for {self}")
+ self.bus.disable_torque()
+
+ # Step 1: Set zero position
+ input(
+ "\nCalibration: Set Zero Position)\n"
+ "Position the arm in the following configuration:\n"
+ " - Arm hanging straight down\n"
+ " - Gripper closed\n"
+ "Press ENTER when ready..."
+ )
+
+ # Set current position as zero for all motors
+ self.bus.set_zero_position()
+ logger.info("Arm zero position set.")
+
+ logger.info("Setting range: -90° to +90° for safety by default for all joints")
+ for motor_name, motor in self.bus.motors.items():
+ self.calibration[motor_name] = MotorCalibration(
+ id=motor.id,
+ drive_mode=0,
+ homing_offset=0,
+ range_min=-90,
+ range_max=90,
+ )
+
+ self.bus.write_calibration(self.calibration)
+ self._save_calibration()
+ print(f"Calibration saved to {self.calibration_fpath}")
+
+ def configure(self) -> None:
+ """Configure motors with appropriate settings."""
+ # TODO(Steven, Pepijn): Slightly different from what it is happening in the leader
+ with self.bus.torque_disabled():
+ self.bus.configure_motors()
+
+ def setup_motors(self) -> None:
+ raise NotImplementedError(
+ "Motor ID configuration is typically done via manufacturer tools for CAN motors."
+ )
+
+ def get_observation(self) -> RobotObservation:
+ """
+ Get current observation from robot including position, velocity, and torque.
+
+ Reads all motor states (pos/vel/torque) in one CAN refresh cycle
+ instead of 3 separate reads.
+ """
+ start = time.perf_counter()
+
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ obs_dict: dict[str, Any] = {}
+
+ states = self.bus.sync_read_all_states()
+
+ for motor in self.bus.motors:
+ state = states.get(motor, {})
+ obs_dict[f"{motor}.pos"] = state.get("position", 0.0)
+ obs_dict[f"{motor}.vel"] = state.get("velocity", 0.0)
+ obs_dict[f"{motor}.torque"] = state.get("torque", 0.0)
+
+ # Capture images from cameras
+ for cam_key, cam in self.cameras.items():
+ start = time.perf_counter()
+ obs_dict[cam_key] = cam.async_read()
+ dt_ms = (time.perf_counter() - start) * 1e3
+ logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
+
+ dt_ms = (time.perf_counter() - start) * 1e3
+ logger.debug(f"{self} get_observation took: {dt_ms:.1f}ms")
+
+ return obs_dict
+
+ def send_action(
+ self,
+ action: RobotAction,
+ custom_kp: dict[str, float] | None = None,
+ custom_kd: dict[str, float] | None = None,
+ ) -> RobotAction:
+ """
+ Send action command to robot.
+
+ The action magnitude may be clipped based on safety limits.
+
+ Args:
+ action: Dictionary with motor positions (e.g., "joint_1.pos", "joint_2.pos")
+ custom_kp: Optional custom kp gains per motor (e.g., {"joint_1": 120.0, "joint_2": 150.0})
+ custom_kd: Optional custom kd gains per motor (e.g., {"joint_1": 1.5, "joint_2": 2.0})
+
+ Returns:
+ The action actually sent (potentially clipped)
+ """
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
+
+ # Apply joint limit clipping to arm
+ for motor_name, position in goal_pos.items():
+ if motor_name in self.config.joint_limits:
+ min_limit, max_limit = self.config.joint_limits[motor_name]
+ clipped_position = max(min_limit, min(max_limit, position))
+ if clipped_position != position:
+ logger.debug(f"Clipped {motor_name} from {position:.2f}° to {clipped_position:.2f}°")
+ goal_pos[motor_name] = clipped_position
+
+ # Cap goal position when too far away from present position.
+ # /!\ Slower fps expected due to reading from the follower.
+ if self.config.max_relative_target is not None:
+ present_pos = self.bus.sync_read("Present_Position")
+ goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
+ goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
+
+ # TODO(Steven, Pepijn): Refactor writing
+ # Motor name to index mapping for gains
+ motor_index = {
+ "joint_1": 0,
+ "joint_2": 1,
+ "joint_3": 2,
+ "joint_4": 3,
+ "joint_5": 4,
+ "joint_6": 5,
+ "joint_7": 6,
+ "gripper": 7,
+ }
+
+ # Use batch MIT control for arm (sends all commands, then collects responses)
+ commands = {}
+ for motor_name, position_degrees in goal_pos.items():
+ idx = motor_index.get(motor_name, 0)
+ # Use custom gains if provided, otherwise use config defaults
+ if custom_kp is not None and motor_name in custom_kp:
+ kp = custom_kp[motor_name]
+ else:
+ kp = (
+ self.config.position_kp[idx]
+ if isinstance(self.config.position_kp, list)
+ else self.config.position_kp
+ )
+ if custom_kd is not None and motor_name in custom_kd:
+ kd = custom_kd[motor_name]
+ else:
+ kd = (
+ self.config.position_kd[idx]
+ if isinstance(self.config.position_kd, list)
+ else self.config.position_kd
+ )
+ commands[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
+
+ self.bus._mit_control_batch(commands)
+
+ return {f"{motor}.pos": val for motor, val in goal_pos.items()}
+
+ def disconnect(self):
+ """Disconnect from robot."""
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ # Disconnect CAN bus
+ self.bus.disconnect(self.config.disable_torque_on_disconnect)
+
+ # Disconnect cameras
+ for cam in self.cameras.values():
+ cam.disconnect()
+
+ logger.info(f"{self} disconnected.")
diff --git a/src/lerobot/robots/utils.py b/src/lerobot/robots/utils.py
index 27abaaa86..e0c76cab3 100644
--- a/src/lerobot/robots/utils.py
+++ b/src/lerobot/robots/utils.py
@@ -60,6 +60,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .reachy2 import Reachy2Robot
return Reachy2Robot(config)
+ elif config.type == "openarm_follower":
+ from .openarm_follower import OpenArmFollower
+
+ return OpenArmFollower(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot
diff --git a/src/lerobot/scripts/lerobot_calibrate.py b/src/lerobot/scripts/lerobot_calibrate.py
index cbc7684d3..0f79e6aa2 100644
--- a/src/lerobot/scripts/lerobot_calibrate.py
+++ b/src/lerobot/scripts/lerobot_calibrate.py
@@ -42,6 +42,7 @@ from lerobot.robots import ( # noqa: F401
lekiwi,
make_robot_from_config,
omx_follower,
+ openarm_follower,
so_follower,
)
from lerobot.teleoperators import ( # noqa: F401
@@ -52,6 +53,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
+ openarm_leader,
so_leader,
)
from lerobot.utils.import_utils import register_third_party_plugins
diff --git a/src/lerobot/scripts/lerobot_find_joint_limits.py b/src/lerobot/scripts/lerobot_find_joint_limits.py
index 20bbc8615..d928dc5cd 100644
--- a/src/lerobot/scripts/lerobot_find_joint_limits.py
+++ b/src/lerobot/scripts/lerobot_find_joint_limits.py
@@ -48,6 +48,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
+ openarm_follower,
so_follower,
)
from lerobot.teleoperators import ( # noqa: F401
@@ -57,6 +58,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
+ openarm_leader,
so_leader,
)
from lerobot.utils.robot_utils import precise_sleep
diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py
index f03776989..4d334f38f 100644
--- a/src/lerobot/scripts/lerobot_record.py
+++ b/src/lerobot/scripts/lerobot_record.py
@@ -104,6 +104,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
+ openarm_follower,
reachy2,
so_follower,
unitree_g1,
@@ -116,6 +117,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
+ openarm_leader,
reachy2_teleoperator,
so_leader,
)
diff --git a/src/lerobot/scripts/lerobot_replay.py b/src/lerobot/scripts/lerobot_replay.py
index 49c06d643..c3bc3d766 100644
--- a/src/lerobot/scripts/lerobot_replay.py
+++ b/src/lerobot/scripts/lerobot_replay.py
@@ -59,6 +59,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
+ openarm_follower,
reachy2,
so_follower,
unitree_g1,
diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py
index 18d8863d6..a415dd600 100644
--- a/src/lerobot/scripts/lerobot_teleoperate.py
+++ b/src/lerobot/scripts/lerobot_teleoperate.py
@@ -76,6 +76,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower,
make_robot_from_config,
omx_follower,
+ openarm_follower,
reachy2,
so_follower,
)
@@ -89,6 +90,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
+ openarm_leader,
reachy2_teleoperator,
so_leader,
)
diff --git a/src/lerobot/teleoperators/openarm_leader/__init__.py b/src/lerobot/teleoperators/openarm_leader/__init__.py
new file mode 100644
index 000000000..1493317fe
--- /dev/null
+++ b/src/lerobot/teleoperators/openarm_leader/__init__.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .config_openarm_leader import OpenArmLeaderConfig
+from .openarm_leader import OpenArmLeader
+
+__all__ = ["OpenArmLeader", "OpenArmLeaderConfig"]
diff --git a/src/lerobot/teleoperators/openarm_leader/config_openarm_leader.py b/src/lerobot/teleoperators/openarm_leader/config_openarm_leader.py
new file mode 100644
index 000000000..c53169b0a
--- /dev/null
+++ b/src/lerobot/teleoperators/openarm_leader/config_openarm_leader.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass, field
+
+from ..config import TeleoperatorConfig
+
+
+@TeleoperatorConfig.register_subclass("openarm_leader")
+@dataclass
+class OpenArmLeaderConfig(TeleoperatorConfig):
+ """Configuration for the OpenArms leader/teleoperator with Damiao motors."""
+
+ # CAN interfaces - one per arm
+ # Arm CAN interface (e.g., "can3")
+ # Linux: "can0", "can1", etc.
+ port: str
+
+ # CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
+ can_interface: str = "socketcan"
+
+ # CAN FD settings (OpenArms uses CAN FD by default)
+ use_can_fd: bool = True
+ can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
+ can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
+
+ # Motor configuration for OpenArms (7 DOF per arm)
+ # Maps motor names to (send_can_id, recv_can_id, motor_type)
+ # Based on: https://docs.openarm.dev/software/setup/configure-test
+ # OpenArms uses 4 types of motors:
+ # - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
+ # - DM4340P and DM4340 for shoulder rotation and elbow
+ # - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
+ motor_config: dict[str, tuple[int, int, str]] = field(
+ default_factory=lambda: {
+ "joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
+ "joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
+ "joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
+ "joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
+ "joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
+ "joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
+ "joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
+ "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
+ }
+ )
+
+ # Torque mode settings for manual control
+ # When enabled, motors have torque disabled for manual movement
+ manual_control: bool = True
+
+ # TODO(Steven, Pepijn): Not used ... ?
+ # MIT control parameters (used when manual_control=False for torque control)
+ # List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
+ position_kp: list[float] = field(
+ default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]
+ )
+ position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
diff --git a/src/lerobot/teleoperators/openarm_leader/openarm_leader.py b/src/lerobot/teleoperators/openarm_leader/openarm_leader.py
new file mode 100644
index 000000000..edf4d7090
--- /dev/null
+++ b/src/lerobot/teleoperators/openarm_leader/openarm_leader.py
@@ -0,0 +1,225 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from typing import Any
+
+from lerobot.motors import Motor, MotorCalibration, MotorNormMode
+from lerobot.motors.damiao import DamiaoMotorsBus
+from lerobot.processor import RobotAction
+from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
+
+from ..teleoperator import Teleoperator
+from .config_openarm_leader import OpenArmLeaderConfig
+
+logger = logging.getLogger(__name__)
+
+
+class OpenArmLeader(Teleoperator):
+ """
+ OpenArm Leader/Teleoperator Arm with Damiao motors.
+
+ This teleoperator uses CAN bus communication to read positions from
+ Damiao motors that are manually moved (torque disabled).
+ """
+
+ config_class = OpenArmLeaderConfig
+ name = "openarm_leader"
+
+ def __init__(self, config: OpenArmLeaderConfig):
+ super().__init__(config)
+ self.config = config
+
+ # Arm motors
+ motors: dict[str, Motor] = {}
+ for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
+ motor = Motor(
+ send_id, motor_type_str, MotorNormMode.DEGREES
+ ) # Always use degrees for Damiao motors
+ motor.recv_id = recv_id
+ motor.motor_type_str = motor_type_str
+ motors[motor_name] = motor
+
+ self.bus = DamiaoMotorsBus(
+ port=self.config.port,
+ motors=motors,
+ calibration=self.calibration,
+ can_interface=self.config.can_interface,
+ use_can_fd=self.config.use_can_fd,
+ bitrate=self.config.can_bitrate,
+ data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
+ )
+
+ @property
+ def action_features(self) -> dict[str, type]:
+ """Features produced by this teleoperator."""
+ features: dict[str, type] = {}
+ for motor in self.bus.motors:
+ features[f"{motor}.pos"] = float
+ features[f"{motor}.vel"] = float
+ features[f"{motor}.torque"] = float
+ return features
+
+ @property
+ def feedback_features(self) -> dict[str, type]:
+ """Feedback features (not implemented for OpenArms)."""
+ return {}
+
+ @property
+ def is_connected(self) -> bool:
+ """Check if teleoperator is connected."""
+ return self.bus.is_connected
+
+ def connect(self, calibrate: bool = True) -> None:
+ """
+ Connect to the teleoperator.
+
+ For manual control, we disable torque after connecting so the
+ arm can be moved by hand.
+ """
+ if self.is_connected:
+ raise DeviceAlreadyConnectedError(f"{self} already connected")
+
+ # Connect to CAN bus
+ logger.info(f"Connecting arm on {self.config.port}...")
+ self.bus.connect()
+
+ # Run calibration if needed
+ if not self.is_calibrated and calibrate:
+ logger.info(
+ "Mismatch between calibration values in the motor and the calibration file or no calibration file found"
+ )
+ self.calibrate()
+
+ self.configure()
+
+ if self.is_calibrated:
+ self.bus.set_zero_position()
+
+ logger.info(f"{self} connected.")
+
+ @property
+ def is_calibrated(self) -> bool:
+ """Check if teleoperator is calibrated."""
+ return self.bus.is_calibrated
+
+ def calibrate(self) -> None:
+ """
+ Run calibration procedure for OpenArms leader.
+
+ The calibration procedure:
+ 1. Disable torque (if not already disabled)
+ 2. Ask user to position arm in zero position (hanging with gripper closed)
+ 3. Set this as zero position
+ 4. Record range of motion for each joint
+ 5. Save calibration
+ """
+ if self.calibration:
+ # Calibration file exists, ask user whether to use it or run new calibration
+ user_input = input(
+ f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
+ )
+ if user_input.strip().lower() != "c":
+ logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
+ self.bus.write_calibration(self.calibration)
+ return
+
+ logger.info(f"\nRunning calibration for {self}")
+ self.bus.disable_torque()
+
+ # Step 1: Set zero position
+ input(
+ "\nCalibration: Set Zero Position)\n"
+ "Position the arm in the following configuration:\n"
+ " - Arm hanging straight down\n"
+ " - Gripper closed\n"
+ "Press ENTER when ready..."
+ )
+
+ # Set current position as zero for all motors
+ self.bus.set_zero_position()
+ logger.info("Arm zero position set.")
+
+ logger.info("Setting range: -90° to +90° by default for all joints")
+ # TODO(Steven, Pepijn): Check if MotorCalibration is actually needed here given that we only use Degrees
+ for motor_name, motor in self.bus.motors.items():
+ self.calibration[motor_name] = MotorCalibration(
+ id=motor.id,
+ drive_mode=0,
+ homing_offset=0,
+ range_min=-90,
+ range_max=90,
+ )
+
+ self.bus.write_calibration(self.calibration)
+ self._save_calibration()
+ print(f"Calibration saved to {self.calibration_fpath}")
+
+ def configure(self) -> None:
+ """
+ Configure motors for manual teleoperation.
+
+ For manual control, we disable torque so the arm can be moved by hand.
+ """
+
+ return self.bus.disable_torque() if self.config.manual_control else self.bus.configure_motors()
+
+ def setup_motors(self) -> None:
+ raise NotImplementedError(
+ "Motor ID configuration is typically done via manufacturer tools for CAN motors."
+ )
+
+ def get_action(self) -> RobotAction:
+ """
+ Get current action from the leader arm.
+
+ This is the main method for teleoperators - it reads the current state
+ of the leader arm and returns it as an action that can be sent to a follower.
+
+ Reads all motor states (pos/vel/torque) in one CAN refresh cycle.
+ """
+ start = time.perf_counter()
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ action_dict: dict[str, Any] = {}
+
+ # Use sync_read_all_states to get pos/vel/torque in one go
+ states = self.bus.sync_read_all_states()
+ for motor in self.bus.motors:
+ state = states.get(motor, {})
+ action_dict[f"{motor}.pos"] = state.get("position")
+ action_dict[f"{motor}.vel"] = state.get("velocity")
+ action_dict[f"{motor}.torque"] = state.get("torque")
+
+ dt_ms = (time.perf_counter() - start) * 1e3
+ logger.debug(f"{self} read state: {dt_ms:.1f}ms")
+
+ return action_dict
+
+ def send_feedback(self, feedback: dict[str, float]) -> None:
+ raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
+
+ def disconnect(self) -> None:
+ """Disconnect from teleoperator."""
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ # Disconnect CAN bus
+ # For manual control, ensure torque is disabled before disconnecting
+ self.bus.disconnect(disable_torque=self.config.manual_control)
+ logger.info(f"{self} disconnected.")
diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py
index eec2f119c..8f6bbc787 100644
--- a/src/lerobot/teleoperators/utils.py
+++ b/src/lerobot/teleoperators/utils.py
@@ -13,12 +13,14 @@
# limitations under the License.
from enum import Enum
-from typing import cast
+from typing import TYPE_CHECKING, cast
from lerobot.utils.import_utils import make_device_from_device_class
from .config import TeleoperatorConfig
-from .teleoperator import Teleoperator
+
+if TYPE_CHECKING:
+ from .teleoperator import Teleoperator
class TeleopEvents(Enum):
@@ -31,7 +33,7 @@ class TeleopEvents(Enum):
TERMINATE_EPISODE = "terminate_episode"
-def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
+def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
# TODO(Steven): Consider just using the make_device_from_device_class for all types
if config.type == "keyboard":
from .keyboard import KeyboardTeleop
@@ -81,8 +83,12 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .reachy2_teleoperator import Reachy2Teleoperator
return Reachy2Teleoperator(config)
+ elif config.type == "openarm_leader":
+ from .openarm_leader import OpenArmLeader
+
+ return OpenArmLeader(config)
else:
try:
- return cast(Teleoperator, make_device_from_device_class(config))
+ return cast("Teleoperator", make_device_from_device_class(config))
except Exception as e:
raise ValueError(f"Error creating robot with config {config}: {e}") from e