feat(teleop): enhance leader-follower behavior and torque management in SO101 teleoperation

This commit is contained in:
Khalil Meftah
2026-04-28 17:46:06 +02:00
parent e228f0880f
commit c868f874f1
2 changed files with 111 additions and 24 deletions
+108 -23
View File
@@ -18,8 +18,11 @@
Simple SO100/SO101 leader-follower teleoperation with spacebar intervention toggle.
Modes:
- Default (not intervening): leader arm tracks follower position (torque ON on leader)
- Intervention (SPACE pressed): follower arm tracks leader position (torque OFF on leader)
- Default (not intervening): follower holds its current position.
The leader arm has torque ENABLED and mirrors the follower so there is no
large position jump when intervention starts.
- Intervention (SPACE pressed): leader torque DISABLED, human moves the leader
freely, and the follower mirrors the leader joint-by-joint.
Usage:
uv run python examples/so100_teleop/teleop.py
@@ -29,62 +32,144 @@ Controls:
Ctrl+C — exit
"""
import logging
import os
import sys
import time
from threading import Event, Thread
from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
from lerobot.teleoperators.so_leader import SO101LeaderFollower
from lerobot.teleoperators.so_leader import SO101Leader
from lerobot.teleoperators.so_leader.config_so_leader import SOLeaderTeleopConfig
from lerobot.utils.robot_utils import precise_sleep
FPS = 30
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ── pynput keyboard listener ─────────────────────────────────────────────────
PYNPUT_AVAILABLE = True
try:
if "DISPLAY" not in os.environ and "linux" in sys.platform:
raise ImportError("No DISPLAY set, pynput skipped.")
from pynput import keyboard as pynput_keyboard
except Exception:
pynput_keyboard = None
PYNPUT_AVAILABLE = False
# ── Configure ports ──────────────────────────────────────────────────────────
FOLLOWER_PORT = "/dev/ttyUSB0" # ← change to your follower port
LEADER_PORT = "/dev/ttyUSB1" # ← change to your leader port
FPS = 30
def hold_position(robot) -> dict:
"""Read current joint positions and write them back as the goal.
This prevents the motors from snapping to a stale Goal_Position register
value (which can happen when torque is re-enabled after calibration).
Returns the current position dict for reuse.
"""
current = robot.bus.sync_read("Present_Position")
robot.bus.sync_write("Goal_Position", current)
return {f"{motor}.pos": val for motor, val in current.items()}
# ── Connect ───────────────────────────────────────────────────────────────────
follower_config = SO101FollowerConfig(
port="/dev/ttyUSB0", # adjust to your follower port, e.g. /dev/usb_follower_arm
port=FOLLOWER_PORT,
id="follower_arm",
use_degrees=True,
)
leader_config = SOLeaderTeleopConfig(
port="/dev/ttyUSB1", # adjust to your leader port, e.g. /dev/usb_leader_arm
port=LEADER_PORT,
id="leader_arm",
use_degrees=True,
)
# ── Connect ──────────────────────────────────────────────────────────────────
follower = SO101Follower(follower_config)
leader = SO101LeaderFollower(leader_config)
leader = SO101Leader(leader_config)
follower.connect()
leader.connect()
print("\nTeleoperation started.")
print(" Not intervening → leader mirrors follower (torque ON)")
print(" SPACE → toggle intervention: follower mirrors leader (torque OFF)")
print(" Ctrl+C → exit\n")
# ── CRITICAL: hold both arms at their current position before doing anything ─
# configure() enables follower torque, and the Goal_Position register may contain
# a stale value from a previous session. Writing current→goal prevents sudden motion.
follower_current = hold_position(follower)
leader_current = hold_position(leader) # leader torque is still off here, but sets the register
# ── Intervention state + keyboard listener ───────────────────────────────────
is_intervening = False
stop_event = Event()
def _start_keyboard_listener():
if not PYNPUT_AVAILABLE:
logger.warning("pynput not available — spacebar toggle disabled.")
return None
def on_press(key):
global is_intervening
if key == pynput_keyboard.Key.space:
is_intervening = not is_intervening
state = "INTERVENTION (leader → follower)" if is_intervening else "IDLE (follower holds)"
print(f"\n[SPACE] {state}\n")
def listen():
with pynput_keyboard.Listener(on_press=on_press) as listener:
while not stop_event.is_set():
time.sleep(0.05)
listener.stop()
t = Thread(target=listen, daemon=True)
t.start()
return t
kbd_thread = _start_keyboard_listener()
# Enable leader torque AFTER writing its goal to current position, so it holds in place.
leader.bus.sync_write("Torque_Enable", 1)
leader_torque_on = True
print("\nTeleoperation ready.")
print(" SPACE → toggle intervention (leader controls follower)")
print(" Ctrl+C → exit\n")
try:
while True:
t0 = time.perf_counter()
# 1. Read current follower joint positions
follower_obs = follower.get_observation()
if is_intervening:
# ── Intervention: leader torque OFF, follower mirrors leader ──────
if leader_torque_on:
leader.bus.sync_write("Torque_Enable", 0)
leader_torque_on = False
# 2. Send follower positions to leader so it tracks the follower when not intervening
leader.send_action(follower_obs)
leader_action = leader.get_action() # reads present leader joints
follower.send_action(leader_action) # follower tracks leader
# 3. Read current leader joint positions
leader_action = leader.get_action()
else:
# ── Idle: leader torque ON, leader mirrors follower, follower holds
if not leader_torque_on:
# Before re-enabling torque, set the leader's goal to its current
# position so it doesn't snap to the follower position suddenly.
hold_position(leader)
leader.bus.sync_write("Torque_Enable", 1)
leader_torque_on = True
# 4. When intervening, forward leader positions to follower
if leader.is_intervening:
follower.send_action(leader_action)
# else: follower holds its current position (no new command sent)
follower_obs = follower.get_observation()
# Command leader to match follower (so next intervention has no jump)
goal_pos = {motor: follower_obs[f"{motor}.pos"] for motor in leader.bus.motors}
leader.bus.sync_write("Goal_Position", goal_pos)
# Follower holds — no send_action call
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
except KeyboardInterrupt:
print("\nExiting...")
finally:
stop_event.set()
leader.bus.sync_write("Torque_Enable", 0)
follower.disconnect()
leader.disconnect()
@@ -58,7 +58,9 @@ class SO101LeaderFollower(SO101Leader):
# Leader-follower state
self.is_intervening = False
self.leader_torque_enabled = True
# Initialize as False because configure() disables torque at connect time;
# send_action() will re-enable it on the first call when not intervening.
self.leader_torque_enabled = False
# Tracking error for automatic intervention detection
self.leader_tracking_error_queue = deque(maxlen=4)