From cff530a17a681aee3910236e3236d869667090f8 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Tue, 11 Nov 2025 13:36:55 +0100 Subject: [PATCH] Add mini openarms to test --- examples/openarms/teleop_openarms_mini.py | 141 +++++++++ .../teleoperators/openarms_mini/__init__.py | 21 ++ .../openarms_mini/config_openarms_mini.py | 33 ++ .../openarms_mini/openarms_mini.py | 293 ++++++++++++++++++ 4 files changed, 488 insertions(+) create mode 100644 examples/openarms/teleop_openarms_mini.py create mode 100644 src/lerobot/teleoperators/openarms_mini/__init__.py create mode 100644 src/lerobot/teleoperators/openarms_mini/config_openarms_mini.py create mode 100644 src/lerobot/teleoperators/openarms_mini/openarms_mini.py diff --git a/examples/openarms/teleop_openarms_mini.py b/examples/openarms/teleop_openarms_mini.py new file mode 100644 index 000000000..1c5e64ae7 --- /dev/null +++ b/examples/openarms/teleop_openarms_mini.py @@ -0,0 +1,141 @@ +""" +OpenArms Mini Teleoperation Example + +This script demonstrates teleoperation of an OpenArms follower robot using +an OpenArms Mini leader (Feetech-based) with dual arms (16 motors total). + +The OpenArms Mini has: +- Right arm: 8 motors (joint_1 to joint_7 + gripper) +- Left arm: 8 motors (joint_1 to joint_7 + gripper) +""" + +import time + +from lerobot.robots.openarms.openarms_follower import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig +from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini +from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig + + +# Configure the OpenArms follower (Damiao motors on CAN bus) +follower_config = OpenArmsFollowerConfig( + port_left="can2", # CAN interface for follower left arm + port_right="can3", # CAN interface for follower right arm + can_interface="socketcan", # Linux SocketCAN + id="openarms_follower", + disable_torque_on_disconnect=True, + max_relative_target=10.0, # Safety limit (degrees per step) +) + +# Configure the OpenArms Mini leader (Feetech motors on serial) +leader_config = OpenArmsMiniConfig( + port_right="/dev/ttyUSB0", # Serial port for right arm + port_left="/dev/ttyUSB1", # Serial port for left arm + id="openarms_mini", + use_degrees=True, +) + +print("OpenArms Mini → OpenArms Follower Teleoperation") + +# Initialize devices +follower = OpenArmsFollower(follower_config) +leader = OpenArmsMini(leader_config) + +# Connect and calibrate follower +print("Note: If you have existing calibration, just press ENTER to use it.") +follower.connect(calibrate=True) + +# Connect and calibrate leader +print("Note: The leader arms will have torque disabled for manual control.") +leader.connect(calibrate=True) + +print("\nPress ENTER to start teleoperation...") +input() + +print("Press Ctrl+C to stop.\n") + +# All joints for both arms (16 motors total) +all_joints = [ + # Right arm + "right_joint_1", + "right_joint_2", + "right_joint_3", + "right_joint_4", + "right_joint_5", + "right_joint_6", + "right_joint_7", + "right_gripper", + # Left arm + "left_joint_1", + "left_joint_2", + "left_joint_3", + "left_joint_4", + "left_joint_5", + "left_joint_6", + "left_joint_7", + "left_gripper", +] + +# Performance monitoring +loop_times = [] +start_time = time.perf_counter() +last_print_time = start_time + +try: + while True: + loop_start = time.perf_counter() + + # Get action from leader (OpenArms Mini) + leader_action = leader.get_action() + + # Filter to only position data for all joints (both arms) + joint_action = {} + for joint in all_joints: + pos_key = f"{joint}.pos" + if pos_key in leader_action: + joint_action[pos_key] = leader_action[pos_key] + + # Send action to follower (OpenArms) + if joint_action: + follower.send_action(joint_action) + + # Measure loop time + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + # Print stats every 2 seconds + if loop_end - last_print_time >= 2.0: + if loop_times: + avg_time = sum(loop_times) / len(loop_times) + current_hz = 1.0 / avg_time if avg_time > 0 else 0 + min_time = min(loop_times) + max_time = max(loop_times) + max_hz = 1.0 / min_time if min_time > 0 else 0 + min_hz = 1.0 / max_time if max_time > 0 else 0 + + print(f"[Hz Stats] Avg: {current_hz:.1f} Hz | " + f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | " + f"Avg loop time: {avg_time*1000:.1f} ms") + + # Reset for next measurement window + loop_times = [] + last_print_time = loop_end + +except KeyboardInterrupt: + print("\n\nStopping teleoperation...") +finally: + # Disconnect devices + print("Disconnecting devices...") + try: + follower.disconnect() + except Exception as e: + print(f"Error disconnecting follower: {e}") + + try: + leader.disconnect() + except Exception as e: + print(f"Error disconnecting leader: {e}") + + print("Done!") + diff --git a/src/lerobot/teleoperators/openarms_mini/__init__.py b/src/lerobot/teleoperators/openarms_mini/__init__.py new file mode 100644 index 000000000..8a30d6012 --- /dev/null +++ b/src/lerobot/teleoperators/openarms_mini/__init__.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .config_openarms_mini import OpenArmsMiniConfig +from .openarms_mini import OpenArmsMini + +__all__ = ["OpenArmsMini", "OpenArmsMiniConfig"] + diff --git a/src/lerobot/teleoperators/openarms_mini/config_openarms_mini.py b/src/lerobot/teleoperators/openarms_mini/config_openarms_mini.py new file mode 100644 index 000000000..26f9481f6 --- /dev/null +++ b/src/lerobot/teleoperators/openarms_mini/config_openarms_mini.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..teleoperator import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("openarms_mini") +@dataclass +class OpenArmsMiniConfig(TeleoperatorConfig): + """Configuration for OpenArms Mini teleoperator with Feetech motors (dual arms).""" + + # Serial ports for left and right arms + port_right: str = "/dev/ttyUSB0" # Serial port for right arm + port_left: str = "/dev/ttyUSB1" # Serial port for left arm + + # Whether to use degrees mode (True) or normalized mode (False) + use_degrees: bool = True + diff --git a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py new file mode 100644 index 000000000..47a6212fe --- /dev/null +++ b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from lerobot.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .config_openarms_mini import OpenArmsMiniConfig + +logger = logging.getLogger(__name__) + + +class OpenArmsMini(Teleoperator): + """ + OpenArms Mini Teleoperator with dual Feetech-based arms (8 motors per arm). + Each arm has 7 joints plus a gripper, using the same DOF as OpenArms. + """ + + config_class = OpenArmsMiniConfig + name = "openarms_mini" + + def __init__(self, config: OpenArmsMiniConfig): + super().__init__(config) + self.config = config + + # Use degrees mode for all motors + norm_mode_body = MotorNormMode.DEGREES + + # Right arm motors (8 motors: joint_1 to joint_7 + gripper) + motors_right = { + "joint_1": Motor(1, "sts3215", norm_mode_body), + "joint_2": Motor(2, "sts3215", norm_mode_body), + "joint_3": Motor(3, "sts3215", norm_mode_body), + "joint_4": Motor(4, "sts3215", norm_mode_body), + "joint_5": Motor(5, "sts3215", norm_mode_body), + "joint_6": Motor(6, "sts3215", norm_mode_body), + "joint_7": Motor(7, "sts3215", norm_mode_body), + "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), + } + + # Left arm motors (8 motors: joint_1 to joint_7 + gripper) + # Note: Left arm uses IDs 11-18 to avoid conflicts if on same bus + motors_left = { + "joint_1": Motor(1, "sts3215", norm_mode_body), + "joint_2": Motor(2, "sts3215", norm_mode_body), + "joint_3": Motor(3, "sts3215", norm_mode_body), + "joint_4": Motor(4, "sts3215", norm_mode_body), + "joint_5": Motor(5, "sts3215", norm_mode_body), + "joint_6": Motor(6, "sts3215", norm_mode_body), + "joint_7": Motor(7, "sts3215", norm_mode_body), + "gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100), + } + + # Initialize Feetech motor buses for each arm + self.bus_right = FeetechMotorsBus( + port=self.config.port_right, + motors=motors_right, + calibration={k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")}, + ) + + self.bus_left = FeetechMotorsBus( + port=self.config.port_left, + motors=motors_left, + calibration={k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")}, + ) + + @property + def action_features(self) -> dict[str, type]: + """Action features include positions for all motors (16 total: 8 per arm).""" + features = {} + # Right arm motors + for motor in self.bus_right.motors: + features[f"right_{motor}.pos"] = float + # Left arm motors + for motor in self.bus_left.motors: + features[f"left_{motor}.pos"] = float + return features + + @property + def feedback_features(self) -> dict[str, type]: + """No feedback features for now.""" + return {} + + @property + def is_connected(self) -> bool: + """Check if both arms are connected.""" + return self.bus_right.is_connected and self.bus_left.is_connected + + def connect(self, calibrate: bool = True) -> None: + """Connect to both arms and optionally calibrate.""" + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + # Connect to both buses + logger.info(f"Connecting right arm on {self.config.port_right}...") + self.bus_right.connect() + logger.info(f"Connecting left arm on {self.config.port_left}...") + self.bus_left.connect() + + # Calibrate if needed + if not self.is_calibrated and calibrate: + logger.info( + "Mismatch between calibration values in the motor and the calibration file or no calibration file found" + ) + self.calibrate() + + # Configure motors + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + """Check if both arms are calibrated.""" + return self.bus_right.is_calibrated and self.bus_left.is_calibrated + + def calibrate(self) -> None: + """ + Run calibration procedure for OpenArms Mini. + + The calibration procedure: + 1. Disable torque + 2. Ask user to position arms in hanging position with grippers closed + 3. Set this as zero position + 4. Set fixed range of -90° to +90° for all joints (0-100 for gripper) + 5. Save calibration + """ + if self.calibration: + # Calibration file exists, ask user whether to use it or run new calibration + user_input = input( + f"Press ENTER to use provided calibration file associated with the id {self.id}, " + f"or type 'c' and press ENTER to run calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Writing calibration file associated with the id {self.id} to the motors") + # Split calibration for each bus + cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")} + cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")} + self.bus_right.write_calibration(cal_right) + self.bus_left.write_calibration(cal_left) + return + + logger.info(f"\nRunning calibration of {self}") + + # Calibrate each arm separately + self._calibrate_arm("right", self.bus_right) + self._calibrate_arm("left", self.bus_left) + + self._save_calibration() + print(f"Calibration saved to {self.calibration_fpath}") + + def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None: + """Calibrate a single arm with Feetech motors.""" + logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===") + + # Disable torque for manual positioning + bus.disable_torque() + + # Set all motors to position mode + for motor in bus.motors: + bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + # Step 1: Set zero position + input( + f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n" + "Position the arm in the following configuration:\n" + " - Arm hanging straight down\n" + " - Gripper closed\n" + "Press ENTER when ready..." + ) + + # Set current position as zero (half-turn homing) + homing_offsets = bus.set_half_turn_homings() + logger.info(f"{arm_name.capitalize()} arm zero position set.") + + # Step 2: Set fixed ranges + print( + f"\nSetting fixed ranges:\n" + f" - Joints 1-7: -90° to +90°\n" + f" - Gripper: 0 to 100\n" + ) + + # Create calibration data with fixed ranges + if self.calibration is None: + self.calibration = {} + + for motor_name, motor in bus.motors.items(): + # Prefix motor name with arm name for storage + prefixed_name = f"{arm_name}_{motor_name}" + + # Set ranges based on motor type + if motor_name == "gripper": + # Gripper uses 0-100 range + range_min = 0 + range_max = 100 + else: + # All joints use -90° to +90° range + range_min = -90 + range_max = 90 + + self.calibration[prefixed_name] = MotorCalibration( + id=motor.id, + drive_mode=0, # Normal direction + homing_offset=homing_offsets[motor_name], + range_min=range_min, + range_max=range_max, + ) + logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}]") + + # Write calibration to this arm's motors + cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")} + bus.write_calibration(cal_for_bus) + + def configure(self) -> None: + """Configure motors with appropriate settings.""" + # Configure right arm + self.bus_right.disable_torque() + self.bus_right.configure_motors() + for motor in self.bus_right.motors: + self.bus_right.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + # Configure left arm + self.bus_left.disable_torque() + self.bus_left.configure_motors() + for motor in self.bus_left.motors: + self.bus_left.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + def setup_motors(self) -> None: + """Setup motor IDs for both arms.""" + print("\nSetting up RIGHT arm motors...") + for motor in reversed(self.bus_right.motors): + input(f"Connect the controller board to the RIGHT '{motor}' motor only and press enter.") + self.bus_right.setup_motor(motor) + print(f"RIGHT '{motor}' motor id set to {self.bus_right.motors[motor].id}") + + print("\nSetting up LEFT arm motors...") + for motor in reversed(self.bus_left.motors): + input(f"Connect the controller board to the LEFT '{motor}' motor only and press enter.") + self.bus_left.setup_motor(motor) + print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}") + + def get_action(self) -> dict[str, float]: + """Get current action from both arms (read positions from all motors).""" + start = time.perf_counter() + + # Read positions from both arms + right_positions = self.bus_right.sync_read("Present_Position") + left_positions = self.bus_left.sync_read("Present_Position") + + # Combine into single action dict with arm prefixes + action = {} + for motor, val in right_positions.items(): + action[f"right_{motor}.pos"] = val + for motor, val in left_positions.items(): + action[f"left_{motor}.pos"] = val + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def send_feedback(self, feedback: dict[str, float]) -> None: + """Send feedback to teleoperator (not implemented).""" + raise NotImplementedError + + def disconnect(self) -> None: + """Disconnect from both arms.""" + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Disconnect both buses + self.bus_right.disconnect() + self.bus_left.disconnect() + logger.info(f"{self} disconnected.") +