Add RobotProcessor tutorial to documentation

- Introduced a new tutorial on using RobotProcessor for preprocessing robot data.
- Added a section in the table of contents for easy navigation to the new tutorial.
- The tutorial covers key concepts, real-world scenarios, and practical examples for effective use of the RobotProcessor pipeline.
# RobotProcessor: The Art of Processing Robot Data
This tutorial will teach you how to use RobotProcessor, a powerful pipeline system for preprocessing robot observations and postprocessing actions. You'll learn how to create modular, composable data transformations that make your robot learning pipelines more maintainable and reproducible.
**You'll learn:**
1. What RobotProcessor is and why it beats ad-hoc preprocessing code
2. How to create and compose processor steps
3. How to save, load, and share your processors
4. Advanced features for debugging and customization
## Real-World Scenarios: When You Need RobotProcessor
Before diving into code, let's understand the real problems RobotProcessor solves. If you've worked with robots and learned policies, you've likely encountered these challenges:
### Observation Key Mismatches
Your environment might output observations with keys like `"rgb_camera_front"` and `"joint_positions"`, but your pre-trained policy expects `"observation.images.wrist"` and `"observation.state"`. You need to rename these keys consistently across your pipeline.
### Image Preprocessing Requirements
Your robot's camera captures 1920x1080 RGB images, but your policy was trained on 224x224 cropped images focusing on the workspace. You need to:
- Crop out the relevant workspace area
- Resize to the expected dimensions
- Convert from uint8 [0,255] to float32 [0,1]
- Ensure the right channel ordering (HWC vs CHW)
- Move everything to GPU efficiently
### Coordinate System Transformations
Your policy outputs desired end-effector positions in Cartesian space, but your robot expects joint angles. You need to:
- Apply inverse kinematics to convert positions to joint angles
- Handle singularities and out-of-reach positions
- Smooth the resulting trajectories to avoid jerky movements
- Clip values to respect joint limits
### State Augmentation
Your environment provides joint positions, but your policy also needs velocities and accelerations. You need to:
- Calculate velocities from position differences
- Maintain a buffer of past states
- Handle episode boundaries correctly
### Multi-Robot Deployment
You want to deploy the same policy on different robots with varying:
- Camera setups (different resolutions, positions)
- Joint configurations (6-DOF vs 7-DOF arms)
- Control frequencies (10Hz vs 30Hz)
- Safety limits and workspace boundaries
### Data Collection and Training
During data collection, you need to:
- Record raw sensor data for dataset creation
- Apply the same preprocessing as during deployment
- Ensure reproducibility across different operators
- Share preprocessing configurations with collaborators
RobotProcessor solves all these problems by providing a modular, composable system where each transformation is a separate, shareable component. Let's see how.
## Why RobotProcessor?
If you've worked with robot learning before, you've likely written preprocessing code like this:
```python
# Traditional procedural approach - hard to maintain and share
import numpy as np
import torch
import torch.nn.functional as F


def preprocess_observation(obs, device="cuda"):
    processed_obs = {}

    # Process images
    if "pixels" in obs:
        for cam_name, img in obs["pixels"].items():
            # Convert uint8 [0, 255] to float32 [0, 1]
            img = img.astype(np.float32) / 255.0
            # Convert from HWC to CHW format
            img = np.transpose(img, (2, 0, 1))
            # Add batch dimension
            img = np.expand_dims(img, axis=0)
            # Convert to torch and move to device
            img = torch.from_numpy(img).to(device)
            # Pad to standard size
            if img.shape[-2:] != (224, 224):
                img = F.pad(img, (0, 224 - img.shape[-1], 0, 224 - img.shape[-2]))
            processed_obs[f"observation.images.{cam_name}"] = img

    # Process state
    if "agent_pos" in obs:
        state = torch.from_numpy(obs["agent_pos"]).float()
        state = state.unsqueeze(0).to(device)
        processed_obs["observation.state"] = state

    return processed_obs
```
This approach has several problems:
- **Mixed concerns**: Image processing, tensor conversion, device management all in one function
- **Hard to test**: Can't test image processing separately from state processing
- **Device management scattered**: Easy to forget `.to(device)` calls
- **No standardized format**: Each project reinvents the wheel
- **Difficult to share**: Can't easily reuse this preprocessing in other projects
RobotProcessor solves these issues by providing a declarative pipeline approach where each transformation is a separate, testable, shareable component.
## Understanding EnvTransition
Before diving into RobotProcessor, let's understand the data structure it operates on. An `EnvTransition` is a 7-tuple that represents a complete transition in the environment:
```python
from lerobot.processor.pipeline import TransitionIndex
# EnvTransition structure:
# (observation, action, reward, done, truncated, info, complementary_data)
transition = (
    {"pixels": ..., "agent_pos": ...},  # observation at time t
    [0.1, -0.2, 0.3],                   # action taken at time t
    1.0,                                # reward received
    False,                              # episode done flag
    False,                              # episode truncated flag
    {"success": True},                  # additional info from environment
    {"step_idx": 42},                   # complementary_data for inter-step communication
)
```
Each element serves a specific purpose:
1. **observation**: Raw sensor data from the environment (images, states, etc.)
2. **action**: The action command sent to the robot
3. **reward**: Scalar reward signal (for RL tasks)
4. **done**: Boolean indicating natural episode termination
5. **truncated**: Boolean indicating artificial termination (time limit, safety)
6. **info**: Dictionary with environment-specific information
7. **complementary_data**: Dictionary for passing data between processor steps (NEW!)
### Using TransitionIndex
Instead of using magic numbers to access tuple elements, use the `TransitionIndex` enum:
```python
from lerobot.processor.pipeline import TransitionIndex
# Bad - using magic numbers
obs = transition[0]
action = transition[1]
# Good - using TransitionIndex
obs = transition[TransitionIndex.OBSERVATION]
action = transition[TransitionIndex.ACTION]
reward = transition[TransitionIndex.REWARD]
done = transition[TransitionIndex.DONE]
truncated = transition[TransitionIndex.TRUNCATED]
info = transition[TransitionIndex.INFO]
comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA]
```
## Your First RobotProcessor
Let's create a processor that properly handles image and state preprocessing:
```python
from lerobot.processor.pipeline import RobotProcessor, TransitionIndex
from lerobot.processor.observation_processor import ImageProcessor, StateProcessor
import numpy as np
# Create sample data
observation = {
    "pixels": {
        "camera_front": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8),
        "camera_side": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8),
    },
    "agent_pos": np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7], dtype=np.float32),
}

# Create a full transition
transition = (observation, None, 0.0, False, False, {}, {})

# Create and use the processor
processor = RobotProcessor([
    ImageProcessor(),  # Converts uint8 [0, 255] to float32 [0, 1], HWC to CHW, adds batch dim
    StateProcessor(),  # Converts numpy arrays to torch tensors, adds batch dim
])

processed_transition = processor(transition)
processed_obs = processed_transition[TransitionIndex.OBSERVATION]

# Check the results
print("Original keys:", observation.keys())
print("Processed keys:", processed_obs.keys())
print("Image shape:", processed_obs["observation.images.camera_front"].shape)  # [1, 3, 480, 640]
print("Image dtype:", processed_obs["observation.images.camera_front"].dtype)  # torch.float32
print("Image range:", processed_obs["observation.images.camera_front"].min().item(),
      "to", processed_obs["observation.images.camera_front"].max().item())  # 0.0 to 1.0
```
## Creating Custom Steps: The ProcessorStep Protocol
A processor step must follow certain conventions. Let's create a complete example that shows all required and optional methods:
```python
from dataclasses import dataclass
from typing import Any, Dict

import torch
import torch.nn.functional as F

from lerobot.processor.pipeline import EnvTransition, TransitionIndex


@dataclass
class ImagePadder:
    """Pad images to a standard size for batch processing."""

    target_height: int = 224
    target_width: int = 224
    pad_value: float = 0.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Main processing method - required for all steps."""
        obs = transition[TransitionIndex.OBSERVATION]
        if obs is None:
            return transition

        # Process all image observations
        for key in list(obs.keys()):
            if key.startswith("observation.images."):
                img = obs[key]

                # Calculate padding
                _, _, h, w = img.shape
                pad_h = max(0, self.target_height - h)
                pad_w = max(0, self.target_width - w)

                if pad_h > 0 or pad_w > 0:
                    # Pad symmetrically
                    pad_left = pad_w // 2
                    pad_right = pad_w - pad_left
                    pad_top = pad_h // 2
                    pad_bottom = pad_h - pad_top
                    img = F.pad(img, (pad_left, pad_right, pad_top, pad_bottom),
                                mode="constant", value=self.pad_value)

                obs[key] = img

        # Return modified transition
        return (obs, *transition[1:])

    def get_config(self) -> Dict[str, Any]:
        """Return JSON-serializable configuration - required for save/load."""
        return {
            "target_height": self.target_height,
            "target_width": self.target_width,
            "pad_value": self.pad_value,
        }

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """Return tensor state - only include torch.Tensor objects!"""
        # This step has no learnable parameters
        return {}

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        """Load tensor state - required if state_dict returns non-empty dict."""
        # Nothing to load for this step
        pass

    def reset(self) -> None:
        """Reset internal state at episode boundaries - required for stateful steps."""
        # This step is stateless, so nothing to reset
        pass
```
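Before wiring a custom step into a full pipeline, it's worth running it on a dummy transition to check shapes. A minimal smoke test for `ImagePadder` (the observation key and input shape are illustrative):
```python
# Dummy batched CHW image smaller than the target size
img = torch.rand(1, 3, 200, 180)
transition = ({"observation.images.cam": img}, None, 0.0, False, False, {}, {})

padder = ImagePadder(target_height=224, target_width=224)
padded_obs = padder(transition)[TransitionIndex.OBSERVATION]

print(padded_obs["observation.images.cam"].shape)  # torch.Size([1, 3, 224, 224])
```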
### Important: get_config vs state_dict
These two methods serve different purposes and it's crucial to use them correctly:
```python
@dataclass
class AdaptiveNormalizer:
    """Example showing proper use of get_config and state_dict."""

    learning_rate: float = 0.01
    epsilon: float = 1e-8

    def __post_init__(self):
        self.running_mean = None
        self.running_var = None
        self.num_samples = 0  # Python int, not tensor

    def get_config(self) -> Dict[str, Any]:
        """ONLY Python objects that can be JSON serialized!"""
        return {
            "learning_rate": self.learning_rate,  # float ✓
            "epsilon": self.epsilon,              # float ✓
            "num_samples": self.num_samples,      # int ✓
            # "running_mean": self.running_mean,  # torch.Tensor ✗ WRONG!
        }

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """ONLY torch.Tensor objects!"""
        if self.running_mean is None:
            return {}
        return {
            "running_mean": self.running_mean,  # torch.Tensor ✓
            "running_var": self.running_var,    # torch.Tensor ✓
            # "num_samples": self.num_samples,  # int ✗ WRONG!
            # Instead, convert to tensor if needed:
            "num_samples_tensor": torch.tensor(self.num_samples),
        }

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        """Load tensors and convert back to Python types if needed."""
        self.running_mean = state.get("running_mean")
        self.running_var = state.get("running_var")
        if "num_samples_tensor" in state:
            self.num_samples = int(state["num_samples_tensor"].item())
```
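A quick way to see the split: inspect what each method hands to the saver, and how a loaded step is rebuilt. A sketch, assuming (as this tutorial describes) that saving persists `get_config` output as JSON and `state_dict` output as tensors:
```python
step = AdaptiveNormalizer(learning_rate=0.05)

# What saving would persist for this step:
print(step.get_config())  # JSON-safe: {'learning_rate': 0.05, 'epsilon': 1e-08, 'num_samples': 0}
print(step.state_dict())  # tensors only: {} until statistics exist

# What loading does on the other side: rebuild from config, then restore tensors
restored = AdaptiveNormalizer(learning_rate=0.05)
restored.load_state_dict({"num_samples_tensor": torch.tensor(100)})
print(restored.num_samples)  # 100
```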
## Using Complementary Data for Inter-Step Communication
The `complementary_data` field is perfect for passing information between steps without polluting the info dictionary:
```python
from dataclasses import dataclass

import torch

from lerobot.processor.pipeline import EnvTransition, RobotProcessor, TransitionIndex


@dataclass
class ImageStatisticsCalculator:
    """Calculate image statistics and pass to next steps."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {}

        if obs is None:
            return transition

        # Calculate statistics for all images
        image_stats = {}
        for key in obs:
            if key.startswith("observation.images."):
                img = obs[key]
                image_stats[key] = {
                    "mean": img.mean().item(),
                    "std": img.std().item(),
                    "min": img.min().item(),
                    "max": img.max().item(),
                }

        # Store in complementary_data for next steps
        comp_data["image_statistics"] = image_stats

        # Return transition with updated complementary_data
        return (
            obs,
            transition[TransitionIndex.ACTION],
            transition[TransitionIndex.REWARD],
            transition[TransitionIndex.DONE],
            transition[TransitionIndex.TRUNCATED],
            transition[TransitionIndex.INFO],
            comp_data,  # Updated complementary_data
        )


@dataclass
class AdaptiveBrightnessAdjuster:
    """Adjust brightness based on statistics from previous step."""

    target_brightness: float = 0.5

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {}

        if obs is None or "image_statistics" not in comp_data:
            return transition

        # Use statistics from previous step
        image_stats = comp_data["image_statistics"]
        for key in obs:
            if key.startswith("observation.images.") and key in image_stats:
                current_mean = image_stats[key]["mean"]
                brightness_adjust = self.target_brightness - current_mean
                # Adjust brightness
                obs[key] = torch.clamp(obs[key] + brightness_adjust, 0, 1)

        return (obs, *transition[1:])


# Use them together
processor = RobotProcessor([
    ImageProcessor(),
    ImageStatisticsCalculator(),   # Calculates stats
    AdaptiveBrightnessAdjuster(),  # Uses stats from previous step
])
```
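To watch the hand-off happen, run the pipeline on a dummy transition and inspect the complementary data afterwards (a quick sketch; the image content is random):
```python
# Statistics computed by one step are visible to later steps and to the caller.
obs = {"pixels": {"cam": np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)}}
out = processor((obs, None, 0.0, False, False, {}, {}))

stats = out[TransitionIndex.COMPLEMENTARY_DATA]["image_statistics"]
print(stats["observation.images.cam"]["mean"])  # brightness before adjustment
```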
## Complete Example: Device-Aware Processing Pipeline
Here's a complete example showing proper device management and all features:
```python
from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, TransitionIndex
import torch
import torch.nn.functional as F
import numpy as np
@ProcessorStepRegistry.register("device_mover")
@dataclass
class DeviceMover:
"""Move all tensors to specified device."""
device: str = "cuda"
def __call__(self, transition: EnvTransition) -> EnvTransition:
obs = transition[TransitionIndex.OBSERVATION]
if obs is None:
return transition
# Move all tensor observations to device
for key, value in obs.items():
if isinstance(value, torch.Tensor):
obs[key] = value.to(self.device)
# Also handle action if present
action = transition[TransitionIndex.ACTION]
if action is not None and isinstance(action, torch.Tensor):
action = action.to(self.device)
return (obs, action, *transition[2:])
return (obs, *transition[1:])
def get_config(self) -> Dict[str, Any]:
return {"device": str(self.device)}
@ProcessorStepRegistry.register("running_normalizer")
@dataclass
class RunningNormalizer:
"""Normalize using running statistics with proper device handling."""
feature_dim: int
momentum: float = 0.1
def __post_init__(self):
# Initialize as None - will be created on first call with correct device
self.running_mean = None
self.running_var = None
self.initialized = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
obs = transition[TransitionIndex.OBSERVATION]
if obs is None or "observation.state" not in obs:
return transition
state = obs["observation.state"]
# Initialize on first call with correct device
if not self.initialized:
device = state.device
self.running_mean = torch.zeros(self.feature_dim, device=device)
self.running_var = torch.ones(self.feature_dim, device=device)
self.initialized = True
# Update statistics
with torch.no_grad():
batch_mean = state.mean(dim=0)
batch_var = state.var(dim=0, unbiased=False)
self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * batch_mean
self.running_var = (1 - self.momentum) * self.running_var + self.momentum * batch_var
# Normalize
state_normalized = (state - self.running_mean) / (self.running_var + 1e-8).sqrt()
obs["observation.state"] = state_normalized
return (obs, *transition[1:])
def get_config(self) -> Dict[str, Any]:
return {
"feature_dim": self.feature_dim,
"momentum": self.momentum,
"initialized": self.initialized
}
def state_dict(self) -> Dict[str, torch.Tensor]:
if not self.initialized:
return {}
return {
"running_mean": self.running_mean,
"running_var": self.running_var
}
def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
if state:
self.running_mean = state["running_mean"]
self.running_var = state["running_var"]
self.initialized = True
def reset(self) -> None:
# Don't reset statistics - they persist across episodes
pass
# Create a complete pipeline
processor = RobotProcessor([
ImageProcessor(), # Convert images to float32 [0,1]
StateProcessor(), # Convert states to torch tensors
ImagePadder(224, 224), # Pad images to standard size
DeviceMover("cuda"), # Move everything to GPU
RunningNormalizer(7), # Normalize states
], name="CompletePreprocessor")
# The processor handles device transfers automatically
processor = processor.to("cuda") # Moves all stateful components to GPU
# Use it
obs = {
"pixels": {"cam": np.random.randint(0, 255, (200, 300, 3), dtype=np.uint8)},
"agent_pos": np.random.randn(7).astype(np.float32)
}
transition = (obs, None, 0.0, False, False, {}, {})
# Everything is processed and on GPU
processed = processor(transition)
print(processed[TransitionIndex.OBSERVATION]["observation.images.cam"].device) # cuda:0
```
## Solving Real-World Problems with RobotProcessor
Let's see how RobotProcessor elegantly solves the scenarios we discussed earlier:
### Renaming Observation Keys
When your environment and policy speak different "languages":
```python
from dataclasses import dataclass, field
from typing import Dict


@ProcessorStepRegistry.register("key_remapper")
@dataclass
class KeyRemapper:
    """Rename observation keys to match policy expectations."""

    key_mapping: Dict[str, str] = field(default_factory=lambda: {
        "rgb_camera_front": "observation.images.wrist",
        "joint_positions": "observation.state",
        "gripper_state": "observation.gripper",
    })

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        if obs is None:
            return transition

        # Create new observation with renamed keys
        renamed_obs = {}
        for old_key, new_key in self.key_mapping.items():
            if old_key in obs:
                renamed_obs[new_key] = obs[old_key]

        # Keep any unmapped keys as-is
        for key, value in obs.items():
            if key not in self.key_mapping:
                renamed_obs[key] = value

        return (renamed_obs, *transition[1:])
```
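A quick check on a dummy observation shows mapped keys being renamed while unmapped keys pass through (the keys and shapes are illustrative):
```python
import numpy as np

raw_obs = {
    "rgb_camera_front": np.zeros((480, 640, 3), dtype=np.uint8),
    "joint_positions": np.zeros(7, dtype=np.float32),
    "force_sensor": np.zeros(6, dtype=np.float32),  # unmapped key passes through unchanged
}
remapped = KeyRemapper()((raw_obs, None, 0.0, False, False, {}, {}))[TransitionIndex.OBSERVATION]

print(sorted(remapped.keys()))
# ['force_sensor', 'observation.images.wrist', 'observation.state']
```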
### Workspace-Focused Image Processing
When you need to crop and resize images to focus on the manipulation workspace:
```python
from typing import Tuple


@ProcessorStepRegistry.register("workspace_cropper")
@dataclass
class WorkspaceCropper:
    """Crop and resize images to focus on robot workspace."""

    crop_bbox: Tuple[int, int, int, int] = (400, 200, 1200, 800)  # (x1, y1, x2, y2)
    output_size: Tuple[int, int] = (224, 224)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        if obs is None:
            return transition

        for key in list(obs.keys()):
            if key.startswith("observation.images."):
                img = obs[key]

                # Crop to workspace
                x1, y1, x2, y2 = self.crop_bbox
                img_cropped = img[:, :, y1:y2, x1:x2]

                # Resize to expected dimensions
                img_resized = F.interpolate(
                    img_cropped,
                    size=self.output_size,
                    mode="bilinear",
                    align_corners=False,
                )
                obs[key] = img_resized

        return (obs, *transition[1:])
```
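Again, a dummy batched image confirms the crop-then-resize behavior (the 1080p input resolution is illustrative):
```python
img = torch.rand(1, 3, 1080, 1920)  # batched CHW image
transition = ({"observation.images.cam": img}, None, 0.0, False, False, {}, {})

cropper = WorkspaceCropper(crop_bbox=(400, 200, 1200, 800), output_size=(224, 224))
out = cropper(transition)[TransitionIndex.OBSERVATION]
print(out["observation.images.cam"].shape)  # torch.Size([1, 3, 224, 224])
```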
### Building Complete Pipelines for Different Robots
Now you can compose these steps into robot-specific pipelines:
```python
# NOTE: VelocityCalculator and ActionSmoother are custom steps written in the
# same style as the steps defined above (assumed implemented elsewhere).

# Pipeline for Robot A with wrist camera
robot_a_processor = RobotProcessor([
    # Observation preprocessing
    KeyRemapper({
        "wrist_rgb": "observation.images.wrist",
        "arm_joints": "observation.state",
    }),
    ImageProcessor(),
    WorkspaceCropper(crop_bbox=(300, 100, 900, 700), output_size=(224, 224)),
    StateProcessor(),
    VelocityCalculator(state_key="observation.state"),
    DeviceMover("cuda"),
    # Action postprocessing
    ActionSmoother(alpha=0.2),
], name="RobotA_ACT_Processor")

# Pipeline for Robot B with overhead camera
robot_b_processor = RobotProcessor([
    # Different key mapping for Robot B
    KeyRemapper({
        "overhead_cam": "observation.images.wrist",  # Reuse same policy!
        "joint_encoders": "observation.state",
    }),
    ImageProcessor(),
    WorkspaceCropper(crop_bbox=(100, 50, 1100, 950), output_size=(224, 224)),
    StateProcessor(),
    VelocityCalculator(state_key="observation.state"),
    DeviceMover("cuda"),
    ActionSmoother(alpha=0.3),  # Different smoothing
], name="RobotB_ACT_Processor")

# Share processors with the community
robot_a_processor.push_to_hub("my-lab/robot-a-act-processor")
robot_b_processor.push_to_hub("my-lab/robot-b-act-processor")

# Now anyone can use the same preprocessing
processor = RobotProcessor.from_pretrained("my-lab/robot-a-act-processor")
```
The beauty of this approach is that:
1. **Each transformation can be tested independently** - an IK solver can be validated separately from image cropping
2. **Configurations are shareable** - Other labs can use your exact preprocessing setup
3. **Pipelines are composable** - Mix and match steps for different robots
4. **Everything is version controlled** - Use Hub revisions to track changes
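For the last point, you can pin a pipeline to a known-good state when loading (a sketch, assuming `from_pretrained` forwards the standard Hugging Face `revision` argument; the tag name is illustrative):
```python
processor = RobotProcessor.from_pretrained(
    "my-lab/robot-a-act-processor",
    revision="v1.0",  # a git tag, branch, or commit hash on the Hub
)
```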
## Best Practices for Processor Steps
### 1. Always Check for None
```python
def __call__(self, transition: EnvTransition) -> EnvTransition:
    obs = transition[TransitionIndex.OBSERVATION]

    # Always check if observation exists
    if obs is None:
        return transition

    # Also check for specific keys
    if "observation.state" not in obs:
        return transition
```
### 2. Preserve Transition Structure
```python
# Good - preserve all elements
return (modified_obs, *transition[1:])
# Bad - losing information
return (modified_obs, None, 0.0, False, False, {}, {})
```
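If the splat pattern gets repetitive across many steps, a tiny helper keeps step code tidy (a hypothetical convenience, not part of the library):
```python
# Hypothetical helper: swap the observation, preserve everything else.
def with_observation(transition: EnvTransition, obs) -> EnvTransition:
    return (obs, *transition[1:])

# Inside a step:
# return with_observation(transition, modified_obs)
```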
### 3. Clone When Storing State
```python
def __call__(self, transition: EnvTransition) -> EnvTransition:
    obs = transition[TransitionIndex.OBSERVATION]

    if self.store_previous:
        # Good - clone to avoid reference issues
        self.previous_state = obs["observation.state"].clone()

        # Bad - stores reference that might be modified
        # self.previous_state = obs["observation.state"]
```
### 4. Clone Tensors in state_dict
```python
def state_dict(self) -> Dict[str, torch.Tensor]:
    if self.buffer is None:
        return {}
    # Good - clone to avoid memory sharing issues
    return {"buffer": self.buffer.clone()}
```
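Putting these practices together, here is a small stateful step that exposes the previous state to later steps through `complementary_data` (an illustrative sketch following the conventions above; the step name and behavior are not part of the library):
```python
@dataclass
class PreviousStateRecorder:
    """Illustrative stateful step: passes the previous state to later steps."""

    def __post_init__(self):
        self.previous_state = None

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs = transition[TransitionIndex.OBSERVATION]
        if obs is None or "observation.state" not in obs:
            return transition

        comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] or {}
        if self.previous_state is not None:
            comp_data["previous_state"] = self.previous_state

        # Clone before storing to avoid reference issues
        self.previous_state = obs["observation.state"].clone()
        return (*transition[:6], comp_data)

    def state_dict(self) -> Dict[str, torch.Tensor]:
        if self.previous_state is None:
            return {}
        # Clone to avoid memory sharing issues
        return {"previous_state": self.previous_state.clone()}

    def load_state_dict(self, state: Dict[str, torch.Tensor]) -> None:
        self.previous_state = state.get("previous_state")

    def reset(self) -> None:
        # Stateful step: clear the buffer at episode boundaries
        self.previous_state = None
```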
## Complete Policy Example with Pre and Post Processing
Here's how to use RobotProcessor in a real robot control loop:
```python
from dataclasses import dataclass
from typing import Any, Dict

import torch

from lerobot.processor.pipeline import (
    EnvTransition,
    ProcessorStepRegistry,
    RobotProcessor,
    TransitionIndex,
)
from lerobot.policies.act.modeling_act import ACTPolicy

# Create preprocessing pipeline
preprocessor = RobotProcessor([
    ImageProcessor(),
    StateProcessor(),
    ImagePadder(target_height=224, target_width=224),
    DeviceMover("cuda"),
    RunningNormalizer(feature_dim=14),  # For 14-DOF robot
], name="ACTPreprocessor")


# Create postprocessing pipeline for actions
@ProcessorStepRegistry.register("action_clipper")
@dataclass
class ActionClipper:
    """Clip actions to safe ranges."""

    min_value: float = -1.0
    max_value: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition[TransitionIndex.ACTION]
        if action is not None:
            action = torch.clamp(action, self.min_value, self.max_value)
            return (transition[TransitionIndex.OBSERVATION], action, *transition[2:])
        return transition

    def get_config(self) -> Dict[str, Any]:
        return {"min_value": self.min_value, "max_value": self.max_value}


postprocessor = RobotProcessor([
    ActionClipper(min_value=-0.5, max_value=0.5),
    ActionSmoother(alpha=0.3),  # custom smoothing step, as in the pipelines above
], name="ACTPostprocessor")

# Load policy
policy = ACTPolicy.from_pretrained("lerobot/act_aloha_sim_transfer_cube_human")

# Move everything to GPU
preprocessor = preprocessor.to("cuda")
postprocessor = postprocessor.to("cuda")
policy = policy.to("cuda")

# Control loop (make_robot_env is your environment factory)
env = make_robot_env()
obs, info = env.reset()

for episode in range(10):
    print(f"Episode {episode + 1}")

    for step in range(1000):
        # Create transition with raw observation
        transition = (obs, None, 0.0, False, False, info, {"step": step})

        # Preprocess
        processed_transition = preprocessor(transition)
        processed_obs = processed_transition[TransitionIndex.OBSERVATION]

        # Get action from policy
        with torch.no_grad():
            action = policy.select_action(processed_obs)

        # Postprocess action
        action_transition = (
            processed_obs,
            action,
            0.0,
            False,
            False,
            info,
            {"raw_action": action.clone()},  # Store raw action in complementary_data
        )
        processed_action_transition = postprocessor(action_transition)
        final_action = processed_action_transition[TransitionIndex.ACTION]

        # Execute action
        obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy())

        if terminated or truncated:
            # Reset at episode boundary
            preprocessor.reset()
            postprocessor.reset()
            obs, info = env.reset()
            break

    # Save preprocessor with learned statistics
    preprocessor.save_pretrained(f"./checkpoints/preprocessor_ep{episode}")

# Push final version to hub
preprocessor.push_to_hub("my-username/act-preprocessor")
postprocessor.push_to_hub("my-username/act-postprocessor")
```
## Debugging and Monitoring
Use the full power of `RobotProcessor` for debugging:
```python
# Enable detailed logging
def log_observation_shapes(step_idx: int, transition: EnvTransition):
    obs = transition[TransitionIndex.OBSERVATION]
    if obs:
        print(f"Step {step_idx} observations:")
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f"  {key}: shape={value.shape}, device={value.device}, dtype={value.dtype}")
    return None


processor.register_after_step_hook(log_observation_shapes)


# Monitor complementary data flow
def monitor_complementary_data(step_idx: int, transition: EnvTransition):
    comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA]
    if comp_data:
        print(f"Step {step_idx} complementary_data: {list(comp_data.keys())}")
    return None


processor.register_before_step_hook(monitor_complementary_data)


# Validate data integrity
def validate_tensors(step_idx: int, transition: EnvTransition):
    obs = transition[TransitionIndex.OBSERVATION]
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                if torch.isnan(value).any():
                    raise ValueError(f"NaN detected in {key} after step {step_idx}")
                if torch.isinf(value).any():
                    raise ValueError(f"Inf detected in {key} after step {step_idx}")
    return None


processor.register_after_step_hook(validate_tensors)
```
## Summary
RobotProcessor provides a powerful, modular approach to data preprocessing in robotics:
- **Clear separation of concerns**: Each transformation is a separate, testable unit
- **Proper state management**: Clear distinction between config (JSON) and state (tensors)
- **Device-aware**: Seamless GPU/CPU transfers with `.to(device)`
- **Inter-step communication**: Use `complementary_data` for passing information
- **Easy sharing**: Push to Hugging Face Hub for reproducibility
- **Type safety**: Use `TransitionIndex` instead of magic numbers
- **Debugging tools**: Step through transformations and add monitoring hooks
By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust.
For the full API reference, see the [RobotProcessor API documentation](/api/processor).