mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-20 19:19:56 +00:00
fix calibration of gripper and add max clip positions for openarm for safety
This commit is contained in:
@@ -7,6 +7,11 @@ an OpenArms Mini leader (Feetech-based) with dual arms (16 motors total).
|
|||||||
The OpenArms Mini has:
|
The OpenArms Mini has:
|
||||||
- Right arm: 8 motors (joint_1 to joint_7 + gripper)
|
- Right arm: 8 motors (joint_1 to joint_7 + gripper)
|
||||||
- Left arm: 8 motors (joint_1 to joint_7 + gripper)
|
- Left arm: 8 motors (joint_1 to joint_7 + gripper)
|
||||||
|
|
||||||
|
Note on gripper normalization:
|
||||||
|
- OpenArms Mini gripper: 0-100 scale (0=closed, 100=open)
|
||||||
|
- OpenArms follower gripper: degrees (0=closed, -65=open)
|
||||||
|
- This script automatically converts between the two ranges
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import time
|
import time
|
||||||
@@ -17,7 +22,10 @@ from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
|
|||||||
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
|
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
|
||||||
from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini
|
from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini
|
||||||
from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig
|
from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig
|
||||||
|
from lerobot.utils.robot_utils import busy_wait
|
||||||
|
|
||||||
|
# Target control frequency
|
||||||
|
TARGET_FPS = 30
|
||||||
|
|
||||||
# Configure the OpenArms follower (Damiao motors on CAN bus)
|
# Configure the OpenArms follower (Damiao motors on CAN bus)
|
||||||
follower_config = OpenArmsFollowerConfig(
|
follower_config = OpenArmsFollowerConfig(
|
||||||
@@ -80,8 +88,11 @@ all_joints = [
|
|||||||
|
|
||||||
# Performance monitoring
|
# Performance monitoring
|
||||||
loop_times = []
|
loop_times = []
|
||||||
start_time = time.perf_counter()
|
avg_loop_time = 0.0
|
||||||
last_print_time = start_time
|
min_loop_time = float('inf')
|
||||||
|
max_loop_time = 0.0
|
||||||
|
stats_update_interval = 1.0 # Update stats every 1 second
|
||||||
|
last_stats_update = time.perf_counter()
|
||||||
|
|
||||||
JOINT_DIRECTION = {
|
JOINT_DIRECTION = {
|
||||||
# invert direction
|
# invert direction
|
||||||
@@ -126,17 +137,51 @@ try:
|
|||||||
# Get leader position (default 0 if missing)
|
# Get leader position (default 0 if missing)
|
||||||
pos = leader_action.get(leader_key, 0.0)
|
pos = leader_action.get(leader_key, 0.0)
|
||||||
|
|
||||||
# Apply direction reversal if specified
|
# Convert gripper values: Mini uses 0-100, OpenArms uses 0 to -65 degrees
|
||||||
pos *= JOINT_DIRECTION.get(joint, 1)
|
if "gripper" in joint:
|
||||||
|
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||||
|
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||||
|
pos = (pos / 100.0) * -65.0
|
||||||
|
else:
|
||||||
|
# Apply direction reversal if specified (non-gripper joints only)
|
||||||
|
pos *= JOINT_DIRECTION.get(joint, 1)
|
||||||
|
|
||||||
# Store in action dict for follower
|
# Store in action dict for follower
|
||||||
|
|
||||||
joint_action[follower_key] = pos
|
joint_action[follower_key] = pos
|
||||||
|
|
||||||
#follower.send_action(joint_action)
|
#follower.send_action(joint_action)
|
||||||
|
|
||||||
|
# Loop timing
|
||||||
|
loop_end = time.perf_counter()
|
||||||
|
loop_time = loop_end - loop_start
|
||||||
|
loop_times.append(loop_time)
|
||||||
|
|
||||||
|
# Update stats periodically
|
||||||
|
current_time = time.perf_counter()
|
||||||
|
if current_time - last_stats_update >= stats_update_interval:
|
||||||
|
if loop_times:
|
||||||
|
avg_loop_time = sum(loop_times) / len(loop_times)
|
||||||
|
min_loop_time = min(loop_times)
|
||||||
|
max_loop_time = max(loop_times)
|
||||||
|
loop_times = []
|
||||||
|
last_stats_update = current_time
|
||||||
|
|
||||||
|
# Display everything
|
||||||
sys.stdout.write("\033[H\033[J") # Clear screen
|
sys.stdout.write("\033[H\033[J") # Clear screen
|
||||||
print(f"{'Joint':<20} {'Leader (deg)':>15} {'Follower (deg)':>15}")
|
|
||||||
|
# Show timing stats at the top
|
||||||
|
if avg_loop_time > 0:
|
||||||
|
avg_hz = 1.0 / avg_loop_time
|
||||||
|
min_hz = 1.0 / max_loop_time if max_loop_time > 0 else 0
|
||||||
|
max_hz = 1.0 / min_loop_time if min_loop_time > 0 and min_loop_time < float('inf') else 0
|
||||||
|
print(f"[Performance] Target: {TARGET_FPS} Hz | Avg: {avg_hz:.1f} Hz | Range: {min_hz:.1f}-{max_hz:.1f} Hz | Loop: {avg_loop_time*1000:.1f} ms\n")
|
||||||
|
else:
|
||||||
|
print(f"[Performance] Target: {TARGET_FPS} Hz | Measuring...\n")
|
||||||
|
|
||||||
|
# Show joint positions
|
||||||
|
print(f"{'Joint':<20} {'Leader':>15} {'Follower':>15}")
|
||||||
|
print(f"{'':20} {'(0-100/deg)':>15} {'(deg)':>15}")
|
||||||
|
print("-" * 52)
|
||||||
|
|
||||||
for joint in all_joints:
|
for joint in all_joints:
|
||||||
leader_key = f"{joint}.pos"
|
leader_key = f"{joint}.pos"
|
||||||
@@ -148,27 +193,9 @@ try:
|
|||||||
|
|
||||||
print(f"{joint:<20} {leader_pos:>15.2f} {follower_pos:>15.2f}")
|
print(f"{joint:<20} {leader_pos:>15.2f} {follower_pos:>15.2f}")
|
||||||
|
|
||||||
# Loop timing and stats
|
# Smart sleep to maintain target FPS
|
||||||
loop_end = time.perf_counter()
|
dt_s = time.perf_counter() - loop_start
|
||||||
loop_time = loop_end - loop_start
|
busy_wait(max(0, 1.0 / TARGET_FPS - dt_s))
|
||||||
loop_times.append(loop_time)
|
|
||||||
|
|
||||||
# Print stats every 2 seconds
|
|
||||||
if loop_times:
|
|
||||||
avg_time = sum(loop_times) / len(loop_times)
|
|
||||||
current_hz = 1.0 / avg_time if avg_time > 0 else 0
|
|
||||||
min_time = min(loop_times)
|
|
||||||
max_time = max(loop_times)
|
|
||||||
max_hz = 1.0 / min_time if min_time > 0 else 0
|
|
||||||
min_hz = 1.0 / max_time if max_time > 0 else 0
|
|
||||||
print(f"\n[Hz Stats] Avg: {current_hz:.1f} Hz | "
|
|
||||||
f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | "
|
|
||||||
f"Avg loop time: {avg_time*1000:.1f} ms")
|
|
||||||
|
|
||||||
loop_times = []
|
|
||||||
last_print_time = loop_end
|
|
||||||
|
|
||||||
time.sleep(0.05) # Small sleep to prevent flooding the terminal
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\n\nStopping teleoperation...")
|
print("\n\nStopping teleoperation...")
|
||||||
|
|||||||
@@ -91,3 +91,28 @@ class OpenArmsFollowerConfig(RobotConfig):
|
|||||||
# Calibration parameters
|
# Calibration parameters
|
||||||
calibration_mode: str = "manual" # "manual" or "auto"
|
calibration_mode: str = "manual" # "manual" or "auto"
|
||||||
zero_position_on_connect: bool = False # Set zero position on connect
|
zero_position_on_connect: bool = False # Set zero position on connect
|
||||||
|
|
||||||
|
# Joint limits for position clipping (degrees)
|
||||||
|
# Format: [min, max] for each joint
|
||||||
|
# These limits clip commands in send_action to prevent mechanical damage
|
||||||
|
joint_limits_right: Dict[str, tuple[float, float]] = field(default_factory=lambda: {
|
||||||
|
"joint_1": (-75.0, 75.0),
|
||||||
|
"joint_2": (-9.0, 90.0),
|
||||||
|
"joint_3": (-85.0, 85.0),
|
||||||
|
"joint_4": (0.0, 135.0),
|
||||||
|
"joint_5": (-85.0, 85.0),
|
||||||
|
"joint_6": (-40.0, 40.0),
|
||||||
|
"joint_7": (-80.0, 80.0),
|
||||||
|
"gripper": (-65.0, 0.0),
|
||||||
|
})
|
||||||
|
|
||||||
|
joint_limits_left: Dict[str, tuple[float, float]] = field(default_factory=lambda: {
|
||||||
|
"joint_1": (-75.0, 75.0),
|
||||||
|
"joint_2": (-90.0, 9.0),
|
||||||
|
"joint_3": (-85.0, 85.0),
|
||||||
|
"joint_4": (0.0, 135.0),
|
||||||
|
"joint_5": (-85.0, 85.0),
|
||||||
|
"joint_6": (-40.0, 40.0),
|
||||||
|
"joint_7": (-80.0, 80.0),
|
||||||
|
"gripper": (-65.0, 0.0),
|
||||||
|
})
|
||||||
@@ -394,11 +394,29 @@ class OpenArmsFollower(Robot):
|
|||||||
motor_name = key.removesuffix(".pos")
|
motor_name = key.removesuffix(".pos")
|
||||||
if motor_name.startswith("right_"):
|
if motor_name.startswith("right_"):
|
||||||
# Remove "right_" prefix for bus access
|
# Remove "right_" prefix for bus access
|
||||||
goal_pos_right[motor_name.removeprefix("right_")] = val # do we also do this read in other robots in send action?
|
goal_pos_right[motor_name.removeprefix("right_")] = val
|
||||||
elif motor_name.startswith("left_"):
|
elif motor_name.startswith("left_"):
|
||||||
# Remove "left_" prefix for bus access
|
# Remove "left_" prefix for bus access
|
||||||
goal_pos_left[motor_name.removeprefix("left_")] = val
|
goal_pos_left[motor_name.removeprefix("left_")] = val
|
||||||
|
|
||||||
|
# Apply joint limit clipping to right arm
|
||||||
|
for motor_name, position in goal_pos_right.items():
|
||||||
|
if motor_name in self.config.joint_limits_right:
|
||||||
|
min_limit, max_limit = self.config.joint_limits_right[motor_name]
|
||||||
|
clipped_position = max(min_limit, min(max_limit, position))
|
||||||
|
if clipped_position != position:
|
||||||
|
logger.debug(f"Clipped right_{motor_name} from {position:.2f}° to {clipped_position:.2f}°")
|
||||||
|
goal_pos_right[motor_name] = clipped_position
|
||||||
|
|
||||||
|
# Apply joint limit clipping to left arm
|
||||||
|
for motor_name, position in goal_pos_left.items():
|
||||||
|
if motor_name in self.config.joint_limits_left:
|
||||||
|
min_limit, max_limit = self.config.joint_limits_left[motor_name]
|
||||||
|
clipped_position = max(min_limit, min(max_limit, position))
|
||||||
|
if clipped_position != position:
|
||||||
|
logger.debug(f"Clipped left_{motor_name} from {position:.2f}° to {clipped_position:.2f}°")
|
||||||
|
goal_pos_left[motor_name] = clipped_position
|
||||||
|
|
||||||
# Apply safety limits if configured
|
# Apply safety limits if configured
|
||||||
if self.config.max_relative_target is not None:
|
if self.config.max_relative_target is not None:
|
||||||
# Get current positions
|
# Get current positions
|
||||||
|
|||||||
@@ -192,28 +192,49 @@ class OpenArmsMini(Teleoperator):
|
|||||||
homing_offsets = bus.set_half_turn_homings()
|
homing_offsets = bus.set_half_turn_homings()
|
||||||
logger.info(f"{arm_name.capitalize()} arm zero position set.")
|
logger.info(f"{arm_name.capitalize()} arm zero position set.")
|
||||||
|
|
||||||
# Step 2: Set fixed ranges (use full motor range for all motors)
|
# Step 2: Set ranges for joints and gripper
|
||||||
print(
|
print(f"\nSetting motor ranges for {arm_name.upper()} arm\n")
|
||||||
f"\nSetting motor ranges to full range (0-4095)\n"
|
|
||||||
f"Normalization will handle conversion to degrees/0-100 at runtime\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create calibration data with full motor ranges
|
# Create calibration data with full motor ranges
|
||||||
if self.calibration is None:
|
if self.calibration is None:
|
||||||
self.calibration = {}
|
self.calibration = {}
|
||||||
|
|
||||||
|
# Get motor resolution
|
||||||
|
motor_resolution = bus.model_resolution_table[list(bus.motors.values())[0].model]
|
||||||
|
max_res = motor_resolution - 1
|
||||||
|
|
||||||
for motor_name, motor in bus.motors.items():
|
for motor_name, motor in bus.motors.items():
|
||||||
# Prefix motor name with arm name for storage
|
# Prefix motor name with arm name for storage
|
||||||
prefixed_name = f"{arm_name}_{motor_name}"
|
prefixed_name = f"{arm_name}_{motor_name}"
|
||||||
|
|
||||||
# Get motor resolution
|
if motor_name == "gripper":
|
||||||
motor_resolution = bus.model_resolution_table[motor.model]
|
# Interactive calibration for gripper
|
||||||
max_res = motor_resolution - 1
|
input(
|
||||||
|
f"\nGripper Calibration ({arm_name.upper()} arm)\n"
|
||||||
# Use full motor range for all motors
|
f"Step 1: CLOSE the gripper fully\n"
|
||||||
# The normalization layer will convert to degrees or 0-100 based on norm_mode
|
f"Press ENTER when gripper is closed..."
|
||||||
range_min = 0
|
)
|
||||||
range_max = max_res
|
closed_pos = bus.read("Present_Position", motor_name, normalize=False)
|
||||||
|
logger.info(f" Gripper closed position recorded: {closed_pos}")
|
||||||
|
|
||||||
|
input(
|
||||||
|
f"\nStep 2: OPEN the gripper fully\n"
|
||||||
|
f"Press ENTER when gripper is fully open..."
|
||||||
|
)
|
||||||
|
open_pos = bus.read("Present_Position", motor_name, normalize=False)
|
||||||
|
logger.info(f" Gripper open position recorded: {open_pos}")
|
||||||
|
|
||||||
|
# Use recorded positions as min/max for 0-100 normalization
|
||||||
|
# 0 = closed, 100 = open
|
||||||
|
range_min = int(closed_pos)
|
||||||
|
range_max = int(open_pos)
|
||||||
|
|
||||||
|
logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open)")
|
||||||
|
else:
|
||||||
|
# Use full motor range for joint motors (will use degrees normalization)
|
||||||
|
range_min = 0
|
||||||
|
range_max = max_res
|
||||||
|
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
|
||||||
|
|
||||||
self.calibration[prefixed_name] = MotorCalibration(
|
self.calibration[prefixed_name] = MotorCalibration(
|
||||||
id=motor.id,
|
id=motor.id,
|
||||||
@@ -222,7 +243,6 @@ class OpenArmsMini(Teleoperator):
|
|||||||
range_min=range_min,
|
range_min=range_min,
|
||||||
range_max=range_max,
|
range_max=range_max,
|
||||||
)
|
)
|
||||||
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
|
|
||||||
|
|
||||||
# Write calibration to this arm's motors
|
# Write calibration to this arm's motors
|
||||||
cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")}
|
cal_for_bus = {k.replace(f"{arm_name}_", ""): v for k, v in self.calibration.items() if k.startswith(f"{arm_name}_")}
|
||||||
|
|||||||
Reference in New Issue
Block a user