mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-26 05:59:52 +00:00
feat(teleoperation): introduce HasTeleopEvents protocol and enhance teleop event handling (#1866)
- Added the HasTeleopEvents protocol to define a standard for teleoperators that provide control events. - Implemented a runtime check to ensure teleoperators implement the get_teleop_events() method. - Updated AddTeleopEventsAsInfoStep to utilize the new protocol, enhancing compatibility with custom teleoperators. - Improved documentation for clarity on teleoperation event extraction and compatibility with built-in teleoperators.
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import math
|
import math
|
||||||
import time
|
import time
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any, Protocol, TypeVar, runtime_checkable
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import torch
|
import torch
|
||||||
@@ -27,6 +27,40 @@ DISCRETE_PENALTY_KEY = "discrete_penalty"
|
|||||||
TELEOP_ACTION_KEY = "teleop_action"
|
TELEOP_ACTION_KEY = "teleop_action"
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class HasTeleopEvents(Protocol):
|
||||||
|
"""Minimal protocol for objects that provide teleoperation events.
|
||||||
|
|
||||||
|
This protocol only defines the additional get_teleop_events() method,
|
||||||
|
avoiding duplication of the entire Teleoperator interface.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_teleop_events(self) -> dict[str, Any]:
|
||||||
|
"""Get extra control events from the teleoperator.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing control events such as:
|
||||||
|
- is_intervention: bool - Whether human is currently intervening
|
||||||
|
- terminate_episode: bool - Whether to terminate the current episode
|
||||||
|
- success: bool - Whether the episode was successful
|
||||||
|
- rerecord_episode: bool - Whether to rerecord the episode
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
# Type variable constrained to Teleoperator subclasses that also implement events
|
||||||
|
TeleopWithEvents = TypeVar("TeleopWithEvents", bound=Teleoperator)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_teleop_with_events(teleop: Teleoperator) -> None:
|
||||||
|
"""Runtime check that a teleoperator implements get_teleop_events."""
|
||||||
|
if not isinstance(teleop, HasTeleopEvents):
|
||||||
|
raise TypeError(
|
||||||
|
f"Teleoperator {type(teleop).__name__} must implement get_teleop_events() method. "
|
||||||
|
f"Compatible teleoperators: GamepadTeleop, KeyboardEndEffectorTeleop"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@ProcessorStepRegistry.register("add_teleop_action_as_complementary_data")
|
@ProcessorStepRegistry.register("add_teleop_action_as_complementary_data")
|
||||||
@dataclass
|
@dataclass
|
||||||
class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
|
class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
|
||||||
@@ -46,13 +80,29 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
|
|||||||
@ProcessorStepRegistry.register("add_teleop_action_as_info")
|
@ProcessorStepRegistry.register("add_teleop_action_as_info")
|
||||||
@dataclass
|
@dataclass
|
||||||
class AddTeleopEventsAsInfoStep(InfoProcessorStep):
|
class AddTeleopEventsAsInfoStep(InfoProcessorStep):
|
||||||
"""Add teleoperator control events to transition info."""
|
"""Add teleoperator control events to transition info.
|
||||||
|
|
||||||
teleop_device: Teleoperator
|
This processor step extracts control events from teleoperators that support
|
||||||
|
event-based interaction (intervention detection, episode termination, etc.).
|
||||||
|
|
||||||
|
Works with any teleoperator that inherits from Teleoperator and implements the
|
||||||
|
get_teleop_events() method, including custom user-defined teleoperators.
|
||||||
|
|
||||||
|
Built-in compatible teleoperators:
|
||||||
|
- GamepadTeleop: Uses gamepad buttons for control events
|
||||||
|
- KeyboardEndEffectorTeleop: Uses keyboard keys for control events
|
||||||
|
"""
|
||||||
|
|
||||||
|
teleop_device: TeleopWithEvents
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
"""Validate that the teleoperator supports events."""
|
||||||
|
_check_teleop_with_events(self.teleop_device)
|
||||||
|
|
||||||
def info(self, info: dict) -> dict:
|
def info(self, info: dict) -> dict:
|
||||||
new_info = dict(info)
|
new_info = dict(info)
|
||||||
teleop_events = getattr(self.teleop_device, "get_teleop_events", lambda: {})()
|
|
||||||
|
teleop_events = self.teleop_device.get_teleop_events()
|
||||||
new_info.update(teleop_events)
|
new_info.update(teleop_events)
|
||||||
return new_info
|
return new_info
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user