Compare commits

...

109 Commits

Author SHA1 Message Date
Pepijn 4e671ef080 fix 2025-09-01 15:41:24 +02:00
Pepijn cf9796b2f7 fix eval 2025-09-01 14:57:24 +02:00
Pepijn 88116b11e1 remove full pos embedding 2025-09-01 14:51:33 +02:00
Pepijn cf0c3f0a9a change config 2025-09-01 14:37:15 +02:00
Pepijn ee48a80e4d hls_gaus true 2025-09-01 14:19:07 +02:00
Pepijn cb0fb8ad15 hls_gaus true 2025-09-01 13:56:08 +02:00
Pepijn f79fdf7205 increase stride 2025-09-01 13:53:43 +02:00
Pepijn a305f5f46a hl-gauss 2025-09-01 13:34:55 +02:00
Pepijn 45348d7b69 remove debug log 2025-09-01 13:32:37 +02:00
Pepijn d4c1c123c6 hl-gauss 2025-09-01 13:24:28 +02:00
Pepijn da861139a3 hl-gauss 2025-09-01 13:11:53 +02:00
Pepijn 4f51f7153c hl-gauss 2025-09-01 13:09:00 +02:00
Pepijn 9027c7866f less prefetching 2025-09-01 12:12:36 +02:00
Pepijn c2bf226082 regulalizer 2025-09-01 12:07:37 +02:00
Pepijn f84c20d403 huberman loss 2025-09-01 11:59:20 +02:00
Pepijn 4c4462edea huberman loss 2025-09-01 11:56:58 +02:00
Pepijn 0b710932e2 huberman loss 2025-09-01 11:53:30 +02:00
Pepijn 9a19f8f6f4 use cls token 2025-09-01 11:31:28 +02:00
Pepijn 3504d17fef smaller siglip2 2025-09-01 11:18:35 +02:00
Pepijn d35ed3fd83 conversion dest 2025-09-01 11:01:27 +02:00
Pepijn ce5b27d255 siglip again 2025-09-01 10:55:12 +02:00
Pepijn 9dcb407ba7 siglip again 2025-09-01 10:27:58 +02:00
Pepijn 5eb5bf7164 clean 2025-09-01 10:14:43 +02:00
Pepijn 65fb5d3b1a fix 2025-09-01 00:12:30 +02:00
Pepijn d6a24e2882 fix 2025-08-31 21:47:11 +02:00
Pepijn d51bbe9492 fix 2025-08-31 21:38:46 +02:00
Pepijn d8c875e069 use patch tokens 2025-08-31 20:52:00 +02:00
Pepijn eff5b90542 add lower out of bound sampling 2025-08-31 20:38:45 +02:00
Pepijn a1a3fa435d fix dinov3 2025-08-31 20:21:58 +02:00
Pepijn 79c3466f0f fix dinov3 2025-08-31 19:44:27 +02:00
Pepijn e1d433cbfc fix dinov3 2025-08-31 19:41:16 +02:00
Pepijn 16e82fd29f fix stride unique samplin 2025-08-31 19:31:27 +02:00
Pepijn ae57fe2d33 debug frames 2025-08-31 19:20:18 +02:00
Pepijn e3306951c0 debug frames 2025-08-31 19:18:52 +02:00
Pepijn 10e36f2453 dinov3 base 2025-08-31 19:07:46 +02:00
Pepijn 9204a8bccd debug same frame 2025-08-31 19:06:30 +02:00
Pepijn 43eedf62e4 use dinov3 2025-08-31 18:49:06 +02:00
Pepijn c51d40ad56 add vision feature debug 2025-08-31 18:38:50 +02:00
Pepijn 5c1d930a34 add stride 2025-08-31 18:32:47 +02:00
Pepijn 8d20ca1625 extend head 2025-08-31 18:18:03 +02:00
Pepijn e4df9ccb63 fix progress 2025-08-31 18:11:18 +02:00
Pepijn 086815edb7 fix progress 2025-08-31 17:13:49 +02:00
Pepijn c9243c29b0 cleanup 2025-08-31 16:34:46 +02:00
Pepijn e7617076ca cleanup 2025-08-31 16:03:24 +02:00
Pepijn 221e5862ea cleanup 2025-08-31 15:52:15 +02:00
Pepijn 1e1b010257 cleanup 2025-08-31 15:40:00 +02:00
Pepijn def71cc439 change sampling 2025-08-31 15:20:20 +02:00
Pepijn 4557655ab1 simple eval 2025-08-31 14:11:47 +02:00
Pepijn 28298fbe78 simple eval 2025-08-31 14:08:48 +02:00
Pepijn f84affec23 simple eval 2025-08-31 14:00:19 +02:00
Pepijn dad0babbf5 simple eval 2025-08-31 13:54:03 +02:00
Pepijn fc5cd05fb0 simple eval 2025-08-31 13:48:40 +02:00
Pepijn d01b060d24 simple eval 2025-08-31 13:43:09 +02:00
Pepijn 7da15ba069 simple eval 2025-08-31 13:40:13 +02:00
Pepijn b0a5b88c21 simple eval 2025-08-31 13:28:04 +02:00
Pepijn 42fbcc89c5 ddebugging 2025-08-31 02:10:52 +02:00
Pepijn 9767120eb4 debug sampling 2025-08-31 01:48:35 +02:00
Pepijn 852713dc84 random sample for log 2025-08-31 01:33:58 +02:00
Pepijn 1f38712c95 fix pos enc 2025-08-31 01:22:54 +02:00
Pepijn 0ffc5b4741 add layernorm in head 2025-08-31 01:13:22 +02:00
Pepijn a1b1643ff6 change head init 2025-08-31 01:02:25 +02:00
Pepijn 7739fe12e4 sigmoid head 2025-08-31 00:53:23 +02:00
Pepijn be9bdc242f add pos relative 2025-08-31 00:43:26 +02:00
Pepijn 195cc79c49 add pos info for all frames 2025-08-31 00:29:08 +02:00
Pepijn f8d42cc038 fix 2025-08-30 23:58:58 +02:00
Pepijn 1797dea3d5 fix 2025-08-30 23:40:03 +02:00
Pepijn 825c0666a9 fix 2025-08-30 23:11:26 +02:00
Pepijn 47bc670ad2 less video prefetch 2025-08-30 21:21:27 +02:00
Pepijn aa505d4192 more video prefetch 2025-08-30 16:40:18 +02:00
Pepijn e380653c62 more video prefetch 2025-08-30 16:30:04 +02:00
Pepijn bf5c037959 remove decode logging 2025-08-30 16:28:29 +02:00
Pepijn 1234e71cfb add decode logging 2025-08-30 16:16:08 +02:00
Pepijn b1ff7132c1 add decode logging 2025-08-30 16:08:21 +02:00
Pepijn b357a8c4d8 add decode logging 2025-08-30 16:05:58 +02:00
Pepijn 0be53ef3e1 add decode logging 2025-08-30 16:00:55 +02:00
Pepijn aed90c8042 add decode logging 2025-08-30 15:52:24 +02:00
Pepijn 0b5da92a58 optimzize data loading 2025-08-30 15:40:36 +02:00
Pepijn 599218fe9a use rewind 2025-08-30 14:41:15 +02:00
Pepijn 2507341a32 stats every minute 2025-08-30 14:38:28 +02:00
Pepijn bde397e891 use siglip 2 2025-08-30 14:28:55 +02:00
Pepijn 76e260c401 fix 2025-08-30 13:07:51 +02:00
Pepijn 5179515d81 fix 2025-08-30 12:40:55 +02:00
Pepijn 8ad00d1ee7 fix 2025-08-30 12:33:39 +02:00
Pepijn 7440d772ff fix 2025-08-30 12:28:18 +02:00
Pepijn a4fc02a636 fix 2025-08-30 12:05:38 +02:00
Pepijn f5c39d6292 fix 2025-08-30 11:37:16 +02:00
Pepijn 3f616f0ebe add benchmark 2025-08-29 15:33:45 +02:00
Pepijn 9698e74e88 small impr 2025-08-29 09:05:53 +02:00
Pepijn 04d55e4670 small impr 2025-08-28 22:45:23 +02:00
Pepijn 7dce022a05 exactly as rewind code 2025-08-28 21:18:41 +02:00
Pepijn cc05067a76 dino v2 2025-08-28 19:23:17 +02:00
Pepijn bead25a58a smaller model 2025-08-28 17:43:03 +02:00
Pepijn c877e98658 use only rewind loss 2025-08-28 14:22:57 +02:00
Pepijn a4c88d6340 nit 2025-08-28 08:52:48 +02:00
Pepijn 34ca077d78 pad seq 2025-08-27 17:16:31 +02:00
Pepijn 2a901f8134 add multipe timesteps 2025-08-27 16:34:22 +02:00
Pepijn 450be9d7d1 add multipe timesteps 2025-08-27 16:33:53 +02:00
Pepijn 681be962ae initial commit 2025-08-27 14:58:34 +02:00
Adil Zouitine b16e18f978 Fix typo in documentation for adapters in robots/teleop section 2025-08-08 16:36:09 +02:00
Pepijn 652e3cb859 Add phone docs and use pipeline for robots/teleop docs 2025-08-08 16:05:34 +02:00
Michel Aractingi 2a5c757d58 Improved doc implement_your_own_pipeline
- Use normalization processor as default example
- Add section on transform features
- Add section on overrides.
2025-08-08 00:58:59 +02:00
pre-commit-ci[bot] 6d4e983197 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-07 18:13:34 +02:00
Adil Zouitine ecda7482c7 feat(docs): Enhance introduction to processors with additional converter functions
- Updated the introduction to processors documentation to include default batch-to-transition and transition-to-batch converters.
- Added detailed descriptions and examples for new specialized converter functions: `to_transition_teleop_action`, `to_transition_robot_observation`, `to_output_robot_action`, and `to_dataset_frame`.
- Improved clarity on how these converters facilitate integration with existing robotics applications.
2025-08-07 18:13:34 +02:00
pre-commit-ci[bot] 7124d471c1 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-07 18:13:34 +02:00
Adil Zouitine a14af62ee3 Add comprehensive documentation for processors in robotics
- Introduced a detailed guide on processors, covering their role in transforming raw robot data into model-ready inputs and vice versa.
- Explained core concepts such as EnvTransition, ProcessorStep, and RobotProcessor, along with their functionalities.
- Included examples of common processor steps like normalization, device management, batch processing, and text tokenization.
- Provided insights on building complete pipelines, integrating processors into training loops, and saving/loading configurations.
- Emphasized best practices and advanced features for effective usage of processors in robotics applications.
2025-08-07 18:13:34 +02:00
Michel Aractingi ac80f1f081 improved part 2 of processor guide 2025-08-07 18:13:34 +02:00
Michel Aractingi feb3fed5e8 precommit style nit 2025-08-07 18:13:34 +02:00
Michel Aractingi 8d5f519fcb Added script for the second part of the processor doc 2025-08-07 18:13:34 +02:00
Adil Zouitine b9d3c34ae4 chore(docs): initialize doc 2025-08-07 18:13:34 +02:00
22 changed files with 4628 additions and 30 deletions
+15 -2
View File
@@ -24,9 +24,16 @@
- local: smolvla
title: Finetune SmolVLA
title: "Policies"
- sections:
- local: introduction_processors
title: Introduction to Robot Processors
- local: implement_your_own_processor
title: Implement your own processor
- local: processors_robots_teleop
title: Processors for Robots and Teleoperators
title: "Robot Processors"
- sections:
- local: hope_jr
title: Hope Jr
- local: so101
title: SO-101
- local: so100
@@ -35,7 +42,13 @@
title: Koch v1.1
- local: lekiwi
title: LeKiwi
- local: hope_jr
title: Hope Jr
title: "Robots"
- sections:
- local: phone_teleop
title: Phone
title: "Teleoperators"
- sections:
- local: notebooks
title: Notebooks
@@ -0,0 +1,323 @@
# Implement your own Robot Processor
In this tutorial, you'll learn how to implement your own Robot Processor.
It begins by exploring the need for a custom processor, then uses the Normalization processors as the running example to explain how to implement, configure, and serialize a processor. Finally, it lists all helper processors that ship with LeRobot.
## Why would you need a custom processor?
In most cases, when reading raw data from a sensor like the camera and robot motor encoders,
you will need to process this data to transform it into a format that is compatible to use with the policies in LeRobot.
For example, raw images are encoded with `uint8` and the values are in the range `[0, 255]`.
To use these images with the policies, you will need to cast them to `float32` and normalize them to the range `[0, 1]`.
For example, in LeRobot's `VanillaObservationProcessor`, raw images come from the environment as numpy arrays with `uint8` values in range `[0, 255]` and in channel-last format `(H, W, C)`. The processor transforms them into PyTorch tensors with `float32` values in range `[0, 1]` and channel-first format `(C, H, W)`:
```python
# Input: numpy array with shape (480, 640, 3) and dtype uint8
raw_image = env_observation["pixels"] # Values in [0, 255]
# After processing: torch tensor with shape (1, 3, 480, 640) and dtype float32
processed_image = processor(transition)["observation"]["observation.image"] # Values in [0, 1]
```
On the other hand, when a model returns a certain action to be executed on the robot, it is often that one has to post-process this action to make it compatible to run on the robot.
For example, the model might return joint positions values that range from `[-1, 1]` and one would need to scale them to the ranges of the minimum and maximum joint angle positions of the robot.
In LeRobot, this normalization workflow is handled by the `NormalizerProcessor` (for inputs) and the `UnnormalizerProcessor` (for outputs). These processors are heavily used by policies (e.g., Pi0, SmolVLA) and integrate tightly with the `RobotProcessor`'s `get_config`, `state_dict`, and `load_state_dict` APIs.
For instance, `UnnormalizerProcessor` converts model outputs in `[-1, 1]` back to actual robot joint ranges:
```python
# Input: model action with normalized values in [-1, 1]
normalized_action = torch.tensor([-0.5, 0.8, -1.0, 0.2]) # Model output
# After post-processing: real joint positions in robot's native ranges
# Example: joints range from [-180.0, 180.0]
real_action = unnormalizer(transition)["action"]
# real action after post-processing: [ -90., 144., -180., 36.]
```
The unnormalizer uses the dataset statistics to convert back:
```python
# For MIN_MAX normalization: action = (normalized + 1) * (max - min) / 2 + min
real_action = (normalized_action + 1) * (max_val - min_val) / 2 + min_val
```
All these situations point us towards the need for a mechanism to preprocess the data before being passed to the policies and then post-process the action that are returned to be executed on the robot.
To that end, LeRobot provides a pipeline mechanism to implement a sequence of processing steps for the input data and the output action.
## How to implement your own processor?
We'll use the `NormalizerProcessor` as a concrete running example because it is central to most policies and demonstrates configuration and state serialization cleanly.
Prepare the sequence of processing steps necessary for your problem. A processor step is a class that implements the following methods:
- `__call__`: implements the processing step for the input transition.
- `get_config`: gets the configuration of the processor step.
- `state_dict`: gets the state of the processor step.
- `load_state_dict`: loads the state of the processor step.
- `reset`: resets the state of the processor step.
- `feature_contract`: displays the modification to the feature space during the processor step.
### Implement the `__call__` method
The `__call__` method is the core of your processor step. It takes an `EnvTransition` and returns a modified `EnvTransition`. Here's how the `NormalizerProcessor` conceptually works (simplified):
```python
from dataclasses import dataclass
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor.pipeline import EnvTransition, TransitionKey
@dataclass
class NormalizerProcessor:
features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode]
stats: dict[str, dict[str, torch.Tensor]]
eps: float = 1e-8
def __call__(self, transition: EnvTransition) -> EnvTransition:
normalized_info = {}
obs = transition.get(TransitionKey.OBSERVATION)
act = transition.get(TransitionKey.ACTION)
new_obs = self._normalize_observation(obs, normalized_info)
new_act = self._normalize_action(act, normalized_info)
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_obs
new_transition[TransitionKey.ACTION] = new_act
# Record what was normalized into complementary_data
if normalized_info:
comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
comp = dict(comp)
comp["normalized_keys"] = normalized_info
new_transition[TransitionKey.COMPLEMENTARY_DATA] = comp
return new_transition
```
See the full implementation in `src/lerobot/processor/normalize_processor.py` for details on mean/std and min/max modes and key selection.
**Key principles:**
- Always check if required data exists before processing
- Return unchanged transition if no processing is needed
- Use `transition.copy()` to avoid side effects
- Only modify the specific keys your processor handles
**Tip**: For observation-only processors, you can inherit from `ObservationProcessor` to avoid writing `__call__` boilerplate. The normalizer is mixed (observations and actions), so it implements `__call__` directly.
### Configuration and State Management
Processors support serialization through three methods that separate configuration from tensor state. This is especially important for normalization processors, which carry dataset statistics (tensors) in their state, and hyperparameters in their config:
```python
from dataclasses import dataclass, field
from typing import Any
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
@dataclass
class NormalizerProcessor:
features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode]
eps: float = 1e-8
_tensor_stats: dict[str, dict[str, torch.Tensor]] = field(default_factory=dict, init=False, repr=False)
def get_config(self) -> dict[str, Any]:
"""JSON-serializable configuration (no tensors)."""
return {
"eps": self.eps,
"features": {k: {"type": v.type.value, "shape": v.shape} for k, v in self.features.items()},
"norm_map": {ft.value: nm.value for ft, nm in self.norm_map.items()},
}
def state_dict(self) -> dict[str, torch.Tensor]:
"""Tensor state only (e.g., dataset statistics)."""
flat: dict[str, torch.Tensor] = {}
for key, sub in self._tensor_stats.items():
for stat_name, tensor in sub.items():
flat[f"{key}.{stat_name}"] = tensor
return flat
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Restore tensor state at runtime."""
self._tensor_stats.clear()
for flat_key, tensor in state.items():
key, stat_name = flat_key.rsplit(".", 1)
self._tensor_stats.setdefault(key, {})[stat_name] = tensor
```
**Usage:**
```python
# Save (e.g., inside a policy)
config = processor.get_config()
tensors = processor.state_dict()
# Restore (e.g., loading a pretrained policy)
new_processor = NormalizerProcessor(**config)
new_processor.load_state_dict(tensors)
```
### Transform features
The `transform_features` method defines how your processor transforms feature names and shapes. This is crucial for policy configuration and debugging.
Normalization typically preserves the feature keys and shapes, so `NormalizerProcessor.transform_features` returns the input features unchanged. When your processor renames or reshapes, implement this method to reflect the mapping for downstream components. For example, a simple rename processor:
```python
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
# Simple renaming
if "pixels" in features:
features["observation.image"] = features.pop("pixels")
# Pattern-based renaming
for key in list(features.keys()):
if key.startswith("env_state."):
suffix = key[len("env_state."):]
features[f"observation.{suffix}"] = features.pop(key)
return features
```
**Key principles:**
- Use `features.pop(old_key)` to remove and get the old feature
- Use `features[new_key] = old_feature` to add the renamed feature
- Always return the modified features dictionary
- Document transformations clearly in the docstring
### Example of usage from the codebase
`transform_features` is used by `RobotProcessor` to derive the dataset/policy feature contract from an initial feature set by applying each step's transformation. You can see concrete examples in the codebase:
- Phone teleoperation record pipeline (`examples/phone_so100_record.py`): processors like `ForwardKinematicsJointsToEE`, `GripperVelocityToJoint`, and `EEBoundsAndSafety` implement `transform_features` to declare which action/observation keys should be materialized in the dataset.
- SO100 follower kinematics (`src/lerobot/robots/so100_follower/robot_kinematic_processor.py`): each processor's `transform_features` method adds or refines feature keys such as `observation.state.ee.{x,y,z,wx,wy,wz}` or `action.gripper.pos`.
- Rename and tokenizer processors (`src/lerobot/processor/rename_processor.py`, `src/lerobot/processor/tokenizer_processor.py`): demonstrate key renaming and adding language token features to the contract.
In practice, you will often aggregate features by running `RobotProcessor.transform_features(...)` with your initial features to compute the final contract before recording or training.
## Helper Classes
LeRobot provides pre-built processor classes for common transformations. Below is a comprehensive list of registered processors in the codebase.
### Core processors (observations, actions, normalization)
- **`VanillaObservationProcessor`** (`observation_processor`): Images and state processing to LeRobot format.
- **`NormalizerProcessor`** (`normalizer_processor`): Normalize observations/actions (mean/std or min/max to [-1, 1]).
- **`UnnormalizerProcessor`** (`unnormalizer_processor`): Inverse of the normalizer for model outputs.
- **`DeviceProcessor`** (`device_processor`): Move tensors to a specific device (CPU/GPU) and optional float dtype.
- **`ToBatchProcessor`** (`to_batch_processor`): Add batch dimension to observations/actions when missing.
- **`RenameProcessor`** (`rename_processor`): Rename observation keys using a mapping dictionary.
- **`TokenizerProcessor`** (`tokenizer_processor`): Tokenize language tasks into `observation.language.*` tensors.
### Teleoperation mapping processors
- **`MapDeltaActionToRobotAction`** (`map_delta_action_to_robot_action`): Map teleop deltas (e.g., gamepad) to `action.target_*` fields.
- **`MapPhoneActionToRobotAction`** (`map_phone_action_to_robot_action`): Map calibrated phone pose/buttons to `action.target_*` and gripper.
### Robot kinematics processors (SO100 follower example)
- **`EEReferenceAndDelta`** (`ee_reference_and_delta`): Compute desired EE pose from target deltas and current pose.
- **`EEBoundsAndSafety`** (`ee_bounds_and_safety`): Clip EE pose to bounds and check for jumps.
- **`InverseKinematicsEEToJoints`** (`inverse_kinematics_ee_to_joints`): Convert EE pose to joint targets via IK.
- **`GripperVelocityToJoint`** (`gripper_velocity_to_joint`): Convert gripper velocity input to joint position command.
- **`ForwardKinematicsJointsToEE`** (`forward_kinematics_joints_to_ee`): Compute EE pose features from joint positions via FK.
- **`AddRobotObservationAsComplimentaryData`** (`add_robot_observation`): Read robot observation and insert `raw_joint_positions` into complementary data.
### Policy-specific utility processors
- **`Pi0NewLineProcessor`** (`pi0_new_line_processor`): Ensure text tasks end with a newline (Pi0 tokenizer compatibility).
- **`SmolVLANewLineProcessor`** (`smolvla_new_line_processor`): Ensure text tasks end with a newline (SmolVLA tokenizer compatibility).
### Usage Example
```python
from lerobot.processor import NormalizerProcessor, DeviceProcessor, RobotProcessor, ToBatchProcessor
# Create a processing pipeline (typical policy preprocessor)
steps = [
NormalizerProcessor(features=features, norm_map=norm_map, stats=stats),
ToBatchProcessor(),
DeviceProcessor(device="cuda"),
]
# Use in RobotProcessor
processor = RobotProcessor(steps=steps)
processed_transition = processor(raw_transition)
```
### Using overrides
You can override step parameters at load-time using `overrides`. This is handy for non-serializable objects or site-specific settings. It works both in policy factories and with `RobotProcessor.from_pretrained(...)`.
Example: during policy evaluation on the robot, override the device and rename map.
Use this to run a policy trained on CUDA on a CPU-only robot, or to remap camera keys when the robot uses different names than the dataset.
```437:445:src/lerobot/record.py
preprocessor, postprocessor = make_processor(
policy_cfg=cfg.policy,
pretrained_path=cfg.policy.pretrained_path,
dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map),
preprocessor_overrides={
"device_processor": {"device": cfg.policy.device},
"rename_processor": {"rename_map": cfg.dataset.rename_map},
},
)
```
Direct usage with `from_pretrained`:
```python
from lerobot.processor import RobotProcessor
processor = RobotProcessor.from_pretrained(
"username/my-processor",
overrides={
"device_processor": {"device": "cuda:0"}, # registry name for registered steps
"CustomStep": {"param": 42}, # class name for non-registered steps
},
)
```
## Best Practices
- **Keep processors atomic** - One transformation per processor for reusability and debugging
- **Use dataclasses** - Clean initialization with `@dataclass`
- **Always register processors** - Use `@ProcessorStepRegistry.register("name")` for discoverability
- **Check for None** - Always validate required data exists before processing
- **Use copy() for safety** - Avoid side effects with `transition.copy()`
- **Separate config and state** - JSON-serializable config vs tensor state_dict
- **Use base classes** - Inherit from `ObservationProcessor` for observation-only processing
```python
@ProcessorStepRegistry.register("my_processor")
@dataclass
class MyProcessor(ObservationProcessor):
threshold: float = 0.5
def observation(self, observation):
if observation is None:
return observation
# Your processing logic here
return processed_observation
```
## Conclusion
You now have all the tools to implement custom processors in LeRobot! The key steps are:
1. **Define your processor** as a dataclass with the required methods (`__call__`, `get_config`, `state_dict`, `load_state_dict`, `reset`, `feature_contract`)
2. **Register it** using `@ProcessorStepRegistry.register("name")` for discoverability
3. **Integrate it** into a `RobotProcessor` pipeline with other processing steps
4. **Use base classes** like `ObservationProcessor` when possible to reduce boilerplate
The processor system is designed to be modular and composable, allowing you to build complex data processing pipelines from simple, focused components. Whether you're preprocessing sensor data for training or post-processing model outputs for robot execution, custom processors give you the flexibility to handle any data transformation your robotics application requires. Policies like Pi0 and SmolVLA use the same normalization processors described above, so your understanding here will transfer directly when wiring policy preprocessors and postprocessors.
Start simple, test thoroughly, and leverage the existing helper classes to build robust data processing pipelines for your robot learning workflows.
+991
View File
@@ -0,0 +1,991 @@
# Introduction to Processors
In robotics, there's a fundamental mismatch between the data that robots and humans produce and what machine learning models expect. This creates several translation challenges:
**Raw Robot Data → Model Input:**
- Robots output raw sensor data (camera images, joint positions, force readings) that need normalization, batching, and device placement before models can process them
- Language instructions from humans ("pick up the red cube") must be tokenized into numerical representations
- Different robots use different coordinate systems and units that need standardization
**Model Output → Robot Commands:**
- Models might output end-effector positions, but robots need joint-space commands
- Teleoperators (like gamepads) produce relative movements (delta positions), but robots expect absolute commands
- Model predictions are often normalized and need to be converted back to real-world scales
**Cross-Domain Translation:**
- Training data from one robot setup needs adaptation for deployment on different hardware
- Models trained with specific camera configurations must work with new camera arrangements
- Datasets with different naming conventions need harmonization
**That's where processors come in.** They serve as the universal translators that bridge these gaps, ensuring seamless data flow from sensors to models to actuators.
Processors are the data transformation backbone of LeRobot. They handle all the preprocessing and postprocessing steps needed to convert raw environment data into model-ready inputs and vice versa. This guide will walk you through everything you need to know about processors - from basic concepts to advanced usage patterns.
## What are Processors?
In robotics, data comes in many forms - images from cameras, joint positions from sensors, text instructions from users, and more. Each type of data requires specific transformations before a model can use it effectively. Models need this data to be:
- **Normalized**: Scaled to appropriate ranges for neural network processing
- **Batched**: Organized with proper dimensions for batch processing
- **Tokenized**: Text converted to numerical representations
- **Device-placed**: Moved to the right hardware (CPU/GPU)
- **Type-converted**: Cast to appropriate data types
Processors handle these transformations through composable, reusable steps that can be chained together into pipelines. Think of them as a modular assembly line where each station performs a specific transformation on your data.
## Core Concepts
### EnvTransition: The Universal Data Container
The `EnvTransition` is the fundamental data structure that flows through all processors. It's a typed dictionary that represents a complete robot-environment interaction:
```python
from lerobot.processor.pipeline import TransitionKey, EnvTransition
# Example transition from a robot collecting data
transition: EnvTransition = {
TransitionKey.OBSERVATION: {
"observation.images.camera0": camera0_image_tensor, # Shape: (H, W, C)
"observation.images.camera1": camera1_image_tensor, # Shape: (H, W, C)
"observation.state": joint_positions_tensor, # Shape: (7,) for 7-DOF arm
"observation.environment_state": env_state_tensor # Shape: (3,) for object position
},
TransitionKey.ACTION: action_tensor, # Shape: (7,) for joint velocities
TransitionKey.REWARD: 0.0, # Scalar reward signal
TransitionKey.DONE: False, # Episode termination flag
TransitionKey.TRUNCATED: False, # Episode truncation flag
TransitionKey.INFO: {"success": False}, # Additional metadata
TransitionKey.COMPLEMENTARY_DATA: {
"task": "pick up the red cube", # Language instruction
"task_index": 0, # Task identifier
"index": 42 # Frame index
}
}
```
Each key in the transition has a specific purpose:
- **OBSERVATION**: All sensor data (images, states, proprioception)
- **ACTION**: The action to execute or that was executed
- **REWARD**: Reinforcement learning signal
- **DONE/TRUNCATED**: Episode boundary indicators
- **INFO**: Arbitrary metadata
- **COMPLEMENTARY_DATA**: Task descriptions, indices, padding flags, inter-step data (e.g., you need to compute the velocities and then use this velocity to clip the action)
### ProcessorStep: The Building Block Interface
A `ProcessorStep` is a single transformation unit that processes transitions. It's a protocol (interface) that any processor step must implement:
```python
from lerobot.processor.pipeline import ProcessorStep, EnvTransition
from lerobot.configs.types import PolicyFeature
from typing import Any
import torch
class MyProcessorStep:
"""Example processor step interface - all methods must be implemented."""
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Transform the transition - this is the main processing logic."""
raise NotImplementedError
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Declare how this step transforms feature shapes/types."""
raise NotImplementedError
def get_config(self) -> dict[str, Any]:
"""Return JSON-serializable configuration for saving/loading."""
raise NotImplementedError
def state_dict(self) -> dict[str, torch.Tensor]:
"""Return any learnable parameters (tensors only)."""
raise NotImplementedError
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load learnable parameters from saved state."""
raise NotImplementedError
def reset(self) -> None:
"""Reset any internal state between episodes."""
raise NotImplementedError
```
### RobotProcessor: The Pipeline Orchestrator
The `RobotProcessor` chains multiple `ProcessorStep` instances together, executing them sequentially. It provides automatic format conversion to handle both batch dictionaries (from datasets) and EnvTransition dictionaries:
```python
from lerobot.processor.pipeline import RobotProcessor, _default_batch_to_transition, _default_transition_to_batch
# Create a processing pipeline
processor = RobotProcessor(
steps=[
step1, # First transformation
step2, # Second transformation
step3 # Third transformation
],
name="my_preprocessing_pipeline",
# Optional: Custom converters for input/output formats
to_transition=_default_batch_to_transition, # How to convert batch dict → EnvTransition
to_output=_default_transition_to_batch # How to convert EnvTransition → output format
)
# The processor automatically handles different input formats:
# 1. If input is a batch dict (from dataset), converts to EnvTransition
# 2. Passes through each step sequentially
# 3. Converts back to original format (or custom output format)
# Example with batch dict input (common in training)
batch_dict = {"observation.state": tensor, "action": tensor}
output = processor(batch_dict) # Automatically converted to/from EnvTransition
# Example with EnvTransition input (common in inference)
transition = {TransitionKey.OBSERVATION: {...}, TransitionKey.ACTION: ...}
output = processor(transition) # Stays as EnvTransition throughout
```
The `to_transition` and `to_output` converters enable seamless integration with existing codebases.
By default, they handle the standard LeRobot batch format, but you can customize them for different data structures.
### Additional Converter Functions
LeRobot provides several specialized converter functions for common robotics scenarios:
```python
from lerobot.processor.converters import (
to_transition_teleop_action,
to_transition_robot_observation,
to_output_robot_action,
to_dataset_frame
)
```
**`to_transition_teleop_action`** - Converts teleoperation device actions to EnvTransitions:
```python
# Use case: Phone, gamepad, or other teleop device control
phone_action = {"x": 0.1, "y": -0.2, "gripper": 0.8}
transition = to_transition_teleop_action(phone_action)
# Creates: {ACTION: {"action.x": 0.1, "action.y": -0.2, "action.gripper": 0.8}, ...}
```
**`to_transition_robot_observation`** - Converts robot sensor data to EnvTransitions:
```python
# Use case: Live robot observation during inference
robot_obs = {
"joint_1": 0.5, "joint_2": -0.3, # joint positions
"camera_0": image_array # camera images
}
transition = to_transition_robot_observation(robot_obs)
# Creates: {OBSERVATION: {"observation.state.joint_1": 0.5, "observation.images.camera_0": image, ...}}
```
**`to_output_robot_action`** - Extracts robot-executable actions from EnvTransitions:
```python
# Use case: Converting model outputs back to robot commands
model_transition = {ACTION: {"action.joint_1": 0.2, "action.joint_2": 0.1}}
robot_action = to_output_robot_action(model_transition)
# Returns: {"joint_1": 0.2, "joint_2": 0.1} - ready for robot.send_action()
```
**`to_dataset_frame`** - Converts transitions to dataset-compatible format:
```python
# Use case: Saving processed data or creating training batches
features = {
"action": {"names": ["joint_1", "joint_2"]},
"observation.state": {"names": ["joint_1", "joint_2"]},
"observation.images.camera0": {...}
}
batch = to_dataset_frame(transition, features)
# Returns: {"action": [0.2, 0.1], "observation.state": [0.5, -0.3], ...}
```
These converters are particularly useful when integrating with real robots, as shown in the examples:
```python
# Example from phone_so100_teleop.py - Real robot teleoperation
phone_to_robot_ee_pose = RobotProcessor(
steps=[...],
to_transition=to_transition_teleop_action, # Phone → EnvTransition
to_output=lambda tr: tr # Keep as EnvTransition
)
# Example from phone_so100_eval.py - Robot action execution
robot_ee_to_joints = RobotProcessor(
steps=[...],
to_transition=lambda tr: tr, # Already EnvTransition
to_output=to_output_robot_action # EnvTransition → Robot action
)
# Example from phone_so100_record.py - Dataset recording
robot_joints_to_ee_pose = RobotProcessor(
steps=[...],
to_transition=to_transition_robot_observation, # Robot obs → EnvTransition
to_output=lambda tr: tr # Keep as EnvTransition for dataset
)
```
### Data Format Conversion
Different data sources have different formats, but processors need a unified `EnvTransition` structure internally.
The default converters handle LeRobot datasets, but you can customize them:
```python
# Default: LeRobot batch format
lerobot_batch = {
"observation.state": torch.tensor(...),
"action": torch.tensor(...),
"next.reward": torch.tensor(...),
"task": ["pick cube", ...]
}
# → Converts to EnvTransition → Processes → Converts back
# Custom: Live robot data
robot_data = {
"cameras": {"wrist_cam": np.array(...)},
"joint_positions": np.array(...),
"gripper_state": 0.5
}
def robot_to_transition(data: dict) -> EnvTransition:
return {
TransitionKey.OBSERVATION: {
"observation.images.wrist": torch.from_numpy(data["cameras"]["wrist_cam"]),
"observation.state": torch.from_numpy(data["joint_positions"])
},
TransitionKey.ACTION: None,
# ... other fields with defaults
}
# Use custom converter
processor = RobotProcessor(
steps=[...],
to_transition=robot_to_transition,
to_output=lambda transition: transition # Keep as EnvTransition
)
```
**When to customize:** Live robot data, Gymnasium environments, legacy datasets, or any non-LeRobot format.
## Common Processor Steps
LeRobot provides a rich set of pre-built processor steps for common transformations.
Let's explore each in detail:
### Data Normalization
Normalization is crucial for neural network training and inference.
The `NormalizerProcessor` handles both mean-std normalization and min-max scaling:
```python
from lerobot.processor.normalize_processor import NormalizerProcessor, UnnormalizerProcessor
from lerobot.configs.types import PolicyFeature, FeatureType, NormalizationMode
# Define what features exist in your data
features = {
"observation.images.camera0": PolicyFeature(
type=FeatureType.IMAGE,
shape=(224, 224, 3)
),
"observation.state": PolicyFeature(
type=FeatureType.STATE,
shape=(7,)
),
"action": PolicyFeature(
type=FeatureType.ACTION,
shape=(7,)
)
}
# Define normalization strategy per feature type
norm_map = {
FeatureType.IMAGE: NormalizationMode.MEAN_STD, # Images: (x - mean) / std
FeatureType.STATE: NormalizationMode.MIN_MAX, # States: scale to [-1, 1]
FeatureType.ACTION: NormalizationMode.MIN_MAX # Actions: scale to [-1, 1]
}
# Create normalizer with dataset statistics
normalizer = NormalizerProcessor(
features=features,
norm_map=norm_map,
stats=dataset.meta.stats, # Contains mean, std, min, max per feature
normalize_keys={"observation.state", "action"} # Optional: only normalize specific keys
)
# For postprocessing: inverse transformation
unnormalizer = UnnormalizerProcessor(
features=features,
norm_map=norm_map,
stats=dataset.meta.stats
)
# The normalizer automatically:
# - Detects which normalization to apply based on feature type
# - Handles device placement of statistics tensors
# - Skips keys not in stats or not in normalize_keys
# - Adds metadata about what was normalized
```
### Device Management
The `DeviceProcessor` ensures tensors are on the right device with the right dtype:
```python
from lerobot.processor.device_processor import DeviceProcessor
# Basic GPU placement
gpu_processor = DeviceProcessor(device="cuda:0")
# Advanced: GPU with half-precision for inference
efficient_processor = DeviceProcessor(
device="cuda:0",
float_dtype="float16" # Convert float32 -> float16 for memory efficiency
)
# The processor:
# - Moves all tensors to specified device
# - Preserves non-tensor data unchanged
# - Optionally converts float dtypes while preserving int/bool types
# - Uses non_blocking transfers for CUDA devices
# - Handles nested structures (observations, complementary_data)
# Supported float dtypes:
# "float16" / "half": 16-bit floating point
# "float32" / "float": 32-bit floating point (default)
# "float64" / "double": 64-bit floating point
# "bfloat16": Brain floating point (better for training)
```
### Batch Processing
Models expect batched inputs, but robot interactions often produce unbatched data:
```python
from lerobot.processor.batch_processor import ToBatchProcessor
batch_processor = ToBatchProcessor()
# Automatically adds batch dimensions where needed:
# State: (7,) -> (1, 7)
# Image: (224, 224, 3) -> (1, 224, 224, 3)
# Action: (4,) -> (1, 4)
# Task: "pick_cube" -> ["pick_cube"]
# Already batched: (1, 7) -> (1, 7) [unchanged]
# The processor intelligently:
# - Detects tensor dimensionality
# - Adds batch dim to 1D states/actions
# - Adds batch dim to 3D images
# - Wraps string tasks in lists
# - Preserves already-batched data
# Example usage in inference:
single_observation = robot.get_observation() # Unbatched
batched_input = batch_processor({"observation": single_observation})
model_output = model(batched_input) # Model expects batch dim
```
### Text Tokenization
For language-conditioned policies, text instructions must be tokenized:
```python
from lerobot.processor.tokenizer_processor import TokenizerProcessor
from transformers import AutoTokenizer
# Option 1: Auto-load tokenizer by name
tokenizer_proc = TokenizerProcessor(
tokenizer_name="google/paligemma-3b-pt-224",
max_length=128,
task_key="task", # Where to find text in complementary_data
padding="max_length", # Pad to max_length
padding_side="right",
truncation=True # Truncate if longer than max_length
)
# Option 2: Provide custom tokenizer
custom_tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
custom_proc = TokenizerProcessor(
tokenizer=custom_tokenizer,
max_length=256,
padding_side="left" # For autoregressive models
)
# The processor:
# - Extracts task text from complementary_data
# - Tokenizes using HuggingFace tokenizer
# - Adds tokens and attention_mask to observations
# - Handles both single strings and lists of strings
# - Preserves original task in complementary_data
# Output structure:
# observation["observation.language.tokens"] = tensor([101, 2032, ...])
# observation["observation.language.attention_mask"] = tensor([1, 1, 0, ...])
```
### Key Renaming
Different datasets and models may use different naming conventions.
The `RenameProcessor` solves this mismatch:
**Why is this useful?**
- When loading a model trained on a different dataset with different key names
- When using foundation models that expect specific key naming conventions
- When standardizing datasets from different sources
- When adapting legacy code to new naming standards
```python
from lerobot.processor.rename_processor import RenameProcessor
# Example 1: Dataset uses "top"/"wrist", model expects "camera0"/"camera1"
rename_proc = RenameProcessor(
rename_map={
"observation.images.top": "observation.images.camera0",
"observation.images.wrist": "observation.images.camera1",
}
)
# Example 2: Foundation model compatibility
# Your dataset: "observation.state", Foundation model: "proprio"
foundation_rename = RenameProcessor(
rename_map={
"observation.state": "proprio",
"observation.images.main": "rgb",
}
)
# Example 3: Standardizing multiple datasets
standardize_rename = RenameProcessor(
rename_map={
# Different robots might use different names
"observation.joint_positions": "observation.state",
"observation.gripper_state": "observation.end_effector",
"observation.arm_camera": "observation.images.wrist",
}
)
```
## Building Complete Pipelines
Let's build a real-world preprocessing and postprocessing pipeline for a vision-based
manipulation policy:
```python
# Consolidated imports
from lerobot.processor import (
RobotProcessor,
NormalizerProcessor,
UnnormalizerProcessor,
DeviceProcessor,
ToBatchProcessor,
TokenizerProcessor,
RenameProcessor
)
# Step 1: Define the preprocessing pipeline
preprocessor = RobotProcessor(
steps=[
# 1. Standardize naming from dataset
RenameProcessor(
rename_map={
"observation.images.top": "observation.images.camera0",
"observation.images.wrist": "observation.images.camera1"
}
),
# 2. Add batch dimensions for model
ToBatchProcessor(),
# 3. Tokenize language instructions if present
TokenizerProcessor(
tokenizer_name="google/paligemma-3b-pt-224",
max_length=64,
task_key="task"
),
# 4. Normalize numerical data
NormalizerProcessor(
features=policy_features,
norm_map={
FeatureType.IMAGE: NormalizationMode.MEAN_STD,
FeatureType.STATE: NormalizationMode.MIN_MAX,
FeatureType.ACTION: NormalizationMode.MIN_MAX
},
stats=dataset.meta.stats
),
# 5. Move to GPU and convert to half precision
DeviceProcessor(
device="cuda:0",
float_dtype="float16"
)
],
name="robot_preprocessor"
)
# Step 2: Define the postprocessing pipeline
postprocessor = RobotProcessor(
steps=[
# 1. Move back to CPU for robot hardware
DeviceProcessor(device="cpu"),
# 2. Denormalize actions to original scale
UnnormalizerProcessor(
features=policy_features,
norm_map={
FeatureType.ACTION: NormalizationMode.MIN_MAX
},
stats=dataset.meta.stats
)
],
name="robot_postprocessor"
)
```
## Using Processors in Practice
### Training Loop Integration
Here's how processors integrate into a training loop using the policy's forward method:
```python
from torch.utils.data import DataLoader
# Create dataset and dataloader
dataset = LeRobotDataset(repo_id="your_dataset")
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# Initialize model and processors
model = YourPolicy.from_pretrained("your_model")
preprocessor = RobotProcessor.from_pretrained(
"your_model",
config_filename="robot_preprocessor.json"
)
# Training loop
for epoch in range(num_epochs):
for batch in dataloader:
# Preprocess batch
processed_batch = preprocessor(batch)
# Forward pass - returns loss and optional metrics
loss, metrics = model.forward(processed_batch)
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Log metrics if available
if metrics:
wandb.log(metrics)
```
### Inference Pipeline
For deployment, processors ensure consistent data handling with real robots:
```python
# Load model and processors
policy = YourPolicy.from_pretrained("path/to/model")
preprocessor = RobotProcessor.from_pretrained(
"path/to/model",
config_filename="robot_preprocessor.json"
)
postprocessor = RobotProcessor.from_pretrained(
"path/to/model",
config_filename="robot_postprocessor.json"
)
# Connect to robot
robot = make_robot_from_config(robot_config)
robot.connect()
# Inference loop
policy.eval()
# Reset the policy and processors
policy.reset()
preprocessor.reset()
postprocessor.reset()
with torch.no_grad():
while not done:
# Get observation from robot
observation = robot.get_observation()
# Build dataset-compatible frame
observation_frame = build_dataset_frame(
dataset.features,
observation,
prefix="observation"
)
# Add task instruction to complementary data
observation_frame["task"] = "pick up the red cube"
# Preprocess for model
model_input = preprocessor(observation_frame)
# Run policy
raw_action = policy.select_action(model_input)
# Postprocess action
action_transition = {TransitionKey.ACTION: raw_action}
processed = postprocessor(action_transition)
action = processed[TransitionKey.ACTION]
# Convert to robot action format
robot_action = {
key: action[i].item()
for i, key in enumerate(robot.action_features)
}
# Execute on robot
robot.send_action(robot_action)
```
## Saving and Loading Processors
Processors can be persisted and shared just like models, making them portable across different
environments and ensuring reproducibility:
### Local Save/Load
```python
# Save processor configuration and state
preprocessor.save_pretrained(
"./my_robot_processor",
config_filename="preprocessor.json" # Optional custom name
)
# The save creates:
# my_robot_processor/
# ├── preprocessor.json # Configuration
# ├── preprocessor_step_0_normalizer.safetensors # Step 0 state (stats)
# └── preprocessor_step_1_device.safetensors # Step 1 state (if any)
# Load processor
loaded = RobotProcessor.from_pretrained(
"./my_robot_processor",
config_filename="preprocessor.json"
)
```
### HuggingFace Hub Integration
The HuggingFace Hub provides a centralized place to share and version your processors.
This is particularly useful for sharing preprocessing configurations with models,
ensuring that anyone who downloads your model can reproduce your exact preprocessing pipeline.
It also enables versioning and collaboration on preprocessing strategies.
```python
# Save to HuggingFace Hub
preprocessor.save_pretrained("username/my-robot-policy")
# Load from Hub with automatic download
hub_processor = RobotProcessor.from_pretrained(
"username/my-robot-policy",
config_filename="robot_preprocessor.json",
revision="main", # Optional: specific revision
cache_dir="./cache" # Optional: local cache directory
)
# The Hub integration provides:
# - Automatic versioning with git
# - Public or private sharing
# - Download caching for efficiency
# - Integration with model repositories
```
### Loading with Overrides
Sometimes you need to modify loaded processors for new environments or datasets.
The override mechanism allows you to update specific processor configurations without modifying
the saved files:
```python
# Load processor with configuration overrides
processor = RobotProcessor.from_pretrained(
"./saved_processor",
overrides={
# Change device for different hardware
"device_processor": {"device": "cuda:1"},
# Update statistics for new dataset
"normalizer_processor": {"stats": new_dataset.meta.stats},
# Provide non-serializable objects (like tokenizers)
"tokenizer_processor": {"tokenizer": custom_tokenizer}
}
)
# Common override scenarios:
# 1. Adapting to different hardware (GPU availability)
# 2. Fine-tuning on new datasets with different statistics
# 3. Providing runtime dependencies that can't be serialized
# 4. Testing variations without creating new saved configs
```
## Creating Custom Processor Steps
Build your own processor steps for specialized transformations.
The key is implementing the required interface:
### Basic Custom Step with Registration
The registration mechanism allows your custom processors to be saved and loaded by name rather
than by module path.
This makes them more portable and easier to share:
```python
from dataclasses import dataclass
from lerobot.processor.pipeline import ProcessorStepRegistry, ObservationProcessor
# The @register decorator adds your processor to the global registry
# Use a unique name, preferably namespaced to avoid conflicts
@dataclass
@ProcessorStepRegistry.register("my_company/gaussian_noise")
class GaussianNoiseProcessor(ObservationProcessor):
"""Add Gaussian noise to observations for robustness training."""
noise_std: float = 0.01
training_only: bool = True
is_training: bool = True
def observation(self, observation):
"""Add noise to observation tensors."""
if not self.is_training and self.training_only:
return observation
noisy_obs = {}
for key, value in observation.items():
if isinstance(value, torch.Tensor) and "image" not in key:
# Add noise to non-image observations
noise = torch.randn_like(value) * self.noise_std
noisy_obs[key] = value + noise
else:
noisy_obs[key] = value
return noisy_obs
def get_config(self):
return {
"noise_std": self.noise_std,
"training_only": self.training_only,
"is_training": self.is_training
}
# Why register?
# 1. Enables saving by name: config saves "my_company/gaussian_noise" instead of full module path
# 2. More portable: Others can use your processor without your exact module structure
# 3. Version-safe: Module refactoring won't break saved configs
# 4. Cleaner configs: JSON shows readable names instead of long import paths
```
### Using Base Classes for Common Patterns
LeRobot provides base classes like `ObservationProcessor`, `ActionProcessor`, etc., that handle
the boilerplate of extracting and reinserting specific components:
```python
from lerobot.processor import ActionProcessor
@dataclass
@ProcessorStepRegistry.register("my_company/action_clipper")
class ActionClipProcessor(ActionProcessor):
"""Clip actions to safe ranges."""
min_value: float = -1.0
max_value: float = 1.0
def action(self, action):
"""Process only the action component."""
# No need to handle transition dict - base class does it
return torch.clamp(action, self.min_value, self.max_value)
def get_config(self):
return {"min_value": self.min_value, "max_value": self.max_value}
```
For more advanced processor patterns including stateful processors, see [Implement Your Own Processor](implement_your_own_processor.mdx).
## Advanced Features
### Debugging with Hooks
Processors support hooks for monitoring and debugging without modifying the pipeline code:
```python
# Define monitoring hooks
def log_shapes(step_idx: int, transition: EnvTransition):
"""Log tensor shapes after each step."""
obs = transition.get(TransitionKey.OBSERVATION)
if obs:
print(f"Step {step_idx} shapes:")
for key, value in obs.items():
if isinstance(value, torch.Tensor):
print(f" {key}: {value.shape}")
def check_nans(step_idx: int, transition: EnvTransition):
"""Check for NaN values."""
obs = transition.get(TransitionKey.OBSERVATION)
if obs:
for key, value in obs.items():
if isinstance(value, torch.Tensor) and torch.isnan(value).any():
print(f"Warning: NaN detected in {key} at step {step_idx}")
# Register hooks
processor.register_after_step_hook(log_shapes)
processor.register_after_step_hook(check_nans)
# Process data - hooks will be called after each step
output = processor(input_data)
# Remove hooks when done debugging
processor.unregister_after_step_hook(log_shapes)
processor.unregister_after_step_hook(check_nans)
```
### Step-by-Step Inspection
Use `step_through()` for detailed debugging of the transformation pipeline:
```python
# Inspect data at each transformation stage
for i, intermediate in enumerate(processor.step_through(data)):
print(f"\n=== After step {i} ===")
# Check observation shapes
obs = intermediate.get(TransitionKey.OBSERVATION)
if obs:
for key, value in obs.items():
if isinstance(value, torch.Tensor):
print(f"{key}: shape={value.shape}, "
f"dtype={value.dtype}, "
f"device={value.device}, "
f"range=[{value.min():.3f}, {value.max():.3f}]")
# Check action if present
action = intermediate.get(TransitionKey.ACTION)
if action is not None and isinstance(action, torch.Tensor):
print(f"action: shape={action.shape}, range=[{action.min():.3f}, {action.max():.3f}]")
```
### Pipeline Slicing
Extract subsets of a pipeline for testing or creating variations:
```python
# Get specific steps
first_three_steps = processor[:3] # Returns new RobotProcessor
middle_step = processor[2] # Returns single ProcessorStep
# Test individual steps
test_input = {...}
step_output = processor[0](test_input) # Test first step only
# Create variations
variant_processor = RobotProcessor(
steps=processor.steps[:-1] + [new_final_step],
name="variant"
)
```
## Best Practices and Tips
### 1. Order Matters
The sequence of processors is crucial. Follow this general order:
```python
# Preprocessing: Raw → Model-ready
1. Rename (standardize keys)
2. Batch (add dimensions)
3. Tokenize (text → tokens)
4. Normalize (scale values)
5. Device (move to GPU)
# Postprocessing: Model → Robot-ready
1. Device (move to CPU)
2. Unnormalize (restore scale)
3. Unbatch (remove dimensions if needed)
```
### 2. Registration Best Practices
```python
# Always register custom steps for better portability
@ProcessorStepRegistry.register("my_company/special_processor")
class SpecialProcessor:
...
# Use namespaced names to avoid conflicts
# Good: "my_company/augmentation"
# Bad: "augmentation" (too generic)
# Check registered processors
print(ProcessorStepRegistry.list()) # See all registered processors
```
### 3. Common Pitfalls and Solutions
**Tensor Device Mismatch:**
```python
# Problem: RuntimeError: Expected all tensors on same device
# Solution: Ensure DeviceProcessor is in pipeline
preprocessor = RobotProcessor(
steps=[
NormalizerProcessor(...),
DeviceProcessor(device="cuda") # Add this
]
)
```
**Missing Statistics:**
```python
# Problem: NormalizerProcessor has no stats
# Solution 1: Compute stats from dataset
from lerobot.datasets.compute_stats import compute_stats
stats = compute_stats(dataset)
# Solution 2: Load with overrides
processor = RobotProcessor.from_pretrained(
"model_path",
overrides={"normalizer_processor": {"stats": dataset.meta.stats}}
)
```
## Next Steps
Now that you understand processors, explore these topics:
- [**Implement Your Own Processor**](implement_your_own_processor.mdx) - Deep dive into creating custom processors with advanced features like stateful processing
- [**Policy Documentation**](policies.mdx) - Learn how different policies use processors
- [**Dataset Documentation**](datasets.mdx) - Understand the data format that processors transform
- [**Training Guide**](training.mdx) - See processors in action during model training
- [**Evaluation Guide**](evaluation.mdx) - Learn about processor usage during policy evaluation
## Summary
Processors are the unsung heroes of robotics pipelines, handling the critical transformations between raw sensor data and model-ready tensors. By understanding and effectively using processors, you can:
- Build robust, reusable data pipelines
- Share preprocessing configurations across projects
- Debug data transformations systematically
- Ensure consistency between training and deployment
- Create custom transformations for specialized tasks
Remember: good preprocessing is often the difference between a model that works in theory
and one that works in practice!
The modular pipeline approach ensures your transformations are testable, reproducible,
and portable across different robots and environments.
+195
View File
@@ -0,0 +1,195 @@
# Phone
Use your phone (iOS or Android) to control your robot.
**In this guide you'll learn:**
- How to connect an iOS/Android phone
- How phone pose is mapped to robot endeffector (EE) targets
- How to tweak safety limits, gripper control, and IK settings
To use phone to control your robot, install the relevant dependencies with:
```bash
pip install lerobot[phone]
```
## Get started
### Supported platforms
- iOS: Uses the HEBI Mobile I/O app (ARKit pose + buttons). Download the app first, open it and the examples will discover it on your network and stream the phone pose and inputs.
- Android: Uses the `teleop` package (WebXR). When you start the Python process, it prints a local URL. Open the link on your phone, tap Start, then use Move to stream pose.
Links:
- Android WebXR library: [`teleop` on PyPI](https://pypi.org/project/teleop/)
- iOS app: [HEBI Mobile I/O](https://docs.hebi.us/tools.html#mobile-io)
### Phone orientation and controls
- Orientation: hold the phone with the screen facing up and the top edge pointing in the same direction as the robot gripper. This ensures calibration aligns the phones frame with the robot frame so motion feels natural.
- Enable/disable:
- iOS: Hold `B1` to enable teleoperation, release to stop. The first press captures a reference pose.
- Android: Press and hold the `Move` button, release to stop. The first press captures a reference pose.
- Gripper control:
- iOS: Analog input `A3` controls the gripper as velocity input.
- Android: Buttons `A` and `B` act like increment/decrement (A opens, B closes). You can tune velocity in the `GripperVelocityToJoint` step.
### Step 1: Choose the platform
Modify the examples to use `PhoneOS.IOS` or `PhoneOS.ANDROID` in `PhoneConfig`. The API is identical across platforms, only the input source differs. All examples are under `examples/` and have `phone_so100_*.py` variants.
Teleoperation example:
```36:43:examples/phone_so100_teleop.py
from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
teleop_config = PhoneConfig(phone_os=PhoneOS.IOS) # or PhoneOS.ANDROID
teleop_device = Phone(teleop_config)
```
### Step 2: Connect and calibrate
When `Phone(teleop_config)` is created and `connect()` is called, calibration is prompted automatically. Hold the phone in the orientation described above, then:
- iOS: press and hold `B1` to capture the reference pose.
- Android: press `Move` button on the WebXR page to capture the reference pose.
Why calibrate? We capture the current pose so subsequent poses are expressed in a robot aligned frame. When you again press the button to enable control, the position is recaptured to avoid drift when your phone is repositioned while it was disabled.
### Step 3: Run an example
Run on of the examples scripts to teleoperate, record a dataset, replay a dataset or evaluate a policy.
All scripts assume you configured your robot (e.g., SO-100 follower) and set the correct serial port.
- Android: after starting the script, open the printed local URL on your phone, tap Start, then press and hold Move.
- iOS: open HEBI Mobile I/O first; B1 enables motion. A3 controls the gripper.
You can customize mapping or safety limits by editing the processor steps shown in the examples.
You can also remap inputs (e.g., use a different analog input) or adapt the pipeline to other robots (e.g., LeKiwi) by modifying the input and kinematics steps. More about this in the [Processors for Robots and Teleoperators](./processors_robots_teleop.mdx) guide.
- Run this example to teleoperate:
```bash
python examples/phone_so100_teleop.py
```
- Run this example to record a dataset, which saves absolute end effector observations and actions:
```bash
python examples/phone_so100_record.py
```
- Run this example to replay recorded episodes:
```bash
python examples/phone_so100_replay.py
```
- Run this example to evaluate a pretrained policy:
```bash
python examples/phone_so100_eval.py
```
### Important pipeline steps and options
- Kinematics are used in multiple steps. We use [Placo](https://github.com/Rhoban/placo) which is a wrapper around Pinocchio for handling our kinematics. We construct the kinematics object by passing the robot's URDF and target frame. We set `target_frame_name` to the gripper frame.
```44:49:examples/phone_so100_teleop.py
RobotKinematics(
urdf_path="./src/lerobot/teleoperators/sim/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
```
- The `MapPhoneActionToRobotAction` step converts the calibrated phone pose and inputs into target deltas and gripper commands, below is shown what the step outputs.
```72:83:src/lerobot/teleoperators/phone/phone_processor.py
# Map calibrated phone pose to robot targets (enabled gates the motion)
act.update(
{
"action.enabled": enabled,
"action.target_x": -pos[1] if enabled else 0.0,
"action.target_y": pos[0] if enabled else 0.0,
"action.target_z": pos[2] if enabled else 0.0,
"action.target_wx": rotvec[1] if enabled else 0.0,
"action.target_wy": rotvec[0] if enabled else 0.0,
"action.target_wz": -rotvec[2] if enabled else 0.0,
"action.gripper": gripper,
}
)
```
- The `EEReferenceAndDelta` step converts target deltas to an absolute desired EE pose, storing a reference on enable, the `end_effector_step_sizes` are the step sizes for the EE pose and can be modified to change the motion speed.
```56:65:examples/phone_so100_teleop.py
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=list(robot.bus.motors.keys()),
)
```
- The `EEBoundsAndSafety` step clamps EE motion to a workspace and checks for large ee step jumps to ensure safety. The `end_effector_bounds` are the bounds for the EE pose and can be modified to change the workspace. The `max_ee_step_m` and `max_ee_twist_step_rad` are the step limits for the EE pose and can be modified to change the safety limits.
```61:66:examples/phone_so100_teleop.py
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.10,
max_ee_twist_step_rad=0.50,
)
```
- The `GripperVelocityToJoint` step turns a velocitylike gripper input into absolute gripper position using the current measured state. The `speed_factor` is the factor by which the velocity is multiplied.
```78:81:examples/phone_so100_teleop.py
GripperVelocityToJoint(
motor_names=list(robot.bus.motors.keys()),
speed_factor=20.0,
)
```
#### Different IK initial guesses
We use different IK initial guesses in the kinematic steps. As initial guess either the current measured joints or the previous IK solution is used.
- Closed loop (used in record/eval): sets `initial_guess_current_joints=True` so IK starts from the measured joints each frame.
```71:76:examples/phone_so100_eval.py
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True, # closed loop
)
```
- Open loop (used in replay): sets `initial_guess_current_joints=False` so IK continues from the previous IK solution rather than the measured state. This preserves action stability when we replay without feedback.
```80:86:examples/phone_so100_replay.py
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=False, # open loop
)
```
### Pipeline steps explained
- MapPhoneActionToRobotAction: converts calibrated phone pose and inputs into target deltas and a gripper command. Motion is gated by an enable signal (B1 on iOS, Move on Android).
- AddRobotObservationAsComplimentaryData: reads current robot joints and inserts them under `complementary_data.raw_joint_positions` for FK/IK steps to use.
- EEReferenceAndDelta: latches a reference EE pose on enable and combines it with target deltas to produce an absolute desired EE pose each frame. When disabled, it keeps sending the last commanded pose.
- EEBoundsAndSafety: clamps the EE pose to a workspace and ratelimits jumps for safety. Also declares `action.ee.*` features.
- InverseKinematicsEEToJoints: turns an EE pose into joint positions with IK. `initial_guess_current_joints=True` is recommended for closedloop control; set `False` for openloop replay for stability.
- GripperVelocityToJoint: integrates a velocitylike gripper input into an absolute gripper position using the current measured state.
- ForwardKinematicsJointsToEE: computes `observation.state.ee.*` from observed joints for logging and training on EE state.
### Troubleshooting
- iOS not discovered: ensure HEBI Mobile I/O is open and your laptop/phone are on the same network.
- Android URL not reachable: check local you used `https` instead of `http`, use the exact IP printed by the script and allow your browser to enter and ignore the certificate issue.
- Motion feels inverted: adjust the sign flips in `MapPhoneActionToRobotAction` or swap axes to match your setup.
+148
View File
@@ -0,0 +1,148 @@
# Processors for Robots and Teleoperators
This guide shows how to build and modify processing pipelines that connect teleoperators (e.g., phone) to robots and datasets. Pipelines standardize conversions between different action/observation spaces so you can swap teleops and robots without rewriting glue code.
We use the Phone to SO100 follower examples for concreteness, but the same patterns apply to other robots.
**What you'll learn**
- Absolute vs. relative EE control: What each means, tradeoffs, and how to choose for your task.
- Three-pipeline pattern: How to map teleop actions → dataset actions → robot commands, and robot observations → dataset observations.
- Adapters (`to_transition` / `to_output`): How these convert raw dicts to `EnvTransition` and back to reduce boilerplate.
- Dataset feature contracts: How steps declare features via `transform_features(...)`, and how to aggregate/merge them for recording.
- Choosing a representation: When to store joints, absolute EE poses, or relative EE deltas—and how that affects training.
- Pipeline customization guidance: How to swap robots/URDFs safely and tune bounds, step sizes, and options like IK initialization.
### Absolute vs relative EE control
The examples in this guide use absolute end effector (EE) poses because they are easy to reason about. In practice, relative EE deltas or joint position are often preferred as learning features.
You can choose what you save and learn from the teleop and robot action spaces, joints, absolute EE, or relative EE by using/implementing the right steps (and `transform_features()`) in your pipelines.
## Three pipelines
We often compose three pipelines. Depending on your setup, some can be empty if action and observation spaces already match.
Each of these pipelines handle different conversions between different action and observation spaces. Below is a quick explanation of each pipeline.
1. Pipeline 1: Teleop action space → dataset action space (phone pose → EE targets)
2. Pipeline 2: Dataset action space → robot command space (EE targets → joints)
3. Pipeline 3: Robot observation space → dataset observation space (joints → EE pose)
Below is an example of the three pipelines that we use in the phone to SO-100 follower examples:
```69:90:examples/phone_so100_record.py
phone_to_robot_ee_pose = RobotProcessor( # teleop -> dataset action
steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
AddRobotObservationAsComplimentaryData(robot=robot),
EEReferenceAndDelta(kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=list(robot.bus.motors.keys())),
EEBoundsAndSafety(end_effector_bounds={"min": [-1, -1, -1], "max": [1, 1, 1]},
max_ee_step_m=0.20, max_ee_twist_step_rad=0.50)],
to_transition=to_transition_teleop_action,
to_output=lambda tr: tr,
)
robot_ee_to_joints = RobotProcessor( # dataset action -> robot
steps=[InverseKinematicsEEToJoints(kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True),
GripperVelocityToJoint(motor_names=list(robot.bus.motors.keys()), speed_factor=20.0)],
to_transition=lambda tr: tr,
to_output=to_output_robot_action,
)
robot_joints_to_ee_pose = RobotProcessor( # robot obs -> dataset obs
steps=[ForwardKinematicsJointsToEE(kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()))],
to_transition=to_transition_robot_observation,
to_output=lambda tr: tr,
)
```
## Why to_transition / to_output
To convert from robot/teleoperator to pipeline and back, we use the `to_transition` and `to_output` pipeline adapters.
They standardize conversions to reduce boilerplate code, and form the bridge between the robot and teleoperators raw dicts and the pipelines `EnvTransition` format.
In the phone to SO-100 follower examples we use the following adapters:
- `to_transition_teleop_action`: transforms the teleop action dict to a pipeline transition (puts keys under `action.*`, converts scalars/arrays to tensors, keeps objects like `Rotation` intact)
- `to_output_robot_action`: transforms the pipeline transition to a robot action dict (extracts keys ending with `.pos`/`.vel` and strips `action.` prefix)
- `to_transition_robot_observation`: transforms the robot observation dict to a pipeline transition (splits state vs images; stores state under `observation.state.*` and images under `observation.images.*`)
See `src/lerobot/processor/converters.py` for more details.
## Dataset feature contracts
Dataset features are the keys saved in the dataset. Each step can declare what its dataset features are via `transform_features(...)`. We can then aggregate features per pipeline with `aggregate_pipeline_dataset_features()` and merge multiple groups with `merge_features(...)`.
Below is and example of how we declare features with the `transform_features` method in the phone to SO-100 follower examples:
```203:211:src/lerobot/robots/so100_follower/robot_kinematic_processor.py
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
# Because this is last step we specify the dataset features of this step that we want to be stored in the dataset
features["action.ee.x"] = float
features["action.ee.y"] = float
features["action.ee.z"] = float
features["action.ee.wx"] = float
features["action.ee.wy"] = float
features["action.ee.wz"] = float
return features
```
Tip: declare features at the last step that produces them (e.g., `EEBoundsAndSafety` declares `action.ee.*`, `ForwardKinematicsJointsToEE` declares `observation.state.ee.*`).
Below is an example of how we aggregate and merge features in the phone to SO-100 follower examples:
```121:145:examples/phone_so100_record.py
action_ee = aggregate_pipeline_dataset_features(
pipeline=phone_to_robot_ee_pose,
initial_features=phone.action_features,
use_videos=True,
patterns=["action.ee"],
)
gripper = aggregate_pipeline_dataset_features(
pipeline=robot_ee_to_joints,
initial_features={},
use_videos=True,
patterns=["action.gripper.pos", "observation.state.gripper.pos"],
)
observation_ee = aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose,
initial_features=robot.observation_features,
use_videos=True,
patterns=["observation.state.ee"],
)
dataset_features = merge_features(action_ee, gripper, observation_ee)
```
How it works:
- `aggregate_pipeline_dataset_features(...)`: applies `transform_features` across the pipeline and filters by patterns (images included when `use_videos=True`).
- `merge_features(...)`: combine multiple feature dicts.
- Recording uses `to_dataset_frame(...)` to build frames consistent with `dataset.features` before we call `add_frame(...)` to add the frame to the dataset.
## Guidance when customizing robot pipelines
You can store any of the following features as your action/observation space:
- Joint positions
- Absolute EE poses
- Relative EE deltas
- Other features: joint velocity, etc.
Pick what you want to use for your policy action and observation space and configure/modify the pipelines and steps accordingly.
### Different robots
- Swap `RobotKinematics` URDF and `motor_names`. Ensure `target_frame_name` points to your gripper/wrist.
### Safety first
- When changing pipelines, start with tight bounds, implement safety steps when working with real robots.
- Its advised to start with simulation first and then move to real robots.
Hope this guide helps you get started with customizing your robot pipelines, If you run into any issues at any point, jump into our [Discord community](https://discord.com/invite/s3KuuzsPFb) for support.
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env python
"""
Convert video dataset to image dataset for faster training.
This pre-extracts all frames from MP4 files to PNG images.
"""
import argparse
from pathlib import Path
import logging
import shutil
def convert_dataset_videos_to_images(repo_id: str, root: str | None = None):
"""Convert all videos in a LeRobot dataset to individual image files."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.video_utils import decode_video_frames
import torch
# Load dataset
dataset = LeRobotDataset(repo_id, root=root, download_videos=True)
total_frames_processed = 0
for ep_idx in range(dataset.meta.total_episodes):
logging.info(f"Processing episode {ep_idx}/{dataset.meta.total_episodes}")
for vid_key in dataset.meta.video_keys:
video_path = dataset.root / dataset.meta.get_video_file_path(ep_idx, vid_key)
if not video_path.exists():
logging.warning(f"Video not found: {video_path}")
continue
# Create image directory
img_dir = dataset.root / f"images/chunk-{dataset.meta.get_episode_chunk(ep_idx)}/{vid_key}"
img_dir.mkdir(parents=True, exist_ok=True)
# Decode all frames from video
# Get episode length to decode all frames
ep_length = dataset.meta.episodes[ep_idx]["length"]
timestamps = [i / dataset.fps for i in range(ep_length)]
try:
frames = decode_video_frames(video_path, timestamps, dataset.tolerance_s, dataset.video_backend)
# Save each frame as PNG
for i, frame in enumerate(frames.squeeze(0)):
img_path = img_dir / f"episode_{ep_idx:06d}_{i:06d}.png"
# Convert tensor to PIL and save
import torchvision.transforms as T
to_pil = T.ToPILImage()
pil_frame = to_pil(frame)
pil_frame.save(img_path)
total_frames_processed += len(frames.squeeze(0))
logging.info(f" Extracted {len(frames.squeeze(0))} frames to {img_dir}")
except Exception as e:
logging.error(f"Failed to process {video_path}: {e}")
continue
logging.info(f"Conversion complete! Processed {total_frames_processed} total frames")
logging.info(f"You can now use download_videos=False to use the extracted images")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert LeRobot video dataset to images")
parser.add_argument("repo_id", help="Dataset repo ID (e.g., 'kenmacken/record-test-2')")
parser.add_argument("--root", help="Local root directory", default=None)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
convert_dataset_videos_to_images(args.repo_id, args.root)
+2
View File
@@ -33,6 +33,8 @@ class DatasetConfig:
# Root directory where the dataset will be stored (e.g. 'dataset/path').
root: str | None = None
episodes: list[int] | None = None
# Percentage of dataset to use (0-100). If set, overrides episodes parameter.
percentage: float | None = None
image_transforms: ImageTransformsConfig = field(default_factory=ImageTransformsConfig)
revision: str | None = None
use_imagenet_stats: bool = True
+15 -2
View File
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pprint import pformat
import torch
@@ -87,10 +86,24 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
cfg.dataset.repo_id, root=cfg.dataset.root, revision=cfg.dataset.revision
)
delta_timestamps = resolve_delta_timestamps(cfg.policy, ds_meta)
# Handle percentage parameter
episodes = cfg.dataset.episodes
if cfg.dataset.percentage is not None:
# Calculate episodes based on percentage
total_episodes = ds_meta.total_episodes
num_episodes_to_use = max(1, int(total_episodes * cfg.dataset.percentage / 100))
episodes = list(range(num_episodes_to_use))
import logging
logging.info(
f"Using {cfg.dataset.percentage}% of dataset: {num_episodes_to_use}/{total_episodes} episodes"
)
dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
episodes=cfg.dataset.episodes,
episodes=episodes,
delta_timestamps=delta_timestamps,
image_transforms=image_transforms,
revision=cfg.dataset.revision,
@@ -13,20 +13,24 @@
# limitations under the License.
"""
This script will help you convert any LeRobot dataset already pushed to the hub from codebase version 2.0 to
2.1. It will:
This script converts a LeRobot dataset already pushed to the Hub from codebase version 2.0 to 2.1.
It downloads metadata from a SOURCE dataset repo, computes/validates per-episode stats, updates
the codebase version in `info.json`, and uploads the result to a DESTINATION dataset repo.
It will:
- Generate per-episodes stats and writes them in `episodes_stats.jsonl`
- Check consistency between these new stats and the old ones.
- Remove the deprecated `stats.json`.
- Update codebase_version in `info.json`.
- Push this new version to the hub on the 'main' branch and tags it with "v2.1".
- Push this new version to the destination repo/branch and tag it with the current codebase version.
Usage:
```bash
python -m lerobot.datasets.v21.convert_dataset_v20_to_v21 \
--repo-id=aliberts/koch_tutorial
--source-repo-id=namespace/source_dataset \
--dest-repo-id=namespace/destination_dataset \
--branch=main
```
"""
@@ -54,48 +58,67 @@ class SuppressWarnings:
def convert_dataset(
repo_id: str,
source_repo_id: str,
dest_repo_id: str,
branch: str | None = None,
num_workers: int = 4,
):
# Download metadata from the source repo at v2.0
with SuppressWarnings():
dataset = LeRobotDataset(repo_id, revision=V20, force_cache_sync=True)
dataset = LeRobotDataset(source_repo_id, revision=V20, force_cache_sync=True)
# Ensure we recompute fresh episodes stats
if (dataset.root / EPISODES_STATS_PATH).is_file():
(dataset.root / EPISODES_STATS_PATH).unlink()
# Compute and validate stats
convert_stats(dataset, num_workers=num_workers)
ref_stats = load_stats(dataset.root)
check_aggregate_stats(dataset, ref_stats)
# Update codebase version in info.json
dataset.meta.info["codebase_version"] = CODEBASE_VERSION
write_info(dataset.meta.info, dataset.root)
dataset.push_to_hub(branch=branch, tag_version=False, allow_patterns="meta/")
# delete old stats.json file
if (dataset.root / STATS_PATH).is_file:
# Remove deprecated stats.json locally so it won't be uploaded
if (dataset.root / STATS_PATH).is_file():
(dataset.root / STATS_PATH).unlink()
# Push only meta/ to destination repo
hub_api = HfApi()
if hub_api.file_exists(
repo_id=dataset.repo_id, filename=STATS_PATH, revision=branch, repo_type="dataset"
):
hub_api.delete_file(
path_in_repo=STATS_PATH, repo_id=dataset.repo_id, revision=branch, repo_type="dataset"
)
hub_api.create_repo(repo_id=dest_repo_id, private=False, repo_type="dataset", exist_ok=True)
if branch:
hub_api.create_branch(repo_id=dest_repo_id, branch=branch, repo_type="dataset", exist_ok=True)
hub_api.create_tag(repo_id, tag=CODEBASE_VERSION, revision=branch, repo_type="dataset")
hub_api.upload_folder(
repo_id=dest_repo_id,
folder_path=str(dataset.root),
repo_type="dataset",
revision=branch,
allow_patterns="meta/",
)
# Ensure old stats.json is deleted on destination
if hub_api.file_exists(repo_id=dest_repo_id, filename=STATS_PATH, revision=branch, repo_type="dataset"):
hub_api.delete_file(path_in_repo=STATS_PATH, repo_id=dest_repo_id, revision=branch, repo_type="dataset")
# Tag destination with current codebase version
hub_api.create_tag(dest_repo_id, tag=CODEBASE_VERSION, revision=branch, repo_type="dataset")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
"--source-repo-id",
type=str,
required=True,
help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset "
"(e.g. `lerobot/pusht`, `cadene/aloha_sim_insertion_human`).",
help="Source dataset repo id to download from (must be v2.0).",
)
parser.add_argument(
"--dest-repo-id",
type=str,
required=True,
help="Destination dataset repo id to upload the converted metadata to.",
)
parser.add_argument(
"--branch",
+2
View File
@@ -16,6 +16,7 @@ from .act.configuration_act import ACTConfig as ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig as DiffusionConfig
from .pi0.configuration_pi0 import PI0Config as PI0Config
from .pi0.processor_pi0 import Pi0NewLineProcessor
from .rlearn.configuration_rlearn import RLearNConfig as RLearNConfig
from .smolvla.configuration_smolvla import SmolVLAConfig as SmolVLAConfig
from .smolvla.processor_smolvla import SmolVLANewLineProcessor
from .tdmpc.configuration_tdmpc import TDMPCConfig as TDMPCConfig
@@ -28,4 +29,5 @@ __all__ = [
"SmolVLAConfig",
"TDMPCConfig",
"VQBeTConfig",
"RLearNConfig",
]
+19
View File
@@ -34,6 +34,7 @@ from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pi0fast.configuration_pi0fast import PI0FASTConfig
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
@@ -80,6 +81,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
return SmolVLAPolicy
elif name == "rlearn":
from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy
return RLearNPolicy
else:
raise NotImplementedError(f"Policy with name {name} is not implemented.")
@@ -103,6 +108,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return SmolVLAConfig(**kwargs)
elif policy_type == "reward_classifier":
return RewardClassifierConfig(**kwargs)
elif policy_type == "rlearn":
return RLearNConfig(**kwargs)
else:
raise ValueError(f"Policy type '{policy_type}' is not available.")
@@ -220,6 +227,13 @@ def make_processor(
cast(SmolVLAConfig, policy_cfg), dataset_stats=kwargs.get("dataset_stats")
)
elif policy_cfg.type == "rlearn":
from lerobot.policies.rlearn.processor_rlearn import make_rlearn_processor
processors = make_rlearn_processor(
cast(RLearNConfig, policy_cfg), dataset_stats=kwargs.get("dataset_stats")
)
else:
raise NotImplementedError(f"Processor for policy type '{policy_cfg.type}' is not implemented.")
@@ -230,6 +244,7 @@ def make_policy(
cfg: PreTrainedConfig,
ds_meta: LeRobotDatasetMetadata | None = None,
env_cfg: EnvConfig | None = None,
episode_data_index: dict | None = None,
) -> PreTrainedPolicy:
"""Make an instance of a policy class.
@@ -287,6 +302,10 @@ def make_policy(
cfg.input_features = {key: ft for key, ft in features.items() if key not in cfg.output_features}
kwargs["config"] = cfg
# Pass episode_data_index for RLearN policy to calculate proper progress
if cfg.type == "rlearn" and episode_data_index is not None:
kwargs["episode_data_index"] = episode_data_index
if cfg.pretrained_path:
# Load a pretrained policy and override the config if needed (for example, if there are inference-time
# hyperparameters that we want to vary).
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import NormalizationMode
@PreTrainedConfig.register_subclass("rlearn")
@dataclass
class RLearNConfig(PreTrainedConfig):
"""Configuration for a video-language conditioned reward model (RLearN).
Inputs:
- Visual frames (one or multiple cameras). Optionally a short sequence.
- A language instruction/goal string.
Output:
- Per-timestep reward logits or a single-step reward logit.
Notes:
- This follows the ReWiND paper architecture. It uses frozen vision/text encoders
(DINOv3 for vision, SigLIP2 for language) and trains a
lightweight temporal aggregator + head.
"""
# Encoders - Use SigLIP2 for both vision and text (shared checkpoint)
vision_model_name: str = "google/siglip2-base-patch16-224"
text_model_name: str = "google/siglip2-base-patch16-224"
freeze_backbones: bool = True
# Sequence length, amount of past frames including current one to use in the temporal model
max_seq_len: int = 16
# Temporal sampling stride
temporal_sampling_stride: int = 3 # Open x mostly has fps 10, and rewind has seq len 16, ours is 30fps so 30/10 = 3 stride lenght to have same timeframe!
# Model dimensions and transformer
dim_model: int = 512
num_layers: int = 4
num_heads: int = 8
ff_mult: int = 4 # Feed-forward multiplier, hidden = dim_model * ff_mult
dropout: float = 0.05
# --- reward head options ---
use_categorical_rewards: bool = False # classification over bins
num_reward_bins: int = 25
reward_min_value: float = 0.0 # for HL-Gauss range
reward_max_value: float = 1.0
use_hl_gauss_loss: bool = True # if False -> plain regression
hl_gauss_num_bins: int = 25 # histogram resolution
# Inference-time subsampling and regularization
inference_stride: int = 1 # inference_stride is an extra, second downsampling applied in forward after window sampling/rewind. Keep it at 1 to disable extra skipping
frame_dropout_p: float = 0.10
# Training
learning_rate: float = 5e-4
weight_decay: float = 0.01
head_lr_multiplier: float = 5.0
logit_eps: float = 1e-4
regularizer_warmup_steps: int = 500
# Performance optimizations
use_amp: bool = False
compile_model: bool = True
# ReWiND augmentation
rewind_prob: float = 0.3 #0.8
rewind_last3_prob: float = 0.0 #0.3
mismatch_prob: float = 0.0 #0.2
# Normalization presets
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.MEAN_STD,
}
)
# Required path to episodes.jsonl for episode boundaries
episodes_jsonl_path: str | None = "meta/episodes.jsonl"
def validate_features(self) -> None:
# Require at least one image feature. Language is recommended but optional (can be blank).
if not self.image_features:
raise ValueError(
"You must provide at least one image feature for RLearN (e.g. 'observation.image')."
)
@property
def observation_delta_indices(self) -> list | None:
# Request a long enough context so in-window stride sampling can be >1.
# We ask for (max_seq_len * temporal_sampling_stride) frames ending at t=0.
# Example: max_seq_len=16, temporal_sampling_stride=3 → 48 deltas → ~46 frames available.
total_needed = self.max_seq_len * max(1, int(self.temporal_sampling_stride))
return list(range(1 - total_needed, 1))
@property
def action_delta_indices(self) -> list | None:
# Not an action chunking policy.
return None
@property
def reward_delta_indices(self) -> list | None:
# ReWiND generates progress labels on-the-fly, doesn't need reward data
return None
def get_optimizer_preset(self): # type: ignore[override]
from lerobot.optim.optimizers import AdamWConfig
return AdamWConfig(lr=self.learning_rate, weight_decay=self.weight_decay)
def get_scheduler_preset(self): # type: ignore[override]
# No scheduler by default.
return None
+392
View File
@@ -0,0 +1,392 @@
#!/usr/bin/env python
"""
Standalone evaluation script for RLearN models.
This script evaluates RLearN reward models on episodes from a dataset,
generating comparison plots between ground truth rewards and model predictions.
Usage:
python src/lerobot/policies/rlearn/eval_script.py --model MODEL_NAME --dataset DATASET_REPO --episodes N
Example:
python src/lerobot/policies/rlearn/eval_script.py --model pepijn223/rlearn_18 --dataset pepijn223/phone_pipeline_pickup1 --episodes 2
"""
import argparse
import os
import sys
from pathlib import Path
# Add src to path for imports
sys.path.append(str(Path(__file__).parent.parent.parent.parent))
import warnings
import matplotlib.pyplot as plt
import numpy as np
import torch
from scipy.stats import spearmanr
from tqdm import tqdm
warnings.filterwarnings("ignore")
# LeRobot imports
from lerobot.constants import OBS_IMAGE, OBS_IMAGES, OBS_LANGUAGE
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy
def _to_chw_float01(img):
"""Ensure CHW float in [0,1]."""
if isinstance(img, np.ndarray):
img = torch.from_numpy(img)
# HWC -> CHW if needed
if len(img.shape) == 3 and img.shape[-1] in (1, 3, 4):
img = img.permute(2, 0, 1)
if img.dtype == torch.uint8:
img = img.float() / 255.0
else:
img = img.float()
return torch.clamp(img, 0.0, 1.0)
def _get_language(frame_data):
lang = None
if OBS_LANGUAGE in frame_data:
lang = frame_data[OBS_LANGUAGE]
if isinstance(lang, list) and len(lang) > 0:
lang = lang[0]
elif "task" in frame_data:
lang = frame_data["task"]
return lang if isinstance(lang, str) else "No language provided"
def _get_ground_truth_reward(frame_data):
"""Try common keys for ground-truth reward. Return None if unavailable."""
for key in ("reward", "rewards", "gt_reward", "progress"):
if key in frame_data:
r = frame_data[key]
# unwrap single-element lists/arrays
if isinstance(r, (list, np.ndarray)) and np.array(r).size == 1:
r = float(np.array(r).reshape(-1)[0])
try:
return float(r)
except Exception:
pass
return None
def extract_episode_frames_and_gt(dataset, episode_idx):
"""Load a full episode: frames (T, C, H, W), language (str), gt_rewards (np.ndarray or None)."""
ep_start = dataset.episode_data_index["from"][episode_idx].item()
ep_end = dataset.episode_data_index["to"][episode_idx].item()
T = ep_end - ep_start
frames = []
gt_rewards = []
language = None
for t in range(T):
item = dataset[ep_start + t]
# image(s)
if OBS_IMAGES in item:
img = item[OBS_IMAGES]
elif OBS_IMAGE in item:
img = item[OBS_IMAGE]
else:
# try to find an image-like key
img_keys = [k for k in item.keys() if "image" in k.lower()]
if not img_keys:
continue
img = item[img_keys[0]]
frames.append(_to_chw_float01(img))
# language once
if language is None:
language = _get_language(item)
# ground-truth reward (optional)
r = _get_ground_truth_reward(item)
gt_rewards.append(r)
if not frames:
return None, None, None
frames = torch.stack(frames) # (T, C, H, W)
# If all GT entries are None, treat as missing
if all(r is None for r in gt_rewards):
gt_rewards = None
else:
# Replace None by forward filling
arr = np.array([np.nan if r is None else float(r) for r in gt_rewards], dtype=float)
# forward/back fill
if np.isnan(arr[0]):
first_valid = np.flatnonzero(~np.isnan(arr))
if len(first_valid) > 0:
arr[0] = arr[first_valid[0]]
else:
arr[0] = 0.0
for i in range(1, len(arr)):
if np.isnan(arr[i]):
arr[i] = arr[i - 1]
gt_rewards = arr
return frames, language or "No language provided", gt_rewards
@torch.no_grad()
def predict_rewards_sliding(model, frames, language, max_seq_len=16, batch_size=64, device="cuda", temporal_stride: int | None = None):
"""
Sliding-window prediction: for each frame i, create a window [max(0, i-L+1) .. i],
left-pad by repeating the first frame to length L (<= 16), and take the prediction
corresponding to the current frame's position in the window.
Returns np.ndarray of shape (T,).
"""
T = frames.shape[0]
cfg = getattr(model, "config", object())
L = int(getattr(cfg, "max_seq_len", max_seq_len))
L = min(L, max_seq_len) # hard-cap at 16
# Use the same temporal stride as training (skip s-1 frames, take 1)
if temporal_stride is None:
temporal_stride = int(getattr(cfg, "temporal_sampling_stride", 1))
temporal_stride = max(1, int(temporal_stride))
# Preprocessed tensor on device
frames = frames.to(device)
windows = []
frame_positions = [] # Track which temporal position each frame should use
left_pad_counts = [] # Number of left-pad (OOB) frames per window
for i in range(T):
# Build indices with stride s: [..., i-3, i] etc., left-padded by clamping to 0
idxs = [i - (L - 1 - j) * temporal_stride for j in range(L)]
pad_needed = sum(1 for k in idxs if k < 0)
clamped = [0 if k < 0 else (T - 1 if k >= T else k) for k in idxs]
window = frames[clamped] # (L, C, H, W)
# Use the last temporal position (current frame) for reading model output
frame_pos = L - 1
windows.append(window)
frame_positions.append(frame_pos)
left_pad_counts.append(pad_needed)
preds = np.zeros(T, dtype=float)
for s in range(0, T, batch_size):
e = min(s + batch_size, T)
batch_windows = torch.stack(windows[s:e]) # (B, L, C, H, W)
batch_positions = frame_positions[s:e]
batch = {OBS_IMAGES: batch_windows, OBS_LANGUAGE: [language] * (e - s)} # expects (B, L, C, H, W)
# Model returns (B, L) predictions for each temporal position
values = model.predict_rewards(batch) # torch.Tensor (B, L)
# Apply eval-time padding rule: predictions for left-padded (OOB) frames are zero
if values.dim() == 2 and len(left_pad_counts) >= (e - s):
for b_idx in range(e - s):
pad_n = left_pad_counts[s + b_idx]
if pad_n > 0:
values[b_idx, :pad_n] = 0.0
# Debug output removed - issue was identified and fixed
if values.dim() == 2:
# Extract the prediction corresponding to each frame's position in its window
batch_preds = []
for b_idx, pos in enumerate(batch_positions):
batch_preds.append(values[b_idx, pos].item())
preds[s:e] = np.array(batch_preds)
else:
# Fallback: if model returns (B,), use as is
preds[s:e] = values.detach().float().cpu().numpy()
return preds
def plot_episode_eval(episode_idx, gt, pred, language, save_path=None, show=False, title_prefix="RLearN Eval"):
"""Plot GT vs Predicted over time. Saves PNG if save_path is provided."""
T = len(pred)
x = np.arange(T)
plt.figure(figsize=(14, 8))
plt.plot(x, pred, linewidth=2.5, marker="o", markersize=3, label="Predicted Reward", color="blue")
if gt is not None:
plt.plot(x, gt, linestyle="--", linewidth=2.5, label="Ground-Truth Reward", color="orange")
# Correlation between GT and Pred
corr, p = spearmanr(gt, pred)
corr_str = f"ρ(GT, Pred) = {0.0 if np.isnan(corr) else corr:.3f} (p={0.0 if np.isnan(p) else p:.3f})"
else:
expected = np.linspace(0, 1, T)
plt.plot(x, expected, linestyle="--", linewidth=2.5, label="Expected Progress (0→1)", color="orange")
corr, p = spearmanr(x, pred)
corr_str = f"VOC-S ρ(t, Pred) = {0.0 if np.isnan(corr) else corr:.3f} (p={0.0 if np.isnan(p) else p:.3f})"
plt.title(f"{title_prefix} — Episode {episode_idx}\n{language}\n{corr_str}", fontsize=14)
plt.xlabel("Frame Index", fontsize=12)
plt.ylabel("Reward / Progress", fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path is not None:
plt.savefig(save_path, dpi=200, bbox_inches="tight")
print(f"Saved eval image to: {save_path}")
if show:
plt.show()
else:
plt.close()
def eval_episode_sliding(
episode_idx, dataset, model, save_dir=".", device="cuda", max_seq_len=16, batch_size=64, title_prefix="RLearN Eval"
):
"""End-to-end: load episode, predict with sliding 16-frame windows, and save PNG."""
frames, language, gt = extract_episode_frames_and_gt(dataset, episode_idx)
if frames is None:
print(f"[Episode {episode_idx}] No frames found.")
return None
model.eval()
pred = predict_rewards_sliding(
model=model, frames=frames, language=language, max_seq_len=max_seq_len, batch_size=batch_size, device=device
)
# Basic stats
print(f"Episode {episode_idx}: T={len(pred)}, pred∈[{pred.min():.3f},{pred.max():.3f}]")
if gt is not None:
print(f"GT available: gt∈[{np.nanmin(gt):.3f},{np.nanmax(gt):.3f}]")
save_path = f"{save_dir}/episode_{episode_idx:04d}_eval.png"
plot_episode_eval(
episode_idx=episode_idx, gt=gt, pred=pred, language=language, save_path=save_path, show=False, title_prefix=title_prefix
)
return save_path
def main():
"""Main evaluation script for RLearN models."""
# Parse command line arguments
parser = argparse.ArgumentParser(description="Evaluate RLearN model on episodes with GT vs Predicted rewards")
parser.add_argument("--model", type=str, required=True, help="Model name/path (e.g., pepijn223/rlearn_mse5)")
parser.add_argument("--dataset", type=str, required=True, help="Dataset repo (e.g., pepijn223/phone_pipeline_pickup1)")
parser.add_argument("--episodes", type=int, default=5, help="Number of episodes to evaluate")
parser.add_argument("--output", type=str, default="./eval_results", help="Output directory for images")
parser.add_argument(
"--device",
type=str,
default="cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu",
help="Device to use",
)
parser.add_argument("--batch_size", type=int, default=32, help="Batch size for sliding window evaluation")
args = parser.parse_args()
# Create output directory
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
print("🎯 RLearN Model Evaluation")
print("=" * 60)
print(f"Model: {args.model}")
print(f"Dataset: {args.dataset}")
print(f"Episodes: {args.episodes}")
print(f"Device: {args.device}")
print(f"Output: {output_dir}")
print("=" * 60)
try:
# Load dataset
print("📁 Loading dataset...")
dataset = LeRobotDataset(
repo_id=args.dataset,
episodes=list(range(min(args.episodes, 50))), # Load enough episodes
download_videos=True,
)
print(f"✅ Dataset loaded: {dataset.num_episodes} episodes, {dataset.num_frames} frames")
print(f" Features: {list(dataset.features.keys())}")
print(f" FPS: {dataset.fps}")
# Load model
print("\n🤖 Loading model...")
model = RLearNPolicy.from_pretrained(args.model)
model = model.to(args.device)
model.eval()
print(f"✅ Model loaded on {args.device}")
print(f" Parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f" Trainable: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")
print(f" Max sequence length: {model.config.max_seq_len}")
# Select episodes to evaluate
total_available = min(dataset.num_episodes, args.episodes)
episode_indices = list(range(total_available))
print(f"\n📊 Evaluating {len(episode_indices)} episodes...")
print("=" * 60)
# Run sliding window evaluation on each episode
saved_paths = []
for i, ep_idx in enumerate(episode_indices):
print(f"\n[{i+1}/{len(episode_indices)}] Processing Episode {ep_idx}")
print("-" * 40)
try:
save_path = eval_episode_sliding(
episode_idx=ep_idx,
dataset=dataset,
model=model,
save_dir=str(output_dir),
device=args.device,
batch_size=args.batch_size,
title_prefix="RLearN Ground Truth vs Predicted",
)
if save_path:
saved_paths.append(save_path)
except Exception as e:
print(f"❌ Error processing episode {ep_idx}: {e}")
import traceback
traceback.print_exc()
continue
# Summary
print("\n" + "=" * 60)
print("✅ EVALUATION COMPLETE")
print(f"📈 Generated {len(saved_paths)} evaluation plots")
print(f"📁 Results saved to: {output_dir}")
print("\nGenerated files:")
for path in saved_paths:
print(f"{path}")
if saved_paths:
print(f"\n💡 View the plots to compare ground truth vs predicted rewards!")
print(f" Each plot shows the model's sliding 16-frame window predictions")
print(f" against available ground truth rewards over the episode timeline.")
return 0
except Exception as e:
print(f"❌ Error during evaluation: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit(main())
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Any
from lerobot.configs.types import PolicyFeature
from lerobot.constants import OBS_LANGUAGE
from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig
from lerobot.processor import (
DeviceProcessor,
NormalizerProcessor,
RenameProcessor,
RobotProcessor,
ToBatchProcessor,
TokenizerProcessor,
UnnormalizerProcessor,
)
from lerobot.processor.pipeline import (
ComplementaryDataProcessor,
EnvTransition,
ProcessorStepRegistry,
TransitionKey,
)
def make_rlearn_processor(
config: RLearNConfig, dataset_stats: dict[str, dict[str, Any]] | None = None
) -> tuple[RobotProcessor, RobotProcessor]:
"""Build pre/post processors for RLearN.
Responsibilities moved out of the model:
- Normalize inputs (images) using dataset stats
- Ensure batching
- Map complementary_data.task to observation.language when available
- Tokenize language into observation.language.tokens / attention_mask
- Move to/from device
"""
input_steps = [
# No renaming by default, but keep for future extensibility
RenameProcessor(rename_map={}),
# Move heavy normalization to GPU after transfer for better parallelism
ToBatchProcessor(),
RLearnLanguageFromTaskProcessor(),
# Use SigLIP2 for tokenizer to keep vocab aligned with text tower
TokenizerProcessor(
tokenizer_name=config.text_model_name,
max_length=64,
padding="max_length",
truncation=True,
padding_side="right",
),
DeviceProcessor(device=config.device),
# Move normalization after GPU transfer to use GPU acceleration
NormalizerProcessor(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
]
output_steps = [
DeviceProcessor(device="cpu"),
UnnormalizerProcessor(
features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
),
]
return RobotProcessor(steps=input_steps, name="robot_preprocessor"), RobotProcessor(
steps=output_steps, name="robot_postprocessor"
)
@dataclass
@ProcessorStepRegistry.register(name="rlearn_language_from_task")
class RLearnLanguageFromTaskProcessor(ComplementaryDataProcessor):
"""Copy complementary_data['task'] into observation['observation.language'] if present.
This ensures the model can consume a raw language string when tokenization is not used,
while TokenizerProcessor can still create tokenized fields.
"""
task_key: str = "task"
def __call__(self, transition: EnvTransition) -> EnvTransition: # type: ignore[override]
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if not complementary_data or self.task_key not in complementary_data:
return transition
task = complementary_data.get(self.task_key)
if task is None:
return transition
# Normalize to list[str]
if isinstance(task, str):
task_list = [task]
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
task_list = task
else:
return transition
observation = transition.get(TransitionKey.OBSERVATION) or {}
# Do not overwrite if user already provided observation.language
if OBS_LANGUAGE not in observation:
observation[OBS_LANGUAGE] = task_list
transition[TransitionKey.OBSERVATION] = observation
return transition
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: # noqa: D401
# Adds nothing to features; only mirrors complementary_data.task into observation
return features
def get_config(self) -> dict[str, Any]:
return {"task_key": self.task_key}
+101
View File
@@ -0,0 +1,101 @@
## General Value/Reward Learning:
I want to implement a general/universal vision and language value function or reward model for robotics/video tasks. Also called a video language conditioned reward model. Integrated with already existing LeRobot code if convenient, use the LeRobot Dataset for dataset and store the reward for a frame in the lerobot frame itself.
Inspired by these papers:
- ReWiND; https://arxiv.org/pdf/2505.10911 (Most applicable and main paper I want to implement ideas from) and code: https://github.com/lucidrains/rewind-reward-pytorch
- LIV; https://arxiv.org/pdf/2306.00958 (Most applicable and 2nd main paper I want to implement ideas from) and code https://github.com/penn-pal-lab/LI
- VLC: Video-Language Critic: Transferable Reward Functions for Language-Conditioned Robotics: https://arxiv.org/pdf/2405.19988 (Most applicable and 3rd paper I want to implement ideas from) and code: https://github.com/minttusofia/video_language_critic
And these papers which are also relevant:
- https://www.dyna.co/dyna-1/research (Main company I want to reproduce the eventual results from)
- vip; https://arxiv.org/pdf/2210.00030
- uvd; https://arxiv.org/pdf/2310.08581
- vlm in context; https://arxiv.org/pdf/2411.04549
- https://www.youtube.com/watch?v=JfZYtpEisoM
Little less relevant but still similar papers:
- Learning Generalizable Robotic Reward Functions from “In-The-Wild” Human Videos,
- XIRL: Cross-embodiment Inverse Reinforcement Learning,
- Video-Language Critic: Transferable Reward https://arxiv.org/pdf/2405.19988
- Functions for Language-Conditioned Robotics,
- LORel, Language-Driven Representation Learning for Robotics https://sites.google.com/view/robotlorel
- RoboCLIP: One Demonstration is Enough to Learn Robot Policies https://arxiv.org/pdf/2310.07899
- Points2Rewards: learn first key points and then uses the keypoints to learn general value function/policy https://semrob.github.io/docs/2025_rss_semrob.github.io_paper20.pdf
- Language-Driven Representation Learning for Robotics: https://arxiv.org/pdf/2302.12766v1
- R3M: A Universal Visual Representation for Robot Manipulation: https://arxiv.org/pdf/2203.12601v3
Input should be the current image or whole video and the task goal specified in text/language. Output is current reward.
Archiutecture:
_ inputs: video o1:T (or current o1:t), language z;
_ DINO v3 ViT-B/16 (86M params): https://huggingface.co/facebook/dinov3-vitb16-pretrain-lvd1689m for vision encoding
\_ sentence-transformers/all-MiniLM-L12-v2: https://huggingface.co/sentence-transformers/all-MiniLM-L12-v2 for text encoding \* Temporal module: small causal transformer ("cross-modal sequential aggregator"), with first-frame positional embedding (to avoid position cheating), frame-dropout, and stride sampling; outputs per-timestep logits.
Loss: See this chatgpt thread: https://chatgpt.com/s/t_68999a50a0b081919abc365cdd205e01
Past images: (for example a reward method go to 3rd floor, has to know what floor it was on and what pas actions it did, can we attend or encorperate images of decision from history in one way?) Maybe via this paper: Learning Long-Context Diffusion Policies via Past-Token Prediction
Amount of frames needed for test/generalization: 1M frames? or ~20% of IPEC-COMMUNITY/bc_z_lerobot
Eval:
Implement something like voc score , or ROC rank order correlation between reward leanredna and ev reward from sim, or use something else to do additional evaluation
Ideas:
- Incorporate training on multiple horizons: as in label same dataset for longer horizons: make a sandwich (long), put cheese on bread (medium) and even smaller horizons: go down or close gripper (small)
- Incorporate navigation goals “walk towards the kitchen”, make sure we fix CLIP contrastive learning issue of positional text misunderstanding where model doesnnt learn difference between "horse right of cow" and "horse left of cow" “Move right” potentially train with more other data or even actionable world models such as Genie 3 (https://deepmind.google/discover/blog/genie-3-a-new-frontier-for-world-models/)
How to use a general reward model (use cases): - Train rl policy on it - Success detection - Do exploraion - Do task via planning and search to optimize reward - Filter out bad episodes in large datasets from imitation learning
Potential Datasets: (start with dataset that is most clean for this and works best with chosen way of doing evals)
_ Epic-Kitchens-100
_ Something-Something v. 2 Dataset https://www.qualcomm.com/developer/software/something-something-v-2-dataset
_ Ego4D (3000 hours)
_ Open X-Embodiment (OXE)
\_ Agi bot world: https://huggingface.co/datasets/agibot-world/AgiBotWorld-Alpha
- GalexiAI dataset: https://opengalaxea.github.io/G0/
_ GTEA+ Gaze: https://cbs.ic.gatech.edu/fpv/
_ YouCook2 dataset
\_ HOWTO100M: https://www.di.ens.fr/willow/research/howto100m/
- Genie generated dataset?
### TODOs:
- Implement first architecture [x]
- Implement processors [x]
- Choose right loss metric(s) [x]
- Make dataset with script that generated the dataset (IPEC-COMMUNITY/bc_z_lerobot) ready in lerobot format (and be able to visualize in dataset visualizer)
- Annotate with ReWiND-style 0→1 progress rewards [x]
- Visualize to check [x]
- Implement eval score or metric that is robust and can deal with generalization/is a good metric to try different architectures. And use it in an eval jupyter notebook with visalization of the live reward next to the video for part of the dataset: VOC score and score with correct and incorrect language captions [x]
- Do first training [x]
- Implement on-the-fly progress label generation (no need for pre-annotated rewards) [x]
- Try different losses
- Only rewind loss [x]
- Exactly similar to: https://github.com/lucidrains/rewind-reward-pytorch/blob/main/rewind_reward_pytorch/rewind_reward.py#L11 [x]
- Try DINO v2 as encoder Base 86 M: with https://huggingface.co/sentence-transformers/all-MiniLM-L12-v2 [x]
- Test rewind (evaluate) [x]
- benchmark siglip 2 vs this implementation forward pass, debug speed [x]
- use siglip 2 [x]
- Fix evaluation bug !!! []
- Fix sample episode padding bug !!! []
- Overfit on one episode []
- Cleanup code? [] + enable language loss
- Convert python -m lerobot.datasets.v21.convert_dataset_v20_to_v21 --repo-id=IPEC-COMMUNITY/bc_z_lerobot and train on 1 percent
- Then on 10 percent []
- Ablation 16 sucessive frame vs 16 frame samples with stride 2 or 4 []
- Add more artificial text to dataset generated by vlm (google gemini) []
- See google gemini vlm caption [] https://gemini.google.com/app/7e332ffaf32580f2
- Multiple captions per video, creat method to generate as much data as possible etc [] https://arxiv.org/abs/2508.13446, https://arxiv.org/pdf/2412.04453
- Add other datasets from OXE metioned in rewind []
- Extend evaluation []
- Ablation for size vision encoder, language encoder, temporal head []
- Ablation one mlp head per frame or single mlp head []
- Add other datasets metnioned here []
- How can we improve spatial aware learning? solve issue of Contrastive learning and position []
+7 -4
View File
@@ -145,10 +145,13 @@ class TokenizerProcessor:
observation = dict(observation) # Make a copy
# Add tokenized data to observation
observation[f"{OBS_LANGUAGE}.tokens"] = tokenized_prompt["input_ids"]
observation[f"{OBS_LANGUAGE}.attention_mask"] = tokenized_prompt["attention_mask"].to(
dtype=torch.bool
)
input_ids = tokenized_prompt["input_ids"]
attention_mask = tokenized_prompt.get("attention_mask")
if attention_mask is None:
# Some tokenizers (e.g., SigLIP text) may not return attention_mask; default to ones
attention_mask = torch.ones_like(input_ids)
observation[f"{OBS_LANGUAGE}.tokens"] = input_ids
observation[f"{OBS_LANGUAGE}.attention_mask"] = attention_mask.to(dtype=torch.bool)
transition[TransitionKey.OBSERVATION.value] = observation # type: ignore[misc]
return transition
+172 -2
View File
@@ -14,12 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from contextlib import nullcontext
from pprint import pformat
from typing import Any
import torch
# Fix tokenizer parallelism conflicts with multiprocessing
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from termcolor import colored
from torch.amp import GradScaler
from torch.optim import Optimizer
@@ -67,10 +72,18 @@ def update_policy(
start_time = time.perf_counter()
device = get_device_from_parameters(policy)
policy.train()
# Forward pass timing
forward_start = time.perf_counter()
with torch.autocast(device_type=device.type) if use_amp else nullcontext():
loss, output_dict = policy.forward(batch)
# TODO(rcadene): policy.unnormalize_outputs(out_dict)
forward_time = time.perf_counter() - forward_start
# Backward pass timing
backward_start = time.perf_counter()
grad_scaler.scale(loss).backward()
backward_time = time.perf_counter() - backward_start
# Unscale the gradient of the optimizer's assigned params in-place **prior to gradient clipping**.
grad_scaler.unscale_(optimizer)
@@ -81,6 +94,9 @@ def update_policy(
error_if_nonfinite=False,
)
# Optimizer step timing
optim_start = time.perf_counter()
# Optimizer's gradients are already unscaled, so scaler.step does not unscale them,
# although it still skips optimizer.step() if the gradients contain infs or NaNs.
with lock if lock is not None else nullcontext():
@@ -97,6 +113,47 @@ def update_policy(
if has_method(policy, "update"):
# To possibly update an internal buffer (for instance an Exponential Moving Average like in TDMPC).
policy.update()
optim_time = time.perf_counter() - optim_start
total_time = time.perf_counter() - start_time
# Collect timing statistics for RLearN policy (averaged reporting every minute)
if getattr(policy, "name", None) == "rlearn":
# Initialize timing accumulator if not exists
if not hasattr(policy, '_train_timing_stats'):
policy._train_timing_stats = {
'forward_times': [],
'backward_times': [],
'optim_times': [],
'total_times': [],
'last_print_time': time.perf_counter()
}
# Accumulate current step's timings
stats = policy._train_timing_stats
stats['forward_times'].append(forward_time * 1000)
stats['backward_times'].append(backward_time * 1000)
stats['optim_times'].append(optim_time * 1000)
stats['total_times'].append(total_time * 1000)
# Print averaged stats every minute (60 seconds)
current_time = time.perf_counter()
if current_time - stats['last_print_time'] >= 60.0:
n_samples = len(stats['forward_times'])
if n_samples > 0:
print(f"\nTraining Step Average Timing (last {n_samples} steps):")
print(f" Forward pass: {sum(stats['forward_times'])/n_samples:.2f} ms")
print(f" Backward pass: {sum(stats['backward_times'])/n_samples:.2f} ms")
print(f" Optimizer step: {sum(stats['optim_times'])/n_samples:.2f} ms")
print(f" Total update: {sum(stats['total_times'])/n_samples:.2f} ms")
print(f" Avg steps/sec: {1000.0/(sum(stats['total_times'])/n_samples):.2f}")
print("-" * 50)
# Reset stats for next minute
for key in stats:
if key != 'last_print_time':
stats[key] = []
stats['last_print_time'] = current_time
train_metrics.loss = loss.item()
train_metrics.grad_norm = grad_norm.item()
@@ -125,6 +182,18 @@ def train(cfg: TrainPipelineConfig):
torch.backends.cuda.matmul.allow_tf32 = True
logging.info("Creating dataset")
# Force PyAV backend for RLearN (proven to be fastest)
if getattr(cfg.policy, "type", None) == "rlearn":
# Override video backend to use PyAV
if hasattr(cfg.dataset, 'video_backend'):
original_backend = cfg.dataset.video_backend
cfg.dataset.video_backend = 'pyav'
logging.info(f"RLearN: Forcing video_backend from '{original_backend}' to 'pyav' for better performance")
else:
cfg.dataset.video_backend = 'pyav'
logging.info("RLearN: Setting video_backend to 'pyav' for better performance")
dataset = make_dataset(cfg)
# Create environment used for evaluating checkpoints during training on simulation data.
@@ -136,10 +205,16 @@ def train(cfg: TrainPipelineConfig):
eval_env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
logging.info("Creating policy")
# Pass episode_data_index for RLearN to calculate proper progress
episode_data_index = dataset.episode_data_index if hasattr(dataset, "episode_data_index") else None
policy = make_policy(
cfg=cfg.policy,
ds_meta=dataset.meta,
episode_data_index=episode_data_index,
)
preprocessor, postprocessor = make_processor(
policy_cfg=cfg.policy, pretrained_path=cfg.policy.pretrained_path, dataset_stats=dataset.meta.stats
)
@@ -173,6 +248,15 @@ def train(cfg: TrainPipelineConfig):
drop_n_last_frames=cfg.policy.drop_n_last_frames,
shuffle=True,
)
elif cfg.policy.type == "rlearn":
# For RLearN, drop first 15 frames to avoid padding issues with temporal windows
shuffle = False
sampler = EpisodeAwareSampler(
dataset.episode_data_index,
drop_n_first_frames=15, # Skip frames that would need padding
drop_n_last_frames=0,
shuffle=True,
)
else:
shuffle = True
sampler = None
@@ -185,6 +269,9 @@ def train(cfg: TrainPipelineConfig):
sampler=sampler,
pin_memory=device.type == "cuda",
drop_last=False,
persistent_workers=cfg.num_workers > 0, # Keep workers alive between epochs
prefetch_factor=3, # Prefetch for video pipeline
timeout=30, # Prevent hanging on video decode errors
)
dl_iter = cycle(dataloader)
@@ -197,6 +284,12 @@ def train(cfg: TrainPipelineConfig):
"update_s": AverageMeter("updt_s", ":.3f"),
"dataloading_s": AverageMeter("data_s", ":.3f"),
}
# RLearN-only: pixels per second throughput
try:
if getattr(policy, "name", None) == "rlearn":
train_metrics["pix_s"] = AverageMeter("pix/s", ":.1f")
except Exception:
pass
train_tracker = MetricsTracker(
cfg.batch_size, dataset.num_frames, dataset.num_episodes, train_metrics, initial_step=step
@@ -204,10 +297,17 @@ def train(cfg: TrainPipelineConfig):
logging.info("Start offline training on a fixed dataset")
for _ in range(step, cfg.steps):
start_time = time.perf_counter()
# Data loading timing
data_start = time.perf_counter()
batch = next(dl_iter)
data_loading_time = time.perf_counter() - data_start
# Preprocessing timing
preprocess_start = time.perf_counter()
batch = preprocessor(batch)
train_tracker.dataloading_s = time.perf_counter() - start_time
preprocess_time = time.perf_counter() - preprocess_start
train_tracker.dataloading_s = data_loading_time + preprocess_time
for key in batch:
if isinstance(batch[key], torch.Tensor):
@@ -224,6 +324,73 @@ def train(cfg: TrainPipelineConfig):
use_amp=cfg.policy.use_amp,
)
# RLearN-only: compute pixel throughput (pixels per second)
if getattr(policy, "name", None) == "rlearn":
def _count_pixels(x: torch.Tensor) -> int:
# Expect shapes: (B,T,C,H,W) or (B,C,H,W)
if x.dim() == 5:
b, t, _, h, w = x.shape
return int(b * t * h * w)
if x.dim() == 4:
b, _, h, w = x.shape
return int(b * h * w)
return 0
total_pixels = 0
for k, v in batch.items():
if "image" not in k.lower():
continue
if isinstance(v, torch.Tensor):
total_pixels += _count_pixels(v)
elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], torch.Tensor):
# list of T tensors shaped (B,C,H,W)
total_pixels += sum(_count_pixels(t) for t in v)
# Avoid div-by-zero
meter = train_tracker.update_s
upd_s = meter.val if isinstance(meter, AverageMeter) else float(meter)
upd_s = max(upd_s, 1e-8)
pix_per_s = float(total_pixels) / upd_s
try:
train_tracker.pix_s = pix_per_s
except Exception:
pass
# Collect data pipeline timing for RLearN (averaged reporting every minute)
if getattr(policy, "name", None) == "rlearn":
# Initialize data timing accumulator if not exists
if not hasattr(policy, '_data_timing_stats'):
policy._data_timing_stats = {
'data_loading_times': [],
'preprocess_times': [],
'last_print_time': time.perf_counter()
}
# Accumulate current step's data timings
data_stats = policy._data_timing_stats
data_stats['data_loading_times'].append(data_loading_time * 1000)
data_stats['preprocess_times'].append(preprocess_time * 1000)
# Print averaged stats every minute (60 seconds)
current_time = time.perf_counter()
if current_time - data_stats['last_print_time'] >= 60.0:
n_samples = len(data_stats['data_loading_times'])
if n_samples > 0:
avg_data_loading = sum(data_stats['data_loading_times']) / n_samples
avg_preprocessing = sum(data_stats['preprocess_times']) / n_samples
print(f"\nData Pipeline Average Timing (last {n_samples} steps):")
print(f" Data loading: {avg_data_loading:.2f} ms")
print(f" Preprocessing: {avg_preprocessing:.2f} ms")
print(f" Total data pipeline: {avg_data_loading + avg_preprocessing:.2f} ms")
print("-" * 50)
# Reset stats for next minute
for key in data_stats:
if key != 'last_print_time':
data_stats[key] = []
data_stats['last_print_time'] = current_time
# Note: eval and checkpoint happens *after* the `step`th training update has completed, so we
# increment `step` here.
step += 1
@@ -232,6 +399,7 @@ def train(cfg: TrainPipelineConfig):
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0
if is_log_step:
logging.info(train_tracker)
if wandb_logger:
@@ -282,6 +450,8 @@ def train(cfg: TrainPipelineConfig):
wandb_logger.log_dict(wandb_log_dict, step, mode="eval")
wandb_logger.log_video(eval_info["video_paths"][0], step, mode="eval")
if eval_env:
eval_env.close()
logging.info("End of training")
+116
View File
@@ -0,0 +1,116 @@
#!/usr/bin/env python
"""
Quick benchmark to test video decoding speed across different backends.
"""
import time
from pathlib import Path
import torch
def test_video_backend(video_path, backend_name, num_frames=10):
"""Test video decoding speed for a specific backend."""
try:
from lerobot.datasets.video_utils import decode_video_frames
# Create timestamps for first N frames
fps = 30 # Assume 30fps, adjust if needed
timestamps = [i / fps for i in range(num_frames)]
# Time the decoding
start_time = time.perf_counter()
frames = decode_video_frames(video_path, timestamps, tolerance_s=1e-4, backend=backend_name)
decode_time = time.perf_counter() - start_time
frames_decoded = frames.shape[1] if frames.dim() > 1 else frames.shape[0]
ms_per_frame = (decode_time * 1000) / max(frames_decoded, 1)
print(f"{backend_name:12} | {decode_time*1000:6.1f}ms total | {ms_per_frame:6.1f}ms/frame | {frames_decoded} frames")
return decode_time, frames_decoded
except Exception as e:
print(f"{backend_name:12} | ERROR: {str(e)[:50]}...")
return float('inf'), 0
def main():
print("📦 Downloading dataset to get video file locations...")
try:
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Download the dataset - this will tell us exactly where it's stored
dataset = LeRobotDataset("kenmacken/record-test-2", download_videos=True)
print(f"✅ Dataset downloaded to: {dataset.root}")
print(f" Video keys: {dataset.meta.video_keys}")
print(f" Total episodes: {dataset.meta.total_episodes}")
# Get actual video file paths from the dataset
video_files = []
for ep_idx in range(min(2, dataset.meta.total_episodes)): # Test first 2 episodes max
for vid_key in dataset.meta.video_keys:
video_path = dataset.root / dataset.meta.get_video_file_path(ep_idx, vid_key)
if video_path.exists():
video_files.append(video_path)
break # Just need one video file for testing
if video_files:
break
if not video_files:
print("❌ No video files found after download!")
return
except Exception as e:
print(f"❌ Error downloading dataset: {e}")
# Fallback to manual search
possible_paths = [
Path.home() / ".cache/huggingface/lerobot/kenmacken/record-test-2",
Path("/tmp/huggingface/lerobot/kenmacken/record-test-2"),
Path("./datasets/record-test-2"),
]
video_files = []
print("Trying fallback search...")
for path in possible_paths:
print(f" Checking: {path}")
if path.exists():
files = list(path.rglob("*.mp4"))
if files:
video_files = files
print(f" ✅ Found {len(files)} video files!")
break
if not video_files:
print("❌ No video files found!")
return
test_video = video_files[0]
print(f"Testing video: {test_video.name}")
print(f"File size: {test_video.stat().st_size / 1024 / 1024:.1f} MB")
print("-" * 60)
backends = ["torchcodec", "pyav", "video_reader"]
results = {}
for backend in backends:
decode_time, frames = test_video_backend(test_video, backend)
results[backend] = (decode_time, frames)
print("-" * 60)
print("RECOMMENDATION:")
# Find fastest backend
valid_results = {k: v for k, v in results.items() if v[0] != float('inf')}
if valid_results:
fastest = min(valid_results.items(), key=lambda x: x[1][0])
print(f"🚀 Use '{fastest[0]}' - fastest backend!")
print(f" Add to your config: video_backend: \"{fastest[0]}\"")
slowest_time = max(valid_results.values())[0]
speedup = slowest_time / fastest[1][0]
print(f" Speedup vs slowest: {speedup:.1f}x faster")
else:
print("❌ No backends worked!")
if __name__ == "__main__":
main()
+188
View File
@@ -0,0 +1,188 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test script for RLearN evaluation metrics.
This script tests the VOC-S and success/failure detection metrics with synthetic data
to ensure they work correctly before running on real datasets.
"""
import numpy as np
from lerobot.policies.rlearn.evaluation import (
compute_success_failure_detection,
compute_voc_s,
generate_mismatched_languages,
)
def test_voc_s():
"""Test VOC-S computation with synthetic data."""
print("Testing VOC-S computation...")
# Test case 1: Perfect positive correlation (0 -> 1)
perfect_positive = [np.linspace(0, 1, 20) for _ in range(10)]
results = compute_voc_s(perfect_positive)
print("Perfect positive correlation:")
print(f" Mean: {results['voc_s_mean']:.4f} (should be ~1.0)")
print(f" IQM: {results['voc_s_iqm']:.4f} (should be ~1.0)")
assert results["voc_s_mean"] > 0.95, f"Expected >0.95, got {results['voc_s_mean']}"
# Test case 2: Perfect negative correlation (1 -> 0)
perfect_negative = [np.linspace(1, 0, 20) for _ in range(10)]
results = compute_voc_s(perfect_negative)
print("Perfect negative correlation:")
print(f" Mean: {results['voc_s_mean']:.4f} (should be ~-1.0)")
print(f" IQM: {results['voc_s_iqm']:.4f} (should be ~-1.0)")
assert results["voc_s_mean"] < -0.95, f"Expected <-0.95, got {results['voc_s_mean']}"
# Test case 3: No correlation (random)
np.random.seed(42)
random_rewards = [np.random.random(20) for _ in range(50)]
results = compute_voc_s(random_rewards)
print("Random correlation:")
print(f" Mean: {results['voc_s_mean']:.4f} (should be ~0.0)")
print(f" IQM: {results['voc_s_iqm']:.4f} (should be ~0.0)")
assert abs(results["voc_s_mean"]) < 0.3, f"Expected ~0, got {results['voc_s_mean']}"
# Test case 4: Mixed correlations
mixed = []
mixed.extend([np.linspace(0, 1, 15) for _ in range(5)]) # Positive
mixed.extend([np.linspace(1, 0, 15) for _ in range(5)]) # Negative
mixed.extend([np.random.random(15) for _ in range(5)]) # Random
results = compute_voc_s(mixed)
print("Mixed correlations:")
print(f" Mean: {results['voc_s_mean']:.4f}")
print(f" IQM: {results['voc_s_iqm']:.4f}")
print(f" Std: {results['voc_s_std']:.4f}")
print("✓ VOC-S tests passed!\n")
def test_success_failure_detection():
"""Test success/failure detection with synthetic data."""
print("Testing Success/Failure Detection...")
# Test case 1: Clear separation (correct > incorrect)
correct_rewards = [np.linspace(0, 1, 20) for _ in range(20)] # Always increasing
incorrect_rewards = [np.linspace(0, 0.3, 20) for _ in range(20)] # Lower final values
results = compute_success_failure_detection(correct_rewards, incorrect_rewards)
print("Clear separation test:")
print(f" Detection accuracy: {results['detection_accuracy']:.4f} (should be 1.0)")
print(f" Mean correct: {results['mean_correct_final']:.4f}")
print(f" Mean incorrect: {results['mean_incorrect_final']:.4f}")
print(f" Separation score: {results['separation_score']:.4f}")
assert results["detection_accuracy"] == 1.0, f"Expected 1.0, got {results['detection_accuracy']}"
# Test case 2: No separation (same distributions with some randomness)
np.random.seed(42)
same_rewards_1 = [np.random.normal(0.5, 0.05, 15) for _ in range(20)]
same_rewards_2 = [np.random.normal(0.5, 0.05, 15) for _ in range(20)]
results = compute_success_failure_detection(same_rewards_1, same_rewards_2)
print("No separation test:")
print(f" Detection accuracy: {results['detection_accuracy']:.4f} (should be ~0.5)")
print(f" Separation score: {results['separation_score']:.4f} (should be ~0.0)")
# Relax the assertion since random data can vary
assert 0.2 <= results["detection_accuracy"] <= 0.8, (
f"Expected ~0.5 (±0.3), got {results['detection_accuracy']}"
)
# Test case 3: Partial separation
np.random.seed(42)
partial_correct = [np.random.normal(0.7, 0.1, 10) for _ in range(20)]
partial_incorrect = [np.random.normal(0.4, 0.1, 10) for _ in range(20)]
results = compute_success_failure_detection(partial_correct, partial_incorrect)
print("Partial separation test:")
print(f" Detection accuracy: {results['detection_accuracy']:.4f}")
print(f" Separation score: {results['separation_score']:.4f}")
print("✓ Success/Failure Detection tests passed!\n")
def test_mismatch_generation():
"""Test mismatch language generation."""
print("Testing mismatch language generation...")
original_languages = [
"pick up the red ball",
"put the cup on the table",
"open the drawer",
"close the door",
]
# Test with default templates
mismatched = generate_mismatched_languages(original_languages)
print(f"Original languages: {len(original_languages)}")
print(f"Mismatched languages: {len(mismatched)}")
assert len(mismatched) == len(original_languages)
# Ensure they're actually different
for orig, mismatch in zip(original_languages, mismatched, strict=False):
print(f" '{orig}' -> '{mismatch}'")
assert orig != mismatch, "Mismatch should be different from original"
# Test with custom templates
custom_templates = ["dance", "sing", "jump"]
mismatched_custom = generate_mismatched_languages(original_languages, custom_templates)
print("\nWith custom templates:")
for orig, mismatch in zip(original_languages, mismatched_custom, strict=False):
print(f" '{orig}' -> '{mismatch}'")
assert mismatch in custom_templates
print("✓ Mismatch generation tests passed!\n")
def test_edge_cases():
"""Test edge cases and error handling."""
print("Testing edge cases...")
# Empty input
empty_results = compute_voc_s([])
assert empty_results["num_episodes"] == 0
assert empty_results["voc_s_mean"] == 0.0
# Single frame episodes (should be skipped)
single_frame = [np.array([0.5]) for _ in range(5)]
results = compute_voc_s(single_frame)
assert results["num_episodes"] == 0, "Single-frame episodes should be skipped"
# Constant rewards (should give correlation = 0)
constant_rewards = [np.ones(10) * 0.5 for _ in range(5)]
results = compute_voc_s(constant_rewards)
print(f"Constant rewards correlation: {results['voc_s_mean']:.4f} (should be 0.0)")
assert results["voc_s_mean"] == 0.0
# Mismatched array lengths for detection
try:
compute_success_failure_detection([np.array([1, 2])], [])
assert False, "Should have raised ValueError"
except ValueError:
pass # Expected
print("✓ Edge case tests passed!\n")
+244
View File
@@ -0,0 +1,244 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.constants import OBS_IMAGES, OBS_LANGUAGE, REWARD
from lerobot.policies.factory import make_processor
from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig
from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy
from tests.utils import require_package
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_instantiation_and_forward_tensor_batch():
"""Instantiate RLearN and run a forward pass with a (B, T, C, H, W) tensor input using a real model and real text."""
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
push_to_hub=False,
freeze_backbones=True,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
policy = RLearNPolicy(cfg)
B, T, C, H, W = 2, 3, 3, 256, 256
batch = {
OBS_IMAGES: torch.rand(B, T, C, H, W),
REWARD: torch.randint(low=0, high=1, size=(B, T)).float(),
OBS_LANGUAGE: ["move the green cube into the box" for _ in range(B)],
}
loss, logs = policy.forward(batch)
assert isinstance(loss, torch.Tensor)
assert "loss" in logs
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_instantiation_and_forward_list_batch_with_language():
"""Instantiate RLearN and run a forward pass with a list-of-frames input and real language using a real model."""
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
push_to_hub=False,
freeze_backbones=True,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
policy = RLearNPolicy(cfg)
B, T, C, H, W = 2, 4, 3, 256, 256
frames = [torch.rand(B, C, H, W) for _ in range(T)]
batch = {
OBS_IMAGES: frames, # list[(B, C, H, W)]
REWARD: torch.randint(low=0, high=2, size=(B, T)).float(),
OBS_LANGUAGE: ["move the red cube into the box" for _ in range(B)],
}
loss, logs = policy.forward(batch)
assert isinstance(loss, torch.Tensor)
assert "loss" in logs
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_composite_loss_shapes_and_terms():
"""Smoke test composite loss: checks presence of terms and valid gradients."""
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
push_to_hub=False,
freeze_backbones=True,
use_video_rewind=True,
rewind_prob=0.5,
use_mismatch_loss=True,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
policy = RLearNPolicy(cfg)
B, T, C, H, W = 2, 3, 3, 256, 256
# Progress labels y in [0,1]
y = torch.linspace(0, 1, T).unsqueeze(0).repeat(B, 1)
batch = {
OBS_IMAGES: torch.rand(B, T, C, H, W),
REWARD: y.clone(),
OBS_LANGUAGE: ["stack the blocks" for _ in range(B)],
}
loss, logs = policy.forward(batch)
assert isinstance(loss, torch.Tensor) and torch.isfinite(loss)
# Expect ReWiND loss terms (progress and mismatch)
assert "loss_progress" in logs
assert "loss_mismatch" in logs
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_preprocessor_tokenizes_and_copies_task():
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
device="cpu",
push_to_hub=False,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 64, 64)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
pre, post = make_processor(cfg, dataset_stats=None)
B, C, H, W = 2, 3, 64, 64
batch = {
"observation.image": torch.rand(B, C, H, W),
REWARD: torch.zeros(B),
"task": ["pick the cube", "place it in the box"],
}
processed = pre(batch)
assert isinstance(processed, dict)
assert f"{OBS_LANGUAGE}.tokens" in processed
assert f"{OBS_LANGUAGE}.attention_mask" in processed
assert OBS_LANGUAGE in processed
tokens = processed[f"{OBS_LANGUAGE}.tokens"]
attn = processed[f"{OBS_LANGUAGE}.attention_mask"]
assert tokens.dim() == 2 and attn.dim() == 2
assert tokens.shape[0] == B and attn.shape[0] == B
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_preprocessor_string_task_and_to_batch():
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
device="cpu",
push_to_hub=False,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 64, 64)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
pre, post = make_processor(cfg, dataset_stats=None)
# Unbatched image and single string task
batch = {
"observation.image": torch.rand(3, 64, 64),
REWARD: torch.tensor(0.0),
"task": "move the green cube into the box",
}
processed = pre(batch)
# Image should have batch dim now
assert processed["observation.image"].dim() == 4 and processed["observation.image"].shape[0] == 1
# Language copy and tokenization should exist
assert OBS_LANGUAGE in processed and isinstance(processed[OBS_LANGUAGE], list)
assert f"{OBS_LANGUAGE}.tokens" in processed
assert f"{OBS_LANGUAGE}.attention_mask" in processed
@require_package("transformers")
@require_package("sentence_transformers")
def test_rlearn_pipeline_end_to_end_forward():
"""End-to-end: preprocessor + model forward using RLearN pipeline on synthetic data."""
cfg = RLearNConfig(
vision_model_name="facebook/dinov3-vitb16-pretrain-lvd1689m",
text_model_name="sentence-transformers/all-MiniLM-L12-v2",
device="cpu",
push_to_hub=False,
freeze_backbones=True,
use_video_rewind=True,
)
cfg.input_features = {
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}
cfg.output_features = {
REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
}
# Build processors and model
pre, post = make_processor(cfg, dataset_stats=None)
policy = RLearNPolicy(cfg)
B, T, C, H, W = 2, 3, 3, 256, 256
y = torch.linspace(0, 1, T).unsqueeze(0).repeat(B, 1)
raw = {
# Provide as observation.image to let preprocessor map/normalize and batch
"observation.image": torch.rand(B, C, H, W), # not time-major to test ToBatch
REWARD: y[:, :1].clone(), # single step label; pipeline keeps structure
"task": ["insert the peg", "insert the peg"],
}
processed = pre(raw)
# Integrate preprocessor output with model forward
loss, logs = policy.forward(
{
OBS_IMAGES: processed.get(OBS_IMAGES, processed.get("observation.image"))
.unsqueeze(1)
.repeat(1, T, 1, 1, 1),
REWARD: y.clone(),
OBS_LANGUAGE: processed[OBS_LANGUAGE],
}
)
assert isinstance(loss, torch.Tensor) and torch.isfinite(loss)
@@ -0,0 +1,59 @@
#!/usr/bin/env python
import torch
from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig
from lerobot.policies.rlearn.evaluation import RLearnEvaluator
from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy
def test_temporal_evaluation():
"""Test that evaluation creates proper temporal sequences with past frames."""
# Create a simple config
config = RLearNConfig(
max_seq_len=4, # Small for testing
dim_model=64, # Small for testing
n_heads=2,
n_layers=2,
)
# Create model (will be randomly initialized)
model = RLearNPolicy(config)
model.eval()
# Create evaluator
evaluator = RLearnEvaluator(model, device="cpu")
# Create test episode: 8 frames of 3x64x64 images
T, C, H, W = 8, 3, 64, 64
frames = torch.randn(T, C, H, W)
language = "test instruction"
print(f"Input episode shape: {frames.shape}")
print(f"Model expects sequences of length: {config.max_seq_len}")
# Test the evaluation
rewards = evaluator.predict_episode_rewards(frames, language, batch_size=4)
print(f"Output rewards shape: {rewards.shape}")
print(f"Rewards: {rewards}")
# Verify we get one reward per frame
assert len(rewards) == T, f"Expected {T} rewards, got {len(rewards)}"
print("✅ Test passed! Evaluation correctly processes temporal sequences.")
# Test with very short episode (shorter than max_seq_len)
short_frames = torch.randn(2, C, H, W) # Only 2 frames
short_rewards = evaluator.predict_episode_rewards(short_frames, language)
print(f"\nShort episode shape: {short_frames.shape}")
print(f"Short rewards shape: {short_rewards.shape}")
assert len(short_rewards) == 2, f"Expected 2 rewards, got {len(short_rewards)}"
print("✅ Short episode test passed!")
if __name__ == "__main__":
test_temporal_evaluation()