From 8d5f519fcbd63a6e0ebadd5b606c1eb63a98bd24 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Wed, 6 Aug 2025 17:41:39 +0200 Subject: [PATCH] Added script for the second part of the processor doc --- docs/source/implement_your_own_processor.mdx | 496 +++++++++++++++++++ docs/source/introduction_processor.mdx | 0 2 files changed, 496 insertions(+) create mode 100644 docs/source/implement_your_own_processor.mdx delete mode 100644 docs/source/introduction_processor.mdx diff --git a/docs/source/implement_your_own_processor.mdx b/docs/source/implement_your_own_processor.mdx new file mode 100644 index 000000000..b4a2126c2 --- /dev/null +++ b/docs/source/implement_your_own_processor.mdx @@ -0,0 +1,496 @@ +# Implement your processor + +In this tutorial, we will explain how to implement your own processor. We will start by motivating +the need for a custom processor and then we will explain the helper classes that can help you implement your own processor. + +## Why would you need a custom processor? + +In most cases, when reading raw data from a sensor like the camera and robot motor encoders, +you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot. +For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`. +To use these images with the policies, you will need to cast them to `float32` and normalize them to the range `[0, 1]`. + +For example, in LeRobot's `ImageProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. 
The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`:
+
+```python
+# Input: numpy array with shape (480, 640, 3) and dtype uint8
+raw_image = env_observation["pixels"]  # Values in [0, 255]
+
+# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32
+processed_image = processor(transition)["observation"]["observation.image"]  # Values in [0, 1]
+```
+
+On the other hand, when a model returns an action to be executed on the robot, one often has to post-process this action to make it compatible with the robot.
+For example, the model might return joint position values that range from `[-1, 1]`, and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot.
+
+For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges:
+
+```python
+# Input: model action with normalized values in [-1, 1]
+normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2])  # Model output
+
+# After post-processing: real joint positions in robot's native ranges
+# Example: joints range from [-180.0, 180.0]
+real_action = unnormalizer(transition)["action"]
+# real action after post-processing: [ -90., 144., -180., 36.]
+```
+
+The unnormalizer uses the dataset statistics to convert back:
+```python
+# For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min
+real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
+```
+
+All of these situations point us towards the need for a mechanism to preprocess the data before it is input to the policies, and then to post-process the actions that are returned before they are executed on the robot.
+
+To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.
+
+## How to implement your own processor?
+ +Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods: + +- `__call__`: implements the processing step for the input transition. +- `get_config`: gets the configuration of the processor step. +- `state_dict`: gets the state of the processor step. +- `load_state_dict`: loads the state of the processor step. +- `reset`: resets the state of the processor step. +- `feature_contract`: displays the modification to the feature space during the processor step. + +### Implement the `__call__` method + +The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. Here's a real example from LeRobot's `ImageProcessor`: + +```python +from dataclasses import dataclass +import torch +import einops +from lerobot.processor.pipeline import EnvTransition, TransitionKey + +@dataclass +class ImageProcessor: + def __call__(self, transition: EnvTransition) -> EnvTransition: + observation = transition.get(TransitionKey.OBSERVATION) + + if observation is None: + return transition + + processed_obs = {} + + # Copy all observations first + for key, value in observation.items(): + processed_obs[key] = value + + # Handle pixels key if present + pixels = observation.get("pixels") + if pixels is not None: + # Remove pixels from processed_obs since we'll replace it with processed images + processed_obs.pop("pixels", None) + + # Process the image + processed_img = self._process_single_image(pixels) + processed_obs["observation.image"] = processed_img + + # Return new transition with processed observation + new_transition = transition.copy() + new_transition[TransitionKey.OBSERVATION] = processed_obs + return new_transition + + def _process_single_image(self, img): + # Convert to tensor + img_tensor = torch.from_numpy(img) + + # Add batch dimension if needed + if img_tensor.ndim == 3: + img_tensor = img_tensor.unsqueeze(0) + + # Convert to 
channel-first format: (B, H, W, C) -> (B, C, H, W) + img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous() + + # Convert to float32 and normalize to [0, 1] + img_tensor = img_tensor.type(torch.float32) / 255.0 + + return img_tensor +``` + +Key principles for implementing `__call__`: +- Always check if the required data exists (observations, actions, etc.) +- Return the original transition unchanged if no processing is needed +- Create a copy of the transition to avoid side effects +- Only modify the specific keys your processor is responsible for +### Configuration and State Management + +LeRobot processors support serialization and deserialization through three key methods. Here's how they work using `NormalizerProcessor` as an example: + +#### `get_config()` - Serializable Configuration +This method returns all non-tensor configuration that can be saved to JSON: + +```python +@dataclass +class NormalizerProcessor: + features: dict[str, PolicyFeature] + norm_map: dict[FeatureType, NormalizationMode] + normalize_keys: set[str] | None = None + eps: float = 1e-8 + + def get_config(self) -> dict[str, Any]: + """Return JSON-serializable configuration.""" + return { + "features": {k: {"type": v.type.value, "shape": v.shape} for k, v in self.features.items()}, + "norm_map": {k.value: v.value for k, v in self.norm_map.items()}, + "normalize_keys": list(self.normalize_keys) if self.normalize_keys else None, + "eps": self.eps, + # Note: 'stats' is not included as it contains tensors + } +``` + +#### `state_dict()` - Tensor State +This method returns only PyTorch tensors that need special serialization: + +```python +def state_dict(self) -> dict[str, torch.Tensor]: + """Return tensor state dictionary.""" + state = {} + for key, stats in self._tensor_stats.items(): + for stat_name, tensor_val in stats.items(): + state[f"{key}.{stat_name}"] = tensor_val + return state +``` + +#### `load_state_dict()` - Restore Tensor State +This method restores the 
tensor state from a saved state dictionary:
+
+```python
+def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
+    """Load tensor state from state dictionary."""
+    # Reconstruct _tensor_stats from flat state dict
+    self._tensor_stats = {}
+    for full_key, tensor_val in state.items():
+        if "." in full_key:
+            key, stat_name = full_key.rsplit(".", 1)
+            if key not in self._tensor_stats:
+                self._tensor_stats[key] = {}
+            self._tensor_stats[key][stat_name] = tensor_val
+```
+
+#### Usage Example
+```python
+# Save processor
+config = processor.get_config()
+tensors = processor.state_dict()
+
+# Later, restore processor
+new_processor = NormalizerProcessor(**config)
+new_processor.load_state_dict(tensors)
+```
+
+### Feature Contract
+
+The `feature_contract` method defines how your processor transforms the feature space. It tells the system how input feature names and shapes change after processing. This is crucial for policy configuration and debugging.
+
+Here's an example from `ImageProcessor`:
+
+```python
+def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
+    """Transforms:
+    pixels -> observation.image,
+    observation.pixels -> observation.image,
+    pixels.* -> observation.images.*,
+    observation.pixels.* -> observation.images.*
+ """ + # Handle simple pixel renaming + if "pixels" in features: + features["observation.image"] = features.pop("pixels") + if "observation.pixels" in features: + features["observation.image"] = features.pop("observation.pixels") + + # Handle camera-specific pixels + prefixes = ("pixels.", "observation.pixels.") + for key in list(features.keys()): + for p in prefixes: + if key.startswith(p): + suffix = key[len(p):] + features[f"observation.images.{suffix}"] = features.pop(key) + break + return features +``` + +And from `StateProcessor`: + +```python +def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + """Transforms: + environment_state -> observation.environment_state, + agent_pos -> observation.state + """ + pairs = ( + ("environment_state", "observation.environment_state"), + ("agent_pos", "observation.state"), + ) + for old, new in pairs: + if old in features: + features[new] = features.pop(old) + prefixed = f"observation.{old}" + if prefixed in features: + features[new] = features.pop(prefixed) + return features +``` + +**Key principles:** +- Use `features.pop(old_key)` to remove the old feature and get its value +- Use `features[new_key] = old_feature` to add the new feature with same properties +- Always return the modified features dictionary +- Document the transformations clearly in the docstring + +## Helper Classes + +LeRobot provides several pre-built processor classes that handle common transformations, which you can use directly or as building blocks for your custom processors. 
+ +### Core Processing Classes + +- **`ImageProcessor`** - Converts images from numpy arrays (uint8, channel-last) to PyTorch tensors (float32, channel-first) +- **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys +- **`NormalizerProcessor`** - Normalizes observations and actions using dataset statistics (mean/std or min/max) +- **`UnnormalizerProcessor`** - Inverse of NormalizerProcessor, converts normalized values back to original ranges + +### Utility Classes + +- **`DeviceProcessor`** - Moves tensors to specified device (CPU/GPU) +- **`ToBatchProcessor`** - Adds batch dimensions to observations and actions +- **`RenameProcessor`** - Renames keys in observations using a mapping dictionary +- **`TokenizerProcessor`** - Handles text tokenization for language-conditioned policies + +### Composition Classes + +- **`VanillaObservationProcessor`** - Combines ImageProcessor and StateProcessor for complete observation handling +- **`RobotProcessor`** - Main container that orchestrates a sequence of processor steps + +### Example Usage + +```python +from lerobot.processor import ( + ImageProcessor, + NormalizerProcessor, + DeviceProcessor, + RobotProcessor +) + +# Create a processing pipeline +preprocessing_steps = [ + ImageProcessor(), # Convert images + NormalizerProcessor(features=features, norm_map=norm_map, stats=dataset_stats), # Normalize + DeviceProcessor(device="cuda"), # Move to GPU +] + +# Combine into a robot processor +preprocessor = RobotProcessor(steps=preprocessing_steps, name="my_preprocessor") + +# Use it +processed_transition = preprocessor(raw_transition) +``` + +These helper classes implement the full `ProcessorStep` protocol, so they can be easily combined and extended for your specific needs. + +## Best Practices + +### Design Principles + +- **Keep processors atomic** - Each processor should handle one specific transformation. This makes them more reusable and easier to debug. 
+- **Use dataclasses** - Always implement processors as dataclasses for clean initialization and automatic generation of `__init__`, `__repr__`, etc. + +```python +@dataclass +class MyProcessor: + threshold: float = 0.5 + normalize: bool = True +``` + +### Registration and Discovery + +- **Always register your processor** - Use the `@ProcessorStepRegistry.register()` decorator to make your processor discoverable: + +```python +@ProcessorStepRegistry.register("my_custom_processor") +@dataclass +class MyCustomProcessor: + # Implementation +``` + +### State Management + +- **Separate concerns in config vs state_dict** - Keep non-tensor configuration in `get_config()` and only tensors in `state_dict()`: + +```python +def get_config(self) -> dict[str, Any]: + # JSON-serializable data only + return {"threshold": self.threshold, "mode": self.mode} + +def state_dict(self) -> dict[str, torch.Tensor]: + # Tensors only + return {"running_mean": self.running_mean, "weights": self.weights} +``` + +### Safety and Robustness + +- **Always check for None values** - Validate that required data exists before processing: + +```python +def __call__(self, transition: EnvTransition) -> EnvTransition: + observation = transition.get(TransitionKey.OBSERVATION) + if observation is None: + return transition # Return unchanged if no observation +``` + +- **Use copy() for safety** - If performance is not critical, copy transitions to avoid side effects: + +```python +new_transition = transition.copy() +new_transition[TransitionKey.OBSERVATION] = processed_obs +return new_transition +``` + +### Debugging and Development + +- **Use hooks for debugging** - RobotProcessor supports hooks for monitoring: + +```python +def debug_hook(step_idx: int, transition: EnvTransition) -> None: + print(f"Step {step_idx}: {list(transition.keys())}") + +processor = RobotProcessor(steps=[...], before_step_hooks=[debug_hook]) +``` + +- **Use step_through() for development** - Iterate through processing steps one by 
one: + +```python +for step_idx, transition in processor.step_through(data): + print(f"After step {step_idx}: {transition}") +``` + +### Performance Considerations + +- **Batch processing** - Design processors to handle batched data efficiently when possible +- **Device awareness** - Let `DeviceProcessor` handle device placement rather than hardcoding it +- **Memory efficiency** - Reuse tensors when safe to do so for better performance + +### Documentation + +- **Document feature contracts clearly** - Be explicit about how your processor transforms the feature space +- **Provide usage examples** - Include docstring examples showing typical usage patterns + +## Complete Example: Custom Smoothing Processor + +Here's a complete example that implements a simple action smoothing processor: + +```python +from dataclasses import dataclass, field +from typing import Any +import torch +from torch import Tensor + +from lerobot.processor.pipeline import ( + EnvTransition, + ProcessorStepRegistry, + TransitionKey +) +from lerobot.configs.types import PolicyFeature + +@ProcessorStepRegistry.register("action_smoother") +@dataclass +class ActionSmoothingProcessor: + """Smooths actions using exponential moving average. + + This processor maintains a running average of actions to reduce jitter + from the policy predictions. 
+ """ + + # Configuration + alpha: float = 0.7 # Smoothing factor (0 = no smoothing, 1 = no memory) + + # State (not in config) + _previous_action: Tensor | None = field(default=None, init=False, repr=False) + _initialized: bool = field(default=False, init=False, repr=False) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + action = transition.get(TransitionKey.ACTION) + + if action is None: + return transition + + # Convert to tensor if needed + if not isinstance(action, torch.Tensor): + action = torch.as_tensor(action, dtype=torch.float32) + + # Initialize on first call + if not self._initialized: + self._previous_action = action.clone() + self._initialized = True + smoothed_action = action + else: + # Exponential moving average: new = alpha * current + (1-alpha) * previous + smoothed_action = self.alpha * action + (1 - self.alpha) * self._previous_action + self._previous_action = smoothed_action.clone() + + # Return new transition with smoothed action + new_transition = transition.copy() + new_transition[TransitionKey.ACTION] = smoothed_action + return new_transition + + def get_config(self) -> dict[str, Any]: + """Return JSON-serializable configuration.""" + return {"alpha": self.alpha} + + def state_dict(self) -> dict[str, torch.Tensor]: + """Return tensor state.""" + if self._previous_action is not None: + return {"previous_action": self._previous_action} + return {} + + def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: + """Load tensor state.""" + if "previous_action" in state: + self._previous_action = state["previous_action"] + self._initialized = True + else: + self._previous_action = None + self._initialized = False + + def reset(self) -> None: + """Reset processor state at episode boundaries.""" + self._previous_action = None + self._initialized = False + + def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + """Action shapes remain unchanged.""" + return features # No 
transformation to feature space +``` + +### Usage Example + +```python +from lerobot.processor import RobotProcessor, DeviceProcessor + +# Create a postprocessing pipeline with action smoothing +postprocessing_steps = [ + DeviceProcessor(device="cpu"), # Move to CPU first + ActionSmoothingProcessor(alpha=0.8), # Apply smoothing +] + +postprocessor = RobotProcessor( + steps=postprocessing_steps, + name="smooth_postprocessor" +) + +# Use in your policy inference loop +for transition in environment_transitions: + # Get action from policy + action = policy(transition) + + # Post-process (including smoothing) + transition_with_action = {"action": action} + smoothed_transition = postprocessor(transition_with_action) + + # Execute smoothed action + next_obs = env.step(smoothed_transition["action"]) +``` + +This example demonstrates all the key concepts: processor registration, state management, configuration serialization, and proper integration with the LeRobot pipeline system. diff --git a/docs/source/introduction_processor.mdx b/docs/source/introduction_processor.mdx deleted file mode 100644 index e69de29bb..000000000