# Debug Your Processor Pipeline Processor pipelines can be complex, especially when chaining multiple transformation steps. This guide provides comprehensive debugging tools and techniques to help you identify issues, optimize performance, and understand data flow through your pipelines. ## Quick Debugging Checklist When your pipeline isn't working as expected, check these common issues: ### 1. **Data Type Mismatches** ```python # ❌ Problem: PolicyProcessorPipeline gets RobotAction dict policy_processor = PolicyProcessorPipeline[dict, dict](...) robot_action = {"joint_1": 0.5} # dict[str, Any] result = policy_processor(robot_action) # May fail! # ✅ Solution: Use RobotProcessorPipeline for robot data robot_processor = RobotProcessorPipeline[dict, dict](...) result = robot_processor(robot_action) # Works! ``` ### 2. **Device Mismatches (Now More Subtle!)** Modern device handling is sophisticated and can cause subtle issues: ```python # ❌ Problem 1: Multi-GPU preservation vs movement # DeviceProcessorStep(device="cuda:0") with data on cuda:1 device_processor = DeviceProcessorStep(device="cuda:0") data_on_cuda1 = {"obs": torch.randn(10).cuda(1)} # On cuda:1 # SUBTLE: Data stays on cuda:1 (preserved), doesn't move to cuda:0! # This is intentional for Accelerate compatibility but can be confusing result = device_processor(create_transition(observation=data_on_cuda1)) assert result[TransitionKey.OBSERVATION]["obs"].device.index == 1 # Still on cuda:1! # ✅ Solution: Use CPU as intermediate if you need specific GPU steps = [ DeviceProcessorStep(device="cpu"), # Force to CPU first DeviceProcessorStep(device="cuda:0"), # Then to specific GPU ] # ❌ Problem 2: Automatic dtype adaptation in normalization normalizer = NormalizerProcessorStep(stats=stats, dtype=torch.float32) # Configured as float32 input_data = torch.randn(10, dtype=torch.bfloat16) # Input is bfloat16 # SUBTLE: Normalizer automatically adapts to bfloat16! 
# This changes internal state and can affect reproducibility result = normalizer(create_transition(observation={"obs": input_data})) assert normalizer.dtype == torch.bfloat16 # Changed from float32! # ✅ Solution: Explicitly control dtype flow steps = [ DeviceProcessorStep(device="cuda", float_dtype="float32"), # Force consistent dtype NormalizerProcessorStep(stats=stats, dtype=torch.float32), # Matching dtype ] # ❌ Problem 3: Mixed precision with normalization statistics # Statistics on CPU, input on GPU with different dtype normalizer = NormalizerProcessorStep(stats=cpu_stats, device="cpu") gpu_input = torch.randn(10, dtype=torch.float16).cuda() # SUBTLE: Statistics get moved/converted automatically during processing # This can cause memory spikes and unexpected device transfers ``` ### 3. **Missing Statistics** ```python # ❌ Problem: NormalizerProcessorStep has no stats # Solution 1: Load with dataset stats processor = PolicyProcessorPipeline.from_pretrained( "model_path", overrides={"normalizer_processor": {"stats": dataset.meta.stats}} ) # Solution 2: Compute stats from dataset from lerobot.datasets.compute_stats import compute_stats stats = compute_stats(dataset) # Solution 3: Hot-swap statistics at runtime (very useful!) 
from lerobot.processor import hotswap_stats new_processor = hotswap_stats(existing_processor, new_dataset.meta.stats) # Hot-swap is powerful for: # - Adapting trained models to new datasets # - A/B testing different normalization statistics # - Domain adaptation without retraining ``` ## Step-by-Step Pipeline Inspection Use `step_through()` to see exactly what happens at each transformation stage: ```python # Inspect data at each transformation stage for i, intermediate in enumerate(processor.step_through(data)): print(f"\n=== After step {i}: {processor.steps[i].__class__.__name__} ===") # Check observation shapes and statistics obs = intermediate.get(TransitionKey.OBSERVATION) if obs: for key, value in obs.items(): if isinstance(value, torch.Tensor): print(f"{key}: shape={value.shape}, " f"dtype={value.dtype}, " f"device={value.device}") if value.numel() > 0: # Avoid empty tensors print(f" range=[{value.min():.3f}, {value.max():.3f}], " f"mean={value.mean():.3f}, std={value.std():.3f}") # Check action if present action = intermediate.get(TransitionKey.ACTION) if action is not None: if isinstance(action, torch.Tensor): print(f"action: shape={action.shape}, dtype={action.dtype}, device={action.device}") if action.numel() > 0: print(f" range=[{action.min():.3f}, {action.max():.3f}]") elif isinstance(action, dict): print(f"action: dict with {len(action)} keys: {list(action.keys())}") # Check complementary data comp = intermediate.get(TransitionKey.COMPLEMENTARY_DATA) if comp: print(f"complementary_data: {list(comp.keys())}") ``` ## Runtime Monitoring with Hooks Add monitoring hooks without modifying your pipeline code: ```python # Define monitoring hooks def log_shapes(step_idx: int, transition: EnvTransition): """Log tensor shapes after each step.""" obs = transition.get(TransitionKey.OBSERVATION) if obs: print(f"Step {step_idx} shapes:") for key, value in obs.items(): if isinstance(value, torch.Tensor): print(f" {key}: {value.shape}") def check_nans(step_idx: int, 
transition: EnvTransition): """Check for NaN values.""" obs = transition.get(TransitionKey.OBSERVATION) if obs: for key, value in obs.items(): if isinstance(value, torch.Tensor) and torch.isnan(value).any(): print(f"Warning: NaN detected in {key} at step {step_idx}") def measure_performance(step_idx: int, transition: EnvTransition): """Measure processing time per step.""" import time start_time = getattr(measure_performance, 'start_time', time.time()) if step_idx == 0: measure_performance.start_time = time.time() else: elapsed = time.time() - start_time print(f"Step {step_idx} took {elapsed*1000:.2f}ms") measure_performance.start_time = time.time() # Register hooks processor.register_after_step_hook(log_shapes) processor.register_after_step_hook(check_nans) processor.register_after_step_hook(measure_performance) # Process data - hooks will be called after each step output = processor(input_data) # Remove hooks when done debugging processor.unregister_after_step_hook(log_shapes) processor.unregister_after_step_hook(check_nans) processor.unregister_after_step_hook(measure_performance) ``` ## Pipeline Testing and Validation ### Test Individual Steps ```python # Test each step independently test_transition = create_transition( observation={"observation.state": torch.randn(7)}, action=torch.randn(4) ) for i, step in enumerate(processor.steps): try: result = step(test_transition) print(f"✅ Step {i} ({step.__class__.__name__}) passed") test_transition = result # Use output for next step except Exception as e: print(f"❌ Step {i} ({step.__class__.__name__}) failed: {e}") break ``` ### Pipeline Slicing for Debugging ```python # Test subsets of your pipeline first_three_steps = processor[:3] # Returns new DataProcessorPipeline middle_step = processor[2] # Returns single ProcessorStep # Test partial pipeline partial_output = first_three_steps(input_data) print(f"After first 3 steps: {partial_output}") # Test remaining steps remaining_steps = processor[3:] final_output = 
remaining_steps(partial_output) ``` ### Create Test Variations ```python # Create variations for A/B testing variant_processor = RobotProcessorPipeline[dict, dict]( steps=processor.steps[:-1] + [alternative_final_step], name="variant_pipeline" ) # Compare outputs original_output = processor(test_data) variant_output = variant_processor(test_data) ``` ## Performance Profiling ### Memory Usage Monitoring ```python import torch def memory_hook(step_idx: int, transition: EnvTransition): """Monitor GPU memory usage.""" if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated() / 1024**3 # GB cached = torch.cuda.memory_reserved() / 1024**3 # GB print(f"Step {step_idx}: {allocated:.2f}GB allocated, {cached:.2f}GB cached") processor.register_after_step_hook(memory_hook) ``` ### Processing Time Analysis ```python import time from collections import defaultdict class PerformanceProfiler: def __init__(self): self.step_times = defaultdict(list) self.start_time = None def __call__(self, step_idx: int, transition: EnvTransition): current_time = time.perf_counter() if self.start_time is not None: step_name = processor.steps[step_idx].__class__.__name__ elapsed = current_time - self.start_time self.step_times[step_name].append(elapsed) self.start_time = current_time def report(self): print("\n=== Performance Report ===") for step_name, times in self.step_times.items(): avg_time = sum(times) / len(times) * 1000 # ms print(f"{step_name}: {avg_time:.2f}ms avg ({len(times)} calls)") profiler = PerformanceProfiler() processor.register_after_step_hook(profiler) # Run your pipeline for _ in range(100): output = processor(test_data) # Get performance report profiler.report() ``` ## Common Issues and Solutions ### Issue: "Action should be a PolicyAction type" ```python # Problem: Passing dict to PolicyProcessorPipeline action_dict = {"joint_1": 0.5} # RobotAction policy_processor(transition_with_dict_action) # Fails! 
# Solution 1: Use RobotProcessorPipeline instead robot_processor = RobotProcessorPipeline[dict, dict](...) # Solution 2: Convert to tensor first action_tensor = torch.tensor([0.5]) # PolicyAction transition = create_transition(action=action_tensor) ``` ### Issue: "Missing required keys in transition" ```python # Problem: Incomplete transition incomplete = {TransitionKey.OBSERVATION: {...}} # Missing ACTION # Solution: Use create_transition with defaults complete = create_transition( observation={...}, action=None, # Explicit None is fine reward=0.0, # Default values done=False ) ``` ### Issue: Normalization Statistics Not Found ```python # Problem: Processor can't find normalization stats normalizer = NormalizerProcessorStep(features=..., stats=None) # No stats! # Solution 1: Compute stats from dataset from lerobot.datasets.compute_stats import compute_stats stats = compute_stats(dataset) normalizer = NormalizerProcessorStep(features=..., stats=stats) # Solution 2: Load with overrides processor = PolicyProcessorPipeline.from_pretrained( "model_path", overrides={"normalizer_processor": {"stats": dataset.meta.stats}} ) # Solution 3: Hot-swap statistics (powerful for domain adaptation!) from lerobot.processor import hotswap_stats # Load model trained on dataset A trained_processor = PolicyProcessorPipeline.from_pretrained("model_trained_on_dataset_A") # Adapt to dataset B without retraining adapted_processor = hotswap_stats(trained_processor, dataset_B.meta.stats) # Now works with dataset B's data distribution! 
``` ### Issue: GPU Out of Memory ```python # Problem: Large batches on GPU # Solution: Use float16 and optimize order steps = [ DeviceProcessorStep(device="cuda", float_dtype="float16"), # Use half precision NormalizerProcessorStep(...), # Normalize in half precision ] ``` ## Debugging Complex Pipelines ### Phone Teleoperation Example ```python # Debug a complex phone → robot pipeline phone_pipeline = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[ MapPhoneActionToRobotAction(platform=PhoneOS.IOS), AddRobotObservationAsComplementaryData(robot=robot), EEReferenceAndDelta(kinematics=solver, ...), EEBoundsAndSafety(...), InverseKinematicsEEToJoints(...), GripperVelocityToJoint(...), ] ) # Test with mock phone input mock_phone_action = { "phone.pos": [0.1, 0.0, 0.0], "phone.rot": Rotation.identity(), "phone.enabled": True, "phone.raw_inputs": {"a3": 0.5} # iOS button } # Step through to see transformations for i, result in enumerate(phone_pipeline.step_through(mock_phone_action)): action = result.get(TransitionKey.ACTION, {}) print(f"\nStep {i} output keys: {list(action.keys())}") # Check specific transformations if i == 0: # After MapPhoneActionToRobotAction assert "target_x" in action, "Phone mapping failed" elif i == 2: # After EEReferenceAndDelta assert "ee.x" in action, "EE reference calculation failed" elif i == 4: # After InverseKinematicsEEToJoints assert "shoulder_pan.pos" in action, "IK failed" ``` ## Registry Debugging ```python # List all available processors from lerobot.processor import ProcessorStepRegistry print("Available processors:") for name in sorted(ProcessorStepRegistry.list()): cls = ProcessorStepRegistry.get(name) print(f" {name}: {cls.__name__}") # Check if a processor is registered if "my_custom_processor" in ProcessorStepRegistry.list(): print("✅ Custom processor is registered") else: print("❌ Custom processor not found - check @ProcessorStepRegistry.register()") ``` ## Best Practices for Debugging ### 1. 
**Start Simple** ```python # Test with minimal data first minimal_transition = create_transition( observation={"observation.state": torch.randn(1, 7)}, action=torch.randn(1, 4) ) ``` ### 2. **Test Each Step Individually** ```python # Don't test the whole pipeline at once for step in processor.steps: try: output = step(test_transition) print(f"✅ {step.__class__.__name__} works") except Exception as e: print(f"❌ {step.__class__.__name__} failed: {e}") break ``` ### 3. **Use Hooks for Continuous Monitoring** ```python # Add permanent monitoring for production def production_monitor(step_idx: int, transition: EnvTransition): """Log critical issues only.""" obs = transition.get(TransitionKey.OBSERVATION) if obs: for key, value in obs.items(): if isinstance(value, torch.Tensor): if torch.isnan(value).any(): print(f"🚨 NaN detected in {key} at step {step_idx}") if torch.isinf(value).any(): print(f"🚨 Inf detected in {key} at step {step_idx}") processor.register_after_step_hook(production_monitor) ``` ### 4. **Validate Feature Contracts** ```python # Check that your pipeline produces expected features initial_features = {...} # Your input features output_features = processor.transform_features(initial_features) print("Input features:", list(initial_features.keys())) print("Output features:", list(output_features.keys())) # Verify expected features exist expected_keys = ["observation.state", "action"] for key in expected_keys: if key not in output_features: print(f"❌ Missing expected feature: {key}") ``` ## Troubleshooting Specific Processors ### Normalization Issues ```python # Debug normalization problems normalizer = NormalizerProcessorStep(...) 
# Check statistics print("Available stats:", list(normalizer._tensor_stats.keys())) for key, stats in normalizer._tensor_stats.items(): print(f"{key}: {list(stats.keys())}") for stat_name, tensor in stats.items(): print(f" {stat_name}: {tensor}") # Test normalization manually test_value = torch.tensor([1.0, 2.0, 3.0]) normalized = normalizer._apply_transform(test_value, "test_key", FeatureType.STATE) print(f"Original: {test_value}") print(f"Normalized: {normalized}") ``` ### Tokenization Issues ```python # Debug tokenizer problems tokenizer_step = TokenizerProcessorStep(...) # Test tokenization manually test_transition = create_transition( complementary_data={"task": "pick up the red cube"} ) tokenizer_step.transition = test_transition # Set current transition task = tokenizer_step.get_task(test_transition) print(f"Extracted task: {task}") if task: tokens = tokenizer_step._tokenize_text(task) print(f"Tokens: {tokens}") ``` ### Device Transfer Issues (Advanced Debugging) Modern device handling has subtle behaviors that can cause issues: ```python # Debug device processor with multi-GPU awareness device_step = DeviceProcessorStep(device="cuda:0", float_dtype="float16") print(f"Target device: {device_step.tensor_device}") print(f"Non-blocking: {device_step.non_blocking}") print(f"Target dtype: {device_step._target_float_dtype}") # Test 1: GPU-to-GPU preservation (Accelerate compatibility) tensor_on_cuda1 = torch.randn(10).cuda(1) processed = device_step._process_tensor(tensor_on_cuda1) print(f"cuda:1 → cuda:0 config: stays on cuda:{processed.device.index}") # Stays on cuda:1! 
# Test 2: CPU-to-GPU movement cpu_tensor = torch.randn(10) processed = device_step._process_tensor(cpu_tensor) print(f"CPU → cuda:0 config: moves to cuda:{processed.device.index}") # Moves to cuda:0 # Test 3: Automatic dtype adaptation in normalization normalizer = NormalizerProcessorStep(stats=stats, dtype=torch.float32) bfloat16_input = torch.randn(10, dtype=torch.bfloat16) # Before processing print(f"Normalizer dtype before: {normalizer.dtype}") print(f"Stats dtype before: {list(normalizer._tensor_stats.values())[0]['mean'].dtype}") # Process data transition = create_transition(observation={"obs": bfloat16_input}) result = normalizer(transition) # After processing - automatic adaptation! print(f"Normalizer dtype after: {normalizer.dtype}") # Changed to bfloat16! print(f"Stats dtype after: {list(normalizer._tensor_stats.values())[0]['mean'].dtype}") # bfloat16! print(f"Output dtype: {result[TransitionKey.OBSERVATION]['obs'].dtype}") # bfloat16 ``` ### Multi-GPU Debugging Patterns ```python # Test multi-GPU behavior def debug_multi_gpu_behavior(): if torch.cuda.device_count() < 2: print("Need 2+ GPUs for this test") return processor = DeviceProcessorStep(device="cuda:0") # Test data on different GPUs test_cases = [ ("CPU", torch.randn(5)), ("cuda:0", torch.randn(5).cuda(0)), ("cuda:1", torch.randn(5).cuda(1)), ] for name, tensor in test_cases: transition = create_transition(observation={"test": tensor}) result = processor(transition) output_device = result[TransitionKey.OBSERVATION]["test"].device print(f"{name} input → {output_device} output") # Expected: # CPU input → cuda:0 output (moved) # cuda:0 input → cuda:0 output (preserved) # cuda:1 input → cuda:1 output (preserved, not moved!) 
debug_multi_gpu_behavior() ``` ## Pipeline Optimization ### Performance Bottleneck Detection ```python import time from collections import defaultdict class DetailedProfiler: def __init__(self): self.step_times = defaultdict(list) self.memory_usage = defaultdict(list) self.step_start = None def before_step(self, step_idx: int, transition: EnvTransition): self.step_start = time.perf_counter() if torch.cuda.is_available(): torch.cuda.synchronize() # Ensure accurate timing def after_step(self, step_idx: int, transition: EnvTransition): if self.step_start is not None: elapsed = time.perf_counter() - self.step_start step_name = processor.steps[step_idx].__class__.__name__ self.step_times[step_name].append(elapsed * 1000) # ms if torch.cuda.is_available(): memory_mb = torch.cuda.memory_allocated() / 1024**2 self.memory_usage[step_name].append(memory_mb) def report(self): print("\n=== Detailed Performance Report ===") total_time = 0 for step_name, times in self.step_times.items(): avg_time = sum(times) / len(times) max_time = max(times) min_time = min(times) total_time += avg_time print(f"{step_name}:") print(f" Time: {avg_time:.2f}ms avg, {min_time:.2f}-{max_time:.2f}ms range") if step_name in self.memory_usage: memory_vals = self.memory_usage[step_name] avg_memory = sum(memory_vals) / len(memory_vals) print(f" Memory: {avg_memory:.1f}MB avg") print(f"\nTotal pipeline time: {total_time:.2f}ms") profiler = DetailedProfiler() processor.register_before_step_hook(profiler.before_step) processor.register_after_step_hook(profiler.after_step) ``` ### Memory Optimization ```python # Optimize memory usage def optimize_for_memory(): return PolicyProcessorPipeline[dict, dict]( steps=[ # Use float16 early to save memory DeviceProcessorStep(device="cuda", float_dtype="float16"), # Normalize on GPU in half precision NormalizerProcessorStep(...), # Process in chunks if needed # ChunkProcessorStep(chunk_size=32), # Custom step ] ) ``` ## Integration Testing ### End-to-End Robot Pipeline 
Test ```python # Test complete robot control pipeline def test_robot_pipeline(): # Mock robot observation robot_obs = { "shoulder_pan.pos": 0.0, "shoulder_lift.pos": -90.0, "elbow_flex.pos": 90.0, "wrist_flex.pos": 0.0, "wrist_roll.pos": 0.0, "gripper.pos": 50.0, "camera_front": np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) } # Test observation processing obs_processor = RobotProcessorPipeline[dict, EnvTransition]( steps=[VanillaObservationProcessorStep()], to_transition=observation_to_transition, to_output=identity_transition ) processed_obs = obs_processor(robot_obs) print("✅ Observation processing works") # Test action processing mock_ee_action = { "ee.x": 0.5, "ee.y": 0.0, "ee.z": 0.3, "ee.wx": 0.0, "ee.wy": 0.0, "ee.wz": 0.0, "gripper": 0.1 } action_processor = RobotProcessorPipeline[dict, dict]( steps=[ # Your action processing steps ], to_transition=robot_action_to_transition, to_output=transition_to_robot_action ) joint_action = action_processor(mock_ee_action) print("✅ Action processing works") print(f"Joint commands: {list(joint_action.keys())}") test_robot_pipeline() ``` ## Error Recovery and Fallbacks ```python # Robust pipeline with fallbacks class RobustProcessor: def __init__(self, primary_processor, fallback_processor=None): self.primary = primary_processor self.fallback = fallback_processor or IdentityProcessorStep() def __call__(self, transition): try: return self.primary(transition) except Exception as e: print(f"⚠️ Primary processor failed: {e}") print("🔄 Using fallback processor") return self.fallback(transition) # Usage robust_pipeline = RobustProcessor( primary_processor=complex_pipeline, fallback_processor=simple_pipeline ) ``` ## Summary Effective pipeline debugging involves: 1. **Step-by-step inspection** to understand data flow 2. **Runtime hooks** for continuous monitoring 3. **Individual step testing** to isolate issues 4. **Performance profiling** to identify bottlenecks 5. 
**Type validation** to catch data structure mismatches 6. **Fallback strategies** for robust deployment Remember: Start simple, test incrementally, and use the rich debugging tools LeRobot provides to build reliable, high-performance processor pipelines!