mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-18 02:00:03 +00:00
add improv openarm mini
This commit is contained in:
committed by
Michel Aractingi
parent
14319ee608
commit
19fe69dac0
@@ -39,8 +39,8 @@ follower_config = OpenArmsFollowerConfig(
|
|||||||
|
|
||||||
# Configure the OpenArms Mini leader (Feetech motors on serial)
|
# Configure the OpenArms Mini leader (Feetech motors on serial)
|
||||||
leader_config = OpenArmsMiniConfig(
|
leader_config = OpenArmsMiniConfig(
|
||||||
port_right="/dev/ttyACM1", # Serial port for right arm
|
port_right="/dev/ttyACM0", # Serial port for right arm
|
||||||
port_left="/dev/ttyACM0", # Serial port for left arm
|
port_left="/dev/ttyACM1", # Serial port for left arm
|
||||||
id="openarms_mini",
|
id="openarms_mini",
|
||||||
use_degrees=True,
|
use_degrees=True,
|
||||||
)
|
)
|
||||||
@@ -94,20 +94,6 @@ max_loop_time = 0.0
|
|||||||
stats_update_interval = 1.0 # Update stats every 1 second
|
stats_update_interval = 1.0 # Update stats every 1 second
|
||||||
last_stats_update = time.perf_counter()
|
last_stats_update = time.perf_counter()
|
||||||
|
|
||||||
JOINT_DIRECTION = {
|
|
||||||
# invert direction
|
|
||||||
"right_joint_1": -1,
|
|
||||||
"right_joint_2": -1,
|
|
||||||
"right_joint_3": -1,
|
|
||||||
"right_joint_4": -1,
|
|
||||||
"right_joint_5": -1,
|
|
||||||
"left_joint_1": -1,
|
|
||||||
"left_joint_3": -1,
|
|
||||||
"left_joint_4": -1,
|
|
||||||
"left_joint_5": -1,
|
|
||||||
"left_joint_6": -1,
|
|
||||||
"left_joint_7": -1,
|
|
||||||
}
|
|
||||||
|
|
||||||
SWAPPED_JOINTS = {
|
SWAPPED_JOINTS = {
|
||||||
"right_joint_6": "right_joint_7",
|
"right_joint_6": "right_joint_7",
|
||||||
@@ -140,9 +126,6 @@ try:
|
|||||||
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||||
# 0 (closed) -> 0°, 100 (open) -> -65°
|
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||||
pos = (pos / 100.0) * -65.0
|
pos = (pos / 100.0) * -65.0
|
||||||
else:
|
|
||||||
# Apply direction reversal if specified (non-gripper joints only)
|
|
||||||
pos *= JOINT_DIRECTION.get(joint, 1)
|
|
||||||
|
|
||||||
# Store in action dict for follower
|
# Store in action dict for follower
|
||||||
joint_action[follower_key] = pos
|
joint_action[follower_key] = pos
|
||||||
|
|||||||
@@ -112,20 +112,7 @@ FRICTION_SCALE = 1.0
|
|||||||
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
|
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
|
||||||
|
|
||||||
# OpenArms Mini joint mappings (from teleop_openarms_mini.py)
|
# OpenArms Mini joint mappings (from teleop_openarms_mini.py)
|
||||||
JOINT_DIRECTION_MINI = {
|
# Note: Direction flipping is now handled in OpenArmsMini.get_action() method
|
||||||
"right_joint_1": -1,
|
|
||||||
"right_joint_2": -1,
|
|
||||||
"right_joint_3": -1,
|
|
||||||
"right_joint_4": -1,
|
|
||||||
"right_joint_5": -1,
|
|
||||||
"left_joint_1": -1,
|
|
||||||
"left_joint_3": -1,
|
|
||||||
"left_joint_4": -1,
|
|
||||||
"left_joint_5": -1,
|
|
||||||
"left_joint_6": -1,
|
|
||||||
"left_joint_7": -1,
|
|
||||||
}
|
|
||||||
|
|
||||||
SWAPPED_JOINTS_MINI = {
|
SWAPPED_JOINTS_MINI = {
|
||||||
"right_joint_6": "right_joint_7",
|
"right_joint_6": "right_joint_7",
|
||||||
"right_joint_7": "right_joint_6",
|
"right_joint_7": "right_joint_6",
|
||||||
@@ -545,6 +532,12 @@ def record_loop_with_compensation():
|
|||||||
all_joints.append(f"right_{motor}")
|
all_joints.append(f"right_{motor}")
|
||||||
for motor in leader.bus_left.motors:
|
for motor in leader.bus_left.motors:
|
||||||
all_joints.append(f"left_{motor}")
|
all_joints.append(f"left_{motor}")
|
||||||
|
|
||||||
|
# Disable torque on OpenArms Mini leader for manual control
|
||||||
|
if leader_type == "openarms_mini":
|
||||||
|
print(f"[Recording] Disabling torque on OpenArms Mini leader for manual control")
|
||||||
|
leader.bus_right.disable_torque()
|
||||||
|
leader.bus_left.disable_torque()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while not stop_recording_flag.is_set():
|
while not stop_recording_flag.is_set():
|
||||||
@@ -561,6 +554,7 @@ def record_loop_with_compensation():
|
|||||||
episode_start_time = loop_start
|
episode_start_time = loop_start
|
||||||
recording_state["recording_started_time"] = time.time() # Set timestamp for UI
|
recording_state["recording_started_time"] = time.time() # Set timestamp for UI
|
||||||
print(f"[Recording] Ramp-up complete, starting data recording")
|
print(f"[Recording] Ramp-up complete, starting data recording")
|
||||||
|
|
||||||
|
|
||||||
# Update ramp-up status for UI
|
# Update ramp-up status for UI
|
||||||
recording_state["ramp_up_remaining"] = round(ramp_up_remaining, 2)
|
recording_state["ramp_up_remaining"] = round(ramp_up_remaining, 2)
|
||||||
@@ -693,9 +687,6 @@ def record_loop_with_compensation():
|
|||||||
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||||
# 0 (closed) -> 0°, 100 (open) -> -65°
|
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||||
pos = (pos / 100.0) * -65.0
|
pos = (pos / 100.0) * -65.0
|
||||||
else:
|
|
||||||
# Apply direction reversal if specified (non-gripper joints only)
|
|
||||||
pos *= JOINT_DIRECTION_MINI.get(joint, 1)
|
|
||||||
|
|
||||||
follower_action[follower_key] = pos
|
follower_action[follower_key] = pos
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ class OpenArmsFollowerConfig(RobotConfig):
|
|||||||
|
|
||||||
# MIT control parameters for position control (used in send_action)
|
# MIT control parameters for position control (used in send_action)
|
||||||
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
|
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
|
||||||
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0])
|
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0])
|
||||||
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
|
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
|
||||||
|
|
||||||
# Damping gains for stability when applying torque compensation (gravity/friction)
|
# Damping gains for stability when applying torque compensation (gravity/friction)
|
||||||
|
|||||||
@@ -117,11 +117,8 @@ class OpenArmsMini(Teleoperator):
|
|||||||
logger.info(f"Connecting left arm on {self.config.port_left}...")
|
logger.info(f"Connecting left arm on {self.config.port_left}...")
|
||||||
self.bus_left.connect()
|
self.bus_left.connect()
|
||||||
|
|
||||||
# Calibrate if needed
|
# Calibrate if requested (always prompt user)
|
||||||
if not self.is_calibrated and calibrate:
|
if calibrate:
|
||||||
logger.info(
|
|
||||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
|
||||||
)
|
|
||||||
self.calibrate()
|
self.calibrate()
|
||||||
|
|
||||||
# Configure motors
|
# Configure motors
|
||||||
@@ -145,13 +142,13 @@ class OpenArmsMini(Teleoperator):
|
|||||||
5. Save calibration
|
5. Save calibration
|
||||||
"""
|
"""
|
||||||
if self.calibration:
|
if self.calibration:
|
||||||
# Calibration file exists, ask user whether to use it or run new calibration
|
# Ask user whether to use existing calibration
|
||||||
user_input = input(
|
user_input = input(
|
||||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, "
|
f"Press ENTER to use existing calibration for {self.id}, "
|
||||||
f"or type 'c' and press ENTER to run calibration: "
|
f"or type 'c' and press ENTER to run new calibration: "
|
||||||
)
|
)
|
||||||
if user_input.strip().lower() != "c":
|
if user_input.strip().lower() != "c":
|
||||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
logger.info(f"Using existing calibration for {self.id}")
|
||||||
# Split calibration for each bus
|
# Split calibration for each bus
|
||||||
cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")}
|
cal_right = {k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")}
|
||||||
cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")}
|
cal_left = {k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")}
|
||||||
@@ -159,14 +156,14 @@ class OpenArmsMini(Teleoperator):
|
|||||||
self.bus_left.write_calibration(cal_left)
|
self.bus_left.write_calibration(cal_left)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(f"\nRunning calibration of {self}")
|
logger.info(f"\nRunning calibration for {self}")
|
||||||
|
|
||||||
# Calibrate each arm separately
|
# Calibrate each arm separately
|
||||||
self._calibrate_arm("right", self.bus_right)
|
self._calibrate_arm("right", self.bus_right)
|
||||||
self._calibrate_arm("left", self.bus_left)
|
self._calibrate_arm("left", self.bus_left)
|
||||||
|
|
||||||
self._save_calibration()
|
self._save_calibration()
|
||||||
print(f"Calibration saved to {self.calibration_fpath}")
|
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
|
||||||
|
|
||||||
def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None:
|
def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None:
|
||||||
"""Calibrate a single arm with Feetech motors."""
|
"""Calibrate a single arm with Feetech motors."""
|
||||||
@@ -175,6 +172,12 @@ class OpenArmsMini(Teleoperator):
|
|||||||
# Disable torque for manual positioning
|
# Disable torque for manual positioning
|
||||||
bus.disable_torque()
|
bus.disable_torque()
|
||||||
|
|
||||||
|
# Set Phase to 12 for all motors
|
||||||
|
logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...")
|
||||||
|
for motor in bus.motors:
|
||||||
|
bus.write("Phase", motor, 12)
|
||||||
|
logger.info(f"Phase set to 12 for all motors in {arm_name.upper()} arm")
|
||||||
|
|
||||||
# Set all motors to position mode
|
# Set all motors to position mode
|
||||||
for motor in bus.motors:
|
for motor in bus.motors:
|
||||||
bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||||
@@ -224,21 +227,30 @@ class OpenArmsMini(Teleoperator):
|
|||||||
open_pos = bus.read("Present_Position", motor_name, normalize=False)
|
open_pos = bus.read("Present_Position", motor_name, normalize=False)
|
||||||
logger.info(f" Gripper open position recorded: {open_pos}")
|
logger.info(f" Gripper open position recorded: {open_pos}")
|
||||||
|
|
||||||
# Use recorded positions as min/max for 0-100 normalization
|
# For RANGE_0_100: range_min maps to 0 (closed), range_max maps to 100 (open)
|
||||||
# 0 = closed, 100 = open
|
# If gripper motor direction is reversed (closed > open), we need to swap and use drive_mode=1
|
||||||
range_min = int(closed_pos)
|
if closed_pos < open_pos:
|
||||||
range_max = int(open_pos)
|
# Normal direction: 0=closed, 100=open
|
||||||
|
range_min = int(closed_pos)
|
||||||
|
range_max = int(open_pos)
|
||||||
|
drive_mode = 0
|
||||||
|
else:
|
||||||
|
# Reversed direction: swap so min < max, and set drive_mode=1 to reverse
|
||||||
|
range_min = int(open_pos)
|
||||||
|
range_max = int(closed_pos)
|
||||||
|
drive_mode = 1
|
||||||
|
|
||||||
logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open)")
|
logger.info(f" {prefixed_name}: range set to [{range_min}, {range_max}] (0=closed, 100=open, drive_mode={drive_mode})")
|
||||||
else:
|
else:
|
||||||
# Use full motor range for joint motors (will use degrees normalization)
|
# Use full motor range for joint motors (will use degrees normalization)
|
||||||
range_min = 0
|
range_min = 0
|
||||||
range_max = max_res
|
range_max = max_res
|
||||||
|
drive_mode = 0 # Normal direction for non-gripper motors
|
||||||
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
|
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
|
||||||
|
|
||||||
self.calibration[prefixed_name] = MotorCalibration(
|
self.calibration[prefixed_name] = MotorCalibration(
|
||||||
id=motor.id,
|
id=motor.id,
|
||||||
drive_mode=0, # Normal direction
|
drive_mode=drive_mode,
|
||||||
homing_offset=homing_offsets[motor_name],
|
homing_offset=homing_offsets[motor_name],
|
||||||
range_min=range_min,
|
range_min=range_min,
|
||||||
range_max=range_max,
|
range_max=range_max,
|
||||||
@@ -280,16 +292,26 @@ class OpenArmsMini(Teleoperator):
|
|||||||
"""Get current action from both arms (read positions from all motors)."""
|
"""Get current action from both arms (read positions from all motors)."""
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
|
|
||||||
|
# Motors to flip (invert direction) - different for each arm
|
||||||
|
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
|
||||||
|
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
|
||||||
|
|
||||||
# Read positions from both arms
|
# Read positions from both arms
|
||||||
right_positions = self.bus_right.sync_read("Present_Position")
|
right_positions = self.bus_right.sync_read("Present_Position")
|
||||||
left_positions = self.bus_left.sync_read("Present_Position")
|
left_positions = self.bus_left.sync_read("Present_Position")
|
||||||
|
|
||||||
# Combine into single action dict with arm prefixes
|
# Combine into single action dict with arm prefixes and flip specified motors
|
||||||
action = {}
|
action = {}
|
||||||
for motor, val in right_positions.items():
|
for motor, val in right_positions.items():
|
||||||
action[f"right_{motor}.pos"] = val
|
if motor in right_motors_to_flip:
|
||||||
|
action[f"right_{motor}.pos"] = -val
|
||||||
|
else:
|
||||||
|
action[f"right_{motor}.pos"] = val
|
||||||
for motor, val in left_positions.items():
|
for motor, val in left_positions.items():
|
||||||
action[f"left_{motor}.pos"] = val
|
if motor in left_motors_to_flip:
|
||||||
|
action[f"left_{motor}.pos"] = -val
|
||||||
|
else:
|
||||||
|
action[f"left_{motor}.pos"] = val
|
||||||
|
|
||||||
dt_ms = (time.perf_counter() - start) * 1e3
|
dt_ms = (time.perf_counter() - start) * 1e3
|
||||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||||
|
|||||||
Reference in New Issue
Block a user