diff --git a/src/lerobot/scripts/lerobot_calibrate.py b/src/lerobot/scripts/lerobot_calibrate.py index e43736954..0a9cbee66 100644 --- a/src/lerobot/scripts/lerobot_calibrate.py +++ b/src/lerobot/scripts/lerobot_calibrate.py @@ -54,6 +54,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_find_joint_limits.py b/src/lerobot/scripts/lerobot_find_joint_limits.py index 5b9166a2e..b269eeeff 100644 --- a/src/lerobot/scripts/lerobot_find_joint_limits.py +++ b/src/lerobot/scripts/lerobot_find_joint_limits.py @@ -57,6 +57,7 @@ from lerobot.robots import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401 TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, gamepad, diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index c411ebf9e..0deb54b90 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -137,6 +137,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_rollout.py b/src/lerobot/scripts/lerobot_rollout.py index 82a858b9a..8515c4cc9 100644 --- a/src/lerobot/scripts/lerobot_rollout.py +++ b/src/lerobot/scripts/lerobot_rollout.py @@ -174,6 +174,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, homunculus, diff --git a/src/lerobot/scripts/lerobot_setup_motors.py b/src/lerobot/scripts/lerobot_setup_motors.py index 69ebcf5fa..f86c31af2 100644 --- a/src/lerobot/scripts/lerobot_setup_motors.py +++ b/src/lerobot/scripts/lerobot_setup_motors.py @@ -41,6 +41,7 @@ from lerobot.robots import ( # noqa: F401 ) from lerobot.teleoperators import ( # noqa: F401 TeleoperatorConfig, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, koch_leader, diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py index 2ff02bda0..5d806200d 100644 --- a/src/lerobot/scripts/lerobot_teleoperate.py +++ b/src/lerobot/scripts/lerobot_teleoperate.py @@ -89,6 +89,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, bi_openarm_leader, + bi_openarm_mini, bi_rebot_102_leader, bi_so_leader, gamepad, diff --git a/src/lerobot/teleoperators/bi_openarm_mini/__init__.py b/src/lerobot/teleoperators/bi_openarm_mini/__init__.py new file mode 100644 index 000000000..ec1a2dbc8 --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/__init__.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .bi_openarm_mini import BiOpenArmMini +from .config_bi_openarm_mini import BiOpenArmMiniConfig + +__all__ = ["BiOpenArmMini", "BiOpenArmMiniConfig"] diff --git a/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py new file mode 100644 index 000000000..3ec6073b9 --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/bi_openarm_mini.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from functools import cached_property + +from lerobot.types import RobotAction +from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected + +from ..openarm_mini import OpenArmMini, OpenArmMiniConfig +from ..teleoperator import Teleoperator +from .config_bi_openarm_mini import BiOpenArmMiniConfig + +logger = logging.getLogger(__name__) + + +class BiOpenArmMini(Teleoperator): + """Bimanual OpenArm Mini teleoperator. + + Composes two single-arm :class:`OpenArmMini` instances. Action and feedback + keys of each arm are namespaced with a ``left_`` / ``right_`` prefix, so a + bimanual leader can teleoperate a bimanual OpenArm follower. + """ + + config_class = BiOpenArmMiniConfig + name = "bi_openarm_mini" + + def __init__(self, config: BiOpenArmMiniConfig): + super().__init__(config) + self.config = config + + # `side` is forced to match left/right regardless of what the user passed + # on the per-arm base config — the bimanual wrapper owns the side semantics. + left_arm_config = OpenArmMiniConfig( + id=f"{config.id}_left" if config.id else None, + calibration_dir=config.calibration_dir, + port=config.left_arm_config.port, + side="left", + use_degrees=config.left_arm_config.use_degrees, + ) + + right_arm_config = OpenArmMiniConfig( + id=f"{config.id}_right" if config.id else None, + calibration_dir=config.calibration_dir, + port=config.right_arm_config.port, + side="right", + use_degrees=config.right_arm_config.use_degrees, + ) + + self.left_arm = OpenArmMini(left_arm_config) + self.right_arm = OpenArmMini(right_arm_config) + + @cached_property + def action_features(self) -> dict[str, type]: + return { + **{f"left_{k}": v for k, v in self.left_arm.action_features.items()}, + **{f"right_{k}": v for k, v in self.right_arm.action_features.items()}, + } + + @cached_property + def feedback_features(self) -> dict[str, type]: + return { + **{f"left_{k}": v for k, v in self.left_arm.feedback_features.items()}, + **{f"right_{k}": v for k, v in self.right_arm.feedback_features.items()}, + } + + @property + def is_connected(self) -> bool: + return self.left_arm.is_connected and self.right_arm.is_connected + + @check_if_already_connected + def connect(self, calibrate: bool = True) -> None: + self.left_arm.connect(calibrate) + self.right_arm.connect(calibrate) + + @property + def is_calibrated(self) -> bool: + return self.left_arm.is_calibrated and self.right_arm.is_calibrated + + def calibrate(self) -> None: + self.left_arm.calibrate() + self.right_arm.calibrate() + + def configure(self) -> None: + self.left_arm.configure() + self.right_arm.configure() + + def setup_motors(self) -> None: + self.left_arm.setup_motors() + self.right_arm.setup_motors() + + @check_if_not_connected + def get_action(self) -> RobotAction: + action: RobotAction = {} + for k, v in self.left_arm.get_action().items(): + action[f"left_{k}"] = v + for k, v in self.right_arm.get_action().items(): + action[f"right_{k}"] = v + return action + + @check_if_not_connected + def send_feedback(self, feedback: dict[str, float]) -> None: + left_fb = {k.removeprefix("left_"): v for k, v in feedback.items() if k.startswith("left_")} + right_fb = {k.removeprefix("right_"): v for k, v in feedback.items() if k.startswith("right_")} + if left_fb: + self.left_arm.send_feedback(left_fb) + if right_fb: + self.right_arm.send_feedback(right_fb) + + @check_if_not_connected + def disconnect(self) -> None: + self.left_arm.disconnect() + self.right_arm.disconnect() diff --git a/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py b/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py new file mode 100644 index 000000000..f021eaa49 --- /dev/null +++ b/src/lerobot/teleoperators/bi_openarm_mini/config_bi_openarm_mini.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig +from ..openarm_mini import OpenArmMiniConfigBase + + +@TeleoperatorConfig.register_subclass("bi_openarm_mini") +@dataclass +class BiOpenArmMiniConfig(TeleoperatorConfig): + """Configuration class for Bi OpenArm Mini teleoperators.""" + + left_arm_config: OpenArmMiniConfigBase + right_arm_config: OpenArmMiniConfigBase diff --git a/src/lerobot/teleoperators/openarm_mini/__init__.py b/src/lerobot/teleoperators/openarm_mini/__init__.py index 8620af1d7..18f4566f3 100644 --- a/src/lerobot/teleoperators/openarm_mini/__init__.py +++ b/src/lerobot/teleoperators/openarm_mini/__init__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .config_openarm_mini import OpenArmMiniConfig +from .config_openarm_mini import OpenArmMiniConfig, OpenArmMiniConfigBase from .openarm_mini import OpenArmMini -__all__ = ["OpenArmMini", "OpenArmMiniConfig"] +__all__ = ["OpenArmMini", "OpenArmMiniConfig", "OpenArmMiniConfigBase"] diff --git a/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py index 7dc3e0212..74a8bf606 100644 --- a/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/config_openarm_mini.py @@ -19,12 +19,21 @@ from dataclasses import dataclass from ..config import TeleoperatorConfig -@TeleoperatorConfig.register_subclass("openarm_mini") @dataclass -class OpenArmMiniConfig(TeleoperatorConfig): - """Configuration for OpenArm Mini teleoperator with Feetech motors (dual arms).""" +class OpenArmMiniConfigBase: + """Base configuration for the OpenArm Mini teleoperator (Feetech STS3215, 7DOF + gripper).""" - port_right: str = "/dev/ttyUSB0" - port_left: str = "/dev/ttyUSB1" + # Serial port for the Feetech bus (e.g., "/dev/ttyUSB0"). + port: str + + # Side of the arm: "left" or "right". Controls per-joint direction flips applied + # during readout. If `None`, no flipping is applied. + side: str | None = None use_degrees: bool = True + + +@TeleoperatorConfig.register_subclass("openarm_mini") +@dataclass +class OpenArmMiniConfig(TeleoperatorConfig, OpenArmMiniConfigBase): + pass diff --git a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py index 2df1dfcfe..9793d0b6f 100644 --- a/src/lerobot/teleoperators/openarm_mini/openarm_mini.py +++ b/src/lerobot/teleoperators/openarm_mini/openarm_mini.py @@ -31,22 +31,22 @@ from .config_openarm_mini import OpenArmMiniConfig logger = logging.getLogger(__name__) -# Motors whose direction is inverted during readout -RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"] -LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] +# Per-side motor direction flips applied during readout. +SIDE_MOTORS_TO_FLIP: dict[str, list[str]] = { + "left": ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"], + "right": ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"], +} -# Leader joint 6 maps to follower joint 7 and vice versa +# Leader joint 6 ↔ follower joint 7 (symmetric — its own inverse). JOINT_REMAP = {"joint_6": "joint_7", "joint_7": "joint_6"} -JOINT_REMAP_REVERSE = {"joint_7": "joint_6", "joint_6": "joint_7"} GRIPPER_TELEOP_TO_DEGREES = -0.65 class OpenArmMini(Teleoperator): - """ - OpenArm Mini Teleoperator with dual Feetech-based arms (8 motors per arm). + """OpenArm Mini single-arm teleoperator (Feetech STS3215, 7DOF + gripper). - Each arm has 7 joints plus a gripper, using Feetech STS3215 servos. + For the bimanual setup, see :class:`BiOpenArmMini` which composes two of these. """ config_class = OpenArmMiniConfig @@ -56,9 +56,12 @@ class OpenArmMini(Teleoperator): super().__init__(config) self.config = config + if config.side is not None and config.side not in SIDE_MOTORS_TO_FLIP: + raise ValueError(f"Invalid side '{config.side}'; expected 'left', 'right', or None.") + self._motors_to_flip: list[str] = SIDE_MOTORS_TO_FLIP.get(config.side, []) if config.side else [] + norm_mode_body = MotorNormMode.DEGREES - - motors_right = { + motors = { "joint_1": Motor(1, "sts3215", norm_mode_body), "joint_2": Motor(2, "sts3215", norm_mode_body), "joint_3": Motor(3, "sts3215", norm_mode_body), @@ -69,46 +72,15 @@ class OpenArmMini(Teleoperator): "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), } - motors_left = { - "joint_1": Motor(1, "sts3215", norm_mode_body), - "joint_2": Motor(2, "sts3215", norm_mode_body), - "joint_3": Motor(3, "sts3215", norm_mode_body), - "joint_4": Motor(4, "sts3215", norm_mode_body), - "joint_5": Motor(5, "sts3215", norm_mode_body), - "joint_6": Motor(6, "sts3215", norm_mode_body), - "joint_7": Motor(7, "sts3215", norm_mode_body), - "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), - } - - cal_right = { - k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_") - } - cal_left = { - k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_") - } - - self.bus_right = FeetechMotorsBus( - port=self.config.port_right, - motors=motors_right, - calibration=cal_right, - ) - - self.bus_left = FeetechMotorsBus( - port=self.config.port_left, - motors=motors_left, - calibration=cal_left, + self.bus = FeetechMotorsBus( + port=self.config.port, + motors=motors, + calibration=self.calibration, ) @property def action_features(self) -> dict[str, type]: - # Right first, then left — matches the robot (BiOpenArmFollower) ordering - # and the dataset feature names recorded during data collection. - features: dict[str, type] = {} - for motor in self.bus_right.motors: - features[f"right_{motor}.pos"] = float - for motor in self.bus_left.motors: - features[f"left_{motor}.pos"] = float - return features + return {f"{motor}.pos": float for motor in self.bus.motors} @property def feedback_features(self) -> dict[str, type]: @@ -116,14 +88,12 @@ class OpenArmMini(Teleoperator): @property def is_connected(self) -> bool: - return self.bus_right.is_connected and self.bus_left.is_connected + return self.bus.is_connected @check_if_already_connected def connect(self, calibrate: bool = True) -> None: - logger.info(f"Connecting right arm on {self.config.port_right}...") - self.bus_right.connect() - logger.info(f"Connecting left arm on {self.config.port_left}...") - self.bus_left.connect() + logger.info(f"Connecting arm on {self.config.port}...") + self.bus.connect() if calibrate: self.calibrate() @@ -133,14 +103,14 @@ class OpenArmMini(Teleoperator): @property def is_calibrated(self) -> bool: - return self.bus_right.is_calibrated and self.bus_left.is_calibrated + return self.bus.is_calibrated def calibrate(self) -> None: """ - Run calibration procedure for OpenArm Mini. + Run calibration procedure for a single OpenArm Mini arm. 1. Disable torque - 2. Ask user to position arms in hanging position with grippers closed + 2. Ask user to position arm in hanging position with gripper closed 3. Set this as zero position via half-turn homing 4. Interactive gripper calibration (open/close positions) 5. Save calibration @@ -152,70 +122,51 @@ class OpenArmMini(Teleoperator): ) if user_input.strip().lower() != "c": logger.info(f"Using existing calibration for {self.id}") - cal_right = { - k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_") - } - cal_left = { - k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_") - } - self.bus_right.write_calibration(cal_right) - self.bus_left.write_calibration(cal_left) + self.bus.write_calibration(self.calibration) return logger.info(f"\nRunning calibration for {self}") - self._calibrate_arm("right", self.bus_right) - self._calibrate_arm("left", self.bus_left) + self.bus.disable_torque() - self._save_calibration() - print(f"\nCalibration complete and saved to {self.calibration_fpath}") + logger.info("Setting Phase to 12 for all motors...") + for motor in self.bus.motors: + self.bus.write("Phase", motor, 12) - def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None: - """Calibrate a single arm with Feetech motors.""" - logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===") - - bus.disable_torque() - - logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...") - for motor in bus.motors: - bus.write("Phase", motor, 12) - - for motor in bus.motors: - bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) input( - f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n" + "\nCalibration: Zero Position\n" "Position the arm in the following configuration:\n" " - Arm hanging straight down\n" " - Gripper closed\n" "Press ENTER when ready..." ) - homing_offsets = bus.set_half_turn_homings() - logger.info(f"{arm_name.capitalize()} arm zero position set.") + homing_offsets = self.bus.set_half_turn_homings() + logger.info("Arm zero position set.") - print(f"\nSetting motor ranges for {arm_name.upper()} arm\n") + print("\nSetting motor ranges\n") if self.calibration is None: self.calibration = {} - motor_resolution = bus.model_resolution_table[list(bus.motors.values())[0].model] + motor_resolution = self.bus.model_resolution_table[list(self.bus.motors.values())[0].model] max_res = motor_resolution - 1 - for motor_name, motor in bus.motors.items(): - prefixed_name = f"{arm_name}_{motor_name}" - + for motor_name, motor in self.bus.motors.items(): if motor_name == "gripper": input( - f"\nGripper Calibration ({arm_name.upper()} arm)\n" - f"Step 1: CLOSE the gripper fully\n" - f"Press ENTER when gripper is closed..." + "\nGripper Calibration\n" + "Step 1: CLOSE the gripper fully\n" + "Press ENTER when gripper is closed..." ) - closed_pos = bus.read("Present_Position", motor_name, normalize=False) + closed_pos = self.bus.read("Present_Position", motor_name, normalize=False) logger.info(f" Gripper closed position recorded: {closed_pos}") input("\nStep 2: OPEN the gripper fully\nPress ENTER when gripper is fully open...") - open_pos = bus.read("Present_Position", motor_name, normalize=False) + open_pos = self.bus.read("Present_Position", motor_name, normalize=False) logger.info(f" Gripper open position recorded: {open_pos}") if closed_pos < open_pos: @@ -228,16 +179,16 @@ class OpenArmMini(Teleoperator): drive_mode = 1 logger.info( - f" {prefixed_name}: range set to [{range_min}, {range_max}] " + f" {motor_name}: range set to [{range_min}, {range_max}] " f"(0=closed, 100=open, drive_mode={drive_mode})" ) else: range_min = 0 range_max = max_res drive_mode = 0 - logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)") + logger.info(f" {motor_name}: range set to [0, {max_res}] (full motor range)") - self.calibration[prefixed_name] = MotorCalibration( + self.calibration[motor_name] = MotorCalibration( id=motor.id, drive_mode=drive_mode, homing_offset=homing_offsets[motor_name], @@ -245,108 +196,68 @@ class OpenArmMini(Teleoperator): range_max=range_max, ) - cal_for_bus = { - k.replace(f"{arm_name}_", ""): v - for k, v in self.calibration.items() - if k.startswith(f"{arm_name}_") - } - bus.write_calibration(cal_for_bus) + self.bus.write_calibration(self.calibration) + self._save_calibration() + print(f"\nCalibration complete and saved to {self.calibration_fpath}") def configure(self) -> None: - self.bus_right.disable_torque() - self.bus_right.configure_motors() - for motor in self.bus_right.motors: - self.bus_right.write("Operating_Mode", motor, OperatingMode.POSITION.value) - - self.bus_left.disable_torque() - self.bus_left.configure_motors() - for motor in self.bus_left.motors: - self.bus_left.write("Operating_Mode", motor, OperatingMode.POSITION.value) + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) def setup_motors(self) -> None: - print("\nSetting up RIGHT arm motors...") - for motor in reversed(self.bus_right.motors): - input(f"Connect the controller board to the RIGHT '{motor}' motor only and press enter.") - self.bus_right.setup_motor(motor) - print(f"RIGHT '{motor}' motor id set to {self.bus_right.motors[motor].id}") - - print("\nSetting up LEFT arm motors...") - for motor in reversed(self.bus_left.motors): - input(f"Connect the controller board to the LEFT '{motor}' motor only and press enter.") - self.bus_left.setup_motor(motor) - print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}") + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @check_if_not_connected def get_action(self) -> RobotAction: - """Get current action from both arms (read positions from all motors).""" + """Get current action (read positions from all motors).""" start = time.perf_counter() - right_positions = self.bus_right.sync_read("Present_Position") - left_positions = self.bus_left.sync_read("Present_Position") + positions = self.bus.sync_read("Present_Position") - # Right first, then left — matches the robot (BiOpenArmFollower) ordering - # and the dataset feature names recorded during data collection. # Joint 6↔7 remap: leader joint_6 → follower joint_7 and vice versa. + # Per-side direction flip is applied based on the configured `side`. action: dict[str, Any] = {} - for motor, val in right_positions.items(): + for motor, val in positions.items(): target = JOINT_REMAP.get(motor, motor) if motor == "gripper": # Convert gripper from teleop 0-100 to openarms degrees: 0→0°, 100→-65° - action[f"right_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES + action[f"{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES else: - action[f"right_{target}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val - for motor, val in left_positions.items(): - target = JOINT_REMAP.get(motor, motor) - if motor == "gripper": - action[f"left_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES - else: - action[f"left_{target}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val + action[f"{target}.pos"] = -val if motor in self._motors_to_flip else val dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action def enable_torque(self) -> None: - """Enable torque on both arms for position control.""" - self.bus_right.enable_torque() - self.bus_left.enable_torque() + self.bus.enable_torque() def disable_torque(self) -> None: - """Disable torque on both arms for free movement.""" - self.bus_right.disable_torque() - self.bus_left.disable_torque() + self.bus.disable_torque() def write_goal_positions(self, positions: dict[str, float]) -> None: """Write goal positions to motors (inverse of get_action flip/gripper/remap logic).""" - right_goals: dict[str, float] = {} - left_goals: dict[str, float] = {} - + goals: dict[str, float] = {} for key, val in positions.items(): if not key.endswith(".pos"): continue - motor_name = key.removesuffix(".pos") - if motor_name.startswith("right_"): - base = motor_name.removeprefix("right_") - # Reverse remap: follower joint_7 → leader joint_6 and vice versa - target = JOINT_REMAP_REVERSE.get(base, base) - if base == "gripper": - # Convert robot degrees to teleop 0-100: 0°→0, -65°→100 - right_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES - else: - # Un-flip using the ORIGINAL motor name (target = leader motor) - right_goals[target] = -val if target in RIGHT_MOTORS_TO_FLIP else val - elif motor_name.startswith("left_"): - base = motor_name.removeprefix("left_") - target = JOINT_REMAP_REVERSE.get(base, base) - if base == "gripper": - left_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES - else: - left_goals[target] = -val if target in LEFT_MOTORS_TO_FLIP else val + base = key.removesuffix(".pos") + # JOINT_REMAP is symmetric (its own inverse). + target = JOINT_REMAP.get(base, base) + if base == "gripper": + # Convert robot degrees to teleop 0-100: 0°→0, -65°→100 + goals[target] = val / GRIPPER_TELEOP_TO_DEGREES + else: + # Un-flip using the ORIGINAL motor name (target = leader motor) + goals[target] = -val if target in self._motors_to_flip else val - if right_goals: - self.bus_right.sync_write("Goal_Position", right_goals) - if left_goals: - self.bus_left.sync_write("Goal_Position", left_goals) + if goals: + self.bus.sync_write("Goal_Position", goals) @check_if_not_connected def send_feedback(self, feedback: dict[str, float]) -> None: @@ -354,6 +265,5 @@ class OpenArmMini(Teleoperator): @check_if_not_connected def disconnect(self) -> None: - self.bus_right.disconnect() - self.bus_left.disconnect() + self.bus.disconnect() logger.info(f"{self} disconnected.") diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index f2711e182..0f0eaf07f 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -99,6 +99,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator": from .openarm_mini import OpenArmMini return OpenArmMini(config) + elif config.type == "bi_openarm_mini": + from .bi_openarm_mini import BiOpenArmMini + + return BiOpenArmMini(config) elif config.type == "rebot_102_leader": from .rebot_102_leader import RebotArm102Leader