mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-26 05:59:52 +00:00
add openarms mini
This commit is contained in:
@@ -39,9 +39,9 @@ follower_config = OpenArmsFollowerConfig(
|
|||||||
|
|
||||||
# Configure the OpenArms Mini leader (Feetech motors on serial)
|
# Configure the OpenArms Mini leader (Feetech motors on serial)
|
||||||
leader_config = OpenArmsMiniConfig(
|
leader_config = OpenArmsMiniConfig(
|
||||||
port_right="/dev/ttyACM0", # Serial port for right arm
|
port_right="/dev/ttyACM1", # Serial port for right arm
|
||||||
port_left="/dev/ttyACM1", # Serial port for left arm
|
port_left="/dev/ttyACM0", # Serial port for left arm
|
||||||
id="openarms_minis",
|
id="openarms_mini",
|
||||||
use_degrees=True,
|
use_degrees=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -131,8 +131,6 @@ try:
|
|||||||
# Determine which follower joint this leader joint controls
|
# Determine which follower joint this leader joint controls
|
||||||
follower_joint = SWAPPED_JOINTS.get(joint, joint)
|
follower_joint = SWAPPED_JOINTS.get(joint, joint)
|
||||||
follower_key = f"{follower_joint}.pos"
|
follower_key = f"{follower_joint}.pos"
|
||||||
if "left" in follower_key:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Get leader position (default 0 if missing)
|
# Get leader position (default 0 if missing)
|
||||||
pos = leader_action.get(leader_key, 0.0)
|
pos = leader_action.get(leader_key, 0.0)
|
||||||
@@ -149,7 +147,7 @@ try:
|
|||||||
# Store in action dict for follower
|
# Store in action dict for follower
|
||||||
joint_action[follower_key] = pos
|
joint_action[follower_key] = pos
|
||||||
|
|
||||||
#follower.send_action(joint_action)
|
follower.send_action(joint_action)
|
||||||
|
|
||||||
# Loop timing
|
# Loop timing
|
||||||
loop_end = time.perf_counter()
|
loop_end = time.perf_counter()
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ function App() {
|
|||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
const [config, setConfig] = useState({
|
const [config, setConfig] = useState({
|
||||||
|
leader_type: 'openarms', // 'openarms' or 'openarms_mini'
|
||||||
leader_left: 'can0',
|
leader_left: 'can0',
|
||||||
leader_right: 'can1',
|
leader_right: 'can1',
|
||||||
follower_left: 'can2',
|
follower_left: 'can2',
|
||||||
@@ -35,6 +36,7 @@ function App() {
|
|||||||
|
|
||||||
// Available options
|
// Available options
|
||||||
const [availableCameras, setAvailableCameras] = useState([]);
|
const [availableCameras, setAvailableCameras] = useState([]);
|
||||||
|
const [availableUsbPorts, setAvailableUsbPorts] = useState([]);
|
||||||
const canInterfaces = ['can0', 'can1', 'can2', 'can3'];
|
const canInterfaces = ['can0', 'can1', 'can2', 'can3'];
|
||||||
|
|
||||||
const statusIntervalRef = useRef(null);
|
const statusIntervalRef = useRef(null);
|
||||||
@@ -192,6 +194,43 @@ function App() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Discover USB ports
|
||||||
|
const discoverUsbPorts = async () => {
|
||||||
|
try {
|
||||||
|
const response = await fetch(`${API_BASE}/usb/discover`);
|
||||||
|
const data = await response.json();
|
||||||
|
const ports = data.ports || [];
|
||||||
|
setAvailableUsbPorts(ports);
|
||||||
|
|
||||||
|
// Auto-fix config if OpenArms Mini is selected and ports are invalid
|
||||||
|
if (config.leader_type === 'openarms_mini') {
|
||||||
|
const updated = { ...config };
|
||||||
|
let changed = false;
|
||||||
|
|
||||||
|
if (ports.length >= 1 && !ports.includes(config.leader_left)) {
|
||||||
|
updated.leader_left = ports[0];
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ports.length >= 2 && !ports.includes(config.leader_right)) {
|
||||||
|
updated.leader_right = ports[1];
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (changed) {
|
||||||
|
setConfig(updated);
|
||||||
|
saveConfig(updated);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ports.length === 0) {
|
||||||
|
console.warn('No USB ports detected for OpenArms Mini');
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error('Failed to discover USB ports:', e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Set task only (for pedal use)
|
// Set task only (for pedal use)
|
||||||
const setTaskOnly = async () => {
|
const setTaskOnly = async () => {
|
||||||
if (!task.trim()) {
|
if (!task.trim()) {
|
||||||
@@ -324,6 +363,7 @@ function App() {
|
|||||||
|
|
||||||
loadConfig();
|
loadConfig();
|
||||||
discoverCameras();
|
discoverCameras();
|
||||||
|
discoverUsbPorts();
|
||||||
fetchStatus();
|
fetchStatus();
|
||||||
statusIntervalRef.current = setInterval(fetchStatus, 1000);
|
statusIntervalRef.current = setInterval(fetchStatus, 1000);
|
||||||
|
|
||||||
@@ -335,6 +375,14 @@ function App() {
|
|||||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||||
}, []); // Run only once on mount
|
}, []); // Run only once on mount
|
||||||
|
|
||||||
|
// Discover USB ports when leader type changes to Mini
|
||||||
|
useEffect(() => {
|
||||||
|
if (config.leader_type === 'openarms_mini') {
|
||||||
|
discoverUsbPorts();
|
||||||
|
}
|
||||||
|
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||||
|
}, [config.leader_type]);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<main>
|
<main>
|
||||||
<header>
|
<header>
|
||||||
@@ -385,9 +433,42 @@ function App() {
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{/* CAN Interfaces */}
|
{/* Leader Type Selection */}
|
||||||
<div className="config-section">
|
<div className="config-section">
|
||||||
<h3>CAN Interfaces</h3>
|
<h3>🎮 Leader Type</h3>
|
||||||
|
<div className="config-grid">
|
||||||
|
<label style={{gridColumn: '1 / -1'}}>
|
||||||
|
Leader Arm Type
|
||||||
|
<select
|
||||||
|
value={config.leader_type}
|
||||||
|
onChange={(e) => updateConfig('leader_type', e.target.value)}
|
||||||
|
disabled={isRecording || robotsReady}
|
||||||
|
>
|
||||||
|
<option value="openarms">OpenArms (CAN Bus - Damiao Motors)</option>
|
||||||
|
<option value="openarms_mini">OpenArms Mini (USB - Feetech Motors)</option>
|
||||||
|
</select>
|
||||||
|
</label>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{/* Leader Interfaces (CAN or USB based on type) */}
|
||||||
|
<div className="config-section">
|
||||||
|
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: '0.5rem' }}>
|
||||||
|
<h3>
|
||||||
|
{config.leader_type === 'openarms_mini'
|
||||||
|
? `Leader Ports (USB) ${availableUsbPorts.length > 0 ? `(${availableUsbPorts.length} detected)` : ''}`
|
||||||
|
: 'Leader Interfaces (CAN)'}
|
||||||
|
</h3>
|
||||||
|
{config.leader_type === 'openarms_mini' && (
|
||||||
|
<button
|
||||||
|
onClick={discoverUsbPorts}
|
||||||
|
className="btn-refresh"
|
||||||
|
disabled={isRecording || robotsReady}
|
||||||
|
>
|
||||||
|
🔄 Refresh
|
||||||
|
</button>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
<div className="config-grid">
|
<div className="config-grid">
|
||||||
<label>
|
<label>
|
||||||
Leader Left
|
Leader Left
|
||||||
@@ -396,9 +477,19 @@ function App() {
|
|||||||
onChange={(e) => updateConfig('leader_left', e.target.value)}
|
onChange={(e) => updateConfig('leader_left', e.target.value)}
|
||||||
disabled={isRecording || robotsReady}
|
disabled={isRecording || robotsReady}
|
||||||
>
|
>
|
||||||
{canInterfaces.map((iface) => (
|
{config.leader_type === 'openarms_mini' ? (
|
||||||
<option key={iface} value={iface}>{iface}</option>
|
availableUsbPorts.length > 0 ? (
|
||||||
))}
|
availableUsbPorts.map((port) => (
|
||||||
|
<option key={port} value={port}>{port}</option>
|
||||||
|
))
|
||||||
|
) : (
|
||||||
|
<option value="">No USB ports detected</option>
|
||||||
|
)
|
||||||
|
) : (
|
||||||
|
canInterfaces.map((iface) => (
|
||||||
|
<option key={iface} value={iface}>{iface}</option>
|
||||||
|
))
|
||||||
|
)}
|
||||||
</select>
|
</select>
|
||||||
</label>
|
</label>
|
||||||
|
|
||||||
@@ -409,12 +500,28 @@ function App() {
|
|||||||
onChange={(e) => updateConfig('leader_right', e.target.value)}
|
onChange={(e) => updateConfig('leader_right', e.target.value)}
|
||||||
disabled={isRecording || robotsReady}
|
disabled={isRecording || robotsReady}
|
||||||
>
|
>
|
||||||
{canInterfaces.map((iface) => (
|
{config.leader_type === 'openarms_mini' ? (
|
||||||
<option key={iface} value={iface}>{iface}</option>
|
availableUsbPorts.length > 0 ? (
|
||||||
))}
|
availableUsbPorts.map((port) => (
|
||||||
|
<option key={port} value={port}>{port}</option>
|
||||||
|
))
|
||||||
|
) : (
|
||||||
|
<option value="">No USB ports detected</option>
|
||||||
|
)
|
||||||
|
) : (
|
||||||
|
canInterfaces.map((iface) => (
|
||||||
|
<option key={iface} value={iface}>{iface}</option>
|
||||||
|
))
|
||||||
|
)}
|
||||||
</select>
|
</select>
|
||||||
</label>
|
</label>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{/* Follower CAN Interfaces */}
|
||||||
|
<div className="config-section">
|
||||||
|
<h3>Follower Interfaces (CAN)</h3>
|
||||||
|
<div className="config-grid">
|
||||||
<label>
|
<label>
|
||||||
Follower Left
|
Follower Left
|
||||||
<select
|
<select
|
||||||
@@ -627,9 +734,9 @@ function App() {
|
|||||||
onClick={moveToZero}
|
onClick={moveToZero}
|
||||||
disabled={movingToZero}
|
disabled={movingToZero}
|
||||||
className="btn-zero-large"
|
className="btn-zero-large"
|
||||||
title="Move follower robot to zero position (2s with 60% gains)"
|
title="Move both leader and follower robots to zero position (2s)"
|
||||||
>
|
>
|
||||||
{movingToZero ? '⏳ Moving to Zero Position...' : '🎯 Move to Zero Position'}
|
{movingToZero ? '⏳ Moving to Zero Position...' : '🎯 Move to Zero Position (Leader + Follower)'}
|
||||||
</button>
|
</button>
|
||||||
</div>
|
</div>
|
||||||
)}
|
)}
|
||||||
|
|||||||
@@ -37,6 +37,8 @@ from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerCon
|
|||||||
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
|
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
|
||||||
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
|
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
|
||||||
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
|
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
|
||||||
|
from lerobot.teleoperators.openarms_mini.config_openarms_mini import OpenArmsMiniConfig
|
||||||
|
from lerobot.teleoperators.openarms_mini.openarms_mini import OpenArmsMini
|
||||||
|
|
||||||
app = FastAPI(title="OpenArms Recording Server")
|
app = FastAPI(title="OpenArms Recording Server")
|
||||||
|
|
||||||
@@ -68,6 +70,7 @@ recording_state = {
|
|||||||
"recording_started_time": None, # Time when actual recording starts (after ramp-up)
|
"recording_started_time": None, # Time when actual recording starts (after ramp-up)
|
||||||
"moving_to_zero": False, # Whether robot is moving to zero position
|
"moving_to_zero": False, # Whether robot is moving to zero position
|
||||||
"config": {
|
"config": {
|
||||||
|
"leader_type": "openarms", # "openarms" or "openarms_mini"
|
||||||
"leader_left": "can0",
|
"leader_left": "can0",
|
||||||
"leader_right": "can1",
|
"leader_right": "can1",
|
||||||
"follower_left": "can2",
|
"follower_left": "can2",
|
||||||
@@ -108,6 +111,28 @@ FPS = 30
|
|||||||
FRICTION_SCALE = 1.0
|
FRICTION_SCALE = 1.0
|
||||||
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
|
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
|
||||||
|
|
||||||
|
# OpenArms Mini joint mappings (from teleop_openarms_mini.py)
|
||||||
|
JOINT_DIRECTION_MINI = {
|
||||||
|
"right_joint_1": -1,
|
||||||
|
"right_joint_2": -1,
|
||||||
|
"right_joint_3": -1,
|
||||||
|
"right_joint_4": -1,
|
||||||
|
"right_joint_5": -1,
|
||||||
|
"left_joint_1": -1,
|
||||||
|
"left_joint_3": -1,
|
||||||
|
"left_joint_4": -1,
|
||||||
|
"left_joint_5": -1,
|
||||||
|
"left_joint_6": -1,
|
||||||
|
"left_joint_7": -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
SWAPPED_JOINTS_MINI = {
|
||||||
|
"right_joint_6": "right_joint_7",
|
||||||
|
"right_joint_7": "right_joint_6",
|
||||||
|
"left_joint_6": "left_joint_7",
|
||||||
|
"left_joint_7": "left_joint_6",
|
||||||
|
}
|
||||||
|
|
||||||
# Pedal configuration
|
# Pedal configuration
|
||||||
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
|
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
|
||||||
PEDAL_ENABLED = True # Set to False to disable pedal
|
PEDAL_ENABLED = True # Set to False to disable pedal
|
||||||
@@ -118,6 +143,7 @@ pedal_action_lock = threading.Lock() # Prevent concurrent pedal actions
|
|||||||
|
|
||||||
class RecordingConfig(BaseModel):
|
class RecordingConfig(BaseModel):
|
||||||
task: str
|
task: str
|
||||||
|
leader_type: str # "openarms" or "openarms_mini"
|
||||||
leader_left: str
|
leader_left: str
|
||||||
leader_right: str
|
leader_right: str
|
||||||
follower_left: str
|
follower_left: str
|
||||||
@@ -129,6 +155,7 @@ class RecordingConfig(BaseModel):
|
|||||||
|
|
||||||
class RobotSetupConfig(BaseModel):
|
class RobotSetupConfig(BaseModel):
|
||||||
"""Configuration for robot setup (no task required)."""
|
"""Configuration for robot setup (no task required)."""
|
||||||
|
leader_type: str # "openarms" or "openarms_mini"
|
||||||
leader_left: str
|
leader_left: str
|
||||||
leader_right: str
|
leader_right: str
|
||||||
follower_left: str
|
follower_left: str
|
||||||
@@ -154,6 +181,31 @@ def discover_cameras_sync():
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def discover_usb_ports_sync():
|
||||||
|
"""Discover available USB/serial ports for OpenArms Mini."""
|
||||||
|
try:
|
||||||
|
import glob
|
||||||
|
import serial
|
||||||
|
# Find all USB serial devices
|
||||||
|
ports = []
|
||||||
|
for pattern in ["/dev/ttyUSB*", "/dev/ttyACM*"]:
|
||||||
|
for port in glob.glob(pattern):
|
||||||
|
# Check if port is accessible
|
||||||
|
try:
|
||||||
|
# Try to open and close the port to verify access
|
||||||
|
ser = serial.Serial(port, timeout=0.1)
|
||||||
|
ser.close()
|
||||||
|
ports.append(port)
|
||||||
|
except (OSError, serial.SerialException) as e:
|
||||||
|
print(f"[USB] Port {port} exists but not accessible: {e}")
|
||||||
|
ports.sort()
|
||||||
|
print(f"[USB] Found {len(ports)} accessible USB port(s): {ports}")
|
||||||
|
return ports
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[USB] Error: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/cameras/discover")
|
@app.get("/api/cameras/discover")
|
||||||
async def discover_cameras():
|
async def discover_cameras():
|
||||||
"""Discover available cameras."""
|
"""Discover available cameras."""
|
||||||
@@ -161,6 +213,30 @@ async def discover_cameras():
|
|||||||
return {"cameras": cameras}
|
return {"cameras": cameras}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/usb/discover")
|
||||||
|
async def discover_usb_ports():
|
||||||
|
"""Discover available USB/serial ports."""
|
||||||
|
ports = discover_usb_ports_sync()
|
||||||
|
return {"ports": ports}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/usb/test/{port:path}")
|
||||||
|
async def test_usb_port(port: str):
|
||||||
|
"""Test if a USB port is accessible."""
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
print(f"[USB Test] Testing port: {port}")
|
||||||
|
ser = serial.Serial(port, baudrate=1000000, timeout=0.1)
|
||||||
|
ser.close()
|
||||||
|
return {"status": "success", "message": f"Port {port} is accessible"}
|
||||||
|
except PermissionError as e:
|
||||||
|
return {"status": "error", "message": f"Permission denied: {e}"}
|
||||||
|
except serial.SerialException as e:
|
||||||
|
return {"status": "error", "message": f"Serial error: {e}"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "message": f"Error: {e}"}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/can/interfaces")
|
@app.get("/api/can/interfaces")
|
||||||
async def get_can_interfaces():
|
async def get_can_interfaces():
|
||||||
"""Get available CAN interfaces."""
|
"""Get available CAN interfaces."""
|
||||||
@@ -171,7 +247,9 @@ def initialize_robots_only(config: RobotSetupConfig):
|
|||||||
"""Initialize robots only (no dataset) for pre-setup."""
|
"""Initialize robots only (no dataset) for pre-setup."""
|
||||||
global robot_instances, recording_state
|
global robot_instances, recording_state
|
||||||
|
|
||||||
print(f"[Setup] Initializing robots...")
|
print(f"[Setup] Initializing robots with leader type: {config.leader_type}")
|
||||||
|
print(f"[Setup] Leader ports: left={config.leader_left}, right={config.leader_right}")
|
||||||
|
print(f"[Setup] Follower ports: left={config.follower_left}, right={config.follower_right}")
|
||||||
|
|
||||||
# Update status: Configuring cameras
|
# Update status: Configuring cameras
|
||||||
recording_state["status_message"] = "Configuring cameras..."
|
recording_state["status_message"] = "Configuring cameras..."
|
||||||
@@ -183,6 +261,7 @@ def initialize_robots_only(config: RobotSetupConfig):
|
|||||||
|
|
||||||
# Configure follower robot with cameras
|
# Configure follower robot with cameras
|
||||||
recording_state["status_message"] = "Configuring follower robot..."
|
recording_state["status_message"] = "Configuring follower robot..."
|
||||||
|
print(f"[Setup] Configuring follower with CAN: {config.follower_left}, {config.follower_right}")
|
||||||
follower_config = OpenArmsFollowerConfig(
|
follower_config = OpenArmsFollowerConfig(
|
||||||
port_left=config.follower_left,
|
port_left=config.follower_left,
|
||||||
port_right=config.follower_right,
|
port_right=config.follower_right,
|
||||||
@@ -193,38 +272,68 @@ def initialize_robots_only(config: RobotSetupConfig):
|
|||||||
cameras=camera_config,
|
cameras=camera_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Configure leader teleoperator
|
# Configure leader teleoperator based on type
|
||||||
recording_state["status_message"] = "Configuring leader teleoperator..."
|
recording_state["status_message"] = "Configuring leader teleoperator..."
|
||||||
leader_config = OpenArmsLeaderConfig(
|
if config.leader_type == "openarms_mini":
|
||||||
port_left=config.leader_left,
|
# OpenArms Mini: USB/serial ports, Feetech motors
|
||||||
port_right=config.leader_right,
|
leader_config = OpenArmsMiniConfig(
|
||||||
can_interface="socketcan",
|
port_left=config.leader_left,
|
||||||
id="openarms_leader",
|
port_right=config.leader_right,
|
||||||
manual_control=False,
|
id="openarms_mini",
|
||||||
)
|
use_degrees=True,
|
||||||
|
)
|
||||||
|
leader = OpenArmsMini(leader_config)
|
||||||
|
else:
|
||||||
|
# OpenArms: CAN bus, Damiao motors
|
||||||
|
leader_config = OpenArmsLeaderConfig(
|
||||||
|
port_left=config.leader_left,
|
||||||
|
port_right=config.leader_right,
|
||||||
|
can_interface="socketcan",
|
||||||
|
id="openarms_leader",
|
||||||
|
manual_control=False,
|
||||||
|
)
|
||||||
|
leader = OpenArmsLeader(leader_config)
|
||||||
|
|
||||||
# Initialize and connect
|
# Initialize and connect
|
||||||
recording_state["status_message"] = "Connecting to follower robot..."
|
recording_state["status_message"] = "Connecting to follower robot..."
|
||||||
follower = OpenArmsFollower(follower_config)
|
follower = OpenArmsFollower(follower_config)
|
||||||
follower.connect(calibrate=False) # Skip calibration in web mode
|
try:
|
||||||
|
follower.connect(calibrate=False) # Skip calibration in web mode
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[Setup] Follower connection error: {e}")
|
||||||
|
raise RuntimeError(f"Failed to connect to follower robot: {e}")
|
||||||
|
|
||||||
recording_state["status_message"] = "Connecting to leader teleoperator..."
|
recording_state["status_message"] = "Connecting to leader teleoperator..."
|
||||||
leader = OpenArmsLeader(leader_config)
|
try:
|
||||||
leader.connect(calibrate=False)
|
leader.connect(calibrate=False)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[Setup] Leader connection error: {e}")
|
||||||
|
# Disconnect follower on leader connection failure
|
||||||
|
try:
|
||||||
|
follower.disconnect()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
raise RuntimeError(f"Failed to connect to leader teleoperator: {e}")
|
||||||
|
|
||||||
# Verify URDF is loaded
|
# Enable torque/gravity compensation based on leader type
|
||||||
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
|
if config.leader_type == "openarms":
|
||||||
if leader.pin_robot is None:
|
# Verify URDF is loaded for OpenArms (needs gravity compensation)
|
||||||
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
|
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
|
||||||
|
if leader.pin_robot is None:
|
||||||
|
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
|
||||||
|
|
||||||
# Enable gravity compensation
|
# Enable gravity compensation
|
||||||
recording_state["status_message"] = "Enabling gravity compensation..."
|
recording_state["status_message"] = "Enabling gravity compensation..."
|
||||||
leader.bus_right.enable_torque()
|
leader.bus_right.enable_torque()
|
||||||
leader.bus_left.enable_torque()
|
leader.bus_left.enable_torque()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
else:
|
||||||
|
# OpenArms Mini: No gravity compensation needed (Feetech motors)
|
||||||
|
recording_state["status_message"] = "Leader ready (no gravity compensation needed)..."
|
||||||
|
|
||||||
robot_instances["follower"] = follower
|
robot_instances["follower"] = follower
|
||||||
robot_instances["leader"] = leader
|
robot_instances["leader"] = leader
|
||||||
|
robot_instances["leader_type"] = config.leader_type
|
||||||
|
|
||||||
|
|
||||||
def initialize_robot_systems(config: RecordingConfig):
|
def initialize_robot_systems(config: RecordingConfig):
|
||||||
@@ -259,15 +368,27 @@ def initialize_robot_systems(config: RecordingConfig):
|
|||||||
cameras=camera_config,
|
cameras=camera_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Configure leader teleoperator
|
# Configure leader teleoperator based on type
|
||||||
recording_state["status_message"] = "Configuring leader teleoperator..."
|
recording_state["status_message"] = "Configuring leader teleoperator..."
|
||||||
leader_config = OpenArmsLeaderConfig(
|
if config.leader_type == "openarms_mini":
|
||||||
port_left=config.leader_left,
|
# OpenArms Mini: USB/serial ports, Feetech motors
|
||||||
port_right=config.leader_right,
|
leader_config = OpenArmsMiniConfig(
|
||||||
can_interface="socketcan",
|
port_left=config.leader_left,
|
||||||
id="openarms_leader",
|
port_right=config.leader_right,
|
||||||
manual_control=False,
|
id="openarms_mini",
|
||||||
)
|
use_degrees=True,
|
||||||
|
)
|
||||||
|
leader = OpenArmsMini(leader_config)
|
||||||
|
else:
|
||||||
|
# OpenArms: CAN bus, Damiao motors
|
||||||
|
leader_config = OpenArmsLeaderConfig(
|
||||||
|
port_left=config.leader_left,
|
||||||
|
port_right=config.leader_right,
|
||||||
|
can_interface="socketcan",
|
||||||
|
id="openarms_leader",
|
||||||
|
manual_control=False,
|
||||||
|
)
|
||||||
|
leader = OpenArmsLeader(leader_config)
|
||||||
|
|
||||||
# Initialize and connect
|
# Initialize and connect
|
||||||
recording_state["status_message"] = "Connecting to follower robot..."
|
recording_state["status_message"] = "Connecting to follower robot..."
|
||||||
@@ -275,19 +396,25 @@ def initialize_robot_systems(config: RecordingConfig):
|
|||||||
follower.connect(calibrate=False) # Skip calibration in web mode
|
follower.connect(calibrate=False) # Skip calibration in web mode
|
||||||
|
|
||||||
recording_state["status_message"] = "Connecting to leader teleoperator..."
|
recording_state["status_message"] = "Connecting to leader teleoperator..."
|
||||||
leader = OpenArmsLeader(leader_config)
|
|
||||||
leader.connect(calibrate=False)
|
leader.connect(calibrate=False)
|
||||||
|
|
||||||
# Verify URDF is loaded
|
# Enable torque/gravity compensation based on leader type
|
||||||
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
|
if config.leader_type == "openarms":
|
||||||
if leader.pin_robot is None:
|
# Verify URDF is loaded for OpenArms (needs gravity compensation)
|
||||||
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
|
recording_state["status_message"] = "Loading URDF model for gravity compensation..."
|
||||||
|
if leader.pin_robot is None:
|
||||||
|
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
|
||||||
|
|
||||||
# Enable gravity compensation
|
# Enable gravity compensation
|
||||||
recording_state["status_message"] = "Enabling gravity compensation..."
|
recording_state["status_message"] = "Enabling gravity compensation..."
|
||||||
leader.bus_right.enable_torque()
|
leader.bus_right.enable_torque()
|
||||||
leader.bus_left.enable_torque()
|
leader.bus_left.enable_torque()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
else:
|
||||||
|
# OpenArms Mini: No gravity compensation needed (Feetech motors)
|
||||||
|
recording_state["status_message"] = "Leader ready (no gravity compensation needed)..."
|
||||||
|
|
||||||
|
robot_instances["leader_type"] = config.leader_type
|
||||||
|
|
||||||
# Configure dataset features
|
# Configure dataset features
|
||||||
recording_state["status_message"] = "Configuring dataset features..."
|
recording_state["status_message"] = "Configuring dataset features..."
|
||||||
@@ -369,13 +496,14 @@ def initialize_robot_systems(config: RecordingConfig):
|
|||||||
|
|
||||||
|
|
||||||
def record_loop_with_compensation():
|
def record_loop_with_compensation():
|
||||||
"""Main recording loop with compensation."""
|
"""Main recording loop (with or without compensation based on leader type)."""
|
||||||
global recording_state, stop_recording_flag
|
global recording_state, stop_recording_flag
|
||||||
|
|
||||||
follower = robot_instances.get("follower")
|
follower = robot_instances.get("follower")
|
||||||
leader = robot_instances.get("leader")
|
leader = robot_instances.get("leader")
|
||||||
dataset = robot_instances.get("dataset")
|
dataset = robot_instances.get("dataset")
|
||||||
dataset_features = robot_instances.get("dataset_features")
|
dataset_features = robot_instances.get("dataset_features")
|
||||||
|
leader_type = robot_instances.get("leader_type", "openarms")
|
||||||
task = recording_state.get("task", "")
|
task = recording_state.get("task", "")
|
||||||
|
|
||||||
# Profiling: Track time spent in each operation
|
# Profiling: Track time spent in each operation
|
||||||
@@ -398,7 +526,7 @@ def record_loop_with_compensation():
|
|||||||
print(f"[Recording] Error: Missing components")
|
print(f"[Recording] Error: Missing components")
|
||||||
return
|
return
|
||||||
|
|
||||||
print(f"[Recording] Starting recording loop with ramp-up...")
|
print(f"[Recording] Starting recording loop (leader: {leader_type}) with ramp-up...")
|
||||||
|
|
||||||
dt = 1 / FPS
|
dt = 1 / FPS
|
||||||
loop_start_time = time.perf_counter()
|
loop_start_time = time.perf_counter()
|
||||||
@@ -466,89 +594,116 @@ def record_loop_with_compensation():
|
|||||||
leader_action = leader.get_action()
|
leader_action = leader.get_action()
|
||||||
timing_stats["get_action"].append(time.perf_counter() - t0)
|
timing_stats["get_action"].append(time.perf_counter() - t0)
|
||||||
|
|
||||||
# Extract positions and velocities
|
# Apply leader-specific control logic
|
||||||
leader_positions_deg = {}
|
if leader_type == "openarms":
|
||||||
leader_velocities_deg_per_sec = {}
|
# OpenArms: Apply gravity and friction compensation
|
||||||
|
# Extract positions and velocities
|
||||||
|
leader_positions_deg = {}
|
||||||
|
leader_velocities_deg_per_sec = {}
|
||||||
|
|
||||||
for motor in leader.bus_right.motors:
|
for motor in leader.bus_right.motors:
|
||||||
pos_key = f"right_{motor}.pos"
|
pos_key = f"right_{motor}.pos"
|
||||||
vel_key = f"right_{motor}.vel"
|
vel_key = f"right_{motor}.vel"
|
||||||
if pos_key in leader_action:
|
if pos_key in leader_action:
|
||||||
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
|
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
|
||||||
if vel_key in leader_action:
|
if vel_key in leader_action:
|
||||||
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
|
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
|
||||||
|
|
||||||
for motor in leader.bus_left.motors:
|
for motor in leader.bus_left.motors:
|
||||||
pos_key = f"left_{motor}.pos"
|
pos_key = f"left_{motor}.pos"
|
||||||
vel_key = f"left_{motor}.vel"
|
vel_key = f"left_{motor}.vel"
|
||||||
if pos_key in leader_action:
|
if pos_key in leader_action:
|
||||||
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
|
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
|
||||||
if vel_key in leader_action:
|
if vel_key in leader_action:
|
||||||
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
|
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
|
||||||
|
|
||||||
# Calculate gravity and friction torques
|
# Calculate gravity and friction torques
|
||||||
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
|
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
|
||||||
|
|
||||||
t0 = time.perf_counter()
|
t0 = time.perf_counter()
|
||||||
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
|
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
|
||||||
timing_stats["gravity_calc"].append(time.perf_counter() - t0)
|
timing_stats["gravity_calc"].append(time.perf_counter() - t0)
|
||||||
|
|
||||||
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
|
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
|
||||||
|
|
||||||
t0 = time.perf_counter()
|
t0 = time.perf_counter()
|
||||||
leader_friction_torques_nm = leader._friction_from_velocity(
|
leader_friction_torques_nm = leader._friction_from_velocity(
|
||||||
leader_velocities_rad_per_sec,
|
leader_velocities_rad_per_sec,
|
||||||
friction_scale=FRICTION_SCALE
|
friction_scale=FRICTION_SCALE
|
||||||
)
|
|
||||||
timing_stats["friction_calc"].append(time.perf_counter() - t0)
|
|
||||||
|
|
||||||
# Combine torques
|
|
||||||
leader_total_torques_nm = {}
|
|
||||||
for motor_name in leader_gravity_torques_nm:
|
|
||||||
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
|
|
||||||
friction = leader_friction_torques_nm.get(motor_name, 0.0)
|
|
||||||
leader_total_torques_nm[motor_name] = gravity + friction
|
|
||||||
|
|
||||||
# Apply compensation to RIGHT arm with ramp-up scaling
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
for motor in leader.bus_right.motors:
|
|
||||||
full_name = f"right_{motor}"
|
|
||||||
position = leader_positions_deg.get(full_name, 0.0)
|
|
||||||
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
|
|
||||||
# Scale torque by ramp factor during ramp-up phase
|
|
||||||
torque = torque_raw * ramp_factor
|
|
||||||
kd = leader.get_damping_kd(motor)
|
|
||||||
|
|
||||||
leader.bus_right._mit_control(
|
|
||||||
motor=motor, kp=0.0, kd=kd,
|
|
||||||
position_degrees=position,
|
|
||||||
velocity_deg_per_sec=0.0,
|
|
||||||
torque=torque,
|
|
||||||
)
|
)
|
||||||
|
timing_stats["friction_calc"].append(time.perf_counter() - t0)
|
||||||
|
|
||||||
# Apply compensation to LEFT arm with ramp-up scaling
|
# Combine torques
|
||||||
for motor in leader.bus_left.motors:
|
leader_total_torques_nm = {}
|
||||||
full_name = f"left_{motor}"
|
for motor_name in leader_gravity_torques_nm:
|
||||||
position = leader_positions_deg.get(full_name, 0.0)
|
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
|
||||||
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
|
friction = leader_friction_torques_nm.get(motor_name, 0.0)
|
||||||
# Scale torque by ramp factor during ramp-up phase
|
leader_total_torques_nm[motor_name] = gravity + friction
|
||||||
torque = torque_raw * ramp_factor
|
|
||||||
kd = leader.get_damping_kd(motor)
|
|
||||||
|
|
||||||
leader.bus_left._mit_control(
|
# Apply compensation to RIGHT arm with ramp-up scaling
|
||||||
motor=motor, kp=0.0, kd=kd,
|
t0 = time.perf_counter()
|
||||||
position_degrees=position,
|
for motor in leader.bus_right.motors:
|
||||||
velocity_deg_per_sec=0.0,
|
full_name = f"right_{motor}"
|
||||||
torque=torque,
|
position = leader_positions_deg.get(full_name, 0.0)
|
||||||
)
|
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
|
||||||
timing_stats["leader_control"].append(time.perf_counter() - t0)
|
# Scale torque by ramp factor during ramp-up phase
|
||||||
|
torque = torque_raw * ramp_factor
|
||||||
|
kd = leader.get_damping_kd(motor)
|
||||||
|
|
||||||
# Send positions to follower with ramped PID gains using send_action
|
leader.bus_right._mit_control(
|
||||||
|
motor=motor, kp=0.0, kd=kd,
|
||||||
|
position_degrees=position,
|
||||||
|
velocity_deg_per_sec=0.0,
|
||||||
|
torque=torque,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Apply compensation to LEFT arm with ramp-up scaling
|
||||||
|
for motor in leader.bus_left.motors:
|
||||||
|
full_name = f"left_{motor}"
|
||||||
|
position = leader_positions_deg.get(full_name, 0.0)
|
||||||
|
torque_raw = leader_total_torques_nm.get(full_name, 0.0)
|
||||||
|
# Scale torque by ramp factor during ramp-up phase
|
||||||
|
torque = torque_raw * ramp_factor
|
||||||
|
kd = leader.get_damping_kd(motor)
|
||||||
|
|
||||||
|
leader.bus_left._mit_control(
|
||||||
|
motor=motor, kp=0.0, kd=kd,
|
||||||
|
position_degrees=position,
|
||||||
|
velocity_deg_per_sec=0.0,
|
||||||
|
torque=torque,
|
||||||
|
)
|
||||||
|
timing_stats["leader_control"].append(time.perf_counter() - t0)
|
||||||
|
|
||||||
|
# Build follower action from leader action
|
||||||
follower_action = {}
|
follower_action = {}
|
||||||
for joint in all_joints:
|
if leader_type == "openarms_mini":
|
||||||
pos_key = f"{joint}.pos"
|
# OpenArms Mini: Apply joint direction and swap mappings
|
||||||
if pos_key in leader_action:
|
for joint in all_joints:
|
||||||
follower_action[pos_key] = leader_action[pos_key]
|
leader_key = f"{joint}.pos"
|
||||||
|
|
||||||
|
# Determine which follower joint this leader joint controls
|
||||||
|
follower_joint = SWAPPED_JOINTS_MINI.get(joint, joint)
|
||||||
|
follower_key = f"{follower_joint}.pos"
|
||||||
|
|
||||||
|
# Get leader position (default 0 if missing)
|
||||||
|
pos = leader_action.get(leader_key, 0.0)
|
||||||
|
|
||||||
|
# Convert gripper values: Mini uses 0-100, OpenArms uses 0 to -65 degrees
|
||||||
|
if "gripper" in joint:
|
||||||
|
# Map 0-100 (Mini) to 0 to -65 (OpenArms)
|
||||||
|
# 0 (closed) -> 0°, 100 (open) -> -65°
|
||||||
|
pos = (pos / 100.0) * -65.0
|
||||||
|
else:
|
||||||
|
# Apply direction reversal if specified (non-gripper joints only)
|
||||||
|
pos *= JOINT_DIRECTION_MINI.get(joint, 1)
|
||||||
|
|
||||||
|
follower_action[follower_key] = pos
|
||||||
|
else:
|
||||||
|
# OpenArms: Direct position mapping (no joint swapping)
|
||||||
|
for joint in all_joints:
|
||||||
|
pos_key = f"{joint}.pos"
|
||||||
|
if pos_key in leader_action:
|
||||||
|
follower_action[pos_key] = leader_action[pos_key]
|
||||||
|
|
||||||
if follower_action:
|
if follower_action:
|
||||||
# Build custom ramped PID gains for all motors
|
# Build custom ramped PID gains for all motors
|
||||||
@@ -742,16 +897,20 @@ def cleanup_robot_systems(keep_robots=False):
|
|||||||
try:
|
try:
|
||||||
if not keep_robots:
|
if not keep_robots:
|
||||||
if robot_instances.get("leader"):
|
if robot_instances.get("leader"):
|
||||||
|
leader_type = robot_instances.get("leader_type", "openarms")
|
||||||
|
# Only disable torque for OpenArms (with gravity compensation)
|
||||||
|
if leader_type == "openarms":
|
||||||
robot_instances["leader"].bus_right.disable_torque()
|
robot_instances["leader"].bus_right.disable_torque()
|
||||||
robot_instances["leader"].bus_left.disable_torque()
|
robot_instances["leader"].bus_left.disable_torque()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
robot_instances["leader"].disconnect()
|
robot_instances["leader"].disconnect()
|
||||||
|
|
||||||
if robot_instances.get("follower"):
|
if robot_instances.get("follower"):
|
||||||
robot_instances["follower"].disconnect()
|
robot_instances["follower"].disconnect()
|
||||||
|
|
||||||
robot_instances["follower"] = None
|
robot_instances["follower"] = None
|
||||||
robot_instances["leader"] = None
|
robot_instances["leader"] = None
|
||||||
|
robot_instances["leader_type"] = None
|
||||||
recording_state["robots_ready"] = False
|
recording_state["robots_ready"] = False
|
||||||
print(f"[Cleanup] Robots disconnected")
|
print(f"[Cleanup] Robots disconnected")
|
||||||
|
|
||||||
@@ -906,39 +1065,59 @@ def do_move_to_zero(source: str = "API"):
|
|||||||
|
|
||||||
follower = robot_instances["follower"]
|
follower = robot_instances["follower"]
|
||||||
leader = robot_instances.get("leader")
|
leader = robot_instances.get("leader")
|
||||||
|
leader_type = robot_instances.get("leader_type", "openarms")
|
||||||
|
|
||||||
if leader:
|
# Handle leader based on type
|
||||||
print(f"[{source}] Disabling leader torque...")
|
if leader and leader_type == "openarms":
|
||||||
|
# OpenArms: Disable torque (with gravity compensation)
|
||||||
|
print(f"[{source}] Disabling leader torque (OpenArms with gravity compensation)...")
|
||||||
leader.bus_right.disable_torque()
|
leader.bus_right.disable_torque()
|
||||||
leader.bus_left.disable_torque()
|
leader.bus_left.disable_torque()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
elif leader and leader_type == "openarms_mini":
|
||||||
|
# OpenArms Mini: Send to zero position command (will be executed in parallel with follower)
|
||||||
|
print(f"[{source}] Moving OpenArms Mini leader to zero position...")
|
||||||
|
|
||||||
motor_index = {
|
motor_index = {
|
||||||
"joint_1": 0, "joint_2": 1, "joint_3": 2, "joint_4": 3,
|
"joint_1": 0, "joint_2": 1, "joint_3": 2, "joint_4": 3,
|
||||||
"joint_5": 4, "joint_6": 5, "joint_7": 6, "gripper": 7,
|
"joint_5": 4, "joint_6": 5, "joint_7": 6, "gripper": 7,
|
||||||
}
|
}
|
||||||
|
|
||||||
zero_action = {}
|
# Prepare follower zero action
|
||||||
custom_kp = {}
|
follower_zero_action = {}
|
||||||
custom_kd = {}
|
follower_custom_kp = {}
|
||||||
|
follower_custom_kd = {}
|
||||||
|
|
||||||
for motor in follower.bus_right.motors:
|
for motor in follower.bus_right.motors:
|
||||||
full_name = f"right_{motor}"
|
full_name = f"right_{motor}"
|
||||||
zero_action[f"{full_name}.pos"] = 0.0
|
follower_zero_action[f"{full_name}.pos"] = 0.0
|
||||||
idx = motor_index.get(motor, 0)
|
idx = motor_index.get(motor, 0)
|
||||||
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
|
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
|
||||||
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
|
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
|
||||||
custom_kp[full_name] = kp_base * 0.05
|
follower_custom_kp[full_name] = kp_base * 0.05
|
||||||
custom_kd[full_name] = kd_base
|
follower_custom_kd[full_name] = kd_base
|
||||||
|
|
||||||
for motor in follower.bus_left.motors:
|
for motor in follower.bus_left.motors:
|
||||||
full_name = f"left_{motor}"
|
full_name = f"left_{motor}"
|
||||||
zero_action[f"{full_name}.pos"] = 0.0
|
follower_zero_action[f"{full_name}.pos"] = 0.0
|
||||||
idx = motor_index.get(motor, 0)
|
idx = motor_index.get(motor, 0)
|
||||||
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
|
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
|
||||||
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
|
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
|
||||||
custom_kp[full_name] = kp_base * 0.05
|
follower_custom_kp[full_name] = kp_base * 0.05
|
||||||
custom_kd[full_name] = kd_base
|
follower_custom_kd[full_name] = kd_base
|
||||||
|
|
||||||
|
# Prepare leader zero action (for OpenArms Mini)
|
||||||
|
leader_zero_action = None
|
||||||
|
if leader and leader_type == "openarms_mini":
|
||||||
|
leader_zero_action = {}
|
||||||
|
# For OpenArms Mini, send all joints (including gripper) to zero
|
||||||
|
# Gripper zero = 0 (closed)
|
||||||
|
for motor in leader.bus_right.motors:
|
||||||
|
full_name = f"right_{motor}"
|
||||||
|
leader_zero_action[f"{full_name}.pos"] = 0.0
|
||||||
|
for motor in leader.bus_left.motors:
|
||||||
|
full_name = f"left_{motor}"
|
||||||
|
leader_zero_action[f"{full_name}.pos"] = 0.0
|
||||||
|
|
||||||
duration = 2.0
|
duration = 2.0
|
||||||
fps = 30
|
fps = 30
|
||||||
@@ -947,36 +1126,63 @@ def do_move_to_zero(source: str = "API"):
|
|||||||
|
|
||||||
while time.perf_counter() - start_time < duration:
|
while time.perf_counter() - start_time < duration:
|
||||||
loop_start = time.perf_counter()
|
loop_start = time.perf_counter()
|
||||||
follower.send_action(zero_action, custom_kp=custom_kp, custom_kd=custom_kd)
|
|
||||||
|
# Send follower to zero
|
||||||
|
follower.send_action(follower_zero_action, custom_kp=follower_custom_kp, custom_kd=follower_custom_kd)
|
||||||
|
|
||||||
|
# Send leader (OpenArms Mini) to zero if applicable
|
||||||
|
if leader_zero_action:
|
||||||
|
# Use Feetech write commands directly (no custom gains needed)
|
||||||
|
for motor_name in leader.bus_right.motors:
|
||||||
|
full_name = f"right_{motor_name}"
|
||||||
|
pos_key = f"{full_name}.pos"
|
||||||
|
if pos_key in leader_zero_action:
|
||||||
|
# Write zero position to leader motor
|
||||||
|
leader.bus_right.write("Goal_Position", motor_name, leader_zero_action[pos_key])
|
||||||
|
|
||||||
|
for motor_name in leader.bus_left.motors:
|
||||||
|
full_name = f"left_{motor_name}"
|
||||||
|
pos_key = f"{full_name}.pos"
|
||||||
|
if pos_key in leader_zero_action:
|
||||||
|
# Write zero position to leader motor
|
||||||
|
leader.bus_left.write("Goal_Position", motor_name, leader_zero_action[pos_key])
|
||||||
|
|
||||||
loop_duration = time.perf_counter() - loop_start
|
loop_duration = time.perf_counter() - loop_start
|
||||||
sleep_time = dt - loop_duration
|
sleep_time = dt - loop_duration
|
||||||
if sleep_time > 0:
|
if sleep_time > 0:
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
if leader:
|
# Re-enable torque only for OpenArms (with gravity compensation)
|
||||||
print(f"[{source}] Re-enabling leader torque...")
|
if leader and leader_type == "openarms":
|
||||||
|
print(f"[{source}] Re-enabling leader torque (OpenArms with gravity compensation)...")
|
||||||
leader.bus_right.enable_torque()
|
leader.bus_right.enable_torque()
|
||||||
leader.bus_left.enable_torque()
|
leader.bus_left.enable_torque()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
elif leader and leader_type == "openarms_mini":
|
||||||
|
print(f"[{source}] OpenArms Mini leader moved to zero position")
|
||||||
|
|
||||||
recording_state["moving_to_zero"] = False
|
recording_state["moving_to_zero"] = False
|
||||||
recording_state["status_message"] = "Moved to zero position" if recording_state["robots_ready"] else "Ready"
|
recording_state["status_message"] = "Moved to zero position" if recording_state["robots_ready"] else "Ready"
|
||||||
|
|
||||||
print(f"[{source}] Move to zero complete")
|
print(f"[{source}] Move to zero complete (follower + leader)")
|
||||||
return {"status": "success", "message": "Robot moved to zero position"}
|
return {"status": "success", "message": "Robots moved to zero position"}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
recording_state["moving_to_zero"] = False
|
recording_state["moving_to_zero"] = False
|
||||||
recording_state["status_message"] = f"Error: {str(e)}"
|
recording_state["status_message"] = f"Error: {str(e)}"
|
||||||
|
|
||||||
|
# Try to re-enable torque on error (only for OpenArms with gravity compensation)
|
||||||
leader = robot_instances.get("leader")
|
leader = robot_instances.get("leader")
|
||||||
if leader:
|
leader_type = robot_instances.get("leader_type", "openarms")
|
||||||
|
if leader and leader_type == "openarms":
|
||||||
try:
|
try:
|
||||||
|
print(f"[{source}] Attempting to re-enable leader torque after error...")
|
||||||
leader.bus_right.enable_torque()
|
leader.bus_right.enable_torque()
|
||||||
leader.bus_left.enable_torque()
|
leader.bus_left.enable_torque()
|
||||||
except Exception as torque_error:
|
except Exception as torque_error:
|
||||||
print(f"[{source}] Failed to re-enable leader torque: {torque_error}")
|
print(f"[{source}] Failed to re-enable leader torque: {torque_error}")
|
||||||
|
|
||||||
|
print(f"[{source}] Move to zero failed: {e}")
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
raise HTTPException(status_code=500, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user