From a988da478909944b38a639fa644b876ccc708a9e Mon Sep 17 00:00:00 2001 From: Adil Zouitine Date: Thu, 4 Sep 2025 16:28:49 +0200 Subject: [PATCH] feat(teleoperation): introduce HasTeleopEvents protocol and enhance teleop event handling (#1866) - Added the HasTeleopEvents protocol to define a standard for teleoperators that provide control events. - Implemented a runtime check to ensure teleoperators implement the get_teleop_events() method. - Updated AddTeleopEventsAsInfoStep to utilize the new protocol, enhancing compatibility with custom teleoperators. - Improved documentation for clarity on teleoperation event extraction and compatibility with built-in teleoperators. --- src/lerobot/processor/hil_processor.py | 58 ++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py index 3c1570eaf..70dc416bf 100644 --- a/src/lerobot/processor/hil_processor.py +++ b/src/lerobot/processor/hil_processor.py @@ -1,7 +1,7 @@ import math import time from dataclasses import dataclass -from typing import Any +from typing import Any, Protocol, TypeVar, runtime_checkable import numpy as np import torch @@ -27,6 +27,40 @@ DISCRETE_PENALTY_KEY = "discrete_penalty" TELEOP_ACTION_KEY = "teleop_action" +@runtime_checkable +class HasTeleopEvents(Protocol): + """Minimal protocol for objects that provide teleoperation events. + + This protocol only defines the additional get_teleop_events() method, + avoiding duplication of the entire Teleoperator interface. + """ + + def get_teleop_events(self) -> dict[str, Any]: + """Get extra control events from the teleoperator. + + Returns: + Dictionary containing control events such as: + - is_intervention: bool - Whether human is currently intervening + - terminate_episode: bool - Whether to terminate the current episode + - success: bool - Whether the episode was successful + - rerecord_episode: bool - Whether to rerecord the episode + """ + ... + + +# Type variable constrained to Teleoperator subclasses that also implement events +TeleopWithEvents = TypeVar("TeleopWithEvents", bound=Teleoperator) + + +def _check_teleop_with_events(teleop: Teleoperator) -> None: + """Runtime check that a teleoperator implements get_teleop_events.""" + if not isinstance(teleop, HasTeleopEvents): + raise TypeError( + f"Teleoperator {type(teleop).__name__} must implement get_teleop_events() method. " + f"Compatible teleoperators: GamepadTeleop, KeyboardEndEffectorTeleop" + ) + + @ProcessorStepRegistry.register("add_teleop_action_as_complementary_data") @dataclass class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep): @@ -46,13 +80,29 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep): @ProcessorStepRegistry.register("add_teleop_action_as_info") @dataclass class AddTeleopEventsAsInfoStep(InfoProcessorStep): - """Add teleoperator control events to transition info.""" + """Add teleoperator control events to transition info. - teleop_device: Teleoperator + This processor step extracts control events from teleoperators that support + event-based interaction (intervention detection, episode termination, etc.). + + Works with any teleoperator that inherits from Teleoperator and implements the + get_teleop_events() method, including custom user-defined teleoperators. + + Built-in compatible teleoperators: + - GamepadTeleop: Uses gamepad buttons for control events + - KeyboardEndEffectorTeleop: Uses keyboard keys for control events + """ + + teleop_device: TeleopWithEvents + + def __post_init__(self): + """Validate that the teleoperator supports events.""" + _check_teleop_with_events(self.teleop_device) def info(self, info: dict) -> dict: new_info = dict(info) - teleop_events = getattr(self.teleop_device, "get_teleop_events", lambda: {})() + + teleop_events = self.teleop_device.get_teleop_events() new_info.update(teleop_events) return new_info