From 8277dbf0dc641ac1e5d37f76f2ba89f53dff58b3 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Fri, 2 Jan 2026 09:36:36 +0100 Subject: [PATCH] add foot pedal support --- examples/rac/rac_data_collection.py | 74 ++++++++++++++++++- examples/rac/rac_data_collection_openarms.py | 75 +++++++++++++++++++- 2 files changed, 146 insertions(+), 3 deletions(-) diff --git a/examples/rac/rac_data_collection.py b/examples/rac/rac_data_collection.py index 7cdc4d171..95641aa03 100644 --- a/examples/rac/rac_data_collection.py +++ b/examples/rac/rac_data_collection.py @@ -169,9 +169,74 @@ def init_rac_keyboard_listener(): listener = keyboard.Listener(on_press=on_press) listener.start() + + start_pedal_listener(events) + return listener, events +def start_pedal_listener(events: dict): + """Start foot pedal listener thread if evdev is available.""" + import threading + + try: + from evdev import InputDevice, ecodes + except ImportError: + logging.info("[Pedal] evdev not installed - pedal support disabled") + return + + PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd" + KEY_LEFT = "KEY_A" # Left pedal -> 'c' (take control) + KEY_RIGHT = "KEY_C" # Right pedal -> SPACE (pause) or → (next) + + def pedal_reader(): + try: + dev = InputDevice(PEDAL_DEVICE) + print(f"[Pedal] Connected: {dev.name}") + print(f"[Pedal] Right={KEY_RIGHT} (pause/next), Left={KEY_LEFT} (take control)") + + for ev in dev.read_loop(): + if ev.type != ecodes.EV_KEY: + continue + + from evdev import categorize + key = categorize(ev) + code = key.keycode + if isinstance(code, (list, tuple)): + code = code[0] + + # Only trigger on key down + if key.keystate != 1: + continue + + if code == KEY_RIGHT: + # Right pedal: SPACE (pause) when running, → (next) when in correction + if events["correction_active"]: + print("\n[Pedal] → End episode") + events["exit_early"] = True + elif not events["policy_paused"]: + print("\n[Pedal] ⏸ PAUSED - Policy stopped, teleop tracking robot") + print(" Press left pedal to take control") + events["policy_paused"] = True + + elif code == KEY_LEFT: + # Left pedal: 'c' (take control) when paused + if events["policy_paused"] and not events["correction_active"]: + print("\n[Pedal] ▶ CORRECTION - You have control (recording)") + print(" Press right pedal when done") + events["correction_active"] = True + + except FileNotFoundError: + logging.info(f"[Pedal] Device not found: {PEDAL_DEVICE}") + except PermissionError: + logging.warning(f"[Pedal] Permission denied. Run: sudo setfacl -m u:$USER:rw {PEDAL_DEVICE}") + except Exception as e: + logging.debug(f"[Pedal] Error: {e}") + + thread = threading.Thread(target=pedal_reader, daemon=True) + thread.start() + + def make_identity_processors(): """Create identity processors for RaC recording.""" teleop_proc = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( @@ -309,8 +374,15 @@ def reset_loop( ): """Reset period where human repositions environment.""" print(f"\n[RaC] Reset time: {reset_time_s}s - reposition environment") - print(" Teleop active - move leader arms to home position") + + # Move teleop to match robot position to avoid sudden jumps + obs = robot.get_observation() + robot_pos = {k: v for k, v in obs.items() if k.endswith(".pos")} + print(" Moving teleop to robot position...") + teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50) + teleop.disable_torque() + print(" Teleop active - move leader arms to home position") timestamp = 0 start_t = time.perf_counter() diff --git a/examples/rac/rac_data_collection_openarms.py b/examples/rac/rac_data_collection_openarms.py index 3b9649ffd..adcce69a0 100644 --- a/examples/rac/rac_data_collection_openarms.py +++ b/examples/rac/rac_data_collection_openarms.py @@ -175,9 +175,74 @@ def init_rac_keyboard_listener(): listener = keyboard.Listener(on_press=on_press) listener.start() + + start_pedal_listener(events) + return listener, events +def start_pedal_listener(events: dict): + """Start foot pedal listener thread if evdev is available.""" + import threading + + try: + from evdev import InputDevice, ecodes + except ImportError: + logging.info("[Pedal] evdev not installed - pedal support disabled") + return + + PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd" + KEY_LEFT = "KEY_A" # Left pedal -> 'c' (take control) + KEY_RIGHT = "KEY_C" # Right pedal -> SPACE (pause) or → (next) + + def pedal_reader(): + try: + dev = InputDevice(PEDAL_DEVICE) + print(f"[Pedal] Connected: {dev.name}") + print(f"[Pedal] Right={KEY_RIGHT} (pause/next), Left={KEY_LEFT} (take control)") + + for ev in dev.read_loop(): + if ev.type != ecodes.EV_KEY: + continue + + from evdev import categorize + key = categorize(ev) + code = key.keycode + if isinstance(code, (list, tuple)): + code = code[0] + + # Only trigger on key down + if key.keystate != 1: + continue + + if code == KEY_RIGHT: + # Right pedal: SPACE (pause) when running, → (next) when in correction + if events["correction_active"]: + print("\n[Pedal] → End episode") + events["exit_early"] = True + elif not events["policy_paused"]: + print("\n[Pedal] ⏸ PAUSED - Policy stopped, teleop tracking robot") + print(" Press left pedal to take control") + events["policy_paused"] = True + + elif code == KEY_LEFT: + # Left pedal: 'c' (take control) when paused + if events["policy_paused"] and not events["correction_active"]: + print("\n[Pedal] ▶ CORRECTION - You have control (recording)") + print(" Press right pedal when done") + events["correction_active"] = True + + except FileNotFoundError: + logging.info(f"[Pedal] Device not found: {PEDAL_DEVICE}") + except PermissionError: + logging.warning(f"[Pedal] Permission denied. Run: sudo setfacl -m u:$USER:rw {PEDAL_DEVICE}") + except Exception as e: + logging.debug(f"[Pedal] Error: {e}") + + thread = threading.Thread(target=pedal_reader, daemon=True) + thread.start() + + def make_identity_processors(): """Create identity processors for RaC recording.""" teleop_proc = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( @@ -259,7 +324,7 @@ def rac_rollout_loop( obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features} robot_pos = {k: v for k, v in obs_filtered.items() if k.endswith(".pos")} print("[RaC] Moving teleop to robot position (1s smooth transition)...") - teleop.smooth_move_to(robot_pos, duration_s=1.0, fps=50) + teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50) print("[RaC] Teleop aligned. Press 'c' to take control.") was_paused = True @@ -341,8 +406,14 @@ def reset_loop( ): """Reset period where human repositions environment.""" print(f"\n[RaC] Reset time: {reset_time_s}s - reposition environment") - print(" Teleop active - move leader arms to home position") + + # First move teleop to match robot position to avoid sudden jumps + obs = robot.get_observation() + robot_pos = {k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features} + print(" Moving teleop to robot position...") + teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50) teleop.disable_torque() + print(" Teleop active - move leader arms to home position") timestamp = 0 start_t = time.perf_counter()