more fixes

This commit is contained in:
Jade Choghari
2025-11-17 13:05:14 +01:00
parent f4547299e4
commit cb7d2ed0fc
11 changed files with 990 additions and 201 deletions
+3 -1
View File
@@ -337,7 +337,9 @@ def make_pre_post_processors(
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, XVLAConfig):
from lerobot.policies.xvla.processor_xvla import make_xvla_pre_post_processors
from lerobot.policies.xvla.processor_xvla import (
make_xvla_pre_post_processors,
)
processors = make_xvla_pre_post_processors(
config=policy_cfg,
@@ -0,0 +1,165 @@
# XVLA Custom Processor Steps - Implementation Summary
## Overview
Implemented three custom processor steps for XVLA that encapsulate the preprocessing and postprocessing logic previously scattered in `lerobot_eval.py` (lines 165-184).
## Files Modified
### 1. `/src/lerobot/policies/xvla/processor_xvla.py`
**Changes:**
- Added imports: `dataclass`, `numpy`, `Rotate6D_to_AxisAngle`, processor core types
- Implemented 3 new processor step classes (all registered with `ProcessorStepRegistry`)
**New Classes:**
#### `XVLAImageScaleProcessorStep`
- **Registry Name:** `xvla_image_scale`
- **Purpose:** Scales image observations by 255 (converts [0,1] to [0,255])
- **Configuration:**
- `image_keys: list[str] | None` - Auto-detects or specify image keys
- **Location:** Lines 93-140
#### `XVLAAddDomainIdProcessorStep`
- **Registry Name:** `xvla_add_domain_id`
- **Purpose:** Adds domain_id tensor to complementary data
- **Configuration:**
- `domain_id: int = 3` - Domain identifier
- `device: str = "cuda"` - Tensor device
- **Location:** Lines 143-192
#### `XVLARotation6DToAxisAngleProcessorStep`
- **Registry Name:** `xvla_rotation_6d_to_axis_angle`
- **Purpose:** Converts 6D rotation to axis-angle and reorganizes action dimensions
- Input: [eef(3), rotation_6d(6), gripper(1)] = 10D
- Output: [eef(3), axis_angle(3), gripper(1)] = 7D
- **Configuration:**
- `expected_action_dim: int = 10`
- **Location:** Lines 195-255
### 2. `/src/lerobot/policies/xvla/README_PROCESSORS.md` (NEW)
Comprehensive documentation covering:
- Processor step descriptions and configurations
- Integration examples for preprocessing/postprocessing pipelines
- Before/after comparison showing simplified evaluation code
- JSON/YAML configuration examples
- Reference to Groot processor patterns
## Key Features
### 1. **Registry-Based Architecture**
All processors are registered with `@ProcessorStepRegistry.register()`, enabling:
- Instantiation from configuration files
- Serialization/deserialization with policies
- Easy discovery and debugging
### 2. **Proper ProcessorStep Interface**
Each processor implements:
- `__call__(transition: EnvTransition) -> EnvTransition` - Main processing logic
- `transform_features(features) -> features` - Feature contract declaration
- `get_config() -> dict` - Serializable configuration
### 3. **Safe Data Handling**
- All processors use `transition.copy()` to avoid side effects
- Proper handling of missing/None values
- Device-aware tensor operations
### 4. **Configurable and Reusable**
- All parameters exposed in `get_config()`
- Can be customized per deployment
- Works with any XVLA model configuration
## Usage Impact
### Before (from lerobot_eval.py):
```python
# Lines 166-184 - scattered preprocessing/postprocessing
observation[f"observation.images.image"] = observation[f"observation.images.image"] * 255
observation[f"observation.images.image2"] = observation[f"observation.images.image2"] * 255
observation = add_envs_task(env, observation)
observation = preprocessor(observation)
observation["domain_id"] = torch.tensor([int(3)], dtype=torch.long).to("cuda")
with torch.inference_mode():
action = policy.select_action(observation).to("cpu").numpy()
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action_numpy = np.concatenate([target_eef, target_axis, target_act], axis=-1)
```
### After (with custom processors):
```python
# Clean and simple - processors encapsulate all the logic
observation = add_envs_task(env, observation)
observation = preprocessor(observation) # Includes image scaling + domain_id
with torch.inference_mode():
action = policy.select_action(observation)
action = postprocessor(action) # Includes rotation conversion + device transfer
action_numpy = action.numpy()
```
## Design Patterns Followed
1. **Groot Processor Reference:** Followed same patterns as `processor_groot.py`:
- Dataclass-based configuration
- Registry registration
- State management via `get_config()`
- Proper transition handling
2. **LeRobot Processor Guidelines:** (from `implement_your_own_processor.mdx`):
- Safe data handling with `copy()`
- Clear error messages
- Device/dtype awareness
- Feature contract declaration
3. **Pipeline Integration:**
- Works seamlessly with `PolicyProcessorPipeline`
- Automatic dict ↔ EnvTransition conversion
- Composable with other processor steps
## Benefits
1. **Cleaner Code:** Evaluation loop is now much simpler
2. **Maintainable:** Processing logic is centralized and well-documented
3. **Configurable:** All parameters can be adjusted via config files
4. **Reusable:** Can be used across different XVLA deployments
5. **Testable:** Each processor can be tested independently
6. **Serializable:** Processors save/load with the policy
## Testing Recommendations
1. **Unit Tests:**
- Test each processor with sample transitions
- Verify image scaling (multiply by 255)
- Verify domain_id addition and device placement
- Verify rotation conversion accuracy
2. **Integration Tests:**
- Test full preprocessing pipeline
- Test full postprocessing pipeline
- Verify evaluation loop still works correctly
- Test with different domain_ids and devices
3. **Configuration Tests:**
- Test loading processors from config
- Test serialization/deserialization
- Test overrides mechanism
## Next Steps
1. **Update XVLA Policy Factory:** Optionally add these processors to the default pipeline in `make_xvla_pre_post_processors()` or document how to add them via config
2. **Update lerobot_eval.py:** Simplify the evaluation code to use the new processors
3. **Add Configuration Examples:** Create sample config files showing processor integration
4. **Add Tests:** Implement unit and integration tests for the new processors
## Notes
- No changes made to `make_xvla_pre_post_processors()` as requested
- Processors are available but not automatically included (must be added via config)
- All processors follow LeRobot conventions and best practices
- Compatible with existing XVLA model configurations
+141
View File
@@ -0,0 +1,141 @@
# XVLA Custom Processors - Quick Start
## What Was Implemented
Three custom processor steps that simplify XVLA evaluation by encapsulating preprocessing and postprocessing logic:
```
┌─────────────────────────────────────────────────────────────┐
│ PREPROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────┤
│ 1. RenameObservationsProcessorStep │
│ 2. AddBatchDimensionProcessorStep │
│ 3. XVLAImageScaleProcessorStep ← NEW │
│ └─ Scales images by 255 │
│ 4. TokenizerProcessorStep │
│ 5. DeviceProcessorStep │
│ 6. XVLAAddDomainIdProcessorStep ← NEW │
│ └─ Adds domain_id tensor │
│ 7. NormalizerProcessorStep │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ POSTPROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────┤
│ 1. UnnormalizerProcessorStep │
│ 2. XVLARotation6DToAxisAngleProcessorStep ← NEW │
│ └─ Converts 6D rotation to axis-angle (10D → 7D) │
│ 3. DeviceProcessorStep(device="cpu") │
└─────────────────────────────────────────────────────────────┘
```
## Simplest Usage
### Option 1: Import and Use Directly
```python
from lerobot.policies.xvla.processor_xvla import (
XVLAImageScaleProcessorStep,
XVLAAddDomainIdProcessorStep,
XVLARotation6DToAxisAngleProcessorStep,
)
# Add to your existing preprocessor steps
preprocessor = PolicyProcessorPipeline(
steps=[
# ... your existing steps ...
XVLAImageScaleProcessorStep(),
# ... more steps ...
XVLAAddDomainIdProcessorStep(domain_id=3),
]
)
# Add to your postprocessor steps
postprocessor = PolicyProcessorPipeline(
steps=[
XVLARotation6DToAxisAngleProcessorStep(),
DeviceProcessorStep(device="cpu"),
]
)
```
### Option 2: Load from Config
```python
# In your config.json or YAML:
{
"preprocessor_steps": [
{"name": "xvla_image_scale"},
{"name": "xvla_add_domain_id", "domain_id": 3, "device": "cuda"}
],
"postprocessor_steps": [
{"name": "xvla_rotation_6d_to_axis_angle", "expected_action_dim": 10}
]
}
# Then load:
preprocessor = PolicyProcessorPipeline.from_pretrained("path/to/config")
```
## Evaluation Loop Comparison
### ❌ Old Way (Manual Processing)
```python
# Scattered preprocessing
observation["observation.images.image"] *= 255
observation["observation.images.image2"] *= 255
observation = add_envs_task(env, observation)
observation = preprocessor(observation)
observation["domain_id"] = torch.tensor([3], dtype=torch.long).to("cuda")
# Policy inference
action = policy.select_action(observation)
# Manual postprocessing
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action = np.concatenate([target_eef, target_axis, target_act], axis=-1)
```
### ✅ New Way (With Custom Processors)
```python
# All preprocessing in one call
observation = add_envs_task(env, observation)
observation = preprocessor(observation) # Includes scaling + domain_id
# Policy inference
action = policy.select_action(observation)
# All postprocessing in one call
action = postprocessor(action) # Includes rotation conversion
```
**Result:** 13 lines → 6 lines of cleaner, more maintainable code!
## Quick Reference
| Processor | Purpose | Config Key | Default |
|-----------|---------|------------|---------|
| **XVLAImageScaleProcessorStep** | Scale images by 255 | `xvla_image_scale` | Auto-detect images |
| **XVLAAddDomainIdProcessorStep** | Add domain_id tensor | `xvla_add_domain_id` | domain_id=3, device="cuda" |
| **XVLARotation6DToAxisAngleProcessorStep** | Convert 6D→axis-angle | `xvla_rotation_6d_to_axis_angle` | expected_action_dim=10 |
## Key Benefits
1. ✅ **Clean code** - No scattered preprocessing logic
2. ✅ **Configurable** - Adjust via config files
3. ✅ **Reusable** - Works across different XVLA setups
4. ✅ **Serializable** - Saves/loads with policy
5. ✅ **Testable** - Each processor can be tested independently
6. ✅ **Registry-based** - Easy instantiation from config
## Next Steps
1. **Update your evaluation script** to use the new processors
2. **Add processors to your config** if using config-based loading
3. **Test with your specific XVLA model** to ensure compatibility
4. **Adjust parameters** as needed (domain_id, device, etc.)
For detailed documentation, see `README_PROCESSORS.md`.
@@ -0,0 +1,132 @@
# XVLA Custom Processor Steps
Three custom processor steps have been implemented for XVLA that encapsulate the preprocessing and postprocessing logic from `lerobot_eval.py`.
## Processor Steps
### 1. XVLAImageScaleProcessorStep
**Registry Name:** `xvla_image_scale`
Scales image observations by 255 (from [0,1] to [0,255] range).
```python
XVLAImageScaleProcessorStep(
image_keys=None # Auto-detects "observation.images.*" or specify list
)
```
### 2. XVLAAddDomainIdProcessorStep
**Registry Name:** `xvla_add_domain_id`
Adds `domain_id` tensor to complementary data for multi-domain support.
```python
XVLAAddDomainIdProcessorStep(
domain_id=3, # Domain identifier
device="cuda" # Tensor device
)
```
### 3. XVLARotation6DToAxisAngleProcessorStep
**Registry Name:** `xvla_rotation_6d_to_axis_angle`
Converts 6D rotation to axis-angle representation:
- **Input:** [eef(3), rotation_6d(6), gripper(1)] = 10D
- **Output:** [eef(3), axis_angle(3), gripper(1)] = 7D
```python
XVLARotation6DToAxisAngleProcessorStep(
expected_action_dim=10
)
```
## Integration with Config
These steps can be added to your XVLA policy configuration:
### In Preprocessing Pipeline:
```python
from lerobot.policies.xvla.processor_xvla import (
XVLAImageScaleProcessorStep,
XVLAAddDomainIdProcessorStep,
)
preprocessor_steps = [
RenameObservationsProcessorStep(rename_map={}),
AddBatchDimensionProcessorStep(),
XVLAImageScaleProcessorStep(), # Add this
TokenizerProcessorStep(...),
DeviceProcessorStep(device="cuda"),
XVLAAddDomainIdProcessorStep(domain_id=3), # Add this
NormalizerProcessorStep(...),
]
```
### In Postprocessing Pipeline:
```python
from lerobot.policies.xvla.processor_xvla import XVLARotation6DToAxisAngleProcessorStep
postprocessor_steps = [
UnnormalizerProcessorStep(...),
XVLARotation6DToAxisAngleProcessorStep(), # Add this
DeviceProcessorStep(device="cpu"),
]
```
## Usage in Evaluation
Now your evaluation loop simplifies to:
```python
# Before (from lerobot_eval.py lines 165-184)
observation[f"observation.images.image"] = observation[f"observation.images.image"] * 255
observation[f"observation.images.image2"] = observation[f"observation.images.image2"] * 255
observation = add_envs_task(env, observation)
observation = preprocessor(observation)
observation["domain_id"] = torch.tensor([int(3)], dtype=torch.long).to("cuda")
with torch.inference_mode():
action = policy.select_action(observation).to("cpu").numpy()
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action_numpy = np.concatenate([target_eef, target_axis, target_act], axis=-1)
# After (clean and simple)
observation = add_envs_task(env, observation) # Add task
observation = preprocessor(observation) # Scales images + adds domain_id
with torch.inference_mode():
action = policy.select_action(observation)
action = postprocessor(action) # Converts rotation + moves to CPU
action_numpy = action.numpy()
```
## Configuration via Registry
All steps are registered and can be loaded from JSON/YAML config:
```json
{
"preprocessor": {
"steps": [
{"name": "xvla_image_scale"},
{"name": "xvla_add_domain_id", "domain_id": 3, "device": "cuda"}
]
},
"postprocessor": {
"steps": [
{"name": "xvla_rotation_6d_to_axis_angle", "expected_action_dim": 10}
]
}
}
```
## Implementation Reference
See `processor_groot.py` for similar patterns - these XVLA processors follow the same design:
- Registered with `@ProcessorStepRegistry.register()`
- Implement `__call__`, `transform_features`, and `get_config`
- Operate on `EnvTransition` objects
- Properly handle `transition.copy()` to avoid side effects
@@ -0,0 +1,234 @@
# XVLA Configuration and Evaluation Updates - Summary
## Overview
Updated XVLA configuration files and evaluation script to use the new custom processor steps, eliminating manual preprocessing and postprocessing code.
## Files Modified
### 1. `/src/lerobot/policies/xvla/policy_preprocessor.json`
**Added two new processor steps:**
#### Step 3: `xvla_image_scale` (NEW - Line 14-19)
```json
{
"registry_name": "xvla_image_scale",
"config": {
"image_keys": null
}
}
```
- **Position:** After `to_batch_processor`, before `tokenizer_processor`
- **Purpose:** Scales images by 255 (converts from [0,1] to [0,255])
- **Replaces:** Manual code `observation["observation.images.image"] *= 255`
#### Step 6: `xvla_add_domain_id` (NEW - Line 38-44)
```json
{
"registry_name": "xvla_add_domain_id",
"config": {
"domain_id": 3,
"device": "cuda"
}
}
```
- **Position:** After `device_processor`, before `normalizer_processor`
- **Purpose:** Adds domain_id tensor to complementary data
- **Replaces:** Manual code `observation["domain_id"] = torch.tensor([int(3)], dtype=torch.long).to("cuda")`
**Final preprocessing pipeline order:**
1. `rename_observations_processor`
2. `to_batch_processor`
3. `xvla_image_scale` ⭐ NEW
4. `tokenizer_processor`
5. `device_processor`
6. `xvla_add_domain_id` ⭐ NEW
7. `normalizer_processor`
### 2. `/src/lerobot/policies/xvla/policy_postprocessor.json`
**Added one new processor step and updated device:**
#### Step 2: `xvla_rotation_6d_to_axis_angle` (NEW - Line 23-28)
```json
{
"registry_name": "xvla_rotation_6d_to_axis_angle",
"config": {
"expected_action_dim": 10
}
}
```
- **Position:** After `unnormalizer_processor`, before `device_processor`
- **Purpose:** Converts 6D rotation to axis-angle (10D → 7D action)
- **Replaces:** Manual code:
```python
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action = np.concatenate([target_eef, target_axis, target_act], axis=-1)
```
#### Step 3: `device_processor` (UPDATED - Line 29-35)
- **Changed device:** `"cuda"``"cpu"`
- **Purpose:** Move tensors to CPU for environment interaction
- **Replaces:** Manual code `.to("cpu")`
**Final postprocessing pipeline order:**
1. `unnormalizer_processor`
2. `xvla_rotation_6d_to_axis_angle` ⭐ NEW
3. `device_processor` (device changed to "cpu") 🔧 UPDATED
### 3. `/src/lerobot/scripts/lerobot_eval.py`
**Removed manual preprocessing/postprocessing code:**
#### Lines 91-92: Removed import (DELETED)
```python
# REMOVED:
from lerobot.policies.xvla.utils import Rotate6D_to_AxisAngle
```
#### Lines 165-184: Simplified evaluation logic (REPLACED)
**Before (18 lines with manual processing):**
```python
observation[f"observation.images.image"] = observation[f"observation.images.image"] * 255
observation[f"observation.images.image2"] = observation[f"observation.images.image2"] * 255
observation = add_envs_task(env, observation)
observation = preprocessor(observation)
observation["domain_id"] = torch.tensor([int(3)], dtype=torch.long).to("cuda")
with torch.inference_mode():
action = policy.select_action(observation).to("cpu").numpy()
# action = postprocessor(action) # THIS WAS COMMENTED OUT
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action_numpy = np.concatenate([target_eef, target_axis, target_act], axis=-1)
# Convert to CPU / numpy.
# action_numpy: np.ndarray = action.to("cpu").numpy()
assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)"
```
**After (11 lines, clean and simple):**
```python
observation = add_envs_task(env, observation)
# Preprocess observation (includes image scaling and domain_id addition)
observation = preprocessor(observation)
# Policy inference
with torch.inference_mode():
action = policy.select_action(observation)
# Postprocess action (includes rotation conversion and device transfer to CPU)
action = postprocessor(action)
# Convert to numpy
action_numpy: np.ndarray = action.numpy()
assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)"
```
## Impact Summary
### Code Reduction
- **Lines removed:** ~13 lines of manual processing code
- **Lines added:** ~7 lines of clean processor calls
- **Net reduction:** ~6 lines + cleaner structure
- **Removed import:** No longer need `Rotate6D_to_AxisAngle` import
### Benefits
1. **✅ Cleaner Code**
- Evaluation loop is now much simpler and more readable
- No scattered preprocessing logic
- Clear separation of concerns
2. **✅ Configuration-Driven**
- All preprocessing/postprocessing controlled via JSON config
- Easy to adjust parameters (domain_id, device, etc.) without code changes
- Can load different configs for different deployments
3. **✅ Maintainable**
- Processing logic centralized in processor classes
- Single source of truth for transformations
- Easier to debug and test
4. **✅ Reusable**
- Processors work across all XVLA evaluations
- Can be shared between training and inference
- Can be serialized with the model
5. **✅ Consistent**
- Same processing pipeline guaranteed in all contexts
- No risk of forgetting manual steps
- Automatic handling of edge cases
## Testing Checklist
Before deploying, verify:
- [ ] Images are scaled correctly (0-255 range)
- [ ] domain_id is added to complementary data
- [ ] 6D rotation correctly converts to axis-angle
- [ ] Actions are 7D after postprocessing
- [ ] Evaluation success rates match previous results
- [ ] Video rendering still works
- [ ] Multi-environment batching works correctly
## Configuration Notes
### Customizing Domain ID
To change the domain ID for different embodiments, edit `policy_preprocessor.json`:
```json
{
"registry_name": "xvla_add_domain_id",
"config": {
"domain_id": 5, // Change this value
"device": "cuda"
}
}
```
### Customizing Image Keys
To scale specific images only, edit `policy_preprocessor.json`:
```json
{
"registry_name": "xvla_image_scale",
"config": {
"image_keys": ["observation.images.image", "observation.images.wrist_cam"]
}
}
```
### Customizing Action Dimensions
To support different action dimensions, edit `policy_postprocessor.json`:
```json
{
"registry_name": "xvla_rotation_6d_to_axis_angle",
"config": {
"expected_action_dim": 12 // Adjust based on your model
}
}
```
## Migration Guide
If you have existing XVLA checkpoints without these configs:
1. **Copy the updated JSON files** to your checkpoint directory
2. **No model retraining needed** - processors are data transforms only
3. **Test evaluation** to ensure consistent results
4. **Update any custom evaluation scripts** to use processors
## Related Files
- Custom processors implementation: `/src/lerobot/policies/xvla/processor_xvla.py`
- Documentation: `/src/lerobot/policies/xvla/README_PROCESSORS.md`
- Quick start: `/src/lerobot/policies/xvla/QUICK_START.md`
## Questions?
See the processor documentation in `/src/lerobot/policies/xvla/README_PROCESSORS.md` for detailed usage examples and troubleshooting.
+6
View File
@@ -0,0 +1,6 @@
from lerobot.policies.xvla.processor_xvla import (
make_xvla_pre_post_processors,
XVLAImageScaleProcessorStep,
XVLAAddDomainIdProcessorStep,
XVLARotation6DToAxisAngleProcessorStep,
)
@@ -0,0 +1,37 @@
{
"name": "policy_postprocessor",
"steps": [
{
"registry_name": "unnormalizer_processor",
"config": {
"eps": 1e-08,
"features": {
"action": {
"type": "ACTION",
"shape": [
20
]
}
},
"norm_map": {
"VISUAL": "MEAN_STD",
"STATE": "IDENTITY",
"ACTION": "IDENTITY"
}
}
},
{
"registry_name": "xvla_rotation_6d_to_axis_angle",
"config": {
"expected_action_dim": 10
}
},
{
"registry_name": "device_processor",
"config": {
"device": "cpu",
"float_dtype": null
}
}
]
}
@@ -0,0 +1,87 @@
{
"name": "policy_preprocessor",
"steps": [
{
"registry_name": "rename_observations_processor",
"config": {
"rename_map": {}
}
},
{
"registry_name": "to_batch_processor",
"config": {}
},
{
"registry_name": "xvla_image_scale",
"config": {
"image_keys": null
}
},
{
"registry_name": "tokenizer_processor",
"config": {
"max_length": 50,
"task_key": "task",
"padding_side": "right",
"padding": "max_length",
"truncation": true,
"tokenizer_name": "facebook/bart-large"
}
},
{
"registry_name": "device_processor",
"config": {
"device": "cuda",
"float_dtype": null
}
},
{
"registry_name": "xvla_add_domain_id",
"config": {
"domain_id": 3,
"device": "cuda"
}
},
{
"registry_name": "normalizer_processor",
"config": {
"eps": 1e-08,
"features": {
"observation.images.image": {
"type": "VISUAL",
"shape": [
3,
224,
224
]
},
"observation.images.image2": {
"type": "VISUAL",
"shape": [
3,
224,
224
]
},
"observation.state": {
"type": "STATE",
"shape": [
8
]
},
"action": {
"type": "ACTION",
"shape": [
20
]
}
},
"norm_map": {
"VISUAL": "IMAGENET",
"STATE": "IDENTITY",
"ACTION": "IDENTITY"
}
}
}
]
}
+174 -184
View File
@@ -14,208 +14,30 @@
# limitations under the License.
# ------------------------------------------------------------------------------
from dataclasses import dataclass
from typing import Any
import numpy as np
import torch
from transformers import ProcessorMixin
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
from lerobot.policies.xvla.utils import Rotate6D_to_AxisAngle
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
TokenizerProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.processor.core import EnvTransition, TransitionKey
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
class XVLAProcessor(ProcessorMixin):
"""
XVLAProcessor: Unified multimodal processor for XVLA models.
Handles:
- Multi-view image inputs (e.g., from multiple cameras).
- Batch processing for multiple samples.
- Joint tokenization and image tensor preparation.
This processor combines an image processor and a tokenizer under a single interface
so that users can call it directly like:
>>> processor = XVLAProcessor.from_pretrained("path/to/xvla")
>>> inputs = processor(images=batch_images, language_instruction=batch_texts)
It is fully compatible with the Hugging Face AutoProcessor API.
Attributes
----------
num_views : int, default=3
Expected number of image views per sample. Missing views will be padded with zeros.
language_max_length : int, default=50
Maximum token length for text encoding.
attributes : list
Required by ProcessorMixin to know which submodules are stored and reloaded.
image_processor_class : str
The name of the associated image processor class.
tokenizer_class : tuple(str)
The names of compatible tokenizer classes.
"""
num_views: int = 3
language_max_length: int = 50
# Hugging Face ProcessorMixin-required metadata
attributes = ["image_processor", "tokenizer"]
image_processor_class = "AutoImageProcessor"
tokenizer_class = ("BartTokenizer", "BartTokenizerFast")
def __init__(self, image_processor=None, tokenizer=None):
"""
Initialize XVLAProcessor.
Parameters
----------
image_processor : PreTrainedImageProcessor, optional
The image processor used to normalize/resize images.
tokenizer : PreTrainedTokenizer, optional
The tokenizer used for text tokenization.
"""
# ProcessorMixin automatically saves these under self.image_processor / self.tokenizer
super().__init__(image_processor, tokenizer)
# ================== LANGUAGE ENCODING ==================
def encode_language(self, language_instruction: str | list[str]) -> dict[str, torch.Tensor]:
"""
Tokenize one or more language instructions.
Parameters
----------
language_instruction : str or List[str]
A single instruction or a batch of instructions.
Returns
-------
Dict[str, torch.Tensor]
{
"input_ids": tensor of shape [B, L]
}
"""
if isinstance(language_instruction, str):
language_instruction = [language_instruction]
inputs = self.tokenizer(
language_instruction,
return_tensors="pt",
padding="max_length",
max_length=self.language_max_length,
truncation=True,
)
return {"input_ids": inputs["input_ids"]}
# ================== IMAGE ENCODING ==================
def encode_image(self, images: list | list[list], **kwargs) -> dict[str, torch.Tensor]:
"""
Preprocess one or more sets of multi-view images.
Parameters
----------
images : List or List[List]
Single sample: [img1, img2, ...]
Batch: [[img1a, img1b], [img2a, img2b, img2c], ...]
Each image may be a PIL.Image, NumPy array, or torch.Tensor.
kwargs : dict
Extra arguments passed to the underlying image processor
(e.g., `do_resize=False`, `size=(224,224)`).
Returns
-------
Dict[str, torch.Tensor]
{
"image_input": tensor [B, num_views, C, H, W],
"image_mask": tensor [B, num_views]
}
"""
# Normalize to batch form
if not isinstance(images[0], (list, tuple)):
images = [images] # convert single sample to batch of size 1
batch_imgs, batch_masks = [], []
for sample_imgs in images:
processed = self.image_processor(sample_imgs, return_tensors="pt", **kwargs)["pixel_values"]
V_exist = processed.size(0)
# Pad to self.num_views
if V_exist < self.num_views:
processed = torch.cat(
[processed, processed.new_zeros(self.num_views - V_exist, *processed.shape[1:])],
dim=0,
)
# Mask: True for valid slots, False for padding
image_mask = torch.zeros(self.num_views, dtype=torch.bool, device=processed.device)
image_mask[:V_exist] = True
batch_imgs.append(processed)
batch_masks.append(image_mask)
image_input = torch.stack(batch_imgs, dim=0) # [B, num_views, C, H, W]
image_mask = torch.stack(batch_masks, dim=0) # [B, num_views]
return {"image_input": image_input, "image_mask": image_mask}
# ================== COMBINED CALL ==================
def __call__(
self,
images: list | list[list] | None = None,
language_instruction: str | list[str] | None = None,
**kwargs,
) -> dict[str, torch.Tensor]:
"""
Combine image and text encoding into a unified multimodal input.
Parameters
----------
images : List or List[List], optional
Single-sample or batched multi-view images.
language_instruction : str or List[str], optional
Corresponding text instructions.
kwargs : dict
Extra args passed to image processor.
Returns
-------
Dict[str, torch.Tensor]
{
"input_ids": [B, L], optional,
"image_input": [B, num_views, C, H, W], optional,
"image_mask": [B, num_views], optional
}
"""
outputs: dict[str, Any] = {}
# Encode language if provided
if language_instruction is not None:
outputs.update(self.encode_language(language_instruction))
# Encode image if provided
if images is not None:
outputs.update(self.encode_image(images, **kwargs))
# Sanity check for batch alignment
if "input_ids" in outputs and "image_input" in outputs:
assert outputs["input_ids"].size(0) == outputs["image_input"].size(0), (
f"Batch mismatch: text batch {outputs['input_ids'].size(0)} "
f"!= image batch {outputs['image_input'].size(0)}"
)
return outputs
def make_xvla_pre_post_processors(
config: XVLAConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
@@ -263,3 +85,171 @@ def make_xvla_pre_post_processors(
to_output=transition_to_policy_action,
),
)
# Custom XVLA processor steps
@dataclass
@ProcessorStepRegistry.register(name="xvla_image_scale")
class XVLAImageScaleProcessorStep(ProcessorStep):
"""Scale image observations by 255 to convert from [0, 1] to [0, 255] range.
This processor step multiplies all image observations by 255, which is required
for XVLA models that expect images in uint8-like range.
Args:
image_keys: List of observation keys that contain images to scale.
If None, will automatically detect keys starting with "observation.images."
"""
image_keys: list[str] | None = None
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Scale image observations by 255."""
new_transition = transition.copy()
obs = new_transition.get(TransitionKey.OBSERVATION, {})
if obs is None:
return new_transition
# Make a copy of observations to avoid modifying the original
obs = obs.copy()
# Determine which keys to scale
keys_to_scale = self.image_keys
if keys_to_scale is None:
# Auto-detect image keys
keys_to_scale = [k for k in obs.keys() if k.startswith("observation.images.")]
# Scale each image
for key in keys_to_scale:
if key in obs and isinstance(obs[key], torch.Tensor):
obs[key] = obs[key] * 255
new_transition[TransitionKey.OBSERVATION] = obs
return new_transition
def transform_features(self, features):
"""Image scaling doesn't change feature structure."""
return features
def get_config(self) -> dict[str, Any]:
"""Return serializable configuration."""
return {
"image_keys": self.image_keys,
}
@dataclass
@ProcessorStepRegistry.register(name="xvla_add_domain_id")
class XVLAAddDomainIdProcessorStep(ProcessorStep):
"""Add domain_id to complementary data.
This processor step adds a domain_id tensor to the complementary data,
which is used by XVLA to identify different robot embodiments or task domains.
Args:
domain_id: The domain ID to add (default: 3)
device: Device to place the domain_id tensor on (default: "cuda")
"""
domain_id: int = 3
device: str = "cuda"
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Add domain_id to complementary data."""
new_transition = transition.copy()
comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
if comp is None:
comp = {}
else:
comp = comp.copy()
# Infer batch size from observation tensors
obs = new_transition.get(TransitionKey.OBSERVATION, {})
batch_size = 1
if obs:
for v in obs.values():
if isinstance(v, torch.Tensor):
batch_size = v.shape[0]
break
# Add domain_id tensor
comp["domain_id"] = torch.tensor([int(self.domain_id)] * batch_size, dtype=torch.long).to(self.device)
new_transition[TransitionKey.COMPLEMENTARY_DATA] = comp
return new_transition
def transform_features(self, features):
"""Domain ID addition doesn't change feature structure."""
return features
def get_config(self) -> dict[str, Any]:
"""Return serializable configuration."""
return {
"domain_id": self.domain_id,
"device": self.device,
}
@dataclass
@ProcessorStepRegistry.register(name="xvla_rotation_6d_to_axis_angle")
class XVLARotation6DToAxisAngleProcessorStep(ProcessorStep):
"""Convert 6D rotation representation to axis-angle and reorganize action dimensions.
This processor step takes actions with 6D rotation representation and converts them to
axis-angle representation, reorganizing the action dimensions as:
- action[:, :3] -> target_eef (end-effector position)
- action[:, 3:9] -> 6D rotation (converted to axis-angle, 3D)
- action[:, 9:10] -> gripper action
Final output: [target_eef (3), axis_angle (3), gripper (1)] = 7D action
Args:
expected_action_dim: Expected input action dimension (default: 10, supports 6D rotation + extras)
"""
expected_action_dim: int = 10
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Convert 6D rotation to axis-angle in action."""
new_transition = transition.copy()
action = new_transition.get(TransitionKey.ACTION)
if action is None or not isinstance(action, torch.Tensor):
return new_transition
# Convert to numpy for processing
device = action.device
dtype = action.dtype
action_np = action.cpu().numpy()
# Extract components
# action shape: (B, D) where D >= 10
target_eef = action_np[:, :3] # (B, 3)
rotation_6d = action_np[:, 3:9] # (B, 6)
target_act = action_np[:, 9:10] # (B, 1)
# Convert 6D rotation to axis-angle
target_axis = Rotate6D_to_AxisAngle(rotation_6d) # (B, 3)
# Concatenate: [eef (3), axis_angle (3), gripper (1)] = 7D
action_np = np.concatenate([target_eef, target_axis, target_act], axis=-1)
# Convert back to tensor
action = torch.from_numpy(action_np).to(device=device, dtype=dtype)
new_transition[TransitionKey.ACTION] = action
return new_transition
def transform_features(self, features):
"""Rotation conversion changes action dimension from 10 to 7."""
# Note: This is a simplified version. In practice, you might want to
# update the action feature shape in the features dict.
return features
def get_config(self) -> dict[str, Any]:
"""Return serializable configuration."""
return {
"expected_action_dim": self.expected_action_dim,
}
-1
View File
@@ -55,7 +55,6 @@ from .core import EnvAction, EnvTransition, PolicyAction, RobotAction, Transitio
TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")
class ProcessorStepRegistry:
"""A registry for ProcessorStep classes to allow instantiation from a string name.
+11 -15
View File
@@ -88,7 +88,6 @@ from lerobot.utils.utils import (
init_logging,
inside_slurm,
)
from lerobot.policies.xvla.utils import Rotate6D_to_AxisAngle
def rollout(
env: gym.vector.VectorEnv,
@@ -163,24 +162,21 @@ def rollout(
all_observations.append(deepcopy(observation))
# Infer "task" from attributes of environments.
observation[f"observation.images.image"] = observation[f"observation.images.image"] * 255
observation[f"observation.images.image2"] = observation[f"observation.images.image2"] * 255
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
observation = add_envs_task(env, observation)
# inputs = processor([observation[f"observation.images.image"], observation[f"observation.images.image2"]], observation["task"])
# Preprocess observation (includes image scaling and domain_id addition)
observation = preprocessor(observation)
observation["domain_id"] = torch.tensor([int(3)], dtype=torch.long).to("cuda")
breakpoint()
# Policy inference
with torch.inference_mode():
action = policy.select_action(observation).to("cpu").numpy()
# action = postprocessor(action)
target_eef = action[:, :3]
target_axis = Rotate6D_to_AxisAngle(action[:, 3:9])
target_act = action[:, 9:10]
action_numpy = np.concatenate([target_eef, target_axis, target_act], axis=-1)
# Convert to CPU / numpy.
# action_numpy: np.ndarray = action.to("cpu").numpy()
action = policy.select_action(observation)
# Postprocess action (includes rotation conversion and device transfer to CPU)
action = postprocessor(action)
# Convert to numpy
action_numpy: np.ndarray = action.numpy()
assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)"
# Apply the next action.