diff --git a/examples/openarms/teleop_openarms_mini.py b/examples/openarms/teleop_openarms_mini.py index ab9e7361f..e05a148ad 100644 --- a/examples/openarms/teleop_openarms_mini.py +++ b/examples/openarms/teleop_openarms_mini.py @@ -7,6 +7,11 @@ an OpenArms Mini leader (Feetech-based) with dual arms (16 motors total). The OpenArms Mini has: - Right arm: 8 motors (joint_1 to joint_7 + gripper) - Left arm: 8 motors (joint_1 to joint_7 + gripper) + +Note on gripper normalization: +- OpenArms Mini gripper: 0-100 scale (0=closed, 100=open) +- OpenArms follower gripper: degrees (0=closed, -65=open) +- This script automatically converts between the two ranges """ import time @@ -17,7 +22,10 @@ from lerobot.robots.openarms.openarms_follower import OpenArmsFollower from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig +from lerobot.utils.robot_utils import busy_wait +# Target control frequency +TARGET_FPS = 30 # Configure the OpenArms follower (Damiao motors on CAN bus) follower_config = OpenArmsFollowerConfig( @@ -80,8 +88,11 @@ all_joints = [ # Performance monitoring loop_times = [] -start_time = time.perf_counter() -last_print_time = start_time +avg_loop_time = 0.0 +min_loop_time = float('inf') +max_loop_time = 0.0 +stats_update_interval = 1.0 # Update stats every 1 second +last_stats_update = time.perf_counter() JOINT_DIRECTION = { # invert direction @@ -126,17 +137,51 @@ try: # Get leader position (default 0 if missing) pos = leader_action.get(leader_key, 0.0) - # Apply direction reversal if specified - pos *= JOINT_DIRECTION.get(joint, 1) + # Convert gripper values: Mini uses 0-100, OpenArms uses 0 to -65 degrees + if "gripper" in joint: + # Map 0-100 (Mini) to 0 to -65 (OpenArms) + # 0 (closed) -> 0°, 100 (open) -> -65° + pos = (pos / 100.0) * -65.0 + else: + # Apply direction reversal if specified (non-gripper joints only) + pos *= JOINT_DIRECTION.get(joint, 1) # Store in action dict for follower - joint_action[follower_key] = pos #follower.send_action(joint_action) + # Loop timing + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + # Update stats periodically + current_time = time.perf_counter() + if current_time - last_stats_update >= stats_update_interval: + if loop_times: + avg_loop_time = sum(loop_times) / len(loop_times) + min_loop_time = min(loop_times) + max_loop_time = max(loop_times) + loop_times = [] + last_stats_update = current_time + + # Display everything sys.stdout.write("\033[H\033[J") # Clear screen - print(f"{'Joint':<20} {'Leader (deg)':>15} {'Follower (deg)':>15}") + + # Show timing stats at the top + if avg_loop_time > 0: + avg_hz = 1.0 / avg_loop_time + min_hz = 1.0 / max_loop_time if max_loop_time > 0 else 0 + max_hz = 1.0 / min_loop_time if min_loop_time > 0 and min_loop_time < float('inf') else 0 + print(f"[Performance] Target: {TARGET_FPS} Hz | Avg: {avg_hz:.1f} Hz | Range: {min_hz:.1f}-{max_hz:.1f} Hz | Loop: {avg_loop_time*1000:.1f} ms\n") + else: + print(f"[Performance] Target: {TARGET_FPS} Hz | Measuring...\n") + + # Show joint positions + print(f"{'Joint':<20} {'Leader':>15} {'Follower':>15}") + print(f"{'':20} {'(0-100/deg)':>15} {'(deg)':>15}") + print("-" * 52) for joint in all_joints: leader_key = f"{joint}.pos" @@ -148,27 +193,9 @@ try: print(f"{joint:<20} {leader_pos:>15.2f} {follower_pos:>15.2f}") - # Loop timing and stats - loop_end = time.perf_counter() - loop_time = loop_end - loop_start - loop_times.append(loop_time) - - # Print stats every 2 seconds - if loop_times: - avg_time = sum(loop_times) / len(loop_times) - current_hz = 1.0 / avg_time if avg_time > 0 else 0 - min_time = min(loop_times) - max_time = max(loop_times) - max_hz = 1.0 / min_time if min_time > 0 else 0 - min_hz = 1.0 / max_time if max_time > 0 else 0 - print(f"\n[Hz Stats] Avg: {current_hz:.1f} Hz | " - f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | " - f"Avg loop time: {avg_time*1000:.1f} ms") - - loop_times = [] - last_print_time = loop_end - - time.sleep(0.05) # Small sleep to prevent flooding the terminal + # Smart sleep to maintain target FPS + dt_s = time.perf_counter() - loop_start + busy_wait(max(0, 1.0 / TARGET_FPS - dt_s)) except KeyboardInterrupt: print("\n\nStopping teleoperation...") diff --git a/src/lerobot/robots/openarms/config_openarms_follower.py b/src/lerobot/robots/openarms/config_openarms_follower.py index 5e28b2620..c178cc7da 100644 --- a/src/lerobot/robots/openarms/config_openarms_follower.py +++ b/src/lerobot/robots/openarms/config_openarms_follower.py @@ -91,3 +91,28 @@ class OpenArmsFollowerConfig(RobotConfig): # Calibration parameters calibration_mode: str = "manual" # "manual" or "auto" zero_position_on_connect: bool = False # Set zero position on connect + + # Joint limits for position clipping (degrees) + # Format: [min, max] for each joint + # These limits clip commands in send_action to prevent mechanical damage + joint_limits_right: Dict[str, tuple[float, float]] = field(default_factory=lambda: { + "joint_1": (-75.0, 75.0), + "joint_2": (-9.0, 90.0), + "joint_3": (-85.0, 85.0), + "joint_4": (0.0, 135.0), + "joint_5": (-85.0, 85.0), + "joint_6": (-40.0, 40.0), + "joint_7": (-80.0, 80.0), + "gripper": (-65.0, 0.0), + }) + + joint_limits_left: Dict[str, tuple[float, float]] = field(default_factory=lambda: { + "joint_1": (-75.0, 75.0), + "joint_2": (-90.0, 9.0), + "joint_3": (-85.0, 85.0), + "joint_4": (0.0, 135.0), + "joint_5": (-85.0, 85.0), + "joint_6": (-40.0, 40.0), + "joint_7": (-80.0, 80.0), + "gripper": (-65.0, 0.0), + }) \ No newline at end of file diff --git a/src/lerobot/robots/openarms/openarms_follower.py b/src/lerobot/robots/openarms/openarms_follower.py index 38482c5dd..d11177242 100644 --- a/src/lerobot/robots/openarms/openarms_follower.py +++ b/src/lerobot/robots/openarms/openarms_follower.py @@ -394,11 +394,29 @@ class OpenArmsFollower(Robot): motor_name = key.removesuffix(".pos") if motor_name.startswith("right_"): # Remove "right_" prefix for bus access - goal_pos_right[motor_name.removeprefix("right_")] = val # do we also do this read in other robots in send action? + goal_pos_right[motor_name.removeprefix("right_")] = val elif motor_name.startswith("left_"): # Remove "left_" prefix for bus access goal_pos_left[motor_name.removeprefix("left_")] = val + # Apply joint limit clipping to right arm + for motor_name, position in goal_pos_right.items(): + if motor_name in self.config.joint_limits_right: + min_limit, max_limit = self.config.joint_limits_right[motor_name] + clipped_position = max(min_limit, min(max_limit, position)) + if clipped_position != position: + logger.debug(f"Clipped right_{motor_name} from {position:.2f}° to {clipped_position:.2f}°") + goal_pos_right[motor_name] = clipped_position + + # Apply joint limit clipping to left arm + for motor_name, position in goal_pos_left.items(): + if motor_name in self.config.joint_limits_left: + min_limit, max_limit = self.config.joint_limits_left[motor_name] + clipped_position = max(min_limit, min(max_limit, position)) + if clipped_position != position: + logger.debug(f"Clipped left_{motor_name} from {position:.2f}° to {clipped_position:.2f}°") + goal_pos_left[motor_name] = clipped_position + # Apply safety limits if configured if self.config.max_relative_target is not None: # Get current positions diff --git a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py index 26f402460..3d57d066b 100644 --- a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py +++ b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py @@ -192,28 +192,49 @@ class OpenArmsMini(Teleoperator): homing_offsets = bus.set_half_turn_homings() logger.info(f"{arm_name.capitalize()} arm zero position set.") - # Step 2: Set fixed ranges (use full motor range for all motors) - print( - f"\nSetting motor ranges to full range (0-4095)\n" - f"Normalization will handle conversion to degrees/0-100 at runtime\n" - ) + # Step 2: Set ranges for joints and gripper + print(f"\nSetting motor ranges for {arm_name.upper()} arm\n") # Create calibration data with full motor ranges if self.calibration is None: self.calibration = {} + # Get motor resolution + motor_resolution = bus.model_resolution_table[list(bus.motors.values())[0].model] + max_res = motor_resolution - 1 + for motor_name, motor in bus.motors.items(): # Prefix motor name with arm name for storage prefixed_name = f"{arm_name}_{motor_name}" - # Get motor resolution - motor_resolution = bus.model_resolution_table[motor.model] - max_res = motor_resolution - 1 - - # Use full motor range for all motors - # The normalization layer will convert to degrees or 0-100 based on norm_mode - range_min = 0 - range_max = max_res + if motor_name == "gripper": + # Interactive calibration for gripper + input( + f"\nGripper Calibration ({arm_name.upper()} arm)\n" + f"Step 1: CLOSE the gripper fully\n" + f"Press ENTER when gripper is closed..." + ) + closed_pos = bus.read("Present_Position", motor_name, normalize=False) + logger.info(f" Gripper closed position recorded: {closed_pos}") + + input( + f"\nStep 2: OPEN the gripper fully\n" + f"Press ENTER when gripper is fully open..." + ) + open_pos = bus.read("Present_Position", motor_name, normalize=False) + logger.info(f" Gripper open position recorded: {open_pos}") + + # Use recorded positions as min/max for 0-100 normalization + # 0 = closed, 100 = open + range_min = int(closed_pos) + range_max = int(open_pos) + + logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open)") + else: + # Use full motor range for joint motors (will use degrees normalization) + range_min = 0 + range_max = max_res + logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)") self.calibration[prefixed_name] = MotorCalibration( id=motor.id, @@ -222,7 +243,6 @@ class OpenArmsMini(Teleoperator): range_min=range_min, range_max=range_max, ) - logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)") # Write calibration to this arm's motors cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")}