From b9c4dd3d12d8bf9db4e657c2338e832532ae9a1c Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Thu, 11 Jun 2026 15:54:52 +0200 Subject: [PATCH] refactor(robots): mixin for bi classes --- .../bi_openarm_follower.py | 31 +-------- .../bi_rebot_b601_follower.py | 31 +-------- .../robots/bi_so_follower/bi_so_follower.py | 31 +-------- .../bi_openarm_leader/bi_openarm_leader.py | 31 +-------- .../bi_openarm_mini/bi_openarm_mini.py | 31 +-------- .../bi_rebot_102_leader.py | 31 +-------- .../bi_so_leader/bi_so_leader.py | 31 +-------- src/lerobot/utils/bimanual.py | 63 +++++++++++++++++++ 8 files changed, 84 insertions(+), 196 deletions(-) create mode 100644 src/lerobot/utils/bimanual.py diff --git a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py index 6721cb367..1613fa177 100644 --- a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py +++ b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..openarm_follower import OpenArmFollower, OpenArmFollowerConfig from ..robot import Robot @@ -27,7 +28,7 @@ from .config_bi_openarm_follower import BiOpenArmFollowerConfig logger = logging.getLogger(__name__) -class BiOpenArmFollower(Robot): +class BiOpenArmFollower(BimanualMixin, Robot): """ Bimanual OpenArm Follower Arms """ @@ -119,27 +120,6 @@ class BiOpenArmFollower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: raise NotImplementedError( "Motor ID configuration is typically done via manufacturer tools for CAN motors." @@ -183,8 +163,3 @@ class BiOpenArmFollower(Robot): prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()} return {**prefixed_sent_action_left, **prefixed_sent_action_right} - - @check_if_not_connected - def disconnect(self): - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py b/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py index bdf8bf4b2..c320cee8b 100644 --- a/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py +++ b/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..rebot_b601_follower import RebotB601Follower, RebotB601FollowerRobotConfig from ..robot import Robot @@ -27,7 +28,7 @@ from .config_bi_rebot_b601_follower import BiRebotB601FollowerConfig logger = logging.getLogger(__name__) -class BiRebotB601Follower(Robot): +class BiRebotB601Follower(BimanualMixin, Robot): """Bimanual Seeed Studio reBot B601-DM follower. Composes two single-arm :class:`RebotB601Follower` instances. Observation and @@ -113,27 +114,6 @@ class BiRebotB601Follower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - @check_if_not_connected def get_observation(self) -> RobotObservation: obs_dict: RobotObservation = {} @@ -159,8 +139,3 @@ class BiRebotB601Follower(Robot): **{f"left_{k}": v for k, v in sent_action_left.items()}, **{f"right_{k}": v for k, v in sent_action_right.items()}, } - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/robots/bi_so_follower/bi_so_follower.py b/src/lerobot/robots/bi_so_follower/bi_so_follower.py index 9b599d9e8..39c467cfb 100644 --- a/src/lerobot/robots/bi_so_follower/bi_so_follower.py +++ b/src/lerobot/robots/bi_so_follower/bi_so_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..robot import Robot from ..so_follower import SOFollower, SOFollowerRobotConfig @@ -27,7 +28,7 @@ from .config_bi_so_follower import BiSOFollowerConfig logger = logging.getLogger(__name__) -class BiSOFollower(Robot): +class BiSOFollower(BimanualMixin, Robot): """ [Bimanual SO Follower Arms](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio """ @@ -104,27 +105,6 @@ class BiSOFollower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() @@ -162,8 +142,3 @@ class BiSOFollower(Robot): prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()} return {**prefixed_sent_action_left, **prefixed_sent_action_right} - - @check_if_not_connected - def disconnect(self): - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py index 2d2c23f9c..640c45a57 100644 --- a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py +++ b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..openarm_leader import OpenArmLeader, OpenArmLeaderConfig from ..teleoperator import Teleoperator @@ -27,7 +28,7 @@ from .config_bi_openarm_leader import BiOpenArmLeaderConfig logger = logging.getLogger(__name__) -class BiOpenArmLeader(Teleoperator): +class BiOpenArmLeader(BimanualMixin, Teleoperator): """ Bimanual OpenArm Leader Arms """ @@ -86,27 +87,6 @@ class BiOpenArmLeader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: raise NotImplementedError( "Motor ID configuration is typically done via manufacturer tools for CAN motors." @@ -129,8 +109,3 @@ class BiOpenArmLeader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py index 3ec6073b9..41ebdba0d 100644 --- a/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py +++ b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..openarm_mini import OpenArmMini, OpenArmMiniConfig from ..teleoperator import Teleoperator @@ -27,7 +28,7 @@ from .config_bi_openarm_mini import BiOpenArmMiniConfig logger = logging.getLogger(__name__) -class BiOpenArmMini(Teleoperator): +class BiOpenArmMini(BimanualMixin, Teleoperator): """Bimanual OpenArm Mini teleoperator. Composes two single-arm :class:`OpenArmMini` instances. Action and feedback @@ -77,27 +78,6 @@ class BiOpenArmMini(Teleoperator): **{f"right_{k}": v for k, v in self.right_arm.feedback_features.items()}, } - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() @@ -119,8 +99,3 @@ class BiOpenArmMini(Teleoperator): self.left_arm.send_feedback(left_fb) if right_fb: self.right_arm.send_feedback(right_fb) - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py b/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py index 6cf364640..9da866b43 100644 --- a/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py +++ b/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..rebot_102_leader import RebotArm102Leader, RebotArm102LeaderTeleopConfig from ..teleoperator import Teleoperator @@ -27,7 +28,7 @@ from .config_bi_rebot_102_leader import BiRebot102LeaderConfig logger = logging.getLogger(__name__) -class BiRebot102Leader(Teleoperator): +class BiRebot102Leader(BimanualMixin, Teleoperator): """Bimanual Seeed Studio StarArm102 / reBot Arm 102 leader. Composes two single-arm :class:`RebotArm102Leader` instances. Action keys of @@ -76,27 +77,6 @@ class BiRebot102Leader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - @check_if_not_connected def get_action(self) -> RobotAction: action_dict = {} @@ -106,8 +86,3 @@ class BiRebot102Leader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError("Feedback is not implemented for the reBot Arm 102 leader.") - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py index e87c5a30a..6a45b4d91 100644 --- a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py +++ b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..so_leader import SOLeader, SOLeaderTeleopConfig from ..teleoperator import Teleoperator @@ -27,7 +28,7 @@ from .config_bi_so_leader import BiSOLeaderConfig logger = logging.getLogger(__name__) -class BiSOLeader(Teleoperator): +class BiSOLeader(BimanualMixin, Teleoperator): """ [Bimanual SO Leader Arms](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio """ @@ -68,27 +69,6 @@ class BiSOLeader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() @@ -110,8 +90,3 @@ class BiSOLeader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/utils/bimanual.py b/src/lerobot/utils/bimanual.py new file mode 100644 index 000000000..1e212340b --- /dev/null +++ b/src/lerobot/utils/bimanual.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any + +from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected + + +class BimanualMixin: + """Lifecycle delegation for bimanual robots and teleoperators. + + Concrete subclasses must populate ``self.left_arm`` and ``self.right_arm`` in + their own ``__init__``. They retain ownership of feature dicts and the + data-routing methods (``get_action`` / ``send_action`` / ``get_observation`` / + ``send_feedback``), which vary per-embodiment. + + Inherit before the ``Robot`` / ``Teleoperator`` base so the mixin's methods + take precedence in the MRO:: + + class BiFooFollower(BimanualMixin, Robot): ... + """ + + left_arm: Any + right_arm: Any + + @property + def is_connected(self) -> bool: + return self.left_arm.is_connected and self.right_arm.is_connected + + @property + def is_calibrated(self) -> bool: + return self.left_arm.is_calibrated and self.right_arm.is_calibrated + + @check_if_already_connected + def connect(self, calibrate: bool = True) -> None: + self.left_arm.connect(calibrate) + self.right_arm.connect(calibrate) + + def calibrate(self) -> None: + self.left_arm.calibrate() + self.right_arm.calibrate() + + def configure(self) -> None: + self.left_arm.configure() + self.right_arm.configure() + + @check_if_not_connected + def disconnect(self) -> None: + self.left_arm.disconnect() + self.right_arm.disconnect()