diff --git a/examples/openarms/teleop_openarms_mini.py b/examples/openarms/teleop_openarms_mini.py index 9faf0c90d..f8a95e6aa 100644 --- a/examples/openarms/teleop_openarms_mini.py +++ b/examples/openarms/teleop_openarms_mini.py @@ -39,8 +39,8 @@ follower_config = OpenArmsFollowerConfig( # Configure the OpenArms Mini leader (Feetech motors on serial) leader_config = OpenArmsMiniConfig( - port_right="/dev/ttyACM1", # Serial port for right arm - port_left="/dev/ttyACM0", # Serial port for left arm + port_right="/dev/ttyACM0", # Serial port for right arm + port_left="/dev/ttyACM1", # Serial port for left arm id="openarms_mini", use_degrees=True, ) @@ -94,20 +94,6 @@ max_loop_time = 0.0 stats_update_interval = 1.0 # Update stats every 1 second last_stats_update = time.perf_counter() -JOINT_DIRECTION = { - # invert direction - "right_joint_1": -1, - "right_joint_2": -1, - "right_joint_3": -1, - "right_joint_4": -1, - "right_joint_5": -1, - "left_joint_1": -1, - "left_joint_3": -1, - "left_joint_4": -1, - "left_joint_5": -1, - "left_joint_6": -1, - "left_joint_7": -1, -} SWAPPED_JOINTS = { "right_joint_6": "right_joint_7", @@ -140,9 +126,6 @@ try: # Map 0-100 (Mini) to 0 to -65 (OpenArms) # 0 (closed) -> 0°, 100 (open) -> -65° pos = (pos / 100.0) * -65.0 - else: - # Apply direction reversal if specified (non-gripper joints only) - pos *= JOINT_DIRECTION.get(joint, 1) # Store in action dict for follower joint_action[follower_key] = pos diff --git a/examples/openarms_web_interface/web_record_server.py b/examples/openarms_web_interface/web_record_server.py index 0cc9b45c4..cb01d36b6 100644 --- a/examples/openarms_web_interface/web_record_server.py +++ b/examples/openarms_web_interface/web_record_server.py @@ -112,20 +112,7 @@ FRICTION_SCALE = 1.0 RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full # OpenArms Mini joint mappings (from teleop_openarms_mini.py) -JOINT_DIRECTION_MINI = { - "right_joint_1": -1, - "right_joint_2": -1, - "right_joint_3": -1, - "right_joint_4": -1, - "right_joint_5": -1, - "left_joint_1": -1, - "left_joint_3": -1, - "left_joint_4": -1, - "left_joint_5": -1, - "left_joint_6": -1, - "left_joint_7": -1, -} - +# Note: Direction flipping is now handled in OpenArmsMini.get_action() method SWAPPED_JOINTS_MINI = { "right_joint_6": "right_joint_7", "right_joint_7": "right_joint_6", @@ -545,6 +532,12 @@ def record_loop_with_compensation(): all_joints.append(f"right_{motor}") for motor in leader.bus_left.motors: all_joints.append(f"left_{motor}") + + # Disable torque on OpenArms Mini leader for manual control + if leader_type == "openarms_mini": + print(f"[Recording] Disabling torque on OpenArms Mini leader for manual control") + leader.bus_right.disable_torque() + leader.bus_left.disable_torque() try: while not stop_recording_flag.is_set(): @@ -561,6 +554,7 @@ def record_loop_with_compensation(): episode_start_time = loop_start recording_state["recording_started_time"] = time.time() # Set timestamp for UI print(f"[Recording] Ramp-up complete, starting data recording") + # Update ramp-up status for UI recording_state["ramp_up_remaining"] = round(ramp_up_remaining, 2) @@ -693,9 +687,6 @@ def record_loop_with_compensation(): # Map 0-100 (Mini) to 0 to -65 (OpenArms) # 0 (closed) -> 0°, 100 (open) -> -65° pos = (pos / 100.0) * -65.0 - else: - # Apply direction reversal if specified (non-gripper joints only) - pos *= JOINT_DIRECTION_MINI.get(joint, 1) follower_action[follower_key] = pos else: diff --git a/src/lerobot/robots/openarms/config_openarms_follower.py b/src/lerobot/robots/openarms/config_openarms_follower.py index c178cc7da..d1d858ee7 100644 --- a/src/lerobot/robots/openarms/config_openarms_follower.py +++ b/src/lerobot/robots/openarms/config_openarms_follower.py @@ -74,7 +74,7 @@ class OpenArmsFollowerConfig(RobotConfig): # MIT control parameters for position control (used in send_action) # List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] - position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]) + position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0]) position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2]) # Damping gains for stability when applying torque compensation (gravity/friction) diff --git a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py index 3d57d066b..0d42927ca 100644 --- a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py +++ b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py @@ -117,11 +117,8 @@ class OpenArmsMini(Teleoperator): logger.info(f"Connecting left arm on {self.config.port_left}...") self.bus_left.connect() - # Calibrate if needed - if not self.is_calibrated and calibrate: - logger.info( - "Mismatch between calibration values in the motor and the calibration file or no calibration file found" - ) + # Calibrate if requested (always prompt user) + if calibrate: self.calibrate() # Configure motors @@ -145,13 +142,13 @@ class OpenArmsMini(Teleoperator): 5. Save calibration """ if self.calibration: - # Calibration file exists, ask user whether to use it or run new calibration + # Ask user whether to use existing calibration user_input = input( - f"Press ENTER to use provided calibration file associated with the id {self.id}, " - f"or type 'c' and press ENTER to run calibration: " + f"Press ENTER to use existing calibration for {self.id}, " + f"or type 'c' and press ENTER to run new calibration: " ) if user_input.strip().lower() != "c": - logger.info(f"Writing calibration file associated with the id {self.id} to the motors") + logger.info(f"Using existing calibration for {self.id}") # Split calibration for each bus cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")} cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")} @@ -159,14 +156,14 @@ class OpenArmsMini(Teleoperator): self.bus_left.write_calibration(cal_left) return - logger.info(f"\nRunning calibration of {self}") + logger.info(f"\nRunning calibration for {self}") # Calibrate each arm separately self._calibrate_arm("right", self.bus_right) self._calibrate_arm("left", self.bus_left) self._save_calibration() - print(f"Calibration saved to {self.calibration_fpath}") + print(f"\nCalibration complete and saved to {self.calibration_fpath}") def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None: """Calibrate a single arm with Feetech motors.""" @@ -175,6 +172,12 @@ class OpenArmsMini(Teleoperator): # Disable torque for manual positioning bus.disable_torque() + # Set Phase to 12 for all motors + logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...") + for motor in bus.motors: + bus.write("Phase", motor, 12) + logger.info(f"Phase set to 12 for all motors in {arm_name.upper()} arm") + # Set all motors to position mode for motor in bus.motors: bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) @@ -224,21 +227,30 @@ class OpenArmsMini(Teleoperator): open_pos = bus.read("Present_Position", motor_name, normalize=False) logger.info(f" Gripper open position recorded: {open_pos}") - # Use recorded positions as min/max for 0-100 normalization - # 0 = closed, 100 = open - range_min = int(closed_pos) - range_max = int(open_pos) + # For RANGE_0_100: range_min maps to 0 (closed), range_max maps to 100 (open) + # If gripper motor direction is reversed (closed > open), we need to swap and use drive_mode=1 + if closed_pos < open_pos: + # Normal direction: 0=closed, 100=open + range_min = int(closed_pos) + range_max = int(open_pos) + drive_mode = 0 + else: + # Reversed direction: swap so min < max, and set drive_mode=1 to reverse + range_min = int(open_pos) + range_max = int(closed_pos) + drive_mode = 1 - logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open)") + logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open, drive_mode={drive_mode})") else: # Use full motor range for joint motors (will use degrees normalization) range_min = 0 range_max = max_res + drive_mode = 0 # Normal direction for non-gripper motors logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)") self.calibration[prefixed_name] = MotorCalibration( id=motor.id, - drive_mode=0, # Normal direction + drive_mode=drive_mode, homing_offset=homing_offsets[motor_name], range_min=range_min, range_max=range_max, @@ -280,16 +292,26 @@ class OpenArmsMini(Teleoperator): """Get current action from both arms (read positions from all motors).""" start = time.perf_counter() + # Motors to flip (invert direction) - different for each arm + right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] + left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] + # Read positions from both arms right_positions = self.bus_right.sync_read("Present_Position") left_positions = self.bus_left.sync_read("Present_Position") - # Combine into single action dict with arm prefixes + # Combine into single action dict with arm prefixes and flip specified motors action = {} for motor, val in right_positions.items(): - action[f"right_{motor}.pos"] = val + if motor in right_motors_to_flip: + action[f"right_{motor}.pos"] = -val + else: + action[f"right_{motor}.pos"] = val for motor, val in left_positions.items(): - action[f"left_{motor}.pos"] = val + if motor in left_motors_to_flip: + action[f"left_{motor}.pos"] = -val + else: + action[f"left_{motor}.pos"] = val dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms")