feat: is connect checks decorators (#2813)

This commit is contained in:
Steven Palma
2026-01-16 18:52:06 +01:00
committed by GitHub
parent 77dc49b3a3
commit 46e19ae579
22 changed files with 144 additions and 261 deletions
@@ -22,7 +22,7 @@ from queue import Queue
from typing import Any
from lerobot.processor import RobotAction
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..teleoperator import Teleoperator
from ..utils import TeleopEvents
@@ -86,12 +86,8 @@ class KeyboardTeleop(Teleoperator):
def is_calibrated(self) -> bool:
pass
@check_if_already_connected
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"Keyboard is already connected. Do not run `robot.connect()` twice."
)
if PYNPUT_AVAILABLE:
logging.info("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
@@ -125,14 +121,10 @@ class KeyboardTeleop(Teleoperator):
def configure(self):
pass
@check_if_not_connected
def get_action(self) -> RobotAction:
before_read_t = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
# Generate action based on current key states
@@ -144,11 +136,8 @@ class KeyboardTeleop(Teleoperator):
def send_feedback(self, feedback: dict[str, Any]) -> None:
pass
@check_if_not_connected
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
)
if self.listener is not None:
self.listener.stop()
@@ -182,12 +171,8 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
"names": {"delta_x": 0, "delta_y": 1, "delta_z": 2},
}
@check_if_not_connected
def get_action(self) -> RobotAction:
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
delta_x = 0.0
delta_y = 0.0
@@ -375,6 +360,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
# Only remove key if it's being released
self.current_pressed.pop(key_char, None)
@check_if_not_connected
def get_action(self) -> RobotAction:
"""
Get the current action based on pressed keys.
@@ -384,11 +370,6 @@ class KeyboardRoverTeleop(KeyboardTeleop):
"""
before_read_t = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardRoverTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
linear_velocity = 0.0