Cleanup unneeded code

This commit is contained in:
Pepijn
2025-07-16 10:40:58 +02:00
parent 2d2f5d3d60
commit 2437decd3f
4 changed files with 120 additions and 296 deletions
+42 -168
View File
@@ -2,8 +2,6 @@ import math
import sys
import time
import numpy as np
from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
from lerobot.utils.robot_utils import busy_wait
@@ -15,25 +13,14 @@ RERUN_HZ = 100
ESC_CLR_EOL = "\x1b[K"
CURSOR_UP = "\x1b[F"
def vec(obs, key_suffix, joints):
"""dict → 1-D numpy array (selected joints)"""
return np.array([obs[f"{m}{key_suffix}"] for m in joints])
def to_cmd(tau_vec, joints):
"""1-D array → {'joint.effort': value}"""
return {f"{m}.effort": v for m, v in zip(joints, tau_vec, strict=False)}
follower_cfg = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760431551",
id="follower_arm_tt",
id="follower_arm_torque",
)
leader_cfg = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760428721",
id="leader_arm_tt",
id="leader_arm_torque",
)
follower = SO101FollowerT(follower_cfg)
@@ -50,9 +37,6 @@ first_print = True
loop_count = 0
tic_prev = time.perf_counter()
use_friction_comp = True # Enable/disable friction compensation
use_gravity_comp = True # Enable gravity compensation with corrected signs
# Initialize previous positions for velocity calculation
pos_f_prev = None
pos_l_prev = None
@@ -91,142 +75,66 @@ while True:
pos_l_prev = pos_l.copy()
# Compute reaction torques using model-based approach
# Include friction in model-based disturbance for consistency
tau_reaction_f = follower._compute_model_based_disturbance(
pos_f, vel_f, acc_f, tau_meas_f, include_friction=use_friction_comp
)
tau_reaction_l = leader._compute_model_based_disturbance(
pos_l, vel_l, acc_l, tau_meas_l, include_friction=use_friction_comp
)
# Get gravity for feed-forward compensation
grav_f = follower._gravity_from_q(pos_f)
grav_l = leader._gravity_from_q(pos_l)
# Get inertia torques
inertia_f = follower._inertia_from_q_dq(pos_f, vel_f, acc_f)
inertia_l = leader._inertia_from_q_dq(pos_l, vel_l, acc_l)
tau_reaction_f = follower._compute_model_based_disturbance(pos_f, vel_f, acc_f, tau_meas_f)
tau_reaction_l = leader._compute_model_based_disturbance(pos_l, vel_l, acc_l, tau_meas_l)
# Joint-specific control gains for better tracking
Kp_gains = follower.kp_gains # Use robot class properties
Kd_gains = follower.kd_gains # Use robot class properties
Kp_gains = follower.kp_gains
Kd_gains = follower.kd_gains
Kf_gains = follower.kf_gains
Kf = 0.05 # Force reflection gain (global)
# Compute torque commands in one line using list comprehension
tau_cmd_f = [
Kp_gains[j] * (pos_l[j] - pos_f[j]) # Position tracking
+ Kd_gains[j] * (vel_l[j] - vel_f[j]) # Velocity damping
+ Kf_gains[j] * (-tau_reaction_l[j] - tau_reaction_f[j]) # Force reflection
for j in follower.bus.motors
]
# Friction model parameters from robot class
friction_viscous = follower.friction_viscous # Use robot class properties
friction_coulomb = follower.friction_coulomb # Use robot class properties
for j in follower.bus.motors:
# Bilateral control references (eqs. 5-7)
theta_cmd_f, omega_cmd_f = pos_l[j], vel_l[j] # Follower follows leader
theta_cmd_l, omega_cmd_l = pos_f[j], vel_f[j] # Leader follows follower
tau_cmd_f_ref = -tau_reaction_l[j] # Reflect leader reaction torque
tau_cmd_l_ref = -tau_reaction_f[j] # Reflect follower reaction torque
# Friction compensation (feedforward)
if use_friction_comp:
# Follower friction compensation
tau_friction_f = friction_viscous[j] * vel_f[j] + friction_coulomb[j] * (
1.0 if vel_f[j] > 0.01 else -1.0 if vel_f[j] < -0.01 else 0.0
)
# Leader friction compensation
tau_friction_l = friction_viscous[j] * vel_l[j] + friction_coulomb[j] * (
1.0 if vel_l[j] > 0.01 else -1.0 if vel_l[j] < -0.01 else 0.0
)
else:
tau_friction_f = 0.0
tau_friction_l = 0.0
# Follower control with joint-specific gains and friction compensation
tau_pos_f = Kp_gains[j] * (theta_cmd_f - pos_f[j])
tau_vel_f = Kd_gains[j] * (omega_cmd_f - vel_f[j])
tau_force_f = Kf * (tau_cmd_f_ref - tau_reaction_f[j])
# Apply gravity compensation conditionally
tau_gravity_f = grav_f[j] if use_gravity_comp else 0.0
tau_inertia_f = inertia_f[j] if use_gravity_comp else 0.0
tau_ref_f = tau_pos_f + tau_vel_f + tau_force_f + tau_gravity_f + tau_inertia_f + tau_friction_f
# Leader control with joint-specific gains and friction compensation
tau_pos_l = Kp_gains[j] * (theta_cmd_l - pos_l[j])
tau_vel_l = Kd_gains[j] * (omega_cmd_l - vel_l[j])
tau_force_l = Kf * (tau_cmd_l_ref - tau_reaction_l[j])
# Apply gravity compensation conditionally
tau_gravity_l = grav_l[j] if use_gravity_comp else 0.0
tau_inertia_l = inertia_l[j] if use_gravity_comp else 0.0
tau_ref_l = tau_pos_l + tau_vel_l + tau_force_l + tau_gravity_l + tau_inertia_l + tau_friction_l
tau_cmd_f.append(tau_ref_f)
tau_cmd_l.append(tau_ref_l)
# Store interaction torques (cmd minus deterministic components) for model
if use_friction_comp:
# If friction compensation is enabled, friction is already removed in model-based disturbance
tau_interaction_l[j] = tau_ref_l - tau_gravity_l - tau_inertia_l - tau_friction_l
tau_interaction_f[j] = -tau_reaction_f[j] # Already has friction removed
else:
# If friction compensation is disabled, use original logic
tau_interaction_l[j] = tau_ref_l - tau_gravity_l - tau_inertia_l - tau_friction_l
tau_interaction_f[j] = -(
tau_meas_f[j] - tau_gravity_f - tau_inertia_f - tau_friction_f
) # Invert to match leader
tau_cmd_l = [
Kp_gains[j] * (pos_f[j] - pos_l[j]) # Position tracking
+ Kd_gains[j] * (vel_f[j] - vel_l[j]) # Velocity damping
+ Kf_gains[j] * (-tau_reaction_f[j] - tau_reaction_l[j]) # Force reflection
for j in leader.bus.motors
]
# Store interaction torques and debug info
for i, j in enumerate(follower.bus.motors):
# Store debug info
debug_info_f[j] = {
"τ_measured": tau_meas_f[j],
"τ_pos": tau_pos_f,
"τ_vel": tau_vel_f,
"τ_force": tau_force_f,
"τ_gravity": tau_gravity_f, # Use conditional gravity value
"τ_inertia": tau_inertia_f, # Use conditional inertia value
"τ_friction": tau_friction_f, # Add friction to debug info
"τ_reaction": tau_reaction_f[j],
"τ_ref": tau_ref_f,
"θ_err": theta_cmd_f - pos_f[j],
"ω_err": omega_cmd_f - vel_f[j],
"τ_err": tau_cmd_f_ref - tau_reaction_f[j],
"τ_ref": tau_cmd_f[i],
"θ_err": pos_l[j] - pos_f[j],
"ω_err": vel_l[j] - vel_f[j],
"τ_err": -tau_reaction_l[j] - tau_reaction_f[j],
}
debug_info_l[j] = {
"τ_measured": tau_meas_l[j],
"τ_pos": tau_pos_l,
"τ_vel": tau_vel_l,
"τ_force": tau_force_l,
"τ_gravity": tau_gravity_l, # Use conditional gravity value
"τ_inertia": tau_inertia_l, # Use conditional inertia value
"τ_friction": tau_friction_l, # Add friction to debug info
"τ_reaction": tau_reaction_l[j],
"τ_ref": tau_ref_l,
"θ_err": theta_cmd_l - pos_l[j],
"ω_err": omega_cmd_l - vel_l[j],
"τ_err": tau_cmd_l_ref - tau_reaction_l[j],
"τ_ref": tau_cmd_l[i],
"θ_err": pos_f[j] - pos_l[j],
"ω_err": vel_f[j] - vel_l[j],
"τ_err": -tau_reaction_f[j] - tau_reaction_l[j],
}
# Send torques to both arms
follower_action = to_cmd(np.array(tau_cmd_f), list(follower.bus.motors.keys()))
leader_action = to_cmd(np.array(tau_cmd_l), list(leader.bus.motors.keys()))
follower.send_action(follower_action)
leader.send_action(leader_action)
follower.send_action({f"{m}.effort": tau_cmd_f[i] for i, m in enumerate(follower.bus.motors)})
leader.send_action({f"{m}.effort": tau_cmd_l[i] for i, m in enumerate(leader.bus.motors)})
# Create structured observation and action for rerun
# Observation: follower side only (θ_f, ω_f, τ_ext)
observation = {
"follower_joint_angles": pos_f, # θ_f: current angles
"follower_angular_velocities": vel_f, # ω_f: current velocities
"follower_external_torques": tau_interaction_f, # τ_ext: measured minus deterministic components
"follower_external_torques": tau_reaction_f, # τ_ext: measured minus deterministic components
}
# Action: leader targets (θ_leader[τ], ω_leader[τ], τ_leader[τ])
action = {
"leader_target_angles": pos_l, # θ_leader[τ]: absolute target angles
"leader_target_velocities": vel_l, # ω_leader[τ]: absolute target velocities
"leader_interaction_torques": tau_interaction_l, # τ_leader[τ]: cmd minus deterministic components
"leader_interaction_torques": tau_reaction_l, # τ_leader[τ]: cmd minus deterministic components
}
# Log data for visualization (100 Hz)
@@ -242,12 +150,8 @@ while True:
lines = [f"Loop {hz:6.1f} Hz Δt {dt * 1e3:5.2f} ms"]
lines.append("=" * 106)
lines.append("LEADER ARM TORQUE ANALYSIS:")
lines.append(
f"{'Joint':<13}{'Pos':>8}{'Grav':>6}{'Inert':>6}{'Track':>6}{'Vel':>6}{'Force':>6}{'Frict':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}"
)
lines.append(
f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}"
)
lines.append(f"{'Joint':<13}{'Pos':>8}{'React':>6}{'Meas':>6}{'Cmd':>6}")
lines.append(f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}")
lines.append("-" * 86)
for i, j in enumerate(leader.bus.motors):
@@ -256,12 +160,6 @@ while True:
lines.append(
f"{j:<13s}"
f"{math.degrees(pos_l[j]):+8.1f}"
f"{debug_l['τ_gravity']:+6.2f}"
f"{debug_l['τ_inertia']:+6.2f}"
f"{debug_l['τ_pos']:+6.2f}"
f"{debug_l['τ_vel']:+6.2f}"
f"{debug_l['τ_force']:+6.2f}"
f"{debug_l['τ_friction']:+6.2f}"
f"{debug_l['τ_reaction']:+6.2f}"
f"{debug_l['τ_measured']:+6.2f}"
f"{tau_cmd_l[i]:+6.2f}"
@@ -269,12 +167,8 @@ while True:
lines.append("")
lines.append("FOLLOWER ARM TORQUE ANALYSIS:")
lines.append(
f"{'Joint':<13}{'Pos':>8}{'Grav':>6}{'Inert':>6}{'Track':>6}{'Vel':>6}{'Force':>6}{'Frict':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}"
)
lines.append(
f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}"
)
lines.append(f"{'Joint':<13}{'Pos':>8}{'React':>6}{'Meas':>6}{'Cmd':>6}")
lines.append(f"{'':13}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}")
lines.append("-" * 86)
for i, j in enumerate(follower.bus.motors):
@@ -283,12 +177,6 @@ while True:
lines.append(
f"{j:<13s}"
f"{math.degrees(pos_f[j]):+8.1f}"
f"{debug_f['τ_gravity']:+6.2f}"
f"{debug_f['τ_inertia']:+6.2f}"
f"{debug_f['τ_pos']:+6.2f}"
f"{debug_f['τ_vel']:+6.2f}"
f"{debug_f['τ_force']:+6.2f}"
f"{debug_f['τ_friction']:+6.2f}"
f"{debug_f['τ_reaction']:+6.2f}"
f"{debug_f['τ_measured']:+6.2f}"
f"{tau_cmd_f[i]:+6.2f}"
@@ -298,35 +186,21 @@ while True:
lines.append("=" * 86)
lines.append("TORQUE COMPONENT EXPLANATIONS:")
lines.append("• Pos (joint pos) = Joint position in degrees")
lines.append("• Grav (gravity) = Feed-forward gravity compensation")
lines.append("• Inert (inertia) = Feed-forward inertia compensation")
lines.append("• Track (tracking) = Position tracking control (Kp error)")
lines.append("• Vel (velocity) = Velocity damping control (Kd error)")
lines.append("• Force (bilateral) = Force reflection between robots (telepresence)")
lines.append("• Frict (friction) = Feed-forward friction compensation (transparency)")
lines.append("• React (reaction) = External forces (human interaction, contact)")
lines.append("• Meas (measured) = Raw torque from motor current sensor")
lines.append("• Cmd (command) = Final torque sent to motor")
lines.append("-" * 86)
lines.append("Cmd = Track + Vel + Force + Grav + Inert + Frict")
lines.append(
"React = Meas - Grav - Inert - Frict (external forces)"
if use_friction_comp
else "React = Meas - Grav - Inert (external forces)"
"Cmd = Track + Vel + Force + (Added as feedforward in send_action: Grav + Inert + Frict)"
)
lines.append("React = Meas - Grav - Inert - Frict (external forces)")
lines.append("Force = Kf × (reflect_other_robot - React) (telepresence)")
lines.append("Frict = b_visc×ω + f_coulomb×sign(ω) (transparency)")
lines.append(
f"Control Gains: Kp=robot-class (position) | Kd=robot-class (velocity) | Kf={Kf} (force reflection)"
f"Joint Gains: shoulder_pan Kp={Kp_gains['shoulder_pan']:.1f} | shoulder_pan Kd={Kd_gains['shoulder_pan']:.1f} | shoulder_pan Kf={Kf_gains['shoulder_pan']:.1f}"
)
lines.append(
f"Joint Gains: shoulder_pan Kp={Kp_gains['shoulder_pan']:.1f} | shoulder_lift Kp={Kp_gains['shoulder_lift']:.1f} | elbow_flex Kp={Kp_gains['elbow_flex']:.1f}"
)
lines.append(
f"Friction Comp: {'ON' if use_friction_comp else 'OFF'} | Viscous: {friction_viscous['shoulder_pan']:.3f} | Coulomb: {friction_coulomb['shoulder_pan']:.3f} (robot-class)"
)
lines.append(
f"Gravity Comp: {'ON' if use_gravity_comp else 'OFF'} | Inertia Comp: {'ON' if use_gravity_comp else 'OFF'} | (Both disabled for testing)"
f"Friction Comp, Viscous: {follower.friction_viscous['shoulder_pan']:.3f} | Coulomb: {follower.friction_coulomb['shoulder_pan']:.3f} (robot-class)"
)
block = "\n".join(lines)
-86
View File
@@ -1,86 +0,0 @@
import sys
import time
import numpy as np
import pinocchio as pin
from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
MOTORS = [
"shoulder_pan",
"shoulder_lift",
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
FLIP_TORQUE = np.array(
[
True, # shoulder_pan
True, # shoulder_lift t
True, # elbow_flex t
True, # wrist_flex t
True, # wrist_roll
False, # gripper
],
dtype=bool,
)
SIGN = np.where(FLIP_TORQUE, -1.0, 1.0)
IDX = {name: i for i, name in enumerate(MOTORS)}
def obs_to_q(obs):
"""Extract joint vector q [rad] directly from observation dict."""
q = np.zeros(len(MOTORS))
for m in MOTORS:
q[IDX[m]] = obs[f"{m}.pos"] # already in rad
return q
def tau_to_action(tau):
"""Convert τ array (Nm) to action dict understood by send_action()."""
tau = SIGN * tau
return {f"{m}.effort": float(tau[IDX[m]]) for m in MOTORS}
cfg = SO101FollowerTConfig(port="/dev/tty.usbmodem58760431551", id="follower_torque6")
real = SO101FollowerT(cfg)
real.connect()
real.pin_robot.data = real.pin_robot.model.createData()
real.pin_robot.initViewer(open=True)
real.pin_robot.loadViewerModel()
real.pin_robot.display(real.pin_robot.q0)
ESC_CLR_EOL = "\x1b[K"
print("Running gravity compensation")
try:
while True:
obs = real.get_observation() # positions in rad
q = obs_to_q(obs) # dict to vector
tau = pin.computeGeneralizedGravity(real.pin_robot.model, real.pin_robot.data, q) # τ in [Nm]
# build compact single-line status
status = " | ".join(
f"{m}: {np.degrees(q[i]):+5.1f}° Δτ {(abs(obs[f'{m}.effort']) - abs(tau[i])):+5.1f}"
for i, m in enumerate(MOTORS)
)
# overwrite previous status
sys.stdout.write("\r" + ESC_CLR_EOL + status)
sys.stdout.flush()
real.send_action(tau_to_action(tau)) # apply τ
real.pin_robot.display(q)
time.sleep(0.002) # Run at 500 Hz
except KeyboardInterrupt:
print("Stopping")
finally:
real.disconnect()
+1
View File
@@ -171,6 +171,7 @@ def record_loop(
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
biteleop: bool = False,
):
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
@@ -40,7 +40,7 @@ logger = logging.getLogger(__name__)
class SO101FollowerT(Robot):
"""
SO-101 Follower Arm designed by TheRobotStudio and Hugging Face.
SO-101 Arm with HLS3625 motors with current control.
"""
config_class = SO101FollowerTConfig
@@ -48,10 +48,10 @@ class SO101FollowerT(Robot):
_CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
_KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html
_MAX_CURRENT_A: float = 4.0 # Safe driver limit for this model
_MAX_CURRENT_A: float = 4.0 # Safe driver limit
# Control gains for bilateral teleoperation
_KP_GAINS = { # Position gains [Nm/rad] - reduced for bilateral stability
# Position gains [Nm/rad]
_KP_GAINS = {
"shoulder_pan": 6.0,
"shoulder_lift": 7.0,
"elbow_flex": 7.0,
@@ -60,7 +60,8 @@ class SO101FollowerT(Robot):
"gripper": 4.0,
}
_KD_GAINS = { # Velocity gains [Nm⋅s/rad] - matched to position gains
# Velocity gains [Nm⋅s/rad]
_KD_GAINS = {
"shoulder_pan": 0.4,
"shoulder_lift": 0.8,
"elbow_flex": 0.7,
@@ -69,8 +70,18 @@ class SO101FollowerT(Robot):
"gripper": 0.3,
}
# Friction model parameters
_FRICTION_VISCOUS = { # Viscous friction coefficient [Nm⋅s/rad] per joint
# Force gains
_KF_GAINS = {
"shoulder_pan": 0.05,
"shoulder_lift": 0.05,
"elbow_flex": 0.05,
"wrist_flex": 0.05,
"wrist_roll": 0.05,
"gripper": 0.05,
}
# Viscous friction coefficient [Nm⋅s/rad] per joint
_FRICTION_VISCOUS = {
"shoulder_pan": 0.02,
"shoulder_lift": 0.08,
"elbow_flex": 0.05,
@@ -79,7 +90,8 @@ class SO101FollowerT(Robot):
"gripper": 0.01,
}
_FRICTION_COULOMB = { # Coulomb friction [Nm] per joint
# Coulomb/static friction [Nm] per joint
_FRICTION_COULOMB = {
"shoulder_pan": 0.15,
"shoulder_lift": 0.25,
"elbow_flex": 0.20,
@@ -129,28 +141,29 @@ class SO101FollowerT(Robot):
self._prev_t: float | None = None
# Butterworth low-pass filter parameters
self._cutoff_freq = 10.0 # Hz - cutoff frequency for the filter
self._filter_order = 2 # Filter order (2nd order is common)
self._sampling_freq = 100.0 # Hz - assumed control loop frequency
self._cutoff_freq = 10.0 # Hz, cutoff frequency for the filter
self._filter_order = 2 # Filter order
self._sampling_freq = 100.0 # Hz, (control loop frequency)
# Design the Butterworth filter
nyquist_freq = self._sampling_freq / 2
normalized_cutoff = self._cutoff_freq / nyquist_freq
self._b, self._a = butter(self._filter_order, normalized_cutoff, btype="low")
# History buffers for filtering
# History buffers
self._pos_history = {m: collections.deque(maxlen=20) for m in self.bus.motors}
self._vel_raw_history = {m: collections.deque(maxlen=20) for m in self.bus.motors}
self._time_history = collections.deque(maxlen=20)
self._last_observation = None
@property
def _motors_ft(self) -> dict[str, type]:
d: dict[str, type] = {}
for m in self.bus.motors:
d[f"{m}.pos"] = float
d[f"{m}.vel"] = float
d[f"{m}.acc"] = float # Add acceleration
d[f"{m}.tau_meas"] = float # Changed from tau_res to tau_meas
d[f"{m}.acc"] = float
d[f"{m}.tau_meas"] = float
return d
@property
@@ -181,6 +194,11 @@ class SO101FollowerT(Robot):
"""Velocity control gains [Nm⋅s/rad] for bilateral teleoperation"""
return self._KD_GAINS.copy()
@property
def kf_gains(self) -> dict[str, float]:
"""Force control gains for bilateral teleoperation"""
return self._KF_GAINS.copy()
@property
def friction_viscous(self) -> dict[str, float]:
"""Viscous friction coefficients [Nm⋅s/rad] for friction compensation"""
@@ -210,12 +228,11 @@ class SO101FollowerT(Robot):
self._cutoff_freq = cutoff_freq
self._filter_order = order
# Redesign the filter
nyquist_freq = self._sampling_freq / 2
normalized_cutoff = self._cutoff_freq / nyquist_freq
self._b, self._a = butter(self._filter_order, normalized_cutoff, btype="low")
# Clear history buffers
# Clear buffers
for m in self.bus.motors:
self._pos_history[m].clear()
self._vel_raw_history[m].clear()
@@ -265,7 +282,6 @@ class SO101FollowerT(Robot):
"""
Compute inertia torques τ_inertia = M(q) * ddq directly from URDF model.
"""
# Convert joint dictionaries to numpy arrays in correct order
q = np.zeros(self.pin_robot.model.nq)
dq = np.zeros(self.pin_robot.model.nv)
ddq = np.zeros(self.pin_robot.model.nv)
@@ -289,7 +305,6 @@ class SO101FollowerT(Robot):
dq_rad: dict[str, float],
ddq_rad: dict[str, float],
tau_measured: dict[str, float],
include_friction: bool = True,
) -> dict[str, float]:
"""
Compute disturbance torques using direct model-based approach:
@@ -298,29 +313,24 @@ class SO101FollowerT(Robot):
Args:
include_friction: If True, also removes friction from the disturbance calculation
"""
# Get gravity compensation
tau_gravity = self._gravity_from_q(q_rad)
# Get inertia compensation
tau_inertia = self._inertia_from_q_dq(q_rad, dq_rad, ddq_rad)
# Compute disturbance: what's left after removing known dynamics
tau_disturbance = {}
tau_friction = {}
for motor_name in self.bus.motors:
tau_dist = tau_measured[motor_name] - tau_gravity[motor_name] - tau_inertia[motor_name]
# Optionally remove friction model
if include_friction:
# Calculate friction torque using class constants
omega = dq_rad[motor_name]
tau_friction = self._FRICTION_VISCOUS[motor_name] * omega + self._FRICTION_COULOMB[
motor_name
] * (1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0)
# Apply torque sign correction (same as for gravity/inertia)
tau_friction = -tau_friction # Apply torque sign correction
tau_dist -= tau_friction
# Calculate friction torque
omega = dq_rad[motor_name]
tau_friction_motor = self._FRICTION_VISCOUS[motor_name] * omega + self._FRICTION_COULOMB[
motor_name
] * (1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0)
# Apply torque sign correction
tau_friction_motor = -tau_friction_motor
tau_friction[motor_name] = tau_friction_motor
tau_dist -= tau_friction_motor
tau_disturbance[motor_name] = tau_dist
@@ -407,7 +417,7 @@ class SO101FollowerT(Robot):
t_now = time.perf_counter()
# position
# Position
pos_deg = self.bus.sync_read("Present_Position", num_retry=5)
pos_rad = self._deg_to_rad(pos_deg)
@@ -432,17 +442,13 @@ class SO101FollowerT(Robot):
# Apply Butterworth low-pass filter to velocity
vel_rad = {}
for m in pos_rad:
if len(self._vel_raw_history[m]) >= 10: # Need enough samples for good filtering
# Convert deque to numpy array
if len(self._vel_raw_history[m]) >= 10:
vel_raw_array = np.array(list(self._vel_raw_history[m]))
# Apply Butterworth filter
vel_filtered = lfilter(self._b, self._a, vel_raw_array)
# Use the most recent filtered value
vel_rad[m] = vel_filtered[-1]
else:
# Not enough history, use raw velocity
vel_rad[m] = vel_rad_raw[m]
# Calculate acceleration from filtered velocity
@@ -454,12 +460,11 @@ class SO101FollowerT(Robot):
dt = max(dt, 1e-4) # Avoid division by zero
acc_rad = {m: (vel_rad[m] - self._prev_vel_rad[m]) / dt for m in vel_rad}
# Update previous values
self._prev_pos_rad = pos_rad.copy()
self._prev_vel_rad = vel_rad.copy()
self._prev_t = t_now
# measured torque (Nm)
# Measured torque (Nm)
cur_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=5)
tau_meas = self._current_to_torque_nm(cur_raw)
@@ -476,6 +481,9 @@ class SO101FollowerT(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Store observation for feedforward compensation
self._last_observation = obs_dict.copy()
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
@@ -495,6 +503,33 @@ class SO101FollowerT(Robot):
if not tau_cmd_nm:
return action
# Add feedforward compensation if we have a last observation
if self._last_observation is not None:
# Extract position, velocity, acceleration from last observation
pos_rad = {m: self._last_observation[f"{m}.pos"] for m in self.bus.motors}
vel_rad = {m: self._last_observation[f"{m}.vel"] for m in self.bus.motors}
acc_rad = {m: self._last_observation[f"{m}.acc"] for m in self.bus.motors}
# Compute feedforward terms
tau_gravity = self._gravity_from_q(pos_rad)
tau_inertia = self._inertia_from_q_dq(pos_rad, vel_rad, acc_rad)
# Add feedforward compensation to commanded torques
for motor in tau_cmd_nm:
# Add gravity compensation
tau_cmd_nm[motor] += tau_gravity[motor]
# Add inertia compensation
tau_cmd_nm[motor] += tau_inertia[motor]
# Add friction compensation
omega = vel_rad[motor]
tau_friction = self._FRICTION_VISCOUS[motor] * omega + self._FRICTION_COULOMB[motor] * (
1.0 if omega > 0.01 else -1.0 if omega < -0.01 else 0.0
)
tau_friction = -tau_friction # Apply torque sign correction
tau_cmd_nm[motor] += tau_friction
inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP)
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
counts = {}