# Implement your processor

In this tutorial, we explain how to implement your own processor. We start by motivating the need for a custom processor, and then describe the helper classes that can help you implement one.

## Why would you need a custom processor?

In most cases, when reading raw data from a sensor such as a camera or the robot's motor encoders, you will need to process this data to transform it into a format compatible with the policies in LeRobot. For example, raw images are encoded as `uint8` with values in the range `[0, 255]`. To use these images with the policies, you need to cast them to `float32` and normalize them to the range `[0, 1]`.

For example, in LeRobot's `ImageProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`:

```python
# Input: numpy array with shape (480, 640, 3) and dtype uint8
raw_image = env_observation["pixels"]  # Values in [0, 255]

# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32
processed_image = processor(transition)["observation"]["observation.image"]  # Values in [0, 1]
```

On the other hand, when a model returns an action to be executed on the robot, one often has to post-process this action to make it compatible with the robot. For example, the model might return joint position values in the range `[-1, 1]`, which need to be scaled to the robot's minimum and maximum joint angle positions.
For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges:

```python
# Input: model action with normalized values in [-1, 1]
normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2])  # Model output

# After post-processing: real joint positions in the robot's native ranges
# Example: joints range from [-180.0, 180.0]
real_action = unnormalizer(transition)["action"]
# Real action after post-processing: [ -90., 144., -180., 36.]
```

The unnormalizer uses the dataset statistics to convert back:

```python
# For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min
real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
```

All these situations point to the need for a mechanism to preprocess the data before it is fed to the policies, and to post-process the actions that are returned before they are executed on the robot. To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.

## How to implement your own processor?

Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods:

- `__call__`: implements the processing step for the input transition.
- `get_config`: gets the configuration of the processor step.
- `state_dict`: gets the state of the processor step.
- `load_state_dict`: loads the state of the processor step.
- `reset`: resets the state of the processor step.
- `feature_contract`: describes the modification to the feature space during the processor step.

### Implement the `__call__` method

The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`.
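Before looking at a real implementation, here is a minimal self-contained sketch of that contract. A plain dict and the string key `"observation"` stand in for `EnvTransition` and `TransitionKey.OBSERVATION`, purely for illustration:

```python
from dataclasses import dataclass


@dataclass
class IdentityProcessor:
    """Do-nothing step showing the minimal __call__ shape."""

    def __call__(self, transition: dict) -> dict:
        observation = transition.get("observation")
        if observation is None:
            # Nothing to process: hand the transition back untouched.
            return transition
        # Copy before modifying so the caller's transition is not mutated.
        new_transition = transition.copy()
        new_transition["observation"] = dict(observation)
        return new_transition


step = IdentityProcessor()
original = {"observation": {"pixels": [0, 255]}}
result = step(original)
```

The important habit here is copying before mutating: downstream steps and the caller keep an unmodified view of the original transition.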
Here's a real example from LeRobot's `ImageProcessor`:

```python
from dataclasses import dataclass

import einops
import torch

from lerobot.processor.pipeline import EnvTransition, TransitionKey


@dataclass
class ImageProcessor:
    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = transition.get(TransitionKey.OBSERVATION)
        if observation is None:
            return transition

        processed_obs = {}

        # Copy all observations first
        for key, value in observation.items():
            processed_obs[key] = value

        # Handle pixels key if present
        pixels = observation.get("pixels")
        if pixels is not None:
            # Remove pixels from processed_obs since we'll replace it with processed images
            processed_obs.pop("pixels", None)

            # Process the image
            processed_img = self._process_single_image(pixels)
            processed_obs["observation.image"] = processed_img

        # Return new transition with processed observation
        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = processed_obs
        return new_transition

    def _process_single_image(self, img):
        # Convert to tensor
        img_tensor = torch.from_numpy(img)

        # Add batch dimension if needed
        if img_tensor.ndim == 3:
            img_tensor = img_tensor.unsqueeze(0)

        # Convert to channel-first format: (B, H, W, C) -> (B, C, H, W)
        img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous()

        # Convert to float32 and normalize to [0, 1]
        img_tensor = img_tensor.type(torch.float32) / 255.0

        return img_tensor
```

Key principles for implementing `__call__`:

- Always check if the required data exists (observations, actions, etc.)
- Return the original transition unchanged if no processing is needed
- Create a copy of the transition to avoid side effects
- Only modify the specific keys your processor is responsible for

### Configuration and State Management

LeRobot processors support serialization and deserialization through three key methods.
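As a toy overview of how these methods divide the work (the class and field names here are hypothetical), plain Python values travel through `get_config()`, while tensors travel through `state_dict()` and `load_state_dict()`:

```python
from dataclasses import dataclass, field
from typing import Any

import torch


@dataclass
class RunningMeanStep:
    """Hypothetical step with one plain option and one tensor of state."""

    momentum: float = 0.9  # plain value -> belongs in get_config()
    running_mean: torch.Tensor = field(default_factory=lambda: torch.zeros(3))  # tensor -> state_dict()

    def get_config(self) -> dict[str, Any]:
        # JSON-serializable configuration only
        return {"momentum": self.momentum}

    def state_dict(self) -> dict[str, torch.Tensor]:
        # Tensors only
        return {"running_mean": self.running_mean}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        self.running_mean = state["running_mean"]


# Round-trip: rebuild from config, then restore tensor state
step = RunningMeanStep(momentum=0.5)
step.running_mean += 1.0
restored = RunningMeanStep(**step.get_config())
restored.load_state_dict(step.state_dict())
```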
Here's how they work, using `NormalizerProcessor` as an example:

#### `get_config()` - Serializable Configuration

This method returns all non-tensor configuration that can be saved to JSON:

```python
@dataclass
class NormalizerProcessor:
    features: dict[str, PolicyFeature]
    norm_map: dict[FeatureType, NormalizationMode]
    normalize_keys: set[str] | None = None
    eps: float = 1e-8

    def get_config(self) -> dict[str, Any]:
        """Return JSON-serializable configuration."""
        return {
            "features": {k: {"type": v.type.value, "shape": v.shape} for k, v in self.features.items()},
            "norm_map": {k.value: v.value for k, v in self.norm_map.items()},
            "normalize_keys": list(self.normalize_keys) if self.normalize_keys else None,
            "eps": self.eps,
            # Note: 'stats' is not included as it contains tensors
        }
```

#### `state_dict()` - Tensor State

This method returns only PyTorch tensors that need special serialization:

```python
def state_dict(self) -> dict[str, torch.Tensor]:
    """Return tensor state dictionary."""
    state = {}
    for key, stats in self._tensor_stats.items():
        for stat_name, tensor_val in stats.items():
            state[f"{key}.{stat_name}"] = tensor_val
    return state
```

#### `load_state_dict()` - Restore Tensor State

This method restores the tensor state from a saved state dictionary:

```python
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
    """Load tensor state from state dictionary."""
    # Reconstruct _tensor_stats from flat state dict
    self._tensor_stats = {}
    for full_key, tensor_val in state.items():
        if "." in full_key:
            key, stat_name = full_key.rsplit(".", 1)
            if key not in self._tensor_stats:
                self._tensor_stats[key] = {}
            self._tensor_stats[key][stat_name] = tensor_val
```

#### Usage Example

```python
# Save processor
config = processor.get_config()
tensors = processor.state_dict()

# Later, restore processor
new_processor = NormalizerProcessor(**config)
new_processor.load_state_dict(tensors)
```

### Feature Contract

The `feature_contract` method defines how your processor transforms the feature space. It tells the system how input feature names and shapes change after processing. This is crucial for policy configuration and debugging.

Here's an example from `ImageProcessor`:

```python
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
    """Transforms:
    pixels -> observation.image,
    observation.pixels -> observation.image,
    pixels. -> observation.images.,
    observation.pixels. -> observation.images.
    """
    # Handle simple pixel renaming
    if "pixels" in features:
        features["observation.image"] = features.pop("pixels")
    if "observation.pixels" in features:
        features["observation.image"] = features.pop("observation.pixels")

    # Handle camera-specific pixels
    prefixes = ("pixels.", "observation.pixels.")
    for key in list(features.keys()):
        for p in prefixes:
            if key.startswith(p):
                suffix = key[len(p):]
                features[f"observation.images.{suffix}"] = features.pop(key)
                break

    return features
```

And from `StateProcessor`:

```python
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
    """Transforms:
    environment_state -> observation.environment_state,
    agent_pos -> observation.state
    """
    pairs = (
        ("environment_state", "observation.environment_state"),
        ("agent_pos", "observation.state"),
    )
    for old, new in pairs:
        if old in features:
            features[new] = features.pop(old)
        prefixed = f"observation.{old}"
        if prefixed in features:
            features[new] = features.pop(prefixed)
    return features
```

**Key principles:**

- Use
`features.pop(old_key)` to remove the old feature and get its value
- Use `features[new_key] = old_feature` to add the new feature with the same properties
- Always return the modified features dictionary
- Document the transformations clearly in the docstring

## Helper Classes

LeRobot provides several pre-built processor classes that handle common transformations, which you can use directly or as building blocks for your custom processors.

### Core Processing Classes

- **`ImageProcessor`** - Converts images from numpy arrays (uint8, channel-last) to PyTorch tensors (float32, channel-first)
- **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys
- **`NormalizerProcessor`** - Normalizes observations and actions using dataset statistics (mean/std or min/max)
- **`UnnormalizerProcessor`** - Inverse of `NormalizerProcessor`, converts normalized values back to original ranges

### Utility Classes

- **`DeviceProcessor`** - Moves tensors to a specified device (CPU/GPU)
- **`ToBatchProcessor`** - Adds batch dimensions to observations and actions
- **`RenameProcessor`** - Renames keys in observations using a mapping dictionary
- **`TokenizerProcessor`** - Handles text tokenization for language-conditioned policies

### Composition Classes

- **`VanillaObservationProcessor`** - Combines `ImageProcessor` and `StateProcessor` for complete observation handling
- **`RobotProcessor`** - Main container that orchestrates a sequence of processor steps

### Example Usage

```python
from lerobot.processor import (
    ImageProcessor,
    NormalizerProcessor,
    DeviceProcessor,
    RobotProcessor,
)

# Create a processing pipeline
preprocessing_steps = [
    ImageProcessor(),  # Convert images
    NormalizerProcessor(features=features, norm_map=norm_map, stats=dataset_stats),  # Normalize
    DeviceProcessor(device="cuda"),  # Move to GPU
]

# Combine into a robot processor
preprocessor = RobotProcessor(steps=preprocessing_steps, name="my_preprocessor")

# Use it
processed_transition = preprocessor(raw_transition)
```

These helper classes implement the full `ProcessorStep` protocol, so they can be easily combined and extended for your specific needs.

## Best Practices

### Design Principles

- **Keep processors atomic** - Each processor should handle one specific transformation. This makes them more reusable and easier to debug.
- **Use dataclasses** - Always implement processors as dataclasses for clean initialization and automatic generation of `__init__`, `__repr__`, etc.

```python
@dataclass
class MyProcessor:
    threshold: float = 0.5
    normalize: bool = True
```

### Registration and Discovery

- **Always register your processor** - Use the `@ProcessorStepRegistry.register()` decorator to make your processor discoverable:

```python
@ProcessorStepRegistry.register("my_custom_processor")
@dataclass
class MyCustomProcessor:
    # Implementation
    ...
```

### State Management

- **Separate concerns in config vs. state_dict** - Keep non-tensor configuration in `get_config()` and only tensors in `state_dict()`:

```python
def get_config(self) -> dict[str, Any]:
    # JSON-serializable data only
    return {"threshold": self.threshold, "mode": self.mode}

def state_dict(self) -> dict[str, torch.Tensor]:
    # Tensors only
    return {"running_mean": self.running_mean, "weights": self.weights}
```

### Safety and Robustness

- **Always check for None values** - Validate that required data exists before processing:

```python
def __call__(self, transition: EnvTransition) -> EnvTransition:
    observation = transition.get(TransitionKey.OBSERVATION)
    if observation is None:
        return transition  # Return unchanged if no observation
```

- **Use copy() for safety** - If performance is not critical, copy transitions to avoid side effects:

```python
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs
return new_transition
```

### Debugging and Development

- **Use hooks for debugging** - `RobotProcessor` supports hooks for monitoring:

```python
def
debug_hook(step_idx: int, transition: EnvTransition) -> None:
    print(f"Step {step_idx}: {list(transition.keys())}")

processor = RobotProcessor(steps=[...], before_step_hooks=[debug_hook])
```

- **Use step_through() for development** - Iterate through processing steps one by one:

```python
for step_idx, transition in processor.step_through(data):
    print(f"After step {step_idx}: {transition}")
```

### Performance Considerations

- **Batch processing** - Design processors to handle batched data efficiently when possible
- **Device awareness** - Let `DeviceProcessor` handle device placement rather than hardcoding it
- **Memory efficiency** - Reuse tensors when safe to do so for better performance

### Documentation

- **Document feature contracts clearly** - Be explicit about how your processor transforms the feature space
- **Provide usage examples** - Include docstring examples showing typical usage patterns

## Complete Example: Custom Smoothing Processor

Here's a complete example that implements a simple action smoothing processor:

```python
from dataclasses import dataclass, field
from typing import Any

import torch
from torch import Tensor

from lerobot.processor.pipeline import (
    EnvTransition,
    ProcessorStepRegistry,
    TransitionKey,
)
from lerobot.configs.types import PolicyFeature


@ProcessorStepRegistry.register("action_smoother")
@dataclass
class ActionSmoothingProcessor:
    """Smooths actions using an exponential moving average.

    This processor maintains a running average of actions to reduce jitter
    from the policy predictions.
    """

    # Configuration
    alpha: float = 0.7  # Smoothing factor (0 = no smoothing, 1 = no memory)

    # State (not in config)
    _previous_action: Tensor | None = field(default=None, init=False, repr=False)
    _initialized: bool = field(default=False, init=False, repr=False)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition.get(TransitionKey.ACTION)
        if action is None:
            return transition

        # Convert to tensor if needed
        if not isinstance(action, torch.Tensor):
            action = torch.as_tensor(action, dtype=torch.float32)

        # Initialize on first call
        if not self._initialized:
            self._previous_action = action.clone()
            self._initialized = True
            smoothed_action = action
        else:
            # Exponential moving average: new = alpha * current + (1 - alpha) * previous
            smoothed_action = self.alpha * action + (1 - self.alpha) * self._previous_action
            self._previous_action = smoothed_action.clone()

        # Return new transition with smoothed action
        new_transition = transition.copy()
        new_transition[TransitionKey.ACTION] = smoothed_action
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Return JSON-serializable configuration."""
        return {"alpha": self.alpha}

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Return tensor state."""
        if self._previous_action is not None:
            return {"previous_action": self._previous_action}
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """Load tensor state."""
        if "previous_action" in state:
            self._previous_action = state["previous_action"]
            self._initialized = True
        else:
            self._previous_action = None
            self._initialized = False

    def reset(self) -> None:
        """Reset processor state at episode boundaries."""
        self._previous_action = None
        self._initialized = False

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Action shapes remain unchanged."""
        return features  # No transformation to the feature space
```

### Usage Example

```python
from lerobot.processor import RobotProcessor, DeviceProcessor

# Create a postprocessing pipeline with action smoothing
postprocessing_steps = [
    DeviceProcessor(device="cpu"),  # Move to CPU first
    ActionSmoothingProcessor(alpha=0.8),  # Apply smoothing
]

postprocessor = RobotProcessor(
    steps=postprocessing_steps,
    name="smooth_postprocessor",
)

# Use in your policy inference loop
for transition in environment_transitions:
    # Get action from policy
    action = policy(transition)

    # Post-process (including smoothing)
    transition_with_action = {"action": action}
    smoothed_transition = postprocessor(transition_with_action)

    # Execute smoothed action
    next_obs = env.step(smoothed_transition["action"])
```

This example demonstrates all the key concepts: processor registration, state management, configuration serialization, and proper integration with the LeRobot pipeline system.