feat(robots): add OpenArm robot & teleoperator (#2795)

* fix(motors): cleanup imports + fix signatures

* feat(motors): add damiao canbus + multiple fixes

* fix(motors): address comments -> last_state + different gains + sleep

* refactor(motors): reduce duplicated code + adressed some comments in the PR

* chore(motors): better timeouts

* tests(motors): damiao test and imports

* chore(deps): fix space

* feat(robot): add openarm leader

Co-authored-by: Pepijn <pepijn@huggingface.co>

* feat(robot): add openarm follower

Co-authored-by: Pepijn <pepijn@huggingface.co>

* refactor(robot): remove mechanical compensations and double arm assumption + rename

* chore(robots): remove left arm references

* refactor(teleop): multiple improvements to leader

* refactor(teleop): multiple improvements to leader

* feat(robots): add open arm to util CLI

* chore(robot): add alias openarm

* Apply suggestions from code review

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* chore(motors): remove normalization tables damiao

* fix(motors): imports and signatures

* feat(motors): add motor_type_str + recv_id to motor class and _get_motor_recv_id raises if no motor_obj.recv_id

* chore(motors): remove normalize from base motor class and damaio

* tests(motors): remove bad tests (to be replaced)

* chore(motors): updated import check

* fix(robots): open arm mirrored config for joint limits

* chore(motors): update position_kd gain values

* chore(robots): set to 0 if openarm is calibrated at connect time

* chore(robots): remove macos in open arm as can doesn't support it

* chore(robots): update for motor_type_str in Motor class

* chore(robots): no default value for can port in open arms

* use constant for kp and kd range and check responses in mit_control_batch()

* Add docs on setting up canbus and use damiao otor bus, also add lerobot_setup_can.py and log if there is not response from a write command

* precommit format

* supress bandit as these are intentional cli commands

* fix setup-can

* add test

* skip test in ci

* nit precommit

* update doc example

* dont import can for tests

* remove comment

* Add openarms docs

* format

* update purchase link

* can to none if nit availabl;e

* add canfd option in bus

* make handshake logic similar to lerobot-can

* type hint

* type check

* add temp teleop test

* remove script

* mock class

* ignore linter

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Pepijn <pepijn@huggingface.co>
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
This commit is contained in:
Steven Palma
2026-01-28 14:28:51 +01:00
committed by GitHub
parent 736b43f3cf
commit bf337e716d
18 changed files with 1129 additions and 22 deletions
+2
View File
@@ -101,6 +101,8 @@
title: Earth Rover Mini title: Earth Rover Mini
- local: omx - local: omx
title: OMX title: OMX
- local: openarm
title: OpenArm
title: "Robots" title: "Robots"
- sections: - sections:
- local: phone_teleop - local: phone_teleop
+258
View File
@@ -0,0 +1,258 @@
# OpenArm
[OpenArm](https://openarm.dev) is an open-source 7DOF humanoid arm designed for physical AI research and deployment.
To get your OpenArm, assembled or DIY, and join the global community, browse verified and certified manufacturers worldwide at [openarm.dev](https://openarm.dev).
## What's Unique?
- **Human-Scale Design**: OpenArm is designed with human-like proportions, scaled for a person around 160-165cm tall. This provides an optimal balance between practical reach and manageable inertia for safe, responsive operation.
- **Safety-First Architecture**: Built with QDD backdrivable motors and high compliance, OpenArm prioritizes safe human-robot interaction while maintaining practical payload capabilities (6.0kg peak / 4.1kg nominal) for real-world tasks.
- **Built for Durability**: Critical structural components use aluminum and stainless steel construction, ensuring robust performance for repetitive data collection and continuous research use.
- **Fully Accessible & Buildable**: Every component, from CNC parts and 3D-printed casings to electrical wiring is designed to be purchasable and buildable by individual researchers and labs, with complete fabrication data provided.
- **Practical & Affordable**: At $6,500 USD for a complete bimanual system, OpenArm delivers research-grade capabilities at a fraction of traditional humanoid robot costs.
## Platform Requirements
<Tip warning={true}>
**Linux Only**: OpenArm currently only works on Linux. The CAN bus USB adapter
does not have macOS drivers and has not been tested on Windows.
</Tip>
## Safety Guide
Before operating OpenArm, please read the [official safety guide](https://docs.openarm.dev/getting-started/safety-guide). Key points:
- **Secure installation**: Fasten the arm to a flat, stable surface with screws or clamps
- **Safe distance**: Keep body parts and objects outside the range of motion during operation
- **Protective equipment**: Always wear safety goggles; use additional PPE as needed
- **Payload limits**: Do not exceed specified payload limits (6.0kg peak / 4.1kg nominal per arm)
- **Emergency stop**: Know the location and operation of the emergency stop device
- **Regular inspection**: Check for loose screws, damaged mechanical limits, unusual noises, and wiring damage
## Hardware Setup
Follow the official [OpenArm hardware documentation](https://docs.openarm.dev) for:
- Bill of materials and sourcing
- 3D printing instructions
- Mechanical assembly
- Electrical wiring
The hardware repositories are available at [github.com/enactic/openarm](https://github.com/enactic/openarm).
## CAN Bus Setup
OpenArm uses CAN bus communication with Damiao motors. Once you have the CAN bus USB adapter plugged into your Linux PC, follow the [Damiao Motors and CAN Bus guide](./damiao) to configure the interface.
Quick setup:
```bash
# Setup CAN interfaces
lerobot-setup-can --mode=setup --interfaces=can0,can1
# Test motor communication
lerobot-setup-can --mode=test --interfaces=can0,can1
```
## Install LeRobot 🤗
Follow our [Installation Guide](./installation), then install the Damiao motor support:
```bash
pip install -e ".[damiao]"
```
## Usage
### Follower Arm (Robot)
<hfoptions id="follower">
<hfoption id="Command">
```bash
lerobot-calibrate \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_openarm_follower
```
</hfoption>
<hfoption id="API example">
```python
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
config = OpenArmFollowerConfig(
port="can0",
side="right", # or "left" for left arm
id="my_openarm_follower",
)
follower = OpenArmFollower(config)
follower.connect()
# Read current state
obs = follower.get_observation()
print(obs)
# Send action (position in degrees)
action = {
"joint_1.pos": 0.0,
"joint_2.pos": 0.0,
"joint_3.pos": 0.0,
"joint_4.pos": 45.0,
"joint_5.pos": 0.0,
"joint_6.pos": 0.0,
"joint_7.pos": 0.0,
"gripper.pos": 0.0,
}
follower.send_action(action)
follower.disconnect()
```
</hfoption>
</hfoptions>
### Leader Arm (Teleoperator)
The leader arm is used for teleoperation - manually moving it to control the follower arm.
<hfoptions id="leader">
<hfoption id="Command">
```bash
lerobot-calibrate \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_openarm_leader
```
</hfoption>
<hfoption id="API example">
```python
from lerobot.teleoperators.openarm_leader import OpenArmLeader, OpenArmLeaderConfig
config = OpenArmLeaderConfig(
port="can1",
id="my_openarm_leader",
manual_control=True, # Disable torque for manual movement
)
leader = OpenArmLeader(config)
leader.connect()
# Read current position (as action to send to follower)
action = leader.get_action()
print(action)
leader.disconnect()
```
</hfoption>
</hfoptions>
### Teleoperation
To teleoperate OpenArm with leader-follower control:
```bash
lerobot-teleoperate \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_follower \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_leader
```
### Recording Data
To record a dataset during teleoperation:
```bash
lerobot-record \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_follower \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_leader \
--repo-id=my_hf_username/my_openarm_dataset \
--fps=30 \
--num-episodes=10
```
## Configuration Options
### Follower Configuration
| Parameter | Default | Description |
| --------------------- | --------- | ---------------------------------------------------------- |
| `port` | - | CAN interface (e.g., `can0`) |
| `side` | `None` | Arm side: `"left"`, `"right"`, or `None` for custom limits |
| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
| `max_relative_target` | `None` | Safety limit for relative target positions |
| `position_kp` | Per-joint | Position control proportional gains |
| `position_kd` | Per-joint | Position control derivative gains |
### Leader Configuration
| Parameter | Default | Description |
| ------------------ | --------- | ----------------------------------- |
| `port` | - | CAN interface (e.g., `can1`) |
| `manual_control` | `True` | Disable torque for manual movement |
| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
## Motor Configuration
OpenArm uses Damiao motors with the following default configuration:
| Joint | Motor Type | Send ID | Recv ID |
| --------------------------- | ---------- | ------- | ------- |
| joint_1 (Shoulder pan) | DM8009 | 0x01 | 0x11 |
| joint_2 (Shoulder lift) | DM8009 | 0x02 | 0x12 |
| joint_3 (Shoulder rotation) | DM4340 | 0x03 | 0x13 |
| joint_4 (Elbow flex) | DM4340 | 0x04 | 0x14 |
| joint_5 (Wrist roll) | DM4310 | 0x05 | 0x15 |
| joint_6 (Wrist pitch) | DM4310 | 0x06 | 0x16 |
| joint_7 (Wrist rotation) | DM4310 | 0x07 | 0x17 |
| gripper | DM4310 | 0x08 | 0x18 |
## Troubleshooting
### No Response from Motors
1. Check power supply connections
2. Verify CAN wiring (CAN-H, CAN-L, GND)
3. Run diagnostics: `lerobot-setup-can --mode=test --interfaces=can0`
4. See the [Damiao troubleshooting guide](./damiao#troubleshooting) for more details
### CAN Interface Not Found
Ensure the CAN interface is configured:
```bash
ip link show can0
```
## Resources
- [OpenArm Website](https://openarm.dev)
- [OpenArm Documentation](https://docs.openarm.dev)
- [OpenArm GitHub](https://github.com/enactic/openarm)
- [Safety Guide](https://docs.openarm.dev/getting-started/safety-guide)
- [Damiao Motors and CAN Bus](./damiao)
+1
View File
@@ -105,6 +105,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"] damiao = ["python-can>=4.2.0,<5.0.0"]
# Robots # Robots
openarms = ["lerobot[damiao]"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"] gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"] hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"] lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
+38 -13
View File
@@ -28,8 +28,11 @@ from lerobot.utils.import_utils import _can_available
if TYPE_CHECKING or _can_available: if TYPE_CHECKING or _can_available:
import can import can
else: else:
can.Message = object
can.interface = None class can: # noqa: N801
Message = object
interface = None
import numpy as np import numpy as np
@@ -206,11 +209,31 @@ class DamiaoMotorsBus(MotorsBusBase):
Raises ConnectionError if any motor fails to respond. Raises ConnectionError if any motor fails to respond.
""" """
logger.info("Starting handshake with motors...") logger.info("Starting handshake with motors...")
missing_motors = []
# Drain any pending messages
while self.canbus.recv(timeout=0.01):
pass
missing_motors = []
for motor_name in self.motors: for motor_name in self.motors:
msg = self._refresh_motor(motor_name) motor_id = self._get_motor_id(motor_name)
if msg is None: recv_id = self._get_motor_recv_id(motor_name)
# Send enable command
data = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, CAN_CMD_ENABLE]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
# Wait for response with longer timeout
response = None
start_time = time.time()
while time.time() - start_time < 0.1:
response = self.canbus.recv(timeout=0.1)
if response and response.arbitration_id == recv_id:
break
response = None
if response is None:
missing_motors.append(motor_name) missing_motors.append(motor_name)
else: else:
self._process_response(motor_name, msg) self._process_response(motor_name, msg)
@@ -259,7 +282,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_name = self._get_motor_name(motor) motor_name = self._get_motor_name(motor)
recv_id = self._get_motor_recv_id(motor) recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte] data = [0xFF] * 7 + [command_byte]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg) self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id): if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg) self._process_response(motor_name, msg)
@@ -317,7 +340,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_id = self._get_motor_id(motor) motor_id = self._get_motor_id(motor)
recv_id = self._get_motor_recv_id(motor) recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg) self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id) return self._recv_motor_response(expected_recv_id=recv_id)
@@ -439,7 +462,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name] motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque) data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg) self.canbus.send(msg)
recv_id = self._get_motor_recv_id(motor) recv_id = self._get_motor_recv_id(motor)
@@ -472,7 +495,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name] motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque) data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg) self.canbus.send(msg)
recv_id_to_motor[self._get_motor_recv_id(motor)] = motor_name recv_id_to_motor[self._get_motor_recv_id(motor)] = motor_name
@@ -637,10 +660,10 @@ class DamiaoMotorsBus(MotorsBusBase):
for motor in motors: for motor in motors:
motor_id = self._get_motor_id(motor) motor_id = self._get_motor_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) msg = can.Message(
arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd
)
self.canbus.send(msg) self.canbus.send(msg)
# Small delay to reduce bus congestion if necessary, though removed in sync_read previously
# precise_sleep(PRECISE_SLEEP_SEC)
# Collect responses # Collect responses
expected_recv_ids = [self._get_motor_recv_id(m) for m in motors] expected_recv_ids = [self._get_motor_recv_id(m) for m in motors]
@@ -676,7 +699,9 @@ class DamiaoMotorsBus(MotorsBusBase):
kd = self._gains[motor]["kd"] kd = self._gains[motor]["kd"]
data = self._encode_mit_packet(motor_type, kp, kd, float(value_degrees), 0.0, 0.0) data = self._encode_mit_packet(motor_type, kp, kd, float(value_degrees), 0.0, 0.0)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) msg = can.Message(
arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd
)
self.canbus.send(msg) self.canbus.send(msg)
precise_sleep(PRECISE_TIMEOUT_SEC) precise_sleep(PRECISE_TIMEOUT_SEC)
+7 -5
View File
@@ -18,16 +18,18 @@
import math import math
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Protocol, TypeVar, runtime_checkable from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable
import numpy as np import numpy as np
import torch import torch
import torchvision.transforms.functional as F # noqa: N812 import torchvision.transforms.functional as F # noqa: N812
from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.teleoperators.teleoperator import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents from lerobot.teleoperators.utils import TeleopEvents
if TYPE_CHECKING:
from lerobot.teleoperators.teleoperator import Teleoperator
from .core import EnvTransition, PolicyAction, TransitionKey from .core import EnvTransition, PolicyAction, TransitionKey
from .pipeline import ( from .pipeline import (
ComplementaryDataProcessorStep, ComplementaryDataProcessorStep,
@@ -69,10 +71,10 @@ class HasTeleopEvents(Protocol):
# Type variable constrained to Teleoperator subclasses that also implement events # Type variable constrained to Teleoperator subclasses that also implement events
TeleopWithEvents = TypeVar("TeleopWithEvents", bound=Teleoperator) TeleopWithEvents = TypeVar("TeleopWithEvents", bound="Teleoperator")
def _check_teleop_with_events(teleop: Teleoperator) -> None: def _check_teleop_with_events(teleop: "Teleoperator") -> None:
""" """
Runtime check that a teleoperator implements the `HasTeleopEvents` protocol. Runtime check that a teleoperator implements the `HasTeleopEvents` protocol.
@@ -103,7 +105,7 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
teleop_device: The teleoperator instance to get the action from. teleop_device: The teleoperator instance to get the action from.
""" """
teleop_device: Teleoperator teleop_device: "Teleoperator"
def complementary_data(self, complementary_data: dict) -> dict: def complementary_data(self, complementary_data: dict) -> dict:
""" """
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_follower import OpenArmFollowerConfig
from .openarm_follower import OpenArmFollower
__all__ = ["OpenArmFollower", "OpenArmFollowerConfig"]
@@ -0,0 +1,117 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from ..config import RobotConfig
LEFT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
"joint_1": (-75.0, 75.0),
"joint_2": (-90.0, 9.0),
"joint_3": (-85.0, 85.0),
"joint_4": (0.0, 135.0),
"joint_5": (-85.0, 85.0),
"joint_6": (-40.0, 40.0),
"joint_7": (-80.0, 80.0),
"gripper": (-65.0, 0.0),
}
RIGHT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
"joint_1": (-75.0, 75.0),
"joint_2": (-9.0, 90.0),
"joint_3": (-85.0, 85.0),
"joint_4": (0.0, 135.0),
"joint_5": (-85.0, 85.0),
"joint_6": (-40.0, 40.0),
"joint_7": (-80.0, 80.0),
"gripper": (-65.0, 0.0),
}
@RobotConfig.register_subclass("openarm_follower")
@dataclass
class OpenArmFollowerConfig(RobotConfig):
"""Configuration for the OpenArms follower robot with Damiao motors."""
# CAN interfaces - one per arm
# arm CAN interface (e.g., "can1")
# Linux: "can0", "can1", etc.
port: str
# side of the arm: "left" or "right". If "None" default values will be used
side: str | None = None
# CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Whether to disable torque when disconnecting
disable_torque_on_disconnect: bool = True
# Safety limit for relative target positions
# Set to a positive scalar for all motors, or a dict mapping motor names to limits
max_relative_target: float | dict[str, float] | None = None
# Camera configurations
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: dict[str, tuple[int, int, str]] = field(
default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
}
)
# MIT control parameters for position control (used in send_action)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(
default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0]
)
position_kd: list[float] = field(default_factory=lambda: [5.0, 5.0, 3.0, 5.0, 0.3, 0.3, 0.3, 0.3])
# Values for joint limits. Can be overridden via CLI (for custom values) or by setting config.side to either 'left' or 'right'.
# If config.side is left set to None and no CLI values are passed, the default joint limit values are small for safety.
joint_limits: dict[str, tuple[float, float]] = field(
default_factory=lambda: {
"joint_1": (-5.0, 5.0),
"joint_2": (-5.0, 5.0),
"joint_3": (-5.0, 5.0),
"joint_4": (0.0, 5.0),
"joint_5": (-5.0, 5.0),
"joint_6": (-5.0, 5.0),
"joint_7": (-5.0, 5.0),
"gripper": (-5.0, 0.0),
}
)
@@ -0,0 +1,348 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction, RobotObservation
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_openarm_follower import (
LEFT_DEFAULT_JOINTS_LIMITS,
RIGHT_DEFAULT_JOINTS_LIMITS,
OpenArmFollowerConfig,
)
logger = logging.getLogger(__name__)
class OpenArmFollower(Robot):
"""
OpenArms Follower Robot which uses CAN bus communication to control 7 DOF arm with a gripper.
The arm uses Damiao motors in MIT control mode.
"""
config_class = OpenArmFollowerConfig
name = "openarm_follower"
def __init__(self, config: OpenArmFollowerConfig):
super().__init__(config)
self.config = config
# Arm motors
motors: dict[str, Motor] = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
motor = Motor(
send_id, motor_type_str, MotorNormMode.DEGREES
) # Always use degrees for Damiao motors
motor.recv_id = recv_id
motor.motor_type_str = motor_type_str
motors[motor_name] = motor
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
if config.side is not None:
if config.side == "left":
config.joint_limits = LEFT_DEFAULT_JOINTS_LIMITS
elif config.side == "right":
config.joint_limits = RIGHT_DEFAULT_JOINTS_LIMITS
else:
raise ValueError(
"config.side must be either 'left', 'right' (for default values) or 'None' (for CLI values)"
)
else:
logger.info(
"Set config.side to either 'left' or 'right' to use pre-configured values for joint limits."
)
logger.info(f"Values used for joint limits: {config.joint_limits}.")
# Initialize cameras
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
"""Motor features for observation and action spaces."""
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float # Add this
features[f"{motor}.torque"] = float # Add this
return features
@property
def _cameras_ft(self) -> dict[str, tuple]:
"""Camera features for observation space."""
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
"""Combined observation features from motors and cameras."""
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
"""Action features."""
return self._motors_ft
@property
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
We assume that at connection time, the arms are in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
if self.is_calibrated:
self.bus.set_zero_position()
self.bus.enable_torque()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if robot is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms robot.
The calibration procedure:
1. Disable torque
2. Ask user to position arms in hanging position with grippers closed
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
self.bus.disable_torque()
# Step 1: Set zero position
input(
"\nCalibration: Set Zero Position)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Arm zero position set.")
logger.info("Setting range: -90° to +90° for safety by default for all joints")
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=0,
range_min=-90,
range_max=90,
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
"""Configure motors with appropriate settings."""
# TODO(Steven, Pepijn): Slightly different from what it is happening in the leader
with self.bus.torque_disabled():
self.bus.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
Reads all motor states (pos/vel/torque) in one CAN refresh cycle
instead of 3 separate reads.
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict: dict[str, Any] = {}
states = self.bus.sync_read_all_states()
for motor in self.bus.motors:
state = states.get(motor, {})
obs_dict[f"{motor}.pos"] = state.get("position", 0.0)
obs_dict[f"{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"{motor}.torque"] = state.get("torque", 0.0)
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} get_observation took: {dt_ms:.1f}ms")
return obs_dict
def send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
custom_kd: dict[str, float] | None = None,
) -> RobotAction:
"""
Send action command to robot.
The action magnitude may be clipped based on safety limits.
Args:
action: Dictionary with motor positions (e.g., "joint_1.pos", "joint_2.pos")
custom_kp: Optional custom kp gains per motor (e.g., {"joint_1": 120.0, "joint_2": 150.0})
custom_kd: Optional custom kd gains per motor (e.g., {"joint_1": 1.5, "joint_2": 2.0})
Returns:
The action actually sent (potentially clipped)
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Apply joint limit clipping to arm
for motor_name, position in goal_pos.items():
if motor_name in self.config.joint_limits:
min_limit, max_limit = self.config.joint_limits[motor_name]
clipped_position = max(min_limit, min(max_limit, position))
if clipped_position != position:
logger.debug(f"Clipped {motor_name} from {position:.2f}° to {clipped_position:.2f}°")
goal_pos[motor_name] = clipped_position
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# TODO(Steven, Pepijn): Refactor writing
# Motor name to index mapping for gains
motor_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
# Use batch MIT control for arm (sends all commands, then collects responses)
commands = {}
for motor_name, position_degrees in goal_pos.items():
idx = motor_index.get(motor_name, 0)
# Use custom gains if provided, otherwise use config defaults
if custom_kp is not None and motor_name in custom_kp:
kp = custom_kp[motor_name]
else:
kp = (
self.config.position_kp[idx]
if isinstance(self.config.position_kp, list)
else self.config.position_kp
)
if custom_kd is not None and motor_name in custom_kd:
kd = custom_kd[motor_name]
else:
kd = (
self.config.position_kd[idx]
if isinstance(self.config.position_kd, list)
else self.config.position_kd
)
commands[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
self.bus._mit_control_batch(commands)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
# Disconnect cameras
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")
+4
View File
@@ -60,6 +60,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .reachy2 import Reachy2Robot from .reachy2 import Reachy2Robot
return Reachy2Robot(config) return Reachy2Robot(config)
elif config.type == "openarm_follower":
from .openarm_follower import OpenArmFollower
return OpenArmFollower(config)
elif config.type == "mock_robot": elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot from tests.mocks.mock_robot import MockRobot
+2
View File
@@ -42,6 +42,7 @@ from lerobot.robots import ( # noqa: F401
lekiwi, lekiwi,
make_robot_from_config, make_robot_from_config,
omx_follower, omx_follower,
openarm_follower,
so_follower, so_follower,
) )
from lerobot.teleoperators import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401
@@ -52,6 +53,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader, koch_leader,
make_teleoperator_from_config, make_teleoperator_from_config,
omx_leader, omx_leader,
openarm_leader,
so_leader, so_leader,
) )
from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.import_utils import register_third_party_plugins
@@ -48,6 +48,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower, koch_follower,
make_robot_from_config, make_robot_from_config,
omx_follower, omx_follower,
openarm_follower,
so_follower, so_follower,
) )
from lerobot.teleoperators import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401
@@ -57,6 +58,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader, koch_leader,
make_teleoperator_from_config, make_teleoperator_from_config,
omx_leader, omx_leader,
openarm_leader,
so_leader, so_leader,
) )
from lerobot.utils.robot_utils import precise_sleep from lerobot.utils.robot_utils import precise_sleep
+2
View File
@@ -104,6 +104,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower, koch_follower,
make_robot_from_config, make_robot_from_config,
omx_follower, omx_follower,
openarm_follower,
reachy2, reachy2,
so_follower, so_follower,
unitree_g1, unitree_g1,
@@ -116,6 +117,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader, koch_leader,
make_teleoperator_from_config, make_teleoperator_from_config,
omx_leader, omx_leader,
openarm_leader,
reachy2_teleoperator, reachy2_teleoperator,
so_leader, so_leader,
) )
+1
View File
@@ -59,6 +59,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower, koch_follower,
make_robot_from_config, make_robot_from_config,
omx_follower, omx_follower,
openarm_follower,
reachy2, reachy2,
so_follower, so_follower,
unitree_g1, unitree_g1,
@@ -76,6 +76,7 @@ from lerobot.robots import ( # noqa: F401
koch_follower, koch_follower,
make_robot_from_config, make_robot_from_config,
omx_follower, omx_follower,
openarm_follower,
reachy2, reachy2,
so_follower, so_follower,
) )
@@ -89,6 +90,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader, koch_leader,
make_teleoperator_from_config, make_teleoperator_from_config,
omx_leader, omx_leader,
openarm_leader,
reachy2_teleoperator, reachy2_teleoperator,
so_leader, so_leader,
) )
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_leader import OpenArmLeaderConfig
from .openarm_leader import OpenArmLeader
__all__ = ["OpenArmLeader", "OpenArmLeaderConfig"]
@@ -0,0 +1,70 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("openarm_leader")
@dataclass
class OpenArmLeaderConfig(TeleoperatorConfig):
"""Configuration for the OpenArms leader/teleoperator with Damiao motors."""
# CAN interfaces - one per arm
# Arm CAN interface (e.g., "can3")
# Linux: "can0", "can1", etc.
port: str
# CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: dict[str, tuple[int, int, str]] = field(
default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
}
)
# Torque mode settings for manual control
# When enabled, motors have torque disabled for manual movement
manual_control: bool = True
# TODO(Steven, Pepijn): Not used ... ?
# MIT control parameters (used when manual_control=False for torque control)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(
default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]
)
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
@@ -0,0 +1,225 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_openarm_leader import OpenArmLeaderConfig
logger = logging.getLogger(__name__)
class OpenArmLeader(Teleoperator):
"""
OpenArm Leader/Teleoperator Arm with Damiao motors.
This teleoperator uses CAN bus communication to read positions from
Damiao motors that are manually moved (torque disabled).
"""
config_class = OpenArmLeaderConfig
name = "openarm_leader"
def __init__(self, config: OpenArmLeaderConfig):
super().__init__(config)
self.config = config
# Arm motors
motors: dict[str, Motor] = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
motor = Motor(
send_id, motor_type_str, MotorNormMode.DEGREES
) # Always use degrees for Damiao motors
motor.recv_id = recv_id
motor.motor_type_str = motor_type_str
motors[motor_name] = motor
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
@property
def action_features(self) -> dict[str, type]:
"""Features produced by this teleoperator."""
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
def feedback_features(self) -> dict[str, type]:
"""Feedback features (not implemented for OpenArms)."""
return {}
@property
def is_connected(self) -> bool:
"""Check if teleoperator is connected."""
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the teleoperator.
For manual control, we disable torque after connecting so the
arm can be moved by hand.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
self.configure()
if self.is_calibrated:
self.bus.set_zero_position()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if teleoperator is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms leader.
The calibration procedure:
1. Disable torque (if not already disabled)
2. Ask user to position arm in zero position (hanging with gripper closed)
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
self.bus.disable_torque()
# Step 1: Set zero position
input(
"\nCalibration: Set Zero Position)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Arm zero position set.")
logger.info("Setting range: -90° to +90° by default for all joints")
# TODO(Steven, Pepijn): Check if MotorCalibration is actually needed here given that we only use Degrees
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=0,
range_min=-90,
range_max=90,
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
"""
Configure motors for manual teleoperation.
For manual control, we disable torque so the arm can be moved by hand.
"""
return self.bus.disable_torque() if self.config.manual_control else self.bus.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_action(self) -> RobotAction:
"""
Get current action from the leader arm.
This is the main method for teleoperators - it reads the current state
of the leader arm and returns it as an action that can be sent to a follower.
Reads all motor states (pos/vel/torque) in one CAN refresh cycle.
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
action_dict: dict[str, Any] = {}
# Use sync_read_all_states to get pos/vel/torque in one go
states = self.bus.sync_read_all_states()
for motor in self.bus.motors:
state = states.get(motor, {})
action_dict[f"{motor}.pos"] = state.get("position")
action_dict[f"{motor}.vel"] = state.get("velocity")
action_dict[f"{motor}.torque"] = state.get("torque")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
def disconnect(self) -> None:
"""Disconnect from teleoperator."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
# For manual control, ensure torque is disabled before disconnecting
self.bus.disconnect(disable_torque=self.config.manual_control)
logger.info(f"{self} disconnected.")
+10 -4
View File
@@ -13,12 +13,14 @@
# limitations under the License. # limitations under the License.
from enum import Enum from enum import Enum
from typing import cast from typing import TYPE_CHECKING, cast
from lerobot.utils.import_utils import make_device_from_device_class from lerobot.utils.import_utils import make_device_from_device_class
from .config import TeleoperatorConfig from .config import TeleoperatorConfig
from .teleoperator import Teleoperator
if TYPE_CHECKING:
from .teleoperator import Teleoperator
class TeleopEvents(Enum): class TeleopEvents(Enum):
@@ -31,7 +33,7 @@ class TeleopEvents(Enum):
TERMINATE_EPISODE = "terminate_episode" TERMINATE_EPISODE = "terminate_episode"
def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
# TODO(Steven): Consider just using the make_device_from_device_class for all types # TODO(Steven): Consider just using the make_device_from_device_class for all types
if config.type == "keyboard": if config.type == "keyboard":
from .keyboard import KeyboardTeleop from .keyboard import KeyboardTeleop
@@ -81,8 +83,12 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .reachy2_teleoperator import Reachy2Teleoperator from .reachy2_teleoperator import Reachy2Teleoperator
return Reachy2Teleoperator(config) return Reachy2Teleoperator(config)
elif config.type == "openarm_leader":
from .openarm_leader import OpenArmLeader
return OpenArmLeader(config)
else: else:
try: try:
return cast(Teleoperator, make_device_from_device_class(config)) return cast("Teleoperator", make_device_from_device_class(config))
except Exception as e: except Exception as e:
raise ValueError(f"Error creating robot with config {config}: {e}") from e raise ValueError(f"Error creating robot with config {config}: {e}") from e