diff --git a/examples/so100_teleop/teleop.py b/examples/so100_teleop/teleop.py index b3c854546..a69b4f1f8 100644 --- a/examples/so100_teleop/teleop.py +++ b/examples/so100_teleop/teleop.py @@ -18,8 +18,11 @@ Simple SO100/SO101 leader-follower teleoperation with spacebar intervention toggle. Modes: - - Default (not intervening): leader arm tracks follower position (torque ON on leader) - - Intervention (SPACE pressed): follower arm tracks leader position (torque OFF on leader) + - Default (not intervening): follower holds its current position. + The leader arm has torque ENABLED and mirrors the follower so there is no + large position jump when intervention starts. + - Intervention (SPACE pressed): leader torque DISABLED, human moves the leader + freely, and the follower mirrors the leader joint-by-joint. Usage: uv run python examples/so100_teleop/teleop.py @@ -29,62 +32,144 @@ Controls: Ctrl+C — exit """ +import logging +import os +import sys import time +from threading import Event, Thread from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig -from lerobot.teleoperators.so_leader import SO101LeaderFollower +from lerobot.teleoperators.so_leader import SO101Leader from lerobot.teleoperators.so_leader.config_so_leader import SOLeaderTeleopConfig from lerobot.utils.robot_utils import precise_sleep -FPS = 30 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# ── pynput keyboard listener ───────────────────────────────────────────────── +PYNPUT_AVAILABLE = True +try: + if "DISPLAY" not in os.environ and "linux" in sys.platform: + raise ImportError("No DISPLAY set, pynput skipped.") + from pynput import keyboard as pynput_keyboard +except Exception: + pynput_keyboard = None + PYNPUT_AVAILABLE = False # ── Configure ports ────────────────────────────────────────────────────────── +FOLLOWER_PORT = "/dev/ttyUSB0" # ← change to your follower port +LEADER_PORT = "/dev/ttyUSB1" # ← change to your leader port +FPS = 30 + + +def hold_position(robot) -> dict: + """Read current joint positions and write them back as the goal. + + This prevents the motors from snapping to a stale Goal_Position register + value (which can happen when torque is re-enabled after calibration). + Returns the current position dict for reuse. + """ + current = robot.bus.sync_read("Present_Position") + robot.bus.sync_write("Goal_Position", current) + return {f"{motor}.pos": val for motor, val in current.items()} + + +# ── Connect ─────────────────────────────────────────────────────────────────── follower_config = SO101FollowerConfig( - port="/dev/ttyUSB0", # adjust to your follower port, e.g. /dev/usb_follower_arm + port=FOLLOWER_PORT, id="follower_arm", use_degrees=True, ) - leader_config = SOLeaderTeleopConfig( - port="/dev/ttyUSB1", # adjust to your leader port, e.g. /dev/usb_leader_arm + port=LEADER_PORT, id="leader_arm", use_degrees=True, ) -# ── Connect ────────────────────────────────────────────────────────────────── follower = SO101Follower(follower_config) -leader = SO101LeaderFollower(leader_config) +leader = SO101Leader(leader_config) follower.connect() leader.connect() -print("\nTeleoperation started.") -print(" Not intervening → leader mirrors follower (torque ON)") -print(" SPACE → toggle intervention: follower mirrors leader (torque OFF)") -print(" Ctrl+C → exit\n") +# ── CRITICAL: hold both arms at their current position before doing anything ─ +# configure() enables follower torque, and the Goal_Position register may contain +# a stale value from a previous session. Writing current→goal prevents sudden motion. +follower_current = hold_position(follower) +leader_current = hold_position(leader) # leader torque is still off here, but sets the register + +# ── Intervention state + keyboard listener ─────────────────────────────────── +is_intervening = False +stop_event = Event() + + +def _start_keyboard_listener(): + if not PYNPUT_AVAILABLE: + logger.warning("pynput not available — spacebar toggle disabled.") + return None + + def on_press(key): + global is_intervening + if key == pynput_keyboard.Key.space: + is_intervening = not is_intervening + state = "INTERVENTION (leader → follower)" if is_intervening else "IDLE (follower holds)" + print(f"\n[SPACE] {state}\n") + + def listen(): + with pynput_keyboard.Listener(on_press=on_press) as listener: + while not stop_event.is_set(): + time.sleep(0.05) + listener.stop() + + t = Thread(target=listen, daemon=True) + t.start() + return t + + +kbd_thread = _start_keyboard_listener() + +# Enable leader torque AFTER writing its goal to current position, so it holds in place. +leader.bus.sync_write("Torque_Enable", 1) +leader_torque_on = True + +print("\nTeleoperation ready.") +print(" SPACE → toggle intervention (leader controls follower)") +print(" Ctrl+C → exit\n") try: while True: t0 = time.perf_counter() - # 1. Read current follower joint positions - follower_obs = follower.get_observation() + if is_intervening: + # ── Intervention: leader torque OFF, follower mirrors leader ────── + if leader_torque_on: + leader.bus.sync_write("Torque_Enable", 0) + leader_torque_on = False - # 2. Send follower positions to leader so it tracks the follower when not intervening - leader.send_action(follower_obs) + leader_action = leader.get_action() # reads present leader joints + follower.send_action(leader_action) # follower tracks leader - # 3. Read current leader joint positions - leader_action = leader.get_action() + else: + # ── Idle: leader torque ON, leader mirrors follower, follower holds + if not leader_torque_on: + # Before re-enabling torque, set the leader's goal to its current + # position so it doesn't snap to the follower position suddenly. + hold_position(leader) + leader.bus.sync_write("Torque_Enable", 1) + leader_torque_on = True - # 4. When intervening, forward leader positions to follower - if leader.is_intervening: - follower.send_action(leader_action) - # else: follower holds its current position (no new command sent) + follower_obs = follower.get_observation() + # Command leader to match follower (so next intervention has no jump) + goal_pos = {motor: follower_obs[f"{motor}.pos"] for motor in leader.bus.motors} + leader.bus.sync_write("Goal_Position", goal_pos) + # Follower holds — no send_action call precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) except KeyboardInterrupt: print("\nExiting...") finally: + stop_event.set() + leader.bus.sync_write("Torque_Enable", 0) follower.disconnect() leader.disconnect() diff --git a/src/lerobot/teleoperators/so_leader/so101_leader_follower.py b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py index a685aee93..db3f504fe 100644 --- a/src/lerobot/teleoperators/so_leader/so101_leader_follower.py +++ b/src/lerobot/teleoperators/so_leader/so101_leader_follower.py @@ -58,7 +58,9 @@ class SO101LeaderFollower(SO101Leader): # Leader-follower state self.is_intervening = False - self.leader_torque_enabled = True + # Initialize as False because configure() disables torque at connect time; + # send_action() will re-enable it on the first call when not intervening. + self.leader_torque_enabled = False # Tracking error for automatic intervention detection self.leader_tracking_error_queue = deque(maxlen=4)