# Introduction to Processors In robotics, there's a fundamental mismatch between the data that robots and humans produce and what machine learning models expect. This creates several translation challenges: **Raw Robot Data → Model Input:** - Robots output raw sensor data (camera images, joint positions, force readings) that need normalization, batching, and device placement before models can process them - Language instructions from humans ("pick up the red cube") must be tokenized into numerical representations - Different robots use different coordinate systems and units that need standardization **Model Output → Robot Commands:** - Models might output end-effector positions, but robots need joint-space commands - Teleoperators (like gamepads) produce relative movements (delta positions), but robots expect absolute commands - Model predictions are often normalized and need to be converted back to real-world scales **Cross-Domain Translation:** - Training data from one robot setup needs adaptation for deployment on different hardware - Models trained with specific camera configurations must work with new camera arrangements - Datasets with different naming conventions need harmonization **That's where processors come in.** They serve as the universal translators that bridge these gaps, ensuring seamless data flow from sensors to models to actuators. Processors are the data transformation backbone of LeRobot. They handle all the preprocessing and postprocessing steps needed to convert raw environment data into model-ready inputs and vice versa. ## What are Processors? In robotics, data comes in many forms - images from cameras, joint positions from sensors, text instructions from users, and more. Each type of data requires specific transformations before a model can use it effectively. Models need this data to be: - **Normalized**: Scaled to appropriate ranges for neural network processing - **Batched**: Organized with proper dimensions for batch processing - **Tokenized**: Text converted to numerical representations - **Device-placed**: Moved to the right hardware (CPU/GPU) - **Type-converted**: Cast to appropriate data types Processors handle these transformations through composable, reusable steps that can be chained together into pipelines. Think of them as a modular assembly line where each station performs a specific transformation on your data. ## Core Concepts ### EnvTransition: The Universal Data Container The `EnvTransition` is the fundamental data structure that flows through all processors. It's a strongly-typed dictionary that represents a complete robot-environment interaction: ```python from lerobot.processor import TransitionKey, EnvTransition, PolicyAction, RobotAction # EnvTransition is precisely typed to handle different action types: # - PolicyAction: torch.Tensor (for model inputs/outputs) # - RobotAction: dict[str, Any] (for robot hardware) # - EnvAction: np.ndarray (for gym environments) # Example transition from a robot collecting data transition: EnvTransition = { TransitionKey.OBSERVATION: { "observation.images.camera0": camera0_image_tensor, # Shape: (H, W, C) "observation.images.camera1": camera1_image_tensor, # Shape: (H, W, C) "observation.state": joint_positions_tensor, # Shape: (7,) for 7-DOF arm "observation.environment_state": env_state_tensor # Shape: (3,) for object position }, TransitionKey.ACTION: action_tensor, # PolicyAction | RobotAction | EnvAction | None TransitionKey.REWARD: 0.0, # float | torch.Tensor | None TransitionKey.DONE: False, # bool | torch.Tensor | None TransitionKey.TRUNCATED: False, # bool | torch.Tensor | None TransitionKey.INFO: {"success": False}, # dict[str, Any] | None TransitionKey.COMPLEMENTARY_DATA: { "task": "pick up the red cube", # Language instruction } } ``` Each key in the transition has a specific purpose: - **OBSERVATION**: All sensor data (images, states, proprioception) - **ACTION**: The action to execute or that was executed - **REWARD**: Reinforcement learning signal - **DONE/TRUNCATED**: Episode boundary indicators - **INFO**: Arbitrary metadata - **COMPLEMENTARY_DATA**: Task descriptions, indices, padding flags, inter-step data ### ProcessorStep: The Building Block A `ProcessorStep` is a single transformation unit that processes transitions. It's an abstract base class with two required methods: ```python from lerobot.processor import ProcessorStep, EnvTransition class MyProcessorStep(ProcessorStep): """Example processor step - inherit and implement abstract methods.""" def __call__(self, transition: EnvTransition) -> EnvTransition: """Transform the transition - REQUIRED abstract method.""" # Your processing logic here return transition def transform_features(self, features): """Declare how this step transforms feature shapes/types - REQUIRED abstract method.""" return features # Most processors return features unchanged ``` `__call__` is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. `transform_features` is used to declare how this step transforms feature shapes/types. ### DataProcessorPipeline: The Generic Orchestrator The `DataProcessorPipeline[TInput, TOutput]` chains multiple `ProcessorStep` instances together: ```python from lerobot.processor import RobotProcessorPipeline, PolicyProcessorPipeline # For robot hardware (unbatched data) robot_processor = RobotProcessorPipeline[dict[str, Any], dict[str, Any]]( steps=[step1, step2, step3], name="robot_pipeline" ) # For model training/inference (batched data) policy_processor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]]( steps=[step1, step2, step3], name="policy_pipeline" ) ``` ## RobotProcessorPipeline vs PolicyProcessorPipeline The key distinction is in the data structures they handle: | Aspect | RobotProcessorPipeline | PolicyProcessorPipeline | | --------------- | -------------------------------------------- | ---------------------------------------- | | **Input** | `dict[str, Any]` - Individual robot values | `dict[str, Any]` - Batched tensors | | **Output** | `dict[str, Any]` - Individual robot commands | `torch.Tensor` - Policy predictions | | **Use Case** | Real-time robot control | Model training/inference | | **Data Format** | Unbatched, heterogeneous | Batched, homogeneous | | **Examples** | `{"joint_1": 0.5}` | `{"observation.state": tensor([[0.5]])}` | **Use `RobotProcessorPipeline`** for robot hardware interfaces: ```python # Robot data structures: dict[str, Any] for observations and actions robot_obs: dict[str, Any] = { "joint_1": 0.5, # Individual joint values "joint_2": -0.3, "camera_0": image_array # Raw camera data } robot_action: dict[str, Any] = { "joint_1": 0.2, # Target joint positions "joint_2": 0.1, "gripper": 0.8 } ``` **Use `PolicyProcessorPipeline`** for model training and batch processing: ```python # Policy data structures: batch dicts and tensors policy_batch: dict[str, Any] = { "observation.state": torch.tensor([[0.5, -0.3]]), # Batched states "observation.images.camera0": torch.tensor(...), # Batched images "action": torch.tensor([[0.2, 0.1, 0.8]]) # Batched actions } policy_action: torch.Tensor = torch.tensor([[0.2, 0.1, 0.8]]) # Model output tensor ``` ## Converter Functions LeRobot provides converter functions to bridge different data formats: ```python from lerobot.processor.converters import ( # Robot hardware converters robot_action_to_transition, # Robot dict → EnvTransition observation_to_transition, # Robot obs → EnvTransition transition_to_robot_action, # EnvTransition → Robot dict # Policy/training converters batch_to_transition, # Batch dict → EnvTransition transition_to_batch, # EnvTransition → Batch dict policy_action_to_transition, # Policy tensor → EnvTransition transition_to_policy_action, # EnvTransition → Policy tensor # Utilities create_transition, # Build transitions with defaults identity_transition # Pass-through converter ) ``` ## Real-World Examples ### Robot Control Pipeline ```python # Phone teleoperation → Robot control (from examples/phone_to_so100/) phone_to_robot = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[ MapPhoneActionToRobotAction(platform=PhoneOS.IOS), # Phone → robot targets EEReferenceAndDelta(kinematics=solver, ...), # Deltas → absolute pose EEBoundsAndSafety(bounds=..., max_step=0.2), # Safety limits InverseKinematicsEEToJoints(kinematics=solver), # Pose → joint angles GripperVelocityToJoint(motor_names=motors), # Gripper control ], to_transition=robot_action_to_transition, to_output=transition_to_robot_action ) # Usage: phone_action → robot_joints phone_input = {"phone.pos": [0.1, 0.2, 0.0], "phone.rot": rotation} robot_joints = phone_to_robot(phone_input) robot.send_action(robot_joints) ``` ### Policy Training Pipeline ```python # Training data preprocessing (optimized order for GPU performance) training_preprocessor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]]( steps=[ RenameObservationsProcessorStep(rename_map={}), # Standardize keys AddBatchDimensionProcessorStep(), # Add batch dims TokenizerProcessorStep(tokenizer_name="...", ...), # Tokenize language DeviceProcessorStep(device="cuda"), # Move to GPU first ⚡ NormalizerProcessorStep(features=..., stats=...), # Normalize on GPU ⚡ ] ) # Model output postprocessing training_postprocessor = PolicyProcessorPipeline[torch.Tensor, torch.Tensor]( steps=[ DeviceProcessorStep(device="cpu"), # Move to CPU UnnormalizerProcessorStep(features=..., stats=...), # Denormalize ] to_transition=policy_action_to_transition, to_output=transition_to_policy_action, ) ``` ### Mixed Robot + Policy Pipeline ```python # Real deployment: Robot sensors → Model → Robot commands with torch.no_grad(): while not done: # 1. Get robot observation (unbatched) raw_obs = robot.get_observation() # dict[str, Any] # 2. Process for policy (add batching, normalize) policy_input = policy_preprocessor(raw_obs) # Batched dict # 3. Run model policy_output = policy.select_action(policy_input) # Policy tensor # 4. Postprocess for robot (denormalize, convert to dict) robot_action = policy_postprocessor(policy_output) # dict[str, Any] # 5. Send to robot robot.send_action(robot_action) ``` ## Feature Contracts: Shape and Type Transformation Processors don't just transform data - they can also **change the data structure itself**. The `transform_features()` method declares these changes, which is crucial for dataset recording and policy creation. ### Why Feature Contracts Matter When building datasets or policies, LeRobot needs to know: - **What data fields will exist** after processing - **What shapes and types** each field will have - **How to configure models** for the expected data structure ```python # Example: A processor that adds velocity to observations class VelocityProcessor(ObservationProcessorStep): def observation(self, obs): new_obs = obs.copy() if "observation.state" in obs: # concatenate computed velocity field to the state new_obs["observation.state"] = self._compute_velocity(obs["observation.state"]) return new_obs def transform_features(self, features): """Declare the new velocity field we're adding.""" state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state") if state_feature: double_shape = (state_feature.shape[0] * 2,) if state_feature.shape else (2,) features[PipelineFeatureType.OBSERVATION]["observation.state"] = PolicyFeature( type=FeatureType.STATE, shape=double_shape ) return features ``` ### Feature Specification Functions `create_initial_features()` and `aggregate_pipeline_dataset_features()` solve a critical dataset creation problem: determining the exact final data structure before any data is processed. Since processor pipelines can add new features (like velocity fields), change tensor shapes (like cropping images), or rename keys, datasets need to know the complete output specification upfront to allocate proper storage and define schemas. These functions work together by starting with robot hardware specifications (`create_initial_features()`) then simulating the entire pipeline transformation (`aggregate_pipeline_dataset_features()`) to compute the final feature dictionary that gets passed to `LeRobotDataset.create()`, ensuring perfect alignment between what processors output and what datasets expect to store. ```python from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features # Start with robot's raw features initial_features = create_initial_features( observation=robot.observation_features, # {"joint_1.pos": float, "camera_0": (480,640,3)} action=robot.action_features # {"joint_1.pos": float, "gripper.pos": float} ) # Apply processor pipeline to compute final features final_features = aggregate_pipeline_dataset_features( pipeline=my_processor_pipeline, initial_features=initial_features, use_videos=True ) # Result: Complete feature specification for dataset/policy # { # "observation.state": {"shape": (7,), "dtype": "float32"}, # "observation.images.camera_0": {"shape": (3, 480, 640), "dtype": "uint8"}, # "observation.velocity": {"shape": (7,), "dtype": "float32"}, # Added by processor! # "action": {"shape": (7,), "dtype": "float32"} # } # Use for dataset creation dataset = LeRobotDataset.create( repo_id="my_dataset", features=final_features, # Knows exactly what data to expect ... ) ``` ## Common Processor Steps LeRobot provides many registered processor steps. Here are the most commonly used core processors: ### Essential Processors - **`normalizer_processor`**: Normalize observations/actions using dataset statistics (mean/std or min/max) - **`device_processor`**: Move tensors to CPU/GPU with optional dtype conversion - **`to_batch_processor`**: Add batch dimensions to transitions for model compatibility - **`rename_observations_processor`**: Rename observation keys using mapping dictionaries - **`tokenizer_processor`**: Tokenize natural language task descriptions into tokens and attention masks ### Next Steps - **[Implement Your Own Processor](implement_your_own_processor.mdx)** - Create custom processor steps - **[Debug Your Pipeline](debug_processor_pipeline.mdx)** - Troubleshoot and optimize pipelines - **[Processors for Robots and Teleoperators](processors_robots_teleop.mdx)** - Real-world integration patterns ## Summary Processors solve the data translation problem in robotics by providing: - **Modular transformations**: Composable, reusable processing steps - **Type safety**: Generic pipelines with compile-time checking - **Performance optimization**: GPU-accelerated operations - **Robot/Policy distinction**: Separate pipelines for different data structures - **Comprehensive ecosystem**: 30+ registered processors for common tasks The key insight: `RobotProcessorPipeline` handles unbatched robot hardware data, while `PolicyProcessorPipeline` handles batched model data. Choose the right tool for your data structure!