diff --git a/docs/source/hil_data_collection.mdx b/docs/source/hil_data_collection.mdx index ba68959d1..c7df0631e 100644 --- a/docs/source/hil_data_collection.mdx +++ b/docs/source/hil_data_collection.mdx @@ -57,11 +57,11 @@ The `lerobot-rollout --strategy.type=dagger` mode requires **teleoperators with **Compatible teleoperators:** -- `openarm_mini` - OpenArm Mini +- `bi_openarm_mini` - Bimanual OpenArm Mini - `so_leader` - SO100 / SO101 leader arm > [!IMPORTANT] -> The provided commands default to `bi_openarm_follower` + `openarm_mini`. +> The provided commands default to `bi_openarm_follower` + `bi_openarm_mini`. > `so_follower` + `so_leader` configs are also registered and can be used via CLI flags. --- @@ -104,9 +104,9 @@ lerobot-rollout --strategy.type=dagger \ --robot.right_arm_config.port=can0 \ --robot.right_arm_config.side=right \ --robot.cameras='{left_wrist: {type: opencv, index_or_path: "/dev/video0", width: 1280, height: 720, fps: 30}, right_wrist: {type: opencv, index_or_path: "/dev/video4", width: 1280, height: 720, fps: 30}, base: {type: opencv, index_or_path: "/dev/video2", width: 640, height: 480, fps: 30}}' \ - --teleop.type=openarm_mini \ - --teleop.port_left=/dev/ttyACM0 \ - --teleop.port_right=/dev/ttyACM1 \ + --teleop.type=bi_openarm_mini \ + --teleop.left_arm_config.port=/dev/ttyACM0 \ + --teleop.right_arm_config.port=/dev/ttyACM1 \ --policy.path=outputs/pretrain/checkpoints/last/pretrained_model \ --dataset.repo_id=your-username/rollout_hil_dataset \ --dataset.single_task="Fold the T-shirt properly" \ @@ -131,9 +131,9 @@ lerobot-rollout --strategy.type=dagger \ --robot.right_arm_config.port=can0 \ --robot.right_arm_config.side=right \ --robot.cameras='{left_wrist: {type: opencv, index_or_path: "/dev/video0", width: 1280, height: 720, fps: 30}, right_wrist: {type: opencv, index_or_path: "/dev/video4", width: 1280, height: 720, fps: 30}, base: {type: opencv, index_or_path: "/dev/video2", width: 640, height: 480, fps: 30}}' \ - --teleop.type=openarm_mini \ - --teleop.port_left=/dev/ttyACM0 \ - --teleop.port_right=/dev/ttyACM1 \ + --teleop.type=bi_openarm_mini \ + --teleop.left_arm_config.port=/dev/ttyACM0 \ + --teleop.right_arm_config.port=/dev/ttyACM1 \ --policy.path=outputs/pretrain/checkpoints/last/pretrained_model \ --dataset.repo_id=your-username/rollout_hil_rtc_dataset \ --dataset.single_task="Fold the T-shirt properly" \ diff --git a/docs/source/inference.mdx b/docs/source/inference.mdx index 78d9faa30..31405b5de 100644 --- a/docs/source/inference.mdx +++ b/docs/source/inference.mdx @@ -117,7 +117,7 @@ lerobot-rollout \ --strategy.num_episodes=20 \ --policy.path=outputs/pretrain/checkpoints/last/pretrained_model \ --robot.type=bi_openarm_follower \ - --teleop.type=openarm_mini \ + --teleop.type=bi_openarm_mini \ --dataset.repo_id=${HF_USER}/rollout_hil_data \ --dataset.single_task="Fold the T-shirt" ``` diff --git a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py index b6f446d9c..1613fa177 100644 --- a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py +++ b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..openarm_follower import OpenArmFollower, OpenArmFollowerConfig from ..robot import Robot @@ -27,7 +28,7 @@ from .config_bi_openarm_follower import BiOpenArmFollowerConfig logger = logging.getLogger(__name__) -class BiOpenArmFollower(Robot): +class BiOpenArmFollower(BimanualMixin, Robot): """ Bimanual OpenArm Follower Arms """ @@ -39,15 +40,17 @@ class BiOpenArmFollower(Robot): super().__init__(config) self.config = config - # Top-level cameras are distributed evenly: each arm's OpenArmFollower - # will only open the cameras assigned to it. Per-arm cameras are used - # as fallback when top-level cameras are empty. - if config.cameras: - left_cameras = config.cameras - right_cameras = {} - else: - left_cameras = config.left_arm_config.cameras - right_cameras = config.right_arm_config.cameras + # Top-level cameras are opened by `left_arm` for convenience, but their + # keys stay unprefixed in observations (tracked via `_top_level_cam_keys`). + self._top_level_cam_keys = set(config.cameras) + _collisions = self._top_level_cam_keys & set( + config.left_arm_config.cameras + ) | self._top_level_cam_keys & set(config.right_arm_config.cameras) + if _collisions: + raise ValueError( + f"Top-level camera names collide with per-arm camera names: {sorted(_collisions)}" + ) + left_arm_cameras = {**config.left_arm_config.cameras, **config.cameras} left_arm_config = OpenArmFollowerConfig( id=f"{config.id}_left" if config.id else None, @@ -56,7 +59,7 @@ class BiOpenArmFollower(Robot): disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect, use_velocity_and_torque=config.left_arm_config.use_velocity_and_torque, max_relative_target=config.left_arm_config.max_relative_target, - cameras=left_cameras, + cameras=left_arm_cameras, side=config.left_arm_config.side, can_interface=config.left_arm_config.can_interface, use_can_fd=config.left_arm_config.use_can_fd, @@ -75,7 +78,7 @@ class BiOpenArmFollower(Robot): disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect, use_velocity_and_torque=config.right_arm_config.use_velocity_and_torque, max_relative_target=config.right_arm_config.max_relative_target, - cameras=right_cameras, + cameras=config.right_arm_config.cameras, side=config.right_arm_config.side, can_interface=config.right_arm_config.can_interface, use_can_fd=config.right_arm_config.use_can_fd, @@ -95,22 +98,19 @@ class BiOpenArmFollower(Robot): @property def _motors_ft(self) -> dict[str, type]: - left_arm_motors_ft = self.left_arm._motors_ft - right_arm_motors_ft = self.right_arm._motors_ft - - # Right first, then left — matches the teleoperator (OpenArmMini) ordering - # and the dataset feature names recorded during data collection. return { - **{f"right_{k}": v for k, v in right_arm_motors_ft.items()}, - **{f"left_{k}": v for k, v in left_arm_motors_ft.items()}, + **{f"left_{k}": v for k, v in self.left_arm._motors_ft.items()}, + **{f"right_{k}": v for k, v in self.right_arm._motors_ft.items()}, } @property def _cameras_ft(self) -> dict[str, tuple]: - # Cameras already have unique user-chosen names (e.g. "left_wrist", "base", - # "right_wrist"), so we merge them directly — unlike motors which need the - # left_/right_ prefix to disambiguate identical per-arm joint names. - return {**self.left_arm._cameras_ft, **self.right_arm._cameras_ft} + out: dict[str, tuple] = {} + for k, v in self.left_arm._cameras_ft.items(): + out[k if k in self._top_level_cam_keys else f"left_{k}"] = v + for k, v in self.right_arm._cameras_ft.items(): + out[f"right_{k}"] = v + return out @cached_property def observation_features(self) -> dict[str, type | tuple]: @@ -120,27 +120,6 @@ class BiOpenArmFollower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: raise NotImplementedError( "Motor ID configuration is typically done via manufacturer tools for CAN motors." @@ -148,21 +127,15 @@ class BiOpenArmFollower(Robot): @check_if_not_connected def get_observation(self) -> RobotObservation: - obs_dict = {} + obs_dict: RobotObservation = {} - # Camera keys that should NOT get the arm prefix (they already have unique names) - left_cam_keys = set(self.left_arm.cameras.keys()) - right_cam_keys = set(self.right_arm.cameras.keys()) + # Add "left_" prefix to per-arm keys; keep top-level camera keys unprefixed. + for key, value in self.left_arm.get_observation().items(): + obs_dict[key if key in self._top_level_cam_keys else f"left_{key}"] = value - # Right first, then left — matches the teleoperator (OpenArmMini) ordering - # and the dataset feature names recorded during data collection. - right_obs = self.right_arm.get_observation() - for key, value in right_obs.items(): - obs_dict[key if key in right_cam_keys else f"right_{key}"] = value - - left_obs = self.left_arm.get_observation() - for key, value in left_obs.items(): - obs_dict[key if key in left_cam_keys else f"left_{key}"] = value + # Add "right_" prefix + for key, value in self.right_arm.get_observation().items(): + obs_dict[f"right_{key}"] = value return obs_dict @@ -189,9 +162,4 @@ class BiOpenArmFollower(Robot): prefixed_sent_action_left = {f"left_{key}": value for key, value in sent_action_left.items()} prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()} - return {**prefixed_sent_action_right, **prefixed_sent_action_left} - - @check_if_not_connected - def disconnect(self): - self.left_arm.disconnect() - self.right_arm.disconnect() + return {**prefixed_sent_action_left, **prefixed_sent_action_right} diff --git a/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py index 9ed56aeac..d1c9335a0 100644 --- a/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py +++ b/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py @@ -32,5 +32,7 @@ class BiOpenArmFollowerConfig(RobotConfig): left_arm_config: OpenArmFollowerConfigBase right_arm_config: OpenArmFollowerConfigBase - # Top-level cameras shared across both arms. + # Top-level cameras not attached to a specific side. Keys are kept as-is in + # observations (no `left_`/`right_` prefix). Per-arm cameras (declared on + # `{left,right}_arm_config.cameras`) are prefixed. cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py b/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py index bd19f1b62..c320cee8b 100644 --- a/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py +++ b/src/lerobot/robots/bi_rebot_b601_follower/bi_rebot_b601_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..rebot_b601_follower import RebotB601Follower, RebotB601FollowerRobotConfig from ..robot import Robot @@ -27,7 +28,7 @@ from .config_bi_rebot_b601_follower import BiRebotB601FollowerConfig logger = logging.getLogger(__name__) -class BiRebotB601Follower(Robot): +class BiRebotB601Follower(BimanualMixin, Robot): """Bimanual Seeed Studio reBot B601-DM follower. Composes two single-arm :class:`RebotB601Follower` instances. Observation and @@ -41,6 +42,18 @@ class BiRebotB601Follower(Robot): super().__init__(config) self.config = config + # Top-level cameras are opened by `left_arm` for convenience, but their + # keys stay unprefixed in observations (tracked via `_top_level_cam_keys`). + self._top_level_cam_keys = set(config.cameras) + _collisions = self._top_level_cam_keys & set( + config.left_arm_config.cameras + ) | self._top_level_cam_keys & set(config.right_arm_config.cameras) + if _collisions: + raise ValueError( + f"Top-level camera names collide with per-arm camera names: {sorted(_collisions)}" + ) + left_arm_cameras = {**config.left_arm_config.cameras, **config.cameras} + left_arm_config = RebotB601FollowerRobotConfig( id=f"{config.id}_left" if config.id else None, calibration_dir=config.calibration_dir, @@ -49,7 +62,7 @@ class BiRebotB601Follower(Robot): dm_serial_baud=config.left_arm_config.dm_serial_baud, disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect, max_relative_target=config.left_arm_config.max_relative_target, - cameras=config.left_arm_config.cameras, + cameras=left_arm_cameras, motor_can_ids=config.left_arm_config.motor_can_ids, pos_vel_velocity=config.left_arm_config.pos_vel_velocity, gripper_torque_ratio=config.left_arm_config.gripper_torque_ratio, @@ -86,10 +99,12 @@ class BiRebotB601Follower(Robot): @property def _cameras_ft(self) -> dict[str, tuple]: - return { - **{f"left_{k}": v for k, v in self.left_arm._cameras_ft.items()}, - **{f"right_{k}": v for k, v in self.right_arm._cameras_ft.items()}, - } + out: dict[str, tuple] = {} + for k, v in self.left_arm._cameras_ft.items(): + out[k if k in self._top_level_cam_keys else f"left_{k}"] = v + for k, v in self.right_arm._cameras_ft.items(): + out[f"right_{k}"] = v + return out @cached_property def observation_features(self) -> dict[str, type | tuple]: @@ -99,32 +114,13 @@ class BiRebotB601Follower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - @check_if_not_connected def get_observation(self) -> RobotObservation: - obs_dict = {} - obs_dict.update({f"left_{k}": v for k, v in self.left_arm.get_observation().items()}) - obs_dict.update({f"right_{k}": v for k, v in self.right_arm.get_observation().items()}) + obs_dict: RobotObservation = {} + for k, v in self.left_arm.get_observation().items(): + obs_dict[k if k in self._top_level_cam_keys else f"left_{k}"] = v + for k, v in self.right_arm.get_observation().items(): + obs_dict[f"right_{k}"] = v return obs_dict @check_if_not_connected @@ -143,8 +139,3 @@ class BiRebotB601Follower(Robot): **{f"left_{k}": v for k, v in sent_action_left.items()}, **{f"right_{k}": v for k, v in sent_action_right.items()}, } - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/robots/bi_rebot_b601_follower/config_bi_rebot_b601_follower.py b/src/lerobot/robots/bi_rebot_b601_follower/config_bi_rebot_b601_follower.py index 079b7a355..fce2967a8 100644 --- a/src/lerobot/robots/bi_rebot_b601_follower/config_bi_rebot_b601_follower.py +++ b/src/lerobot/robots/bi_rebot_b601_follower/config_bi_rebot_b601_follower.py @@ -14,7 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass +from dataclasses import dataclass, field + +from lerobot.cameras import CameraConfig from ..config import RobotConfig from ..rebot_b601_follower import RebotB601FollowerConfig @@ -27,3 +29,8 @@ class BiRebotB601FollowerConfig(RobotConfig): left_arm_config: RebotB601FollowerConfig right_arm_config: RebotB601FollowerConfig + + # Top-level cameras not attached to a specific side. Keys are kept as-is in + # observations (no `left_`/`right_` prefix). Per-arm cameras (declared on + # `{left,right}_arm_config.cameras`) are prefixed. + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/src/lerobot/robots/bi_so_follower/bi_so_follower.py b/src/lerobot/robots/bi_so_follower/bi_so_follower.py index f592150a6..39c467cfb 100644 --- a/src/lerobot/robots/bi_so_follower/bi_so_follower.py +++ b/src/lerobot/robots/bi_so_follower/bi_so_follower.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction, RobotObservation -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..robot import Robot from ..so_follower import SOFollower, SOFollowerRobotConfig @@ -27,7 +28,7 @@ from .config_bi_so_follower import BiSOFollowerConfig logger = logging.getLogger(__name__) -class BiSOFollower(Robot): +class BiSOFollower(BimanualMixin, Robot): """ [Bimanual SO Follower Arms](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio """ @@ -39,6 +40,18 @@ class BiSOFollower(Robot): super().__init__(config) self.config = config + # Top-level cameras are opened by `left_arm` for convenience, but their + # keys stay unprefixed in observations (tracked via `_top_level_cam_keys`). + self._top_level_cam_keys = set(config.cameras) + _collisions = self._top_level_cam_keys & set( + config.left_arm_config.cameras + ) | self._top_level_cam_keys & set(config.right_arm_config.cameras) + if _collisions: + raise ValueError( + f"Top-level camera names collide with per-arm camera names: {sorted(_collisions)}" + ) + left_arm_cameras = {**config.left_arm_config.cameras, **config.cameras} + left_arm_config = SOFollowerRobotConfig( id=f"{config.id}_left" if config.id else None, calibration_dir=config.calibration_dir, @@ -46,7 +59,7 @@ class BiSOFollower(Robot): disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect, max_relative_target=config.left_arm_config.max_relative_target, use_degrees=config.left_arm_config.use_degrees, - cameras=config.left_arm_config.cameras, + cameras=left_arm_cameras, ) right_arm_config = SOFollowerRobotConfig( @@ -77,13 +90,12 @@ class BiSOFollower(Robot): @property def _cameras_ft(self) -> dict[str, tuple]: - left_arm_cameras_ft = self.left_arm._cameras_ft - right_arm_cameras_ft = self.right_arm._cameras_ft - - return { - **{f"left_{k}": v for k, v in left_arm_cameras_ft.items()}, - **{f"right_{k}": v for k, v in right_arm_cameras_ft.items()}, - } + out: dict[str, tuple] = {} + for k, v in self.left_arm._cameras_ft.items(): + out[k if k in self._top_level_cam_keys else f"left_{k}"] = v + for k, v in self.right_arm._cameras_ft.items(): + out[f"right_{k}"] = v + return out @cached_property def observation_features(self) -> dict[str, type | tuple]: @@ -93,42 +105,21 @@ class BiSOFollower(Robot): def action_features(self) -> dict[str, type]: return self._motors_ft - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() @check_if_not_connected def get_observation(self) -> RobotObservation: - obs_dict = {} + obs_dict: RobotObservation = {} - # Add "left_" prefix - left_obs = self.left_arm.get_observation() - obs_dict.update({f"left_{key}": value for key, value in left_obs.items()}) + # Add "left_" prefix to per-arm keys; keep top-level camera keys unprefixed. + for key, value in self.left_arm.get_observation().items(): + obs_dict[key if key in self._top_level_cam_keys else f"left_{key}"] = value # Add "right_" prefix - right_obs = self.right_arm.get_observation() - obs_dict.update({f"right_{key}": value for key, value in right_obs.items()}) + for key, value in self.right_arm.get_observation().items(): + obs_dict[f"right_{key}"] = value return obs_dict @@ -151,8 +142,3 @@ class BiSOFollower(Robot): prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()} return {**prefixed_sent_action_left, **prefixed_sent_action_right} - - @check_if_not_connected - def disconnect(self): - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/robots/bi_so_follower/config_bi_so_follower.py b/src/lerobot/robots/bi_so_follower/config_bi_so_follower.py index 97afbab4f..0c68e7cb1 100644 --- a/src/lerobot/robots/bi_so_follower/config_bi_so_follower.py +++ b/src/lerobot/robots/bi_so_follower/config_bi_so_follower.py @@ -14,7 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass +from dataclasses import dataclass, field + +from lerobot.cameras import CameraConfig from ..config import RobotConfig from ..so_follower import SOFollowerConfig @@ -27,3 +29,8 @@ class BiSOFollowerConfig(RobotConfig): left_arm_config: SOFollowerConfig right_arm_config: SOFollowerConfig + + # Top-level cameras not attached to a specific side. Keys are kept as-is in + # observations (no `left_`/`right_` prefix). Per-arm cameras (declared on + # `{left,right}_arm_config.cameras`) are prefixed. + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/src/lerobot/scripts/lerobot_calibrate.py b/src/lerobot/scripts/lerobot_calibrate.py index e43736954..0a9cbee66 100644 --- a/src/lerobot/scripts/lerobot_calibrate.py +++ b/src/lerobot/scripts/lerobot_calibrate.py @@ -54,6 +54,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_find_joint_limits.py b/src/lerobot/scripts/lerobot_find_joint_limits.py index 5b9166a2e..b269eeeff 100644 --- a/src/lerobot/scripts/lerobot_find_joint_limits.py +++ b/src/lerobot/scripts/lerobot_find_joint_limits.py @@ -57,6 +57,7 @@ from lerobot.robots import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401 TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, gamepad, diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index c411ebf9e..0deb54b90 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -137,6 +137,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_rollout.py b/src/lerobot/scripts/lerobot_rollout.py index 82a858b9a..8515c4cc9 100644 --- a/src/lerobot/scripts/lerobot_rollout.py +++ b/src/lerobot/scripts/lerobot_rollout.py @@ -174,6 +174,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_setup_motors.py b/src/lerobot/scripts/lerobot_setup_motors.py index 69ebcf5fa..f86c31af2 100644 --- a/src/lerobot/scripts/lerobot_setup_motors.py +++ b/src/lerobot/scripts/lerobot_setup_motors.py @@ -41,6 +41,7 @@ from lerobot.robots import ( # noqa: F401 ) from lerobot.teleoperators import ( # noqa: F401 TeleoperatorConfig, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, koch_leader, diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py index 2ff02bda0..5d806200d 100644 --- a/src/lerobot/scripts/lerobot_teleoperate.py +++ b/src/lerobot/scripts/lerobot_teleoperate.py @@ -89,6 +89,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, gamepad, diff --git a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py index 2d2c23f9c..640c45a57 100644 --- a/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py +++ b/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py @@ -18,7 +18,8 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..openarm_leader import OpenArmLeader, OpenArmLeaderConfig from ..teleoperator import Teleoperator @@ -27,7 +28,7 @@ from .config_bi_openarm_leader import BiOpenArmLeaderConfig logger = logging.getLogger(__name__) -class BiOpenArmLeader(Teleoperator): +class BiOpenArmLeader(BimanualMixin, Teleoperator): """ Bimanual OpenArm Leader Arms """ @@ -86,27 +87,6 @@ class BiOpenArmLeader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: raise NotImplementedError( "Motor ID configuration is typically done via manufacturer tools for CAN motors." @@ -129,8 +109,3 @@ class BiOpenArmLeader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_openarm_leader/config_bi_openarm_leader.py b/src/lerobot/teleoperators/bi_openarm_leader/config_bi_openarm_leader.py index f7ec929ed..6425c179a 100644 --- a/src/lerobot/teleoperators/bi_openarm_leader/config_bi_openarm_leader.py +++ b/src/lerobot/teleoperators/bi_openarm_leader/config_bi_openarm_leader.py @@ -23,7 +23,7 @@ from ..openarm_leader import OpenArmLeaderConfigBase @TeleoperatorConfig.register_subclass("bi_openarm_leader") @dataclass class BiOpenArmLeaderConfig(TeleoperatorConfig): - """Configuration class for Bi OpenArm Follower robots.""" + """Configuration class for Bi OpenArm Leader teleoperators.""" left_arm_config: OpenArmLeaderConfigBase right_arm_config: OpenArmLeaderConfigBase diff --git a/src/lerobot/teleoperators/bi_openarm_mini/__init__.py b/src/lerobot/teleoperators/bi_openarm_mini/__init__.py new file mode 100644 index 000000000..ec1a2dbc8 --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/__init__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .bi_openarm_mini import BiOpenArmMini +from .config_bi_openarm_mini import BiOpenArmMiniConfig + +__all__ = ["BiOpenArmMini", "BiOpenArmMiniConfig"] diff --git a/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py new file mode 100644 index 000000000..41ebdba0d --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from functools import cached_property + +from lerobot.types import RobotAction +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected + +from ..openarm_mini import OpenArmMini, OpenArmMiniConfig +from ..teleoperator import Teleoperator +from .config_bi_openarm_mini import BiOpenArmMiniConfig + +logger = logging.getLogger(__name__) + + +class BiOpenArmMini(BimanualMixin, Teleoperator): + """Bimanual OpenArm Mini teleoperator. + + Composes two single-arm :class:`OpenArmMini` instances. Action and feedback + keys of each arm are namespaced with a ``left_`` / ``right_`` prefix, so a + bimanual leader can teleoperate a bimanual OpenArm follower. + """ + + config_class = BiOpenArmMiniConfig + name = "bi_openarm_mini" + + def __init__(self, config: BiOpenArmMiniConfig): + super().__init__(config) + self.config = config + + # `side` is forced to match left/right regardless of what the user passed + # on the per-arm base config — the bimanual wrapper owns the side semantics. + left_arm_config = OpenArmMiniConfig( + id=f"{config.id}_left" if config.id else None, + calibration_dir=config.calibration_dir, + port=config.left_arm_config.port, + side="left", + use_degrees=config.left_arm_config.use_degrees, + ) + + right_arm_config = OpenArmMiniConfig( + id=f"{config.id}_right" if config.id else None, + calibration_dir=config.calibration_dir, + port=config.right_arm_config.port, + side="right", + use_degrees=config.right_arm_config.use_degrees, + ) + + self.left_arm = OpenArmMini(left_arm_config) + self.right_arm = OpenArmMini(right_arm_config) + + @cached_property + def action_features(self) -> dict[str, type]: + return { + **{f"left_{k}": v for k, v in self.left_arm.action_features.items()}, + **{f"right_{k}": v for k, v in self.right_arm.action_features.items()}, + } + + @cached_property + def feedback_features(self) -> dict[str, type]: + return { + **{f"left_{k}": v for k, v in self.left_arm.feedback_features.items()}, + **{f"right_{k}": v for k, v in self.right_arm.feedback_features.items()}, + } + + def setup_motors(self) -> None: + self.left_arm.setup_motors() + self.right_arm.setup_motors() + + @check_if_not_connected + def get_action(self) -> RobotAction: + action: RobotAction = {} + for k, v in self.left_arm.get_action().items(): + action[f"left_{k}"] = v + for k, v in self.right_arm.get_action().items(): + action[f"right_{k}"] = v + return action + + @check_if_not_connected + def send_feedback(self, feedback: dict[str, float]) -> None: + left_fb = {k.removeprefix("left_"): v for k, v in feedback.items() if k.startswith("left_")} + right_fb = {k.removeprefix("right_"): v for k, v in feedback.items() if k.startswith("right_")} + if left_fb: + self.left_arm.send_feedback(left_fb) + if right_fb: + self.right_arm.send_feedback(right_fb) diff --git a/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py b/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py new file mode 100644 index 000000000..f021eaa49 --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig +from ..openarm_mini import OpenArmMiniConfigBase + + +@TeleoperatorConfig.register_subclass("bi_openarm_mini") +@dataclass +class BiOpenArmMiniConfig(TeleoperatorConfig): + """Configuration class for Bi OpenArm Mini teleoperators.""" + + left_arm_config: OpenArmMiniConfigBase + right_arm_config: OpenArmMiniConfigBase diff --git a/src/lerobot/teleoperators/bi_rebot_102_leader/__init__.py b/src/lerobot/teleoperators/bi_rebot_102_leader/__init__.py index c15cf76d8..9b26d6e59 100644 --- a/src/lerobot/teleoperators/bi_rebot_102_leader/__init__.py +++ b/src/lerobot/teleoperators/bi_rebot_102_leader/__init__.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .bi_rebot_102_leader import BiRebotArm102Leader -from .config_bi_rebot_102_leader import BiRebotArm102LeaderConfig +from .bi_rebot_102_leader import BiRebot102Leader +from .config_bi_rebot_102_leader import BiRebot102LeaderConfig -__all__ = ["BiRebotArm102Leader", "BiRebotArm102LeaderConfig"] +__all__ = ["BiRebot102Leader", "BiRebot102LeaderConfig"] diff --git a/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py b/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py index a4e5fd8c6..9da866b43 100644 --- a/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py +++ b/src/lerobot/teleoperators/bi_rebot_102_leader/bi_rebot_102_leader.py @@ -18,16 +18,17 @@ import logging from functools import cached_property from lerobot.types import RobotAction -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..rebot_102_leader import RebotArm102Leader, RebotArm102LeaderTeleopConfig from ..teleoperator import Teleoperator -from .config_bi_rebot_102_leader import BiRebotArm102LeaderConfig +from .config_bi_rebot_102_leader import BiRebot102LeaderConfig logger = logging.getLogger(__name__) -class BiRebotArm102Leader(Teleoperator): +class BiRebot102Leader(BimanualMixin, Teleoperator): """Bimanual Seeed Studio StarArm102 / reBot Arm 102 leader. Composes two single-arm :class:`RebotArm102Leader` instances. Action keys of @@ -35,10 +36,10 @@ class BiRebotArm102Leader(Teleoperator): leader can teleoperate a bimanual reBot B601 follower. """ - config_class = BiRebotArm102LeaderConfig + config_class = BiRebot102LeaderConfig name = "bi_rebot_102_leader" - def __init__(self, config: BiRebotArm102LeaderConfig): + def __init__(self, config: BiRebot102LeaderConfig): super().__init__(config) self.config = config @@ -76,27 +77,6 @@ class BiRebotArm102Leader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - @check_if_not_connected def get_action(self) -> RobotAction: action_dict = {} @@ -106,8 +86,3 @@ class BiRebotArm102Leader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError("Feedback is not implemented for the reBot Arm 102 leader.") - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_rebot_102_leader/config_bi_rebot_102_leader.py b/src/lerobot/teleoperators/bi_rebot_102_leader/config_bi_rebot_102_leader.py index 265ae26c1..2503b102c 100644 --- a/src/lerobot/teleoperators/bi_rebot_102_leader/config_bi_rebot_102_leader.py +++ b/src/lerobot/teleoperators/bi_rebot_102_leader/config_bi_rebot_102_leader.py @@ -22,7 +22,7 @@ from ..rebot_102_leader import RebotArm102LeaderConfig @TeleoperatorConfig.register_subclass("bi_rebot_102_leader") @dataclass -class BiRebotArm102LeaderConfig(TeleoperatorConfig): +class BiRebot102LeaderConfig(TeleoperatorConfig): """Configuration class for the bimanual reBot Arm 102 leader teleoperator.""" left_arm_config: RebotArm102LeaderConfig diff --git a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py index f2e88d20a..6a45b4d91 100644 --- a/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py +++ b/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py @@ -17,7 +17,9 @@ import logging from functools import cached_property -from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected +from lerobot.types import RobotAction +from lerobot.utils.bimanual import BimanualMixin +from lerobot.utils.decorators import check_if_not_connected from ..so_leader import SOLeader, SOLeaderTeleopConfig from ..teleoperator import Teleoperator @@ -26,7 +28,7 @@ from .config_bi_so_leader import BiSOLeaderConfig logger = logging.getLogger(__name__) -class BiSOLeader(Teleoperator): +class BiSOLeader(BimanualMixin, Teleoperator): """ [Bimanual SO Leader Arms](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio """ @@ -67,33 +69,12 @@ class BiSOLeader(Teleoperator): def feedback_features(self) -> dict[str, type]: return {} - @property - def is_connected(self) -> bool: - return self.left_arm.is_connected and self.right_arm.is_connected - - @check_if_already_connected - def connect(self, calibrate: bool = True) -> None: - self.left_arm.connect(calibrate) - self.right_arm.connect(calibrate) - - @property - def is_calibrated(self) -> bool: - return self.left_arm.is_calibrated and self.right_arm.is_calibrated - - def calibrate(self) -> None: - self.left_arm.calibrate() - self.right_arm.calibrate() - - def configure(self) -> None: - self.left_arm.configure() - self.right_arm.configure() - def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() @check_if_not_connected - def get_action(self) -> dict[str, float]: + def get_action(self) -> RobotAction: action_dict = {} # Add "left_" prefix @@ -109,8 +90,3 @@ class BiSOLeader(Teleoperator): def send_feedback(self, feedback: dict[str, float]) -> None: # TODO: Implement force feedback raise NotImplementedError - - @check_if_not_connected - def disconnect(self) -> None: - self.left_arm.disconnect() - self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/openarm_mini/__init__.py b/src/lerobot/teleoperators/openarm_mini/__init__.py index 8620af1d7..18f4566f3 100644 --- a/src/lerobot/teleoperators/openarm_mini/__init__.py +++ b/src/lerobot/teleoperators/openarm_mini/__init__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .config_openarm_mini import OpenArmMiniConfig +from .config_openarm_mini import OpenArmMiniConfig, OpenArmMiniConfigBase from .openarm_mini import OpenArmMini -__all__ = ["OpenArmMini", "OpenArmMiniConfig"] +__all__ = ["OpenArmMini", "OpenArmMiniConfig", "OpenArmMiniConfigBase"] diff --git a/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py index 7dc3e0212..74a8bf606 100644 --- a/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py @@ -19,12 +19,21 @@ from dataclasses import dataclass from ..config import TeleoperatorConfig -@TeleoperatorConfig.register_subclass("openarm_mini") @dataclass -class OpenArmMiniConfig(TeleoperatorConfig): - """Configuration for OpenArm Mini teleoperator with Feetech motors (dual arms).""" +class OpenArmMiniConfigBase: + """Base configuration for the OpenArm Mini teleoperator (Feetech STS3215, 7DOF + gripper).""" - port_right: str = "/dev/ttyUSB0" - port_left: str = "/dev/ttyUSB1" + # Serial port for the Feetech bus (e.g., "/dev/ttyUSB0"). + port: str + + # Side of the arm: "left" or "right". Controls per-joint direction flips applied + # during readout. If `None`, no flipping is applied. + side: str | None = None use_degrees: bool = True + + +@TeleoperatorConfig.register_subclass("openarm_mini") +@dataclass +class OpenArmMiniConfig(TeleoperatorConfig, OpenArmMiniConfigBase): + pass diff --git a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py index 2df1dfcfe..9793d0b6f 100644 --- a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py @@ -31,22 +31,22 @@ from .config_openarm_mini import OpenArmMiniConfig logger = logging.getLogger(__name__) -# Motors whose direction is inverted during readout -RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"] -LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] +# Per-side motor direction flips applied during readout. +SIDE_MOTORS_TO_FLIP: dict[str, list[str]] = { + "left": ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"], + "right": ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"], +} -# Leader joint 6 maps to follower joint 7 and vice versa +# Leader joint 6 ↔ follower joint 7 (symmetric — its own inverse). JOINT_REMAP = {"joint_6": "joint_7", "joint_7": "joint_6"} -JOINT_REMAP_REVERSE = {"joint_7": "joint_6", "joint_6": "joint_7"} GRIPPER_TELEOP_TO_DEGREES = -0.65 class OpenArmMini(Teleoperator): - """ - OpenArm Mini Teleoperator with dual Feetech-based arms (8 motors per arm). + """OpenArm Mini single-arm teleoperator (Feetech STS3215, 7DOF + gripper). - Each arm has 7 joints plus a gripper, using Feetech STS3215 servos. + For the bimanual setup, see :class:`BiOpenArmMini` which composes two of these. """ config_class = OpenArmMiniConfig @@ -56,9 +56,12 @@ class OpenArmMini(Teleoperator): super().__init__(config) self.config = config + if config.side is not None and config.side not in SIDE_MOTORS_TO_FLIP: + raise ValueError(f"Invalid side '{config.side}'; expected 'left', 'right', or None.") + self._motors_to_flip: list[str] = SIDE_MOTORS_TO_FLIP.get(config.side, []) if config.side else [] + norm_mode_body = MotorNormMode.DEGREES - - motors_right = { + motors = { "joint_1": Motor(1, "sts3215", norm_mode_body), "joint_2": Motor(2, "sts3215", norm_mode_body), "joint_3": Motor(3, "sts3215", norm_mode_body), @@ -69,46 +72,15 @@ class OpenArmMini(Teleoperator): "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), } - motors_left = { - "joint_1": Motor(1, "sts3215", norm_mode_body), - "joint_2": Motor(2, "sts3215", norm_mode_body), - "joint_3": Motor(3, "sts3215", norm_mode_body), - "joint_4": Motor(4, "sts3215", norm_mode_body), - "joint_5": Motor(5, "sts3215", norm_mode_body), - "joint_6": Motor(6, "sts3215", norm_mode_body), - "joint_7": Motor(7, "sts3215", norm_mode_body), - "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), - } - - cal_right = { - k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_") - } - cal_left = { - k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_") - } - - self.bus_right = FeetechMotorsBus( - port=self.config.port_right, - motors=motors_right, - calibration=cal_right, - ) - - self.bus_left = FeetechMotorsBus( - port=self.config.port_left, - motors=motors_left, - calibration=cal_left, + self.bus = FeetechMotorsBus( + port=self.config.port, + motors=motors, + calibration=self.calibration, ) @property def action_features(self) -> dict[str, type]: - # Right first, then left — matches the robot (BiOpenArmFollower) ordering - # and the dataset feature names recorded during data collection. - features: dict[str, type] = {} - for motor in self.bus_right.motors: - features[f"right_{motor}.pos"] = float - for motor in self.bus_left.motors: - features[f"left_{motor}.pos"] = float - return features + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -116,14 +88,12 @@ class OpenArmMini(Teleoperator): @property def is_connected(self) -> bool: - return self.bus_right.is_connected and self.bus_left.is_connected + return self.bus.is_connected @check_if_already_connected def connect(self, calibrate: bool = True) -> None: - logger.info(f"Connecting right arm on {self.config.port_right}...") - self.bus_right.connect() - logger.info(f"Connecting left arm on {self.config.port_left}...") - self.bus_left.connect() + logger.info(f"Connecting arm on {self.config.port}...") + self.bus.connect() if calibrate: self.calibrate() @@ -133,14 +103,14 @@ class OpenArmMini(Teleoperator): @property def is_calibrated(self) -> bool: - return self.bus_right.is_calibrated and self.bus_left.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: """ - Run calibration procedure for OpenArm Mini. + Run calibration procedure for a single OpenArm Mini arm. 1. Disable torque - 2. Ask user to position arms in hanging position with grippers closed + 2. Ask user to position arm in hanging position with gripper closed 3. Set this as zero position via half-turn homing 4. Interactive gripper calibration (open/close positions) 5. Save calibration @@ -152,70 +122,51 @@ class OpenArmMini(Teleoperator): ) if user_input.strip().lower() != "c": logger.info(f"Using existing calibration for {self.id}") - cal_right = { - k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_") - } - cal_left = { - k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_") - } - self.bus_right.write_calibration(cal_right) - self.bus_left.write_calibration(cal_left) + self.bus.write_calibration(self.calibration) return logger.info(f"\nRunning calibration for {self}") - self._calibrate_arm("right", self.bus_right) - self._calibrate_arm("left", self.bus_left) + self.bus.disable_torque() - self._save_calibration() - print(f"\nCalibration complete and saved to {self.calibration_fpath}") + logger.info("Setting Phase to 12 for all motors...") + for motor in self.bus.motors: + self.bus.write("Phase", motor, 12) - def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None: - """Calibrate a single arm with Feetech motors.""" - logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===") - - bus.disable_torque() - - logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...") - for motor in bus.motors: - bus.write("Phase", motor, 12) - - for motor in bus.motors: - bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input( - f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n" + "\nCalibration: Zero Position\n" "Position the arm in the following configuration:\n" " - Arm hanging straight down\n" " - Gripper closed\n" "Press ENTER when ready..." ) - homing_offsets = bus.set_half_turn_homings() - logger.info(f"{arm_name.capitalize()} arm zero position set.") + homing_offsets = self.bus.set_half_turn_homings() + logger.info("Arm zero position set.") - print(f"\nSetting motor ranges for {arm_name.upper()} arm\n") + print("\nSetting motor ranges\n") if self.calibration is None: self.calibration = {} - motor_resolution = bus.model_resolution_table[list(bus.motors.values())[0].model] + motor_resolution = self.bus.model_resolution_table[list(self.bus.motors.values())[0].model] max_res = motor_resolution - 1 - for motor_name, motor in bus.motors.items(): - prefixed_name = f"{arm_name}_{motor_name}" - + for motor_name, motor in self.bus.motors.items(): if motor_name == "gripper": input( - f"\nGripper Calibration ({arm_name.upper()} arm)\n" - f"Step 1: CLOSE the gripper fully\n" - f"Press ENTER when gripper is closed..." + "\nGripper Calibration\n" + "Step 1: CLOSE the gripper fully\n" + "Press ENTER when gripper is closed..." ) - closed_pos = bus.read("Present_Position", motor_name, normalize=False) + closed_pos = self.bus.read("Present_Position", motor_name, normalize=False) logger.info(f" Gripper closed position recorded: {closed_pos}") input("\nStep 2: OPEN the gripper fully\nPress ENTER when gripper is fully open...") - open_pos = bus.read("Present_Position", motor_name, normalize=False) + open_pos = self.bus.read("Present_Position", motor_name, normalize=False) logger.info(f" Gripper open position recorded: {open_pos}") if closed_pos < open_pos: @@ -228,16 +179,16 @@ class OpenArmMini(Teleoperator): drive_mode = 1 logger.info( - f" {prefixed_name}: range set to [{range_min}, {range_max}] " + f" {motor_name}: range set to [{range_min}, {range_max}] " f"(0=closed, 100=open, drive_mode={drive_mode})" ) else: range_min = 0 range_max = max_res drive_mode = 0 - logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)") + logger.info(f" {motor_name}: range set to [0, {max_res}] (full motor range)") - self.calibration[prefixed_name] = MotorCalibration( + self.calibration[motor_name] = MotorCalibration( id=motor.id, drive_mode=drive_mode, homing_offset=homing_offsets[motor_name], @@ -245,108 +196,68 @@ class OpenArmMini(Teleoperator): range_max=range_max, ) - cal_for_bus = { - k.replace(f"{arm_name}_", ""): v - for k, v in self.calibration.items() - if k.startswith(f"{arm_name}_") - } - bus.write_calibration(cal_for_bus) + self.bus.write_calibration(self.calibration) + self._save_calibration() + print(f"\nCalibration complete and saved to {self.calibration_fpath}") def configure(self) -> None: - self.bus_right.disable_torque() - self.bus_right.configure_motors() - for motor in self.bus_right.motors: - self.bus_right.write("Operating_Mode", motor, OperatingMode.POSITION.value) - - self.bus_left.disable_torque() - self.bus_left.configure_motors() - for motor in self.bus_left.motors: - self.bus_left.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) def setup_motors(self) -> None: - print("\nSetting up RIGHT arm motors...") - for motor in reversed(self.bus_right.motors): - input(f"Connect the controller board to the RIGHT '{motor}' motor only and press enter.") - self.bus_right.setup_motor(motor) - print(f"RIGHT '{motor}' motor id set to {self.bus_right.motors[motor].id}") - - print("\nSetting up LEFT arm motors...") - for motor in reversed(self.bus_left.motors): - input(f"Connect the controller board to the LEFT '{motor}' motor only and press enter.") - self.bus_left.setup_motor(motor) - print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}") + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected def get_action(self) -> RobotAction: - """Get current action from both arms (read positions from all motors).""" + """Get current action (read positions from all motors).""" start = time.perf_counter() - right_positions = self.bus_right.sync_read("Present_Position") - left_positions = self.bus_left.sync_read("Present_Position") + positions = self.bus.sync_read("Present_Position") - # Right first, then left — matches the robot (BiOpenArmFollower) ordering - # and the dataset feature names recorded during data collection. # Joint 6↔7 remap: leader joint_6 → follower joint_7 and vice versa. + # Per-side direction flip is applied based on the configured `side`. action: dict[str, Any] = {} - for motor, val in right_positions.items(): + for motor, val in positions.items(): target = JOINT_REMAP.get(motor, motor) if motor == "gripper": # Convert gripper from teleop 0-100 to openarms degrees: 0→0°, 100→-65° - action[f"right_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES + action[f"{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES else: - action[f"right_{target}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val - for motor, val in left_positions.items(): - target = JOINT_REMAP.get(motor, motor) - if motor == "gripper": - action[f"left_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES - else: - action[f"left_{target}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val + action[f"{target}.pos"] = -val if motor in self._motors_to_flip else val dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action def enable_torque(self) -> None: - """Enable torque on both arms for position control.""" - self.bus_right.enable_torque() - self.bus_left.enable_torque() + self.bus.enable_torque() def disable_torque(self) -> None: - """Disable torque on both arms for free movement.""" - self.bus_right.disable_torque() - self.bus_left.disable_torque() + self.bus.disable_torque() def write_goal_positions(self, positions: dict[str, float]) -> None: """Write goal positions to motors (inverse of get_action flip/gripper/remap logic).""" - right_goals: dict[str, float] = {} - left_goals: dict[str, float] = {} - + goals: dict[str, float] = {} for key, val in positions.items(): if not key.endswith(".pos"): continue - motor_name = key.removesuffix(".pos") - if motor_name.startswith("right_"): - base = motor_name.removeprefix("right_") - # Reverse remap: follower joint_7 → leader joint_6 and vice versa - target = JOINT_REMAP_REVERSE.get(base, base) - if base == "gripper": - # Convert robot degrees to teleop 0-100: 0°→0, -65°→100 - right_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES - else: - # Un-flip using the ORIGINAL motor name (target = leader motor) - right_goals[target] = -val if target in RIGHT_MOTORS_TO_FLIP else val - elif motor_name.startswith("left_"): - base = motor_name.removeprefix("left_") - target = JOINT_REMAP_REVERSE.get(base, base) - if base == "gripper": - left_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES - else: - left_goals[target] = -val if target in LEFT_MOTORS_TO_FLIP else val + base = key.removesuffix(".pos") + # JOINT_REMAP is symmetric (its own inverse). + target = JOINT_REMAP.get(base, base) + if base == "gripper": + # Convert robot degrees to teleop 0-100: 0°→0, -65°→100 + goals[target] = val / GRIPPER_TELEOP_TO_DEGREES + else: + # Un-flip using the ORIGINAL motor name (target = leader motor) + goals[target] = -val if target in self._motors_to_flip else val - if right_goals: - self.bus_right.sync_write("Goal_Position", right_goals) - if left_goals: - self.bus_left.sync_write("Goal_Position", left_goals) + if goals: + self.bus.sync_write("Goal_Position", goals) @check_if_not_connected def send_feedback(self, feedback: dict[str, float]) -> None: @@ -354,6 +265,5 @@ class OpenArmMini(Teleoperator): @check_if_not_connected def disconnect(self) -> None: - self.bus_right.disconnect() - self.bus_left.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.") diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index 5a6d4ecde..0f0eaf07f 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -99,14 +99,18 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator": from .openarm_mini import OpenArmMini return OpenArmMini(config) + elif config.type == "bi_openarm_mini": + from .bi_openarm_mini import BiOpenArmMini + + return BiOpenArmMini(config) elif config.type == "rebot_102_leader": from .rebot_102_leader import RebotArm102Leader return RebotArm102Leader(config) elif config.type == "bi_rebot_102_leader": - from .bi_rebot_102_leader import BiRebotArm102Leader + from .bi_rebot_102_leader import BiRebot102Leader - return BiRebotArm102Leader(config) + return BiRebot102Leader(config) else: try: return cast("Teleoperator", make_device_from_device_class(config)) diff --git a/src/lerobot/utils/bimanual.py b/src/lerobot/utils/bimanual.py new file mode 100644 index 000000000..1e212340b --- /dev/null +++ b/src/lerobot/utils/bimanual.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any + +from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected + + +class BimanualMixin: + """Lifecycle delegation for bimanual robots and teleoperators. + + Concrete subclasses must populate ``self.left_arm`` and ``self.right_arm`` in + their own ``__init__``. They retain ownership of feature dicts and the + data-routing methods (``get_action`` / ``send_action`` / ``get_observation`` / + ``send_feedback``), which vary per-embodiment. + + Inherit before the ``Robot`` / ``Teleoperator`` base so the mixin's methods + take precedence in the MRO:: + + class BiFooFollower(BimanualMixin, Robot): ... + """ + + left_arm: Any + right_arm: Any + + @property + def is_connected(self) -> bool: + return self.left_arm.is_connected and self.right_arm.is_connected + + @property + def is_calibrated(self) -> bool: + return self.left_arm.is_calibrated and self.right_arm.is_calibrated + + @check_if_already_connected + def connect(self, calibrate: bool = True) -> None: + self.left_arm.connect(calibrate) + self.right_arm.connect(calibrate) + + def calibrate(self) -> None: + self.left_arm.calibrate() + self.right_arm.calibrate() + + def configure(self) -> None: + self.left_arm.configure() + self.right_arm.configure() + + @check_if_not_connected + def disconnect(self) -> None: + self.left_arm.disconnect() + self.right_arm.disconnect() diff --git a/tests/teleoperators/test_rebot_102_leader.py b/tests/teleoperators/test_rebot_102_leader.py index bea10e131..aaf986a9b 100644 --- a/tests/teleoperators/test_rebot_102_leader.py +++ b/tests/teleoperators/test_rebot_102_leader.py @@ -18,7 +18,7 @@ from unittest.mock import MagicMock, patch import pytest -from lerobot.teleoperators.bi_rebot_102_leader import BiRebotArm102Leader, BiRebotArm102LeaderConfig +from lerobot.teleoperators.bi_rebot_102_leader import BiRebot102Leader, BiRebot102LeaderConfig from lerobot.teleoperators.rebot_102_leader import ( RebotArm102Leader, RebotArm102LeaderConfig, @@ -91,11 +91,11 @@ def test_send_feedback_not_implemented(leader): def test_bimanual_prefixes_features(): with patch(f"{_MODULE}.require_package", lambda *a, **kw: None): - cfg = BiRebotArm102LeaderConfig( + cfg = BiRebot102LeaderConfig( left_arm_config=RebotArm102LeaderConfig(port="/dev/null0"), right_arm_config=RebotArm102LeaderConfig(port="/dev/null1"), ) - teleop = BiRebotArm102Leader(cfg) + teleop = BiRebot102Leader(cfg) assert any(k.startswith("left_") for k in teleop.action_features) assert any(k.startswith("right_") for k in teleop.action_features) assert "left_gripper.pos" in teleop.action_features