From e8d79b5191892d387973cc6c4e7adc852c918983 Mon Sep 17 00:00:00 2001 From: AdilZouitine Date: Mon, 15 Sep 2025 14:01:04 +0200 Subject: [PATCH] refactor(docs): streamline monitoring hooks and enhance performance reporting - Removed the log_shapes and measure_performance hooks, simplifying the monitoring process to focus on NaN checks. - Updated performance reporting to include maximum processing times alongside average times for better insights. - Clarified documentation regarding the processing pipeline and feature transformations. --- docs/source/debug_processor_pipeline.mdx | 37 ++------ docs/source/introduction_processors.mdx | 106 +++++------------------ 2 files changed, 28 insertions(+), 115 deletions(-) diff --git a/docs/source/debug_processor_pipeline.mdx b/docs/source/debug_processor_pipeline.mdx index 3d33ab113..7aaa57780 100644 --- a/docs/source/debug_processor_pipeline.mdx +++ b/docs/source/debug_processor_pipeline.mdx @@ -130,16 +130,7 @@ for i, intermediate in enumerate(processor.step_through(data)): Add monitoring hooks without modifying your pipeline code: ```python -# Define monitoring hooks -def log_shapes(step_idx: int, transition: EnvTransition): - """Log tensor shapes after each step.""" - obs = transition.get(TransitionKey.OBSERVATION) - if obs: - print(f"Step {step_idx} shapes:") - for key, value in obs.items(): - if isinstance(value, torch.Tensor): - print(f" {key}: {value.shape}") - +# Define monitoring hook def check_nans(step_idx: int, transition: EnvTransition): """Check for NaN values.""" obs = transition.get(TransitionKey.OBSERVATION) @@ -148,29 +139,14 @@ def check_nans(step_idx: int, transition: EnvTransition): if isinstance(value, torch.Tensor) and torch.isnan(value).any(): print(f"Warning: NaN detected in {key} at step {step_idx}") -def measure_performance(step_idx: int, transition: EnvTransition): - """Measure processing time per step.""" - import time - start_time = getattr(measure_performance, 'start_time', time.time()) - if step_idx == 0: - measure_performance.start_time = time.time() - else: - elapsed = time.time() - start_time - print(f"Step {step_idx-1} took {elapsed*1000:.2f}ms") - measure_performance.start_time = time.time() - -# Register hooks -processor.register_after_step_hook(log_shapes) +# Register hook processor.register_after_step_hook(check_nans) -processor.register_after_step_hook(measure_performance) -# Process data - hooks will be called after each step +# Process data - hook will be called after each step output = processor(input_data) -# Remove hooks when done debugging -processor.unregister_after_step_hook(log_shapes) +# Remove hook when done debugging processor.unregister_after_step_hook(check_nans) -processor.unregister_after_step_hook(measure_performance) ``` ## Pipeline Testing and Validation @@ -264,12 +240,13 @@ class PerformanceProfiler: print("\n=== Performance Report ===") for step_name, times in self.step_times.items(): avg_time = sum(times) / len(times) * 1000 # ms - print(f"{step_name}: {avg_time:.2f}ms avg ({len(times)} calls)") + max_time = max(times) * 1000 + print(f"{step_name}: {avg_time:.2f}ms avg, {max_time:.2f}ms max") profiler = PerformanceProfiler() processor.register_after_step_hook(profiler) -# Run your pipeline +# Run your pipeline multiple times for _ in range(100): output = processor(test_data) diff --git a/docs/source/introduction_processors.mdx b/docs/source/introduction_processors.mdx index 731eb28e7..f879c4630 100644 --- a/docs/source/introduction_processors.mdx +++ b/docs/source/introduction_processors.mdx @@ -52,7 +52,7 @@ from lerobot.processor import TransitionKey, EnvTransition, PolicyAction, RobotA # Example transition from a robot collecting data transition: EnvTransition = { - TransitionKey.OBSERVATION: { # dict[str, Any] | None + TransitionKey.OBSERVATION: { "observation.images.camera0": camera0_image_tensor, # Shape: (H, W, C) "observation.images.camera1": camera1_image_tensor, # Shape: (H, W, C) "observation.state": joint_positions_tensor, # Shape: (7,) for 7-DOF arm @@ -63,10 +63,8 @@ transition: EnvTransition = { TransitionKey.DONE: False, # bool | torch.Tensor | None TransitionKey.TRUNCATED: False, # bool | torch.Tensor | None TransitionKey.INFO: {"success": False}, # dict[str, Any] | None - TransitionKey.COMPLEMENTARY_DATA: { # dict[str, Any] | None + TransitionKey.COMPLEMENTARY_DATA: { "task": "pick up the red cube", # Language instruction - "task_index": 0, # Task identifier - "index": 42 # Frame index } } ``` @@ -100,9 +98,12 @@ class MyProcessorStep(ProcessorStep): return features # Most processors return features unchanged ``` +`__call__` is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. +`transform_features` is used to declare how this step transforms feature shapes/types. + ### DataProcessorPipeline: The Generic Orchestrator -The `DataProcessorPipeline[TInput, TOutput]` chains multiple `ProcessorStep` instances together with compile-time type safety: +The `DataProcessorPipeline[TInput, TOutput]` chains multiple `ProcessorStep` instances together: ```python from lerobot.processor import RobotProcessorPipeline, PolicyProcessorPipeline @@ -229,6 +230,8 @@ training_postprocessor = PolicyProcessorPipeline[torch.Tensor, torch.Tensor]( DeviceProcessorStep(device="cpu"), # Move to CPU UnnormalizerProcessorStep(features=..., stats=...), # Denormalize ] + to_transition=policy_action_to_transition, + to_output=transition_to_policy_action, ) ``` @@ -272,75 +275,26 @@ class VelocityProcessor(ObservationProcessorStep): def observation(self, obs): new_obs = obs.copy() if "observation.state" in obs: - # Add computed velocity field - new_obs["observation.velocity"] = self._compute_velocity(obs["observation.state"]) + # concatenate computed velocity field to the state + new_obs["observation.state"] = self._compute_velocity(obs["observation.state"]) return new_obs def transform_features(self, features): """Declare the new velocity field we're adding.""" - if PipelineFeatureType.OBSERVATION in features: - # Add velocity feature with same shape as state - state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state") - if state_feature: - features[PipelineFeatureType.OBSERVATION]["observation.velocity"] = PolicyFeature( - type=FeatureType.STATE, - shape=state_feature.shape # Same shape as position - ) + state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state") + if state_feature: + double_shape = (state_feature.shape[0] * 2,) if state_feature.shape else (2,) + features[PipelineFeatureType.OBSERVATION]["observation.state"] = PolicyFeature( + type=FeatureType.STATE, shape=double_shape + ) return features ``` -### Real Examples from LeRobot +### Feature Specification Functions -**Phone Action Mapping** - Transforms action structure: - -```python -# Input features: {"phone.pos": (3,), "phone.rot": (4,), "phone.enabled": (1,)} -# Output features: {"target_x": (1,), "target_y": (1,), ..., "gripper": (1,)} - -def transform_features(self, features): - # Remove phone-specific keys - features[PipelineFeatureType.ACTION].pop("phone.pos", None) - features[PipelineFeatureType.ACTION].pop("phone.rot", None) - - # Add robot target keys - features[PipelineFeatureType.ACTION]["target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[PipelineFeatureType.ACTION]["target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - # ... more target fields - return features -``` - -**Forward Kinematics** - Adds computed observations: - -```python -# Input: Joint positions -# Output: Joint positions + End-effector pose - -def transform_features(self, features): - # Add end-effector pose features computed from joints - for axis in ["x", "y", "z", "wx", "wy", "wz"]: - features[PipelineFeatureType.OBSERVATION][f"observation.state.ee.{axis}"] = PolicyFeature( - type=FeatureType.STATE, shape=(1,) - ) - return features -``` - -**Tokenization** - Adds language features: - -```python -# Input: Text in complementary_data -# Output: Token IDs and attention mask in observations - -def transform_features(self, features): - features[PipelineFeatureType.OBSERVATION]["observation.language.tokens"] = PolicyFeature( - type=FeatureType.LANGUAGE, shape=(self.max_length,) - ) - features[PipelineFeatureType.OBSERVATION]["observation.language.attention_mask"] = PolicyFeature( - type=FeatureType.LANGUAGE, shape=(self.max_length,) - ) - return features -``` - -### Feature Aggregation in Practice +`create_initial_features()` and `aggregate_pipeline_dataset_features()` solve a critical dataset creation problem: determining the exact final data structure before any data is processed. +Since processor pipelines can add new features (like velocity fields), change tensor shapes (like cropping images), or rename keys, datasets need to know the complete output specification upfront to allocate proper storage and define schemas. +These functions work together by starting with robot hardware specifications (`create_initial_features()`) then simulating the entire pipeline transformation (`aggregate_pipeline_dataset_features()`) to compute the final feature dictionary that gets passed to `LeRobotDataset.create()`, ensuring perfect alignment between what processors output and what datasets expect to store. ```python from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features @@ -386,25 +340,7 @@ LeRobot provides many registered processor steps. Here are the most commonly use - **`rename_observations_processor`**: Rename observation keys using mapping dictionaries - **`tokenizer_processor`**: Tokenize natural language task descriptions into tokens and attention masks -## Performance Tips - -**🚀 Critical Optimization**: Always move data to GPU **before** normalization for significant speedups: - -```python -# ✅ FAST: GPU normalization -steps=[ - DeviceProcessorStep(device="cuda"), # Move to GPU first - NormalizerProcessorStep(...) # Normalize on GPU - much faster! -] - -# ❌ SLOW: CPU normalization -steps=[ - NormalizerProcessorStep(...), # Normalize on CPU - slow - DeviceProcessorStep(device="cuda") # Move to GPU after -] -``` - -## Next Steps +### Next Steps - **[Implement Your Own Processor](implement_your_own_processor.mdx)** - Create custom processor steps - **[Debug Your Pipeline](debug_processor_pipeline.mdx)** - Troubleshoot and optimize pipelines