# RobotProcessor: The Art of Processing Robot Data This tutorial will teach you how to use RobotProcessor, a powerful pipeline system for preprocessing robot observations and postprocessing actions. You'll learn how to create modular, composable data transformations that make your robot learning pipelines more maintainable and reproducible. **You'll learn:** 1. What RobotProcessor is and why it's better than alternatives 2. How to create and compose processor steps 3. How to save, load, and share your processors 4. Advanced features for debugging and customization ## Real-World Scenarios: When You Need RobotProcessor Before diving into code, let's understand the real problems RobotProcessor solves. If you've worked with robots and learned policies, you've likely encountered these challenges: ### Observation Key Mismatches Your environment might output observations with keys like `"rgb_camera_front"` and `"joint_positions"`, but your pre-trained policy expects `"observation.images.wrist"` and `"observation.state"`. You need to rename these keys consistently across your pipeline. ### Image Preprocessing Requirements Your robot's camera captures 1920x1080 RGB images, but your policy was trained on 224x224 cropped images focusing on the workspace. You need to: - Crop out the relevant workspace area - Resize to the expected dimensions - Convert from uint8 [0,255] to float32 [0,1] - Ensure the right channel ordering (HWC vs CHW) - Move everything to GPU efficiently ### Coordinate System Transformations Your policy outputs desired end-effector positions in Cartesian space, but your robot expects joint angles. You need to: - Apply inverse kinematics to convert positions to joint angles - Handle singularities and out-of-reach positions - Smooth the resulting trajectories to avoid jerky movements - Clip values to respect joint limits ### State Augmentation Your environment provides joint positions, but your policy also needs velocities and accelerations. You need to: - Calculate velocities from position differences - Maintain a buffer of past states - Handle episode boundaries correctly ### Multi-Robot Deployment You want to deploy the same policy on different robots with varying: - Camera setups (different resolutions, positions) - Joint configurations (6-DOF vs 7-DOF arms) - Control frequencies (10Hz vs 30Hz) - Safety limits and workspace boundaries ### Data Collection and Training During data collection, you need to: - Record raw sensor data for dataset creation - Apply the same preprocessing as during deployment - Ensure reproducibility across different operators - Share preprocessing configurations with collaborators RobotProcessor solves all these problems by providing a modular, composable system where each transformation is a separate, shareable component. Let's see how. ## Why RobotProcessor? If you've worked with robot learning before, you've likely written preprocessing code like this: ```python # Traditional procedural approach - hard to maintain and share def preprocess_observation(obs, device='cuda'): processed_obs = {} # Process images if "pixels" in obs: for cam_name, img in obs["pixels"].items(): # Convert uint8 [0, 255] to float32 [0, 1] img = img.astype(np.float32) / 255.0 # Convert from HWC to CHW format img = np.transpose(img, (2, 0, 1)) # Add batch dimension img = np.expand_dims(img, axis=0) # Convert to torch and move to device img = torch.from_numpy(img).to(device) # Pad to standard size if img.shape[-2:] != (224, 224): img = F.pad(img, (0, 224 - img.shape[-1], 0, 224 - img.shape[-2])) processed_obs[f"observation.images.{cam_name}"] = img # Process state if "agent_pos" in obs: state = torch.from_numpy(obs["agent_pos"]).float() state = state.unsqueeze(0).to(device) processed_obs["observation.state"] = state return processed_obs ``` This approach has several problems: - **Mixed concerns**: Image processing, tensor conversion, device management all in one function - **Hard to test**: Can't test image processing separately from state processing - **Device management scattered**: Easy to forget `.to(device)` calls - **No standardized format**: Each project reinvents the wheel - **Difficult to share**: Can't easily reuse this preprocessing in other projects RobotProcessor solves these issues by providing a declarative pipeline approach where each transformation is a separate, testable, shareable component. ## Understanding EnvTransition and Batch Format RobotProcessor works with two data formats: ### 1. EnvTransition Tuple Format An `EnvTransition` is a 7-tuple that represents a complete transition in the environment: ```python from lerobot.processor.pipeline import TransitionIndex # EnvTransition structure: # (observation, action, reward, done, truncated, info, complementary_data) transition = ( {"observation.image": ..., "observation.state": ...}, # observation at time t [0.1, -0.2, 0.3], # action taken at time t 1.0, # reward received False, # episode done flag False, # episode truncated flag {"success": True}, # additional info from environment {"step_idx": 42} # complementary_data for inter-step communication ) ``` ### 2. Batch Dictionary Format This is the format used by LeRobot datasets and replay buffers: ```python # Batch dictionary format (used by LeRobotDataset, ReplayBuffer) batch = { "observation.image": torch.tensor(...), # Image observations "observation.state": torch.tensor(...), # State observations "action": torch.tensor(...), # Actions "next.reward": torch.tensor(...), # Rewards "next.done": torch.tensor(...), # Done flags "next.truncated": torch.tensor(...), # Truncated flags "info": {...}, # Info dictionary # Additional keys are preserved but ignored during conversion } ``` ### Automatic Format Conversion RobotProcessor automatically handles both formats: ```python from lerobot.processor.pipeline import RobotProcessor from lerobot.processor.observation_processor import ImageProcessor processor = RobotProcessor([ImageProcessor()]) # Works with EnvTransition tuples transition = ({"pixels": image_array}, None, 0.0, False, False, {}, {}) processed_transition = processor(transition) # Returns EnvTransition tuple # Also works with batch dictionaries batch = { "observation.pixels": image_tensor, "action": action_tensor, "next.reward": reward_tensor, "next.done": done_tensor, "next.truncated": truncated_tensor, "info": info_dict } processed_batch = processor(batch) # Returns batch dictionary ``` ### Using TransitionIndex Instead of using magic numbers to access tuple elements, use the `TransitionIndex` enum: ```python from lerobot.processor.pipeline import TransitionIndex # Bad - using magic numbers obs = transition[0] action = transition[1] # Good - using TransitionIndex obs = transition[TransitionIndex.OBSERVATION] action = transition[TransitionIndex.ACTION] reward = transition[TransitionIndex.REWARD] done = transition[TransitionIndex.DONE] truncated = transition[TransitionIndex.TRUNCATED] info = transition[TransitionIndex.INFO] comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] ``` ### Default Conversion Functions RobotProcessor uses these default conversion functions: ```python def _default_batch_to_transition(batch): """Default conversion from batch dict to EnvTransition tuple.""" # Extract observation keys (anything starting with "observation.") observation_keys = {k: v for k, v in batch.items() if k.startswith("observation.")} observation = None if observation_keys: observation = {} # Keep observation.* keys as-is (don't remove "observation." prefix) for key, value in observation_keys.items(): observation[key] = value return ( observation, batch.get("action"), batch.get("next.reward", 0.0), # Note: "next.reward" not "reward" batch.get("next.done", False), # Note: "next.done" not "done" batch.get("next.truncated", False), # Note: "next.truncated" not "truncated" batch.get("info", {}), {}, # Empty complementary_data ) def _default_transition_to_batch(transition): """Default conversion from EnvTransition tuple to batch dict.""" obs, action, reward, done, truncated, info, _ = transition batch = { "action": action, "next.reward": reward, # Note: "next.reward" not "reward" "next.done": done, # Note: "next.done" not "done" "next.truncated": truncated, # Note: "next.truncated" not "truncated" "info": info, } # Flatten observation dict (keep observation.* keys as-is) if isinstance(obs, dict): for key, value in obs.items(): batch[key] = value return batch ``` ### Custom Conversion Functions You can customize how RobotProcessor converts between formats: ```python def custom_batch_to_transition(batch): """Custom conversion from batch dict to EnvTransition tuple.""" # Extract observation keys (anything starting with "observation.") observation = {k: v for k, v in batch.items() if k.startswith("observation.")} return ( observation, batch.get("action"), batch.get("reward", 0.0), # Use "reward" instead of "next.reward" batch.get("done", False), # Use "done" instead of "next.done" batch.get("truncated", False), batch.get("info", {}), batch.get("complementary_data", {}) ) def custom_transition_to_batch(transition): """Custom conversion from EnvTransition tuple to batch dict.""" obs, action, reward, done, truncated, info, comp_data = transition batch = { "action": action, "reward": reward, # Use "reward" instead of "next.reward" "done": done, # Use "done" instead of "next.done" "truncated": truncated, "info": info, } # Flatten observation dict if obs: batch.update(obs) return batch # Use custom converters processor = RobotProcessor( steps=[ImageProcessor()], to_transition=custom_batch_to_transition, to_output=custom_transition_to_batch ) ``` ### Advanced: Controlling Output Format with `to_output` The `to_output` function determines what format is returned when you call the processor with a batch dictionary. Sometimes you want to output `EnvTransition` tuples even when you input batch dictionaries: ```python # Identity function to always return EnvTransition tuples def keep_as_transition(transition): """Always return EnvTransition tuple regardless of input format.""" return transition # Processor that always outputs EnvTransition tuples processor = RobotProcessor( steps=[ImageProcessor(), StateProcessor()], to_output=keep_as_transition # Always return tuple format ) # Even when called with batch dict, returns EnvTransition tuple batch = { "observation.image": image_tensor, "action": action_tensor, "next.reward": reward_tensor, "next.done": done_tensor, "next.truncated": truncated_tensor, "info": info_dict } result = processor(batch) # Returns EnvTransition tuple, not batch dict! print(type(result)) # ``` ### Real-World Example: Environment Interaction This is particularly useful for environment interaction where you want consistent tuple output: ```python from lerobot.processor.observation_processor import VanillaObservationProcessor # Create processor that always outputs EnvTransition for environment interaction # This avoids format conversion overhead during real-time control env_processor = RobotProcessor( [VanillaObservationProcessor()], to_transition=lambda x: x, # Pass through - no conversion needed to_output=lambda x: x, # Always return EnvTransition tuple ) # Environment interaction loop env = make_env() obs, info = env.reset() for step in range(1000): # Create transition - input is already in tuple format transition = (obs, None, 0.0, False, False, info, {"step": step}) # Process - output is guaranteed to be EnvTransition tuple processed_transition = env_processor(transition) processed_obs = processed_transition[TransitionIndex.OBSERVATION] # Use with policy action = policy.select_action(processed_obs) obs, reward, done, truncated, info = env.step(action) if done or truncated: break ``` ### When to Use Different Output Formats **Use EnvTransition tuple output when:** - Environment interaction and real-time control - You need to access individual transition components frequently - Performance is critical (avoids dictionary creation overhead) - Working with gym environments that expect tuple format **Use batch dictionary output when:** - Training with LeRobot datasets - Working with DataLoaders and batched processing - Interfacing with existing LeRobot training code - You need the standardized "next.*" key format ```python # For environment interaction - use tuple output env_processor = RobotProcessor( steps=[ImageProcessor(), StateProcessor()], to_output=lambda x: x # Return EnvTransition tuple ) # For training - use batch output (default) train_processor = RobotProcessor( steps=[ImageProcessor(), StateProcessor(), NormalizerProcessor(...)], # to_output defaults to _default_transition_to_batch ) # Training loop for batch in dataloader: processed_batch = train_processor(batch) # Returns batch dict loss = model.compute_loss(processed_batch) # Environment loop for step in range(1000): transition = (obs, None, 0.0, False, False, info, {}) processed_transition = env_processor(transition) # Returns EnvTransition tuple obs = processed_transition[TransitionIndex.OBSERVATION] action = policy.select_action(obs) ``` ### Why "next.reward", "next.done", "next.truncated"? The default conversion uses "next.*" prefixes because this matches the standard format used by LeRobot datasets and follows the convention that rewards, done flags, and truncated flags are the result of taking an action (i.e., they come from the "next" state): ```python # Standard RL transition format # (s_t, a_t, r_{t+1}, done_{t+1}, truncated_{t+1}) # ^ ^ ^ ^ ^ # | | | | | # | | | | +-- Result of action a_t # | | | +-- Result of action a_t # | | +-- Result of action a_t # | +-- Action taken in state s_t # +-- State at time t batch = { "observation.state": s_t, "action": a_t, "next.reward": r_{t+1}, # Reward received after taking action "next.done": done_{t+1}, # Done flag after taking action "next.truncated": truncated_{t+1}, # Truncated flag after taking action } ``` ## Your First RobotProcessor Let's create a processor that properly handles image and state preprocessing: ```python from lerobot.processor.pipeline import RobotProcessor, TransitionIndex from lerobot.processor.observation_processor import ImageProcessor, StateProcessor import numpy as np # Create sample data observation = { "pixels": { "camera_front": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8), "camera_side": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) }, "agent_pos": np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7], dtype=np.float32) } # Create a full transition transition = (observation, None, 0.0, False, False, {}, {}) # Create and use the processor processor = RobotProcessor([ ImageProcessor(), # Converts uint8[0,255] to float32[0,1], HWC to CHW, adds batch dim StateProcessor(), # Converts numpy arrays to torch tensors, adds batch dim ]) processed_transition = processor(transition) processed_obs = processed_transition[TransitionIndex.OBSERVATION] # Check the results print("Original keys:", observation.keys()) print("Processed keys:", processed_obs.keys()) print("Image shape:", processed_obs["observation.images.camera_front"].shape) # [1, 3, 480, 640] print("Image dtype:", processed_obs["observation.images.camera_front"].dtype) # torch.float32 print("Image range:", processed_obs["observation.images.camera_front"].min().item(), "to", processed_obs["observation.images.camera_front"].max().item()) # 0.0 to 1.0 ``` ## Working with LeRobot Datasets and Replay Buffers RobotProcessor seamlessly works with LeRobot's batch dictionary format: ```python from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.processor.pipeline import RobotProcessor from lerobot.processor.normalize_processor import NormalizerProcessor from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature # Load a dataset dataset = LeRobotDataset("lerobot/pusht") # Define features and normalization features = { "observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)), "observation.state": PolicyFeature(FeatureType.STATE, (2,)), "action": PolicyFeature(FeatureType.ACTION, (2,)), } norm_map = { FeatureType.VISUAL: NormalizationMode.MEAN_STD, FeatureType.STATE: NormalizationMode.MIN_MAX, FeatureType.ACTION: NormalizationMode.MEAN_STD, } # Create processor with normalization processor = RobotProcessor([ NormalizerProcessor.from_lerobot_dataset(dataset, features, norm_map), ]) # Process a batch from the dataset batch = dataset[0] # Get first batch print("Original batch keys:", list(batch.keys())) print("Original image shape:", batch["observation.image"].shape) # Process the batch - automatically converts to/from batch format processed_batch = processor(batch) print("Processed batch keys:", list(processed_batch.keys())) print("Processed image range:", processed_batch["observation.image"].min().item(), "to", processed_batch["observation.image"].max().item()) # Use with DataLoader from torch.utils.data import DataLoader dataloader = DataLoader(dataset, batch_size=32, shuffle=True) for batch in dataloader: # Process entire batch at once processed_batch = processor(batch) # Use processed batch for training # model.train_step(processed_batch) break ``` ## Integration with Replay Buffers RobotProcessor works great with replay buffers for online learning: ```python from lerobot.common.utils.buffer import ReplayBuffer from lerobot.processor.pipeline import RobotProcessor from lerobot.processor.device_processor import DeviceProcessor # Create replay buffer buffer = ReplayBuffer(capacity=10000) # Create processor for online data online_processor = RobotProcessor([ ImageProcessor(), StateProcessor(), DeviceProcessor(device="cuda"), ]) # During environment interaction env = make_env() obs, info = env.reset() for step in range(1000): # Raw environment observation transition = (obs, None, 0.0, False, False, info, {}) # Process for policy input processed_transition = online_processor(transition) processed_obs = processed_transition[TransitionIndex.OBSERVATION] # Get action from policy action = policy.select_action(processed_obs) # Execute action next_obs, reward, done, truncated, info = env.step(action) # Store in replay buffer (can store either format) buffer.add(obs, action, reward, next_obs, done, truncated, info) obs = next_obs if done or truncated: obs, info = env.reset() # Sample and process batches for training batch = buffer.sample(batch_size=32) processed_batch = online_processor(batch) # Processes entire batch ``` ## Creating Custom Steps: The ProcessorStep Protocol A processor step must follow certain conventions. Let's create a complete example that shows all required and optional methods: ```python from dataclasses import dataclass from typing import Any, Dict import torch import torch.nn.functional as F @dataclass class ImagePadder: """Pad images to a standard size for batch processing.""" target_height: int = 224 target_width: int = 224 pad_value: float = 0.0 def __call__(self, transition: EnvTransition) -> EnvTransition: """Main processing method - required for all steps.""" obs = transition[TransitionIndex.OBSERVATION] if obs is None: return transition # Process all image observations for key in list(obs.keys()): if key.startswith("observation.images."): img = obs[key] # Calculate padding _, _, h, w = img.shape pad_h = max(0, self.target_height - h) pad_w = max(0, self.target_width - w) if pad_h > 0 or pad_w > 0: # Pad symmetrically pad_left = pad_w // 2 pad_right = pad_w - pad_left pad_top = pad_h // 2 pad_bottom = pad_h - pad_top img = F.pad(img, (pad_left, pad_right, pad_top, pad_bottom), mode='constant', value=self.pad_value) obs[key] = img # Return modified transition return (obs, *transition[1:]) def get_config(self) -> Dict[str, Any]: """Return JSON-serializable configuration - required for save/load.""" return { "target_height": self.target_height, "target_width": self.target_width, "pad_value": self.pad_value } def state_dict(self) -> Dict[str, torch.Tensor]: """Return tensor state - only include torch.Tensor objects!""" # This step has no learnable parameters return {} def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None: """Load tensor state - required if state_dict returns non-empty dict.""" # Nothing to load for this step pass def reset(self) -> None: """Reset internal state at episode boundaries - required for stateful steps.""" # This step is stateless, so nothing to reset pass ``` ### Important: get_config vs state_dict These two methods serve different purposes and it's crucial to use them correctly: ```python @dataclass class AdaptiveNormalizer: """Example showing proper use of get_config and state_dict.""" learning_rate: float = 0.01 epsilon: float = 1e-8 def __post_init__(self): self.running_mean = None self.running_var = None self.num_samples = 0 # Python int, not tensor def get_config(self) -> Dict[str, Any]: """ONLY Python objects that can be JSON serialized!""" return { "learning_rate": self.learning_rate, # float ✓ "epsilon": self.epsilon, # float ✓ "num_samples": self.num_samples, # int ✓ # "running_mean": self.running_mean, # torch.Tensor ✗ WRONG! } def state_dict(self) -> Dict[str, torch.Tensor]: """ONLY torch.Tensor objects!""" if self.running_mean is None: return {} return { "running_mean": self.running_mean, # torch.Tensor ✓ "running_var": self.running_var, # torch.Tensor ✓ # "num_samples": self.num_samples, # int ✗ WRONG! # Instead, convert to tensor if needed: "num_samples_tensor": torch.tensor(self.num_samples) } def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None: """Load tensors and convert back to Python types if needed.""" self.running_mean = state.get("running_mean") self.running_var = state.get("running_var") if "num_samples_tensor" in state: self.num_samples = int(state["num_samples_tensor"].item()) ``` ## Using Complementary Data for Inter-Step Communication The `complementary_data` field is perfect for passing information between steps without polluting the info dictionary: ```python @dataclass class ImageStatisticsCalculator: """Calculate image statistics and pass to next steps.""" def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {} if obs is None: return transition # Calculate statistics for all images image_stats = {} for key in obs: if key.startswith("observation.images."): img = obs[key] stats = { "mean": img.mean().item(), "std": img.std().item(), "min": img.min().item(), "max": img.max().item(), } image_stats[key] = stats # Store in complementary_data for next steps comp_data["image_statistics"] = image_stats # Return transition with updated complementary_data return ( obs, transition[TransitionIndex.ACTION], transition[TransitionIndex.REWARD], transition[TransitionIndex.DONE], transition[TransitionIndex.TRUNCATED], transition[TransitionIndex.INFO], comp_data # Updated complementary_data ) @dataclass class AdaptiveBrightnessAdjuster: """Adjust brightness based on statistics from previous step.""" target_brightness: float = 0.5 def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {} if obs is None or "image_statistics" not in comp_data: return transition # Use statistics from previous step image_stats = comp_data["image_statistics"] for key in obs: if key.startswith("observation.images.") and key in image_stats: current_mean = image_stats[key]["mean"] brightness_adjust = self.target_brightness - current_mean # Adjust brightness obs[key] = torch.clamp(obs[key] + brightness_adjust, 0, 1) return (obs, *transition[1:]) # Use them together processor = RobotProcessor([ ImageProcessor(), ImageStatisticsCalculator(), # Calculates stats AdaptiveBrightnessAdjuster(), # Uses stats from previous step ]) ``` ## Loading Processors with Overrides: Handling Non-Serializable Objects One of the most powerful features of RobotProcessor is the ability to override step configurations when loading from saved checkpoints. This is particularly useful for handling non-serializable objects like environment instances, database connections, or hardware interfaces that can't be saved to JSON. ### The Problem: Non-Serializable Parameters Imagine you have a processor step that needs a live gym environment: ```python from dataclasses import dataclass import gym @ProcessorStepRegistry.register("action_repeat_step") @dataclass class ActionRepeatStep: """Step that repeats actions using environment feedback.""" repeat_count: int = 3 env: gym.Env = None # This can't be serialized to JSON! def __call__(self, transition: EnvTransition) -> EnvTransition: obs, action, reward, done, truncated, info, comp_data = transition if self.env is not None and action is not None: # Repeat action multiple times in environment total_reward = 0 for _ in range(self.repeat_count): _, r, d, t, _ = self.env.step(action) total_reward += r if d or t: break reward = total_reward return (obs, action, reward, done, truncated, info, comp_data) def get_config(self) -> dict[str, Any]: # Note: env is NOT included because it's not serializable return {"repeat_count": self.repeat_count} def state_dict(self) -> dict[str, torch.Tensor]: return {} def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: pass ``` If you try to save and load this processor normally, the `env` parameter will be lost because it can't be serialized to JSON. ### The Solution: Override Parameters The `overrides` parameter in `from_pretrained()` allows you to provide non-serializable objects when loading: ```python from lerobot.processor.pipeline import RobotProcessor # Create processor with environment env = gym.make("CartPole-v1") action_step = ActionRepeatStep(repeat_count=2, env=env) processor = RobotProcessor([action_step], name="CartPoleProcessor") # Save the processor (env won't be saved) processor.save_pretrained("./cartpole_processor") # Later, load with environment override new_env = gym.make("CartPole-v1") loaded_processor = RobotProcessor.from_pretrained( "./cartpole_processor", overrides={ "action_repeat_step": {"env": new_env} # Provide the environment } ) ``` ### How Overrides Work The `overrides` parameter is a dictionary where: - **Keys** are step identifiers (class names for unregistered steps, registry names for registered steps) - **Values** are dictionaries of parameter overrides that get merged with saved configurations ```python overrides = { "StepClassName": {"param1": "new_value", "param2": 42}, "registered_step_name": {"device": "cuda", "learning_rate": 0.01} } ``` The loading process: 1. Load saved configuration from JSON 2. For each step, check if overrides exist for that step 3. Merge override parameters with saved parameters (overrides take precedence) 4. Instantiate the step with merged configuration 5. Load any saved tensor state ### Real-World Examples #### Example 1: Environment-Dependent Steps ```python @ProcessorStepRegistry.register("ik_solver_step") @dataclass class InverseKinematicsStep: """Convert Cartesian positions to joint angles.""" robot_model: str = "ur5" solver_timeout: float = 0.1 kinematics_solver: Any = None # Non-serializable solver instance def __call__(self, transition: EnvTransition) -> EnvTransition: obs, action, reward, done, truncated, info, comp_data = transition if self.kinematics_solver is not None and action is not None: # Convert Cartesian action to joint angles joint_angles = self.kinematics_solver.solve(action, timeout=self.solver_timeout) action = joint_angles return (obs, action, reward, done, truncated, info, comp_data) def get_config(self) -> dict[str, Any]: return { "robot_model": self.robot_model, "solver_timeout": self.solver_timeout } # Save processor without solver processor = RobotProcessor([InverseKinematicsStep(robot_model="ur5")]) processor.save_pretrained("./robot_processor") # Load with solver instance from robotics_toolbox import URKinematics solver = URKinematics("ur5") loaded_processor = RobotProcessor.from_pretrained( "./robot_processor", overrides={ "ik_solver_step": { "kinematics_solver": solver, "solver_timeout": 0.05 # Also override timeout } } ) ``` #### Example 2: Device and Hardware Configuration ```python @ProcessorStepRegistry.register("camera_capture_step") @dataclass class CameraCaptureStep: """Capture images from physical camera.""" camera_id: int = 0 resolution: tuple = (640, 480) camera_interface: Any = None # Hardware interface def get_config(self) -> dict[str, Any]: return { "camera_id": self.camera_id, "resolution": self.resolution } # Deploy on different robots with different camera setups # Robot A camera_a = CameraInterface("/dev/video0") processor_a = RobotProcessor.from_pretrained( "shared/vision_processor", overrides={ "camera_capture_step": { "camera_interface": camera_a, "resolution": (1920, 1080) # High-res camera } } ) # Robot B camera_b = CameraInterface("/dev/video1") processor_b = RobotProcessor.from_pretrained( "shared/vision_processor", overrides={ "camera_capture_step": { "camera_interface": camera_b, "resolution": (640, 480) # Lower-res camera } } ) ``` #### Example 3: Multiple Environment Deployment ```python # Training processor that works with simulation @ProcessorStepRegistry.register("physics_validator") @dataclass class PhysicsValidatorStep: """Validate actions against physics constraints.""" max_force: float = 100.0 physics_engine: Any = None def get_config(self) -> dict[str, Any]: return {"max_force": self.max_force} # Different physics engines for different environments import pybullet as pb import mujoco # Simulation deployment sim_engine = pb.connect(pb.DIRECT) sim_processor = RobotProcessor.from_pretrained( "shared/control_processor", overrides={ "physics_validator": { "physics_engine": sim_engine, "max_force": 150.0 # Higher limits in sim } } ) # Real robot deployment real_engine = RealRobotInterface() real_processor = RobotProcessor.from_pretrained( "shared/control_processor", overrides={ "physics_validator": { "physics_engine": real_engine, "max_force": 50.0 # Conservative limits on real robot } } ) ``` ### Override Key Matching Rules The override system uses exact string matching: #### For Registered Steps Use the registry name (the string passed to `@ProcessorStepRegistry.register()`): ```python @ProcessorStepRegistry.register("my_custom_step") class MyStep: pass # Use registry name in overrides overrides = {"my_custom_step": {"param": "value"}} ``` #### For Unregistered Steps Use the exact class name: ```python class MyUnregisteredStep: pass # Use class name in overrides overrides = {"MyUnregisteredStep": {"param": "value"}} ``` ### Error Handling and Validation The override system provides helpful error messages: #### Invalid Override Keys ```python # This will raise KeyError with helpful message overrides = {"NonExistentStep": {"param": "value"}} try: processor = RobotProcessor.from_pretrained("./processor", overrides=overrides) except KeyError as e: print(e) # Output: Override keys ['NonExistentStep'] do not match any step in the saved configuration. # Available step keys: ['ActualStepName', 'AnotherStepName'] ``` #### Instantiation Errors ```python # Invalid parameter types are caught overrides = {"MyStep": {"numeric_param": "not_a_number"}} try: processor = RobotProcessor.from_pretrained("./processor", overrides=overrides) except ValueError as e: print(e) # Output: Failed to instantiate processor step 'MyStep' with config: {...} ``` ### Multiple Steps with Same Class Name When you have multiple steps of the same class, all instances get the same override: ```python step1 = MyStep(param=1) step2 = MyStep(param=2) processor = RobotProcessor([step1, step2]) # Both steps will get the override overrides = {"MyStep": {"param": 999}} loaded = RobotProcessor.from_pretrained("./processor", overrides=overrides) # Both steps now have param=999 ``` To override steps individually, use different classes or register with different names: ```python @ProcessorStepRegistry.register("step_1") class MyStep: pass @ProcessorStepRegistry.register("step_2") class MyStep: # Same class, different registry names pass # Now you can override them separately overrides = { "step_1": {"param": 1}, "step_2": {"param": 2} } ``` ### Best Practices for Overrides #### 1. Design Steps for Overrides When creating steps that need non-serializable objects, design them with overrides in mind: ```python @dataclass class WellDesignedStep: # Serializable configuration timeout: float = 1.0 retry_count: int = 3 # Non-serializable objects with default None database: Any = None api_client: Any = None def __post_init__(self): # Validate that required non-serializable objects are provided if self.database is None: raise ValueError("database must be provided via overrides") def get_config(self) -> dict[str, Any]: # Only include serializable parameters return { "timeout": self.timeout, "retry_count": self.retry_count } ``` #### 2. Use Registry Names for Clarity Register steps with descriptive names to make overrides clearer: ```python @ProcessorStepRegistry.register("robot_arm_controller") class ArmControlStep: pass @ProcessorStepRegistry.register("gripper_controller") class GripperControlStep: pass # Clear override keys overrides = { "robot_arm_controller": {"joint_limits": arm_limits}, "gripper_controller": {"force_limit": gripper_force} } ``` #### 3. Document Override Requirements Include clear documentation about what overrides are needed: ```python @ProcessorStepRegistry.register("vision_processor") class VisionProcessor: """Process camera images for robot vision. Required overrides when loading: camera_interface: Hardware camera interface object Optional overrides: resolution: Camera resolution tuple (default: (640, 480)) fps: Camera frame rate (default: 30) """ camera_interface: Any = None resolution: tuple = (640, 480) fps: int = 30 ``` #### 4. Environment-Specific Configuration Files Create configuration helpers for different deployment environments: ```python # config/simulation.py def get_simulation_overrides(): return { "camera_step": {"camera_interface": SimCamera()}, "physics_step": {"engine": SimPhysics()}, "control_step": {"safety_limits": False} } # config/production.py def get_production_overrides(): return { "camera_step": {"camera_interface": RealCamera("/dev/video0")}, "physics_step": {"engine": RealPhysics()}, "control_step": {"safety_limits": True} } # Usage from config.production import get_production_overrides processor = RobotProcessor.from_pretrained( "shared/processor", overrides=get_production_overrides() ) ``` ### Integration with Hub Sharing Overrides work seamlessly with Hugging Face Hub sharing: ```python # Save processor without non-serializable objects processor.push_to_hub("my-lab/robot-processor") # Anyone can load and provide their own environment import gym local_env = gym.make("MyRobotEnv-v1") processor = RobotProcessor.from_pretrained( "my-lab/robot-processor", overrides={"env_step": {"env": local_env}} ) ``` This enables sharing of preprocessing logic while allowing each user to provide their own environment-specific dependencies. ## Complete Example: Device-Aware Processing Pipeline Here's a complete example showing proper device management and all features: ```python from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, TransitionIndex import torch import torch.nn.functional as F import numpy as np @ProcessorStepRegistry.register("device_mover") @dataclass class DeviceMover: """Move all tensors to specified device.""" device: str = "cuda" def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] if obs is None: return transition # Move all tensor observations to device for key, value in obs.items(): if isinstance(value, torch.Tensor): obs[key] = value.to(self.device) # Also handle action if present action = transition[TransitionIndex.ACTION] if action is not None and isinstance(action, torch.Tensor): action = action.to(self.device) return (obs, action, *transition[2:]) return (obs, *transition[1:]) def get_config(self) -> Dict[str, Any]: return {"device": str(self.device)} @ProcessorStepRegistry.register("running_normalizer") @dataclass class RunningNormalizer: """Normalize using running statistics with proper device handling.""" feature_dim: int momentum: float = 0.1 def __post_init__(self): # Initialize as None - will be created on first call with correct device self.running_mean = None self.running_var = None self.initialized = False def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] if obs is None or "observation.state" not in obs: return transition state = obs["observation.state"] # Initialize on first call with correct device if not self.initialized: device = state.device self.running_mean = torch.zeros(self.feature_dim, device=device) self.running_var = torch.ones(self.feature_dim, device=device) self.initialized = True # Update statistics with torch.no_grad(): batch_mean = state.mean(dim=0) batch_var = state.var(dim=0, unbiased=False) self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * batch_mean self.running_var = (1 - self.momentum) * self.running_var + self.momentum * batch_var # Normalize state_normalized = (state - self.running_mean) / (self.running_var + 1e-8).sqrt() obs["observation.state"] = state_normalized return (obs, *transition[1:]) def get_config(self) -> Dict[str, Any]: return { "feature_dim": self.feature_dim, "momentum": self.momentum, "initialized": self.initialized } def state_dict(self) -> Dict[str, torch.Tensor]: if not self.initialized: return {} return { "running_mean": self.running_mean, "running_var": self.running_var } def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None: if state: self.running_mean = state["running_mean"] self.running_var = state["running_var"] self.initialized = True def reset(self) -> None: # Don't reset statistics - they persist across episodes pass # Create a complete pipeline processor = RobotProcessor([ ImageProcessor(), # Convert images to float32 [0,1] StateProcessor(), # Convert states to torch tensors ImagePadder(224, 224), # Pad images to standard size DeviceMover("cuda"), # Move everything to GPU RunningNormalizer(7), # Normalize states ], name="CompletePreprocessor") # The processor handles device transfers automatically processor = processor.to("cuda") # Moves all stateful components to GPU # Use it obs = { "pixels": {"cam": np.random.randint(0, 255, (200, 300, 3), dtype=np.uint8)}, "agent_pos": np.random.randn(7).astype(np.float32) } transition = (obs, None, 0.0, False, False, {}, {}) # Everything is processed and on GPU processed = processor(transition) print(processed[TransitionIndex.OBSERVATION]["observation.images.cam"].device) # cuda:0 ``` ## Solving Real-World Problems with RobotProcessor Let's see how RobotProcessor elegantly solves the scenarios we discussed earlier: ### Renaming Observation Keys When your environment and policy speak different "languages": ```python @ProcessorStepRegistry.register("key_remapper") @dataclass class KeyRemapper: """Rename observation keys to match policy expectations.""" key_mapping: Dict[str, str] = field(default_factory=lambda: { "rgb_camera_front": "observation.images.wrist", "joint_positions": "observation.state", "gripper_state": "observation.gripper" }) def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] if obs is None: return transition # Create new observation with renamed keys renamed_obs = {} for old_key, new_key in self.key_mapping.items(): if old_key in obs: renamed_obs[new_key] = obs[old_key] # Keep any unmapped keys as-is for key, value in obs.items(): if key not in self.key_mapping: renamed_obs[key] = value return (renamed_obs, *transition[1:]) ``` ### Workspace-Focused Image Processing When you need to crop and resize images to focus on the manipulation workspace: ```python @ProcessorStepRegistry.register("workspace_cropper") @dataclass class WorkspaceCropper: """Crop and resize images to focus on robot workspace.""" crop_bbox: Tuple[int, int, int, int] = (400, 200, 1200, 800) # (x1, y1, x2, y2) output_size: Tuple[int, int] = (224, 224) def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] if obs is None: return transition for key in list(obs.keys()): if key.startswith("observation.images."): img = obs[key] # Crop to workspace x1, y1, x2, y2 = self.crop_bbox img_cropped = img[:, :, y1:y2, x1:x2] # Resize to expected dimensions img_resized = F.interpolate( img_cropped, size=self.output_size, mode='bilinear', align_corners=False ) obs[key] = img_resized return (obs, *transition[1:]) ``` ### Building Complete Pipelines for Different Robots Now you can compose these steps into robot-specific pipelines: ```python # Pipeline for Robot A with wrist camera robot_a_processor = RobotProcessor([ # Observation preprocessing KeyRemapper({ "wrist_rgb": "observation.images.wrist", "arm_joints": "observation.state" }), ImageProcessor(), WorkspaceCropper(crop_bbox=(300, 100, 900, 700), output_size=(224, 224)), StateProcessor(), VelocityCalculator(state_key="observation.state"), DeviceMover("cuda"), # Action postprocessing ActionSmoother(alpha=0.2), ], name="RobotA_ACT_Processor") # Pipeline for Robot B with overhead camera robot_b_processor = RobotProcessor([ # Different key mapping for Robot B KeyRemapper({ "overhead_cam": "observation.images.wrist", # Reuse same policy! "joint_encoders": "observation.state" }), ImageProcessor(), WorkspaceCropper(crop_bbox=(100, 50, 1100, 950), output_size=(224, 224)), StateProcessor(), VelocityCalculator(state_key="observation.state"), DeviceMover("cuda"), ActionSmoother(alpha=0.3), # Different smoothing ], name="RobotB_ACT_Processor") # Share processors with the community robot_a_processor.push_to_hub("my-lab/robot-a-act-processor") robot_b_processor.push_to_hub("my-lab/robot-b-act-processor") # Now anyone can use the same preprocessing processor = RobotProcessor.from_pretrained("my-lab/robot-a-act-processor") ``` The beauty of this approach is that: 1. **Each transformation is tested independently** - The IK solver can be validated separately from image cropping 2. **Configurations are shareable** - Other labs can use your exact preprocessing setup 3. **Pipelines are composable** - Mix and match steps for different robots 4. **Everything is version controlled** - Use Hub revisions to track changes ## Best Practices for Processor Steps ### 1. Always Check for None ```python def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] # Always check if observation exists if obs is None: return transition # Also check for specific keys if "observation.state" not in obs: return transition ``` ### 2. Preserve Transition Structure ```python # Good - preserve all elements return (modified_obs, *transition[1:]) # Bad - losing information return (modified_obs, None, 0.0, False, False, {}, {}) ``` ### 3. Clone When Storing State ```python def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition[TransitionIndex.OBSERVATION] if self.store_previous: # Good - clone to avoid reference issues self.previous_state = obs["observation.state"].clone() # Bad - stores reference that might be modified # self.previous_state = obs["observation.state"] ``` ### 4. Handle Device Transfers in state_dict ```python def state_dict(self) -> Dict[str, torch.Tensor]: if self.buffer is None: return {} # Good - clone to avoid memory sharing issues return {"buffer": self.buffer.clone()} ``` ## Complete Policy Example with Pre and Post Processing Here's how to use RobotProcessor in a real robot control loop, showing both tuple and batch formats: ```python from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, TransitionIndex from lerobot.policies.act.modeling_act import ACTPolicy from pathlib import Path import time # Create preprocessing pipeline preprocessor = RobotProcessor([ ImageProcessor(), StateProcessor(), ImagePadder(target_height=224, target_width=224), DeviceMover("cuda"), RunningNormalizer(feature_dim=14), # For 14-DOF robot ], name="ACTPreprocessor") # Create postprocessing pipeline for actions @ProcessorStepRegistry.register("action_clipper") @dataclass class ActionClipper: """Clip actions to safe ranges.""" min_value: float = -1.0 max_value: float = 1.0 def __call__(self, transition: EnvTransition) -> EnvTransition: action = transition[TransitionIndex.ACTION] if action is not None: action = torch.clamp(action, self.min_value, self.max_value) return (transition[TransitionIndex.OBSERVATION], action, *transition[2:]) return transition def get_config(self) -> Dict[str, Any]: return {"min_value": self.min_value, "max_value": self.max_value} postprocessor = RobotProcessor([ ActionClipper(min_value=-0.5, max_value=0.5), ActionSmoother(alpha=0.3), ], name="ACTPostprocessor") # Load policy policy = ACTPolicy.from_pretrained("lerobot/act_aloha_sim_transfer_cube_human") # Move everything to GPU preprocessor = preprocessor.to("cuda") postprocessor = postprocessor.to("cuda") policy = policy.to("cuda") # Control loop using EnvTransition format env = make_robot_env() obs, info = env.reset() for episode in range(10): print(f"Episode {episode + 1}") for step in range(1000): # Create transition with raw observation transition = (obs, None, 0.0, False, False, info, {"step": step}) # Preprocess - works with tuple format processed_transition = preprocessor(transition) processed_obs = processed_transition[TransitionIndex.OBSERVATION] # Get action from policy with torch.no_grad(): action = policy.select_action(processed_obs) # Postprocess action action_transition = ( processed_obs, action, 0.0, False, False, info, {"raw_action": action.clone()} # Store raw action in complementary_data ) processed_action_transition = postprocessor(action_transition) final_action = processed_action_transition[TransitionIndex.ACTION] # Execute action obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy()) if terminated or truncated: # Reset at episode boundary preprocessor.reset() postprocessor.reset() obs, info = env.reset() break # Save preprocessor with learned statistics preprocessor.save_pretrained(f"./checkpoints/preprocessor_ep{episode}") # Alternative: Using batch dictionary format # This is useful when integrating with existing LeRobot training code def control_loop_with_batch_format(): """Example using batch dictionary format.""" obs, info = env.reset() for step in range(1000): # Create batch dictionary batch = { "observation.image": torch.from_numpy(obs["pixels"]).unsqueeze(0), "observation.state": torch.from_numpy(obs["agent_pos"]).unsqueeze(0), "action": torch.zeros(1, 7), # Placeholder "next.reward": torch.tensor([0.0]), "next.done": torch.tensor([False]), "next.truncated": torch.tensor([False]), "info": info, } # Preprocess - works with batch format processed_batch = preprocessor(batch) # Get action from policy with torch.no_grad(): action = policy.select_action({ k: v for k, v in processed_batch.items() if k.startswith("observation.") }) # Add action to batch for postprocessing processed_batch["action"] = action # Postprocess final_batch = postprocessor(processed_batch) final_action = final_batch["action"] # Execute action obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy()) if terminated or truncated: break # Push final version to hub preprocessor.push_to_hub("my-username/act-preprocessor") postprocessor.push_to_hub("my-username/act-postprocessor") ``` ## Debugging and Monitoring Use the full power of `RobotProcessor` for debugging: ```python # Enable detailed logging def log_observation_shapes(step_idx: int, transition: EnvTransition): obs = transition[TransitionIndex.OBSERVATION] if obs: print(f"Step {step_idx} observations:") for key, value in obs.items(): if isinstance(value, torch.Tensor): print(f" {key}: shape={value.shape}, device={value.device}, dtype={value.dtype}") return None processor.register_after_step_hook(log_observation_shapes) # Monitor complementary data flow def monitor_complementary_data(step_idx: int, transition: EnvTransition): comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] if comp_data: print(f"Step {step_idx} complementary_data: {list(comp_data.keys())}") return None processor.register_before_step_hook(monitor_complementary_data) # Validate data integrity def validate_tensors(step_idx: int, transition: EnvTransition): obs = transition[TransitionIndex.OBSERVATION] if obs: for key, value in obs.items(): if isinstance(value, torch.Tensor): if torch.isnan(value).any(): raise ValueError(f"NaN detected in {key} after step {step_idx}") if torch.isinf(value).any(): raise ValueError(f"Inf detected in {key} after step {step_idx}") return None processor.register_after_step_hook(validate_tensors) ``` ## Summary RobotProcessor provides a powerful, modular approach to data preprocessing in robotics: - **Dual format support**: Works seamlessly with both EnvTransition tuples and batch dictionaries - **Automatic format conversion**: Converts between tuple and batch formats as needed - **LeRobot integration**: Native support for LeRobotDataset and ReplayBuffer formats - **Clear separation of concerns**: Each transformation is a separate, testable unit - **Proper state management**: Clear distinction between config (JSON) and state (tensors) - **Device-aware**: Seamless GPU/CPU transfers with `.to(device)` - **Inter-step communication**: Use `complementary_data` for passing information - **Easy sharing**: Push to Hugging Face Hub for reproducibility - **Type safety**: Use `TransitionIndex` instead of magic numbers - **Debugging tools**: Step through transformations and add monitoring hooks - **Flexible conversion**: Customize `to_transition` and `to_output` functions for specific needs Key advantages of the dual format approach: - **Environment interaction**: Use tuple format for real-time robot control - **Training/evaluation**: Use batch format for dataset processing and model training - **Seamless integration**: Same processor works with both formats automatically - **Backward compatibility**: Existing code using either format continues to work - **Output format control**: Use `to_output` to control return format regardless of input format - **Performance optimization**: Avoid unnecessary format conversions during time-critical operations By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust while being compatible with the entire LeRobot ecosystem. For the full API reference, see the [RobotProcessor API documentation](/api/processor).