mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-16 00:59:46 +00:00
Add grav compensation
This commit is contained in:
@@ -151,30 +151,56 @@ SCS_SERIES_CONTROL_TABLE = {
|
||||
"Acceleration_2": (83, 1), # don't know what that is
|
||||
}
|
||||
|
||||
HLS3625_CONTROL_TABLE = {
|
||||
# EPROM ---------------------------------------------------------
|
||||
"ID": (0x05, 1),
|
||||
"Baud_Rate": (0x06, 1),
|
||||
"Return_Delay_Time": (0x07, 1),
|
||||
"Min_Position_Limit": (0x09, 2),
|
||||
"Max_Position_Limit": (0x0B, 2),
|
||||
"Operating_Mode": (0x21, 1),
|
||||
"Homing_Offset": (0x1F, 2),
|
||||
# SRAM control -----------------------------------------------
|
||||
"Torque_Enable": (0x28, 1),
|
||||
"Acceleration": (0x29, 1),
|
||||
"Phase": (0x12, 1),
|
||||
"Target_Position": (0x2A, 2),
|
||||
"Target_Torque": (0x2C, 2),
|
||||
"Running_Speed": (0x2E, 2),
|
||||
"Torque_Limit": (0x30, 2),
|
||||
"Lock": (0x37, 1),
|
||||
# SRAM feedback ----------------------------------------------
|
||||
"Present_Position": (0x38, 2),
|
||||
"Present_Velocity": (0x3A, 2),
|
||||
"Present_Current": (0x45, 2),
|
||||
# Factory / limits -------------------------------------------
|
||||
"Maximum_Acceleration": (0x55, 1),
|
||||
# http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
|
||||
HLS_SERIES_CONTROL_TABLE = {
|
||||
# EPROM
|
||||
"Firmware_Major_Version": FIRMWARE_MAJOR_VERSION, # read-only
|
||||
"Firmware_Minor_Version": FIRMWARE_MINOR_VERSION, # read-only
|
||||
"Model_Number": MODEL_NUMBER, # read-only
|
||||
"ID": (5, 1),
|
||||
"Baud_Rate": (6, 1),
|
||||
"Return_Delay_Time": (7, 1),
|
||||
"Response_Status_Level": (8, 1),
|
||||
"Min_Position_Limit": (9, 2),
|
||||
"Max_Position_Limit": (11, 2),
|
||||
"Max_Temperature_Limit": (13, 1),
|
||||
"Max_Voltage_Limit": (14, 1),
|
||||
"Min_Voltage_Limit": (15, 1),
|
||||
"Max_Torque_Limit": (16, 2),
|
||||
"Phase": (18, 1),
|
||||
"Unloading_Condition": (19, 1),
|
||||
"LED_Alarm_Condition": (20, 1),
|
||||
"P_Coefficient": (21, 1),
|
||||
"D_Coefficient": (22, 1),
|
||||
"I_Coefficient": (23, 1),
|
||||
"Minimum_Startup_Force": (24, 2),
|
||||
"CW_Dead_Zone": (26, 1),
|
||||
"CCW_Dead_Zone": (27, 1),
|
||||
"Homing_Offset": (31, 2),
|
||||
"Operating_Mode": (33, 1),
|
||||
"P_Coefficient_Curr": (34, 1),
|
||||
"I_Coefficient_Curr": (35, 1),
|
||||
# SRAM
|
||||
"Torque_Enable": (40, 1),
|
||||
"Acceleration": (41, 1),
|
||||
"Goal_Position": (42, 2),
|
||||
"Target_Torque": (44, 2),
|
||||
"Goal_Velocity": (46, 2),
|
||||
"Torque_Limit": (48, 2),
|
||||
"P_Coefficient_Ring": (50, 1),
|
||||
"D_Coefficient_Ring": (51, 1),
|
||||
"I_Coefficient_Ring": (52, 1),
|
||||
"Lock": (55, 1),
|
||||
# SRAM feedback
|
||||
"Present_Position": (56, 2),
|
||||
"Present_Velocity": (58, 2),
|
||||
"Present_Load": (60, 2),
|
||||
"Present_Voltage": (62, 1),
|
||||
"Present_Current": (69, 2),
|
||||
# Factory
|
||||
"Maximum_Velocity_Limit": (84, 1),
|
||||
"Maximum_Acceleration": (85, 1),
|
||||
"Acceleration_Multiplier ": (86, 1), # Acceleration multiplier in effect when acceleration is 0
|
||||
}
|
||||
|
||||
STS_SMS_SERIES_BAUDRATE_TABLE = {
|
||||
@@ -207,7 +233,7 @@ MODEL_CONTROL_TABLE = {
|
||||
"sts3250": STS_SMS_SERIES_CONTROL_TABLE,
|
||||
"scs0009": SCS_SERIES_CONTROL_TABLE,
|
||||
"sm8512bl": STS_SMS_SERIES_CONTROL_TABLE,
|
||||
"hls3625": HLS3625_CONTROL_TABLE,
|
||||
"hls3625": HLS_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
@@ -247,7 +273,14 @@ MODEL_ENCODING_TABLE = {
|
||||
"sts3250": STS_SMS_SERIES_ENCODINGS_TABLE,
|
||||
"sm8512bl": STS_SMS_SERIES_ENCODINGS_TABLE,
|
||||
"scs0009": {},
|
||||
"hls3625": {"Target_Torque": 15, "Homing_Offset": 15},
|
||||
"hls3625": {
|
||||
"Homing_Offset": 11,
|
||||
"Goal_Velocity": 15,
|
||||
"Present_Velocity": 15,
|
||||
"Target_Torque": 10,
|
||||
"Present_Position": 15,
|
||||
"Goal_Position": 15,
|
||||
},
|
||||
}
|
||||
|
||||
SCAN_BAUDRATES = [
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import math
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
@@ -41,6 +42,45 @@ class SO101FollowerT(Robot):
|
||||
config_class = SO101FollowerTConfig
|
||||
name = "so101_follower"
|
||||
|
||||
_CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
|
||||
_KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html
|
||||
_MAX_CURRENT_A: float = 3.0 # Safe driver limit for this model
|
||||
|
||||
_COUNT_TO_RAD: float = math.radians(0.087) # 1 pos count to rad
|
||||
|
||||
def _current_to_torque_nm(self, raw: dict[str, int]) -> dict[str, float]:
|
||||
"""Convert "Present_Current" register counts (±2047) → torque [Nm].
|
||||
Values are clamped to ±3A before conversion for protection.
|
||||
"""
|
||||
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) # ≈ 462
|
||||
coef = self._CURRENT_STEP_A * self._KT_NM_PER_AMP
|
||||
return {k: max(min(v, max_cnt), -max_cnt) * coef for k, v in raw.items()}
|
||||
|
||||
def _torque_nm_to_current(self, torque: dict[str, float]) -> dict[str, int]:
|
||||
"""Convert torque [Nm] to register counts, clamped to ±3A (2.44 Nm)."""
|
||||
inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP)
|
||||
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
|
||||
max_torque = self._MAX_CURRENT_A * self._KT_NM_PER_AMP
|
||||
return {
|
||||
k: max(-max_cnt, min(max_cnt, int(round(max(-max_torque, min(max_torque, float(t))) * inv_coef))))
|
||||
for k, t in torque.items()
|
||||
}
|
||||
|
||||
def _counts_to_rad(self, raw: dict[str, int]) -> dict[str, float]:
|
||||
"""Convert "Present_Position" counts to rad."""
|
||||
out: dict[str, float] = {}
|
||||
for motor, cnt in raw.items():
|
||||
out[motor] = cnt * self._COUNT_TO_RAD
|
||||
return out
|
||||
|
||||
def _rad_to_counts(self, pos: dict[str, float]) -> dict[str, int]:
|
||||
"""Convert radians to position counts."""
|
||||
out: dict[str, int] = {}
|
||||
for motor, rad in pos.items():
|
||||
cnt = int(round(rad / self._COUNT_TO_RAD))
|
||||
out[motor] = cnt
|
||||
return out
|
||||
|
||||
def __init__(self, config: SO101FollowerTConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
@@ -146,7 +186,7 @@ class SO101FollowerT(Robot):
|
||||
|
||||
self.bus.write("Operating_Mode", motor, 2) # Set to current mode
|
||||
self.bus.write("Target_Torque", motor, 0)
|
||||
self.bus.write("Torque_Limit", motor, 1000)
|
||||
self.bus.write("Torque_Limit", motor, 1000) # 100%
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
@@ -158,14 +198,17 @@ class SO101FollowerT(Robot):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
|
||||
pos = self.bus.sync_read("Present_Position", normalize=False, num_retry=5)
|
||||
curr = self.bus.sync_read("Present_Current", normalize=False, num_retry=5)
|
||||
pos_dict = {f"{motor}.pos": val for motor, val in pos.items()}
|
||||
curr_dict = {f"{motor}.effort": val for motor, val in curr.items()}
|
||||
obs_dict = {**pos_dict, **curr_dict}
|
||||
# Positions
|
||||
pos_counts = self.bus.sync_read("Present_Position", normalize=False, num_retry=10)
|
||||
pos_rad = self._counts_to_rad(pos_counts)
|
||||
obs_dict = {f"{m}.pos": r for m, r in pos_rad.items()}
|
||||
|
||||
# Currents (counts) → torque (N·m)
|
||||
curr_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=10)
|
||||
torque_nm = self._current_to_torque_nm({f"{m}.effort": v for m, v in curr_raw.items()})
|
||||
obs_dict.update(torque_nm)
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
@@ -180,26 +223,26 @@ class SO101FollowerT(Robot):
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
`max_relative_target`. In this case, the action sent differs from original action.
|
||||
Thus, this function always returns the action actually sent.
|
||||
"""Command arm to move to a target torque for a joint.
|
||||
|
||||
Raises:
|
||||
RobotDeviceNotConnectedError: if robot is not connected.
|
||||
|
||||
Returns:
|
||||
the action sent to the motors, potentially clipped.
|
||||
the action sent to the motors.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
# Send goal position to the arm
|
||||
effort = {k.removesuffix(".effort"): int(v) for k, v in action.items() if k.endswith(".effort")}
|
||||
if effort:
|
||||
# current (torque) mode write
|
||||
self.bus.sync_write("Target_Torque", effort, normalize=False, num_retry=2)
|
||||
# Extract torque commands
|
||||
torque_cmd = {k: v for k, v in action.items() if k.endswith(".effort")}
|
||||
if torque_cmd:
|
||||
counts = self._torque_nm_to_current(torque_cmd)
|
||||
# remove the .effort suffix
|
||||
counts = {k.removesuffix(".effort"): v for k, v in counts.items()}
|
||||
self.bus.sync_write("Target_Torque", counts, normalize=False, num_retry=2)
|
||||
|
||||
# pass back the other keys (.pos) untouched
|
||||
return action
|
||||
|
||||
def disconnect(self):
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pinocchio as pin
|
||||
|
||||
from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
|
||||
from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
|
||||
|
||||
MOTORS = [
|
||||
"shoulder_pan",
|
||||
"shoulder_lift",
|
||||
"elbow_flex",
|
||||
"wrist_flex",
|
||||
"wrist_roll",
|
||||
"gripper",
|
||||
]
|
||||
|
||||
IDX = {name: i for i, name in enumerate(MOTORS)}
|
||||
|
||||
|
||||
def obs_to_q(obs):
|
||||
"""Extract joint vector q [rad] directly from observation dict."""
|
||||
q = np.zeros(len(MOTORS))
|
||||
for m in MOTORS:
|
||||
q[IDX[m]] = obs[f"{m}.pos"] # already in rad
|
||||
return q
|
||||
|
||||
|
||||
def tau_to_action(tau):
|
||||
"""Convert τ array (Nm) to action dict understood by send_action()."""
|
||||
return {f"{m}.effort": float(tau[IDX[m]]) for m in MOTORS}
|
||||
|
||||
|
||||
pin_robot = pin.RobotWrapper.BuildFromURDF(
|
||||
"lerobot/SO101/so101_new_calib.urdf",
|
||||
"lerobot/SO101",
|
||||
)
|
||||
pin_robot.data = pin_robot.model.createData()
|
||||
pin_robot.initViewer(open=True)
|
||||
pin_robot.loadViewerModel()
|
||||
pin_robot.display(pin_robot.q0)
|
||||
|
||||
cfg = SO101FollowerTConfig(port="/dev/tty.usbmodem58760431551", id="my_awesome_follower_arm_t")
|
||||
real = SO101FollowerT(cfg)
|
||||
real.connect()
|
||||
|
||||
print("Running gravity compensation")
|
||||
try:
|
||||
while True:
|
||||
obs = real.get_observation() # positions in rad
|
||||
q = obs_to_q(obs) # dict to vector
|
||||
|
||||
tau = pin.computeGeneralizedGravity(pin_robot.model, pin_robot.data, q) # τ in [Nm]
|
||||
|
||||
# real.send_action(tau_to_action(tau)) # apply τ
|
||||
|
||||
pin_robot.display(q)
|
||||
time.sleep(0.02) # Run at 50 Hz
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping")
|
||||
finally:
|
||||
real.disconnect()
|
||||
Reference in New Issue
Block a user