# Introduction to Processors

In robotics, there's a fundamental mismatch between the data that robots and humans produce and what machine learning models expect. This creates several translation challenges:

**Raw Robot Data → Model Input:**

- Robots output raw sensor data (camera images, joint positions, force readings) that needs normalization, batching, and device placement before models can process it
- Language instructions from humans ("pick up the red cube") must be tokenized into numerical representations
- Different robots use different coordinate systems and units that need standardization

**Model Output → Robot Commands:**

- Models might output end-effector positions, but robots need joint-space commands
- Teleoperators (like gamepads) produce relative movements (delta positions), but robots expect absolute commands
- Model predictions are often normalized and need to be converted back to real-world scales

**Cross-Domain Translation:**

- Training data from one robot setup needs adaptation for deployment on different hardware
- Models trained with specific camera configurations must work with new camera arrangements
- Datasets with different naming conventions need harmonization

**That's where processors come in.** They serve as universal translators that bridge these gaps, ensuring seamless data flow from sensors to models to actuators.

Processors are the data transformation backbone of LeRobot. They handle all the preprocessing and postprocessing steps needed to convert raw environment data into model-ready inputs and vice versa. This guide walks you through everything you need to know about processors, from basic concepts to advanced usage patterns.

## What are Processors?

In robotics, data comes in many forms: images from cameras, joint positions from sensors, text instructions from users, and more. Each type of data requires specific transformations before a model can use it effectively.
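For example, before a policy can consume a single camera frame and joint-state vector, the raw values typically need scaling, a batch dimension, and a device move. Here is a rough sketch of those steps in plain PyTorch — the shapes, joint limits, and value ranges are illustrative assumptions, not LeRobot conventions:

```python
import torch

# Raw, unbatched sensor data as a robot driver might produce it
# (shapes and ranges are hypothetical examples)
image = torch.randint(0, 256, (224, 224, 3), dtype=torch.uint8)  # HWC, 0-255
state = torch.tensor([0.1, -0.4, 0.7, 0.0, 0.2, -0.1, 0.3])     # 7-DOF joint positions

device = "cuda" if torch.cuda.is_available() else "cpu"

# 1. Normalize: image to [0, 1], state to [-1, 1] given assumed joint limits
image = image.float() / 255.0
state_min, state_max = -1.5, 1.5  # hypothetical joint limits in radians
state = 2 * (state - state_min) / (state_max - state_min) - 1

# 2. Batch: models expect a leading batch dimension
image = image.unsqueeze(0)  # (1, 224, 224, 3)
state = state.unsqueeze(0)  # (1, 7)

# 3. Device placement
image, state = image.to(device), state.to(device)
```

Processors package exactly these kinds of steps into reusable, composable components so you don't rewrite this glue for every robot and model.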
Models need this data to be:

- **Normalized**: Scaled to appropriate ranges for neural network processing
- **Batched**: Organized with proper dimensions for batch processing
- **Tokenized**: Text converted to numerical representations
- **Device-placed**: Moved to the right hardware (CPU/GPU)
- **Type-converted**: Cast to appropriate data types

Processors handle these transformations through composable, reusable steps that can be chained together into pipelines. Think of them as a modular assembly line where each station performs a specific transformation on your data.

## Core Concepts

### EnvTransition: The Universal Data Container

The `EnvTransition` is the fundamental data structure that flows through all processors. It's a typed dictionary that represents a complete robot-environment interaction:

```python
from lerobot.processor.pipeline import TransitionKey, EnvTransition

# Example transition from a robot collecting data
transition: EnvTransition = {
    TransitionKey.OBSERVATION: {
        "observation.images.camera0": camera0_image_tensor,  # Shape: (H, W, C)
        "observation.images.camera1": camera1_image_tensor,  # Shape: (H, W, C)
        "observation.state": joint_positions_tensor,         # Shape: (7,) for a 7-DOF arm
        "observation.environment_state": env_state_tensor,   # Shape: (3,) for object position
    },
    TransitionKey.ACTION: action_tensor,     # Shape: (7,) for joint velocities
    TransitionKey.REWARD: 0.0,               # Scalar reward signal
    TransitionKey.DONE: False,               # Episode termination flag
    TransitionKey.TRUNCATED: False,          # Episode truncation flag
    TransitionKey.INFO: {"success": False},  # Additional metadata
    TransitionKey.COMPLEMENTARY_DATA: {
        "task": "pick up the red cube",  # Language instruction
        "task_index": 0,                 # Task identifier
        "index": 42,                     # Frame index
    },
}
```

Each key in the transition has a specific purpose:

- **OBSERVATION**: All sensor data (images, states, proprioception)
- **ACTION**: The action to execute or that was executed
- **REWARD**: Reinforcement learning signal
- **DONE/TRUNCATED**: Episode boundary indicators
- **INFO**: Arbitrary metadata
- **COMPLEMENTARY_DATA**: Task descriptions, indices, padding flags, and data shared between steps (e.g., a velocity computed by one step that a later step uses to clip the action)

### ProcessorStep: The Building Block Interface

A `ProcessorStep` is a single transformation unit that processes transitions. It's a protocol (interface) that any processor step must implement:

```python
from typing import Any

import torch

from lerobot.configs.types import PolicyFeature
from lerobot.processor.pipeline import EnvTransition, ProcessorStep


class MyProcessorStep:
    """Example processor step interface - all methods must be implemented."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Transform the transition - this is the main processing logic."""
        raise NotImplementedError

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Declare how this step transforms feature shapes/types."""
        raise NotImplementedError

    def get_config(self) -> dict[str, Any]:
        """Return JSON-serializable configuration for saving/loading."""
        raise NotImplementedError

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Return any learnable parameters (tensors only)."""
        raise NotImplementedError

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """Load learnable parameters from saved state."""
        raise NotImplementedError

    def reset(self) -> None:
        """Reset any internal state between episodes."""
        raise NotImplementedError
```

### RobotProcessor: The Pipeline Orchestrator

The `RobotProcessor` chains multiple `ProcessorStep` instances together, executing them sequentially.
It provides automatic format conversion to handle both batch dictionaries (from datasets) and `EnvTransition` dictionaries:

```python
from lerobot.processor.pipeline import (
    RobotProcessor,
    _default_batch_to_transition,
    _default_transition_to_batch,
)

# Create a processing pipeline
processor = RobotProcessor(
    steps=[
        step1,  # First transformation
        step2,  # Second transformation
        step3,  # Third transformation
    ],
    name="my_preprocessing_pipeline",
    # Optional: custom converters for input/output formats
    to_transition=_default_batch_to_transition,  # How to convert batch dict → EnvTransition
    to_output=_default_transition_to_batch,      # How to convert EnvTransition → output format
)

# The processor automatically handles different input formats:
# 1. If the input is a batch dict (from a dataset), converts it to an EnvTransition
# 2. Passes it through each step sequentially
# 3. Converts back to the original format (or a custom output format)

# Example with batch dict input (common in training)
batch_dict = {"observation.state": tensor, "action": tensor}
output = processor(batch_dict)  # Automatically converted to/from EnvTransition

# Example with EnvTransition input (common in inference)
transition = {TransitionKey.OBSERVATION: {...}, TransitionKey.ACTION: ...}
output = processor(transition)  # Stays as an EnvTransition throughout
```

The `to_transition` and `to_output` converters enable seamless integration with existing codebases. By default, they handle the standard LeRobot batch format, but you can customize them for different data structures.
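The chaining idea itself is simple: each step receives the output of the previous one. A dependency-free toy version of the pattern (a simplified stand-in for illustration, not the real `RobotProcessor`):

```python
from typing import Any, Callable


def run_pipeline(steps: list[Callable[[dict], dict]], data: dict) -> dict:
    """Apply each step in order, feeding each step's output to the next."""
    for step in steps:
        data = step(data)
    return data


# Two toy steps: scale a state vector, then wrap it in a batch dimension
scale = lambda d: {**d, "state": [x / 10 for x in d["state"]]}
batch = lambda d: {**d, "state": [d["state"]]}

result = run_pipeline([scale, batch], {"state": [5.0, 10.0]})
# result["state"] == [[0.5, 1.0]]
```

Note that order matters: swapping `scale` and `batch` would scale a nested list and fail, which is why the real pipeline documents a recommended step ordering.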
### Additional Converter Functions

LeRobot provides several specialized converter functions for common robotics scenarios:

```python
from lerobot.processor.converters import (
    to_transition_teleop_action,
    to_transition_robot_observation,
    to_output_robot_action,
    to_dataset_frame,
)
```

**`to_transition_teleop_action`** - Converts teleoperation device actions to EnvTransitions:

```python
# Use case: phone, gamepad, or other teleop device control
phone_action = {"x": 0.1, "y": -0.2, "gripper": 0.8}
transition = to_transition_teleop_action(phone_action)
# Creates: {ACTION: {"action.x": 0.1, "action.y": -0.2, "action.gripper": 0.8}, ...}
```

**`to_transition_robot_observation`** - Converts robot sensor data to EnvTransitions:

```python
# Use case: live robot observation during inference
robot_obs = {
    "joint_1": 0.5, "joint_2": -0.3,  # Joint positions
    "camera_0": image_array,          # Camera images
}
transition = to_transition_robot_observation(robot_obs)
# Creates: {OBSERVATION: {"observation.state.joint_1": 0.5, "observation.images.camera_0": image, ...}}
```

**`to_output_robot_action`** - Extracts robot-executable actions from EnvTransitions:

```python
# Use case: converting model outputs back to robot commands
model_transition = {ACTION: {"action.joint_1": 0.2, "action.joint_2": 0.1}}
robot_action = to_output_robot_action(model_transition)
# Returns: {"joint_1": 0.2, "joint_2": 0.1} - ready for robot.send_action()
```

**`to_dataset_frame`** - Converts transitions to a dataset-compatible format:

```python
# Use case: saving processed data or creating training batches
features = {
    "action": {"names": ["joint_1", "joint_2"]},
    "observation.state": {"names": ["joint_1", "joint_2"]},
    "observation.images.camera0": {...},
}
batch = to_dataset_frame(transition, features)
# Returns: {"action": [0.2, 0.1], "observation.state": [0.5, -0.3], ...}
```

These converters are particularly useful when integrating with real robots, as shown in the examples:

```python
# Example from phone_so100_teleop.py - Real robot teleoperation
phone_to_robot_ee_pose = RobotProcessor(
    steps=[...],
    to_transition=to_transition_teleop_action,  # Phone → EnvTransition
    to_output=lambda tr: tr,                    # Keep as EnvTransition
)

# Example from phone_so100_eval.py - Robot action execution
robot_ee_to_joints = RobotProcessor(
    steps=[...],
    to_transition=lambda tr: tr,       # Already an EnvTransition
    to_output=to_output_robot_action,  # EnvTransition → Robot action
)

# Example from phone_so100_record.py - Dataset recording
robot_joints_to_ee_pose = RobotProcessor(
    steps=[...],
    to_transition=to_transition_robot_observation,  # Robot obs → EnvTransition
    to_output=lambda tr: tr,                        # Keep as EnvTransition for the dataset
)
```

### Data Format Conversion

Different data sources have different formats, but processors need a unified `EnvTransition` structure internally. The default converters handle LeRobot datasets, but you can customize them:

```python
# Default: LeRobot batch format
lerobot_batch = {
    "observation.state": torch.tensor(...),
    "action": torch.tensor(...),
    "next.reward": torch.tensor(...),
    "task": ["pick cube", ...],
}
# → Converts to EnvTransition → Processes → Converts back

# Custom: live robot data
robot_data = {
    "cameras": {"wrist_cam": np.array(...)},
    "joint_positions": np.array(...),
    "gripper_state": 0.5,
}

def robot_to_transition(data: dict) -> EnvTransition:
    return {
        TransitionKey.OBSERVATION: {
            "observation.images.wrist": torch.from_numpy(data["cameras"]["wrist_cam"]),
            "observation.state": torch.from_numpy(data["joint_positions"]),
        },
        TransitionKey.ACTION: None,
        # ... other fields with defaults
    }

# Use the custom converter
processor = RobotProcessor(
    steps=[...],
    to_transition=robot_to_transition,
    to_output=lambda transition: transition,  # Keep as EnvTransition
)
```

**When to customize:** Live robot data, Gymnasium environments, legacy datasets, or any non-LeRobot format.

## Common Processor Steps

LeRobot provides a rich set of pre-built processor steps for common transformations.
Let's explore each in detail.

### Data Normalization

Normalization is crucial for neural network training and inference. The `NormalizerProcessor` handles both mean-std normalization and min-max scaling:

```python
from lerobot.processor.normalize_processor import NormalizerProcessor, UnnormalizerProcessor
from lerobot.configs.types import PolicyFeature, FeatureType, NormalizationMode

# Define what features exist in your data
features = {
    "observation.images.camera0": PolicyFeature(type=FeatureType.IMAGE, shape=(224, 224, 3)),
    "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
    "action": PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
}

# Define the normalization strategy per feature type
norm_map = {
    FeatureType.IMAGE: NormalizationMode.MEAN_STD,  # Images: (x - mean) / std
    FeatureType.STATE: NormalizationMode.MIN_MAX,   # States: scale to [-1, 1]
    FeatureType.ACTION: NormalizationMode.MIN_MAX,  # Actions: scale to [-1, 1]
}

# Create a normalizer with dataset statistics
normalizer = NormalizerProcessor(
    features=features,
    norm_map=norm_map,
    stats=dataset.meta.stats,  # Contains mean, std, min, max per feature
    normalize_keys={"observation.state", "action"},  # Optional: only normalize specific keys
)

# For postprocessing: the inverse transformation
unnormalizer = UnnormalizerProcessor(
    features=features,
    norm_map=norm_map,
    stats=dataset.meta.stats,
)

# The normalizer automatically:
# - Detects which normalization to apply based on feature type
# - Handles device placement of statistics tensors
# - Skips keys not in stats or not in normalize_keys
# - Adds metadata about what was normalized
```

### Device Management

The `DeviceProcessor` ensures tensors are on the right device with the right dtype:

```python
from lerobot.processor.device_processor import DeviceProcessor

# Basic GPU placement
gpu_processor = DeviceProcessor(device="cuda:0")

# Advanced: GPU with half precision for inference
efficient_processor = DeviceProcessor(
    device="cuda:0",
    float_dtype="float16",  # Convert float32 -> float16 for memory efficiency
)

# The processor:
# - Moves all tensors to the specified device
# - Preserves non-tensor data unchanged
# - Optionally converts float dtypes while preserving int/bool types
# - Uses non_blocking transfers for CUDA devices
# - Handles nested structures (observations, complementary_data)

# Supported float dtypes:
# "float16" / "half":   16-bit floating point
# "float32" / "float":  32-bit floating point (default)
# "float64" / "double": 64-bit floating point
# "bfloat16":           brain floating point (better for training)
```

### Batch Processing

Models expect batched inputs, but robot interactions often produce unbatched data:

```python
from lerobot.processor.batch_processor import ToBatchProcessor

batch_processor = ToBatchProcessor()

# Automatically adds batch dimensions where needed:
# State:  (7,)          -> (1, 7)
# Image:  (224, 224, 3) -> (1, 224, 224, 3)
# Action: (4,)          -> (1, 4)
# Task:   "pick_cube"   -> ["pick_cube"]
# Already batched: (1, 7) -> (1, 7)  [unchanged]

# The processor intelligently:
# - Detects tensor dimensionality
# - Adds a batch dim to 1D states/actions
# - Adds a batch dim to 3D images
# - Wraps string tasks in lists
# - Preserves already-batched data

# Example usage in inference:
single_observation = robot.get_observation()  # Unbatched
batched_input = batch_processor({"observation": single_observation})
model_output = model(batched_input)  # Model expects a batch dim
```

### Text Tokenization

For language-conditioned policies, text instructions must be tokenized:

```python
from lerobot.processor.tokenizer_processor import TokenizerProcessor
from transformers import AutoTokenizer

# Option 1: Auto-load a tokenizer by name
tokenizer_proc = TokenizerProcessor(
    tokenizer_name="google/paligemma-3b-pt-224",
    max_length=128,
    task_key="task",       # Where to find text in complementary_data
    padding="max_length",  # Pad to max_length
    padding_side="right",
    truncation=True,       # Truncate if longer than max_length
)

# Option 2: Provide a custom tokenizer
custom_tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
custom_proc = TokenizerProcessor(
    tokenizer=custom_tokenizer,
    max_length=256,
    padding_side="left",  # For autoregressive models
)

# The processor:
# - Extracts task text from complementary_data
# - Tokenizes using a HuggingFace tokenizer
# - Adds tokens and attention_mask to observations
# - Handles both single strings and lists of strings
# - Preserves the original task in complementary_data

# Output structure:
# observation["observation.language.tokens"] = tensor([101, 2032, ...])
# observation["observation.language.attention_mask"] = tensor([1, 1, 0, ...])
```

### Key Renaming

Different datasets and models may use different naming conventions. The `RenameProcessor` solves this mismatch.

**Why is this useful?**

- When loading a model trained on a different dataset with different key names
- When using foundation models that expect specific key naming conventions
- When standardizing datasets from different sources
- When adapting legacy code to new naming standards

```python
from lerobot.processor.rename_processor import RenameProcessor

# Example 1: The dataset uses "top"/"wrist", but the model expects "camera0"/"camera1"
rename_proc = RenameProcessor(
    rename_map={
        "observation.images.top": "observation.images.camera0",
        "observation.images.wrist": "observation.images.camera1",
    }
)

# Example 2: Foundation model compatibility
# Your dataset: "observation.state"; foundation model: "proprio"
foundation_rename = RenameProcessor(
    rename_map={
        "observation.state": "proprio",
        "observation.images.main": "rgb",
    }
)

# Example 3: Standardizing multiple datasets
standardize_rename = RenameProcessor(
    rename_map={
        # Different robots might use different names
        "observation.joint_positions": "observation.state",
        "observation.gripper_state": "observation.end_effector",
        "observation.arm_camera": "observation.images.wrist",
    }
)
```

## Building Complete Pipelines

Let's build a real-world preprocessing
and postprocessing pipeline for a vision-based manipulation policy:

```python
# Consolidated imports
from lerobot.processor import (
    RobotProcessor,
    NormalizerProcessor,
    UnnormalizerProcessor,
    DeviceProcessor,
    ToBatchProcessor,
    TokenizerProcessor,
    RenameProcessor,
)

# Step 1: Define the preprocessing pipeline
preprocessor = RobotProcessor(
    steps=[
        # 1. Standardize naming from the dataset
        RenameProcessor(
            rename_map={
                "observation.images.top": "observation.images.camera0",
                "observation.images.wrist": "observation.images.camera1",
            }
        ),
        # 2. Add batch dimensions for the model
        ToBatchProcessor(),
        # 3. Tokenize language instructions if present
        TokenizerProcessor(
            tokenizer_name="google/paligemma-3b-pt-224",
            max_length=64,
            task_key="task",
        ),
        # 4. Normalize numerical data
        NormalizerProcessor(
            features=policy_features,
            norm_map={
                FeatureType.IMAGE: NormalizationMode.MEAN_STD,
                FeatureType.STATE: NormalizationMode.MIN_MAX,
                FeatureType.ACTION: NormalizationMode.MIN_MAX,
            },
            stats=dataset.meta.stats,
        ),
        # 5. Move to GPU and convert to half precision
        DeviceProcessor(device="cuda:0", float_dtype="float16"),
    ],
    name="robot_preprocessor",
)

# Step 2: Define the postprocessing pipeline
postprocessor = RobotProcessor(
    steps=[
        # 1. Move back to CPU for robot hardware
        DeviceProcessor(device="cpu"),
        # 2. Denormalize actions to the original scale
        UnnormalizerProcessor(
            features=policy_features,
            norm_map={FeatureType.ACTION: NormalizationMode.MIN_MAX},
            stats=dataset.meta.stats,
        ),
    ],
    name="robot_postprocessor",
)
```

## Using Processors in Practice

### Training Loop Integration

Here's how processors integrate into a training loop using the policy's forward method:

```python
from torch.utils.data import DataLoader

# Create the dataset and dataloader
dataset = LeRobotDataset(repo_id="your_dataset")
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize the model and processors
model = YourPolicy.from_pretrained("your_model")
preprocessor = RobotProcessor.from_pretrained(
    "your_model",
    config_filename="robot_preprocessor.json",
)

# Training loop
for epoch in range(num_epochs):
    for batch in dataloader:
        # Preprocess the batch
        processed_batch = preprocessor(batch)

        # Forward pass - returns the loss and optional metrics
        loss, metrics = model.forward(processed_batch)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log metrics if available
        if metrics:
            wandb.log(metrics)
```

### Inference Pipeline

For deployment, processors ensure consistent data handling with real robots:

```python
# Load the model and processors
policy = YourPolicy.from_pretrained("path/to/model")
preprocessor = RobotProcessor.from_pretrained(
    "path/to/model",
    config_filename="robot_preprocessor.json",
)
postprocessor = RobotProcessor.from_pretrained(
    "path/to/model",
    config_filename="robot_postprocessor.json",
)

# Connect to the robot
robot = make_robot_from_config(robot_config)
robot.connect()

# Inference loop
policy.eval()

# Reset the policy and processors
policy.reset()
preprocessor.reset()
postprocessor.reset()

with torch.no_grad():
    while not done:
        # Get an observation from the robot
        observation = robot.get_observation()

        # Build a dataset-compatible frame
        observation_frame = build_dataset_frame(
            dataset.features, observation, prefix="observation"
        )

        # Add the task instruction to complementary data
        observation_frame["task"] = "pick up the red cube"

        # Preprocess for the model
        model_input = preprocessor(observation_frame)

        # Run the policy
        raw_action = policy.select_action(model_input)

        # Postprocess the action
        action_transition = {TransitionKey.ACTION: raw_action}
        processed = postprocessor(action_transition)
        action = processed[TransitionKey.ACTION]

        # Convert to the robot action format
        robot_action = {
            key: action[i].item() for i, key in enumerate(robot.action_features)
        }

        # Execute on the robot
        robot.send_action(robot_action)
```

## Saving and Loading Processors

Processors can be persisted and shared just like models, making them portable across different environments and ensuring reproducibility.

### Local Save/Load

```python
# Save the processor configuration and state
preprocessor.save_pretrained(
    "./my_robot_processor",
    config_filename="preprocessor.json",  # Optional custom name
)

# The save creates:
# my_robot_processor/
# ├── preprocessor.json                           # Configuration
# ├── preprocessor_step_0_normalizer.safetensors  # Step 0 state (stats)
# └── preprocessor_step_1_device.safetensors      # Step 1 state (if any)

# Load the processor
loaded = RobotProcessor.from_pretrained(
    "./my_robot_processor",
    config_filename="preprocessor.json",
)
```

### HuggingFace Hub Integration

The HuggingFace Hub provides a centralized place to share and version your processors. This is particularly useful for sharing preprocessing configurations alongside models, ensuring that anyone who downloads your model can reproduce your exact preprocessing pipeline. It also enables versioning and collaboration on preprocessing strategies.
```python
# Save to the HuggingFace Hub
preprocessor.save_pretrained("username/my-robot-policy")

# Load from the Hub with automatic download
hub_processor = RobotProcessor.from_pretrained(
    "username/my-robot-policy",
    config_filename="robot_preprocessor.json",
    revision="main",      # Optional: specific revision
    cache_dir="./cache",  # Optional: local cache directory
)

# The Hub integration provides:
# - Automatic versioning with git
# - Public or private sharing
# - Download caching for efficiency
# - Integration with model repositories
```

### Loading with Overrides

Sometimes you need to modify loaded processors for new environments or datasets. The override mechanism lets you update specific processor configurations without modifying the saved files:

```python
# Load a processor with configuration overrides
processor = RobotProcessor.from_pretrained(
    "./saved_processor",
    overrides={
        # Change the device for different hardware
        "device_processor": {"device": "cuda:1"},
        # Update statistics for a new dataset
        "normalizer_processor": {"stats": new_dataset.meta.stats},
        # Provide non-serializable objects (like tokenizers)
        "tokenizer_processor": {"tokenizer": custom_tokenizer},
    },
)

# Common override scenarios:
# 1. Adapting to different hardware (GPU availability)
# 2. Fine-tuning on new datasets with different statistics
# 3. Providing runtime dependencies that can't be serialized
# 4. Testing variations without creating new saved configs
```

## Creating Custom Processor Steps

Build your own processor steps for specialized transformations. The key is implementing the required interface.

### Basic Custom Step with Registration

The registration mechanism allows your custom processors to be saved and loaded by name rather than by module path.
This makes them more portable and easier to share:

```python
from dataclasses import dataclass

import torch

from lerobot.processor.pipeline import ProcessorStepRegistry, ObservationProcessor


# The @register decorator adds your processor to the global registry.
# Use a unique name, preferably namespaced to avoid conflicts.
@dataclass
@ProcessorStepRegistry.register("my_company/gaussian_noise")
class GaussianNoiseProcessor(ObservationProcessor):
    """Add Gaussian noise to observations for robustness training."""

    noise_std: float = 0.01
    training_only: bool = True
    is_training: bool = True

    def observation(self, observation):
        """Add noise to observation tensors."""
        if not self.is_training and self.training_only:
            return observation

        noisy_obs = {}
        for key, value in observation.items():
            if isinstance(value, torch.Tensor) and "image" not in key:
                # Add noise to non-image observations
                noise = torch.randn_like(value) * self.noise_std
                noisy_obs[key] = value + noise
            else:
                noisy_obs[key] = value
        return noisy_obs

    def get_config(self):
        return {
            "noise_std": self.noise_std,
            "training_only": self.training_only,
            "is_training": self.is_training,
        }


# Why register?
# 1. Enables saving by name: the config stores "my_company/gaussian_noise" instead of a full module path
# 2. More portable: others can use your processor without your exact module structure
# 3. Version-safe: module refactoring won't break saved configs
# 4. Cleaner configs: JSON shows readable names instead of long import paths
```

### Using Base Classes for Common Patterns

LeRobot provides base classes like `ObservationProcessor`, `ActionProcessor`, etc., that handle the boilerplate of extracting and reinserting specific components:

```python
from dataclasses import dataclass

import torch

from lerobot.processor import ActionProcessor
from lerobot.processor.pipeline import ProcessorStepRegistry


@dataclass
@ProcessorStepRegistry.register("my_company/action_clipper")
class ActionClipProcessor(ActionProcessor):
    """Clip actions to safe ranges."""

    min_value: float = -1.0
    max_value: float = 1.0

    def action(self, action):
        """Process only the action component."""
        # No need to handle the transition dict - the base class does it
        return torch.clamp(action, self.min_value, self.max_value)

    def get_config(self):
        return {"min_value": self.min_value, "max_value": self.max_value}
```

For more advanced processor patterns, including stateful processors, see [Implement Your Own Processor](implement_your_own_processor.mdx).

## Advanced Features

### Debugging with Hooks

Processors support hooks for monitoring and debugging without modifying the pipeline code:

```python
# Define monitoring hooks
def log_shapes(step_idx: int, transition: EnvTransition):
    """Log tensor shapes after each step."""
    obs = transition.get(TransitionKey.OBSERVATION)
    if obs:
        print(f"Step {step_idx} shapes:")
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f"  {key}: {value.shape}")


def check_nans(step_idx: int, transition: EnvTransition):
    """Check for NaN values."""
    obs = transition.get(TransitionKey.OBSERVATION)
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor) and torch.isnan(value).any():
                print(f"Warning: NaN detected in {key} at step {step_idx}")


# Register the hooks
processor.register_after_step_hook(log_shapes)
processor.register_after_step_hook(check_nans)

# Process data - the hooks are called after each step
output = processor(input_data)

# Remove the hooks when done debugging
processor.unregister_after_step_hook(log_shapes)
processor.unregister_after_step_hook(check_nans)
```

### Step-by-Step Inspection

Use `step_through()` for detailed debugging of the transformation pipeline:

```python
# Inspect data at each transformation stage
for i, intermediate in enumerate(processor.step_through(data)):
    print(f"\n=== After step {i} ===")

    # Check observation shapes
    obs = intermediate.get(TransitionKey.OBSERVATION)
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(
                    f"{key}: shape={value.shape}, "
                    f"dtype={value.dtype}, "
                    f"device={value.device}, "
                    f"range=[{value.min():.3f}, {value.max():.3f}]"
                )

    # Check the action if present
    action = intermediate.get(TransitionKey.ACTION)
    if action is not None and isinstance(action, torch.Tensor):
        print(f"action: shape={action.shape}, range=[{action.min():.3f}, {action.max():.3f}]")
```

### Pipeline Slicing

Extract subsets of a pipeline for testing or creating variations:

```python
# Get specific steps
first_three_steps = processor[:3]  # Returns a new RobotProcessor
middle_step = processor[2]         # Returns a single ProcessorStep

# Test individual steps
test_input = {...}
step_output = processor[0](test_input)  # Test the first step only

# Create variations
variant_processor = RobotProcessor(
    steps=processor.steps[:-1] + [new_final_step],
    name="variant",
)
```

## Best Practices and Tips

### 1. Order Matters

The sequence of processors is crucial. Follow this general order:

**Preprocessing (raw → model-ready):**

1. Rename (standardize keys)
2. Batch (add dimensions)
3. Tokenize (text → tokens)
4. Normalize (scale values)
5. Device (move to GPU)

**Postprocessing (model → robot-ready):**

1. Device (move to CPU)
2. Unnormalize (restore scale)
3. Unbatch (remove dimensions if needed)

### 2. Registration Best Practices

```python
# Always register custom steps for better portability
@ProcessorStepRegistry.register("my_company/special_processor")
class SpecialProcessor:
    ...

# Use namespaced names to avoid conflicts
# Good: "my_company/augmentation"
# Bad:  "augmentation" (too generic)

# Check registered processors
print(ProcessorStepRegistry.list())  # See all registered processors
```

### 3. Common Pitfalls and Solutions

**Tensor device mismatch:**

```python
# Problem: RuntimeError: Expected all tensors on same device
# Solution: ensure a DeviceProcessor is in the pipeline
preprocessor = RobotProcessor(
    steps=[
        NormalizerProcessor(...),
        DeviceProcessor(device="cuda"),  # Add this
    ]
)
```

**Missing statistics:**

```python
# Problem: NormalizerProcessor has no stats
# Solution 1: compute stats from the dataset
from lerobot.datasets.compute_stats import compute_stats

stats = compute_stats(dataset)

# Solution 2: load with overrides
processor = RobotProcessor.from_pretrained(
    "model_path",
    overrides={"normalizer_processor": {"stats": dataset.meta.stats}},
)
```

## Next Steps

Now that you understand processors, explore these topics:

- [**Implement Your Own Processor**](implement_your_own_processor.mdx) - Deep dive into creating custom processors with advanced features like stateful processing
- [**Policy Documentation**](policies.mdx) - Learn how different policies use processors
- [**Dataset Documentation**](datasets.mdx) - Understand the data format that processors transform
- [**Training Guide**](training.mdx) - See processors in action during model training
- [**Evaluation Guide**](evaluation.mdx) - Learn about processor usage during policy evaluation

## Summary

Processors are the unsung heroes of robotics pipelines, handling the critical transformations between raw sensor data and model-ready tensors.
By understanding and effectively using processors, you can:

- Build robust, reusable data pipelines
- Share preprocessing configurations across projects
- Debug data transformations systematically
- Ensure consistency between training and deployment
- Create custom transformations for specialized tasks

Remember: good preprocessing is often the difference between a model that works in theory and one that works in practice! The modular pipeline approach ensures your transformations are testable, reproducible, and portable across different robots and environments.