From 61f8c8a0f2589d5fe5226ebfb6ede41255428ef8 Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Tue, 18 Nov 2025 18:37:27 +0100 Subject: [PATCH] add docs --- docs/source/_toctree.yml | 2 + docs/source/env_processor.mdx | 418 +++++++++++++++++++++++++ src/lerobot/processor/env_processor.py | 4 +- 3 files changed, 423 insertions(+), 1 deletion(-) diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 0cf8aa9a6..260d2ef0a 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -59,6 +59,8 @@ title: Implement your own processor - local: processors_robots_teleop title: Processors for Robots and Teleoperators + - local: env_processor + title: Environment Processors title: "Robot Processors" - sections: - local: so101 diff --git a/docs/source/env_processor.mdx b/docs/source/env_processor.mdx index e69de29bb..8dbf315c7 100644 --- a/docs/source/env_processor.mdx +++ b/docs/source/env_processor.mdx @@ -0,0 +1,418 @@ +# Environment Processors + +Environment processors are a critical layer in LeRobot's data processing architecture that handle **environment-specific** transformations, separate from policy-specific processing. This separation of concerns enables cleaner code, better modularity, and easier experimentation with different environments and policies. + +## Why Environment Processors? + +When working with different robot environments (LIBERO, MetaWorld, Aloha, etc.), each environment often has unique data formats, coordinate systems, and conventions that need standardization **before** policy processing. Without environment processors, these transformations would be: + +1. **Hardcoded in environment code** - Making it difficult to experiment with different state representations +2. **Duplicated across policies** - Each policy would need to handle environment-specific quirks +3. 
**Mixed with policy logic** - Violating separation of concerns and making debugging harder + +Environment processors solve this by providing a **dedicated processing layer** between raw environment observations and policy inputs. + +## The Processing Pipeline + +Here's how data flows through the complete processing pipeline during evaluation: + +```python +# In lerobot_eval.py rollout() function: + +# 1. Raw environment observation (numpy arrays, various formats) +raw_observation = env.step(action) + +# 2. Convert numpy to torch, normalize images [0,1] +observation = preprocess_observation(raw_observation) + +# 3. Add task metadata (for multi-task environments) +observation = add_envs_task(env, observation) + +# 4. ENVIRONMENT-SPECIFIC preprocessing (NEW!) +# - Flatten robot states +# - Rotate images to match dataset conventions +# - Handle environment-specific coordinate systems +observation = env_preprocessor(observation) + +# 5. POLICY-SPECIFIC preprocessing +# - Normalize with dataset statistics +# - Add batch dimensions +# - Move to GPU +# - Tokenize language instructions +observation = preprocessor(observation) + +# 6. Policy inference +action = policy.select_action(observation) + +# 7. POLICY-SPECIFIC postprocessing +# - Unnormalize actions +# - Remove batch dimensions +action = postprocessor(action) + +# 8. ENVIRONMENT-SPECIFIC postprocessing (NEW!) +# - Convert action formats if needed +# - Apply environment-specific constraints +action_transition = {"action": action} +action_transition = env_postprocessor(action_transition) +action = action_transition["action"] + +# 9. Execute in environment +env.step(action) +``` + +## The Benefits + +### 1. **Separation of Concerns** + +Environment processors handle transformations specific to the **environment's data format**, while policy processors handle transformations specific to the **model's requirements**. 
+ +```python +# ❌ Before: Mixed concerns +class LiberoVLAPolicy: + def preprocess(self, obs): + # Environment-specific: Flatten robot state (shouldn't be in policy!) + state = self._flatten_robot_state(obs["robot_state"]) + # Policy-specific: Normalize with dataset stats + state = self.normalizer(state) + return state + +# ✅ After: Clear separation +# Environment processor: Handles LIBERO's nested robot state +env_preprocessor = LiberoProcessorStep() # Flattens robot_state + +# Policy processor: Handles model requirements +policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats) +``` + +### 2. **Flexibility and Reusability** + +The same policy can work with different environment processors, and the same environment processor can work with different policies: + +```python +# Use SmolVLA policy with LIBERO environment +libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg) +smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg) + +# Or use ACT policy with the same LIBERO environment +libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg) +act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg) +``` + +### 3. **Easier Experimentation** + +Want to try different state representations for LIBERO? 
Just create a new processor: + +```python +# Original: 8D state (pos + quat→axisangle + gripper) +@ProcessorStepRegistry.register("libero_processor") +class LiberoProcessorStep(ObservationProcessorStep): + def _process_observation(self, obs): + eef_pos = robot_state["eef"]["pos"] # 3D + eef_axisangle = quat2axisangle(quat) # 3D + gripper = robot_state["gripper"]["qpos"] # 2D + state = torch.cat([eef_pos, eef_axisangle, gripper], dim=-1) # 8D + return state + +# Experiment: Add velocity for better control +@ProcessorStepRegistry.register("libero_velocity_processor") +class LiberoVelocityProcessorStep(ObservationProcessorStep): + def _process_observation(self, obs): + # Include velocities for 13D state + eef_pos = robot_state["eef"]["pos"] # 3D + eef_axisangle = quat2axisangle(quat) # 3D + eef_vel = robot_state["eef"]["vel"] # 3D (NEW) + gripper_pos = robot_state["gripper"]["qpos"] # 2D + gripper_vel = robot_state["gripper"]["qvel"] # 2D (NEW) + state = torch.cat([eef_pos, eef_axisangle, eef_vel, + gripper_pos, gripper_vel], dim=-1) # 13D + return state +``` + +### 4.
**Cleaner Environment Code** + +Environments expose **all available data** without needing to know what downstream models will use: + +```python +# LIBERO environment exposes full robot state +observation = { + "pixels": {"image": img, "image2": img2}, + "robot_state": { + "eef": {"pos": ..., "quat": ..., "vel": ..., "mat": ..., "axisangle": ...}, + "gripper": {"qpos": ..., "qvel": ...}, + "joints": {"pos": ..., "vel": ...} + } +} + +# Environment processor decides what to use +# Policy processor handles model-specific transformations +``` + +## Using Environment Processors + +### Factory Function + +The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies: + +```python +from lerobot.envs.factory import make_env_pre_post_processors +from lerobot.envs.configs import LiberoEnv, PushtEnv + +# For LIBERO: Returns LiberoProcessorStep in preprocessor +libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"]) +env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg) + +# For other environments: Returns identity processors (no-op) +pusht_cfg = PushtEnv() +env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg) +``` + +### Implementation in `envs/factory.py` + +```python +def make_env_pre_post_processors( + env_cfg: EnvConfig, +) -> tuple[ + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], +]: + """ + Create preprocessor and postprocessor pipelines for environment observations. + + Args: + env_cfg: The configuration of the environment. 
+ + Returns: + A tuple containing: + - preprocessor: Pipeline that processes environment observations + - postprocessor: Pipeline that processes environment outputs + """ + # For LIBERO environments, add the LiberoProcessorStep to preprocessor + if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()]) + else: + # For all other environments, return an identity preprocessor + preprocessor = PolicyProcessorPipeline(steps=[]) + + # Postprocessor is currently identity for all environments + # Future: Could add environment-specific action transformations + postprocessor = PolicyProcessorPipeline(steps=[]) + + return preprocessor, postprocessor +``` + +### Integration in Evaluation + +In `lerobot_eval.py`, the environment processors are created once and used throughout: + +```python +def eval_main(cfg: EvalPipelineConfig): + # Create environment + envs = make_env(cfg.env, n_envs=cfg.eval.batch_size) + + # Create policy + policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env) + + # Create policy processors + preprocessor, postprocessor = make_pre_post_processors( + policy_cfg=cfg.policy, + pretrained_path=cfg.policy.pretrained_path, + ) + + # Create environment processors (NEW!) 
+ env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env) + + # Run evaluation with both processor types + eval_policy_all( + envs=envs, + policy=policy, + env_preprocessor=env_preprocessor, # Environment-specific + env_postprocessor=env_postprocessor, # Environment-specific + preprocessor=preprocessor, # Policy-specific + postprocessor=postprocessor, # Policy-specific + n_episodes=cfg.eval.n_episodes, + ) +``` + +## Example: LIBERO Environment Processor + +The `LiberoProcessorStep` demonstrates a real-world environment processor: + +```python +from lerobot.processor.pipeline import ObservationProcessorStep + +@dataclass +@ProcessorStepRegistry.register(name="libero_processor") +class LiberoProcessorStep(ObservationProcessorStep): + """ + Processes LIBERO observations into the LeRobot format. + + **State Processing:** + - Extracts end-effector position (3D) + - Converts quaternion to axis-angle representation (3D) + - Extracts gripper joint positions (2D) + - Concatenates into 8D state vector + + **Image Processing:** + - Rotates images 180° to match HuggingFaceVLA/libero convention + """ + + def _process_observation(self, observation): + processed_obs = observation.copy() + + # Process images: Flip 180° for camera convention + for key in list(processed_obs.keys()): + if key.startswith("observation.images."): + img = processed_obs[key] + img = torch.flip(img, dims=[2, 3]) # Flip H and W + processed_obs[key] = img + + # Process robot_state: Flatten to 8D vector + if "observation.robot_state" in processed_obs: + robot_state = processed_obs.pop("observation.robot_state") + + eef_pos = robot_state["eef"]["pos"] # (B, 3) + eef_quat = robot_state["eef"]["quat"] # (B, 4) + gripper_qpos = robot_state["gripper"]["qpos"] # (B, 2) + + # Convert quaternion to axis-angle + eef_axisangle = self._quat2axisangle(eef_quat) # (B, 3) + + # Concatenate into single state vector + state = torch.cat((eef_pos, eef_axisangle, gripper_qpos), dim=-1) + state = 
state.float() + + processed_obs["observation.state"] = state + + return processed_obs +``` + +### Why These Transformations? + +1. **Image Rotation**: The HuggingFaceVLA/libero dataset has images rotated 180° from the raw LIBERO simulator. The processor handles this convention mismatch so policies trained on the dataset work seamlessly. + +2. **State Flattening**: The raw LIBERO environment exposes nested dictionaries with all available state information (position, quaternion, velocity, matrix representation, etc.). The processor: + - Selects the relevant components (pos, quat, gripper) + - Converts quaternion to axis-angle (more suitable for learning) + - Flattens to a single 8D vector that policies expect + +3. **Flexibility**: The environment still exposes **all** raw data. If you want to try different state representations (e.g., including velocities, using matrix representation instead of axis-angle), you can create a new processor without modifying the environment code. + +## Adding Environment Processors for New Environments + +To add environment processors for a new environment: + +### 1. Create the Processor Step + +```python +# In src/lerobot/processor/env_processor.py + +@dataclass +@ProcessorStepRegistry.register(name="myenv_processor") +class MyEnvProcessorStep(ObservationProcessorStep): + """Process observations from MyEnv.""" + + def _process_observation(self, observation): + processed = observation.copy() + + # Your environment-specific transformations + if "myenv.specific.state" in processed: + state = processed.pop("myenv.specific.state") + # Transform to standard format + processed["observation.state"] = self._transform_state(state) + + return processed +``` + +### 2. 
Update the Factory + +```python +# In src/lerobot/envs/factory.py + +def make_env_pre_post_processors(env_cfg: EnvConfig): + if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()]) + elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()]) + else: + preprocessor = PolicyProcessorPipeline(steps=[]) + + postprocessor = PolicyProcessorPipeline(steps=[]) + return preprocessor, postprocessor +``` + +### 3. Use in Evaluation + +No changes needed! The evaluation script automatically uses the appropriate processor; here, `--env.type=myenv` selects `MyEnvProcessorStep`: + +```bash +lerobot-eval \ + --policy.path=lerobot/my_policy \ + --env.type=myenv \ + --eval.n_episodes=10 +``` + +## Future: Environment Postprocessors + +Currently, postprocessors are identity (no-op) for all environments. Future use cases include: + +### Action Space Transformations + +```python +@dataclass +class MyEnvActionPostprocessor(ProcessorStep): + """Convert policy actions to environment-specific format.""" + + def __call__(self, transition: EnvTransition) -> EnvTransition: + action = transition["action"] + + # Example: Convert from Cartesian to joint space + if self.action_space == "joint": + action = self.ik_solver(action) + + # Example: Apply environment-specific safety limits + action = torch.clamp(action, self.min_action, self.max_action) + + transition["action"] = action + return transition +``` + +### Coordinate System Conversions + +```python +@dataclass +class CoordinateTransformPostprocessor(ProcessorStep): + """Transform actions between coordinate systems.""" + + def __call__(self, transition: EnvTransition) -> EnvTransition: + action = transition["action"] + + # Example: Policy outputs in world frame, env expects base frame + action = self.world_to_base_transform(action) + + transition["action"] = action + return transition +``` + +## Best
Practices + +1. **Keep environment processors simple**: They should only handle environment-specific data format issues, not complex learning-related transformations. + +2. **Use policy processors for model requirements**: Normalization, batching, device placement, and tokenization belong in policy processors. + +3. **Expose all data from environments**: Let processors decide what to use rather than hardcoding choices in the environment. + +4. **Document conventions**: Clearly document any coordinate system conventions, camera orientations, or data formats that your processor handles. + +5. **Test independently**: Environment processors should be testable without loading full policies or environments. + +## Summary + +Environment processors provide a **clean separation** between environment-specific data transformations and policy-specific model requirements. This architecture: + +- ✅ Enables easy experimentation with different state representations +- ✅ Allows policies to work seamlessly across different environments +- ✅ Keeps environment code focused on simulation/hardware interface +- ✅ Makes processor pipelines more maintainable and debuggable +- ✅ Follows the single responsibility principle + +The key insight: **Environments define data formats, processors standardize them, policies consume standardized data.** Each layer has a clear, focused responsibility. diff --git a/src/lerobot/processor/env_processor.py b/src/lerobot/processor/env_processor.py index fd0b8c41f..b1872b032 100644 --- a/src/lerobot/processor/env_processor.py +++ b/src/lerobot/processor/env_processor.py @@ -13,14 +13,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import torch from dataclasses import dataclass +import torch + from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.utils.constants import OBS_IMAGES, OBS_STATE from .pipeline import ObservationProcessorStep, ProcessorStepRegistry + @dataclass @ProcessorStepRegistry.register(name="libero_processor") class LiberoProcessorStep(ObservationProcessorStep):