fix(teleop): add is_connected check to get_action (#2801)

This commit is contained in:
Steven Palma
2026-01-14 17:14:12 +01:00
committed by GitHub
parent 15724826dd
commit 1c61b43b15
6 changed files with 24 additions and 0 deletions
@@ -18,6 +18,7 @@ import logging
from functools import cached_property from functools import cached_property
from lerobot.teleoperators.so_leader import SOLeaderTeleopConfig from lerobot.teleoperators.so_leader import SOLeaderTeleopConfig
from lerobot.utils.errors import DeviceNotConnectedError
from ..so_leader import SOLeader from ..so_leader import SOLeader
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
@@ -92,6 +93,9 @@ class BiSOLeader(Teleoperator):
self.right_arm.setup_motors() self.right_arm.setup_motors()
def get_action(self) -> dict[str, float]: def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
action_dict = {} action_dict = {}
# Add "left_" prefix # Add "left_" prefix
@@ -21,6 +21,7 @@ from typing import Any
import numpy as np import numpy as np
from lerobot.processor import RobotAction from lerobot.processor import RobotAction
from lerobot.utils.errors import DeviceNotConnectedError
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
from ..utils import TeleopEvents from ..utils import TeleopEvents
@@ -86,6 +87,9 @@ class GamepadTeleop(Teleoperator):
self.gamepad.start() self.gamepad.start()
def get_action(self) -> RobotAction: def get_action(self) -> RobotAction:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Update the controller to get fresh inputs # Update the controller to get fresh inputs
self.gamepad.update() self.gamepad.update()
@@ -158,6 +162,7 @@ class GamepadTeleop(Teleoperator):
self.gamepad.stop() self.gamepad.stop()
self.gamepad = None self.gamepad = None
@property
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""Check if gamepad is connected.""" """Check if gamepad is connected."""
return self.gamepad is not None return self.gamepad is not None
@@ -300,6 +300,9 @@ class HomunculusArm(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}") logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]: def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
joint_positions = self._read() joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
@@ -326,6 +326,9 @@ class HomunculusGlove(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}") logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]: def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
joint_positions = self._read() joint_positions = self._read()
return homunculus_glove_to_hope_jr_hand( return homunculus_glove_to_hope_jr_hand(
{f"{joint}.pos": pos for joint, pos in joint_positions.items()} {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
@@ -165,6 +165,9 @@ class IOSPhone(BasePhone, Teleoperator):
return True, pos, rot, pose return True, pos, rot, pose
def get_action(self) -> dict: def get_action(self) -> dict:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose() has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose()
if not has_pose or not self.is_calibrated: if not has_pose or not self.is_calibrated:
return {} return {}
@@ -319,6 +322,9 @@ class AndroidPhone(BasePhone, Teleoperator):
self._latest_message = message self._latest_message = message
def get_action(self) -> dict: def get_action(self) -> dict:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
ok, raw_pos, raw_rot, pose = self._read_current_pose() ok, raw_pos, raw_rot, pose = self._read_current_pose()
if not ok or not self.is_calibrated: if not ok or not self.is_calibrated:
return {} return {}
@@ -140,6 +140,9 @@ class SOLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_action(self) -> dict[str, float]: def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter() start = time.perf_counter()
action = self.bus.sync_read("Present_Position") action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()} action = {f"{motor}.pos": val for motor, val in action.items()}