From feb3fed5e8041d774a1b9381347828e146d2b56a Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Wed, 6 Aug 2025 17:44:32 +0200 Subject: [PATCH] precommit style nit --- docs/source/implement_your_own_processor.mdx | 120 ++++++++++--------- 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/docs/source/implement_your_own_processor.mdx b/docs/source/implement_your_own_processor.mdx index b4a2126c2..2195b7af8 100644 --- a/docs/source/implement_your_own_processor.mdx +++ b/docs/source/implement_your_own_processor.mdx @@ -1,13 +1,13 @@ # Implement your processor -In this tutorial, we will explain how to implement your own processor. We will start by motivating +In this tutorial, we will explain how to implement your own processor. We will start by motivating the need for a custom processor and then we will explain the helper classes that can help you implement your own processor. ## Why would you need a custom processor? -In most cases, when reading raw data from a sensor like the camera and robot motor encoders, -you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot. -For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`. +In most cases, when reading raw data from a sensor like the camera and robot motor encoders, +you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot. +For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`. To use these images with the policies, you will need to cast them to `float32` and normalize them to the range `[0, 1]`. For example, in LeRobot's `ImageProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. 
The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`: @@ -16,12 +16,12 @@ For example, in LeRobot's `ImageProcessor`, raw images come from the environment # Input: numpy array with shape (480, 640, 3) and dtype uint8 raw_image = env_observation["pixels"] # Values in [0, 255] -# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32 +# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32 processed_image = processor(transition)["observation"]["observation.image"] # Values in [0, 1] ``` -On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot. -For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minumum and maximum joint angle positions of the robot. +On the other hand, when a model returns a certain action to be executed on the robot, it is often the case that one has to post-process this action to make it compatible to run on the robot. +For example, the model might return joint position values that range from `[-1, 1]` and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot. For instance, in LeRobot's `UnnormalizerProcessor`, model outputs are in the normalized range `[-1, 1]` and need to be converted back to actual robot joint ranges: @@ -31,21 +31,22 @@ normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2]) # Model output # After post-processing: real joint positions in robot's native ranges # Example: joints range from [-180.0, 180.0] -real_action = unnormalizer(transition)["action"] +real_action = unnormalizer(transition)["action"] # real action after post-processing: [ -90., 144., -180., 36.] 
``` The unnormalizer uses the dataset statistics to convert back: + ```python # For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val ``` -All this situation point us towards the need for a mechanism to preprocess the data before inputed to the policies and then post-process the action that are returend to be executed on the robot. +All these situations point us towards the need for a mechanism to preprocess the data before being passed to the policies and then post-process the actions that are returned to be executed on the robot. To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action. -## How to implement your own processor? +## How to implement your own processor? Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods: @@ -70,68 +71,71 @@ from lerobot.processor.pipeline import EnvTransition, TransitionKey class ImageProcessor: def __call__(self, transition: EnvTransition) -> EnvTransition: observation = transition.get(TransitionKey.OBSERVATION) - + if observation is None: return transition - + processed_obs = {} - + # Copy all observations first for key, value in observation.items(): processed_obs[key] = value - - # Handle pixels key if present + + # Handle pixels key if present pixels = observation.get("pixels") if pixels is not None: # Remove pixels from processed_obs since we'll replace it with processed images processed_obs.pop("pixels", None) - + # Process the image processed_img = self._process_single_image(pixels) processed_obs["observation.image"] = processed_img - + # Return new transition with processed observation new_transition = transition.copy() new_transition[TransitionKey.OBSERVATION] = processed_obs return new_transition - + def _process_single_image(self, img): # Convert to tensor img_tensor = 
torch.from_numpy(img) - + # Add batch dimension if needed if img_tensor.ndim == 3: img_tensor = img_tensor.unsqueeze(0) - - # Convert to channel-first format: (B, H, W, C) -> (B, C, H, W) + + # Convert to channel-first format: (B, H, W, C) -> (B, C, H, W) img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous() - + # Convert to float32 and normalize to [0, 1] img_tensor = img_tensor.type(torch.float32) / 255.0 - + return img_tensor ``` Key principles for implementing `__call__`: + - Always check if the required data exists (observations, actions, etc.) - Return the original transition unchanged if no processing is needed - Create a copy of the transition to avoid side effects - Only modify the specific keys your processor is responsible for + ### Configuration and State Management LeRobot processors support serialization and deserialization through three key methods. Here's how they work using `NormalizerProcessor` as an example: #### `get_config()` - Serializable Configuration + This method returns all non-tensor configuration that can be saved to JSON: ```python -@dataclass +@dataclass class NormalizerProcessor: features: dict[str, PolicyFeature] - norm_map: dict[FeatureType, NormalizationMode] + norm_map: dict[FeatureType, NormalizationMode] normalize_keys: set[str] | None = None eps: float = 1e-8 - + def get_config(self) -> dict[str, Any]: """Return JSON-serializable configuration.""" return { @@ -144,11 +148,12 @@ class NormalizerProcessor: ``` #### `state_dict()` - Tensor State + This method returns only PyTorch tensors that need special serialization: ```python def state_dict(self) -> dict[str, torch.Tensor]: - """Return tensor state dictionary.""" + """Return tensor state dictionary.""" state = {} for key, stats in self._tensor_stats.items(): for stat_name, tensor_val in stats.items(): @@ -156,7 +161,8 @@ def state_dict(self) -> dict[str, torch.Tensor]: return state ``` -#### `load_state_dict()` - Restore Tensor State +#### 
`load_state_dict()` - Restore Tensor State + This method restores the tensor state from a saved state dictionary: ```python @@ -173,6 +179,7 @@ def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: ``` #### Usage Example + ```python # Save processor config = processor.get_config() @@ -202,7 +209,7 @@ def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, Poli features["observation.image"] = features.pop("pixels") if "observation.pixels" in features: features["observation.image"] = features.pop("observation.pixels") - + # Handle camera-specific pixels prefixes = ("pixels.", "observation.pixels.") for key in list(features.keys()): @@ -236,6 +243,7 @@ def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, Poli ``` **Key principles:** + - Use `features.pop(old_key)` to remove the old feature and get its value - Use `features[new_key] = old_feature` to add the new feature with same properties - Always return the modified features dictionary @@ -248,7 +256,7 @@ LeRobot provides several pre-built processor classes that handle common transfor ### Core Processing Classes - **`ImageProcessor`** - Converts images from numpy arrays (uint8, channel-last) to PyTorch tensors (float32, channel-first) -- **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys +- **`StateProcessor`** - Handles state observations, converting numpy arrays to tensors and renaming keys - **`NormalizerProcessor`** - Normalizes observations and actions using dataset statistics (mean/std or min/max) - **`UnnormalizerProcessor`** - Inverse of NormalizerProcessor, converts normalized values back to original ranges @@ -268,8 +276,8 @@ LeRobot provides several pre-built processor classes that handle common transfor ```python from lerobot.processor import ( - ImageProcessor, - NormalizerProcessor, + ImageProcessor, + NormalizerProcessor, DeviceProcessor, RobotProcessor ) @@ -310,7 +318,7 @@ class MyProcessor: 
```python @ProcessorStepRegistry.register("my_custom_processor") -@dataclass +@dataclass class MyCustomProcessor: # Implementation ``` @@ -323,8 +331,8 @@ class MyCustomProcessor: def get_config(self) -> dict[str, Any]: # JSON-serializable data only return {"threshold": self.threshold, "mode": self.mode} - -def state_dict(self) -> dict[str, torch.Tensor]: + +def state_dict(self) -> dict[str, torch.Tensor]: # Tensors only return {"running_mean": self.running_mean, "weights": self.weights} ``` @@ -388,63 +396,63 @@ import torch from torch import Tensor from lerobot.processor.pipeline import ( - EnvTransition, - ProcessorStepRegistry, + EnvTransition, + ProcessorStepRegistry, TransitionKey ) from lerobot.configs.types import PolicyFeature -@ProcessorStepRegistry.register("action_smoother") +@ProcessorStepRegistry.register("action_smoother") @dataclass class ActionSmoothingProcessor: """Smooths actions using exponential moving average. - + This processor maintains a running average of actions to reduce jitter from the policy predictions. 
""" - + # Configuration alpha: float = 0.7 # Smoothing factor (0 = no smoothing, 1 = no memory) - + # State (not in config) _previous_action: Tensor | None = field(default=None, init=False, repr=False) _initialized: bool = field(default=False, init=False, repr=False) - + def __call__(self, transition: EnvTransition) -> EnvTransition: action = transition.get(TransitionKey.ACTION) - + if action is None: return transition - + # Convert to tensor if needed if not isinstance(action, torch.Tensor): action = torch.as_tensor(action, dtype=torch.float32) - + # Initialize on first call if not self._initialized: self._previous_action = action.clone() self._initialized = True smoothed_action = action else: - # Exponential moving average: new = alpha * current + (1-alpha) * previous + # Exponential moving average: new = alpha * current + (1-alpha) * previous smoothed_action = self.alpha * action + (1 - self.alpha) * self._previous_action self._previous_action = smoothed_action.clone() - + # Return new transition with smoothed action new_transition = transition.copy() new_transition[TransitionKey.ACTION] = smoothed_action return new_transition - + def get_config(self) -> dict[str, Any]: """Return JSON-serializable configuration.""" return {"alpha": self.alpha} - + def state_dict(self) -> dict[str, torch.Tensor]: """Return tensor state.""" if self._previous_action is not None: return {"previous_action": self._previous_action} return {} - + def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: """Load tensor state.""" if "previous_action" in state: @@ -453,12 +461,12 @@ class ActionSmoothingProcessor: else: self._previous_action = None self._initialized = False - + def reset(self) -> None: """Reset processor state at episode boundaries.""" self._previous_action = None self._initialized = False - + def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: """Action shapes remain unchanged.""" return features # No transformation to 
feature space @@ -476,19 +484,19 @@ postprocessing_steps = [ ] postprocessor = RobotProcessor( - steps=postprocessing_steps, + steps=postprocessing_steps, name="smooth_postprocessor" ) # Use in your policy inference loop for transition in environment_transitions: # Get action from policy - action = policy(transition) - + action = policy(transition) + # Post-process (including smoothing) transition_with_action = {"action": action} smoothed_transition = postprocessor(transition_with_action) - + # Execute smoothed action next_obs = env.step(smoothed_transition["action"]) ```