diff --git a/examples/openarms/friction_compensation.py b/examples/openarms/friction_compensation.py new file mode 100644 index 000000000..d12febd4d --- /dev/null +++ b/examples/openarms/friction_compensation.py @@ -0,0 +1,216 @@ +import time +import numpy as np + +from lerobot.robots.openarms.openarms_follower import OpenArmsFollower +from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig + + +# Friction model parameters from OpenArms config/follower.yaml +# τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω) +# For 8 motors: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] +FRICTION_PARAMS = { + "Fc": [0.306, 0.306, 0.40, 0.166, 0.050, 0.093, 0.172, 0.0512], # Coulomb friction [Nm] + "k": [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000], # tanh steepness + "Fv": [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084], # Viscous friction [Nm·s/rad] + "Fo": [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050], # Offset torque [Nm] +} + +# Constants from OpenArms C++ implementation +AMP_TMP = 1.0 +COEF_TMP = 0.1 + +FRICTION_SCALE = 1.0 # OpenArms C++ uses 0.3 factor in unilateral mode +DAMPING_KD = [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1] # Damping gains for stability + +def compute_friction_torque(velocity_rad_per_sec: float, motor_index: int) -> float: + """ + Compute friction torque for a single motor using the tanh friction model. + + Args: + velocity_rad_per_sec: Angular velocity in rad/s + motor_index: Index of the motor (0-7) + + Returns: + Friction torque in N·m (scaled for stability) + """ + + Fc = FRICTION_PARAMS["Fc"][motor_index] + k = FRICTION_PARAMS["k"][motor_index] + Fv = FRICTION_PARAMS["Fv"][motor_index] + Fo = FRICTION_PARAMS["Fo"][motor_index] + + # Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo + friction_torque = ( + AMP_TMP * Fc * np.tanh(COEF_TMP * k * velocity_rad_per_sec) + + Fv * velocity_rad_per_sec + + Fo + ) + + # Scale down friction compensation for stability at lower control rates + # (OpenArms C++ uses 0.3 factor in unilateral mode)!! + friction_torque *= FRICTION_SCALE + + return friction_torque + + +def main() -> None: + config = OpenArmsFollowerConfig( + port_left="can0", + port_right="can1", + can_interface="socketcan", + id="openarms_follower", + disable_torque_on_disconnect=True, + max_relative_target=5.0, + ) + + print("Initializing robot...") + follower = OpenArmsFollower(config) + follower.connect(calibrate=True) + + print(f"Applying friction compensation") + print(" 1. Support the arm before starting") + print(" 2. The arm will be held in place by friction compensation") + print(" 3. You should be able to move it with gentle force") + print("\nPress ENTER when ready to start...") + input() + + print(f"✓ Motors enabled") + print("\nStarting friction compensation loop...") + print("Press Ctrl+C to stop\n") + + loop_times = [] + last_print_time = time.perf_counter() + + # Motor name to index mapping + motor_name_to_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + try: + while True: + loop_start = time.perf_counter() + + # Get current joint positions and velocities from robot + obs = follower.get_observation() + + # Extract velocities in degrees per second + velocities_deg_per_sec = {} + positions_deg = {} + + for motor in follower.bus_right.motors: + vel_key = f"right_{motor}.vel" + pos_key = f"right_{motor}.pos" + if vel_key in obs: + velocities_deg_per_sec[f"right_{motor}"] = obs[vel_key] + if pos_key in obs: + positions_deg[f"right_{motor}"] = obs[pos_key] + + for motor in follower.bus_left.motors: + vel_key = f"left_{motor}.vel" + pos_key = f"left_{motor}.pos" + if vel_key in obs: + velocities_deg_per_sec[f"left_{motor}"] = obs[vel_key] + if pos_key in obs: + positions_deg[f"left_{motor}"] = obs[pos_key] + + # Convert velocities to rad/s and compute friction torques + friction_torques_nm = {} + for motor_full_name, velocity_deg_per_sec in velocities_deg_per_sec.items(): + # Extract motor name without arm prefix + if motor_full_name.startswith("right_"): + motor_name = motor_full_name.removeprefix("right_") + elif motor_full_name.startswith("left_"): + motor_name = motor_full_name.removeprefix("left_") + else: + continue + + # Get motor index for friction parameters + motor_index = motor_name_to_index.get(motor_name, 0) + + # Convert velocity to rad/s + velocity_rad_per_sec = np.deg2rad(velocity_deg_per_sec) + + # Compute friction torque + friction_torque = compute_friction_torque(velocity_rad_per_sec, motor_index) + friction_torques_nm[motor_full_name] = friction_torque + + # Apply friction compensation to right arm (all joints INCLUDING gripper) + for motor in follower.bus_right.motors: + full_name = f"right_{motor}" + position = positions_deg.get(full_name, 0.0) + torque = friction_torques_nm.get(full_name, 0.0) + + # Get motor index for damping gain + motor_index = motor_name_to_index.get(motor, 0) + kd = DAMPING_KD[motor_index] + + # Send MIT control command with friction compensation + damping + follower.bus_right._mit_control( + motor=motor, + kp=0.0, # No position control + kd=kd, # Add damping for stability + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=torque + ) + + # Apply friction compensation to left arm (all joints INCLUDING gripper) + for motor in follower.bus_left.motors: + full_name = f"left_{motor}" + position = positions_deg.get(full_name, 0.0) + torque = friction_torques_nm.get(full_name, 0.0) + + # Get motor index for damping gain + motor_index = motor_name_to_index.get(motor, 0) + kd = DAMPING_KD[motor_index] + + # Send MIT control command with friction compensation + damping + follower.bus_left._mit_control( + motor=motor, + kp=0.0, # No position control + kd=kd, # Add damping for stability + position_degrees=position, + velocity_deg_per_sec=0.0, + torque=torque + ) + + # Measure loop time + loop_end = time.perf_counter() + loop_time = loop_end - loop_start + loop_times.append(loop_time) + + # Print status every 2 seconds + if loop_end - last_print_time >= 2.0: + if loop_times: + avg_time = sum(loop_times) / len(loop_times) + current_hz = 1.0 / avg_time if avg_time > 0 else 0 + + print(f"{current_hz:.1f} Hz") + + loop_times = [] + last_print_time = loop_end + + time.sleep(0.001) + + except KeyboardInterrupt: + print("\n\nStopping friction compensation...") + + finally: + print("\nDisabling all motors and disconnecting...") + follower.bus_right.disable_torque() + follower.bus_left.disable_torque() + time.sleep(0.1) + follower.disconnect() + print("✓ Safe shutdown complete") + + +if __name__ == "__main__": + main() + diff --git a/examples/openarms/teleop.py b/examples/openarms/teleop.py index d67491e1b..b088985e0 100644 --- a/examples/openarms/teleop.py +++ b/examples/openarms/teleop.py @@ -89,23 +89,39 @@ loop_times = [] start_time = time.perf_counter() last_print_time = start_time +# Detailed timing accumulators +timing_stats = { + "leader_read": [], + "filter_data": [], + "follower_write": [], +} + try: while True: loop_start = time.perf_counter() # Get action from leader + t0 = time.perf_counter() leader_action = leader.get_action() + t1 = time.perf_counter() + timing_stats["leader_read"].append((t1 - t0) * 1000) # Filter to only position data for all joints (both arms) + t2 = time.perf_counter() joint_action = {} for joint in all_joints: pos_key = f"{joint}.pos" if pos_key in leader_action: joint_action[pos_key] = leader_action[pos_key] + t3 = time.perf_counter() + timing_stats["filter_data"].append((t3 - t2) * 1000) # Send action to follower (both arms) + t4 = time.perf_counter() if joint_action: follower.send_action(joint_action) + t5 = time.perf_counter() + timing_stats["follower_write"].append((t5 - t4) * 1000) # Measure loop time loop_end = time.perf_counter() @@ -122,12 +138,31 @@ try: max_hz = 1.0 / min_time if min_time > 0 else 0 min_hz = 1.0 / max_time if max_time > 0 else 0 - print(f"[Hz Stats] Avg: {current_hz:.1f} Hz | " - f"Range: {min_hz:.1f}-{max_hz:.1f} Hz | " - f"Avg loop time: {avg_time*1000:.1f} ms") + # Calculate average timing for each operation + timing_avgs = { + k: (sum(v) / len(v) if v else 0.0) + for k, v in timing_stats.items() + } + + # Print detailed timing breakdown + print(f"\n{'='*70}") + print(f"Teleoperation - {current_hz:.1f} Hz | Total: {avg_time*1000:.1f} ms") + print(f"{'='*70}") + print(f" Leader Read: {timing_avgs['leader_read']:6.2f} ms") + print(f" Filter Data: {timing_avgs['filter_data']:6.2f} ms") + print(f" Follower Write: {timing_avgs['follower_write']:6.2f} ms") + print(f" ───────────────────────────────") + total_accounted = sum(timing_avgs.values()) + print(f" Accounted: {total_accounted:6.2f} ms") + print(f" Unaccounted: {avg_time*1000 - total_accounted:6.2f} ms") + print(f" ───────────────────────────────") + print(f" Hz Range: {min_hz:.1f} - {max_hz:.1f} Hz") + print(f"{'='*70}\n") # Reset for next measurement window loop_times = [] + for key in timing_stats: + timing_stats[key] = [] last_print_time = loop_end except KeyboardInterrupt: diff --git a/examples/openarms/teleop_with_gravity_compensation.py b/examples/openarms/teleop_with_compensation.py similarity index 65% rename from examples/openarms/teleop_with_gravity_compensation.py rename to examples/openarms/teleop_with_compensation.py index 8cebe4e18..bc50584ac 100755 --- a/examples/openarms/teleop_with_gravity_compensation.py +++ b/examples/openarms/teleop_with_compensation.py @@ -1,7 +1,7 @@ """ -OpenArms Teleoperation with Gravity Compensation +OpenArms Teleoperation with Gravity + Friction Compensation -Leader arms (both LEFT and RIGHT): Gravity compensation (weightless, easy to move) +Leader arms (both LEFT and RIGHT): Gravity + Friction compensation (weightless, easy to move) Follower arms (both LEFT and RIGHT): Mirror leader movements Uses the URDF file from the lerobot repository. @@ -16,6 +16,8 @@ from lerobot.robots.openarms.openarms_follower import OpenArmsFollower from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader +# Friction compensation scale factor (1.0 = full, 0.3 = 30% for stability) +FRICTION_SCALE = 1.0 def main(): """Main teleoperation loop with gravity compensation""" @@ -54,7 +56,7 @@ def main(): if leader.pin_robot is None: raise RuntimeError("URDF model not loaded on leader. Gravity compensation not available.") - print("\nLeader BOTH arms: G-comp | Follower BOTH arms: Teleop") + print("\nLeader BOTH arms: Gravity + Friction comp | Follower BOTH arms: Teleop") print("Press ENTER to start...") input() @@ -83,75 +85,75 @@ def main(): # Get leader state leader_action = leader.get_action() - # Extract positions in degrees + # Extract positions and velocities in degrees leader_positions_deg = {} + leader_velocities_deg_per_sec = {} + for motor in leader.bus_right.motors: - key = f"right_{motor}.pos" - if key in leader_action: - leader_positions_deg[f"right_{motor}"] = leader_action[key] + pos_key = f"right_{motor}.pos" + vel_key = f"right_{motor}.vel" + if pos_key in leader_action: + leader_positions_deg[f"right_{motor}"] = leader_action[pos_key] + if vel_key in leader_action: + leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key] for motor in leader.bus_left.motors: - key = f"left_{motor}.pos" - if key in leader_action: - leader_positions_deg[f"left_{motor}"] = leader_action[key] + pos_key = f"left_{motor}.pos" + vel_key = f"left_{motor}.vel" + if pos_key in leader_action: + leader_positions_deg[f"left_{motor}"] = leader_action[pos_key] + if vel_key in leader_action: + leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key] # Calculate gravity torques for leader using built-in method leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()} - leader_torques_nm = leader._gravity_from_q(leader_positions_rad) + leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad) + + # Calculate friction torques for leader using built-in method + leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()} + leader_friction_torques_nm = leader._friction_from_velocity( + leader_velocities_rad_per_sec, + friction_scale=FRICTION_SCALE + ) + + # Combine gravity + friction torques + leader_total_torques_nm = {} + for motor_name in leader_gravity_torques_nm: + gravity = leader_gravity_torques_nm.get(motor_name, 0.0) + friction = leader_friction_torques_nm.get(motor_name, 0.0) + leader_total_torques_nm[motor_name] = gravity + friction - # Apply gravity compensation to leader RIGHT arm (all joints except gripper) + # Apply gravity + friction compensation to leader RIGHT arm (all joints including gripper) for motor in leader.bus_right.motors: - if motor == "gripper": - # Skip gripper - keep it free - full_name = f"right_{motor}" - position = leader_positions_deg.get(full_name, 0.0) - leader.bus_right._mit_control( - motor=motor, - kp=0.0, - kd=0.0, - position_degrees=position, - velocity_deg_per_sec=0.0, - torque=0.0, - ) - continue - full_name = f"right_{motor}" position = leader_positions_deg.get(full_name, 0.0) - torque = leader_torques_nm.get(full_name, 0.0) + torque = leader_total_torques_nm.get(full_name, 0.0) + + # Get damping gain for stability + kd = leader.get_damping_kd(motor) leader.bus_right._mit_control( motor=motor, kp=0.0, - kd=0.0, + kd=kd, # Add damping for stability position_degrees=position, velocity_deg_per_sec=0.0, torque=torque, ) - # Apply gravity compensation to leader LEFT arm (all joints except gripper) + # Apply gravity + friction compensation to leader LEFT arm (all joints including gripper) for motor in leader.bus_left.motors: - if motor == "gripper": - # Skip gripper - keep it free - full_name = f"left_{motor}" - position = leader_positions_deg.get(full_name, 0.0) - leader.bus_left._mit_control( - motor=motor, - kp=0.0, - kd=0.0, - position_degrees=position, - velocity_deg_per_sec=0.0, - torque=0.0, - ) - continue - full_name = f"left_{motor}" position = leader_positions_deg.get(full_name, 0.0) - torque = leader_torques_nm.get(full_name, 0.0) + torque = leader_total_torques_nm.get(full_name, 0.0) + + # Get damping gain for stability + kd = leader.get_damping_kd(motor) - leader.bus_left._mit_control( + leader.bus_left._mit_control( motor=motor, kp=0.0, - kd=0.0, + kd=kd, # Add damping for stability position_degrees=position, velocity_deg_per_sec=0.0, torque=torque, diff --git a/src/lerobot/motors/damiao/damiao.py b/src/lerobot/motors/damiao/damiao.py index 14f695761..1d9e6eb7f 100644 --- a/src/lerobot/motors/damiao/damiao.py +++ b/src/lerobot/motors/damiao/damiao.py @@ -440,6 +440,65 @@ class DamiaoMotorsBus(MotorsBusBase): self.canbus.send(msg) recv_id = self._get_motor_recv_id(motor) self._recv_motor_response(expected_recv_id=recv_id) + + def _mit_control_batch( + self, + commands: Dict[NameOrID, Tuple[float, float, float, float, float]], + ) -> None: + """ + Send MIT control commands to multiple motors in batch (optimized). + Sends all commands first, then collects responses. Much faster than sequential. + + Args: + commands: Dict mapping motor name/ID to (kp, kd, position_deg, velocity_deg/s, torque) + Example: {'joint_1': (10.0, 0.5, 45.0, 0.0, 0.0), ...} + """ + if not commands: + return + + expected_recv_ids = [] + + # Step 1: Send all MIT control commands (no waiting) + for motor, (kp, kd, position_degrees, velocity_deg_per_sec, torque) in commands.items(): + motor_id = self._get_motor_id(motor) + motor_name = self._get_motor_name(motor) + motor_type = self._motor_types.get(motor_name, MotorType.DM4310) + + # Convert degrees to radians + position_rad = np.radians(position_degrees) + velocity_rad_per_sec = np.radians(velocity_deg_per_sec) + + # Get motor limits + pmax, vmax, tmax = MOTOR_LIMIT_PARAMS[motor_type] + + # Encode parameters + kp_uint = self._float_to_uint(kp, 0, 500, 12) + kd_uint = self._float_to_uint(kd, 0, 5, 12) + q_uint = self._float_to_uint(position_rad, -pmax, pmax, 16) + dq_uint = self._float_to_uint(velocity_rad_per_sec, -vmax, vmax, 12) + tau_uint = self._float_to_uint(torque, -tmax, tmax, 12) + + # Pack data + data = [0] * 8 + data[0] = (q_uint >> 8) & 0xFF + data[1] = q_uint & 0xFF + data[2] = dq_uint >> 4 + data[3] = ((dq_uint & 0xF) << 4) | ((kp_uint >> 8) & 0xF) + data[4] = kp_uint & 0xFF + data[5] = kd_uint >> 4 + data[6] = ((kd_uint & 0xF) << 4) | ((tau_uint >> 8) & 0xF) + data[7] = tau_uint & 0xFF + + # Send command + msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False) + self.canbus.send(msg) + + # Track expected response + recv_id = self._get_motor_recv_id(motor) + expected_recv_ids.append(recv_id) + + # Step 2: Collect all responses at once + self._recv_all_responses(expected_recv_ids, timeout=0.002) def _float_to_uint(self, x: float, x_min: float, x_max: float, bits: int) -> int: """Convert float to unsigned integer for CAN transmission.""" @@ -611,6 +670,63 @@ class DamiaoMotorsBus(MotorsBusBase): result[motor] = 0.0 return result + + def sync_read_all_states( + self, + motors: str | list[str] | None = None, + *, + num_retry: int = 0, + ) -> Dict[str, Dict[str, Value]]: + """ + Read ALL motor states (position, velocity, torque) from multiple motors in ONE refresh cycle. + This is 3x faster than calling sync_read() three times separately. + + Returns: + Dictionary mapping motor names to state dicts with keys: 'position', 'velocity', 'torque' + Example: {'joint_1': {'position': 45.2, 'velocity': 1.3, 'torque': 0.5}, ...} + """ + motors = self._get_motors_list(motors) + result = {} + + # Step 1: Send refresh commands to ALL motors first (no waiting) + for motor in motors: + motor_id = self._get_motor_id(motor) + data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0] + msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False) + self.canbus.send(msg) + + # Step 2: Collect all responses at once (batch receive) + expected_recv_ids = [self._get_motor_recv_id(motor) for motor in motors] + responses = self._recv_all_responses(expected_recv_ids, timeout=0.003) # 3ms total timeout + + # Step 3: Parse responses and extract ALL state values + for motor in motors: + try: + recv_id = self._get_motor_recv_id(motor) + msg = responses.get(recv_id) + + if msg is None: + logger.warning(f"No response from motor '{motor}' (recv ID: 0x{recv_id:02X})") + result[motor] = {"position": 0.0, "velocity": 0.0, "torque": 0.0} + continue + + motor_type = self._motor_types.get(motor, MotorType.DM4310) + position_degrees, velocity_deg_per_sec, torque, t_mos, t_rotor = self._decode_motor_state(msg.data, motor_type) + + # Return all state values in one dict + result[motor] = { + "position": position_degrees, + "velocity": velocity_deg_per_sec, + "torque": torque, + "temp_mos": t_mos, + "temp_rotor": t_rotor, + } + + except Exception as e: + logger.warning(f"Failed to read state from {motor}: {e}") + result[motor] = {"position": 0.0, "velocity": 0.0, "torque": 0.0} + + return result def sync_write( self, diff --git a/src/lerobot/robots/openarms/config_openarms_follower.py b/src/lerobot/robots/openarms/config_openarms_follower.py index daeb0e43b..5e28b2620 100644 --- a/src/lerobot/robots/openarms/config_openarms_follower.py +++ b/src/lerobot/robots/openarms/config_openarms_follower.py @@ -72,11 +72,22 @@ class OpenArmsFollowerConfig(RobotConfig): "gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310) }) - # MIT control parameters for position control (per motor) - # Values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] + # MIT control parameters for position control (used in send_action) + # List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]) position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2]) + # Damping gains for stability when applying torque compensation (gravity/friction) + # Used when kp=0 and only torque is applied + damping_kd: list[float] = field(default_factory=lambda: [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1]) + + # Friction model parameters: τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω) + # From OpenArms config/follower.yaml + friction_fc: list[float] = field(default_factory=lambda: [0.306, 0.306, 0.40, 0.166, 0.050, 0.093, 0.172, 0.0512]) # Coulomb friction [Nm] + friction_k: list[float] = field(default_factory=lambda: [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000]) # tanh steepness + friction_fv: list[float] = field(default_factory=lambda: [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084]) # Viscous friction [Nm·s/rad] + friction_fo: list[float] = field(default_factory=lambda: [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050]) # Offset torque [Nm] + # Calibration parameters calibration_mode: str = "manual" # "manual" or "auto" zero_position_on_connect: bool = False # Set zero position on connect diff --git a/src/lerobot/robots/openarms/openarms_follower.py b/src/lerobot/robots/openarms/openarms_follower.py index e2dd5b68f..2d2aa98fd 100644 --- a/src/lerobot/robots/openarms/openarms_follower.py +++ b/src/lerobot/robots/openarms/openarms_follower.py @@ -296,32 +296,33 @@ class OpenArmsFollower(Robot): raise NotImplementedError("Motor ID configuration is typically done via manufacturer tools for CAN motors.") def get_observation(self) -> Dict[str, Any]: - """Get current observation from robot including position, velocity, and torque.""" + """ + Get current observation from robot including position, velocity, and torque. + + OPTIMIZED: Reads all motor states (pos/vel/torque) in one CAN refresh cycle + instead of 3 separate reads. + """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") obs_dict = {} - - # Read motor positions, velocities, and torques from right arm start = time.perf_counter() - positions_right = self.bus_right.sync_read("Present_Position") - velocities_right = self.bus_right.sync_read("Present_Velocity") - torques_right = self.bus_right.sync_read("Present_Torque") + # OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go + right_states = self.bus_right.sync_read_all_states() for motor in self.bus_right.motors: - obs_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0) - obs_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0) - obs_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0) - - # Read motor positions, velocities, and torques from left arm - positions_left = self.bus_left.sync_read("Present_Position") - velocities_left = self.bus_left.sync_read("Present_Velocity") - torques_left = self.bus_left.sync_read("Present_Torque") + state = right_states.get(motor, {}) + obs_dict[f"right_{motor}.pos"] = state.get("position", 0.0) + obs_dict[f"right_{motor}.vel"] = state.get("velocity", 0.0) + obs_dict[f"right_{motor}.torque"] = state.get("torque", 0.0) + # OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go + left_states = self.bus_left.sync_read_all_states() for motor in self.bus_left.motors: - obs_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0) - obs_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0) - obs_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0) + state = left_states.get(motor, {}) + obs_dict[f"left_{motor}.pos"] = state.get("position", 0.0) + obs_dict[f"left_{motor}.vel"] = state.get("velocity", 0.0) + obs_dict[f"left_{motor}.torque"] = state.get("torque", 0.0) dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -404,39 +405,25 @@ class OpenArmsFollower(Robot): "gripper": 7, } - # Send MIT control commands to right arm - for motor_name, position_degrees in goal_pos_right.items(): - # Get per-motor gains from config - idx = motor_index.get(motor_name, 0) - kp = self.config.position_kp[idx] - kd = self.config.position_kd[idx] - - # Send MIT control command (position is in degrees) - self.bus_right._mit_control( - motor_name, - kp=kp, - kd=kd, - position_degrees=position_degrees, - velocity_deg_per_sec=0.0, - torque=0.0 - ) + # Use batch MIT control for right arm (sends all commands, then collects responses) + if goal_pos_right: + commands_right = {} + for motor_name, position_degrees in goal_pos_right.items(): + idx = motor_index.get(motor_name, 0) + kp = self.config.position_kp[idx] if isinstance(self.config.position_kp, list) else self.config.position_kp + kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd + commands_right[motor_name] = (kp, kd, position_degrees, 0.0, 0.0) + self.bus_right._mit_control_batch(commands_right) - # Send MIT control commands to left arm - for motor_name, position_degrees in goal_pos_left.items(): - # Get per-motor gains from config - idx = motor_index.get(motor_name, 0) - kp = self.config.position_kp[idx] - kd = self.config.position_kd[idx] - - # Send MIT control command (position is in degrees) - self.bus_left._mit_control( - motor_name, - kp=kp, - kd=kd, - position_degrees=position_degrees, - velocity_deg_per_sec=0.0, - torque=0.0 - ) + # Use batch MIT control for left arm (sends all commands, then collects responses) + if goal_pos_left: + commands_left = {} + for motor_name, position_degrees in goal_pos_left.items(): + idx = motor_index.get(motor_name, 0) + kp = self.config.position_kp[idx] if isinstance(self.config.position_kp, list) else self.config.position_kp + kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd + commands_left[motor_name] = (kp, kd, position_degrees, 0.0, 0.0) + self.bus_left._mit_control_batch(commands_left) # Return the actions that were actually sent result = {} @@ -544,3 +531,94 @@ class OpenArmsFollower(Robot): return result + def _friction_from_velocity( + self, + velocity_rad_per_sec: Dict[str, float], + friction_scale: float = 1.0, + amp_tmp: float = 1.0, + coef_tmp: float = 0.1 + ) -> Dict[str, float]: + """ + Compute friction torques for all joints in the robot using tanh friction model. + + Args: + velocity_rad_per_sec: Dictionary mapping motor names (with arm prefix) to velocities in rad/s + friction_scale: Scale factor for friction compensation (default 1.0, use 0.3 for stability) + amp_tmp: Amplitude factor for tanh term (default 1.0) + coef_tmp: Coefficient for tanh steepness (default 0.1) + + Returns: + Dictionary mapping motor names to friction torques in N·m + """ + # Motor name to index mapping + motor_name_to_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + result = {} + + # Process all motors (left and right) + for motor_full_name, velocity in velocity_rad_per_sec.items(): + # Extract motor name without arm prefix + if motor_full_name.startswith("right_"): + motor_name = motor_full_name.removeprefix("right_") + elif motor_full_name.startswith("left_"): + motor_name = motor_full_name.removeprefix("left_") + else: + result[motor_full_name] = 0.0 + continue + + # Get motor index for friction parameters + motor_index = motor_name_to_index.get(motor_name, 0) + + # Get friction parameters from config + Fc = self.config.friction_fc[motor_index] + k = self.config.friction_k[motor_index] + Fv = self.config.friction_fv[motor_index] + Fo = self.config.friction_fo[motor_index] + + # Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo + friction_torque = ( + amp_tmp * Fc * np.tanh(coef_tmp * k * velocity) + + Fv * velocity + + Fo + ) + + # Apply scale factor + friction_torque *= friction_scale + + result[motor_full_name] = float(friction_torque) + + return result + + def get_damping_kd(self, motor_name: str) -> float: + """ + Get damping gain (Kd) for a specific motor. + + Args: + motor_name: Motor name without arm prefix (e.g., "joint_1", "gripper") + + Returns: + Damping gain value + """ + motor_name_to_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + motor_index = motor_name_to_index.get(motor_name, 0) + return self.config.damping_kd[motor_index] + diff --git a/src/lerobot/teleoperators/openarms/config_openarms_leader.py b/src/lerobot/teleoperators/openarms/config_openarms_leader.py index c0f02a9e2..094ecf0a9 100644 --- a/src/lerobot/teleoperators/openarms/config_openarms_leader.py +++ b/src/lerobot/teleoperators/openarms/config_openarms_leader.py @@ -62,3 +62,19 @@ class OpenArmsLeaderConfig(TeleoperatorConfig): # Torque mode settings for manual control # When enabled, motors have torque disabled for manual movement manual_control: bool = True + + # MIT control parameters (used when manual_control=False for torque control) + # List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper] + position_kp: list[float] = field(default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]) + position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2]) + + # Damping gains for stability when applying torque compensation (gravity/friction) + # Used when kp=0 and only torque is applied + damping_kd: list[float] = field(default_factory=lambda: [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1]) + + # Friction model parameters: τ_fric(ω) = Fo + Fv·ω + Fc·tanh(k·ω) + # From OpenArms config/leader.yaml (note: Fc[5] is slightly different: 0.083 vs 0.093) + friction_fc: list[float] = field(default_factory=lambda: [0.306, 0.306, 0.40, 0.166, 0.050, 0.083, 0.172, 0.0512]) # Coulomb friction [Nm] + friction_k: list[float] = field(default_factory=lambda: [28.417, 28.417, 29.065, 130.038, 151.771, 242.287, 7.888, 4.000]) # tanh steepness + friction_fv: list[float] = field(default_factory=lambda: [0.063, 0.0630, 0.604, 0.813, 0.029, 0.072, 0.084, 0.084]) # Viscous friction [Nm·s/rad] + friction_fo: list[float] = field(default_factory=lambda: [0.088, 0.088, 0.008, -0.058, 0.005, 0.009, -0.059, -0.050]) # Offset torque [Nm] \ No newline at end of file diff --git a/src/lerobot/teleoperators/openarms/openarms_leader.py b/src/lerobot/teleoperators/openarms/openarms_leader.py index f129959ce..c75740ab4 100644 --- a/src/lerobot/teleoperators/openarms/openarms_leader.py +++ b/src/lerobot/teleoperators/openarms/openarms_leader.py @@ -276,32 +276,30 @@ class OpenArmsLeader(Teleoperator): This is the main method for teleoperators - it reads the current state of the leader arm and returns it as an action that can be sent to a follower. + + Reads all motor states (pos/vel/torque) in one CAN refresh cycle """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") action_dict = {} - - # Read motor positions, velocities, and torques from right arm start = time.perf_counter() - positions_right = self.bus_right.sync_read("Present_Position") - velocities_right = self.bus_right.sync_read("Present_Velocity") - torques_right = self.bus_right.sync_read("Present_Torque") + # OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go + right_states = self.bus_right.sync_read_all_states() for motor in self.bus_right.motors: - action_dict[f"right_{motor}.pos"] = positions_right.get(motor, 0.0) - action_dict[f"right_{motor}.vel"] = velocities_right.get(motor, 0.0) - action_dict[f"right_{motor}.torque"] = torques_right.get(motor, 0.0) - - # Read motor positions, velocities, and torques from left arm - positions_left = self.bus_left.sync_read("Present_Position") - velocities_left = self.bus_left.sync_read("Present_Velocity") - torques_left = self.bus_left.sync_read("Present_Torque") + state = right_states.get(motor, {}) + action_dict[f"right_{motor}.pos"] = state.get("position", 0.0) + action_dict[f"right_{motor}.vel"] = state.get("velocity", 0.0) + action_dict[f"right_{motor}.torque"] = state.get("torque", 0.0) + # OPTIMIZED: Use sync_read_all_states to get pos/vel/torque in one go + left_states = self.bus_left.sync_read_all_states() for motor in self.bus_left.motors: - action_dict[f"left_{motor}.pos"] = positions_left.get(motor, 0.0) - action_dict[f"left_{motor}.vel"] = velocities_left.get(motor, 0.0) - action_dict[f"left_{motor}.torque"] = torques_left.get(motor, 0.0) + state = left_states.get(motor, {}) + action_dict[f"left_{motor}.pos"] = state.get("position", 0.0) + action_dict[f"left_{motor}.vel"] = state.get("velocity", 0.0) + action_dict[f"left_{motor}.torque"] = state.get("torque", 0.0) dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -412,5 +410,96 @@ class OpenArmsLeader(Teleoperator): idx += 2 return result + + def _friction_from_velocity( + self, + velocity_rad_per_sec: Dict[str, float], + friction_scale: float = 1.0, + amp_tmp: float = 1.0, + coef_tmp: float = 0.1 + ) -> Dict[str, float]: + """ + Compute friction torques for all joints in the robot using tanh friction model. + + Args: + velocity_rad_per_sec: Dictionary mapping motor names (with arm prefix) to velocities in rad/s + friction_scale: Scale factor for friction compensation (default 1.0, use 0.3 for stability) + amp_tmp: Amplitude factor for tanh term (default 1.0) + coef_tmp: Coefficient for tanh steepness (default 0.1) + + Returns: + Dictionary mapping motor names to friction torques in N·m + """ + # Motor name to index mapping + motor_name_to_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + result = {} + + # Process all motors (left and right) + for motor_full_name, velocity in velocity_rad_per_sec.items(): + # Extract motor name without arm prefix + if motor_full_name.startswith("right_"): + motor_name = motor_full_name.removeprefix("right_") + elif motor_full_name.startswith("left_"): + motor_name = motor_full_name.removeprefix("left_") + else: + result[motor_full_name] = 0.0 + continue + + # Get motor index for friction parameters + motor_index = motor_name_to_index.get(motor_name, 0) + + # Get friction parameters from config + Fc = self.config.friction_fc[motor_index] + k = self.config.friction_k[motor_index] + Fv = self.config.friction_fv[motor_index] + Fo = self.config.friction_fo[motor_index] + + # Friction model: τ_fric = amp * Fc * tanh(coef * k * ω) + Fv * ω + Fo + friction_torque = ( + amp_tmp * Fc * np.tanh(coef_tmp * k * velocity) + + Fv * velocity + + Fo + ) + + # Apply scale factor + friction_torque *= friction_scale + + result[motor_full_name] = float(friction_torque) + + return result + + def get_damping_kd(self, motor_name: str) -> float: + """ + Get damping gain (Kd) for a specific motor. + + Args: + motor_name: Motor name without arm prefix (e.g., "joint_1", "gripper") + + Returns: + Damping gain value + """ + motor_name_to_index = { + "joint_1": 0, + "joint_2": 1, + "joint_3": 2, + "joint_4": 3, + "joint_5": 4, + "joint_6": 5, + "joint_7": 6, + "gripper": 7, + } + + motor_index = motor_name_to_index.get(motor_name, 0) + return self.config.damping_kd[motor_index]