docs(debug): enhance debugging guide for processor pipelines

- Streamlined the introduction to clarify the challenges of debugging complex processor pipelines.
- Expanded the section on hooks, detailing their purpose and implementation for runtime monitoring.
- Introduced step-by-step debugging techniques, emphasizing the use of the `step_through()` method for inspecting intermediate states.
- Added examples of feature validation to ensure data structure contracts are met.
- Consolidated best practices for debugging, highlighting the synergy between hooks, step-through debugging, and feature validation.
Author: AdilZouitine
Date: 2025-09-16 10:26:05 +02:00
Parent: cee5a3fec5
Commit: b12a386334
Diff: +226 -642
# Debug Your Processor Pipeline
Processor pipelines can be complex, especially when chaining multiple transformation steps.
Unlike simple function calls, pipelines lack natural observability: you can't easily see what happens
between each step or where things go wrong.
This guide provides debugging tools and techniques specifically designed to address these challenges
and help you understand data flow through your pipelines.

We'll explore three complementary debugging approaches: **hooks** for runtime monitoring, **step-through debugging** for detailed inspection, and **feature validation** for catching structural mismatches. Each serves a different purpose, and together they provide complete visibility into your pipeline's behavior.
## Understanding Hooks
Hooks are functions that get called at specific points during pipeline execution.
They provide a way to inspect, monitor, or modify data without changing your pipeline code.
Think of them as "event listeners" for your pipeline.
### What is a Hook?
A hook is a callback function that gets automatically invoked at specific moments during pipeline execution.
The concept comes from event-driven programming: imagine being able to "hook into" the pipeline's execution flow to observe or react to what's happening.
Think of hooks as checkpoints inserted into your pipeline. Every time the pipeline reaches one of these checkpoints, it pauses briefly to call your hook function, giving you a chance to inspect the current state, log information, and validate data.
A hook is simply a function that accepts two parameters:
- `step_idx: int` - The index of the current processing step (0, 1, 2, etc.)
- `transition: EnvTransition` - The data transition at that point in the pipeline
The beauty of hooks is their non-invasive nature: you can add monitoring, validation, or debugging logic without changing a single line of your pipeline code. The pipeline remains clean and focused on its core logic, while hooks handle the cross-cutting concerns like logging, monitoring, and debugging.
### Before vs After Hooks
The pipeline supports two types of hooks:
- **Before hooks** (`register_before_step_hook`) - Called before each step executes
- **After hooks** (`register_after_step_hook`) - Called after each step completes
```python
def before_hook(step_idx: int, transition: EnvTransition):
    """Called before step processes the transition."""
    print(f"About to execute step {step_idx}")
    # Useful for: logging, validation, setup

def after_hook(step_idx: int, transition: EnvTransition):
    """Called after step has processed the transition."""
    print(f"Completed step {step_idx}")
    # Useful for: monitoring results, cleanup, debugging

processor.register_before_step_hook(before_hook)
processor.register_after_step_hook(after_hook)
```
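A natural use of pairing the two hook types is per-step timing. A minimal sketch — the hook names and the `step_start` dict are invented for illustration; only the registration methods above come from the pipeline API:

```python
import time

step_start = {}

def start_timer(step_idx, transition):
    """Before-hook: record when the step began."""
    step_start[step_idx] = time.perf_counter()

def report_time(step_idx, transition):
    """After-hook: print how long the step took."""
    elapsed_ms = (time.perf_counter() - step_start[step_idx]) * 1000
    print(f"step {step_idx}: {elapsed_ms:.2f} ms")

# With a real pipeline you would register the pair:
#   processor.register_before_step_hook(start_timer)
#   processor.register_after_step_hook(report_time)
# Simulated directly here for illustration:
start_timer(0, None)
report_time(0, None)
```

Because a before-hook and an after-hook sandwich each step, shared state between them (here, `step_start`) is enough to measure anything per step.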
### Implementing a NaN Detection Hook

Here's a practical example of a hook that detects NaN values:
```python
def check_nans(step_idx: int, transition: EnvTransition):
    """Check for NaN values in observations."""
    obs = transition.get(TransitionKey.OBSERVATION)
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor) and torch.isnan(value).any():
                print(f"NaN detected in {key} at step {step_idx}")

# Register the hook to run after each step
processor.register_after_step_hook(check_nans)

# Process your data - the hook will be called automatically
output = processor(input_data)

# Remove the hook when done debugging
processor.unregister_after_step_hook(check_nans)
```
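Under a test harness you may prefer the hook to fail fast rather than log. A hedged variant of the same idea, written with plain Python floats so the sketch stands alone (the real hook would check torch tensors inside the transition, as above):

```python
import math

# Fail-fast variant: raise instead of printing. The function name and
# the plain-dict observation format are invented for this sketch.
def assert_no_nans(step_idx, observation):
    for key, values in observation.items():
        if any(math.isnan(v) for v in values):
            raise ValueError(f"NaN in {key} at step {step_idx}")

assert_no_nans(0, {"obs": [0.1, 0.2]})  # passes silently
try:
    assert_no_nans(1, {"obs": [float("nan")]})
except ValueError as e:
    print(e)  # NaN in obs at step 1
```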
### How Hooks Work Internally

Understanding the internal mechanism helps you use hooks more effectively. The pipeline maintains two separate lists: one for before-step hooks and another for after-step hooks. When you register a hook, it's simply appended to the appropriate list.

During execution, the pipeline follows a strict sequence: for each processing step, it first calls all before-hooks in registration order, then executes the actual step transformation, and finally calls all after-hooks in registration order. This creates a predictable, sandwich-like structure around each step.

The key insight is that hooks don't change the core pipeline logic: they're purely additive. The pipeline's `_forward` method orchestrates this dance between hooks and processing steps, ensuring that your debugging or monitoring code runs at exactly the right moments without interfering with the main data flow.

Here's a simplified view of how the pipeline executes hooks:
```python
class DataProcessorPipeline:
    def __init__(self):
        self.steps = [...]
        self.before_step_hooks = []  # List of before hooks
        self.after_step_hooks = []   # List of after hooks

    def _forward(self, transition):
        """Internal method that processes the transition through all steps."""
        for step_idx, processor_step in enumerate(self.steps):
            # 1. Call all BEFORE hooks
            for hook in self.before_step_hooks:
                hook(step_idx, transition)

            # 2. Execute the actual processing step
            transition = processor_step(transition)

            # 3. Call all AFTER hooks
            for hook in self.after_step_hooks:
                hook(step_idx, transition)

        return transition

    def register_before_step_hook(self, hook_fn):
        self.before_step_hooks.append(hook_fn)

    def register_after_step_hook(self, hook_fn):
        self.after_step_hooks.append(hook_fn)
```
### Execution Flow

The execution flow looks like this:

```
Input → Before Hook → Step 0 → After Hook → Before Hook → Step 1 → After Hook → ... → Output
```

For example, with 3 steps and both hook types:

```python
def timing_before(step_idx, transition):
    print(f"⏱️ Starting step {step_idx}")

def validation_after(step_idx, transition):
    print(f"✅ Completed step {step_idx}")

processor.register_before_step_hook(timing_before)
processor.register_after_step_hook(validation_after)

# This will output:
# ⏱️ Starting step 0
# ✅ Completed step 0
# ⏱️ Starting step 1
# ✅ Completed step 1
# ⏱️ Starting step 2
# ✅ Completed step 2
```
### Multiple Hooks
You can register multiple hooks of the same type; they execute in the order registered:
```python
def log_shapes(step_idx: int, transition: EnvTransition):
obs = transition.get(TransitionKey.OBSERVATION)
if obs:
print(f"Step {step_idx} observation shapes:")
for key, value in obs.items():
if isinstance(value, torch.Tensor):
print(f" {key}: {value.shape}")
processor.register_after_step_hook(check_nans) # Executes first
processor.register_after_step_hook(log_shapes) # Executes second
# Both hooks will be called after each step in registration order
output = processor(input_data)
```
While hooks are excellent for monitoring specific issues (like NaN detection) or gathering metrics during normal pipeline execution, sometimes you need to dive deeper. When you want to understand exactly what happens at each step or debug complex transformation logic, step-through debugging provides the detailed inspection you need.
## Step-Through Debugging
Step-through debugging is like having a slow-motion replay for your pipeline. Instead of watching your data get transformed in one quick blur from input to output, you can pause and examine what happens after each individual step.
This approach is particularly valuable when you're trying to understand a complex pipeline, debug unexpected behavior, or verify that each transformation is working as expected. Unlike hooks, which are great for automated monitoring, step-through debugging gives you manual, interactive control over the inspection process.
The `step_through()` method is a generator that yields the transition state after each processing step, allowing you to inspect intermediate results. Think of it as creating a series of snapshots of your data as it flows through the pipeline—each snapshot shows you exactly what your data looks like after one more transformation has been applied.
### How Step-Through Works
The `step_through()` method fundamentally changes how the pipeline executes. Instead of running all steps in sequence and only returning the final result, it transforms the pipeline into an iterator that yields intermediate results.
Here's what happens internally: the method starts by converting your input data into the pipeline's internal transition format, then yields this initial state. Next, it applies the first processing step and yields the result. Then it applies the second step to that result and yields again, and so on. Each `yield` gives you a complete snapshot of the transition at that point.
This generator pattern is powerful because it's lazy—the pipeline only computes the next step when you ask for it. This means you can stop at any point, inspect the current state thoroughly, and decide whether to continue. You're not forced to run the entire pipeline just to debug one problematic step.
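To see the laziness concretely, here is a toy generator in the same shape. The step names and arithmetic are invented for illustration; the real `step_through()` yields `EnvTransition` snapshots, not numbers:

```python
# Toy stand-in for a lazy step-through generator.
def step_through(data, steps):
    for name, step in steps:
        print(f"computing {name}")
        data = step(data)
        yield data

steps = [("normalize", lambda x: x / 10), ("scale", lambda x: x * 3), ("shift", lambda x: x + 1)]
gen = step_through(20, steps)

first = next(gen)  # only "normalize" runs; prints "computing normalize"
print(first)       # 2.0
# "scale" and "shift" never execute unless we keep calling next(gen).
```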
Instead of running the entire pipeline and only seeing the final result, `step_through()` pauses after each step and gives you the intermediate transition:
```python
# This creates a generator that yields intermediate states
for i, intermediate_result in enumerate(processor.step_through(input_data)):
    print(f"=== After step {i} ===")

    # Inspect the observation at this stage
    obs = intermediate_result.get(TransitionKey.OBSERVATION)
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f"{key}: shape={value.shape}, dtype={value.dtype}")
```
### Interactive Debugging with Breakpoints

You can add breakpoints in the step-through loop to debug interactively:

```python
# Step through the pipeline with debugging
for i, intermediate in enumerate(processor.step_through(data)):
print(f"Step {i}: {processor.steps[i].__class__.__name__}")
# Set a breakpoint to inspect the current state
breakpoint() # Debugger will pause here
# You can now inspect 'intermediate' in the debugger:
# - Check tensor shapes and values
# - Verify expected transformations
# - Look for unexpected changes
```
During the debugger session, you can:
- Examine `intermediate[TransitionKey.OBSERVATION]` to see observation data
- Check `intermediate[TransitionKey.ACTION]` for action transformations
- Inspect any part of the transition to understand what each step does
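If dropping into a debugger isn't practical (for example in a headless or CI run), the same loop can instead collect snapshots for offline comparison. A self-contained sketch with invented toy steps standing in for real processor steps; the pattern is what matters:

```python
# Collect every intermediate state instead of pausing interactively.
def step_through(data, steps):
    yield data  # initial state
    for step in steps:
        data = step(data)
        yield data

steps = [lambda xs: [x * 2 for x in xs], lambda xs: [x - 1 for x in xs]]
snapshots = list(step_through([1, 2, 3], steps))

# Diff consecutive snapshots to see what each step changed.
for i in range(1, len(snapshots)):
    print(f"step {i - 1}: {snapshots[i - 1]} -> {snapshots[i]}")
```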
Step-through debugging is perfect for understanding the _data_ transformations, but what about the _structure_ of that data? While hooks and step-through help you debug runtime behavior, you also need to ensure your pipeline produces data in the format expected by downstream components. This is where feature contract validation comes in.
## Validating Feature Contracts
Feature contracts define what data structure your pipeline expects as input and produces as output.
Validating these contracts helps catch mismatches early.
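As a minimal, framework-independent illustration of the idea, a contract check can be a plain function that compares expected keys and shapes against what a pipeline reports (the function name and dict format here are invented, not the LeRobot API):

```python
# Expected and actual contracts map feature keys to shapes.
def validate_contract(expected: dict, actual: dict) -> list[str]:
    errors = []
    for key, shape in expected.items():
        if key not in actual:
            errors.append(f"missing feature: {key}")
        elif actual[key] != shape:
            errors.append(f"shape mismatch for {key}: {actual[key]} != {shape}")
    return errors

expected = {"observation.state": (7,), "action": (4,)}
actual = {"observation.state": (7,), "action": (6,)}
print(validate_contract(expected, actual))
```

Running the check before deployment turns a silent downstream failure into an explicit, named error.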
### Understanding Feature Contracts
Each processor step has a `transform_features()` method that describes how it changes the data structure:
```python
# Get the expected output features from your pipeline
initial_features = {
    PipelineFeatureType.OBSERVATION: {
        "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
        "observation.image": PolicyFeature(type=FeatureType.IMAGE, shape=(3, 224, 224))
    },
    PipelineFeatureType.ACTION: {
        "action": PolicyFeature(type=FeatureType.ACTION, shape=(4,))
    }
}

# Check what your pipeline will output
output_features = processor.transform_features(initial_features)

print("Input features:")
for feature_type, features in initial_features.items():
    print(f"  {feature_type}:")
    for key, feature in features.items():
        print(f"    {key}: {feature.type.value}, shape={feature.shape}")

print("\nOutput features:")
for feature_type, features in output_features.items():
    print(f"  {feature_type}:")
    for key, feature in features.items():
        print(f"    {key}: {feature.type.value}, shape={feature.shape}")
```
### Verifying Expected Features

Check that your pipeline produces the features you expect:

```python
# Define what features you expect the pipeline to produce
expected_keys = ["observation.state", "observation.image", "action"]

print("Validating feature contract...")
for expected_key in expected_keys:
    found = False
    for feature_type, features in output_features.items():
        if expected_key in features:
            feature = features[expected_key]
            print(f"✅ {expected_key}: {feature.type.value}, shape={feature.shape}")
            found = True
            break

    if not found:
        print(f"❌ Missing expected feature: {expected_key}")
```
This validation helps ensure your pipeline will work correctly with downstream components that expect specific data structures.
## Summary
Now that you understand the three debugging approaches, you can tackle any pipeline issue systematically:
1. **Hooks** - For runtime monitoring and validation without modifying pipeline code
2. **Step-through** - For inspecting intermediate states and understanding transformations
3. **Feature validation** - For ensuring data structure contracts are met
**When to use each approach:**
- Start with **step-through debugging** when you need to understand what your pipeline does or when something unexpected happens
- Add **hooks** for continuous monitoring during development and production to catch issues automatically
- Use **feature validation** before deployment to ensure your pipeline works with downstream components
These three tools work together to give you the complete observability that complex pipelines naturally lack. With hooks watching for issues, step-through helping you understand behavior, and feature validation ensuring compatibility, you'll be able to debug any pipeline confidently and efficiently.