add openarms mini

This commit is contained in:
pepijn kooijmans
2025-11-21 11:48:52 +01:00
committed by Michel Aractingi
parent 9b04fd25b6
commit 14319ee608
3 changed files with 466 additions and 155 deletions
+4 -6
View File
@@ -39,9 +39,9 @@ follower_config = OpenArmsFollowerConfig(
# Configure the OpenArms Mini leader (Feetech motors on serial)
leader_config = OpenArmsMiniConfig(
port_right="/dev/ttyACM0", # Serial port for right arm
port_left="/dev/ttyACM1", # Serial port for left arm
id="openarms_minis",
port_right="/dev/ttyACM1", # Serial port for right arm
port_left="/dev/ttyACM0", # Serial port for left arm
id="openarms_mini",
use_degrees=True,
)
@@ -131,8 +131,6 @@ try:
# Determine which follower joint this leader joint controls
follower_joint = SWAPPED_JOINTS.get(joint, joint)
follower_key = f"{follower_joint}.pos"
if "left" in follower_key:
continue
# Get leader position (default 0 if missing)
pos = leader_action.get(leader_key, 0.0)
@@ -149,7 +147,7 @@ try:
# Store in action dict for follower
joint_action[follower_key] = pos
#follower.send_action(joint_action)
follower.send_action(joint_action)
# Loop timing
loop_end = time.perf_counter()
+117 -10
View File
@@ -24,6 +24,7 @@ function App() {
// Configuration
const [config, setConfig] = useState({
leader_type: 'openarms', // 'openarms' or 'openarms_mini'
leader_left: 'can0',
leader_right: 'can1',
follower_left: 'can2',
@@ -35,6 +36,7 @@ function App() {
// Available options
const [availableCameras, setAvailableCameras] = useState([]);
const [availableUsbPorts, setAvailableUsbPorts] = useState([]);
const canInterfaces = ['can0', 'can1', 'can2', 'can3'];
const statusIntervalRef = useRef(null);
@@ -192,6 +194,43 @@ function App() {
}
};
// Discover USB ports
const discoverUsbPorts = async () => {
try {
const response = await fetch(`${API_BASE}/usb/discover`);
const data = await response.json();
const ports = data.ports || [];
setAvailableUsbPorts(ports);
// Auto-fix config if OpenArms Mini is selected and ports are invalid
if (config.leader_type === 'openarms_mini') {
const updated = { ...config };
let changed = false;
if (ports.length >= 1 && !ports.includes(config.leader_left)) {
updated.leader_left = ports[0];
changed = true;
}
if (ports.length >= 2 && !ports.includes(config.leader_right)) {
updated.leader_right = ports[1];
changed = true;
}
if (changed) {
setConfig(updated);
saveConfig(updated);
}
}
if (ports.length === 0) {
console.warn('No USB ports detected for OpenArms Mini');
}
} catch (e) {
console.error('Failed to discover USB ports:', e);
}
};
// Set task only (for pedal use)
const setTaskOnly = async () => {
if (!task.trim()) {
@@ -324,6 +363,7 @@ function App() {
loadConfig();
discoverCameras();
discoverUsbPorts();
fetchStatus();
statusIntervalRef.current = setInterval(fetchStatus, 1000);
@@ -335,6 +375,14 @@ function App() {
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []); // Run only once on mount
// Discover USB ports when leader type changes to Mini
useEffect(() => {
if (config.leader_type === 'openarms_mini') {
discoverUsbPorts();
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [config.leader_type]);
return (
<main>
<header>
@@ -385,9 +433,42 @@ function App() {
</div>
</div>
{/* CAN Interfaces */}
{/* Leader Type Selection */}
<div className="config-section">
<h3>CAN Interfaces</h3>
<h3>🎮 Leader Type</h3>
<div className="config-grid">
<label style={{gridColumn: '1 / -1'}}>
Leader Arm Type
<select
value={config.leader_type}
onChange={(e) => updateConfig('leader_type', e.target.value)}
disabled={isRecording || robotsReady}
>
<option value="openarms">OpenArms (CAN Bus - Damiao Motors)</option>
<option value="openarms_mini">OpenArms Mini (USB - Feetech Motors)</option>
</select>
</label>
</div>
</div>
{/* Leader Interfaces (CAN or USB based on type) */}
<div className="config-section">
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: '0.5rem' }}>
<h3>
{config.leader_type === 'openarms_mini'
? `Leader Ports (USB) ${availableUsbPorts.length > 0 ? `(${availableUsbPorts.length} detected)` : ''}`
: 'Leader Interfaces (CAN)'}
</h3>
{config.leader_type === 'openarms_mini' && (
<button
onClick={discoverUsbPorts}
className="btn-refresh"
disabled={isRecording || robotsReady}
>
🔄 Refresh
</button>
)}
</div>
<div className="config-grid">
<label>
Leader Left
@@ -396,9 +477,19 @@ function App() {
onChange={(e) => updateConfig('leader_left', e.target.value)}
disabled={isRecording || robotsReady}
>
{canInterfaces.map((iface) => (
<option key={iface} value={iface}>{iface}</option>
))}
{config.leader_type === 'openarms_mini' ? (
availableUsbPorts.length > 0 ? (
availableUsbPorts.map((port) => (
<option key={port} value={port}>{port}</option>
))
) : (
<option value="">No USB ports detected</option>
)
) : (
canInterfaces.map((iface) => (
<option key={iface} value={iface}>{iface}</option>
))
)}
</select>
</label>
@@ -409,12 +500,28 @@ function App() {
onChange={(e) => updateConfig('leader_right', e.target.value)}
disabled={isRecording || robotsReady}
>
{canInterfaces.map((iface) => (
<option key={iface} value={iface}>{iface}</option>
))}
{config.leader_type === 'openarms_mini' ? (
availableUsbPorts.length > 0 ? (
availableUsbPorts.map((port) => (
<option key={port} value={port}>{port}</option>
))
) : (
<option value="">No USB ports detected</option>
)
) : (
canInterfaces.map((iface) => (
<option key={iface} value={iface}>{iface}</option>
))
)}
</select>
</label>
</div>
</div>
{/* Follower CAN Interfaces */}
<div className="config-section">
<h3>Follower Interfaces (CAN)</h3>
<div className="config-grid">
<label>
Follower Left
<select
@@ -627,9 +734,9 @@ function App() {
onClick={moveToZero}
disabled={movingToZero}
className="btn-zero-large"
title="Move follower robot to zero position (2s with 60% gains)"
title="Move both leader and follower robots to zero position (2s)"
>
{movingToZero ? '⏳ Moving to Zero Position...' : '🎯 Move to Zero Position'}
{movingToZero ? '⏳ Moving to Zero Position...' : '🎯 Move to Zero Position (Leader + Follower)'}
</button>
</div>
)}
@@ -37,6 +37,8 @@ from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerCon
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig
from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini
app = FastAPI(title="OpenArms Recording Server")
@@ -68,6 +70,7 @@ recording_state = {
"recording_started_time": None, # Time when actual recording starts (after ramp-up)
"moving_to_zero": False, # Whether robot is moving to zero position
"config": {
"leader_type": "openarms", # "openarms" or "openarms_mini"
"leader_left": "can0",
"leader_right": "can1",
"follower_left": "can2",
@@ -108,6 +111,28 @@ FPS = 30
FRICTION_SCALE = 1.0
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
# OpenArms Mini joint mappings (from teleop_openarms_mini.py)
JOINT_DIRECTION_MINI = {
"right_joint_1": -1,
"right_joint_2": -1,
"right_joint_3": -1,
"right_joint_4": -1,
"right_joint_5": -1,
"left_joint_1": -1,
"left_joint_3": -1,
"left_joint_4": -1,
"left_joint_5": -1,
"left_joint_6": -1,
"left_joint_7": -1,
}
SWAPPED_JOINTS_MINI = {
"right_joint_6": "right_joint_7",
"right_joint_7": "right_joint_6",
"left_joint_6": "left_joint_7",
"left_joint_7": "left_joint_6",
}
# Pedal configuration
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
PEDAL_ENABLED = True # Set to False to disable pedal
@@ -118,6 +143,7 @@ pedal_action_lock = threading.Lock() # Prevent concurrent pedal actions
class RecordingConfig(BaseModel):
task: str
leader_type: str # "openarms" or "openarms_mini"
leader_left: str
leader_right: str
follower_left: str
@@ -129,6 +155,7 @@ class RecordingConfig(BaseModel):
class RobotSetupConfig(BaseModel):
"""Configuration for robot setup (no task required)."""
leader_type: str # "openarms" or "openarms_mini"
leader_left: str
leader_right: str
follower_left: str
@@ -154,6 +181,31 @@ def discover_cameras_sync():
return []
def discover_usb_ports_sync():
"""Discover available USB/serial ports for OpenArms Mini."""
try:
import glob
import serial
# Find all USB serial devices
ports = []
for pattern in ["/dev/ttyUSB*", "/dev/ttyACM*"]:
for port in glob.glob(pattern):
# Check if port is accessible
try:
# Try to open and close the port to verify access
ser = serial.Serial(port, timeout=0.1)
ser.close()
ports.append(port)
except (OSError, serial.SerialException) as e:
print(f"[USB] Port {port} exists but not accessible: {e}")
ports.sort()
print(f"[USB] Found {len(ports)} accessible USB port(s): {ports}")
return ports
except Exception as e:
print(f"[USB] Error: {e}")
return []
@app.get("/api/cameras/discover")
async def discover_cameras():
"""Discover available cameras."""
@@ -161,6 +213,30 @@ async def discover_cameras():
return {"cameras": cameras}
@app.get("/api/usb/discover")
async def discover_usb_ports():
"""Discover available USB/serial ports."""
ports = discover_usb_ports_sync()
return {"ports": ports}
@app.get("/api/usb/test/{port:path}")
async def test_usb_port(port: str):
"""Test if a USB port is accessible."""
try:
import serial
print(f"[USB Test] Testing port: {port}")
ser = serial.Serial(port, baudrate=1000000, timeout=0.1)
ser.close()
return {"status": "success", "message": f"Port {port} is accessible"}
except PermissionError as e:
return {"status": "error", "message": f"Permission denied: {e}"}
except serial.SerialException as e:
return {"status": "error", "message": f"Serial error: {e}"}
except Exception as e:
return {"status": "error", "message": f"Error: {e}"}
@app.get("/api/can/interfaces")
async def get_can_interfaces():
"""Get available CAN interfaces."""
@@ -171,7 +247,9 @@ def initialize_robots_only(config: RobotSetupConfig):
"""Initialize robots only (no dataset) for pre-setup."""
global robot_instances, recording_state
print(f"[Setup] Initializing robots...")
print(f"[Setup] Initializing robots with leader type: {config.leader_type}")
print(f"[Setup] Leader ports: left={config.leader_left}, right={config.leader_right}")
print(f"[Setup] Follower ports: left={config.follower_left}, right={config.follower_right}")
# Update status: Configuring cameras
recording_state["status_message"] = "Configuring cameras..."
@@ -183,6 +261,7 @@ def initialize_robots_only(config: RobotSetupConfig):
# Configure follower robot with cameras
recording_state["status_message"] = "Configuring follower robot..."
print(f"[Setup] Configuring follower with CAN: {config.follower_left}, {config.follower_right}")
follower_config = OpenArmsFollowerConfig(
port_left=config.follower_left,
port_right=config.follower_right,
@@ -193,38 +272,68 @@ def initialize_robots_only(config: RobotSetupConfig):
cameras=camera_config,
)
# Configure leader teleoperator
# Configure leader teleoperator based on type
recording_state["status_message"] = "Configuring leader teleoperator..."
leader_config = OpenArmsLeaderConfig(
port_left=config.leader_left,
port_right=config.leader_right,
can_interface="socketcan",
id="openarms_leader",
manual_control=False,
)
if config.leader_type == "openarms_mini":
# OpenArms Mini: USB/serial ports, Feetech motors
leader_config = OpenArmsMiniConfig(
port_left=config.leader_left,
port_right=config.leader_right,
id="openarms_mini",
use_degrees=True,
)
leader = OpenArmsMini(leader_config)
else:
# OpenArms: CAN bus, Damiao motors
leader_config = OpenArmsLeaderConfig(
port_left=config.leader_left,
port_right=config.leader_right,
can_interface="socketcan",
id="openarms_leader",
manual_control=False,
)
leader = OpenArmsLeader(leader_config)
# Initialize and connect
recording_state["status_message"] = "Connecting to follower robot..."
follower = OpenArmsFollower(follower_config)
follower.connect(calibrate=False) # Skip calibration in web mode
try:
follower.connect(calibrate=False) # Skip calibration in web mode
except Exception as e:
print(f"[Setup] Follower connection error: {e}")
raise RuntimeError(f"Failed to connect to follower robot: {e}")
recording_state["status_message"] = "Connecting to leader teleoperator..."
leader = OpenArmsLeader(leader_config)
leader.connect(calibrate=False)
try:
leader.connect(calibrate=False)
except Exception as e:
print(f"[Setup] Leader connection error: {e}")
# Disconnect follower on leader connection failure
try:
follower.disconnect()
except:
pass
raise RuntimeError(f"Failed to connect to leader teleoperator: {e}")
# Verify URDF is loaded
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
if leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
# Enable gravity compensation
recording_state["status_message"] = "Enabling gravity compensation..."
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
# Enable torque/gravity compensation based on leader type
if config.leader_type == "openarms":
# Verify URDF is loaded for OpenArms (needs gravity compensation)
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
if leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
# Enable gravity compensation
recording_state["status_message"] = "Enabling gravity compensation..."
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
else:
# OpenArms Mini: No gravity compensation needed (Feetech motors)
recording_state["status_message"] = "Leader ready (no gravity compensation needed)..."
robot_instances["follower"] = follower
robot_instances["leader"] = leader
robot_instances["leader_type"] = config.leader_type
def initialize_robot_systems(config: RecordingConfig):
@@ -259,15 +368,27 @@ def initialize_robot_systems(config: RecordingConfig):
cameras=camera_config,
)
# Configure leader teleoperator
# Configure leader teleoperator based on type
recording_state["status_message"] = "Configuring leader teleoperator..."
leader_config = OpenArmsLeaderConfig(
port_left=config.leader_left,
port_right=config.leader_right,
can_interface="socketcan",
id="openarms_leader",
manual_control=False,
)
if config.leader_type == "openarms_mini":
# OpenArms Mini: USB/serial ports, Feetech motors
leader_config = OpenArmsMiniConfig(
port_left=config.leader_left,
port_right=config.leader_right,
id="openarms_mini",
use_degrees=True,
)
leader = OpenArmsMini(leader_config)
else:
# OpenArms: CAN bus, Damiao motors
leader_config = OpenArmsLeaderConfig(
port_left=config.leader_left,
port_right=config.leader_right,
can_interface="socketcan",
id="openarms_leader",
manual_control=False,
)
leader = OpenArmsLeader(leader_config)
# Initialize and connect
recording_state["status_message"] = "Connecting to follower robot..."
@@ -275,19 +396,25 @@ def initialize_robot_systems(config: RecordingConfig):
follower.connect(calibrate=False) # Skip calibration in web mode
recording_state["status_message"] = "Connecting to leader teleoperator..."
leader = OpenArmsLeader(leader_config)
leader.connect(calibrate=False)
# Verify URDF is loaded
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
if leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
# Enable gravity compensation
recording_state["status_message"] = "Enabling gravity compensation..."
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
# Enable torque/gravity compensation based on leader type
if config.leader_type == "openarms":
# Verify URDF is loaded for OpenArms (needs gravity compensation)
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
if leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
# Enable gravity compensation
recording_state["status_message"] = "Enabling gravity compensation..."
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
else:
# OpenArms Mini: No gravity compensation needed (Feetech motors)
recording_state["status_message"] = "Leader ready (no gravity compensation needed)..."
robot_instances["leader_type"] = config.leader_type
# Configure dataset features
recording_state["status_message"] = "Configuring dataset features..."
@@ -369,13 +496,14 @@ def initialize_robot_systems(config: RecordingConfig):
def record_loop_with_compensation():
"""Main recording loop with compensation."""
"""Main recording loop (with or without compensation based on leader type)."""
global recording_state, stop_recording_flag
follower = robot_instances.get("follower")
leader = robot_instances.get("leader")
dataset = robot_instances.get("dataset")
dataset_features = robot_instances.get("dataset_features")
leader_type = robot_instances.get("leader_type", "openarms")
task = recording_state.get("task", "")
# Profiling: Track time spent in each operation
@@ -398,7 +526,7 @@ def record_loop_with_compensation():
print(f"[Recording] Error: Missing components")
return
print(f"[Recording] Starting recording loop with ramp-up...")
print(f"[Recording] Starting recording loop (leader: {leader_type}) with ramp-up...")
dt = 1 / FPS
loop_start_time = time.perf_counter()
@@ -466,89 +594,116 @@ def record_loop_with_compensation():
leader_action = leader.get_action()
timing_stats["get_action"].append(time.perf_counter() - t0)
# Extract positions and velocities
leader_positions_deg = {}
leader_velocities_deg_per_sec = {}
for motor in leader.bus_right.motors:
pos_key = f"right_{motor}.pos"
vel_key = f"right_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
for motor in leader.bus_left.motors:
pos_key = f"left_{motor}.pos"
vel_key = f"left_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
# Calculate gravity and friction torques
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
t0 = time.perf_counter()
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
timing_stats["gravity_calc"].append(time.perf_counter() - t0)
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
t0 = time.perf_counter()
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec,
friction_scale=FRICTION_SCALE
)
timing_stats["friction_calc"].append(time.perf_counter() - t0)
# Combine torques
leader_total_torques_nm = {}
for motor_name in leader_gravity_torques_nm:
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
friction = leader_friction_torques_nm.get(motor_name, 0.0)
leader_total_torques_nm[motor_name] = gravity + friction
# Apply compensation to RIGHT arm with ramp-up scaling
t0 = time.perf_counter()
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
# Scale torque by ramp factor during ramp-up phase
torque = torque_raw * ramp_factor
kd = leader.get_damping_kd(motor)
# Apply leader-specific control logic
if leader_type == "openarms":
# OpenArms: Apply gravity and friction compensation
# Extract positions and velocities
leader_positions_deg = {}
leader_velocities_deg_per_sec = {}
leader.bus_right._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
# Apply compensation to LEFT arm with ramp-up scaling
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
# Scale torque by ramp factor during ramp-up phase
torque = torque_raw * ramp_factor
kd = leader.get_damping_kd(motor)
for motor in leader.bus_right.motors:
pos_key = f"right_{motor}.pos"
vel_key = f"right_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
leader.bus_left._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
for motor in leader.bus_left.motors:
pos_key = f"left_{motor}.pos"
vel_key = f"left_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
# Calculate gravity and friction torques
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
t0 = time.perf_counter()
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
timing_stats["gravity_calc"].append(time.perf_counter() - t0)
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
t0 = time.perf_counter()
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec,
friction_scale=FRICTION_SCALE
)
timing_stats["leader_control"].append(time.perf_counter() - t0)
timing_stats["friction_calc"].append(time.perf_counter() - t0)
# Combine torques
leader_total_torques_nm = {}
for motor_name in leader_gravity_torques_nm:
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
friction = leader_friction_torques_nm.get(motor_name, 0.0)
leader_total_torques_nm[motor_name] = gravity + friction
# Apply compensation to RIGHT arm with ramp-up scaling
t0 = time.perf_counter()
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
# Scale torque by ramp factor during ramp-up phase
torque = torque_raw * ramp_factor
kd = leader.get_damping_kd(motor)
leader.bus_right._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
# Apply compensation to LEFT arm with ramp-up scaling
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
# Scale torque by ramp factor during ramp-up phase
torque = torque_raw * ramp_factor
kd = leader.get_damping_kd(motor)
leader.bus_left._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
timing_stats["leader_control"].append(time.perf_counter() - t0)
# Send positions to follower with ramped PID gains using send_action
# Build follower action from leader action
follower_action = {}
for joint in all_joints:
pos_key = f"{joint}.pos"
if pos_key in leader_action:
follower_action[pos_key] = leader_action[pos_key]
if leader_type == "openarms_mini":
# OpenArms Mini: Apply joint direction and swap mappings
for joint in all_joints:
leader_key = f"{joint}.pos"
# Determine which follower joint this leader joint controls
follower_joint = SWAPPED_JOINTS_MINI.get(joint, joint)
follower_key = f"{follower_joint}.pos"
# Get leader position (default 0 if missing)
pos = leader_action.get(leader_key, 0.0)
# Convert gripper values: Mini uses 0-100, OpenArms uses 0 to -65 degrees
if "gripper" in joint:
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
# 0 (closed) -> 0°, 100 (open) -> -65°
pos = (pos / 100.0) * -65.0
else:
# Apply direction reversal if specified (non-gripper joints only)
pos *= JOINT_DIRECTION_MINI.get(joint, 1)
follower_action[follower_key] = pos
else:
# OpenArms: Direct position mapping (no joint swapping)
for joint in all_joints:
pos_key = f"{joint}.pos"
if pos_key in leader_action:
follower_action[pos_key] = leader_action[pos_key]
if follower_action:
# Build custom ramped PID gains for all motors
@@ -742,16 +897,20 @@ def cleanup_robot_systems(keep_robots=False):
try:
if not keep_robots:
if robot_instances.get("leader"):
leader_type = robot_instances.get("leader_type", "openarms")
# Only disable torque for OpenArms (with gravity compensation)
if leader_type == "openarms":
robot_instances["leader"].bus_right.disable_torque()
robot_instances["leader"].bus_left.disable_torque()
time.sleep(0.1)
robot_instances["leader"].disconnect()
robot_instances["leader"].disconnect()
if robot_instances.get("follower"):
robot_instances["follower"].disconnect()
robot_instances["follower"] = None
robot_instances["leader"] = None
robot_instances["leader_type"] = None
recording_state["robots_ready"] = False
print(f"[Cleanup] Robots disconnected")
@@ -906,39 +1065,59 @@ def do_move_to_zero(source: str = "API"):
follower = robot_instances["follower"]
leader = robot_instances.get("leader")
leader_type = robot_instances.get("leader_type", "openarms")
if leader:
print(f"[{source}] Disabling leader torque...")
# Handle leader based on type
if leader and leader_type == "openarms":
# OpenArms: Disable torque (with gravity compensation)
print(f"[{source}] Disabling leader torque (OpenArms with gravity compensation)...")
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
elif leader and leader_type == "openarms_mini":
# OpenArms Mini: Send to zero position command (will be executed in parallel with follower)
print(f"[{source}] Moving OpenArms Mini leader to zero position...")
motor_index = {
"joint_1": 0, "joint_2": 1, "joint_3": 2, "joint_4": 3,
"joint_5": 4, "joint_6": 5, "joint_7": 6, "gripper": 7,
}
zero_action = {}
custom_kp = {}
custom_kd = {}
# Prepare follower zero action
follower_zero_action = {}
follower_custom_kp = {}
follower_custom_kd = {}
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
follower_zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
follower_custom_kp[full_name] = kp_base * 0.05
follower_custom_kd[full_name] = kd_base
for motor in follower.bus_left.motors:
full_name = f"left_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
follower_zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
follower_custom_kp[full_name] = kp_base * 0.05
follower_custom_kd[full_name] = kd_base
# Prepare leader zero action (for OpenArms Mini)
leader_zero_action = None
if leader and leader_type == "openarms_mini":
leader_zero_action = {}
# For OpenArms Mini, send all joints (including gripper) to zero
# Gripper zero = 0 (closed)
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
leader_zero_action[f"{full_name}.pos"] = 0.0
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
leader_zero_action[f"{full_name}.pos"] = 0.0
duration = 2.0
fps = 30
@@ -947,36 +1126,63 @@ def do_move_to_zero(source: str = "API"):
while time.perf_counter() - start_time < duration:
loop_start = time.perf_counter()
follower.send_action(zero_action, custom_kp=custom_kp, custom_kd=custom_kd)
# Send follower to zero
follower.send_action(follower_zero_action, custom_kp=follower_custom_kp, custom_kd=follower_custom_kd)
# Send leader (OpenArms Mini) to zero if applicable
if leader_zero_action:
# Use Feetech write commands directly (no custom gains needed)
for motor_name in leader.bus_right.motors:
full_name = f"right_{motor_name}"
pos_key = f"{full_name}.pos"
if pos_key in leader_zero_action:
# Write zero position to leader motor
leader.bus_right.write("Goal_Position", motor_name, leader_zero_action[pos_key])
for motor_name in leader.bus_left.motors:
full_name = f"left_{motor_name}"
pos_key = f"{full_name}.pos"
if pos_key in leader_zero_action:
# Write zero position to leader motor
leader.bus_left.write("Goal_Position", motor_name, leader_zero_action[pos_key])
loop_duration = time.perf_counter() - loop_start
sleep_time = dt - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
if leader:
print(f"[{source}] Re-enabling leader torque...")
# Re-enable torque only for OpenArms (with gravity compensation)
if leader and leader_type == "openarms":
print(f"[{source}] Re-enabling leader torque (OpenArms with gravity compensation)...")
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
elif leader and leader_type == "openarms_mini":
print(f"[{source}] OpenArms Mini leader moved to zero position")
recording_state["moving_to_zero"] = False
recording_state["status_message"] = "Moved to zero position" if recording_state["robots_ready"] else "Ready"
print(f"[{source}] Move to zero complete")
return {"status": "success", "message": "Robot moved to zero position"}
print(f"[{source}] Move to zero complete (follower + leader)")
return {"status": "success", "message": "Robots moved to zero position"}
except Exception as e:
recording_state["moving_to_zero"] = False
recording_state["status_message"] = f"Error: {str(e)}"
# Try to re-enable torque on error (only for OpenArms with gravity compensation)
leader = robot_instances.get("leader")
if leader:
leader_type = robot_instances.get("leader_type", "openarms")
if leader and leader_type == "openarms":
try:
print(f"[{source}] Attempting to re-enable leader torque after error...")
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
except Exception as torque_error:
print(f"[{source}] Failed to re-enable leader torque: {torque_error}")
print(f"[{source}] Move to zero failed: {e}")
raise HTTPException(status_code=500, detail=str(e))