This commit is contained in:
croissant
2025-11-01 15:36:56 +01:00
committed by Michel Aractingi
parent 0f90db23c5
commit e2c00f6ed8
8 changed files with 679 additions and 116 deletions
+216
View File
@@ -0,0 +1,216 @@
import time
import numpy as np
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
# Friction model parameters from OpenArms config/follower.yaml
# τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω)
# For 8 motors: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
FRICTION_PARAMS = {
"Fc": [0.306, 0.306, 0.40, 0.166, 0.050, 0.093, 0.172, 0.0512], # Coulomb friction [Nm]
"k": [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000], # tanh steepness
"Fv": [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084], # Viscous friction [Nm·s/rad]
"Fo": [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050], # Offset torque [Nm]
}
# Constants from OpenArms C++ implementation
AMP_TMP = 1.0
COEF_TMP = 0.1
FRICTION_SCALE = 1.0 # OpenArms C++ uses 0.3 factor in unilateral mode
DAMPING_KD = [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1] # Damping gains for stability
def compute_friction_torque(velocity_rad_per_sec: float, motor_index: int) -> float:
"""
Compute friction torque for a single motor using the tanh friction model.
Args:
velocity_rad_per_sec: Angular velocity in rad/s
motor_index: Index of the motor (0-7)
Returns:
Friction torque in N·m (scaled for stability)
"""
Fc = FRICTION_PARAMS["Fc"][motor_index]
k = FRICTION_PARAMS["k"][motor_index]
Fv = FRICTION_PARAMS["Fv"][motor_index]
Fo = FRICTION_PARAMS["Fo"][motor_index]
# Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo
friction_torque = (
AMP_TMP * Fc * np.tanh(COEF_TMP * k * velocity_rad_per_sec) +
Fv * velocity_rad_per_sec +
Fo
)
# Scale down friction compensation for stability at lower control rates
# (OpenArms C++ uses 0.3 factor in unilateral mode)!!
friction_torque *= FRICTION_SCALE
return friction_torque
def main() -> None:
config = OpenArmsFollowerConfig(
port_left="can0",
port_right="can1",
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=5.0,
)
print("Initializing robot...")
follower = OpenArmsFollower(config)
follower.connect(calibrate=True)
print(f"Applying friction compensation")
print(" 1. Support the arm before starting")
print(" 2. The arm will be held in place by friction compensation")
print(" 3. You should be able to move it with gentle force")
print("\nPress ENTER when ready to start...")
input()
print(f"✓ Motors enabled")
print("\nStarting friction compensation loop...")
print("Press Ctrl+C to stop\n")
loop_times = []
last_print_time = time.perf_counter()
# Motor name to index mapping
motor_name_to_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
try:
while True:
loop_start = time.perf_counter()
# Get current joint positions and velocities from robot
obs = follower.get_observation()
# Extract velocities in degrees per second
velocities_deg_per_sec = {}
positions_deg = {}
for motor in follower.bus_right.motors:
vel_key = f"right_{motor}.vel"
pos_key = f"right_{motor}.pos"
if vel_key in obs:
velocities_deg_per_sec[f"right_{motor}"] = obs[vel_key]
if pos_key in obs:
positions_deg[f"right_{motor}"] = obs[pos_key]
for motor in follower.bus_left.motors:
vel_key = f"left_{motor}.vel"
pos_key = f"left_{motor}.pos"
if vel_key in obs:
velocities_deg_per_sec[f"left_{motor}"] = obs[vel_key]
if pos_key in obs:
positions_deg[f"left_{motor}"] = obs[pos_key]
# Convert velocities to rad/s and compute friction torques
friction_torques_nm = {}
for motor_full_name, velocity_deg_per_sec in velocities_deg_per_sec.items():
# Extract motor name without arm prefix
if motor_full_name.startswith("right_"):
motor_name = motor_full_name.removeprefix("right_")
elif motor_full_name.startswith("left_"):
motor_name = motor_full_name.removeprefix("left_")
else:
continue
# Get motor index for friction parameters
motor_index = motor_name_to_index.get(motor_name, 0)
# Convert velocity to rad/s
velocity_rad_per_sec = np.deg2rad(velocity_deg_per_sec)
# Compute friction torque
friction_torque = compute_friction_torque(velocity_rad_per_sec, motor_index)
friction_torques_nm[motor_full_name] = friction_torque
# Apply friction compensation to right arm (all joints INCLUDING gripper)
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
position = positions_deg.get(full_name, 0.0)
torque = friction_torques_nm.get(full_name, 0.0)
# Get motor index for damping gain
motor_index = motor_name_to_index.get(motor, 0)
kd = DAMPING_KD[motor_index]
# Send MIT control command with friction compensation + damping
follower.bus_right._mit_control(
motor=motor,
kp=0.0, # No position control
kd=kd, # Add damping for stability
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque
)
# Apply friction compensation to left arm (all joints INCLUDING gripper)
for motor in follower.bus_left.motors:
full_name = f"left_{motor}"
position = positions_deg.get(full_name, 0.0)
torque = friction_torques_nm.get(full_name, 0.0)
# Get motor index for damping gain
motor_index = motor_name_to_index.get(motor, 0)
kd = DAMPING_KD[motor_index]
# Send MIT control command with friction compensation + damping
follower.bus_left._mit_control(
motor=motor,
kp=0.0, # No position control
kd=kd, # Add damping for stability
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque
)
# Measure loop time
loop_end = time.perf_counter()
loop_time = loop_end - loop_start
loop_times.append(loop_time)
# Print status every 2 seconds
if loop_end - last_print_time >= 2.0:
if loop_times:
avg_time = sum(loop_times) / len(loop_times)
current_hz = 1.0 / avg_time if avg_time > 0 else 0
print(f"{current_hz:.1f} Hz")
loop_times = []
last_print_time = loop_end
time.sleep(0.001)
except KeyboardInterrupt:
print("\n\nStopping friction compensation...")
finally:
print("\nDisabling all motors and disconnecting...")
follower.bus_right.disable_torque()
follower.bus_left.disable_torque()
time.sleep(0.1)
follower.disconnect()
print("✓ Safe shutdown complete")
if __name__ == "__main__":
main()
+38 -3
View File
@@ -89,23 +89,39 @@ loop_times = []
start_time = time.perf_counter()
last_print_time = start_time
# Detailed timing accumulators
timing_stats = {
"leader_read": [],
"filter_data": [],
"follower_write": [],
}
try:
while True:
loop_start = time.perf_counter()
# Get action from leader
t0 = time.perf_counter()
leader_action = leader.get_action()
t1 = time.perf_counter()
timing_stats["leader_read"].append((t1 - t0) * 1000)
# Filter to only position data for all joints (both arms)
t2 = time.perf_counter()
joint_action = {}
for joint in all_joints:
pos_key = f"{joint}.pos"
if pos_key in leader_action:
joint_action[pos_key] = leader_action[pos_key]
t3 = time.perf_counter()
timing_stats["filter_data"].append((t3 - t2) * 1000)
# Send action to follower (both arms)
t4 = time.perf_counter()
if joint_action:
follower.send_action(joint_action)
t5 = time.perf_counter()
timing_stats["follower_write"].append((t5 - t4) * 1000)
# Measure loop time
loop_end = time.perf_counter()
@@ -122,12 +138,31 @@ try:
max_hz = 1.0 / min_time if min_time > 0 else 0
min_hz = 1.0 / max_time if max_time > 0 else 0
print(f"[Hz Stats] Avg: {current_hz:.1f} Hz | "
f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | "
f"Avg loop time: {avg_time*1000:.1f} ms")
# Calculate average timing for each operation
timing_avgs = {
k: (sum(v) / len(v) if v else 0.0)
for k, v in timing_stats.items()
}
# Print detailed timing breakdown
print(f"\n{'='*70}")
print(f"Teleoperation - {current_hz:.1f} Hz | Total: {avg_time*1000:.1f} ms")
print(f"{'='*70}")
print(f" Leader Read: {timing_avgs['leader_read']:6.2f} ms")
print(f" Filter Data: {timing_avgs['filter_data']:6.2f} ms")
print(f" Follower Write: {timing_avgs['follower_write']:6.2f} ms")
print(f" ───────────────────────────────")
total_accounted = sum(timing_avgs.values())
print(f" Accounted: {total_accounted:6.2f} ms")
print(f" Unaccounted: {avg_time*1000 - total_accounted:6.2f} ms")
print(f" ───────────────────────────────")
print(f" Hz Range: {min_hz:.1f} - {max_hz:.1f} Hz")
print(f"{'='*70}\n")
# Reset for next measurement window
loop_times = []
for key in timing_stats:
timing_stats[key] = []
last_print_time = loop_end
except KeyboardInterrupt:
@@ -1,7 +1,7 @@
"""
OpenArms Teleoperation with Gravity Compensation
OpenArms Teleoperation with Gravity + Friction Compensation
Leader arms (both LEFT and RIGHT): Gravity compensation (weightless, easy to move)
Leader arms (both LEFT and RIGHT): Gravity + Friction compensation (weightless, easy to move)
Follower arms (both LEFT and RIGHT): Mirror leader movements
Uses the URDF file from the lerobot repository.
@@ -16,6 +16,8 @@ from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
# Friction compensation scale factor (1.0 = full, 0.3 = 30% for stability)
FRICTION_SCALE = 1.0
def main():
"""Main teleoperation loop with gravity compensation"""
@@ -54,7 +56,7 @@ def main():
if leader.pin_robot is None:
raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.")
print("\nLeader BOTH arms: G-comp | Follower BOTH arms: Teleop")
print("\nLeader BOTH arms: Gravity + Friction comp | Follower BOTH arms: Teleop")
print("Press ENTER to start...")
input()
@@ -83,75 +85,75 @@ def main():
# Get leader state
leader_action = leader.get_action()
# Extract positions in degrees
# Extract positions and velocities in degrees
leader_positions_deg = {}
leader_velocities_deg_per_sec = {}
for motor in leader.bus_right.motors:
key = f"right_{motor}.pos"
if key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[key]
pos_key = f"right_{motor}.pos"
vel_key = f"right_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
for motor in leader.bus_left.motors:
key = f"left_{motor}.pos"
if key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[key]
pos_key = f"left_{motor}.pos"
vel_key = f"left_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
# Calculate gravity torques for leader using built-in method
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
leader_torques_nm = leader._gravity_from_q(leader_positions_rad)
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
# Calculate friction torques for leader using built-in method
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec,
friction_scale=FRICTION_SCALE
)
# Combine gravity + friction torques
leader_total_torques_nm = {}
for motor_name in leader_gravity_torques_nm:
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
friction = leader_friction_torques_nm.get(motor_name, 0.0)
leader_total_torques_nm[motor_name] = gravity + friction
# Apply gravity compensation to leader RIGHT arm (all joints except gripper)
# Apply gravity + friction compensation to leader RIGHT arm (all joints including gripper)
for motor in leader.bus_right.motors:
if motor == "gripper":
# Skip gripper - keep it free
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
leader.bus_right._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=0.0,
)
continue
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_torques_nm.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
# Get damping gain for stability
kd = leader.get_damping_kd(motor)
leader.bus_right._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
kd=kd, # Add damping for stability
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
# Apply gravity compensation to leader LEFT arm (all joints except gripper)
# Apply gravity + friction compensation to leader LEFT arm (all joints including gripper)
for motor in leader.bus_left.motors:
if motor == "gripper":
# Skip gripper - keep it free
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
leader.bus_left._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=0.0,
)
continue
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_torques_nm.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
# Get damping gain for stability
kd = leader.get_damping_kd(motor)
leader.bus_left._mit_control(
leader.bus_left._mit_control(
motor=motor,
kp=0.0,
kd=0.0,
kd=kd, # Add damping for stability
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
+116
View File
@@ -440,6 +440,65 @@ class DamiaoMotorsBus(MotorsBusBase):
self.canbus.send(msg)
recv_id = self._get_motor_recv_id(motor)
self._recv_motor_response(expected_recv_id=recv_id)
def _mit_control_batch(
self,
commands: Dict[NameOrID, Tuple[float, float, float, float, float]],
) -> None:
"""
Send MIT control commands to multiple motors in batch (optimized).
Sends all commands first, then collects responses. Much faster than sequential.
Args:
commands: Dict mapping motor name/ID to (kp, kd, position_deg, velocity_deg/s, torque)
Example: {'joint_1': (10.0, 0.5, 45.0, 0.0, 0.0), ...}
"""
if not commands:
return
expected_recv_ids = []
# Step 1: Send all MIT control commands (no waiting)
for motor, (kp, kd, position_degrees, velocity_deg_per_sec, torque) in commands.items():
motor_id = self._get_motor_id(motor)
motor_name = self._get_motor_name(motor)
motor_type = self._motor_types.get(motor_name, MotorType.DM4310)
# Convert degrees to radians
position_rad = np.radians(position_degrees)
velocity_rad_per_sec = np.radians(velocity_deg_per_sec)
# Get motor limits
pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type]
# Encode parameters
kp_uint = self._float_to_uint(kp, 0, 500, 12)
kd_uint = self._float_to_uint(kd, 0, 5, 12)
q_uint = self._float_to_uint(position_rad, -pmax, pmax, 16)
dq_uint = self._float_to_uint(velocity_rad_per_sec, -vmax, vmax, 12)
tau_uint = self._float_to_uint(torque, -tmax, tmax, 12)
# Pack data
data = [0] * 8
data[0] = (q_uint >> 8) & 0xFF
data[1] = q_uint & 0xFF
data[2] = dq_uint >> 4
data[3] = ((dq_uint & 0xF) << 4) | ((kp_uint >> 8) & 0xF)
data[4] = kp_uint & 0xFF
data[5] = kd_uint >> 4
data[6] = ((kd_uint & 0xF) << 4) | ((tau_uint >> 8) & 0xF)
data[7] = tau_uint & 0xFF
# Send command
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
self.canbus.send(msg)
# Track expected response
recv_id = self._get_motor_recv_id(motor)
expected_recv_ids.append(recv_id)
# Step 2: Collect all responses at once
self._recv_all_responses(expected_recv_ids, timeout=0.002)
def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int:
"""Convert float to unsigned integer for CAN transmission."""
@@ -611,6 +670,63 @@ class DamiaoMotorsBus(MotorsBusBase):
result[motor] = 0.0
return result
def sync_read_all_states(
self,
motors: str | list[str] | None = None,
*,
num_retry: int = 0,
) -> Dict[str, Dict[str, Value]]:
"""
Read ALL motor states (position, velocity, torque) from multiple motors in ONE refresh cycle.
This is 3x faster than calling sync_read() three times separately.
Returns:
Dictionary mapping motor names to state dicts with keys: 'position', 'velocity', 'torque'
Example: {'joint_1': {'position': 45.2, 'velocity': 1.3, 'torque': 0.5}, ...}
"""
motors = self._get_motors_list(motors)
result = {}
# Step 1: Send refresh commands to ALL motors first (no waiting)
for motor in motors:
motor_id = self._get_motor_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
self.canbus.send(msg)
# Step 2: Collect all responses at once (batch receive)
expected_recv_ids = [self._get_motor_recv_id(motor) for motor in motors]
responses = self._recv_all_responses(expected_recv_ids, timeout=0.003) # 3ms total timeout
# Step 3: Parse responses and extract ALL state values
for motor in motors:
try:
recv_id = self._get_motor_recv_id(motor)
msg = responses.get(recv_id)
if msg is None:
logger.warning(f"No response from motor '{motor}' (recv ID: 0x{recv_id:02X})")
result[motor] = {"position": 0.0, "velocity": 0.0, "torque": 0.0}
continue
motor_type = self._motor_types.get(motor, MotorType.DM4310)
position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type)
# Return all state values in one dict
result[motor] = {
"position": position_degrees,
"velocity": velocity_deg_per_sec,
"torque": torque,
"temp_mos": t_mos,
"temp_rotor": t_rotor,
}
except Exception as e:
logger.warning(f"Failed to read state from {motor}: {e}")
result[motor] = {"position": 0.0, "velocity": 0.0, "torque": 0.0}
return result
def sync_write(
self,
@@ -72,11 +72,22 @@ class OpenArmsFollowerConfig(RobotConfig):
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
})
# MIT control parameters for position control (per motor)
# Values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
# MIT control parameters for position control (used in send_action)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0])
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
# Damping gains for stability when applying torque compensation (gravity/friction)
# Used when kp=0 and only torque is applied
damping_kd: list[float] = field(default_factory=lambda: [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1])
# Friction model parameters: τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω)
# From OpenArms config/follower.yaml
friction_fc: list[float] = field(default_factory=lambda: [0.306, 0.306, 0.40, 0.166, 0.050, 0.093, 0.172, 0.0512]) # Coulomb friction [Nm]
friction_k: list[float] = field(default_factory=lambda: [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000]) # tanh steepness
friction_fv: list[float] = field(default_factory=lambda: [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084]) # Viscous friction [Nm·s/rad]
friction_fo: list[float] = field(default_factory=lambda: [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050]) # Offset torque [Nm]
# Calibration parameters
calibration_mode: str = "manual" # "manual" or "auto"
zero_position_on_connect: bool = False # Set zero position on connect
+127 -49
View File
@@ -296,32 +296,33 @@ class OpenArmsFollower(Robot):
raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.")
def get_observation(self) -> Dict[str, Any]:
"""Get current observation from robot including position, velocity, and torque."""
"""
Get current observation from robot including position, velocity, and torque.
OPTIMIZED: Reads all motor states (pos/vel/torque) in one CAN refresh cycle
instead of 3 separate reads.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read motor positions, velocities, and torques from right arm
start = time.perf_counter()
positions_right = self.bus_right.sync_read("Present_Position")
velocities_right = self.bus_right.sync_read("Present_Velocity")
torques_right = self.bus_right.sync_read("Present_Torque")
# OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go
right_states = self.bus_right.sync_read_all_states()
for motor in self.bus_right.motors:
obs_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0)
obs_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0)
obs_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0)
# Read motor positions, velocities, and torques from left arm
positions_left = self.bus_left.sync_read("Present_Position")
velocities_left = self.bus_left.sync_read("Present_Velocity")
torques_left = self.bus_left.sync_read("Present_Torque")
state = right_states.get(motor, {})
obs_dict[f"right_{motor}.pos"] = state.get("position", 0.0)
obs_dict[f"right_{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"right_{motor}.torque"] = state.get("torque", 0.0)
# OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go
left_states = self.bus_left.sync_read_all_states()
for motor in self.bus_left.motors:
obs_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0)
obs_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0)
obs_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0)
state = left_states.get(motor, {})
obs_dict[f"left_{motor}.pos"] = state.get("position", 0.0)
obs_dict[f"left_{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"left_{motor}.torque"] = state.get("torque", 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -404,39 +405,25 @@ class OpenArmsFollower(Robot):
"gripper": 7,
}
# Send MIT control commands to right arm
for motor_name, position_degrees in goal_pos_right.items():
# Get per-motor gains from config
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx]
kd = self.config.position_kd[idx]
# Send MIT control command (position is in degrees)
self.bus_right._mit_control(
motor_name,
kp=kp,
kd=kd,
position_degrees=position_degrees,
velocity_deg_per_sec=0.0,
torque=0.0
)
# Use batch MIT control for right arm (sends all commands, then collects responses)
if goal_pos_right:
commands_right = {}
for motor_name, position_degrees in goal_pos_right.items():
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx] if isinstance(self.config.position_kp, list) else self.config.position_kp
kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd
commands_right[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
self.bus_right._mit_control_batch(commands_right)
# Send MIT control commands to left arm
for motor_name, position_degrees in goal_pos_left.items():
# Get per-motor gains from config
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx]
kd = self.config.position_kd[idx]
# Send MIT control command (position is in degrees)
self.bus_left._mit_control(
motor_name,
kp=kp,
kd=kd,
position_degrees=position_degrees,
velocity_deg_per_sec=0.0,
torque=0.0
)
# Use batch MIT control for left arm (sends all commands, then collects responses)
if goal_pos_left:
commands_left = {}
for motor_name, position_degrees in goal_pos_left.items():
idx = motor_index.get(motor_name, 0)
kp = self.config.position_kp[idx] if isinstance(self.config.position_kp, list) else self.config.position_kp
kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd
commands_left[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
self.bus_left._mit_control_batch(commands_left)
# Return the actions that were actually sent
result = {}
@@ -544,3 +531,94 @@ class OpenArmsFollower(Robot):
return result
def _friction_from_velocity(
self,
velocity_rad_per_sec: Dict[str, float],
friction_scale: float = 1.0,
amp_tmp: float = 1.0,
coef_tmp: float = 0.1
) -> Dict[str, float]:
"""
Compute friction torques for all joints in the robot using tanh friction model.
Args:
velocity_rad_per_sec: Dictionary mapping motor names (with arm prefix) to velocities in rad/s
friction_scale: Scale factor for friction compensation (default 1.0, use 0.3 for stability)
amp_tmp: Amplitude factor for tanh term (default 1.0)
coef_tmp: Coefficient for tanh steepness (default 0.1)
Returns:
Dictionary mapping motor names to friction torques in N·m
"""
# Motor name to index mapping
motor_name_to_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
result = {}
# Process all motors (left and right)
for motor_full_name, velocity in velocity_rad_per_sec.items():
# Extract motor name without arm prefix
if motor_full_name.startswith("right_"):
motor_name = motor_full_name.removeprefix("right_")
elif motor_full_name.startswith("left_"):
motor_name = motor_full_name.removeprefix("left_")
else:
result[motor_full_name] = 0.0
continue
# Get motor index for friction parameters
motor_index = motor_name_to_index.get(motor_name, 0)
# Get friction parameters from config
Fc = self.config.friction_fc[motor_index]
k = self.config.friction_k[motor_index]
Fv = self.config.friction_fv[motor_index]
Fo = self.config.friction_fo[motor_index]
# Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo
friction_torque = (
amp_tmp * Fc * np.tanh(coef_tmp * k * velocity) +
Fv * velocity +
Fo
)
# Apply scale factor
friction_torque *= friction_scale
result[motor_full_name] = float(friction_torque)
return result
def get_damping_kd(self, motor_name: str) -> float:
"""
Get damping gain (Kd) for a specific motor.
Args:
motor_name: Motor name without arm prefix (e.g., "joint_1", "gripper")
Returns:
Damping gain value
"""
motor_name_to_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
motor_index = motor_name_to_index.get(motor_name, 0)
return self.config.damping_kd[motor_index]
@@ -62,3 +62,19 @@ class OpenArmsLeaderConfig(TeleoperatorConfig):
# Torque mode settings for manual control
# When enabled, motors have torque disabled for manual movement
manual_control: bool = True
# MIT control parameters (used when manual_control=False for torque control)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0])
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
# Damping gains for stability when applying torque compensation (gravity/friction)
# Used when kp=0 and only torque is applied
damping_kd: list[float] = field(default_factory=lambda: [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1])
# Friction model parameters: τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω)
# From OpenArms config/leader.yaml (note: Fc[5] is slightly different: 0.083 vs 0.093)
friction_fc: list[float] = field(default_factory=lambda: [0.306, 0.306, 0.40, 0.166, 0.050, 0.083, 0.172, 0.0512]) # Coulomb friction [Nm]
friction_k: list[float] = field(default_factory=lambda: [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000]) # tanh steepness
friction_fv: list[float] = field(default_factory=lambda: [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084]) # Viscous friction [Nm·s/rad]
friction_fo: list[float] = field(default_factory=lambda: [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050]) # Offset torque [Nm]
@@ -276,32 +276,30 @@ class OpenArmsLeader(Teleoperator):
This is the main method for teleoperators - it reads the current state
of the leader arm and returns it as an action that can be sent to a follower.
Reads all motor states (pos/vel/torque) in one CAN refresh cycle
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
action_dict = {}
# Read motor positions, velocities, and torques from right arm
start = time.perf_counter()
positions_right = self.bus_right.sync_read("Present_Position")
velocities_right = self.bus_right.sync_read("Present_Velocity")
torques_right = self.bus_right.sync_read("Present_Torque")
# OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go
right_states = self.bus_right.sync_read_all_states()
for motor in self.bus_right.motors:
action_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0)
action_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0)
action_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0)
# Read motor positions, velocities, and torques from left arm
positions_left = self.bus_left.sync_read("Present_Position")
velocities_left = self.bus_left.sync_read("Present_Velocity")
torques_left = self.bus_left.sync_read("Present_Torque")
state = right_states.get(motor, {})
action_dict[f"right_{motor}.pos"] = state.get("position", 0.0)
action_dict[f"right_{motor}.vel"] = state.get("velocity", 0.0)
action_dict[f"right_{motor}.torque"] = state.get("torque", 0.0)
# OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go
left_states = self.bus_left.sync_read_all_states()
for motor in self.bus_left.motors:
action_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0)
action_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0)
action_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0)
state = left_states.get(motor, {})
action_dict[f"left_{motor}.pos"] = state.get("position", 0.0)
action_dict[f"left_{motor}.vel"] = state.get("velocity", 0.0)
action_dict[f"left_{motor}.torque"] = state.get("torque", 0.0)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -412,5 +410,96 @@ class OpenArmsLeader(Teleoperator):
idx += 2
return result
def _friction_from_velocity(
self,
velocity_rad_per_sec: Dict[str, float],
friction_scale: float = 1.0,
amp_tmp: float = 1.0,
coef_tmp: float = 0.1
) -> Dict[str, float]:
"""
Compute friction torques for all joints in the robot using tanh friction model.
Args:
velocity_rad_per_sec: Dictionary mapping motor names (with arm prefix) to velocities in rad/s
friction_scale: Scale factor for friction compensation (default 1.0, use 0.3 for stability)
amp_tmp: Amplitude factor for tanh term (default 1.0)
coef_tmp: Coefficient for tanh steepness (default 0.1)
Returns:
Dictionary mapping motor names to friction torques in N·m
"""
# Motor name to index mapping
motor_name_to_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
result = {}
# Process all motors (left and right)
for motor_full_name, velocity in velocity_rad_per_sec.items():
# Extract motor name without arm prefix
if motor_full_name.startswith("right_"):
motor_name = motor_full_name.removeprefix("right_")
elif motor_full_name.startswith("left_"):
motor_name = motor_full_name.removeprefix("left_")
else:
result[motor_full_name] = 0.0
continue
# Get motor index for friction parameters
motor_index = motor_name_to_index.get(motor_name, 0)
# Get friction parameters from config
Fc = self.config.friction_fc[motor_index]
k = self.config.friction_k[motor_index]
Fv = self.config.friction_fv[motor_index]
Fo = self.config.friction_fo[motor_index]
# Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo
friction_torque = (
amp_tmp * Fc * np.tanh(coef_tmp * k * velocity) +
Fv * velocity +
Fo
)
# Apply scale factor
friction_torque *= friction_scale
result[motor_full_name] = float(friction_torque)
return result
def get_damping_kd(self, motor_name: str) -> float:
"""
Get damping gain (Kd) for a specific motor.
Args:
motor_name: Motor name without arm prefix (e.g., "joint_1", "gripper")
Returns:
Damping gain value
"""
motor_name_to_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
motor_index = motor_name_to_index.get(motor_name, 0)
return self.config.damping_kd[motor_index]