From 2437decd3fc1a6b9713524ee71267d7593e93e0c Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 16 Jul 2025 10:40:58 +0200 Subject: [PATCH] Cleanup unneeded code --- src/lerobot/bi_teleoperate.py | 210 ++++-------------- src/lerobot/grav_comp.py | 86 ------- src/lerobot/record.py | 1 + .../so101_follower_torque/so101_follower_t.py | 119 ++++++---- 4 files changed, 120 insertions(+), 296 deletions(-) delete mode 100644 src/lerobot/grav_comp.py diff --git a/src/lerobot/bi_teleoperate.py b/src/lerobot/bi_teleoperate.py index 5d937804c..c76dbce58 100644 --- a/src/lerobot/bi_teleoperate.py +++ b/src/lerobot/bi_teleoperate.py @@ -2,8 +2,6 @@ import math import sys import time -import numpy as np - from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT from lerobot.utils.robot_utils import busy_wait @@ -15,25 +13,14 @@ RERUN_HZ = 100 ESC_CLR_EOL = "\x1b[K" CURSOR_UP = "\x1b[F" - -def vec(obs, key_suffix, joints): - """dict → 1-D numpy array (selected joints)""" - return np.array([obs[f"{m}{key_suffix}"] for m in joints]) - - -def to_cmd(tau_vec, joints): - """1-D array → {'joint.effort': value}""" - return {f"{m}.effort": v for m, v in zip(joints, tau_vec, strict=False)} - - follower_cfg = SO101FollowerTConfig( port="/dev/tty.usbmodem58760431551", - id="follower_arm_tt", + id="follower_arm_torque", ) leader_cfg = SO101FollowerTConfig( port="/dev/tty.usbmodem58760428721", - id="leader_arm_tt", + id="leader_arm_torque", ) follower = SO101FollowerT(follower_cfg) @@ -50,9 +37,6 @@ first_print = True loop_count = 0 tic_prev = time.perf_counter() -use_friction_comp = True # Enable/disable friction compensation -use_gravity_comp = True # Enable gravity compensation with corrected signs - # Initialize previous positions for velocity calculation pos_f_prev = None pos_l_prev = None @@ -91,142 +75,66 @@ while True: pos_l_prev = pos_l.copy() # Compute reaction torques using model-based approach - # Include friction in model-based disturbance for consistency - tau_reaction_f = follower._compute_model_based_disturbance( - pos_f, vel_f, acc_f, tau_meas_f, include_friction=use_friction_comp - ) - tau_reaction_l = leader._compute_model_based_disturbance( - pos_l, vel_l, acc_l, tau_meas_l, include_friction=use_friction_comp - ) - - # Get gravity for feed-forward compensation - grav_f = follower._gravity_from_q(pos_f) - grav_l = leader._gravity_from_q(pos_l) - - # Get inertia torques - inertia_f = follower._inertia_from_q_dq(pos_f, vel_f, acc_f) - inertia_l = leader._inertia_from_q_dq(pos_l, vel_l, acc_l) + tau_reaction_f = follower._compute_model_based_disturbance(pos_f, vel_f, acc_f, tau_meas_f) + tau_reaction_l = leader._compute_model_based_disturbance(pos_l, vel_l, acc_l, tau_meas_l) # Joint-specific control gains for better tracking - Kp_gains = follower.kp_gains # Use robot class properties - Kd_gains = follower.kd_gains # Use robot class properties + Kp_gains = follower.kp_gains + Kd_gains = follower.kd_gains + Kf_gains = follower.kf_gains - Kf = 0.05 # Force reflection gain (global) + # Compute torque commands in one line using list comprehension + tau_cmd_f = [ + Kp_gains[j] * (pos_l[j] - pos_f[j]) # Position tracking + + Kd_gains[j] * (vel_l[j] - vel_f[j]) # Velocity damping + + Kf_gains[j] * (-tau_reaction_l[j] - tau_reaction_f[j]) # Force reflection + for j in follower.bus.motors + ] - # Friction model parameters from robot class - friction_viscous = follower.friction_viscous # Use robot class properties - friction_coulomb = follower.friction_coulomb # Use robot class properties - - for j in follower.bus.motors: - # Bilateral control references (eqs. 5-7) - theta_cmd_f, omega_cmd_f = pos_l[j], vel_l[j] # Follower follows leader - theta_cmd_l, omega_cmd_l = pos_f[j], vel_f[j] # Leader follows follower - - tau_cmd_f_ref = -tau_reaction_l[j] # Reflect leader reaction torque - tau_cmd_l_ref = -tau_reaction_f[j] # Reflect follower reaction torque - - # Friction compensation (feedforward) - if use_friction_comp: - # Follower friction compensation - tau_friction_f = friction_viscous[j] * vel_f[j] + friction_coulomb[j] * ( - 1.0 if vel_f[j] > 0.01 else -1.0 if vel_f[j] < -0.01 else 0.0 - ) - - # Leader friction compensation - tau_friction_l = friction_viscous[j] * vel_l[j] + friction_coulomb[j] * ( - 1.0 if vel_l[j] > 0.01 else -1.0 if vel_l[j] < -0.01 else 0.0 - ) - else: - tau_friction_f = 0.0 - tau_friction_l = 0.0 - - # Follower control with joint-specific gains and friction compensation - tau_pos_f = Kp_gains[j] * (theta_cmd_f - pos_f[j]) - tau_vel_f = Kd_gains[j] * (omega_cmd_f - vel_f[j]) - tau_force_f = Kf * (tau_cmd_f_ref - tau_reaction_f[j]) - - # Apply gravity compensation conditionally - tau_gravity_f = grav_f[j] if use_gravity_comp else 0.0 - tau_inertia_f = inertia_f[j] if use_gravity_comp else 0.0 - - tau_ref_f = tau_pos_f + tau_vel_f + tau_force_f + tau_gravity_f + tau_inertia_f + tau_friction_f - - # Leader control with joint-specific gains and friction compensation - tau_pos_l = Kp_gains[j] * (theta_cmd_l - pos_l[j]) - tau_vel_l = Kd_gains[j] * (omega_cmd_l - vel_l[j]) - tau_force_l = Kf * (tau_cmd_l_ref - tau_reaction_l[j]) - - # Apply gravity compensation conditionally - tau_gravity_l = grav_l[j] if use_gravity_comp else 0.0 - tau_inertia_l = inertia_l[j] if use_gravity_comp else 0.0 - - tau_ref_l = tau_pos_l + tau_vel_l + tau_force_l + tau_gravity_l + tau_inertia_l + tau_friction_l - - tau_cmd_f.append(tau_ref_f) - tau_cmd_l.append(tau_ref_l) - - # Store interaction torques (cmd minus deterministic components) for model - if use_friction_comp: - # If friction compensation is enabled, friction is already removed in model-based disturbance - tau_interaction_l[j] = tau_ref_l - tau_gravity_l - tau_inertia_l - tau_friction_l - tau_interaction_f[j] = -tau_reaction_f[j] # Already has friction removed - else: - # If friction compensation is disabled, use original logic - tau_interaction_l[j] = tau_ref_l - tau_gravity_l - tau_inertia_l - tau_friction_l - tau_interaction_f[j] = -( - tau_meas_f[j] - tau_gravity_f - tau_inertia_f - tau_friction_f - ) # Invert to match leader + tau_cmd_l = [ + Kp_gains[j] * (pos_f[j] - pos_l[j]) # Position tracking + + Kd_gains[j] * (vel_f[j] - vel_l[j]) # Velocity damping + + Kf_gains[j] * (-tau_reaction_f[j] - tau_reaction_l[j]) # Force reflection + for j in leader.bus.motors + ] + # Store interaction torques and debug info + for i, j in enumerate(follower.bus.motors): # Store debug info debug_info_f[j] = { "τ_measured": tau_meas_f[j], - "τ_pos": tau_pos_f, - "τ_vel": tau_vel_f, - "τ_force": tau_force_f, - "τ_gravity": tau_gravity_f, # Use conditional gravity value - "τ_inertia": tau_inertia_f, # Use conditional inertia value - "τ_friction": tau_friction_f, # Add friction to debug info "τ_reaction": tau_reaction_f[j], - "τ_ref": tau_ref_f, - "θ_err": theta_cmd_f - pos_f[j], - "ω_err": omega_cmd_f - vel_f[j], - "τ_err": tau_cmd_f_ref - tau_reaction_f[j], + "τ_ref": tau_cmd_f[i], + "θ_err": pos_l[j] - pos_f[j], + "ω_err": vel_l[j] - vel_f[j], + "τ_err": -tau_reaction_l[j] - tau_reaction_f[j], } debug_info_l[j] = { "τ_measured": tau_meas_l[j], - "τ_pos": tau_pos_l, - "τ_vel": tau_vel_l, - "τ_force": tau_force_l, - "τ_gravity": tau_gravity_l, # Use conditional gravity value - "τ_inertia": tau_inertia_l, # Use conditional inertia value - "τ_friction": tau_friction_l, # Add friction to debug info "τ_reaction": tau_reaction_l[j], - "τ_ref": tau_ref_l, - "θ_err": theta_cmd_l - pos_l[j], - "ω_err": omega_cmd_l - vel_l[j], - "τ_err": tau_cmd_l_ref - tau_reaction_l[j], + "τ_ref": tau_cmd_l[i], + "θ_err": pos_f[j] - pos_l[j], + "ω_err": vel_f[j] - vel_l[j], + "τ_err": -tau_reaction_f[j] - tau_reaction_l[j], } # Send torques to both arms - follower_action = to_cmd(np.array(tau_cmd_f), list(follower.bus.motors.keys())) - leader_action = to_cmd(np.array(tau_cmd_l), list(leader.bus.motors.keys())) - - follower.send_action(follower_action) - leader.send_action(leader_action) + follower.send_action({f"{m}.effort": tau_cmd_f[i] for i, m in enumerate(follower.bus.motors)}) + leader.send_action({f"{m}.effort": tau_cmd_l[i] for i, m in enumerate(leader.bus.motors)}) # Create structured observation and action for rerun # Observation: follower side only (θ_f, ω_f, τ_ext) observation = { "follower_joint_angles": pos_f, # θ_f: current angles "follower_angular_velocities": vel_f, # ω_f: current velocities - "follower_external_torques": tau_interaction_f, # τ_ext: measured minus deterministic components + "follower_external_torques": tau_reaction_f, # τ_ext: measured minus deterministic components } # Action: leader targets (θ_leader[τ], ω_leader[τ], τ_leader[τ]) action = { "leader_target_angles": pos_l, # θ_leader[τ]: absolute target angles "leader_target_velocities": vel_l, # ω_leader[τ]: absolute target velocities - "leader_interaction_torques": tau_interaction_l, # τ_leader[τ]: cmd minus deterministic components + "leader_interaction_torques": tau_reaction_l, # τ_leader[τ]: cmd minus deterministic components } # Log data for visualization (100 Hz) @@ -242,12 +150,8 @@ while True: lines = [f"Loop {hz:6.1f} Hz Δt {dt * 1e3:5.2f} ms"] lines.append("=" * 106) lines.append("LEADER ARM TORQUE ANALYSIS:") - lines.append( - f"{'Joint':<13}{'Pos':>8}{'Grav':>6}{'Inert':>6}{'Track':>6}{'Vel':>6}{'Force':>6}{'Frict':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}" - ) - lines.append( - f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}" - ) + lines.append(f"{'Joint':<13}{'Pos':>8}{'React':>6}{'Meas':>6}{'Cmd':>6}") + lines.append(f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}") lines.append("-" * 86) for i, j in enumerate(leader.bus.motors): @@ -256,12 +160,6 @@ while True: lines.append( f"{j:<13s}" f"{math.degrees(pos_l[j]):+8.1f}" - f"{debug_l['τ_gravity']:+6.2f}" - f"{debug_l['τ_inertia']:+6.2f}" - f"{debug_l['τ_pos']:+6.2f}" - f"{debug_l['τ_vel']:+6.2f}" - f"{debug_l['τ_force']:+6.2f}" - f"{debug_l['τ_friction']:+6.2f}" f"{debug_l['τ_reaction']:+6.2f}" f"{debug_l['τ_measured']:+6.2f}" f"{tau_cmd_l[i]:+6.2f}" @@ -269,12 +167,8 @@ while True: lines.append("") lines.append("FOLLOWER ARM TORQUE ANALYSIS:") - lines.append( - f"{'Joint':<13}{'Pos':>8}{'Grav':>6}{'Inert':>6}{'Track':>6}{'Vel':>6}{'Force':>6}{'Frict':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}" - ) - lines.append( - f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}" - ) + lines.append(f"{'Joint':<13}{'Pos':>8}{'React':>6}{'Meas':>6}{'Cmd':>6}") + lines.append(f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}") lines.append("-" * 86) for i, j in enumerate(follower.bus.motors): @@ -283,12 +177,6 @@ while True: lines.append( f"{j:<13s}" f"{math.degrees(pos_f[j]):+8.1f}" - f"{debug_f['τ_gravity']:+6.2f}" - f"{debug_f['τ_inertia']:+6.2f}" - f"{debug_f['τ_pos']:+6.2f}" - f"{debug_f['τ_vel']:+6.2f}" - f"{debug_f['τ_force']:+6.2f}" - f"{debug_f['τ_friction']:+6.2f}" f"{debug_f['τ_reaction']:+6.2f}" f"{debug_f['τ_measured']:+6.2f}" f"{tau_cmd_f[i]:+6.2f}" @@ -298,35 +186,21 @@ while True: lines.append("=" * 86) lines.append("TORQUE COMPONENT EXPLANATIONS:") lines.append("• Pos (joint pos) = Joint position in degrees") - lines.append("• Grav (gravity) = Feed-forward gravity compensation") - lines.append("• Inert (inertia) = Feed-forward inertia compensation") - lines.append("• Track (tracking) = Position tracking control (Kp error)") - lines.append("• Vel (velocity) = Velocity damping control (Kd error)") - lines.append("• Force (bilateral) = Force reflection between robots (telepresence)") - lines.append("• Frict (friction) = Feed-forward friction compensation (transparency)") lines.append("• React (reaction) = External forces (human interaction, contact)") lines.append("• Meas (measured) = Raw torque from motor current sensor") lines.append("• Cmd (command) = Final torque sent to motor") lines.append("-" * 86) - lines.append("Cmd = Track + Vel + Force + Grav + Inert + Frict") lines.append( - "React = Meas - Grav - Inert - Frict (external forces)" - if use_friction_comp - else "React = Meas - Grav - Inert (external forces)" + "Cmd = Track + Vel + Force + (Added as feedforward in send_action: Grav + Inert + Frict)" ) + lines.append("React = Meas - Grav - Inert - Frict (external forces)") lines.append("Force = Kf × (reflect_other_robot - React) (telepresence)") lines.append("Frict = b_visc×ω + f_coulomb×sign(ω) (transparency)") lines.append( - f"Control Gains: Kp=robot-class (position) | Kd=robot-class (velocity) | Kf={Kf} (force reflection)" + f"Joint Gains: shoulder_pan Kp={Kp_gains['shoulder_pan']:.1f} | shoulder_pan Kd={Kd_gains['shoulder_pan']:.1f} | shoulder_pan Kf={Kf_gains['shoulder_pan']:.1f}" ) lines.append( - f"Joint Gains: shoulder_pan Kp={Kp_gains['shoulder_pan']:.1f} | shoulder_lift Kp={Kp_gains['shoulder_lift']:.1f} | elbow_flex Kp={Kp_gains['elbow_flex']:.1f}" - ) - lines.append( - f"Friction Comp: {'ON' if use_friction_comp else 'OFF'} | Viscous: {friction_viscous['shoulder_pan']:.3f} | Coulomb: {friction_coulomb['shoulder_pan']:.3f} (robot-class)" - ) - lines.append( - f"Gravity Comp: {'ON' if use_gravity_comp else 'OFF'} | Inertia Comp: {'ON' if use_gravity_comp else 'OFF'} | (Both disabled for testing)" + f"Friction Comp, Viscous: {follower.friction_viscous['shoulder_pan']:.3f} | Coulomb: {follower.friction_coulomb['shoulder_pan']:.3f} (robot-class)" ) block = "\n".join(lines) diff --git a/src/lerobot/grav_comp.py b/src/lerobot/grav_comp.py deleted file mode 100644 index fa416598a..000000000 --- a/src/lerobot/grav_comp.py +++ /dev/null @@ -1,86 +0,0 @@ -import sys -import time - -import numpy as np -import pinocchio as pin - -from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig -from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT - -MOTORS = [ - "shoulder_pan", - "shoulder_lift", - "elbow_flex", - "wrist_flex", - "wrist_roll", - "gripper", -] - -FLIP_TORQUE = np.array( - [ - True, # shoulder_pan - True, # shoulder_lift t - True, # elbow_flex t - True, # wrist_flex t - True, # wrist_roll - False, # gripper - ], - dtype=bool, -) - -SIGN = np.where(FLIP_TORQUE, -1.0, 1.0) - -IDX = {name: i for i, name in enumerate(MOTORS)} - - -def obs_to_q(obs): - """Extract joint vector q [rad] directly from observation dict.""" - q = np.zeros(len(MOTORS)) - for m in MOTORS: - q[IDX[m]] = obs[f"{m}.pos"] # already in rad - return q - - -def tau_to_action(tau): - """Convert τ array (Nm) to action dict understood by send_action().""" - tau = SIGN * tau - return {f"{m}.effort": float(tau[IDX[m]]) for m in MOTORS} - - -cfg = SO101FollowerTConfig(port="/dev/tty.usbmodem58760431551", id="follower_torque6") -real = SO101FollowerT(cfg) -real.connect() - -real.pin_robot.data = real.pin_robot.model.createData() -real.pin_robot.initViewer(open=True) -real.pin_robot.loadViewerModel() -real.pin_robot.display(real.pin_robot.q0) - -ESC_CLR_EOL = "\x1b[K" - -print("Running gravity compensation") -try: - while True: - obs = real.get_observation() # positions in rad - q = obs_to_q(obs) # dict to vector - - tau = pin.computeGeneralizedGravity(real.pin_robot.model, real.pin_robot.data, q) # τ in [Nm] - - # build compact single-line status - status = " | ".join( - f"{m}: {np.degrees(q[i]):+5.1f}° Δτ {(abs(obs[f'{m}.effort']) - abs(tau[i])):+5.1f}" - for i, m in enumerate(MOTORS) - ) - - # overwrite previous status - sys.stdout.write("\r" + ESC_CLR_EOL + status) - sys.stdout.flush() - - real.send_action(tau_to_action(tau)) # apply τ - - real.pin_robot.display(q) - time.sleep(0.002) # Run at 500 Hz -except KeyboardInterrupt: - print("Stopping") -finally: - real.disconnect() diff --git a/src/lerobot/record.py b/src/lerobot/record.py index 9fc0dc7ed..b49e8bd74 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -171,6 +171,7 @@ def record_loop( control_time_s: int | None = None, single_task: str | None = None, display_data: bool = False, + biteleop: bool = False, ): if dataset is not None and dataset.fps != fps: raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") diff --git a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py index c8454b0df..469282e6c 100644 --- a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py +++ b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py @@ -40,7 +40,7 @@ logger = logging.getLogger(__name__) class SO101FollowerT(Robot): """ - SO-101 Follower Arm designed by TheRobotStudio and Hugging Face. + SO-101 Arm with HLS3625 motors with current control. """ config_class = SO101FollowerTConfig @@ -48,10 +48,10 @@ class SO101FollowerT(Robot): _CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0 _KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html - _MAX_CURRENT_A: float = 4.0 # Safe driver limit for this model + _MAX_CURRENT_A: float = 4.0 # Safe driver limit - # Control gains for bilateral teleoperation - _KP_GAINS = { # Position gains [Nm/rad] - reduced for bilateral stability + # Position gains [Nm/rad] + _KP_GAINS = { "shoulder_pan": 6.0, "shoulder_lift": 7.0, "elbow_flex": 7.0, @@ -60,7 +60,8 @@ class SO101FollowerT(Robot): "gripper": 4.0, } - _KD_GAINS = { # Velocity gains [Nm⋅s/rad] - matched to position gains + # Velocity gains [Nm⋅s/rad] + _KD_GAINS = { "shoulder_pan": 0.4, "shoulder_lift": 0.8, "elbow_flex": 0.7, @@ -69,8 +70,18 @@ class SO101FollowerT(Robot): "gripper": 0.3, } - # Friction model parameters - _FRICTION_VISCOUS = { # Viscous friction coefficient [Nm⋅s/rad] per joint + # Force gains + _KF_GAINS = { + "shoulder_pan": 0.05, + "shoulder_lift": 0.05, + "elbow_flex": 0.05, + "wrist_flex": 0.05, + "wrist_roll": 0.05, + "gripper": 0.05, + } + + # Viscous friction coefficient [Nm⋅s/rad] per joint + _FRICTION_VISCOUS = { "shoulder_pan": 0.02, "shoulder_lift": 0.08, "elbow_flex": 0.05, @@ -79,7 +90,8 @@ class SO101FollowerT(Robot): "gripper": 0.01, } - _FRICTION_COULOMB = { # Coulomb friction [Nm] per joint + # Coulomb/static friction [Nm] per joint + _FRICTION_COULOMB = { "shoulder_pan": 0.15, "shoulder_lift": 0.25, "elbow_flex": 0.20, @@ -129,28 +141,29 @@ class SO101FollowerT(Robot): self._prev_t: float | None = None # Butterworth low-pass filter parameters - self._cutoff_freq = 10.0 # Hz - cutoff frequency for the filter - self._filter_order = 2 # Filter order (2nd order is common) - self._sampling_freq = 100.0 # Hz - assumed control loop frequency + self._cutoff_freq = 10.0 # Hz, cutoff frequency for the filter + self._filter_order = 2 # Filter order + self._sampling_freq = 100.0 # Hz, (control loop frequency) - # Design the Butterworth filter nyquist_freq = self._sampling_freq / 2 normalized_cutoff = self._cutoff_freq / nyquist_freq self._b, self._a = butter(self._filter_order, normalized_cutoff, btype="low") - # History buffers for filtering + # History buffers self._pos_history = {m: collections.deque(maxlen=20) for m in self.bus.motors} self._vel_raw_history = {m: collections.deque(maxlen=20) for m in self.bus.motors} self._time_history = collections.deque(maxlen=20) + self._last_observation = None + @property def _motors_ft(self) -> dict[str, type]: d: dict[str, type] = {} for m in self.bus.motors: d[f"{m}.pos"] = float d[f"{m}.vel"] = float - d[f"{m}.acc"] = float # Add acceleration - d[f"{m}.tau_meas"] = float # Changed from tau_res to tau_meas + d[f"{m}.acc"] = float + d[f"{m}.tau_meas"] = float return d @property @@ -181,6 +194,11 @@ class SO101FollowerT(Robot): """Velocity control gains [Nm⋅s/rad] for bilateral teleoperation""" return self._KD_GAINS.copy() + @property + def kf_gains(self) -> dict[str, float]: + """Force control gains for bilateral teleoperation""" + return self._KF_GAINS.copy() + @property def friction_viscous(self) -> dict[str, float]: """Viscous friction coefficients [Nm⋅s/rad] for friction compensation""" @@ -210,12 +228,11 @@ class SO101FollowerT(Robot): self._cutoff_freq = cutoff_freq self._filter_order = order - # Redesign the filter nyquist_freq = self._sampling_freq / 2 normalized_cutoff = self._cutoff_freq / nyquist_freq self._b, self._a = butter(self._filter_order, normalized_cutoff, btype="low") - # Clear history buffers + # Clear buffers for m in self.bus.motors: self._pos_history[m].clear() self._vel_raw_history[m].clear() @@ -265,7 +282,6 @@ class SO101FollowerT(Robot): """ Compute inertia torques τ_inertia = M(q) * ddq directly from URDF model. """ - # Convert joint dictionaries to numpy arrays in correct order q = np.zeros(self.pin_robot.model.nq) dq = np.zeros(self.pin_robot.model.nv) ddq = np.zeros(self.pin_robot.model.nv) @@ -289,7 +305,6 @@ class SO101FollowerT(Robot): dq_rad: dict[str, float], ddq_rad: dict[str, float], tau_measured: dict[str, float], - include_friction: bool = True, ) -> dict[str, float]: """ Compute disturbance torques using direct model-based approach: @@ -298,29 +313,24 @@ class SO101FollowerT(Robot): Args: include_friction: If True, also removes friction from the disturbance calculation """ - # Get gravity compensation tau_gravity = self._gravity_from_q(q_rad) - - # Get inertia compensation tau_inertia = self._inertia_from_q_dq(q_rad, dq_rad, ddq_rad) # Compute disturbance: what's left after removing known dynamics tau_disturbance = {} + tau_friction = {} for motor_name in self.bus.motors: tau_dist = tau_measured[motor_name] - tau_gravity[motor_name] - tau_inertia[motor_name] - # Optionally remove friction model - if include_friction: - # Calculate friction torque using class constants - omega = dq_rad[motor_name] - tau_friction = self._FRICTION_VISCOUS[motor_name] * omega + self._FRICTION_COULOMB[ - motor_name - ] * (1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0) - - # Apply torque sign correction (same as for gravity/inertia) - tau_friction = -tau_friction # Apply torque sign correction - - tau_dist -= tau_friction + # Calculate friction torque + omega = dq_rad[motor_name] + tau_friction_motor = self._FRICTION_VISCOUS[motor_name] * omega + self._FRICTION_COULOMB[ + motor_name + ] * (1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0) + # Apply torque sign correction + tau_friction_motor = -tau_friction_motor + tau_friction[motor_name] = tau_friction_motor + tau_dist -= tau_friction_motor tau_disturbance[motor_name] = tau_dist @@ -407,7 +417,7 @@ class SO101FollowerT(Robot): t_now = time.perf_counter() - # position + # Position pos_deg = self.bus.sync_read("Present_Position", num_retry=5) pos_rad = self._deg_to_rad(pos_deg) @@ -432,17 +442,13 @@ class SO101FollowerT(Robot): # Apply Butterworth low-pass filter to velocity vel_rad = {} for m in pos_rad: - if len(self._vel_raw_history[m]) >= 10: # Need enough samples for good filtering - # Convert deque to numpy array + if len(self._vel_raw_history[m]) >= 10: vel_raw_array = np.array(list(self._vel_raw_history[m])) # Apply Butterworth filter vel_filtered = lfilter(self._b, self._a, vel_raw_array) - - # Use the most recent filtered value vel_rad[m] = vel_filtered[-1] else: - # Not enough history, use raw velocity vel_rad[m] = vel_rad_raw[m] # Calculate acceleration from filtered velocity @@ -454,12 +460,11 @@ class SO101FollowerT(Robot): dt = max(dt, 1e-4) # Avoid division by zero acc_rad = {m: (vel_rad[m] - self._prev_vel_rad[m]) / dt for m in vel_rad} - # Update previous values self._prev_pos_rad = pos_rad.copy() self._prev_vel_rad = vel_rad.copy() self._prev_t = t_now - # measured torque (Nm) + # Measured torque (Nm) cur_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=5) tau_meas = self._current_to_torque_nm(cur_raw) @@ -476,6 +481,9 @@ class SO101FollowerT(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Store observation for feedforward compensation + self._last_observation = obs_dict.copy() + return obs_dict def send_action(self, action: dict[str, Any]) -> dict[str, Any]: @@ -495,6 +503,33 @@ class SO101FollowerT(Robot): if not tau_cmd_nm: return action + # Add feedforward compensation if we have a last observation + if self._last_observation is not None: + # Extract position, velocity, acceleration from last observation + pos_rad = {m: self._last_observation[f"{m}.pos"] for m in self.bus.motors} + vel_rad = {m: self._last_observation[f"{m}.vel"] for m in self.bus.motors} + acc_rad = {m: self._last_observation[f"{m}.acc"] for m in self.bus.motors} + + # Compute feedforward terms + tau_gravity = self._gravity_from_q(pos_rad) + tau_inertia = self._inertia_from_q_dq(pos_rad, vel_rad, acc_rad) + + # Add feedforward compensation to commanded torques + for motor in tau_cmd_nm: + # Add gravity compensation + tau_cmd_nm[motor] += tau_gravity[motor] + + # Add inertia compensation + tau_cmd_nm[motor] += tau_inertia[motor] + + # Add friction compensation + omega = vel_rad[motor] + tau_friction = self._FRICTION_VISCOUS[motor] * omega + self._FRICTION_COULOMB[motor] * ( + 1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0 + ) + tau_friction = -tau_friction # Apply torque sign correction + tau_cmd_nm[motor] += tau_friction + inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP) max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) counts = {}