refactor(docs): streamline monitoring hooks and enhance performance reporting

- Removed the log_shapes and measure_performance hooks, simplifying the monitoring example to focus on NaN checks.
- Updated performance reporting to include maximum processing times alongside average times for better insights.
- Clarified documentation regarding the processing pipeline and feature transformations.
AdilZouitine
2025-09-15 14:01:04 +02:00
parent 066308ceb8
commit e8d79b5191
2 changed files with 28 additions and 115 deletions
+7 -30
@@ -130,16 +130,7 @@ for i, intermediate in enumerate(processor.step_through(data)):
Add monitoring hooks without modifying your pipeline code:
```python
-# Define monitoring hooks
-def log_shapes(step_idx: int, transition: EnvTransition):
-    """Log tensor shapes after each step."""
-    obs = transition.get(TransitionKey.OBSERVATION)
-    if obs:
-        print(f"Step {step_idx} shapes:")
-        for key, value in obs.items():
-            if isinstance(value, torch.Tensor):
-                print(f"  {key}: {value.shape}")
+# Define monitoring hook
def check_nans(step_idx: int, transition: EnvTransition):
    """Check for NaN values."""
    obs = transition.get(TransitionKey.OBSERVATION)
@@ -148,29 +139,14 @@ def check_nans(step_idx: int, transition: EnvTransition):
            if isinstance(value, torch.Tensor) and torch.isnan(value).any():
                print(f"Warning: NaN detected in {key} at step {step_idx}")

-def measure_performance(step_idx: int, transition: EnvTransition):
-    """Measure processing time per step."""
-    import time
-    start_time = getattr(measure_performance, 'start_time', time.time())
-    if step_idx == 0:
-        measure_performance.start_time = time.time()
-    else:
-        elapsed = time.time() - start_time
-        print(f"Step {step_idx-1} took {elapsed*1000:.2f}ms")
-        measure_performance.start_time = time.time()
-
-# Register hooks
-processor.register_after_step_hook(log_shapes)
+# Register hook
processor.register_after_step_hook(check_nans)
-processor.register_after_step_hook(measure_performance)

-# Process data - hooks will be called after each step
+# Process data - hook will be called after each step
output = processor(input_data)

-# Remove hooks when done debugging
-processor.unregister_after_step_hook(log_shapes)
+# Remove hook when done debugging
processor.unregister_after_step_hook(check_nans)
-processor.unregister_after_step_hook(measure_performance)
```
## Pipeline Testing and Validation
@@ -264,12 +240,13 @@ class PerformanceProfiler:
print("\n=== Performance Report ===")
for step_name, times in self.step_times.items():
avg_time = sum(times) / len(times) * 1000 # ms
print(f"{step_name}: {avg_time:.2f}ms avg ({len(times)} calls)")
max_time = max(times) * 1000
print(f"{step_name}: {avg_time:.2f}ms avg, {max_time:.2f}ms max")
profiler = PerformanceProfiler()
processor.register_after_step_hook(profiler)
# Run your pipeline
# Run your pipeline multiple times
for _ in range(100):
output = processor(test_data)
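For reference, the `PerformanceProfiler` this hunk modifies is an after-step hook implemented as a callable class. A minimal sketch of such a class (illustrative only, assuming the `(step_idx, transition)` hook signature shown above; not the exact class from the guide):

```python
import time
from collections import defaultdict

class PerformanceProfiler:
    """Times pipeline steps by measuring the gap between hook calls."""

    def __init__(self):
        self.step_times = defaultdict(list)
        self._last = time.perf_counter()

    def __call__(self, step_idx: int, transition) -> None:
        # Called after each step: attribute the elapsed time to that step
        now = time.perf_counter()
        self.step_times[f"step_{step_idx}"].append(now - self._last)
        self._last = now

    def print_report(self):
        print("\n=== Performance Report ===")
        for step_name, times in self.step_times.items():
            avg_time = sum(times) / len(times) * 1000  # ms
            max_time = max(times) * 1000
            print(f"{step_name}: {avg_time:.2f}ms avg, {max_time:.2f}ms max")
```

After the 100-iteration loop above, calling `profiler.print_report()` prints the per-step averages and maxima.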
+21 -85
@@ -52,7 +52,7 @@ from lerobot.processor import TransitionKey, EnvTransition, PolicyAction, RobotA
# Example transition from a robot collecting data
transition: EnvTransition = {
-    TransitionKey.OBSERVATION: {  # dict[str, Any] | None
+    TransitionKey.OBSERVATION: {
        "observation.images.camera0": camera0_image_tensor,  # Shape: (H, W, C)
        "observation.images.camera1": camera1_image_tensor,  # Shape: (H, W, C)
        "observation.state": joint_positions_tensor,  # Shape: (7,) for 7-DOF arm
@@ -63,10 +63,8 @@ transition: EnvTransition = {
    TransitionKey.DONE: False,  # bool | torch.Tensor | None
    TransitionKey.TRUNCATED: False,  # bool | torch.Tensor | None
    TransitionKey.INFO: {"success": False},  # dict[str, Any] | None
-    TransitionKey.COMPLEMENTARY_DATA: {  # dict[str, Any] | None
+    TransitionKey.COMPLEMENTARY_DATA: {
        "task": "pick up the red cube",  # Language instruction
-        "task_index": 0,  # Task identifier
-        "index": 42  # Frame index
    }
}
```
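Downstream code reads fields straight out of this dictionary. A short sketch using the keys from the example above (hypothetical usage, not from the guide):

```python
obs = transition[TransitionKey.OBSERVATION]
state = obs["observation.state"]  # torch.Tensor, shape (7,)
task = transition[TransitionKey.COMPLEMENTARY_DATA]["task"]  # "pick up the red cube"
done = transition[TransitionKey.DONE]  # False
```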
@@ -100,9 +98,12 @@ class MyProcessorStep(ProcessorStep):
        return features  # Most processors return features unchanged
```

+`__call__` is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`.
+
+`transform_features` declares how this step transforms feature shapes and types.
+
### DataProcessorPipeline: The Generic Orchestrator

-The `DataProcessorPipeline[TInput, TOutput]` chains multiple `ProcessorStep` instances together with compile-time type safety:
+The `DataProcessorPipeline[TInput, TOutput]` chains multiple `ProcessorStep` instances together:
```python
from lerobot.processor import RobotProcessorPipeline, PolicyProcessorPipeline
@@ -229,6 +230,8 @@ training_postprocessor = PolicyProcessorPipeline[torch.Tensor, torch.Tensor](
DeviceProcessorStep(device="cpu"), # Move to CPU
UnnormalizerProcessorStep(features=..., stats=...), # Denormalize
]
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
)
```
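The new `to_transition` / `to_output` arguments adapt between raw policy tensors and the pipeline's internal `EnvTransition` format. Conceptually they behave like the following sketch (hypothetical helpers for illustration; the real `policy_action_to_transition` and `transition_to_policy_action` are provided by `lerobot.processor`):

```python
import torch
from lerobot.processor import EnvTransition, TransitionKey

def my_action_to_transition(action: torch.Tensor) -> EnvTransition:
    # Wrap the raw input so every step sees the common transition format
    return {TransitionKey.ACTION: action}

def my_transition_to_action(transition: EnvTransition) -> torch.Tensor:
    # Unwrap the processed action back into a plain tensor for the caller
    return transition[TransitionKey.ACTION]
```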
@@ -272,75 +275,26 @@ class VelocityProcessor(ObservationProcessorStep):
    def observation(self, obs):
        new_obs = obs.copy()
        if "observation.state" in obs:
-            # Add computed velocity field
-            new_obs["observation.velocity"] = self._compute_velocity(obs["observation.state"])
+            # Concatenate computed velocity field to the state
+            new_obs["observation.state"] = self._compute_velocity(obs["observation.state"])
        return new_obs

    def transform_features(self, features):
-        """Declare the new velocity field we're adding."""
-        if PipelineFeatureType.OBSERVATION in features:
-            # Add velocity feature with same shape as state
-            state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state")
-            if state_feature:
-                features[PipelineFeatureType.OBSERVATION]["observation.velocity"] = PolicyFeature(
-                    type=FeatureType.STATE,
-                    shape=state_feature.shape  # Same shape as position
-                )
+        state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state")
+        if state_feature:
+            double_shape = (state_feature.shape[0] * 2,) if state_feature.shape else (2,)
+            features[PipelineFeatureType.OBSERVATION]["observation.state"] = PolicyFeature(
+                type=FeatureType.STATE, shape=double_shape
+            )
        return features
```
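Since the state key now carries twice its original dimension, `_compute_velocity` is expected to return the state with its velocity appended. One plausible sketch of such a helper (hypothetical; assumes the step keeps the previous state and a timestep `self.dt`):

```python
import torch

def _compute_velocity(self, state: torch.Tensor) -> torch.Tensor:
    prev = getattr(self, "_prev_state", state)  # first call: zero velocity
    velocity = (state - prev) / self.dt  # finite-difference estimate
    self._prev_state = state
    # Concatenating doubles the feature dimension, matching the
    # double_shape declared in transform_features above
    return torch.cat([state, velocity], dim=-1)
```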
-### Real Examples from LeRobot
+### Feature Specification Functions
-**Phone Action Mapping** - Transforms action structure:
-
-```python
-# Input features: {"phone.pos": (3,), "phone.rot": (4,), "phone.enabled": (1,)}
-# Output features: {"target_x": (1,), "target_y": (1,), ..., "gripper": (1,)}
-def transform_features(self, features):
-    # Remove phone-specific keys
-    features[PipelineFeatureType.ACTION].pop("phone.pos", None)
-    features[PipelineFeatureType.ACTION].pop("phone.rot", None)
-    # Add robot target keys
-    features[PipelineFeatureType.ACTION]["target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,))
-    features[PipelineFeatureType.ACTION]["target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,))
-    # ... more target fields
-    return features
-```
-
-**Forward Kinematics** - Adds computed observations:
-
-```python
-# Input: Joint positions
-# Output: Joint positions + End-effector pose
-def transform_features(self, features):
-    # Add end-effector pose features computed from joints
-    for axis in ["x", "y", "z", "wx", "wy", "wz"]:
-        features[PipelineFeatureType.OBSERVATION][f"observation.state.ee.{axis}"] = PolicyFeature(
-            type=FeatureType.STATE, shape=(1,)
-        )
-    return features
-```
-
-**Tokenization** - Adds language features:
-
-```python
-# Input: Text in complementary_data
-# Output: Token IDs and attention mask in observations
-def transform_features(self, features):
-    features[PipelineFeatureType.OBSERVATION]["observation.language.tokens"] = PolicyFeature(
-        type=FeatureType.LANGUAGE, shape=(self.max_length,)
-    )
-    features[PipelineFeatureType.OBSERVATION]["observation.language.attention_mask"] = PolicyFeature(
-        type=FeatureType.LANGUAGE, shape=(self.max_length,)
-    )
-    return features
-```
### Feature Aggregation in Practice
`create_initial_features()` and `aggregate_pipeline_dataset_features()` solve a critical dataset creation problem: determining the exact final data structure before any data is processed.
Since processor pipelines can add new features (like velocity fields), change tensor shapes (like cropping images), or rename keys, datasets need to know the complete output specification upfront to allocate proper storage and define schemas.
These functions work together: `create_initial_features()` starts from the robot hardware specification, and `aggregate_pipeline_dataset_features()` simulates the entire pipeline transformation on top of it to compute the final feature dictionary passed to `LeRobotDataset.create()`. This ensures that what processors output aligns exactly with what datasets expect to store.
```python
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
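# (Hunk truncated here. As a hedged illustration only — argument names are
# hypothetical, not the exact API — the two functions compose roughly like:
#   features = aggregate_pipeline_dataset_features(
#       pipeline=preprocessor,
#       initial_features=create_initial_features(robot_hardware_specs),
#   )
#   dataset = LeRobotDataset.create(..., features=features)
# so the dataset schema matches the pipeline's final output.)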
@@ -386,25 +340,7 @@ LeRobot provides many registered processor steps. Here are the most commonly use
- **`rename_observations_processor`**: Rename observation keys using mapping dictionaries
- **`tokenizer_processor`**: Tokenize natural language task descriptions into tokens and attention masks
-## Performance Tips
-
-**🚀 Critical Optimization**: Always move data to GPU **before** normalization for significant speedups:
-
-```python
-# ✅ FAST: GPU normalization
-steps=[
-    DeviceProcessorStep(device="cuda"),  # Move to GPU first
-    NormalizerProcessorStep(...)  # Normalize on GPU - much faster!
-]
-
-# ❌ SLOW: CPU normalization
-steps=[
-    NormalizerProcessorStep(...),  # Normalize on CPU - slow
-    DeviceProcessorStep(device="cuda")  # Move to GPU after
-]
-```
-
-## Next Steps
+### Next Steps
- **[Implement Your Own Processor](implement_your_own_processor.mdx)** - Create custom processor steps
- **[Debug Your Pipeline](debug_processor_pipeline.mdx)** - Troubleshoot and optimize pipelines