mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 08:39:49 +00:00
add improv openarm mini
This commit is contained in:
@@ -39,8 +39,8 @@ follower_config = OpenArmsFollowerConfig(
|
||||
|
||||
# Configure the OpenArms Mini leader (Feetech motors on serial)
|
||||
leader_config = OpenArmsMiniConfig(
|
||||
port_right="/dev/ttyACM1", # Serial port for right arm
|
||||
port_left="/dev/ttyACM0", # Serial port for left arm
|
||||
port_right="/dev/ttyACM0", # Serial port for right arm
|
||||
port_left="/dev/ttyACM1", # Serial port for left arm
|
||||
id="openarms_mini",
|
||||
use_degrees=True,
|
||||
)
|
||||
@@ -94,20 +94,6 @@ max_loop_time = 0.0
|
||||
stats_update_interval = 1.0 # Update stats every 1 second
|
||||
last_stats_update = time.perf_counter()
|
||||
|
||||
JOINT_DIRECTION = {
|
||||
# invert direction
|
||||
"right_joint_1": -1,
|
||||
"right_joint_2": -1,
|
||||
"right_joint_3": -1,
|
||||
"right_joint_4": -1,
|
||||
"right_joint_5": -1,
|
||||
"left_joint_1": -1,
|
||||
"left_joint_3": -1,
|
||||
"left_joint_4": -1,
|
||||
"left_joint_5": -1,
|
||||
"left_joint_6": -1,
|
||||
"left_joint_7": -1,
|
||||
}
|
||||
|
||||
SWAPPED_JOINTS = {
|
||||
"right_joint_6": "right_joint_7",
|
||||
@@ -140,9 +126,6 @@ try:
|
||||
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||
pos = (pos / 100.0) * -65.0
|
||||
else:
|
||||
# Apply direction reversal if specified (non-gripper joints only)
|
||||
pos *= JOINT_DIRECTION.get(joint, 1)
|
||||
|
||||
# Store in action dict for follower
|
||||
joint_action[follower_key] = pos
|
||||
|
||||
@@ -112,20 +112,7 @@ FRICTION_SCALE = 1.0
|
||||
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
|
||||
|
||||
# OpenArms Mini joint mappings (from teleop_openarms_mini.py)
|
||||
JOINT_DIRECTION_MINI = {
|
||||
"right_joint_1": -1,
|
||||
"right_joint_2": -1,
|
||||
"right_joint_3": -1,
|
||||
"right_joint_4": -1,
|
||||
"right_joint_5": -1,
|
||||
"left_joint_1": -1,
|
||||
"left_joint_3": -1,
|
||||
"left_joint_4": -1,
|
||||
"left_joint_5": -1,
|
||||
"left_joint_6": -1,
|
||||
"left_joint_7": -1,
|
||||
}
|
||||
|
||||
# Note: Direction flipping is now handled in OpenArmsMini.get_action() method
|
||||
SWAPPED_JOINTS_MINI = {
|
||||
"right_joint_6": "right_joint_7",
|
||||
"right_joint_7": "right_joint_6",
|
||||
@@ -545,6 +532,12 @@ def record_loop_with_compensation():
|
||||
all_joints.append(f"right_{motor}")
|
||||
for motor in leader.bus_left.motors:
|
||||
all_joints.append(f"left_{motor}")
|
||||
|
||||
# Disable torque on OpenArms Mini leader for manual control
|
||||
if leader_type == "openarms_mini":
|
||||
print(f"[Recording] Disabling torque on OpenArms Mini leader for manual control")
|
||||
leader.bus_right.disable_torque()
|
||||
leader.bus_left.disable_torque()
|
||||
|
||||
try:
|
||||
while not stop_recording_flag.is_set():
|
||||
@@ -561,6 +554,7 @@ def record_loop_with_compensation():
|
||||
episode_start_time = loop_start
|
||||
recording_state["recording_started_time"] = time.time() # Set timestamp for UI
|
||||
print(f"[Recording] Ramp-up complete, starting data recording")
|
||||
|
||||
|
||||
# Update ramp-up status for UI
|
||||
recording_state["ramp_up_remaining"] = round(ramp_up_remaining, 2)
|
||||
@@ -693,9 +687,6 @@ def record_loop_with_compensation():
|
||||
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||
pos = (pos / 100.0) * -65.0
|
||||
else:
|
||||
# Apply direction reversal if specified (non-gripper joints only)
|
||||
pos *= JOINT_DIRECTION_MINI.get(joint, 1)
|
||||
|
||||
follower_action[follower_key] = pos
|
||||
else:
|
||||
|
||||
@@ -74,7 +74,7 @@ class OpenArmsFollowerConfig(RobotConfig):
|
||||
|
||||
# MIT control parameters for position control (used in send_action)
|
||||
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
|
||||
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0])
|
||||
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0])
|
||||
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
|
||||
|
||||
# Damping gains for stability when applying torque compensation (gravity/friction)
|
||||
|
||||
@@ -117,11 +117,8 @@ class OpenArmsMini(Teleoperator):
|
||||
logger.info(f"Connecting left arm on {self.config.port_left}...")
|
||||
self.bus_left.connect()
|
||||
|
||||
# Calibrate if needed
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
# Calibrate if requested (always prompt user)
|
||||
if calibrate:
|
||||
self.calibrate()
|
||||
|
||||
# Configure motors
|
||||
@@ -145,13 +142,13 @@ class OpenArmsMini(Teleoperator):
|
||||
5. Save calibration
|
||||
"""
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
# Ask user whether to use existing calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, "
|
||||
f"or type 'c' and press ENTER to run calibration: "
|
||||
f"Press ENTER to use existing calibration for {self.id}, "
|
||||
f"or type 'c' and press ENTER to run new calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
logger.info(f"Using existing calibration for {self.id}")
|
||||
# Split calibration for each bus
|
||||
cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")}
|
||||
cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")}
|
||||
@@ -159,14 +156,14 @@ class OpenArmsMini(Teleoperator):
|
||||
self.bus_left.write_calibration(cal_left)
|
||||
return
|
||||
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
logger.info(f"\nRunning calibration for {self}")
|
||||
|
||||
# Calibrate each arm separately
|
||||
self._calibrate_arm("right", self.bus_right)
|
||||
self._calibrate_arm("left", self.bus_left)
|
||||
|
||||
self._save_calibration()
|
||||
print(f"Calibration saved to {self.calibration_fpath}")
|
||||
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
|
||||
|
||||
def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None:
|
||||
"""Calibrate a single arm with Feetech motors."""
|
||||
@@ -175,6 +172,12 @@ class OpenArmsMini(Teleoperator):
|
||||
# Disable torque for manual positioning
|
||||
bus.disable_torque()
|
||||
|
||||
# Set Phase to 12 for all motors
|
||||
logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...")
|
||||
for motor in bus.motors:
|
||||
bus.write("Phase", motor, 12)
|
||||
logger.info(f"Phase set to 12 for all motors in {arm_name.upper()} arm")
|
||||
|
||||
# Set all motors to position mode
|
||||
for motor in bus.motors:
|
||||
bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||
@@ -224,21 +227,30 @@ class OpenArmsMini(Teleoperator):
|
||||
open_pos = bus.read("Present_Position", motor_name, normalize=False)
|
||||
logger.info(f" Gripper open position recorded: {open_pos}")
|
||||
|
||||
# Use recorded positions as min/max for 0-100 normalization
|
||||
# 0 = closed, 100 = open
|
||||
range_min = int(closed_pos)
|
||||
range_max = int(open_pos)
|
||||
# For RANGE_0_100: range_min maps to 0 (closed), range_max maps to 100 (open)
|
||||
# If gripper motor direction is reversed (closed > open), we need to swap and use drive_mode=1
|
||||
if closed_pos < open_pos:
|
||||
# Normal direction: 0=closed, 100=open
|
||||
range_min = int(closed_pos)
|
||||
range_max = int(open_pos)
|
||||
drive_mode = 0
|
||||
else:
|
||||
# Reversed direction: swap so min < max, and set drive_mode=1 to reverse
|
||||
range_min = int(open_pos)
|
||||
range_max = int(closed_pos)
|
||||
drive_mode = 1
|
||||
|
||||
logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open)")
|
||||
logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open, drive_mode={drive_mode})")
|
||||
else:
|
||||
# Use full motor range for joint motors (will use degrees normalization)
|
||||
range_min = 0
|
||||
range_max = max_res
|
||||
drive_mode = 0 # Normal direction for non-gripper motors
|
||||
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
|
||||
|
||||
self.calibration[prefixed_name] = MotorCalibration(
|
||||
id=motor.id,
|
||||
drive_mode=0, # Normal direction
|
||||
drive_mode=drive_mode,
|
||||
homing_offset=homing_offsets[motor_name],
|
||||
range_min=range_min,
|
||||
range_max=range_max,
|
||||
@@ -280,16 +292,26 @@ class OpenArmsMini(Teleoperator):
|
||||
"""Get current action from both arms (read positions from all motors)."""
|
||||
start = time.perf_counter()
|
||||
|
||||
# Motors to flip (invert direction) - different for each arm
|
||||
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
|
||||
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
|
||||
|
||||
# Read positions from both arms
|
||||
right_positions = self.bus_right.sync_read("Present_Position")
|
||||
left_positions = self.bus_left.sync_read("Present_Position")
|
||||
|
||||
# Combine into single action dict with arm prefixes
|
||||
# Combine into single action dict with arm prefixes and flip specified motors
|
||||
action = {}
|
||||
for motor, val in right_positions.items():
|
||||
action[f"right_{motor}.pos"] = val
|
||||
if motor in right_motors_to_flip:
|
||||
action[f"right_{motor}.pos"] = -val
|
||||
else:
|
||||
action[f"right_{motor}.pos"] = val
|
||||
for motor, val in left_positions.items():
|
||||
action[f"left_{motor}.pos"] = val
|
||||
if motor in left_motors_to_flip:
|
||||
action[f"left_{motor}.pos"] = -val
|
||||
else:
|
||||
action[f"left_{motor}.pos"] = val
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
|
||||
Reference in New Issue
Block a user