# Implement your own Robot Processor

In this tutorial, you'll learn how to implement your own Robot Processor.
It begins by exploring the need for a custom processor, then uses the Normalization processors as the running example to explain how to implement, configure, and serialize a processor. Finally, it lists all helper processors that ship with LeRobot.

## Why would you need a custom processor?

In most cases, when reading raw data from sensors such as a camera or the robot's motor encoders,
you will need to transform this data into a format that is compatible with the policies in LeRobot.
For example, raw images are encoded as `uint8` with values in the range `[0, 255]`.
To use these images with the policies, you need to cast them to `float32` and normalize them to the range `[0, 1]`.

Concretely, in LeRobot's `VanillaObservationProcessor`, raw images come from the environment as numpy arrays with `uint8` values in the range `[0, 255]` and in channel-last format `(H, W, C)`. The processor transforms them into PyTorch tensors with `float32` values in the range `[0, 1]` and channel-first format `(C, H, W)`, with a leading batch dimension:

```python
# Input: numpy array with shape (480, 640, 3) and dtype uint8
raw_image = env_observation["pixels"]  # Values in [0, 255]

# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32
processed_image = processor(transition)["observation"]["observation.image"]  # Values in [0, 1]
```

Conversely, when a model returns an action to be executed on the robot, that action often has to be post-processed to make it compatible with the robot.
For example, the model might return joint position values in the range `[-1, 1]`, which then need to be scaled to the range between the robot's minimum and maximum joint angles.

In LeRobot, this normalization workflow is handled by the `NormalizerProcessor` (for inputs) and the `UnnormalizerProcessor` (for outputs). These processors are heavily used by policies (e.g., Pi0, SmolVLA) and integrate tightly with the `RobotProcessor`'s `get_config`, `state_dict`, and `load_state_dict` APIs.

For instance, `UnnormalizerProcessor` converts model outputs in `[-1, 1]` back to actual robot joint ranges:

```python
# Input: model action with normalized values in [-1, 1]
normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2])  # Model output

# After post-processing: real joint positions in robot's native ranges
# Example: joints range from [-180.0, 180.0]
real_action = unnormalizer(transition)["action"]
# real action after post-processing: [ -90., 144., -180., 36.]
```

The unnormalizer uses the dataset statistics to convert back:

```python
# For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min
real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
```

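To make the MIN_MAX formula concrete, here is a minimal dependency-free sketch that applies it to the example action and then inverts it. The helper names `normalize_min_max` and `unnormalize_min_max` are hypothetical, plain Python lists stand in for tensors, and the `eps` term used by the real processors is left out for brevity.

```python
def normalize_min_max(values, min_val, max_val):
    """Map values from [min_val, max_val] to [-1, 1]."""
    return [2 * (v - min_val) / (max_val - min_val) - 1 for v in values]


def unnormalize_min_max(values, min_val, max_val):
    """Map values from [-1, 1] back to [min_val, max_val]."""
    return [(v + 1) * (max_val - min_val) / 2 + min_val for v in values]


normalized_action = [-0.5, 0.8, -1.0, 0.2]

# Joints are assumed to range over [-180.0, 180.0], as in the example above.
real_action = unnormalize_min_max(normalized_action, -180.0, 180.0)
round_trip = normalize_min_max(real_action, -180.0, 180.0)

print([round(a, 6) for a in real_action])  # [-90.0, 144.0, -180.0, 36.0]
```

Normalizing the result again recovers the original values up to floating-point error, which is a quick way to check that a normalizer/unnormalizer pair share the same statistics.
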
All these situations point to the need for a mechanism that preprocesses the data before it is passed to the policies and then post-processes the actions that are returned before they are executed on the robot.

To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.

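The pipeline idea can be sketched in a few lines: a pipeline is an ordered list of callables, each taking a transition and returning a new one. The sketch below is only an illustration under that assumption. `SimplePipeline`, `scale_image`, `clip_action`, and the string keys are hypothetical and are not LeRobot's `RobotProcessor` implementation.

```python
from typing import Any, Callable

Transition = dict[str, Any]  # hypothetical stand-in for EnvTransition
Step = Callable[[Transition], Transition]


class SimplePipeline:
    """Apply a list of steps in order (illustrative only)."""

    def __init__(self, steps: list[Step]):
        self.steps = list(steps)

    def __call__(self, transition: Transition) -> Transition:
        for step in self.steps:
            transition = step(transition)
        return transition


def scale_image(transition: Transition) -> Transition:
    """Scale pixel values from [0, 255] to [0, 1]."""
    out = dict(transition)
    out["observation"] = [p / 255 for p in transition["observation"]]
    return out


def clip_action(transition: Transition) -> Transition:
    """Clamp every action value to [-1, 1]."""
    out = dict(transition)
    out["action"] = [max(-1.0, min(1.0, a)) for a in transition["action"]]
    return out


pipeline = SimplePipeline([scale_image, clip_action])
raw = {"observation": [0, 51, 255], "action": [-2.0, 0.3, 1.5]}
result = pipeline(raw)
```

Because every step returns a fresh dictionary, `raw` is left untouched and steps can be reordered or reused independently.
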
## How to implement your own processor?

We'll use the `NormalizerProcessor` as a concrete running example because it is central to most policies and demonstrates configuration and state serialization cleanly.

Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods:

- `__call__`: implements the processing step for the input transition.
- `get_config`: returns the JSON-serializable configuration of the processor step.
- `state_dict`: returns the state of the processor step.
- `load_state_dict`: loads the state of the processor step.
- `reset`: resets the state of the processor step.
- `transform_features`: declares how the processor step modifies the feature space.

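As a rough orientation before the detailed walkthrough, the sketch below shows one tiny step that implements every method in the list. It is a hypothetical example, not a LeRobot class: `StepCounter` uses plain dictionaries with string keys instead of `EnvTransition`/`TransitionKey`, its state is an integer rather than tensors, and the feature method is named `transform_features` as in the section on transform features.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepCounter:
    """Hypothetical step that stamps each transition with a running index."""

    start: int = 0  # configuration (JSON-serializable)
    _count: int = field(default=0, init=False, repr=False)  # runtime state

    def __post_init__(self) -> None:
        self._count = self.start

    def __call__(self, transition: dict[str, Any]) -> dict[str, Any]:
        new_transition = dict(transition)  # avoid mutating the input
        comp = dict(new_transition.get("complementary_data") or {})
        comp["step_index"] = self._count
        new_transition["complementary_data"] = comp
        self._count += 1
        return new_transition

    def get_config(self) -> dict[str, Any]:
        return {"start": self.start}

    def state_dict(self) -> dict[str, Any]:
        return {"count": self._count}

    def load_state_dict(self, state: dict[str, Any]) -> None:
        self._count = int(state["count"])

    def reset(self) -> None:
        # Called at episode boundaries: forget everything accumulated so far.
        self._count = self.start

    def transform_features(self, features: dict[str, Any]) -> dict[str, Any]:
        # This step neither renames nor reshapes any feature.
        return features


step = StepCounter(start=10)
first = step({"action": [0.0]})
second = step({"action": [0.1]})
step.reset()
third = step({"action": [0.2]})
```

Here `first` and `third` both carry `step_index == 10`, which shows the effect of `reset`, while `get_config` only ever exposes the constructor argument.
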
### Implement the `__call__` method

The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. Here's how the `NormalizerProcessor` conceptually works (simplified):

```python
from dataclasses import dataclass

import torch

from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor.pipeline import EnvTransition, TransitionKey


@dataclass
class NormalizerProcessor:
    features: dict[str, PolicyFeature]
    norm_map: dict[FeatureType, NormalizationMode]
    stats: dict[str, dict[str, torch.Tensor]]
    eps: float = 1e-8

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        normalized_info = {}

        obs = transition.get(TransitionKey.OBSERVATION)
        act = transition.get(TransitionKey.ACTION)

        # Helper methods (omitted in this simplified view) apply the statistics
        new_obs = self._normalize_observation(obs, normalized_info)
        new_act = self._normalize_action(act, normalized_info)

        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = new_obs
        new_transition[TransitionKey.ACTION] = new_act

        # Record what was normalized into complementary_data
        if normalized_info:
            comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
            comp = dict(comp)
            comp["normalized_keys"] = normalized_info
            new_transition[TransitionKey.COMPLEMENTARY_DATA] = comp

        return new_transition
```

See the full implementation in `src/lerobot/processor/normalize_processor.py` for details on mean/std and min/max modes and key selection.

**Key principles:**

- Always check if required data exists before processing
- Return unchanged transition if no processing is needed
- Use `transition.copy()` to avoid side effects
- Only modify the specific keys your processor handles

**Tip**: For observation-only processors, you can inherit from `ObservationProcessor` to avoid writing `__call__` boilerplate. The normalizer is mixed (observations and actions), so it implements `__call__` directly.

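The key principles and the base-class tip can be illustrated together. The sketch below shows how a base class might own the `__call__` boilerplate so that subclasses only write an `observation` method. `ObservationStepBase` and `ScalePixels` are hypothetical names used for illustration, with plain dictionaries and string keys; LeRobot's actual `ObservationProcessor` may differ in its details.

```python
from typing import Any, Optional


class ObservationStepBase:
    """Hypothetical base class: subclasses only implement `observation`."""

    def observation(self, observation: dict[str, Any]) -> dict[str, Any]:
        raise NotImplementedError

    def __call__(self, transition: dict[str, Any]) -> dict[str, Any]:
        obs: Optional[dict[str, Any]] = transition.get("observation")
        if obs is None:
            # Required data is missing: return the transition unchanged.
            return transition
        new_transition = transition.copy()  # shallow copy, no side effects
        new_transition["observation"] = self.observation(obs)  # only this key changes
        return new_transition


class ScalePixels(ObservationStepBase):
    """Scale a flat list of pixel values from [0, 255] to [0, 1]."""

    def observation(self, observation):
        out = dict(observation)
        if "pixels" in out:
            out["pixels"] = [p / 255 for p in out["pixels"]]
        return out


step = ScalePixels()
no_obs = {"action": [0.1]}
with_obs = {"observation": {"pixels": [0, 255]}, "action": [0.1]}
processed = step(with_obs)
```

The transition without an observation comes back as the very same object, and the `action` entry of the processed transition is shared with the input rather than copied or modified.
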
### Configuration and State Management

Processors support serialization through three methods that separate configuration from tensor state. This is especially important for normalization processors, which carry dataset statistics (tensors) in their state, and hyperparameters in their config:

```python
from dataclasses import dataclass, field
from typing import Any

import torch

from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature


@dataclass
class NormalizerProcessor:
    features: dict[str, PolicyFeature]
    norm_map: dict[FeatureType, NormalizationMode]
    eps: float = 1e-8
    _tensor_stats: dict[str, dict[str, torch.Tensor]] = field(default_factory=dict, init=False, repr=False)

    def get_config(self) -> dict[str, Any]:
        """JSON-serializable configuration (no tensors)."""
        return {
            "eps": self.eps,
            "features": {k: {"type": v.type.value, "shape": v.shape} for k, v in self.features.items()},
            "norm_map": {ft.value: nm.value for ft, nm in self.norm_map.items()},
        }

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Tensor state only (e.g., dataset statistics)."""
        flat: dict[str, torch.Tensor] = {}
        for key, sub in self._tensor_stats.items():
            for stat_name, tensor in sub.items():
                flat[f"{key}.{stat_name}"] = tensor
        return flat

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """Restore tensor state at runtime."""
        self._tensor_stats.clear()
        for flat_key, tensor in state.items():
            key, stat_name = flat_key.rsplit(".", 1)
            self._tensor_stats.setdefault(key, {})[stat_name] = tensor
```

**Usage:**

```python
# Save (e.g., inside a policy)
config = processor.get_config()
tensors = processor.state_dict()

# Restore (e.g., loading a pretrained policy)
# Note: `config` holds plain JSON types, so the constructor has to convert
# `features` and `norm_map` back into `PolicyFeature` and enum values.
new_processor = NormalizerProcessor(**config)
new_processor.load_state_dict(tensors)
```

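The flattening scheme used by `state_dict` and `load_state_dict` can be tried out without any tensors. The following sketch is an illustration only: it uses plain lists in place of `torch.Tensor` values, the `flatten`/`unflatten` helpers and the sample statistics are made up for the example, and the config round-trips through the standard `json` module.

```python
import json

# Made-up statistics; lists stand in for tensors.
stats = {
    "observation.state": {"mean": [0.1, 0.2], "std": [1.0, 2.0]},
    "action": {"min": [-1.0], "max": [1.0]},
}


def flatten(nested):
    """Turn {key: {stat: value}} into {"key.stat": value}."""
    return {f"{key}.{stat}": value for key, sub in nested.items() for stat, value in sub.items()}


def unflatten(flat):
    """Inverse of flatten."""
    nested = {}
    for flat_key, value in flat.items():
        # Split on the LAST dot only, because feature keys contain dots themselves.
        key, stat = flat_key.rsplit(".", 1)
        nested.setdefault(key, {})[stat] = value
    return nested


flat = flatten(stats)
restored = unflatten(flat)

# The config half is plain JSON, so it survives a dumps/loads round trip.
config = json.loads(json.dumps({"eps": 1e-8, "norm_map": {"ACTION": "MIN_MAX"}}))

# A left-to-right split would cut the feature key in the wrong place.
wrong_split = "observation.state.mean".split(".", 1)
```

Using `rsplit(".", 1)` is what keeps a key such as `observation.state` intact; the left-to-right split yields `["observation", "state.mean"]` instead.
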
### Transform features

The `transform_features` method defines how your processor transforms feature names and shapes. This is crucial for policy configuration and debugging.

Normalization typically preserves the feature keys and shapes, so `NormalizerProcessor.transform_features` returns the input features unchanged. When your processor renames or reshapes, implement this method to reflect the mapping for downstream components. For example, a simple rename processor:

```python
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
    # Simple renaming
    if "pixels" in features:
        features["observation.image"] = features.pop("pixels")

    # Pattern-based renaming
    for key in list(features.keys()):
        if key.startswith("env_state."):
            suffix = key[len("env_state."):]
            features[f"observation.{suffix}"] = features.pop(key)

    return features
```

**Key principles:**

- Use `features.pop(old_key)` to remove and get the old feature
- Use `features[new_key] = old_feature` to add the renamed feature
- Always return the modified features dictionary
- Document transformations clearly in the docstring

### Example of usage from the codebase

`transform_features` is used by `RobotProcessor` to derive the dataset/policy feature contract from an initial feature set by applying each step's transformation. You can see concrete examples in the codebase:

- Phone teleoperation record pipeline (`examples/phone_so100_record.py`): processors like `ForwardKinematicsJointsToEE`, `GripperVelocityToJoint`, and `EEBoundsAndSafety` implement `transform_features` to declare which action/observation keys should be materialized in the dataset.
- SO100 follower kinematics (`src/lerobot/robots/so100_follower/robot_kinematic_processor.py`): each processor's `transform_features` method adds or refines feature keys such as `observation.state.ee.{x,y,z,wx,wy,wz}` or `action.gripper.pos`.
- Rename and tokenizer processors (`src/lerobot/processor/rename_processor.py`, `src/lerobot/processor/tokenizer_processor.py`): demonstrate key renaming and adding language token features to the contract.

In practice, you will often aggregate features by running `RobotProcessor.transform_features(...)` with your initial features to compute the final contract before recording or training.

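Aggregating features across a pipeline amounts to threading one dictionary through every step's `transform_features`. The sketch below shows that idea under simplifying assumptions: `RenamePixels`, `AddGripper`, and `aggregate_features` are hypothetical, features are plain dictionaries rather than `PolicyFeature` objects, and each step copies its input instead of mutating it.

```python
from typing import Any

Features = dict[str, Any]  # stand-in for dict[str, PolicyFeature]


class RenamePixels:
    """Hypothetical step that renames `pixels` to `observation.image`."""

    def transform_features(self, features: Features) -> Features:
        features = dict(features)
        if "pixels" in features:
            features["observation.image"] = features.pop("pixels")
        return features


class AddGripper:
    """Hypothetical step that declares a new action feature."""

    def transform_features(self, features: Features) -> Features:
        features = dict(features)
        features["action.gripper.pos"] = {"shape": (1,)}
        return features


def aggregate_features(steps, initial: Features) -> Features:
    """Apply each step's transform_features in pipeline order."""
    features = dict(initial)
    for step in steps:
        features = step.transform_features(features)
    return features


initial = {"pixels": {"shape": (3, 480, 640)}}
final = aggregate_features([RenamePixels(), AddGripper()], initial)
```

After the call, `final` contains `observation.image` and `action.gripper.pos`, while `initial` still holds only `pixels`. Copying inside each step is a design choice of this sketch that makes the aggregation safe to run repeatedly on the same starting features.
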
## Helper Classes

LeRobot provides pre-built processor classes for common transformations. Below is a comprehensive list of registered processors in the codebase.

### Core processors (observations, actions, normalization)

- **`VanillaObservationProcessor`** (`observation_processor`): Convert images and state to the LeRobot format.
- **`NormalizerProcessor`** (`normalizer_processor`): Normalize observations/actions (mean/std or min/max to [-1, 1]).
- **`UnnormalizerProcessor`** (`unnormalizer_processor`): Inverse of the normalizer for model outputs.
- **`DeviceProcessor`** (`device_processor`): Move tensors to a specific device (CPU/GPU) and optionally cast them to a float dtype.
- **`ToBatchProcessor`** (`to_batch_processor`): Add batch dimension to observations/actions when missing.
- **`RenameProcessor`** (`rename_processor`): Rename observation keys using a mapping dictionary.
- **`TokenizerProcessor`** (`tokenizer_processor`): Tokenize language tasks into `observation.language.*` tensors.

### Teleoperation mapping processors

- **`MapDeltaActionToRobotAction`** (`map_delta_action_to_robot_action`): Map teleop deltas (e.g., gamepad) to `action.target_*` fields.
- **`MapPhoneActionToRobotAction`** (`map_phone_action_to_robot_action`): Map calibrated phone pose/buttons to `action.target_*` and gripper.

### Robot kinematics processors (SO100 follower example)

- **`EEReferenceAndDelta`** (`ee_reference_and_delta`): Compute desired EE pose from target deltas and current pose.
- **`EEBoundsAndSafety`** (`ee_bounds_and_safety`): Clip EE pose to bounds and check for jumps.
- **`InverseKinematicsEEToJoints`** (`inverse_kinematics_ee_to_joints`): Convert EE pose to joint targets via IK.
- **`GripperVelocityToJoint`** (`gripper_velocity_to_joint`): Convert gripper velocity input to joint position command.
- **`ForwardKinematicsJointsToEE`** (`forward_kinematics_joints_to_ee`): Compute EE pose features from joint positions via FK.
- **`AddRobotObservationAsComplimentaryData`** (`add_robot_observation`): Read robot observation and insert `raw_joint_positions` into complementary data.

### Policy-specific utility processors

- **`Pi0NewLineProcessor`** (`pi0_new_line_processor`): Ensure text tasks end with a newline (Pi0 tokenizer compatibility).
- **`SmolVLANewLineProcessor`** (`smolvla_new_line_processor`): Ensure text tasks end with a newline (SmolVLA tokenizer compatibility).

### Usage Example

```python
from lerobot.processor import NormalizerProcessor, DeviceProcessor, RobotProcessor, ToBatchProcessor

# Create a processing pipeline (typical policy preprocessor)
steps = [
    NormalizerProcessor(features=features, norm_map=norm_map, stats=stats),
    ToBatchProcessor(),
    DeviceProcessor(device="cuda"),
]

# Use in RobotProcessor
processor = RobotProcessor(steps=steps)
processed_transition = processor(raw_transition)
```

### Using overrides

You can override step parameters at load time using `overrides`. This is handy for non-serializable objects or site-specific settings. It works both in policy factories and with `RobotProcessor.from_pretrained(...)`.

Example: during policy evaluation on the robot, override the device and rename map.
Use this to run a policy trained on CUDA on a CPU-only robot, or to remap camera keys when the robot uses different names than the dataset.

```python
# From src/lerobot/record.py (lines 437-445)
preprocessor, postprocessor = make_processor(
    policy_cfg=cfg.policy,
    pretrained_path=cfg.policy.pretrained_path,
    dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map),
    preprocessor_overrides={
        "device_processor": {"device": cfg.policy.device},
        "rename_processor": {"rename_map": cfg.dataset.rename_map},
    },
)
```

Direct usage with `from_pretrained`:

```python
from lerobot.processor import RobotProcessor

processor = RobotProcessor.from_pretrained(
    "username/my-processor",
    overrides={
        "device_processor": {"device": "cuda:0"},  # registry name for registered steps
        "CustomStep": {"param": 42},  # class name for non-registered steps
    },
)
```

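Conceptually, an override is a per-step dictionary merged on top of the saved configuration before the step is constructed. The sketch below illustrates that merge under stated assumptions: the `apply_overrides` helper and the saved-config layout are hypothetical, and raising on an unknown step name is a choice made for this example rather than a statement about how LeRobot behaves.

```python
def apply_overrides(saved_steps, overrides):
    """Merge per-step overrides into saved step configs, keyed by step name."""
    unknown = set(overrides) - {s["name"] for s in saved_steps}
    if unknown:
        # Fail loudly so that a typo in a step name does not go unnoticed.
        raise KeyError(f"Overrides for unknown steps: {sorted(unknown)}")
    return [
        {"name": s["name"], "config": {**s["config"], **overrides.get(s["name"], {})}}
        for s in saved_steps
    ]


# Hypothetical saved pipeline: trained on CUDA, no key renaming.
saved_steps = [
    {"name": "rename_processor", "config": {"rename_map": {}}},
    {"name": "device_processor", "config": {"device": "cuda"}},
]

# Evaluate on a CPU-only machine by overriding a single parameter.
merged = apply_overrides(saved_steps, {"device_processor": {"device": "cpu"}})
```

Only the overridden parameter changes; every other saved value is kept, and the saved configuration itself is not modified.
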
## Best Practices

- **Keep processors atomic** - One transformation per processor for reusability and debugging
- **Use dataclasses** - Clean initialization with `@dataclass`
- **Always register processors** - Use `@ProcessorStepRegistry.register("name")` for discoverability
- **Check for None** - Always validate required data exists before processing
- **Use copy() for safety** - Avoid side effects with `transition.copy()`
- **Separate config and state** - JSON-serializable config vs tensor `state_dict`
- **Use base classes** - Inherit from `ObservationProcessor` for observation-only processing

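```python
from dataclasses import dataclass

# Import path assumed from the pipeline module used earlier; adjust to your LeRobot version.
from lerobot.processor.pipeline import ObservationProcessor, ProcessorStepRegistry


@ProcessorStepRegistry.register("my_processor")
@dataclass
class MyProcessor(ObservationProcessor):
    threshold: float = 0.5

    def observation(self, observation):
        if observation is None:
            return observation
        # Your processing logic here; work on a copy to avoid mutating the input
        processed_observation = dict(observation)
        return processed_observation
```
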
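For readers unfamiliar with the registration pattern, the sketch below shows how a decorator-based registry can map names to classes so that a saved pipeline can be rebuilt from strings. `StepRegistry` and `ThresholdStep` are hypothetical and only illustrate the pattern; they are not LeRobot's `ProcessorStepRegistry`.

```python
class StepRegistry:
    """Hypothetical name-to-class registry illustrating the decorator pattern."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(step_cls: type) -> type:
            if name in cls._registry:
                raise ValueError(f"Step '{name}' is already registered")
            cls._registry[name] = step_cls
            return step_cls  # the class itself is returned unchanged

        return decorator

    @classmethod
    def get(cls, name: str) -> type:
        return cls._registry[name]


@StepRegistry.register("threshold_step")
class ThresholdStep:
    def __init__(self, threshold: float = 0.5):
        self.threshold = threshold


# Rebuild a step from its registered name plus a saved config.
step = StepRegistry.get("threshold_step")(**{"threshold": 0.8})
```

Looking a class up by name is what lets a serialized pipeline store only strings and JSON configs, which is why unregistered steps are harder to discover and reload.
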
## Conclusion

You now have all the tools to implement custom processors in LeRobot! The key steps are:

1. **Define your processor** as a dataclass with the required methods (`__call__`, `get_config`, `state_dict`, `load_state_dict`, `reset`, `transform_features`)
2. **Register it** using `@ProcessorStepRegistry.register("name")` for discoverability
3. **Integrate it** into a `RobotProcessor` pipeline with other processing steps
4. **Use base classes** like `ObservationProcessor` when possible to reduce boilerplate

The processor system is designed to be modular and composable, allowing you to build complex data processing pipelines from simple, focused components. Whether you're preprocessing sensor data for training or post-processing model outputs for robot execution, custom processors give you the flexibility to handle any data transformation your robotics application requires. Policies like Pi0 and SmolVLA use the same normalization processors described above, so your understanding here will transfer directly when wiring policy preprocessors and postprocessors.

Start simple, test thoroughly, and leverage the existing helper classes to build robust data processing pipelines for your robot learning workflows.