feat(robots): add bi manual openarm follower and leader (#2835)

* fix(motors): cleanup imports + fix signatures

* feat(motors): add damiao canbus + multiple fixes

* fix(motors): address comments -> last_state + different gains + sleep

* refactor(motors): reduce duplicated code + adressed some comments in the PR

* chore(motors): better timeouts

* tests(motors): damiao test and imports

* chore(deps): fix space

* feat(robot): add openarm leader

Co-authored-by: Pepijn <pepijn@huggingface.co>

* feat(robot): add openarm follower

Co-authored-by: Pepijn <pepijn@huggingface.co>

* refactor(robot): remove mechanical compensations and double arm assumption + rename

* chore(robots): remove left arm references

* refactor(teleop): multiple improvements to leader

* refactor(teleop): multiple improvements to leader

* feat(robots): add open arm to util CLI

* chore(robot): add alias openarm

* Apply suggestions from code review

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* chore(motors): remove normalization tables damiao

* fix(motors): imports and signatures

* feat(motors): add motor_type_str + recv_id to motor class and _get_motor_recv_id raises if no motor_obj.recv_id

* chore(motors): remove normalize from base motor class and damaio

* tests(motors): remove bad tests (to be replaced)

* chore(motors): updated import check

* fix(robots): open arm mirrored config for joint limits

* chore(motors): update position_kd gain values

* chore(robots): set to 0 if openarm is calibrated at connect time

* chore(robots): remove macos in open arm as can doesn't support it

* chore(robots): update for motor_type_str in Motor class

* chore(robots): no default value for can port in open arms

* feat(robots): add bi manual openarm follower and leader

* use constant for kp and kd range and check responses in mit_control_batch()

* Add docs on setting up canbus and use damiao otor bus, also add lerobot_setup_can.py and log if there is not response from a write command

* precommit format

* supress bandit as these are intentional cli commands

* fix setup-can

* add test

* skip test in ci

* nit precommit

* update doc example

* dont import can for tests

* remove comment

* Add openarms docs

* format

* update purchase link

* can to none if nit availabl;e

* add canfd option in bus

* make handshake logic similar to lerobot-can

* type hint

* type check

* add temp teleop test

* remove script

* mock class

* mock class

* ignore linter

* pre-commit

* Add command for bimanual openarm

* fix import

* fix import leader

* fix import draccus

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Pepijn <pepijn@huggingface.co>
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
This commit is contained in:
Steven Palma
2026-01-28 17:25:57 +01:00
committed by GitHub
parent 149628dfd5
commit 4483184875
18 changed files with 461 additions and 10 deletions
+18
View File
@@ -174,6 +174,24 @@ lerobot-teleoperate \
--teleop.id=my_leader
```
### Bimanual Teleoperation
To teleoperate a bimanual OpenArm setup with two leader and two follower arms:
```bash
lerobot-teleoperate \
--robot.type=bi_openarm_follower \
--robot.left_arm_config.port=can0 \
--robot.left_arm_config.side=left \
--robot.right_arm_config.port=can1 \
--robot.right_arm_config.side=right \
--robot.id=my_bimanual_follower \
--teleop.type=bi_openarm_leader \
--teleop.left_arm_config.port=can2 \
--teleop.right_arm_config.port=can3 \
--teleop.id=my_bimanual_leader
```
### Recording Data
To record a dataset during teleoperation:
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .bi_openarm_follower import BiOpenArmFollower
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
__all__ = ["BiOpenArmFollower", "BiOpenArmFollowerConfig"]
@@ -0,0 +1,175 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
from ..robot import Robot
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
logger = logging.getLogger(__name__)
class BiOpenArmFollower(Robot):
"""
Bimanual OpenArm Follower Arms
"""
config_class = BiOpenArmFollowerConfig
name = "bi_openarm_follower"
def __init__(self, config: BiOpenArmFollowerConfig):
super().__init__(config)
self.config = config
left_arm_config = OpenArmFollowerConfig(
id=f"{config.id}_left" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.left_arm_config.port,
disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect,
max_relative_target=config.left_arm_config.max_relative_target,
cameras=config.left_arm_config.cameras,
side=config.left_arm_config.side,
can_interface=config.left_arm_config.can_interface,
use_can_fd=config.left_arm_config.use_can_fd,
can_bitrate=config.left_arm_config.can_bitrate,
can_data_bitrate=config.left_arm_config.can_data_bitrate,
motor_config=config.left_arm_config.motor_config,
position_kd=config.left_arm_config.position_kd,
position_kp=config.left_arm_config.position_kp,
joint_limits=config.left_arm_config.joint_limits,
)
right_arm_config = OpenArmFollowerConfig(
id=f"{config.id}_right" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.right_arm_config.port,
disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect,
max_relative_target=config.right_arm_config.max_relative_target,
cameras=config.right_arm_config.cameras,
side=config.right_arm_config.side,
can_interface=config.right_arm_config.can_interface,
use_can_fd=config.right_arm_config.use_can_fd,
can_bitrate=config.right_arm_config.can_bitrate,
can_data_bitrate=config.right_arm_config.can_data_bitrate,
motor_config=config.right_arm_config.motor_config,
position_kd=config.right_arm_config.position_kd,
position_kp=config.right_arm_config.position_kp,
joint_limits=config.right_arm_config.joint_limits,
)
self.left_arm = OpenArmFollower(left_arm_config)
self.right_arm = OpenArmFollower(right_arm_config)
# Only for compatibility with other parts of the codebase that expect a `robot.cameras` attribute
self.cameras = {**self.left_arm.cameras, **self.right_arm.cameras}
@property
def _motors_ft(self) -> dict[str, type]:
left_arm_motors_ft = self.left_arm._motors_ft
right_arm_motors_ft = self.right_arm._motors_ft
return {
**{f"left_{k}": v for k, v in left_arm_motors_ft.items()},
**{f"right_{k}": v for k, v in right_arm_motors_ft.items()},
}
@property
def _cameras_ft(self) -> dict[str, tuple]:
left_arm_cameras_ft = self.left_arm._cameras_ft
right_arm_cameras_ft = self.right_arm._cameras_ft
return {
**{f"left_{k}": v for k, v in left_arm_cameras_ft.items()},
**{f"right_{k}": v for k, v in right_arm_cameras_ft.items()},
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def calibrate(self) -> None:
self.left_arm.calibrate()
self.right_arm.calibrate()
def configure(self) -> None:
self.left_arm.configure()
self.right_arm.configure()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
left_obs = self.left_arm.get_observation()
obs_dict.update({f"left_{key}": value for key, value in left_obs.items()})
# Add "right_" prefix
right_obs = self.right_arm.get_observation()
obs_dict.update({f"right_{key}": value for key, value in right_obs.items()})
return obs_dict
def send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
custom_kd: dict[str, float] | None = None,
) -> RobotAction:
# Remove "left_" prefix
left_action = {
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
}
# Remove "right_" prefix
right_action = {
key.removeprefix("right_"): value for key, value in action.items() if key.startswith("right_")
}
sent_action_left = self.left_arm.send_action(left_action, custom_kp, custom_kd)
sent_action_right = self.right_arm.send_action(right_action, custom_kp, custom_kd)
# Add prefixes back
prefixed_sent_action_left = {f"left_{key}": value for key, value in sent_action_left.items()}
prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()}
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from lerobot.robots.openarm_follower import OpenArmFollowerConfigBase
from ..config import RobotConfig
@RobotConfig.register_subclass("bi_openarm_follower")
@dataclass
class BiOpenArmFollowerConfig(RobotConfig):
"""Configuration class for Bi OpenArm Follower robots."""
left_arm_config: OpenArmFollowerConfigBase
right_arm_config: OpenArmFollowerConfigBase
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_follower import OpenArmFollowerConfig
from .config_openarm_follower import OpenArmFollowerConfig, OpenArmFollowerConfigBase
from .openarm_follower import OpenArmFollower
__all__ = ["OpenArmFollower", "OpenArmFollowerConfig"]
__all__ = ["OpenArmFollower", "OpenArmFollowerConfig", "OpenArmFollowerConfigBase"]
@@ -43,10 +43,9 @@ RIGHT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
}
@RobotConfig.register_subclass("openarm_follower")
@dataclass
class OpenArmFollowerConfig(RobotConfig):
"""Configuration for the OpenArms follower robot with Damiao motors."""
class OpenArmFollowerConfigBase:
"""Base configuration for the OpenArms follower robot with Damiao motors."""
# CAN interfaces - one per arm
# arm CAN interface (e.g., "can1")
@@ -115,3 +114,9 @@ class OpenArmFollowerConfig(RobotConfig):
"gripper": (-5.0, 0.0),
}
)
@RobotConfig.register_subclass("openarm_follower")
@dataclass
class OpenArmFollowerConfig(RobotConfig, OpenArmFollowerConfigBase):
pass
+4
View File
@@ -64,6 +64,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .openarm_follower import OpenArmFollower
return OpenArmFollower(config)
elif config.type == "bi_openarm_follower":
from .bi_openarm_follower import BiOpenArmFollower
return BiOpenArmFollower(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot
+2
View File
@@ -36,6 +36,7 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
hope_jr,
koch_follower,
@@ -48,6 +49,7 @@ from lerobot.robots import ( # noqa: F401
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
homunculus,
koch_leader,
@@ -44,6 +44,7 @@ import numpy as np
from lerobot.model.kinematics import RobotKinematics
from lerobot.robots import ( # noqa: F401
RobotConfig,
bi_openarm_follower,
bi_so_follower,
koch_follower,
make_robot_from_config,
@@ -53,6 +54,7 @@ from lerobot.robots import ( # noqa: F401
)
from lerobot.teleoperators import ( # noqa: F401
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
gamepad,
koch_leader,
+2
View File
@@ -98,6 +98,7 @@ from lerobot.processor.rename_processor import rename_stats
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
@@ -112,6 +113,7 @@ from lerobot.robots import ( # noqa: F401
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
homunculus,
koch_leader,
+1
View File
@@ -53,6 +53,7 @@ from lerobot.processor import (
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
@@ -70,6 +70,7 @@ from lerobot.processor import (
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
@@ -84,6 +85,7 @@ from lerobot.robots import ( # noqa: F401
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
gamepad,
homunculus,
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .bi_openarm_leader import BiOpenArmLeader
from .config_bi_openarm_leader import BiOpenArmLeaderConfig
__all__ = ["BiOpenArmLeader", "BiOpenArmLeaderConfig"]
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from functools import cached_property
from lerobot.processor import RobotAction
from lerobot.teleoperators.openarm_leader import OpenArmLeaderConfig
from ..openarm_leader import OpenArmLeader
from ..teleoperator import Teleoperator
from .config_bi_openarm_leader import BiOpenArmLeaderConfig
logger = logging.getLogger(__name__)
class BiOpenArmLeader(Teleoperator):
"""
Bimanual OpenArm Leader Arms
"""
config_class = BiOpenArmLeaderConfig
name = "bi_openarm_leader"
def __init__(self, config: BiOpenArmLeaderConfig):
super().__init__(config)
self.config = config
left_arm_config = OpenArmLeaderConfig(
id=f"{config.id}_left" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.left_arm_config.port,
can_interface=config.left_arm_config.can_interface,
use_can_fd=config.left_arm_config.use_can_fd,
can_bitrate=config.left_arm_config.can_bitrate,
can_data_bitrate=config.left_arm_config.can_data_bitrate,
motor_config=config.left_arm_config.motor_config,
manual_control=config.left_arm_config.manual_control,
position_kd=config.left_arm_config.position_kd,
position_kp=config.left_arm_config.position_kp,
)
right_arm_config = OpenArmLeaderConfig(
id=f"{config.id}_right" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.right_arm_config.port,
can_interface=config.right_arm_config.can_interface,
use_can_fd=config.right_arm_config.use_can_fd,
can_bitrate=config.right_arm_config.can_bitrate,
can_data_bitrate=config.right_arm_config.can_data_bitrate,
motor_config=config.right_arm_config.motor_config,
manual_control=config.right_arm_config.manual_control,
position_kd=config.right_arm_config.position_kd,
position_kp=config.right_arm_config.position_kp,
)
self.left_arm = OpenArmLeader(left_arm_config)
self.right_arm = OpenArmLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
**{f"right_{k}": v for k, v in right_arm_features.items()},
}
@cached_property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def calibrate(self) -> None:
self.left_arm.calibrate()
self.right_arm.calibrate()
def configure(self) -> None:
self.left_arm.configure()
self.right_arm.configure()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_action(self) -> RobotAction:
action_dict = {}
# Add "left_" prefix
left_action = self.left_arm.get_action()
action_dict.update({f"left_{key}": value for key, value in left_action.items()})
# Add "right_" prefix
right_action = self.right_arm.get_action()
action_dict.update({f"right_{key}": value for key, value in right_action.items()})
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from lerobot.teleoperators.openarm_leader import OpenArmLeaderConfigBase
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("bi_openarm_leader")
@dataclass
class BiOpenArmLeaderConfig(TeleoperatorConfig):
"""Configuration class for Bi OpenArm Follower robots."""
left_arm_config: OpenArmLeaderConfigBase
right_arm_config: OpenArmLeaderConfigBase
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_leader import OpenArmLeaderConfig
from .config_openarm_leader import OpenArmLeaderConfig, OpenArmLeaderConfigBase
from .openarm_leader import OpenArmLeader
__all__ = ["OpenArmLeader", "OpenArmLeaderConfig"]
__all__ = ["OpenArmLeader", "OpenArmLeaderConfig", "OpenArmLeaderConfigBase"]
@@ -19,10 +19,9 @@ from dataclasses import dataclass, field
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("openarm_leader")
@dataclass
class OpenArmLeaderConfig(TeleoperatorConfig):
"""Configuration for the OpenArms leader/teleoperator with Damiao motors."""
class OpenArmLeaderConfigBase:
"""Base configuration for the OpenArms leader/teleoperator with Damiao motors."""
# CAN interfaces - one per arm
# Arm CAN interface (e.g., "can3")
@@ -68,3 +67,9 @@ class OpenArmLeaderConfig(TeleoperatorConfig):
default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]
)
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
@TeleoperatorConfig.register_subclass("openarm_leader")
@dataclass
class OpenArmLeaderConfig(TeleoperatorConfig, OpenArmLeaderConfigBase):
pass
+4
View File
@@ -91,6 +91,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
from .openarm_leader import OpenArmLeader
return OpenArmLeader(config)
elif config.type == "bi_openarm_leader":
from .bi_openarm_leader import BiOpenArmLeader
return BiOpenArmLeader(config)
else:
try:
return cast("Teleoperator", make_device_from_device_class(config))