add foot pedal support

This commit is contained in:
Pepijn
2026-01-02 09:36:36 +01:00
parent eb0918249d
commit 8277dbf0dc
2 changed files with 146 additions and 3 deletions
+73 -1
View File
@@ -169,9 +169,74 @@ def init_rac_keyboard_listener():
listener = keyboard.Listener(on_press=on_press)
listener.start()
start_pedal_listener(events)
return listener, events
def start_pedal_listener(events: dict):
"""Start foot pedal listener thread if evdev is available."""
import threading
try:
from evdev import InputDevice, ecodes
except ImportError:
logging.info("[Pedal] evdev not installed - pedal support disabled")
return
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
KEY_LEFT = "KEY_A" # Left pedal -> 'c' (take control)
KEY_RIGHT = "KEY_C" # Right pedal -> SPACE (pause) or → (next)
def pedal_reader():
try:
dev = InputDevice(PEDAL_DEVICE)
print(f"[Pedal] Connected: {dev.name}")
print(f"[Pedal] Right={KEY_RIGHT} (pause/next), Left={KEY_LEFT} (take control)")
for ev in dev.read_loop():
if ev.type != ecodes.EV_KEY:
continue
from evdev import categorize
key = categorize(ev)
code = key.keycode
if isinstance(code, (list, tuple)):
code = code[0]
# Only trigger on key down
if key.keystate != 1:
continue
if code == KEY_RIGHT:
# Right pedal: SPACE (pause) when running, → (next) when in correction
if events["correction_active"]:
print("\n[Pedal] → End episode")
events["exit_early"] = True
elif not events["policy_paused"]:
print("\n[Pedal] ⏸ PAUSED - Policy stopped, teleop tracking robot")
print(" Press left pedal to take control")
events["policy_paused"] = True
elif code == KEY_LEFT:
# Left pedal: 'c' (take control) when paused
if events["policy_paused"] and not events["correction_active"]:
print("\n[Pedal] ▶ CORRECTION - You have control (recording)")
print(" Press right pedal when done")
events["correction_active"] = True
except FileNotFoundError:
logging.info(f"[Pedal] Device not found: {PEDAL_DEVICE}")
except PermissionError:
logging.warning(f"[Pedal] Permission denied. Run: sudo setfacl -m u:$USER:rw {PEDAL_DEVICE}")
except Exception as e:
logging.debug(f"[Pedal] Error: {e}")
thread = threading.Thread(target=pedal_reader, daemon=True)
thread.start()
def make_identity_processors():
"""Create identity processors for RaC recording."""
teleop_proc = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
@@ -309,8 +374,15 @@ def reset_loop(
):
"""Reset period where human repositions environment."""
print(f"\n[RaC] Reset time: {reset_time_s}s - reposition environment")
print(" Teleop active - move leader arms to home position")
# Move teleop to match robot position to avoid sudden jumps
obs = robot.get_observation()
robot_pos = {k: v for k, v in obs.items() if k.endswith(".pos")}
print(" Moving teleop to robot position...")
teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50)
teleop.disable_torque()
print(" Teleop active - move leader arms to home position")
timestamp = 0
start_t = time.perf_counter()
+73 -2
View File
@@ -175,9 +175,74 @@ def init_rac_keyboard_listener():
listener = keyboard.Listener(on_press=on_press)
listener.start()
start_pedal_listener(events)
return listener, events
def start_pedal_listener(events: dict):
"""Start foot pedal listener thread if evdev is available."""
import threading
try:
from evdev import InputDevice, ecodes
except ImportError:
logging.info("[Pedal] evdev not installed - pedal support disabled")
return
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
KEY_LEFT = "KEY_A" # Left pedal -> 'c' (take control)
KEY_RIGHT = "KEY_C" # Right pedal -> SPACE (pause) or → (next)
def pedal_reader():
try:
dev = InputDevice(PEDAL_DEVICE)
print(f"[Pedal] Connected: {dev.name}")
print(f"[Pedal] Right={KEY_RIGHT} (pause/next), Left={KEY_LEFT} (take control)")
for ev in dev.read_loop():
if ev.type != ecodes.EV_KEY:
continue
from evdev import categorize
key = categorize(ev)
code = key.keycode
if isinstance(code, (list, tuple)):
code = code[0]
# Only trigger on key down
if key.keystate != 1:
continue
if code == KEY_RIGHT:
# Right pedal: SPACE (pause) when running, → (next) when in correction
if events["correction_active"]:
print("\n[Pedal] → End episode")
events["exit_early"] = True
elif not events["policy_paused"]:
print("\n[Pedal] ⏸ PAUSED - Policy stopped, teleop tracking robot")
print(" Press left pedal to take control")
events["policy_paused"] = True
elif code == KEY_LEFT:
# Left pedal: 'c' (take control) when paused
if events["policy_paused"] and not events["correction_active"]:
print("\n[Pedal] ▶ CORRECTION - You have control (recording)")
print(" Press right pedal when done")
events["correction_active"] = True
except FileNotFoundError:
logging.info(f"[Pedal] Device not found: {PEDAL_DEVICE}")
except PermissionError:
logging.warning(f"[Pedal] Permission denied. Run: sudo setfacl -m u:$USER:rw {PEDAL_DEVICE}")
except Exception as e:
logging.debug(f"[Pedal] Error: {e}")
thread = threading.Thread(target=pedal_reader, daemon=True)
thread.start()
def make_identity_processors():
"""Create identity processors for RaC recording."""
teleop_proc = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
@@ -259,7 +324,7 @@ def rac_rollout_loop(
obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features}
robot_pos = {k: v for k, v in obs_filtered.items() if k.endswith(".pos")}
print("[RaC] Moving teleop to robot position (1s smooth transition)...")
teleop.smooth_move_to(robot_pos, duration_s=1.0, fps=50)
teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50)
print("[RaC] Teleop aligned. Press 'c' to take control.")
was_paused = True
@@ -341,8 +406,14 @@ def reset_loop(
):
"""Reset period where human repositions environment."""
print(f"\n[RaC] Reset time: {reset_time_s}s - reposition environment")
print(" Teleop active - move leader arms to home position")
# First move teleop to match robot position to avoid sudden jumps
obs = robot.get_observation()
robot_pos = {k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features}
print(" Moving teleop to robot position...")
teleop.smooth_move_to(robot_pos, duration_s=2.0, fps=50)
teleop.disable_torque()
print(" Teleop active - move leader arms to home position")
timestamp = 0
start_t = time.perf_counter()