chore (output format): improves output format

This commit is contained in:
Adil Zouitine
2025-07-06 22:03:37 +02:00
parent 730c7b2f35
commit 83a4338f8b
4 changed files with 487 additions and 63 deletions
+440 -21
RobotProcessor solves these issues by providing a declarative pipeline approach where each transformation is a separate, testable, shareable component.
## Understanding EnvTransition and Batch Format
Before diving into RobotProcessor, let's understand the data structures it operates on. RobotProcessor works with two data formats:
### 1. EnvTransition Tuple Format
An `EnvTransition` is a 7-tuple that represents a complete transition in the environment:
```python
from lerobot.processor.pipeline import TransitionIndex
# EnvTransition structure:
# (observation, action, reward, done, truncated, info, complementary_data)
transition = (
    {"observation.image": ..., "observation.state": ...},  # observation at time t
    [0.1, -0.2, 0.3],   # action taken at time t
    1.0,                # reward received
    False,              # episode done flag
    False,              # episode truncated flag
    {"success": True},  # additional info from environment
    {"step_idx": 42},   # complementary_data for inter-step communication
)
```
Each element serves a specific purpose:
1. **observation**: Raw sensor data from the environment (images, states, etc.)
2. **action**: The action command sent to the robot
3. **reward**: Scalar reward signal (for RL tasks)
4. **done**: Boolean indicating natural episode termination
5. **truncated**: Boolean indicating artificial termination (time limit, safety)
6. **info**: Dictionary with environment-specific information
7. **complementary_data**: Dictionary for passing data between processor steps (NEW!)
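The `complementary_data` slot is what lets one step leave information for a later step. The following is a standalone sketch in plain Python (the step classes are hypothetical illustrations, not lerobot's own) showing the idea:

```python
# Standalone sketch of inter-step communication via complementary_data.
# CropTracker and CropLogger are hypothetical steps, not part of lerobot.

class CropTracker:
    """Records which crop was applied so later steps can see it."""

    def __call__(self, transition):
        obs, action, reward, done, truncated, info, comp = transition
        comp = {**comp, "crop_box": (0, 0, 96, 96)}  # leave a note for later steps
        return (obs, action, reward, done, truncated, info, comp)


class CropLogger:
    """Reads the note left by CropTracker."""

    def __call__(self, transition):
        comp = transition[6]
        assert "crop_box" in comp, "CropTracker must run before CropLogger"
        return transition


transition = ({"observation.state": [0.0, 1.0]}, None, 0.0, False, False, {}, {})
for step in [CropTracker(), CropLogger()]:
    transition = step(transition)

print(transition[6]["crop_box"])  # (0, 0, 96, 96)
```

Because each step receives the full 7-tuple, later steps can branch on what earlier steps recorded without polluting the observation itself.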
### 2. Batch Dictionary Format
This is the format used by LeRobot datasets and replay buffers:
```python
# Batch dictionary format (used by LeRobotDataset, ReplayBuffer)
batch = {
    "observation.image": torch.tensor(...),  # Image observations
    "observation.state": torch.tensor(...),  # State observations
    "action": torch.tensor(...),             # Actions
    "next.reward": torch.tensor(...),        # Rewards
    "next.done": torch.tensor(...),          # Done flags
    "next.truncated": torch.tensor(...),     # Truncated flags
    "info": {...},                           # Info dictionary
    # Additional keys are preserved but ignored during conversion
}
```
### Automatic Format Conversion
RobotProcessor automatically handles both formats:
```python
from lerobot.processor.pipeline import RobotProcessor
from lerobot.processor.observation_processor import ImageProcessor
processor = RobotProcessor([ImageProcessor()])
# Works with EnvTransition tuples
transition = ({"pixels": image_array}, None, 0.0, False, False, {}, {})
processed_transition = processor(transition) # Returns EnvTransition tuple
# Also works with batch dictionaries
batch = {
    "observation.pixels": image_tensor,
    "action": action_tensor,
    "next.reward": reward_tensor,
    "next.done": done_tensor,
    "next.truncated": truncated_tensor,
    "info": info_dict,
}
processed_batch = processor(batch) # Returns batch dictionary
```
### Using TransitionIndex
```python
obs = transition[TransitionIndex.OBSERVATION]
info = transition[TransitionIndex.INFO]
comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA]
```
### Default Conversion Functions
RobotProcessor uses these default conversion functions:
```python
def _default_batch_to_transition(batch):
    """Default conversion from batch dict to EnvTransition tuple."""
    # Extract observation keys (anything starting with "observation.")
    observation_keys = {k: v for k, v in batch.items() if k.startswith("observation.")}
    observation = None
    if observation_keys:
        observation = {}
        # Keep observation.* keys as-is (don't remove the "observation." prefix)
        for key, value in observation_keys.items():
            observation[key] = value
    return (
        observation,
        batch.get("action"),
        batch.get("next.reward", 0.0),       # Note: "next.reward", not "reward"
        batch.get("next.done", False),       # Note: "next.done", not "done"
        batch.get("next.truncated", False),  # Note: "next.truncated", not "truncated"
        batch.get("info", {}),
        {},  # Empty complementary_data
    )


def _default_transition_to_batch(transition):
    """Default conversion from EnvTransition tuple to batch dict."""
    obs, action, reward, done, truncated, info, _ = transition
    batch = {
        "action": action,
        "next.reward": reward,        # Note: "next.reward", not "reward"
        "next.done": done,            # Note: "next.done", not "done"
        "next.truncated": truncated,  # Note: "next.truncated", not "truncated"
        "info": info,
    }
    # Flatten observation dict (keep observation.* keys as-is)
    if isinstance(obs, dict):
        for key, value in obs.items():
            batch[key] = value
    return batch
```
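To see the key mapping concretely, here is a standalone round trip using the same logic as the defaults above, re-declared locally so it runs without lerobot (plain Python values stand in for tensors):

```python
def batch_to_transition(batch):
    # Same key mapping as _default_batch_to_transition above
    observation = {k: v for k, v in batch.items() if k.startswith("observation.")} or None
    return (
        observation,
        batch.get("action"),
        batch.get("next.reward", 0.0),
        batch.get("next.done", False),
        batch.get("next.truncated", False),
        batch.get("info", {}),
        {},
    )


def transition_to_batch(transition):
    # Same key mapping as _default_transition_to_batch above
    obs, action, reward, done, truncated, info, _ = transition
    batch = {
        "action": action,
        "next.reward": reward,
        "next.done": done,
        "next.truncated": truncated,
        "info": info,
    }
    if isinstance(obs, dict):
        batch.update(obs)
    return batch


batch = {"observation.state": [0.1, 0.2], "action": [0.3], "next.reward": 1.0}
transition = batch_to_transition(batch)
print(transition[0])  # {'observation.state': [0.1, 0.2]}
print(transition[3])  # False (missing "next.done" falls back to the default)
round_tripped = transition_to_batch(transition)
print(round_tripped["next.reward"])  # 1.0
```

Note that keys absent from the batch come back as defaults (`0.0`, `False`, `{}`), so a round trip can add keys that the original batch did not contain.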
### Custom Conversion Functions
You can customize how RobotProcessor converts between formats:
```python
def custom_batch_to_transition(batch):
    """Custom conversion from batch dict to EnvTransition tuple."""
    # Extract observation keys (anything starting with "observation.")
    observation = {k: v for k, v in batch.items() if k.startswith("observation.")}
    return (
        observation,
        batch.get("action"),
        batch.get("reward", 0.0),  # Use "reward" instead of "next.reward"
        batch.get("done", False),  # Use "done" instead of "next.done"
        batch.get("truncated", False),
        batch.get("info", {}),
        batch.get("complementary_data", {}),
    )


def custom_transition_to_batch(transition):
    """Custom conversion from EnvTransition tuple to batch dict."""
    obs, action, reward, done, truncated, info, comp_data = transition
    batch = {
        "action": action,
        "reward": reward,  # Use "reward" instead of "next.reward"
        "done": done,      # Use "done" instead of "next.done"
        "truncated": truncated,
        "info": info,
    }
    # Flatten observation dict
    if obs:
        batch.update(obs)
    return batch


# Use custom converters
processor = RobotProcessor(
    steps=[ImageProcessor()],
    to_transition=custom_batch_to_transition,
    to_output=custom_transition_to_batch,
)
```
### Advanced: Controlling Output Format with `to_output`
The `to_output` function determines what format is returned when you call the processor with a batch dictionary. Sometimes you want to output `EnvTransition` tuples even when you input batch dictionaries:
```python
# Identity function to always return EnvTransition tuples
def keep_as_transition(transition):
    """Always return EnvTransition tuple regardless of input format."""
    return transition


# Processor that always outputs EnvTransition tuples
processor = RobotProcessor(
    steps=[ImageProcessor(), StateProcessor()],
    to_output=keep_as_transition,  # Always return tuple format
)

# Even when called with a batch dict, returns an EnvTransition tuple
batch = {
    "observation.image": image_tensor,
    "action": action_tensor,
    "next.reward": reward_tensor,
    "next.done": done_tensor,
    "next.truncated": truncated_tensor,
    "info": info_dict,
}
result = processor(batch)  # Returns EnvTransition tuple, not batch dict!
print(type(result))  # <class 'tuple'>
```
### Real-World Example: Environment Interaction
This is particularly useful for environment interaction where you want consistent tuple output:
```python
from lerobot.processor.observation_processor import VanillaObservationProcessor

# Create a processor that always outputs EnvTransition for environment interaction.
# This avoids format conversion overhead during real-time control.
env_processor = RobotProcessor(
    [VanillaObservationProcessor()],
    to_transition=lambda x: x,  # Pass through - no conversion needed
    to_output=lambda x: x,      # Always return EnvTransition tuple
)

# Environment interaction loop
env = make_env()
obs, info = env.reset()

for step in range(1000):
    # Create transition - input is already in tuple format
    transition = (obs, None, 0.0, False, False, info, {"step": step})

    # Process - output is guaranteed to be an EnvTransition tuple
    processed_transition = env_processor(transition)
    processed_obs = processed_transition[TransitionIndex.OBSERVATION]

    # Use with policy
    action = policy.select_action(processed_obs)
    obs, reward, done, truncated, info = env.step(action)

    if done or truncated:
        break
```
### When to Use Different Output Formats
**Use EnvTransition tuple output when:**
- Environment interaction and real-time control
- You need to access individual transition components frequently
- Performance is critical (avoids dictionary creation overhead)
- Working with gym environments that expect tuple format
**Use batch dictionary output when:**
- Training with LeRobot datasets
- Working with DataLoaders and batched processing
- Interfacing with existing LeRobot training code
- You need the standardized "next.*" key format
```python
# For environment interaction - use tuple output
env_processor = RobotProcessor(
    steps=[ImageProcessor(), StateProcessor()],
    to_output=lambda x: x,  # Return EnvTransition tuple
)

# For training - use batch output (default)
train_processor = RobotProcessor(
    steps=[ImageProcessor(), StateProcessor(), NormalizerProcessor(...)],
    # to_output defaults to _default_transition_to_batch
)

# Training loop
for batch in dataloader:
    processed_batch = train_processor(batch)  # Returns batch dict
    loss = model.compute_loss(processed_batch)

# Environment loop
for step in range(1000):
    transition = (obs, None, 0.0, False, False, info, {})
    processed_transition = env_processor(transition)  # Returns EnvTransition tuple
    obs = processed_transition[TransitionIndex.OBSERVATION]
    action = policy.select_action(obs)
```
### Why "next.reward", "next.done", "next.truncated"?
The default conversion uses "next.*" prefixes because this matches the standard format used by LeRobot datasets and follows the convention that rewards, done flags, and truncated flags are the result of taking an action (i.e., they come from the "next" state):
```python
# Standard RL transition format:
# (s_t, a_t, r_{t+1}, done_{t+1}, truncated_{t+1})
#   ^    ^      ^         ^             ^
#   |    |      |         |             +-- Result of action a_t
#   |    |      |         +-- Result of action a_t
#   |    |      +-- Result of action a_t
#   |    +-- Action taken in state s_t
#   +-- State at time t

# The names below are t/t+1 placeholders, not real variables
batch = {
    "observation.state": s_t,
    "action": a_t,
    "next.reward": r_tp1,            # Reward received after taking action
    "next.done": done_tp1,           # Done flag after taking action
    "next.truncated": truncated_tp1, # Truncated flag after taking action
}
```
## Your First RobotProcessor
Let's create a processor that properly handles image and state preprocessing:
```python
print("Image range:", processed_obs["observation.images.camera_front"].min().item(),
      "to", processed_obs["observation.images.camera_front"].max().item())  # 0.0 to 1.0
```
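Since this excerpt only shows the tail of the example, here is a compact standalone analogue of the same idea, with a plain list standing in for an image tensor and hypothetical step/class names (`ScaleImageStep`, `MiniProcessor` are illustrations, not lerobot classes):

```python
# Minimal stand-in for an image-scaling step: maps uint8 [0, 255] to float [0.0, 1.0].
class ScaleImageStep:
    def __call__(self, transition):
        obs, *rest = transition
        obs = dict(obs)  # don't mutate the caller's observation
        obs["observation.images.camera_front"] = [
            px / 255.0 for px in obs["observation.images.camera_front"]
        ]
        return (obs, *rest)


class MiniProcessor:
    """Tiny pipeline runner mimicking RobotProcessor's step-by-step application."""

    def __init__(self, steps):
        self.steps = steps

    def __call__(self, transition):
        for step in self.steps:
            transition = step(transition)
        return transition


processor = MiniProcessor([ScaleImageStep()])
transition = ({"observation.images.camera_front": [0, 128, 255]},
              None, 0.0, False, False, {}, {})
processed_obs = processor(transition)[0]
print(min(processed_obs["observation.images.camera_front"]),
      max(processed_obs["observation.images.camera_front"]))  # 0.0 1.0
```

The real RobotProcessor does the same sequential application, with the format conversion and hooks described above layered on top.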
## Working with LeRobot Datasets and Replay Buffers
RobotProcessor seamlessly works with LeRobot's batch dictionary format:
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor.pipeline import RobotProcessor
from lerobot.processor.normalize_processor import NormalizerProcessor
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
# Load a dataset
dataset = LeRobotDataset("lerobot/pusht")
# Define features and normalization
features = {
    "observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)),
    "observation.state": PolicyFeature(FeatureType.STATE, (2,)),
    "action": PolicyFeature(FeatureType.ACTION, (2,)),
}
norm_map = {
    FeatureType.VISUAL: NormalizationMode.MEAN_STD,
    FeatureType.STATE: NormalizationMode.MIN_MAX,
    FeatureType.ACTION: NormalizationMode.MEAN_STD,
}

# Create processor with normalization
processor = RobotProcessor([
    NormalizerProcessor.from_lerobot_dataset(dataset, features, norm_map),
])

# Process a sample from the dataset
batch = dataset[0]  # Get the first sample
print("Original batch keys:", list(batch.keys()))
print("Original image shape:", batch["observation.image"].shape)

# Process the batch - automatically converts to/from batch format
processed_batch = processor(batch)
print("Processed batch keys:", list(processed_batch.keys()))
print("Processed image range:", processed_batch["observation.image"].min().item(),
      "to", processed_batch["observation.image"].max().item())

# Use with a DataLoader
from torch.utils.data import DataLoader

dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

for batch in dataloader:
    # Process the entire batch at once
    processed_batch = processor(batch)
    # Use processed batch for training
    # model.train_step(processed_batch)
    break
```
## Integration with Replay Buffers
RobotProcessor works great with replay buffers for online learning:
```python
from lerobot.common.utils.buffer import ReplayBuffer
from lerobot.processor.pipeline import RobotProcessor
from lerobot.processor.device_processor import DeviceProcessor

# Create replay buffer
buffer = ReplayBuffer(capacity=10000)

# Create processor for online data
online_processor = RobotProcessor([
    ImageProcessor(),
    StateProcessor(),
    DeviceProcessor(device="cuda"),
])

# During environment interaction
env = make_env()
obs, info = env.reset()

for step in range(1000):
    # Raw environment observation
    transition = (obs, None, 0.0, False, False, info, {})

    # Process for policy input
    processed_transition = online_processor(transition)
    processed_obs = processed_transition[TransitionIndex.OBSERVATION]

    # Get action from policy
    action = policy.select_action(processed_obs)

    # Execute action
    next_obs, reward, done, truncated, info = env.step(action)

    # Store in replay buffer (can store either format)
    buffer.add(obs, action, reward, next_obs, done, truncated, info)

    obs = next_obs
    if done or truncated:
        obs, info = env.reset()

# Sample and process batches for training
batch = buffer.sample(batch_size=32)
processed_batch = online_processor(batch)  # Processes the entire batch
```
## Creating Custom Steps: The ProcessorStep Protocol
A processor step must follow certain conventions. Let's create a complete example that shows all required and optional methods:
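The full example was elided in this excerpt, but the shape of a step can be sketched as follows. This is a hedged, standalone outline: the method names (`__call__`, `get_config`, `state_dict`, `load_state_dict`, `reset`) follow the conventions referenced elsewhere in this guide, with plain Python values standing in for torch tensors; treat it as a sketch of the protocol, not the exact lerobot interface:

```python
class RunningMeanStep:
    """Hypothetical step: tracks a running mean of the reward.

    Illustrates the conventional step surface: a required __call__ plus
    optional config/state/reset methods.
    """

    def __init__(self, momentum=0.99):
        self.momentum = momentum
        self.mean = 0.0

    def __call__(self, transition):
        # Required: take an EnvTransition, return an EnvTransition.
        obs, action, reward, done, truncated, info, comp = transition
        self.mean = self.momentum * self.mean + (1 - self.momentum) * reward
        return transition

    def get_config(self):
        # Optional: JSON-serializable hyperparameters (saved as config).
        return {"momentum": self.momentum}

    def state_dict(self):
        # Optional: accumulated state (saved as tensors in practice).
        return {"mean": self.mean}

    def load_state_dict(self, state):
        self.mean = state["mean"]

    def reset(self):
        # Optional: clear per-episode state between rollouts.
        self.mean = 0.0


step = RunningMeanStep(momentum=0.5)
for r in [1.0, 1.0]:
    step((None, None, r, False, False, {}, {}))
print(step.state_dict())  # {'mean': 0.75}
```

The config/state split mirrors the guide's distinction between JSON-serializable configuration and tensor state: `get_config` captures what you'd want in a saved config file, while `state_dict`/`load_state_dict` handle values learned from data.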
## Complete Policy Example with Pre and Post Processing
Here's how to use RobotProcessor in a real robot control loop, showing both tuple and batch formats:
```python
from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, TransitionIndex
preprocessor = preprocessor.to("cuda")
postprocessor = postprocessor.to("cuda")
policy = policy.to("cuda")
# Control loop using EnvTransition format
env = make_robot_env()
obs, info = env.reset()
for episode in range(10):
    # Create transition with raw observation
    transition = (obs, None, 0.0, False, False, info, {"step": step})

    # Preprocess - works with tuple format
    processed_transition = preprocessor(transition)
    processed_obs = processed_transition[TransitionIndex.OBSERVATION]
    # Save preprocessor with learned statistics
    preprocessor.save_pretrained(f"./checkpoints/preprocessor_ep{episode}")
# Alternative: Using batch dictionary format
# This is useful when integrating with existing LeRobot training code
def control_loop_with_batch_format():
    """Example using batch dictionary format."""
    obs, info = env.reset()

    for step in range(1000):
        # Create batch dictionary
        batch = {
            "observation.image": torch.from_numpy(obs["pixels"]).unsqueeze(0),
            "observation.state": torch.from_numpy(obs["agent_pos"]).unsqueeze(0),
            "action": torch.zeros(1, 7),  # Placeholder
            "next.reward": torch.tensor([0.0]),
            "next.done": torch.tensor([False]),
            "next.truncated": torch.tensor([False]),
            "info": info,
        }

        # Preprocess - works with batch format
        processed_batch = preprocessor(batch)

        # Get action from policy
        with torch.no_grad():
            action = policy.select_action({
                k: v for k, v in processed_batch.items()
                if k.startswith("observation.")
            })

        # Add action to batch for postprocessing
        processed_batch["action"] = action

        # Postprocess
        final_batch = postprocessor(processed_batch)
        final_action = final_batch["action"]

        # Execute action
        obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy())

        if terminated or truncated:
            break
# Push final version to hub
preprocessor.push_to_hub("my-username/act-preprocessor")
postprocessor.push_to_hub("my-username/act-postprocessor")
```
RobotProcessor provides a powerful, modular approach to data preprocessing in robotics:
- **Dual format support**: Works seamlessly with both EnvTransition tuples and batch dictionaries
- **Automatic format conversion**: Converts between tuple and batch formats as needed
- **LeRobot integration**: Native support for LeRobotDataset and ReplayBuffer formats
- **Clear separation of concerns**: Each transformation is a separate, testable unit
- **Proper state management**: Clear distinction between config (JSON) and state (tensors)
- **Device-aware**: Seamless GPU/CPU transfers with `.to(device)`
- **Easy sharing**: Push to Hugging Face Hub for reproducibility
- **Type safety**: Use `TransitionIndex` instead of magic numbers
- **Debugging tools**: Step through transformations and add monitoring hooks
- **Flexible conversion**: Customize `to_transition` and `to_output` functions for specific needs
Key advantages of the dual format approach:
- **Environment interaction**: Use tuple format for real-time robot control
- **Training/evaluation**: Use batch format for dataset processing and model training
- **Seamless integration**: Same processor works with both formats automatically
- **Backward compatibility**: Existing code using either format continues to work
- **Output format control**: Use `to_output` to control return format regardless of input format
- **Performance optimization**: Avoid unnecessary format conversions during time-critical operations
By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust while being compatible with the entire LeRobot ecosystem.
For the full API reference, see the [RobotProcessor API documentation](/api/processor).