precommit style nit

This commit is contained in:
Michel Aractingi
2025-08-06 17:44:32 +02:00
committed by Adil Zouitine
parent 8d5f519fcb
commit feb3fed5e8
+64 -56
View File
@@ -1,13 +1,13 @@
# Implement your processor # Implement your processor
In this tutorial, we will explain how to implement your own processor. We will start by motivating In this tutorial, we will explain how to implement your own processor. We will start by motivating
the need for a custom processor and then we will explain the helper classes that can help you implement your own processor. the need for a custom processor and then we will explain the helper classes that can help you implement your own processor.
## Why would you need a custom processor? ## Why would you need a custom processor?
In most cases, when reading raw data from a sensor like the camera and robot motor encoders, In most cases, when reading raw data from a sensor like the camera and robot motor encoders,
you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot. you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot.
For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`. For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`.
To use these images with the policies, you will need to cast them to `float32` and normalize them to the range `[0, 1]`. To use these images with the policies, you will need to cast them to `float32` and normalize them to the range `[0, 1]`.
For example, in LeRobot's `ImageProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`: For example, in LeRobot's `ImageProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`:
@@ -16,12 +16,12 @@ For example, in LeRobot's `ImageProcessor`, raw images come from the environment
# Input: numpy array with shape (480, 640, 3) and dtype uint8 # Input: numpy array with shape (480, 640, 3) and dtype uint8
raw_image = env_observation["pixels"] # Values in [0, 255] raw_image = env_observation["pixels"] # Values in [0, 255]
# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32 # After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32
processed_image = processor(transition)["observation"]["observation.image"] # Values in [0, 1] processed_image = processor(transition)["observation"]["observation.image"] # Values in [0, 1]
``` ```
On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot. On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot.
For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minumum and maximum joint angle positions of the robot. For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot.
For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges: For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges:
@@ -31,21 +31,22 @@ normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2]) # Model output
# After post-processing: real joint positions in robot's native ranges # After post-processing: real joint positions in robot's native ranges
# Example: joints range from [-180.0, 180.0] # Example: joints range from [-180.0, 180.0]
real_action = unnormalizer(transition)["action"] real_action = unnormalizer(transition)["action"]
# real action after post-processing: [ -90., 144., -180., 36.] # real action after post-processing: [ -90., 144., -180., 36.]
``` ```
The unnormalizer uses the dataset statistics to convert back: The unnormalizer uses the dataset statistics to convert back:
```python ```python
# For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min # For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min
real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
``` ```
All this situation point us towards the need for a mechanism to preprocess the data before inputed to the policies and then post-process the action that are returend to be executed on the robot. All this situation point us towards the need for a mechanism to preprocess the data before being passed to the policies and then post-process the action that are returned to be executed on the robot.
To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action. To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.
## How to implement your own processor? ## How to implement your own processor?
Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods: Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods:
@@ -70,68 +71,71 @@ from lerobot.processor.pipeline import EnvTransition, TransitionKey
class ImageProcessor: class ImageProcessor:
def __call__(self, transition: EnvTransition) -> EnvTransition: def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION) observation = transition.get(TransitionKey.OBSERVATION)
if observation is None: if observation is None:
return transition return transition
processed_obs = {} processed_obs = {}
# Copy all observations first # Copy all observations first
for key, value in observation.items(): for key, value in observation.items():
processed_obs[key] = value processed_obs[key] = value
# Handle pixels key if present # Handle pixels key if present
pixels = observation.get("pixels") pixels = observation.get("pixels")
if pixels is not None: if pixels is not None:
# Remove pixels from processed_obs since we'll replace it with processed images # Remove pixels from processed_obs since we'll replace it with processed images
processed_obs.pop("pixels", None) processed_obs.pop("pixels", None)
# Process the image # Process the image
processed_img = self._process_single_image(pixels) processed_img = self._process_single_image(pixels)
processed_obs["observation.image"] = processed_img processed_obs["observation.image"] = processed_img
# Return new transition with processed observation # Return new transition with processed observation
new_transition = transition.copy() new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs new_transition[TransitionKey.OBSERVATION] = processed_obs
return new_transition return new_transition
def _process_single_image(self, img): def _process_single_image(self, img):
# Convert to tensor # Convert to tensor
img_tensor = torch.from_numpy(img) img_tensor = torch.from_numpy(img)
# Add batch dimension if needed # Add batch dimension if needed
if img_tensor.ndim == 3: if img_tensor.ndim == 3:
img_tensor = img_tensor.unsqueeze(0) img_tensor = img_tensor.unsqueeze(0)
# Convert to channel-first format: (B, H, W, C) -> (B, C, H, W) # Convert to channel-first format: (B, H, W, C) -> (B, C, H, W)
img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous() img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous()
# Convert to float32 and normalize to [0, 1] # Convert to float32 and normalize to [0, 1]
img_tensor = img_tensor.type(torch.float32) / 255.0 img_tensor = img_tensor.type(torch.float32) / 255.0
return img_tensor return img_tensor
``` ```
Key principles for implementing `__call__`: Key principles for implementing `__call__`:
- Always check if the required data exists (observations, actions, etc.) - Always check if the required data exists (observations, actions, etc.)
- Return the original transition unchanged if no processing is needed - Return the original transition unchanged if no processing is needed
- Create a copy of the transition to avoid side effects - Create a copy of the transition to avoid side effects
- Only modify the specific keys your processor is responsible for - Only modify the specific keys your processor is responsible for
### Configuration and State Management ### Configuration and State Management
LeRobot processors support serialization and deserialization through three key methods. Here's how they work using `NormalizerProcessor` as an example: LeRobot processors support serialization and deserialization through three key methods. Here's how they work using `NormalizerProcessor` as an example:
#### `get_config()` - Serializable Configuration #### `get_config()` - Serializable Configuration
This method returns all non-tensor configuration that can be saved to JSON: This method returns all non-tensor configuration that can be saved to JSON:
```python ```python
@dataclass @dataclass
class NormalizerProcessor: class NormalizerProcessor:
features: dict[str, PolicyFeature] features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode] norm_map: dict[FeatureType, NormalizationMode]
normalize_keys: set[str] | None = None normalize_keys: set[str] | None = None
eps: float = 1e-8 eps: float = 1e-8
def get_config(self) -> dict[str, Any]: def get_config(self) -> dict[str, Any]:
"""Return JSON-serializable configuration.""" """Return JSON-serializable configuration."""
return { return {
@@ -144,11 +148,12 @@ class NormalizerProcessor:
``` ```
#### `state_dict()` - Tensor State #### `state_dict()` - Tensor State
This method returns only PyTorch tensors that need special serialization: This method returns only PyTorch tensors that need special serialization:
```python ```python
def state_dict(self) -> dict[str, torch.Tensor]: def state_dict(self) -> dict[str, torch.Tensor]:
"""Return tensor state dictionary.""" """Return tensor state dictionary."""
state = {} state = {}
for key, stats in self._tensor_stats.items(): for key, stats in self._tensor_stats.items():
for stat_name, tensor_val in stats.items(): for stat_name, tensor_val in stats.items():
@@ -156,7 +161,8 @@ def state_dict(self) -> dict[str, torch.Tensor]:
return state return state
``` ```
#### `load_state_dict()` - Restore Tensor State #### `load_state_dict()` - Restore Tensor State
This method restores the tensor state from a saved state dictionary: This method restores the tensor state from a saved state dictionary:
```python ```python
@@ -173,6 +179,7 @@ def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
``` ```
#### Usage Example #### Usage Example
```python ```python
# Save processor # Save processor
config = processor.get_config() config = processor.get_config()
@@ -202,7 +209,7 @@ def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, Poli
features["observation.image"] = features.pop("pixels") features["observation.image"] = features.pop("pixels")
if "observation.pixels" in features: if "observation.pixels" in features:
features["observation.image"] = features.pop("observation.pixels") features["observation.image"] = features.pop("observation.pixels")
# Handle camera-specific pixels # Handle camera-specific pixels
prefixes = ("pixels.", "observation.pixels.") prefixes = ("pixels.", "observation.pixels.")
for key in list(features.keys()): for key in list(features.keys()):
@@ -236,6 +243,7 @@ def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, Poli
``` ```
**Key principles:** **Key principles:**
- Use `features.pop(old_key)` to remove the old feature and get its value - Use `features.pop(old_key)` to remove the old feature and get its value
- Use `features[new_key] = old_feature` to add the new feature with same properties - Use `features[new_key] = old_feature` to add the new feature with same properties
- Always return the modified features dictionary - Always return the modified features dictionary
@@ -248,7 +256,7 @@ LeRobot provides several pre-built processor classes that handle common transfor
### Core Processing Classes ### Core Processing Classes
- **`ImageProcessor`** - Converts images from numpy arrays (uint8, channel-last) to PyTorch tensors (float32, channel-first) - **`ImageProcessor`** - Converts images from numpy arrays (uint8, channel-last) to PyTorch tensors (float32, channel-first)
- **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys - **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys
- **`NormalizerProcessor`** - Normalizes observations and actions using dataset statistics (mean/std or min/max) - **`NormalizerProcessor`** - Normalizes observations and actions using dataset statistics (mean/std or min/max)
- **`UnnormalizerProcessor`** - Inverse of NormalizerProcessor, converts normalized values back to original ranges - **`UnnormalizerProcessor`** - Inverse of NormalizerProcessor, converts normalized values back to original ranges
@@ -268,8 +276,8 @@ LeRobot provides several pre-built processor classes that handle common transfor
```python ```python
from lerobot.processor import ( from lerobot.processor import (
ImageProcessor, ImageProcessor,
NormalizerProcessor, NormalizerProcessor,
DeviceProcessor, DeviceProcessor,
RobotProcessor RobotProcessor
) )
@@ -310,7 +318,7 @@ class MyProcessor:
```python ```python
@ProcessorStepRegistry.register("my_custom_processor") @ProcessorStepRegistry.register("my_custom_processor")
@dataclass @dataclass
class MyCustomProcessor: class MyCustomProcessor:
# Implementation # Implementation
``` ```
@@ -323,8 +331,8 @@ class MyCustomProcessor:
def get_config(self) -> dict[str, Any]: def get_config(self) -> dict[str, Any]:
# JSON-serializable data only # JSON-serializable data only
return {"threshold": self.threshold, "mode": self.mode} return {"threshold": self.threshold, "mode": self.mode}
def state_dict(self) -> dict[str, torch.Tensor]: def state_dict(self) -> dict[str, torch.Tensor]:
# Tensors only # Tensors only
return {"running_mean": self.running_mean, "weights": self.weights} return {"running_mean": self.running_mean, "weights": self.weights}
``` ```
@@ -388,63 +396,63 @@ import torch
from torch import Tensor from torch import Tensor
from lerobot.processor.pipeline import ( from lerobot.processor.pipeline import (
EnvTransition, EnvTransition,
ProcessorStepRegistry, ProcessorStepRegistry,
TransitionKey TransitionKey
) )
from lerobot.configs.types import PolicyFeature from lerobot.configs.types import PolicyFeature
@ProcessorStepRegistry.register("action_smoother") @ProcessorStepRegistry.register("action_smoother")
@dataclass @dataclass
class ActionSmoothingProcessor: class ActionSmoothingProcessor:
"""Smooths actions using exponential moving average. """Smooths actions using exponential moving average.
This processor maintains a running average of actions to reduce jitter This processor maintains a running average of actions to reduce jitter
from the policy predictions. from the policy predictions.
""" """
# Configuration # Configuration
alpha: float = 0.7 # Smoothing factor (0 = no smoothing, 1 = no memory) alpha: float = 0.7 # Smoothing factor (0 = no smoothing, 1 = no memory)
# State (not in config) # State (not in config)
_previous_action: Tensor | None = field(default=None, init=False, repr=False) _previous_action: Tensor | None = field(default=None, init=False, repr=False)
_initialized: bool = field(default=False, init=False, repr=False) _initialized: bool = field(default=False, init=False, repr=False)
def __call__(self, transition: EnvTransition) -> EnvTransition: def __call__(self, transition: EnvTransition) -> EnvTransition:
action = transition.get(TransitionKey.ACTION) action = transition.get(TransitionKey.ACTION)
if action is None: if action is None:
return transition return transition
# Convert to tensor if needed # Convert to tensor if needed
if not isinstance(action, torch.Tensor): if not isinstance(action, torch.Tensor):
action = torch.as_tensor(action, dtype=torch.float32) action = torch.as_tensor(action, dtype=torch.float32)
# Initialize on first call # Initialize on first call
if not self._initialized: if not self._initialized:
self._previous_action = action.clone() self._previous_action = action.clone()
self._initialized = True self._initialized = True
smoothed_action = action smoothed_action = action
else: else:
# Exponential moving average: new = alpha * current + (1-alpha) * previous # Exponential moving average: new = alpha * current + (1-alpha) * previous
smoothed_action = self.alpha * action + (1 - self.alpha) * self._previous_action smoothed_action = self.alpha * action + (1 - self.alpha) * self._previous_action
self._previous_action = smoothed_action.clone() self._previous_action = smoothed_action.clone()
# Return new transition with smoothed action # Return new transition with smoothed action
new_transition = transition.copy() new_transition = transition.copy()
new_transition[TransitionKey.ACTION] = smoothed_action new_transition[TransitionKey.ACTION] = smoothed_action
return new_transition return new_transition
def get_config(self) -> dict[str, Any]: def get_config(self) -> dict[str, Any]:
"""Return JSON-serializable configuration.""" """Return JSON-serializable configuration."""
return {"alpha": self.alpha} return {"alpha": self.alpha}
def state_dict(self) -> dict[str, torch.Tensor]: def state_dict(self) -> dict[str, torch.Tensor]:
"""Return tensor state.""" """Return tensor state."""
if self._previous_action is not None: if self._previous_action is not None:
return {"previous_action": self._previous_action} return {"previous_action": self._previous_action}
return {} return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load tensor state.""" """Load tensor state."""
if "previous_action" in state: if "previous_action" in state:
@@ -453,12 +461,12 @@ class ActionSmoothingProcessor:
else: else:
self._previous_action = None self._previous_action = None
self._initialized = False self._initialized = False
def reset(self) -> None: def reset(self) -> None:
"""Reset processor state at episode boundaries.""" """Reset processor state at episode boundaries."""
self._previous_action = None self._previous_action = None
self._initialized = False self._initialized = False
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Action shapes remain unchanged.""" """Action shapes remain unchanged."""
return features # No transformation to feature space return features # No transformation to feature space
@@ -476,19 +484,19 @@ postprocessing_steps = [
] ]
postprocessor = RobotProcessor( postprocessor = RobotProcessor(
steps=postprocessing_steps, steps=postprocessing_steps,
name="smooth_postprocessor" name="smooth_postprocessor"
) )
# Use in your policy inference loop # Use in your policy inference loop
for transition in environment_transitions: for transition in environment_transitions:
# Get action from policy # Get action from policy
action = policy(transition) action = policy(transition)
# Post-process (including smoothing) # Post-process (including smoothing)
transition_with_action = {"action": action} transition_with_action = {"action": action}
smoothed_transition = postprocessor(transition_with_action) smoothed_transition = postprocessor(transition_with_action)
# Execute smoothed action # Execute smoothed action
next_obs = env.step(smoothed_transition["action"]) next_obs = env.step(smoothed_transition["action"])
``` ```