Improved doc implement_your_own_pipeline

- Use normalization processor as default example
- Add section on transform features
- Add section on overrides.
This commit is contained in:
Michel Aractingi
2025-08-08 00:58:59 +02:00
parent 6d4e983197
commit 2a5c757d58
+144 -68
View File
@@ -1,8 +1,7 @@
# Implement your own Robot Processor # Implement your own Robot Processor
In this tutorial, you'll learn how to implement your own Robot Processor. In this tutorial, you'll learn how to implement your own Robot Processor.
It begins by exploring the need for a custom processor, followed by an explanation of how to implement one. It begins by exploring the need for a custom processor, then uses the Normalization processors as the running example to explain how to implement, configure, and serialize a processor. Finally, it lists all helper processors that ship with LeRobot.
The tutorial also covers the set of helper classes that are available in LeRobot to support the implementation.
## Why would you need a custom processor? ## Why would you need a custom processor?
@@ -24,7 +23,9 @@ processed_image = processor(transition)["observation"]["observation.image"] # V
On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot. On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot.
For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot. For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot.
For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges: In LeRobot, this normalization workflow is handled by the `NormalizerProcessor` (for inputs) and the `UnnormalizerProcessor` (for outputs). These processors are heavily used by policies (e.g., Pi0, SmolVLA) and integrate tightly with the `RobotProcessor`'s `get_config`, `state_dict`, and `load_state_dict` APIs.
For instance, `UnnormalizerProcessor` converts model outputs in `[-1, 1]` back to actual robot joint ranges:
```python ```python
# Input: model action with normalized values in [-1, 1] # Input: model action with normalized values in [-1, 1]
@@ -43,12 +44,14 @@ The unnormalizer uses the dataset statistics to convert back:
real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
``` ```
All this situation point us towards the need for a mechanism to preprocess the data before being passed to the policies and then post-process the action that are returned to be executed on the robot. All these situations point us towards the need for a mechanism to preprocess the data before being passed to the policies and then post-process the action that are returned to be executed on the robot.
To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action. To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.
## How to implement your own processor? ## How to implement your own processor?
We'll use the `NormalizerProcessor` as a concrete running example because it is central to most policies and demonstrates configuration and state serialization cleanly.
Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods: Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods:
- `__call__`: implements the processing step for the input transition. - `__call__`: implements the processing step for the input transition.
@@ -60,33 +63,46 @@ Prepare the sequence of processing steps necessary for your problem. A processor
### Implement the `__call__` method ### Implement the `__call__` method
The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. Here's a minimal example: The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. Here's how the `NormalizerProcessor` conceptually works (simplified):
```python ```python
from dataclasses import dataclass from dataclasses import dataclass
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor.pipeline import EnvTransition, TransitionKey from lerobot.processor.pipeline import EnvTransition, TransitionKey
@dataclass @dataclass
class MyProcessor: class NormalizerProcessor:
features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode]
stats: dict[str, dict[str, torch.Tensor]]
eps: float = 1e-8
def __call__(self, transition: EnvTransition) -> EnvTransition: def __call__(self, transition: EnvTransition) -> EnvTransition:
# Check if the required data exists normalized_info = {}
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
# Process the data obs = transition.get(TransitionKey.OBSERVATION)
processed_obs = self._process_observation(observation) act = transition.get(TransitionKey.ACTION)
new_obs = self._normalize_observation(obs, normalized_info)
new_act = self._normalize_action(act, normalized_info)
# Return new transition with processed data
new_transition = transition.copy() new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs new_transition[TransitionKey.OBSERVATION] = new_obs
return new_transition new_transition[TransitionKey.ACTION] = new_act
def _process_observation(self, obs): # Record what was normalized into complementary_data
# Your custom processing logic here if normalized_info:
return obs comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
comp = dict(comp)
comp["normalized_keys"] = normalized_info
new_transition[TransitionKey.COMPLEMENTARY_DATA] = comp
return new_transition
``` ```
See the full implementation in `src/lerobot/processor/normalize_processor.py` for details on mean/std and min/max modes and key selection.
**Key principles:** **Key principles:**
- Always check if required data exists before processing - Always check if required data exists before processing
@@ -94,63 +110,69 @@ class MyProcessor:
- Use `transition.copy()` to avoid side effects - Use `transition.copy()` to avoid side effects
- Only modify the specific keys your processor handles - Only modify the specific keys your processor handles
**Tip**: For observation-only processors, inherit from `ObservationProcessor` to avoid writing `__call__` boilerplate: **Tip**: For observation-only processors, you can inherit from `ObservationProcessor` to avoid writing `__call__` boilerplate. The normalizer is mixed (observations and actions), so it implements `__call__` directly.
```python
from lerobot.processor.pipeline import ObservationProcessor
@dataclass
class MyObservationProcessor(ObservationProcessor):
def observation(self, observation):
# Only implement this method - __call__ is handled automatically
return self._process_observation(observation)
```
### Configuration and State Management ### Configuration and State Management
Processors support serialization through three methods that separate configuration from tensor state: Processors support serialization through three methods that separate configuration from tensor state. This is especially important for normalization processors, which carry dataset statistics (tensors) in their state, and hyperparameters in their config:
```python ```python
from dataclasses import dataclass, field
from typing import Any
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
@dataclass @dataclass
class MyProcessor: class NormalizerProcessor:
threshold: float = 0.5 features: dict[str, PolicyFeature]
_running_mean: torch.Tensor = field(default=None, init=False) norm_map: dict[FeatureType, NormalizationMode]
eps: float = 1e-8
_tensor_stats: dict[str, dict[str, torch.Tensor]] = field(default_factory=dict, init=False, repr=False)
def get_config(self) -> dict[str, Any]: def get_config(self) -> dict[str, Any]:
"""Return JSON-serializable configuration.""" """JSON-serializable configuration (no tensors)."""
return {"threshold": self.threshold} return {
"eps": self.eps,
"features": {k: {"type": v.type.value, "shape": v.shape} for k, v in self.features.items()},
"norm_map": {ft.value: nm.value for ft, nm in self.norm_map.items()},
}
def state_dict(self) -> dict[str, torch.Tensor]: def state_dict(self) -> dict[str, torch.Tensor]:
"""Return tensor state only.""" """Tensor state only (e.g., dataset statistics)."""
if self._running_mean is not None: flat: dict[str, torch.Tensor] = {}
return {"running_mean": self._running_mean} for key, sub in self._tensor_stats.items():
return {} for stat_name, tensor in sub.items():
flat[f"{key}.{stat_name}"] = tensor
return flat
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Restore tensor state.""" """Restore tensor state at runtime."""
if "running_mean" in state: self._tensor_stats.clear()
self._running_mean = state["running_mean"] for flat_key, tensor in state.items():
key, stat_name = flat_key.rsplit(".", 1)
self._tensor_stats.setdefault(key, {})[stat_name] = tensor
``` ```
**Usage:** **Usage:**
```python ```python
# Save # Save (e.g., inside a policy)
config = processor.get_config() config = processor.get_config()
tensors = processor.state_dict() tensors = processor.state_dict()
# Restore # Restore (e.g., loading a pretrained policy)
new_processor = MyProcessor(**config) new_processor = NormalizerProcessor(**config)
new_processor.load_state_dict(tensors) new_processor.load_state_dict(tensors)
``` ```
### Feature Contract ### Transform features
The `feature_contract` method defines how your processor transforms feature names and shapes. This is crucial for policy configuration and debugging. The `transform_features` method defines how your processor transforms feature names and shapes. This is crucial for policy configuration and debugging.
Normalization typically preserves the feature keys and shapes, so `NormalizerProcessor.transform_features` returns the input features unchanged. When your processor renames or reshapes, implement this method to reflect the mapping for downstream components. For example, a simple rename processor:
```python ```python
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Transform feature keys: old_key -> new_key"""
# Simple renaming # Simple renaming
if "pixels" in features: if "pixels" in features:
features["observation.image"] = features.pop("pixels") features["observation.image"] = features.pop("pixels")
@@ -171,37 +193,58 @@ def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, Poli
- Always return the modified features dictionary - Always return the modified features dictionary
- Document transformations clearly in the docstring - Document transformations clearly in the docstring
### Example of usage from the codebase
`transform_features` is used by `RobotProcessor` to derive the dataset/policy feature contract from an initial feature set by applying each step's transformation. You can see concrete examples in the codebase:
- Phone teleoperation record pipeline (`examples/phone_so100_record.py`): processors like `ForwardKinematicsJointsToEE`, `GripperVelocityToJoint`, and `EEBoundsAndSafety` implement `transform_features` to declare which action/observation keys should be materialized in the dataset.
- SO100 follower kinematics (`src/lerobot/robots/so100_follower/robot_kinematic_processor.py`): each processor's `transform_features` method adds or refines feature keys such as `observation.state.ee.{x,y,z,wx,wy,wz}` or `action.gripper.pos`.
- Rename and tokenizer processors (`src/lerobot/processor/rename_processor.py`, `src/lerobot/processor/tokenizer_processor.py`): demonstrate key renaming and adding language token features to the contract.
In practice, you will often aggregate features by running `RobotProcessor.transform_features(...)` with your initial features to compute the final contract before recording or training.
## Helper Classes ## Helper Classes
LeRobot provides pre-built processor classes for common transformations: LeRobot provides pre-built processor classes for common transformations. Below is a comprehensive list of registered processors in the codebase.
### Core Classes ### Core processors (observations, actions, normalization)
- **`VanillaObservationProcessor`** - Handles images and state observations - **`VanillaObservationProcessor`** (`observation_processor`): Images and state processing to LeRobot format.
- **`NormalizerProcessor`** - Normalizes data using dataset statistics (mean/std or min/max) - **`NormalizerProcessor`** (`normalizer_processor`): Normalize observations/actions (mean/std or min/max to [-1, 1]).
- **`UnnormalizerProcessor`** - Converts normalized values back to original ranges - **`UnnormalizerProcessor`** (`unnormalizer_processor`): Inverse of the normalizer for model outputs.
- **`DeviceProcessor`** (`device_processor`): Move tensors to a specific device (CPU/GPU) and optional float dtype.
- **`ToBatchProcessor`** (`to_batch_processor`): Add batch dimension to observations/actions when missing.
- **`RenameProcessor`** (`rename_processor`): Rename observation keys using a mapping dictionary.
- **`TokenizerProcessor`** (`tokenizer_processor`): Tokenize language tasks into `observation.language.*` tensors.
### Utility Classes ### Teleoperation mapping processors
- **`DeviceProcessor`** - Moves tensors to specified device (CPU/GPU) - **`MapDeltaActionToRobotAction`** (`map_delta_action_to_robot_action`): Map teleop deltas (e.g., gamepad) to `action.target_*` fields.
- **`ToBatchProcessor`** - Adds batch dimensions - **`MapPhoneActionToRobotAction`** (`map_phone_action_to_robot_action`): Map calibrated phone pose/buttons to `action.target_*` and gripper.
- **`RenameProcessor`** - Renames keys using a mapping dictionary
- **`TokenizerProcessor`** - Handles text tokenization for language-conditioned policies ### Robot kinematics processors (SO100 follower example)
- **`EEReferenceAndDelta`** (`ee_reference_and_delta`): Compute desired EE pose from target deltas and current pose.
- **`EEBoundsAndSafety`** (`ee_bounds_and_safety`): Clip EE pose to bounds and check for jumps.
- **`InverseKinematicsEEToJoints`** (`inverse_kinematics_ee_to_joints`): Convert EE pose to joint targets via IK.
- **`GripperVelocityToJoint`** (`gripper_velocity_to_joint`): Convert gripper velocity input to joint position command.
- **`ForwardKinematicsJointsToEE`** (`forward_kinematics_joints_to_ee`): Compute EE pose features from joint positions via FK.
- **`AddRobotObservationAsComplimentaryData`** (`add_robot_observation`): Read robot observation and insert `raw_joint_positions` into complementary data.
### Policy-specific utility processors
- **`Pi0NewLineProcessor`** (`pi0_new_line_processor`): Ensure text tasks end with a newline (Pi0 tokenizer compatibility).
- **`SmolVLANewLineProcessor`** (`smolvla_new_line_processor`): Ensure text tasks end with a newline (SmolVLA tokenizer compatibility).
### Usage Example ### Usage Example
```python ```python
from lerobot.processor import ( from lerobot.processor import NormalizerProcessor, DeviceProcessor, RobotProcessor, ToBatchProcessor
VanillaObservationProcessor,
NormalizerProcessor,
DeviceProcessor,
RobotProcessor
)
# Create a processing pipeline # Create a processing pipeline (typical policy preprocessor)
steps = [ steps = [
VanillaObservationProcessor(), # Process images and states
NormalizerProcessor(features=features, norm_map=norm_map, stats=stats), NormalizerProcessor(features=features, norm_map=norm_map, stats=stats),
ToBatchProcessor(),
DeviceProcessor(device="cuda"), DeviceProcessor(device="cuda"),
] ]
@@ -210,6 +253,39 @@ processor = RobotProcessor(steps=steps)
processed_transition = processor(raw_transition) processed_transition = processor(raw_transition)
``` ```
### Using overrides
You can override step parameters at load-time using `overrides`. This is handy for non-serializable objects or site-specific settings. It works both in policy factories and with `RobotProcessor.from_pretrained(...)`.
Example: during policy evaluation on the robot, override the device and rename map.
Use this to run a policy trained on CUDA on a CPU-only robot, or to remap camera keys when the robot uses different names than the dataset.
```437:445:src/lerobot/record.py
preprocessor, postprocessor = make_processor(
policy_cfg=cfg.policy,
pretrained_path=cfg.policy.pretrained_path,
dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map),
preprocessor_overrides={
"device_processor": {"device": cfg.policy.device},
"rename_processor": {"rename_map": cfg.dataset.rename_map},
},
)
```
Direct usage with `from_pretrained`:
```python
from lerobot.processor import RobotProcessor
processor = RobotProcessor.from_pretrained(
"username/my-processor",
overrides={
"device_processor": {"device": "cuda:0"}, # registry name for registered steps
"CustomStep": {"param": 42}, # class name for non-registered steps
},
)
```
## Best Practices ## Best Practices
- **Keep processors atomic** - One transformation per processor for reusability and debugging - **Keep processors atomic** - One transformation per processor for reusability and debugging
@@ -242,6 +318,6 @@ You now have all the tools to implement custom processors in LeRobot! The key st
3. **Integrate it** into a `RobotProcessor` pipeline with other processing steps 3. **Integrate it** into a `RobotProcessor` pipeline with other processing steps
4. **Use base classes** like `ObservationProcessor` when possible to reduce boilerplate 4. **Use base classes** like `ObservationProcessor` when possible to reduce boilerplate
The processor system is designed to be modular and composable, allowing you to build complex data processing pipelines from simple, focused components. Whether you're preprocessing sensor data for training or post-processing model outputs for robot execution, custom processors give you the flexibility to handle any data transformation your robotics application requires. The processor system is designed to be modular and composable, allowing you to build complex data processing pipelines from simple, focused components. Whether you're preprocessing sensor data for training or post-processing model outputs for robot execution, custom processors give you the flexibility to handle any data transformation your robotics application requires. Policies like Pi0 and SmolVLA use the same normalization processors described above, so your understanding here will transfer directly when wiring policy preprocessors and postprocessors.
Start simple, test thoroughly, and leverage the existing helper classes to build robust data processing pipelines for your robot learning workflows. Start simple, test thoroughly, and leverage the existing helper classes to build robust data processing pipelines for your robot learning workflows.