# Environment Processors

Environment processors are a critical layer in LeRobot's data-processing architecture that handles **environment-specific** transformations, separate from policy-specific processing. This separation of concerns enables cleaner code, better modularity, and easier experimentation with different environments and policies.

## Why Environment Processors?

When working with different robot environments (LIBERO, MetaWorld, Aloha, etc.), each environment often has unique data formats, coordinate systems, and conventions that need standardization **before** policy processing. Without environment processors, these transformations would be:

1. **Hardcoded in environment code** - making it difficult to experiment with different state representations
2. **Duplicated across policies** - each policy would need to handle environment-specific quirks
3. **Mixed with policy logic** - violating separation of concerns and making debugging harder

Environment processors solve this by providing a **dedicated processing layer** between raw environment observations and policy inputs.

## The Processing Pipeline

Here's how data flows through the complete processing pipeline during evaluation:

```python
# In the lerobot_eval.py rollout() function:

# 1. Raw environment observation (numpy arrays, various formats)
raw_observation, reward, terminated, truncated, info = env.step(action)

# 2. Convert numpy to torch, normalize images to [0, 1]
observation = preprocess_observation(raw_observation)

# 3. Add task metadata (for multi-task environments)
observation = add_envs_task(env, observation)

# 4. ENVIRONMENT-SPECIFIC preprocessing (NEW!)
#    - Flatten robot states
#    - Rotate images to match dataset conventions
#    - Handle environment-specific coordinate systems
observation = env_preprocessor(observation)

# 5. POLICY-SPECIFIC preprocessing
#    - Normalize with dataset statistics
#    - Add batch dimensions
#    - Move to GPU
#    - Tokenize language instructions
observation = preprocessor(observation)

# 6. Policy inference
action = policy.select_action(observation)

# 7. POLICY-SPECIFIC postprocessing
#    - Unnormalize actions
#    - Remove batch dimensions
action = postprocessor(action)

# 8. ENVIRONMENT-SPECIFIC postprocessing (NEW!)
#    - Convert action formats if needed
#    - Apply environment-specific constraints
action_transition = {"action": action}
action_transition = env_postprocessor(action_transition)
action = action_transition["action"]

# 9. Execute in environment
env.step(action)
```

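Conceptually, each processor object above (`env_preprocessor`, `preprocessor`, and so on) is a pipeline that chains steps over a dict payload. A minimal sketch of that idea (illustrative only; LeRobot's `PolicyProcessorPipeline` adds registration, serialization, and typing on top):

```python
# A pipeline is just ordered function composition over a dict payload:
# each step receives the previous step's output.
class SimplePipeline:
    def __init__(self, steps):
        self.steps = list(steps)

    def __call__(self, data):
        for step in self.steps:
            data = step(data)
        return data


# An empty pipeline is the identity, matching the "no-op" processors below.
identity = SimplePipeline(steps=[])
```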
## The Benefits
### 1. **Separation of Concerns**

Environment processors handle transformations specific to the **environment's data format**, while policy processors handle transformations specific to the **model's requirements**.

```python
# ❌ Before: mixed concerns
class LiberoVLAPolicy:
    def preprocess(self, obs):
        # Environment-specific: flatten robot state (shouldn't be in the policy!)
        state = self._flatten_robot_state(obs["robot_state"])
        # Policy-specific: normalize with dataset stats
        state = self.normalizer(state)
        return state


# ✅ After: clear separation

# Environment processor: handles LIBERO's nested robot state
env_preprocessor = LiberoProcessorStep()  # Flattens robot_state

# Policy processor: handles model requirements
policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats)
```

### 2. **Flexibility and Reusability**

The same policy can work with different environment processors, and the same environment processor can work with different policies:

```python
# Use a SmolVLA policy with the LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg)

# Or use an ACT policy with the same LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)
```

### 3. **Easier Experimentation**

Want to try different state representations for LIBERO? Just create a new processor:

```python
# Original: 8D state (pos + quat→axis-angle + gripper)
@ProcessorStepRegistry.register("libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        robot_state = obs["observation.robot_state"]
        eef_pos = robot_state["eef"]["pos"]                         # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        gripper = robot_state["gripper"]["qpos"]                    # 2D
        state = torch.cat([eef_pos, eef_axisangle, gripper], dim=-1)  # 8D
        return state


# Experiment: add velocities for better control
@ProcessorStepRegistry.register("libero_velocity_processor")
class LiberoVelocityProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        # Include velocities for a 14D state
        robot_state = obs["observation.robot_state"]
        eef_pos = robot_state["eef"]["pos"]                         # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        eef_vel = robot_state["eef"]["vel"]                         # 3D (NEW)
        gripper_pos = robot_state["gripper"]["qpos"]                # 2D
        gripper_vel = robot_state["gripper"]["qvel"]                # 3D (NEW)
        state = torch.cat(
            [eef_pos, eef_axisangle, eef_vel, gripper_pos, gripper_vel], dim=-1
        )  # 14D
        return state
```

### 4. **Cleaner Environment Code**

Environments expose **all available data** without needing to know what downstream models will use:

```python
# The LIBERO environment exposes the full robot state
observation = {
    "pixels": {"image": img, "image2": img2},
    "robot_state": {
        "eef": {"pos": ..., "quat": ..., "vel": ..., "mat": ..., "axisangle": ...},
        "gripper": {"qpos": ..., "qvel": ...},
        "joints": {"pos": ..., "vel": ...},
    },
}

# The environment processor decides what to use;
# the policy processor handles model-specific transformations.
```

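Since environments expose nested dictionaries like the one above, a processor typically needs to flatten them into LeRobot's dotted-key convention. A minimal sketch of such a helper (`flatten_obs` is a hypothetical name, not part of the LeRobot API):

```python
def flatten_obs(tree, prefix="observation"):
    """Recursively flatten a nested observation dict into dotted keys.

    {"robot_state": {"eef": {"pos": p}}} becomes
    {"observation.robot_state.eef.pos": p}.
    """
    flat = {}
    for key, value in tree.items():
        name = f"{prefix}.{key}"
        if isinstance(value, dict):
            # Recurse into nested dicts, extending the dotted prefix
            flat.update(flatten_obs(value, prefix=name))
        else:
            flat[name] = value
    return flat
```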
## Using Environment Processors
### Factory Function

The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:

```python
from lerobot.envs.factory import make_env_pre_post_processors
from lerobot.envs.configs import LiberoEnv, PushtEnv

# For LIBERO: returns a LiberoProcessorStep in the preprocessor
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)

# For other environments: returns identity processors (no-op)
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
```

### Implementation in `envs/factory.py`
```python
def make_env_pre_post_processors(
    env_cfg: EnvConfig,
) -> tuple[
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
    """
    Create preprocessor and postprocessor pipelines for environment observations.

    Args:
        env_cfg: The configuration of the environment.

    Returns:
        A tuple containing:
            - preprocessor: pipeline that processes environment observations
            - postprocessor: pipeline that processes environment outputs
    """
    # For LIBERO environments, add the LiberoProcessorStep to the preprocessor
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    else:
        # For all other environments, return an identity preprocessor
        preprocessor = PolicyProcessorPipeline(steps=[])

    # The postprocessor is currently the identity for all environments.
    # Future: could add environment-specific action transformations.
    postprocessor = PolicyProcessorPipeline(steps=[])

    return preprocessor, postprocessor
```

### Integration in Evaluation

In `lerobot_eval.py`, the environment processors are created once and used throughout:

```python
def eval_main(cfg: EvalPipelineConfig):
    # Create the environment
    envs = make_env(cfg.env, n_envs=cfg.eval.batch_size)

    # Create the policy
    policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env)

    # Create the policy processors
    preprocessor, postprocessor = make_pre_post_processors(
        policy_cfg=cfg.policy,
        pretrained_path=cfg.policy.pretrained_path,
    )

    # Create the environment processors (NEW!)
    env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env)

    # Run evaluation with both processor types
    eval_policy_all(
        envs=envs,
        policy=policy,
        env_preprocessor=env_preprocessor,    # Environment-specific
        env_postprocessor=env_postprocessor,  # Environment-specific
        preprocessor=preprocessor,            # Policy-specific
        postprocessor=postprocessor,          # Policy-specific
        n_episodes=cfg.eval.n_episodes,
    )
```

## Example: LIBERO Environment Processor

The `LiberoProcessorStep` demonstrates a real-world environment processor:

```python
from dataclasses import dataclass

import torch

from lerobot.processor.pipeline import ObservationProcessorStep, ProcessorStepRegistry


@dataclass
@ProcessorStepRegistry.register(name="libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    """
    Processes LIBERO observations into the LeRobot format.

    **State processing:**
    - Extracts the end-effector position (3D)
    - Converts the quaternion to an axis-angle representation (3D)
    - Extracts the gripper joint positions (2D)
    - Concatenates them into an 8D state vector

    **Image processing:**
    - Rotates images 180° to match the HuggingFaceVLA/libero convention
    """

    def _process_observation(self, observation):
        processed_obs = observation.copy()

        # Process images: flip 180° for the camera convention
        for key in list(processed_obs.keys()):
            if key.startswith("observation.images."):
                img = processed_obs[key]
                img = torch.flip(img, dims=[2, 3])  # Flip H and W of (B, C, H, W)
                processed_obs[key] = img

        # Process robot_state: flatten to an 8D vector
        if "observation.robot_state" in processed_obs:
            robot_state = processed_obs.pop("observation.robot_state")

            eef_pos = robot_state["eef"]["pos"]            # (B, 3)
            eef_quat = robot_state["eef"]["quat"]          # (B, 4)
            gripper_qpos = robot_state["gripper"]["qpos"]  # (B, 2)

            # Convert quaternion to axis-angle
            eef_axisangle = self._quat2axisangle(eef_quat)  # (B, 3)

            # Concatenate into a single state vector
            state = torch.cat((eef_pos, eef_axisangle, gripper_qpos), dim=-1)
            state = state.float()

            processed_obs["observation.state"] = state

        return processed_obs
```

### Why These Transformations?

1. **Image rotation**: the HuggingFaceVLA/libero dataset has images rotated 180° relative to the raw LIBERO simulator. The processor handles this convention mismatch so that policies trained on the dataset work seamlessly.

2. **State flattening**: the raw LIBERO environment exposes nested dictionaries with all available state information (position, quaternion, velocity, matrix representation, etc.). The processor:
   - Selects the relevant components (pos, quat, gripper)
   - Converts the quaternion to axis-angle (more suitable for learning)
   - Flattens everything into the single 8D vector that policies expect

3. **Flexibility**: the environment still exposes **all** raw data. If you want to try different state representations (e.g., including velocities, or using the matrix representation instead of axis-angle), you can create a new processor without modifying the environment code.

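As a concrete illustration of the quaternion-to-axis-angle step, here is a minimal pure-Python sketch for a single quaternion (assuming the robosuite-style `(x, y, z, w)` layout; the actual implementation operates on batched torch tensors):

```python
import math

def quat_to_axisangle(quat):
    """Convert a unit quaternion (x, y, z, w) to an axis-angle vector.

    The result is a 3-vector whose direction is the rotation axis and
    whose norm is the rotation angle in radians.
    """
    x, y, z, w = quat
    # Clamp w to guard against numerical drift outside [-1, 1]
    w = max(-1.0, min(1.0, w))
    angle = 2.0 * math.acos(w)
    s = math.sqrt(max(0.0, 1.0 - w * w))
    if s < 1e-8:
        # Near-zero rotation: the axis is arbitrary, return the zero vector
        return (0.0, 0.0, 0.0)
    return (x / s * angle, y / s * angle, z / s * angle)
```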
## Adding Environment Processors for New Environments

To add environment processors for a new environment:

### 1. Create the Processor Step
```python
# In src/lerobot/processor/env_processor.py

@dataclass
@ProcessorStepRegistry.register(name="myenv_processor")
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyEnv."""

    def _process_observation(self, observation):
        processed = observation.copy()

        # Your environment-specific transformations
        if "myenv.specific.state" in processed:
            state = processed.pop("myenv.specific.state")
            # Transform to the standard format
            processed["observation.state"] = self._transform_state(state)

        return processed
```

### 2. Update the Factory
```python
# In src/lerobot/envs/factory.py

def make_env_pre_post_processors(env_cfg: EnvConfig):
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()])
    else:
        preprocessor = PolicyProcessorPipeline(steps=[])

    postprocessor = PolicyProcessorPipeline(steps=[])
    return preprocessor, postprocessor
```

### 3. Use in Evaluation

No changes needed! The evaluation script automatically uses the appropriate processor:

```bash
# --env.type=myenv automatically selects MyEnvProcessorStep
lerobot-eval \
    --policy.path=lerobot/my_policy \
    --env.type=myenv \
    --eval.n_episodes=10
```

## Future: Environment Postprocessors

Currently, postprocessors are identity (no-op) for all environments. Future use cases include:

### Action Space Transformations
```python
@dataclass
class MyEnvActionPostprocessor(ProcessorStep):
    """Convert policy actions to an environment-specific format."""

    action_space: str = "joint"
    min_action: float = -1.0
    max_action: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: convert from Cartesian to joint space
        if self.action_space == "joint":
            action = self.ik_solver(action)

        # Example: apply environment-specific safety limits
        action = torch.clamp(action, self.min_action, self.max_action)

        transition["action"] = action
        return transition
```

### Coordinate System Conversions
```python
@dataclass
class CoordinateTransformPostprocessor(ProcessorStep):
    """Transform actions between coordinate systems."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: the policy outputs in the world frame,
        # but the env expects the base frame
        action = self.world_to_base_transform(action)

        transition["action"] = action
        return transition
```

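For the planar case, a `world_to_base_transform` like the one above could look like the following sketch (a hypothetical helper: it applies the inverse of the base's heading rotation to a 2D action):

```python
import math

def world_to_base(action_xy, base_yaw):
    """Rotate a planar (x, y) action from the world frame into a robot
    base frame whose heading is `base_yaw` radians in the world frame."""
    c, s = math.cos(base_yaw), math.sin(base_yaw)
    x, y = action_xy
    # Inverse rotation: world frame -> base frame
    return (c * x + s * y, -s * x + c * y)
```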
## Best Practices

1. **Keep environment processors simple**: they should only handle environment-specific data-format issues, not complex learning-related transformations.

2. **Use policy processors for model requirements**: normalization, batching, device placement, and tokenization belong in policy processors.

3. **Expose all data from environments**: let processors decide what to use rather than hardcoding choices in the environment.

4. **Document conventions**: clearly document any coordinate-system conventions, camera orientations, or data formats that your processor handles.

5. **Test independently**: environment processors should be testable without loading full policies or environments.

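Point 5 is easy in practice because a processor step is just a callable from dict to dict: it can be exercised with hand-built observations and no simulator. A sketch using a toy stand-in step (`FlattenStateStep` is hypothetical, standing in for a real `ObservationProcessorStep` subclass):

```python
# A toy step mirroring the processor-step contract: dict in, dict out,
# so it can be tested with plain Python data.
class FlattenStateStep:
    def __call__(self, observation):
        processed = dict(observation)
        if "robot_state" in processed:
            rs = processed.pop("robot_state")
            # Concatenate the selected components into a flat state
            processed["observation.state"] = rs["eef"]["pos"] + rs["gripper"]["qpos"]
        return processed


def test_flatten_state_step():
    step = FlattenStateStep()
    obs = {"robot_state": {"eef": {"pos": [0.1, 0.2, 0.3]},
                           "gripper": {"qpos": [0.0, 0.04]}}}
    out = step(obs)
    assert out["observation.state"] == [0.1, 0.2, 0.3, 0.0, 0.04]
    assert "robot_state" not in out
```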
## Summary

Environment processors provide a **clean separation** between environment-specific data transformations and policy-specific model requirements. This architecture:

- ✅ Enables easy experimentation with different state representations
- ✅ Allows policies to work seamlessly across different environments
- ✅ Keeps environment code focused on the simulation/hardware interface
- ✅ Makes processor pipelines more maintainable and debuggable
- ✅ Follows the single-responsibility principle

The key insight: **environments define data formats, processors standardize them, policies consume standardized data.** Each layer has a clear, focused responsibility.