From cd13f1ecfd88f9c1252d38debe0159d6af2492c1 Mon Sep 17 00:00:00 2001 From: Adil Zouitine Date: Thu, 3 Jul 2025 15:22:21 +0200 Subject: [PATCH] Add RobotProcessor tutorial to documentation - Introduced a new tutorial on using RobotProcessor for preprocessing robot data. - Added a section in the table of contents for easy navigation to the new tutorial. - The tutorial covers key concepts, real-world scenarios, and practical examples for effective use of the RobotProcessor pipeline. --- docs/source/_toctree.yml | 2 + docs/source/processor_tutorial.mdx | 847 +++++++++++++++++++++++++++++ 2 files changed, 849 insertions(+) create mode 100644 docs/source/processor_tutorial.mdx diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 1af96d79d..50c9b761a 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -13,6 +13,8 @@ title: Cameras - local: integrate_hardware title: Bring Your Own Hardware + - local: processor_tutorial + title: RobotProcessor Pipeline - local: hilserl title: Train a Robot with RL - local: hilserl_sim diff --git a/docs/source/processor_tutorial.mdx b/docs/source/processor_tutorial.mdx new file mode 100644 index 000000000..7646a795e --- /dev/null +++ b/docs/source/processor_tutorial.mdx @@ -0,0 +1,847 @@ +# RobotProcessor: The Art of Processing Robot Data + +This tutorial will teach you how to use RobotProcessor, a powerful pipeline system for preprocessing robot observations and postprocessing actions. You'll learn how to create modular, composable data transformations that make your robot learning pipelines more maintainable and reproducible. + +**You'll learn:** +1. What RobotProcessor is and why it's better than alternatives +2. How to create and compose processor steps +3. How to save, load, and share your processors +4. Advanced features for debugging and customization + +## Real-World Scenarios: When You Need RobotProcessor + +Before diving into code, let's understand the real problems RobotProcessor solves. If you've worked with robots and learned policies, you've likely encountered these challenges: + +### Observation Key Mismatches +Your environment might output observations with keys like `"rgb_camera_front"` and `"joint_positions"`, but your pre-trained policy expects `"observation.images.wrist"` and `"observation.state"`. You need to rename these keys consistently across your pipeline. + +### Image Preprocessing Requirements +Your robot's camera captures 1920x1080 RGB images, but your policy was trained on 224x224 cropped images focusing on the workspace. You need to: +- Crop out the relevant workspace area +- Resize to the expected dimensions +- Convert from uint8 [0,255] to float32 [0,1] +- Ensure the right channel ordering (HWC vs CHW) +- Move everything to GPU efficiently + +### Coordinate System Transformations +Your policy outputs desired end-effector positions in Cartesian space, but your robot expects joint angles. You need to: +- Apply inverse kinematics to convert positions to joint angles +- Handle singularities and out-of-reach positions +- Smooth the resulting trajectories to avoid jerky movements +- Clip values to respect joint limits + +### State Augmentation +Your environment provides joint positions, but your policy also needs velocities and accelerations. 
You need to:
- Calculate velocities from position differences
- Maintain a buffer of past states
- Handle episode boundaries correctly

### Multi-Robot Deployment
You want to deploy the same policy on different robots with varying:
- Camera setups (different resolutions, positions)
- Joint configurations (6-DOF vs 7-DOF arms)
- Control frequencies (10Hz vs 30Hz)
- Safety limits and workspace boundaries

### Data Collection and Training
During data collection, you need to:
- Record raw sensor data for dataset creation
- Apply the same preprocessing as during deployment
- Ensure reproducibility across different operators
- Share preprocessing configurations with collaborators

RobotProcessor solves all these problems by providing a modular, composable system where each transformation is a separate, shareable component. Let's see how.

## Why RobotProcessor?

If you've worked with robot learning before, you've likely written preprocessing code like this:

```python
import numpy as np
import torch
import torch.nn.functional as F

# Traditional procedural approach - hard to maintain and share
def preprocess_observation(obs, device='cuda'):
    processed_obs = {}

    # Process images
    if "pixels" in obs:
        for cam_name, img in obs["pixels"].items():
            # Convert uint8 [0, 255] to float32 [0, 1]
            img = img.astype(np.float32) / 255.0
            # Convert from HWC to CHW format
            img = np.transpose(img, (2, 0, 1))
            # Add batch dimension
            img = np.expand_dims(img, axis=0)
            # Convert to torch and move to device
            img = torch.from_numpy(img).to(device)
            # Pad to standard size
            if img.shape[-2:] != (224, 224):
                img = F.pad(img, (0, 224 - img.shape[-1], 0, 224 - img.shape[-2]))
            processed_obs[f"observation.images.{cam_name}"] = img

    # Process state
    if "agent_pos" in obs:
        state = torch.from_numpy(obs["agent_pos"]).float()
        state = state.unsqueeze(0).to(device)
        processed_obs["observation.state"] = state

    return processed_obs
```

This approach has several problems:
- **Mixed concerns**: Image processing, tensor conversion, and device management are all tangled into one function
- **Hard to test**: Can't test image processing separately from state processing
- **Device management scattered**: Easy to forget `.to(device)` calls
- **No standardized format**: Each project reinvents the wheel
- **Difficult to share**: Can't easily reuse this preprocessing in other projects

RobotProcessor solves these issues by providing a declarative pipeline approach where each transformation is a separate, testable, shareable component.

## Understanding EnvTransition

Before diving into RobotProcessor, let's understand the data structure it operates on. An `EnvTransition` is a 7-tuple that represents a complete transition in the environment:

```python
# EnvTransition structure:
# (observation, action, reward, done, truncated, info, complementary_data)
transition = (
    {"pixels": ..., "agent_pos": ...},  # observation at time t
    [0.1, -0.2, 0.3],                   # action taken at time t
    1.0,                                # reward received
    False,                              # episode done flag
    False,                              # episode truncated flag
    {"success": True},                  # additional info from environment
    {"step_idx": 42}                    # complementary_data for inter-step communication
)
```

Each element serves a specific purpose:
1. **observation**: Raw sensor data from the environment (images, states, etc.)
2. **action**: The action command sent to the robot
3. **reward**: Scalar reward signal (for RL tasks)
4. **done**: Boolean indicating natural episode termination
5. 
**truncated**: Boolean indicating artificial termination (time limit, safety) +6. **info**: Dictionary with environment-specific information +7. **complementary_data**: Dictionary for passing data between processor steps (NEW!) + +### Using TransitionIndex + +Instead of using magic numbers to access tuple elements, use the `TransitionIndex` enum: + +```python +from lerobot.processor.pipeline import TransitionIndex + +# Bad - using magic numbers +obs = transition[0] +action = transition[1] + +# Good - using TransitionIndex +obs = transition[TransitionIndex.OBSERVATION] +action = transition[TransitionIndex.ACTION] +reward = transition[TransitionIndex.REWARD] +done = transition[TransitionIndex.DONE] +truncated = transition[TransitionIndex.TRUNCATED] +info = transition[TransitionIndex.INFO] +comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] +``` + +## Your First RobotProcessor + +Let's create a processor that properly handles image and state preprocessing: + +```python +from lerobot.processor.pipeline import RobotProcessor, TransitionIndex +from lerobot.processor.observation_processor import ImageProcessor, StateProcessor +import numpy as np + +# Create sample data +observation = { + "pixels": { + "camera_front": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8), + "camera_side": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) + }, + "agent_pos": np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7], dtype=np.float32) +} + +# Create a full transition +transition = (observation, None, 0.0, False, False, {}, {}) + +# Create and use the processor +processor = RobotProcessor([ + ImageProcessor(), # Converts uint8[0,255] to float32[0,1], HWC to CHW, adds batch dim + StateProcessor(), # Converts numpy arrays to torch tensors, adds batch dim +]) + +processed_transition = processor(transition) +processed_obs = processed_transition[TransitionIndex.OBSERVATION] + +# Check the results +print("Original keys:", observation.keys()) +print("Processed keys:", processed_obs.keys()) +print("Image shape:", processed_obs["observation.images.camera_front"].shape) # [1, 3, 480, 640] +print("Image dtype:", processed_obs["observation.images.camera_front"].dtype) # torch.float32 +print("Image range:", processed_obs["observation.images.camera_front"].min().item(), + "to", processed_obs["observation.images.camera_front"].max().item()) # 0.0 to 1.0 +``` + +## Creating Custom Steps: The ProcessorStep Protocol + +A processor step must follow certain conventions. 
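
At its core, a step only needs to be a callable that takes an `EnvTransition` and returns an `EnvTransition`; the remaining methods (`get_config`, `state_dict`, `load_state_dict`, `reset`) are optional hooks for saving, loading, and episode boundaries. As a warm-up, here is a minimal sketch of a bare-bones step. The `RewardScaler` class below is illustrative, not part of the library:

```python
from dataclasses import dataclass

from lerobot.processor.pipeline import TransitionIndex

@dataclass
class RewardScaler:
    """Minimal step: only __call__ is strictly required."""

    scale: float = 0.1

    def __call__(self, transition):
        reward = transition[TransitionIndex.REWARD]
        if reward is None:
            return transition
        # Rebuild the 7-tuple with the scaled reward, leaving everything else untouched
        return (
            transition[TransitionIndex.OBSERVATION],
            transition[TransitionIndex.ACTION],
            reward * self.scale,
            transition[TransitionIndex.DONE],
            transition[TransitionIndex.TRUNCATED],
            transition[TransitionIndex.INFO],
            transition[TransitionIndex.COMPLEMENTARY_DATA],
        )

    def get_config(self):
        # Optional, but needed if you want save/load to restore the scale
        return {"scale": self.scale}
```
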
Let's create a complete example that shows all required and optional methods:

```python
from dataclasses import dataclass
from typing import Any, Dict
import torch
import torch.nn.functional as F

# Assuming EnvTransition is exported alongside TransitionIndex
from lerobot.processor.pipeline import EnvTransition, TransitionIndex

@dataclass
class ImagePadder:
    """Pad images to a standard size for batch processing."""

    target_height: int = 224
    target_width: int = 224
    pad_value: float = 0.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Main processing method - required for all steps."""
        obs = transition[TransitionIndex.OBSERVATION]

        if obs is None:
            return transition

        # Process all image observations
        for key in list(obs.keys()):
            if key.startswith("observation.images."):
                img = obs[key]
                # Calculate padding
                _, _, h, w = img.shape
                pad_h = max(0, self.target_height - h)
                pad_w = max(0, self.target_width - w)

                if pad_h > 0 or pad_w > 0:
                    # Pad symmetrically
                    pad_left = pad_w // 2
                    pad_right = pad_w - pad_left
                    pad_top = pad_h // 2
                    pad_bottom = pad_h - pad_top

                    img = F.pad(img, (pad_left, pad_right, pad_top, pad_bottom),
                                mode='constant', value=self.pad_value)

                obs[key] = img

        # Return modified transition
        return (obs, *transition[1:])

    def get_config(self) -> Dict[str, Any]:
        """Return JSON-serializable configuration - required for save/load."""
        return {
            "target_height": self.target_height,
            "target_width": self.target_width,
            "pad_value": self.pad_value
        }

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """Return tensor state - only include torch.Tensor objects!"""
        # This step has no learnable parameters
        return {}

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        """Load tensor state - required if state_dict returns non-empty dict."""
        # Nothing to load for this step
        pass

    def reset(self) -> None:
        """Reset internal state at episode boundaries - required for stateful steps."""
        # This step is stateless, so nothing to reset
        pass
```

### Important: get_config vs state_dict

These two methods serve different purposes, and it's crucial to use them correctly:

```python
@dataclass
class AdaptiveNormalizer:
    """Example showing proper use of get_config and state_dict."""

    learning_rate: float = 0.01
    epsilon: float = 1e-8

    def __post_init__(self):
        self.running_mean = None
        self.running_var = None
        self.num_samples = 0  # Python int, not tensor

    def get_config(self) -> Dict[str, Any]:
        """ONLY Python objects that can be JSON serialized!"""
        return {
            "learning_rate": self.learning_rate,  # float ✓
            "epsilon": self.epsilon,              # float ✓
            "num_samples": self.num_samples,      # int ✓
            # "running_mean": self.running_mean,  # torch.Tensor ✗ WRONG!
        }

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """ONLY torch.Tensor objects!"""
        if self.running_mean is None:
            return {}
        return {
            "running_mean": self.running_mean,  # torch.Tensor ✓
            "running_var": self.running_var,    # torch.Tensor ✓
            # "num_samples": self.num_samples,  # int ✗ WRONG!
            # Instead, convert to tensor if needed:
            "num_samples_tensor": torch.tensor(self.num_samples)
        }

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        """Load tensors and convert back to Python types if needed."""
        self.running_mean = state.get("running_mean")
        self.running_var = state.get("running_var")
        if "num_samples_tensor" in state:
            self.num_samples = int(state["num_samples_tensor"].item())
```

## Using Complementary Data for Inter-Step Communication

The `complementary_data` field is perfect for passing information between steps without polluting the info dictionary:

```python
@dataclass
class ImageStatisticsCalculator:
    """Calculate image statistics and pass to next steps."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {}

        if obs is None:
            return transition

        # Calculate statistics for all images
        image_stats = {}
        for key in obs:
            if key.startswith("observation.images."):
                img = obs[key]
                stats = {
                    "mean": img.mean().item(),
                    "std": img.std().item(),
                    "min": img.min().item(),
                    "max": img.max().item(),
                }
                image_stats[key] = stats

        # Store in complementary_data for next steps
        comp_data["image_statistics"] = image_stats

        # Return transition with updated complementary_data
        return (
            obs,
            transition[TransitionIndex.ACTION],
            transition[TransitionIndex.REWARD],
            transition[TransitionIndex.DONE],
            transition[TransitionIndex.TRUNCATED],
            transition[TransitionIndex.INFO],
            comp_data  # Updated complementary_data
        )

@dataclass
class AdaptiveBrightnessAdjuster:
    """Adjust brightness based on statistics from previous step."""

    target_brightness: float = 0.5

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {}

        if obs is None or "image_statistics" not in comp_data:
            return transition

        # Use statistics from previous step
        image_stats = comp_data["image_statistics"]

        for key in obs:
            if key.startswith("observation.images.") and key in image_stats:
                current_mean = image_stats[key]["mean"]
                brightness_adjust = self.target_brightness - current_mean

                # Adjust brightness
                obs[key] = torch.clamp(obs[key] + brightness_adjust, 0, 1)

        return (obs, *transition[1:])

# Use them together
processor = RobotProcessor([
    ImageProcessor(),
    ImageStatisticsCalculator(),   # Calculates stats
    AdaptiveBrightnessAdjuster(),  # Uses stats from previous step
])
```

## Complete Example: Device-Aware Processing Pipeline

Here's a complete example showing proper device management and all features:

```python
from dataclasses import dataclass
from typing import Any, Dict

from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, EnvTransition, TransitionIndex
import torch
import torch.nn.functional as F
import numpy as np

@ProcessorStepRegistry.register("device_mover")
@dataclass
class DeviceMover:
    """Move all tensors to specified device."""

    device: str = "cuda"

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]

        if obs is None:
            return transition

        # Move all tensor observations to device
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                obs[key] = value.to(self.device)

        # Also handle action if present
        action = transition[TransitionIndex.ACTION]
        if action is not None and isinstance(action, torch.Tensor):
            action = action.to(self.device)
            return (obs, action, *transition[2:])

        return (obs, *transition[1:])

    def get_config(self) -> Dict[str, Any]:
        return {"device": str(self.device)}

@ProcessorStepRegistry.register("running_normalizer")
@dataclass
class RunningNormalizer:
    """Normalize using running statistics with proper device handling."""

    feature_dim: int
    momentum: float = 0.1

    def __post_init__(self):
        # Initialize as None - will be created on first call with correct device
        self.running_mean = None
        self.running_var = None
        self.initialized = False

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]

        if obs is None or "observation.state" not in obs:
            return transition

        # Assumes state is batched: shape (batch, feature_dim)
        state = obs["observation.state"]

        # Initialize on first call with correct device
        if not self.initialized:
            device = state.device
            self.running_mean = torch.zeros(self.feature_dim, device=device)
            self.running_var = torch.ones(self.feature_dim, device=device)
            self.initialized = True

        # Update statistics
        with torch.no_grad():
            batch_mean = state.mean(dim=0)
            batch_var = state.var(dim=0, unbiased=False)

            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * batch_mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * batch_var

        # Normalize
        state_normalized = (state - self.running_mean) / (self.running_var + 1e-8).sqrt()
        obs["observation.state"] = state_normalized

        return (obs, *transition[1:])

    def get_config(self) -> Dict[str, Any]:
        return {
            "feature_dim": self.feature_dim,
            "momentum": self.momentum,
            "initialized": self.initialized
        }

    def state_dict(self) -> Dict[str, torch.Tensor]:
        if not self.initialized:
            return {}
        return {
            "running_mean": self.running_mean,
            "running_var": self.running_var
        }

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        if state:
            self.running_mean = state["running_mean"]
            self.running_var = state["running_var"]
            self.initialized = True

    def reset(self) -> None:
        # Don't reset statistics - they persist across episodes
        pass

# Create a complete pipeline
processor = RobotProcessor([
    ImageProcessor(),       # Convert images to float32 [0,1]
    StateProcessor(),       # Convert states to torch tensors
    ImagePadder(224, 224),  # Pad images to standard size
    DeviceMover("cuda"),    # Move everything to GPU
    RunningNormalizer(7),   # Normalize states
], name="CompletePreprocessor")

# The processor handles device transfers automatically
processor = processor.to("cuda")  # Moves all stateful components to GPU

# Use it
obs = {
    "pixels": {"cam": np.random.randint(0, 255, (200, 300, 3), dtype=np.uint8)},
    "agent_pos": np.random.randn(7).astype(np.float32)
}
transition = (obs, None, 0.0, False, False, {}, {})

# Everything is processed and on GPU
processed = processor(transition)
print(processed[TransitionIndex.OBSERVATION]["observation.images.cam"].device)  # cuda:0
```
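
Because each step exposes `get_config` (JSON) and, where needed, `state_dict` (tensors), the whole pipeline can be saved and restored. A minimal sketch of the round trip, using the `save_pretrained`/`from_pretrained` methods that appear later in this tutorial; the checkpoint path is illustrative, and we assume `from_pretrained` accepts a local directory the way other Hugging Face libraries do:

```python
# Write the pipeline definition, per-step configs, and tensor state to disk
processor.save_pretrained("./checkpoints/complete_preprocessor")

# Rebuild the same pipeline, including RunningNormalizer's running statistics
restored = RobotProcessor.from_pretrained("./checkpoints/complete_preprocessor")

# The restored pipeline behaves identically on the same transition
processed_again = restored(transition)
```

Registering custom steps with `@ProcessorStepRegistry.register(...)` (as `DeviceMover` and `RunningNormalizer` do above) is presumably what lets the loading code locate the right classes when rebuilding the pipeline.
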
"rgb_camera_front": "observation.images.wrist", + "joint_positions": "observation.state", + "gripper_state": "observation.gripper" + }) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + obs = transition[TransitionIndex.OBSERVATION] + if obs is None: + return transition + + # Create new observation with renamed keys + renamed_obs = {} + for old_key, new_key in self.key_mapping.items(): + if old_key in obs: + renamed_obs[new_key] = obs[old_key] + + # Keep any unmapped keys as-is + for key, value in obs.items(): + if key not in self.key_mapping: + renamed_obs[key] = value + + return (renamed_obs, *transition[1:]) +``` + +### Workspace-Focused Image Processing + +When you need to crop and resize images to focus on the manipulation workspace: + +```python +@ProcessorStepRegistry.register("workspace_cropper") +@dataclass +class WorkspaceCropper: + """Crop and resize images to focus on robot workspace.""" + + crop_bbox: Tuple[int, int, int, int] = (400, 200, 1200, 800) # (x1, y1, x2, y2) + output_size: Tuple[int, int] = (224, 224) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + obs = transition[TransitionIndex.OBSERVATION] + if obs is None: + return transition + + for key in list(obs.keys()): + if key.startswith("observation.images."): + img = obs[key] + # Crop to workspace + x1, y1, x2, y2 = self.crop_bbox + img_cropped = img[:, :, y1:y2, x1:x2] + # Resize to expected dimensions + img_resized = F.interpolate( + img_cropped, + size=self.output_size, + mode='bilinear', + align_corners=False + ) + obs[key] = img_resized + + return (obs, *transition[1:]) +``` + + +### Building Complete Pipelines for Different Robots + +Now you can compose these steps into robot-specific pipelines: + +```python +# Pipeline for Robot A with wrist camera +robot_a_processor = RobotProcessor([ + # Observation preprocessing + KeyRemapper({ + "wrist_rgb": "observation.images.wrist", + "arm_joints": "observation.state" + }), + ImageProcessor(), + WorkspaceCropper(crop_bbox=(300, 100, 900, 700), output_size=(224, 224)), + StateProcessor(), + VelocityCalculator(state_key="observation.state"), + DeviceMover("cuda"), + + # Action postprocessing + ActionSmoother(alpha=0.2), +], name="RobotA_ACT_Processor") + +# Pipeline for Robot B with overhead camera +robot_b_processor = RobotProcessor([ + # Different key mapping for Robot B + KeyRemapper({ + "overhead_cam": "observation.images.wrist", # Reuse same policy! + "joint_encoders": "observation.state" + }), + ImageProcessor(), + WorkspaceCropper(crop_bbox=(100, 50, 1100, 950), output_size=(224, 224)), + StateProcessor(), + VelocityCalculator(state_key="observation.state"), + DeviceMover("cuda"), + + ActionSmoother(alpha=0.3), # Different smoothing +], name="RobotB_ACT_Processor") + +# Share processors with the community +robot_a_processor.push_to_hub("my-lab/robot-a-act-processor") +robot_b_processor.push_to_hub("my-lab/robot-b-act-processor") + +# Now anyone can use the same preprocessing +processor = RobotProcessor.from_pretrained("my-lab/robot-a-act-processor") +``` + +The beauty of this approach is that: +1. **Each transformation is tested independently** - The IK solver can be validated separately from image cropping +2. **Configurations are shareable** - Other labs can use your exact preprocessing setup +3. **Pipelines are composable** - Mix and match steps for different robots +4. **Everything is version controlled** - Use Hub revisions to track changes + +## Best Practices for Processor Steps + +### 1. 
## Best Practices for Processor Steps

### 1. Always Check for None
```python
def __call__(self, transition: EnvTransition) -> EnvTransition:
    obs = transition[TransitionIndex.OBSERVATION]

    # Always check if observation exists
    if obs is None:
        return transition

    # Also check for specific keys
    if "observation.state" not in obs:
        return transition
```

### 2. Preserve Transition Structure
```python
# Good - preserve all elements
return (modified_obs, *transition[1:])

# Bad - losing information
return (modified_obs, None, 0.0, False, False, {}, {})
```

### 3. Clone When Storing State
```python
def __call__(self, transition: EnvTransition) -> EnvTransition:
    obs = transition[TransitionIndex.OBSERVATION]

    if self.store_previous:
        # Good - clone to avoid reference issues
        self.previous_state = obs["observation.state"].clone()

    # Bad - stores reference that might be modified
    # self.previous_state = obs["observation.state"]
```

### 4. Handle Device Transfers in state_dict
```python
def state_dict(self) -> Dict[str, torch.Tensor]:
    if self.buffer is None:
        return {}

    # Good - clone to avoid memory sharing issues
    return {"buffer": self.buffer.clone()}
```

## Complete Policy Example with Pre and Post Processing

Here's how to use RobotProcessor in a real robot control loop:

```python
from dataclasses import dataclass
from typing import Any, Dict

import torch

from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, EnvTransition, TransitionIndex
from lerobot.policies.act.modeling_act import ACTPolicy

# Create preprocessing pipeline
# (ImageProcessor, StateProcessor, ImagePadder, DeviceMover, RunningNormalizer,
# and ActionSmoother are all defined earlier in this tutorial)
preprocessor = RobotProcessor([
    ImageProcessor(),
    StateProcessor(),
    ImagePadder(target_height=224, target_width=224),
    DeviceMover("cuda"),
    RunningNormalizer(feature_dim=14),  # For 14-DOF robot
], name="ACTPreprocessor")

# Create postprocessing pipeline for actions
@ProcessorStepRegistry.register("action_clipper")
@dataclass
class ActionClipper:
    """Clip actions to safe ranges."""
    min_value: float = -1.0
    max_value: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition[TransitionIndex.ACTION]

        if action is not None:
            action = torch.clamp(action, self.min_value, self.max_value)
            return (transition[TransitionIndex.OBSERVATION], action, *transition[2:])

        return transition

    def get_config(self) -> Dict[str, Any]:
        return {"min_value": self.min_value, "max_value": self.max_value}

postprocessor = RobotProcessor([
    ActionClipper(min_value=-0.5, max_value=0.5),
    ActionSmoother(alpha=0.3),
], name="ACTPostprocessor")

# Load policy
policy = ACTPolicy.from_pretrained("lerobot/act_aloha_sim_transfer_cube_human")

# Move everything to GPU
preprocessor = preprocessor.to("cuda")
postprocessor = postprocessor.to("cuda")
policy = policy.to("cuda")

# Control loop
env = make_robot_env()  # your gym-style environment factory
obs, info = env.reset()

for episode in range(10):
    print(f"Episode {episode + 1}")

    for step in range(1000):
        # Create transition with raw observation
        transition = (obs, None, 0.0, False, False, info, {"step": step})

        # Preprocess
        processed_transition = preprocessor(transition)
        processed_obs = processed_transition[TransitionIndex.OBSERVATION]

        # Get action from policy
        with torch.no_grad():
            action = policy.select_action(processed_obs)

        # Postprocess action
        action_transition = (
            processed_obs,
            action,
            0.0,
            False,
            False,
            info,
            {"raw_action": action.clone()}  # Store raw action in complementary_data
        )
        processed_action_transition = postprocessor(action_transition)
        final_action = processed_action_transition[TransitionIndex.ACTION]

        # Execute action
        obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy())

        if terminated or truncated:
            # Reset at episode boundary
            preprocessor.reset()
            postprocessor.reset()
            obs, info = env.reset()
            break

    # Save preprocessor with learned statistics
    preprocessor.save_pretrained(f"./checkpoints/preprocessor_ep{episode}")

# Push final version to hub
preprocessor.push_to_hub("my-username/act-preprocessor")
postprocessor.push_to_hub("my-username/act-postprocessor")
```
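
A payoff of keeping steps this small is that each one can be unit-tested in isolation, with no robot, environment, or policy in the loop. A minimal pytest-style sketch exercising the `ActionClipper` defined above (assuming it is importable from wherever you defined it):

```python
import torch

from lerobot.processor.pipeline import TransitionIndex

def test_action_clipper_clamps_to_range():
    clipper = ActionClipper(min_value=-0.5, max_value=0.5)
    transition = (None, torch.tensor([-2.0, 0.1, 3.0]), 0.0, False, False, {}, {})

    clipped = clipper(transition)[TransitionIndex.ACTION]

    assert clipped.min().item() >= -0.5
    assert clipped.max().item() <= 0.5

def test_action_clipper_ignores_missing_action():
    clipper = ActionClipper()
    transition = (None, None, 0.0, False, False, {}, {})

    # No action present: the transition should pass through unchanged
    assert clipper(transition)[TransitionIndex.ACTION] is None
```
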
## Debugging and Monitoring

RobotProcessor's hook system lets you watch data flow through every step of the pipeline:

```python
# Enable detailed logging
def log_observation_shapes(step_idx: int, transition: EnvTransition):
    obs = transition[TransitionIndex.OBSERVATION]
    if obs:
        print(f"Step {step_idx} observations:")
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f"  {key}: shape={value.shape}, device={value.device}, dtype={value.dtype}")
    return None

processor.register_after_step_hook(log_observation_shapes)

# Monitor complementary data flow
def monitor_complementary_data(step_idx: int, transition: EnvTransition):
    comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA]
    if comp_data:
        print(f"Step {step_idx} complementary_data: {list(comp_data.keys())}")
    return None

processor.register_before_step_hook(monitor_complementary_data)

# Validate data integrity
def validate_tensors(step_idx: int, transition: EnvTransition):
    obs = transition[TransitionIndex.OBSERVATION]
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                if torch.isnan(value).any():
                    raise ValueError(f"NaN detected in {key} after step {step_idx}")
                if torch.isinf(value).any():
                    raise ValueError(f"Inf detected in {key} after step {step_idx}")
    return None

processor.register_after_step_hook(validate_tensors)
```

## Summary

RobotProcessor provides a powerful, modular approach to data preprocessing in robotics:

- **Clear separation of concerns**: Each transformation is a separate, testable unit
- **Proper state management**: Clear distinction between config (JSON) and state (tensors)
- **Device-aware**: Seamless GPU/CPU transfers with `.to(device)`
- **Inter-step communication**: Use `complementary_data` for passing information
- **Easy sharing**: Push to Hugging Face Hub for reproducibility
- **Type safety**: Use `TransitionIndex` instead of magic numbers
- **Debugging tools**: Step through transformations and add monitoring hooks

By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust.

For the full API reference, see the [RobotProcessor API documentation](/api/processor).
\ No newline at end of file