Compare commits


56 Commits

Author SHA1 Message Date
Michel Aractingi b2d7eecdb4 format docs 2025-08-05 10:36:22 +02:00
Michel Aractingi 0710f3a0f1 Move dataset and mode from HilSerlEnvConfig to a GymManipulatorConfig to reduce the verbosity of configs during training 2025-08-05 10:35:54 +02:00
Michel Aractingi 9effc5214f Refactored hilserl config 2025-08-05 01:24:46 +02:00
Michel Aractingi b292dbbc55 change folder structure to reduce the size of gym_manip 2025-08-04 16:41:42 +02:00
Michel Aractingi f49280e89b RL works at this commit - fixed actor.py and bugs in gym_manipulator 2025-08-03 23:21:13 +02:00
pre-commit-ci[bot] ff38a51df9 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-02 17:07:19 +00:00
Michel Aractingi cfa672129e Refactored actor.py to use the pipeline 2025-08-02 19:06:56 +02:00
Michel Aractingi e6e1edfd74 Added the replay functionality with the pipeline 2025-08-02 17:57:27 +02:00
Michel Aractingi 384101731e Added the capability to record a dataset 2025-08-02 17:14:14 +02:00
Michel Aractingi 1fdbecad3c Migrate gym_manipulator to use the pipeline
Added get_teleop_events function to capture relevant events from teleop devices unrelated to actions
2025-08-01 20:20:13 +02:00
Pepijn 2c4e888c7f Feat/pipeline add feature contract (#1637)
* Add feature contract to pipelinestep and pipeline

* Add tests

* Add processor tests

* PR feedback

* incorporate PR feedback

* typo in doc

* oops
2025-08-01 08:41:54 +02:00
Adil Zouitine 5ced72e6b8 docs(pipeline): Add clarification for repo name sanitization process 2025-08-01 08:41:54 +02:00
Adil Zouitine 907023f9f7 refactor(pipeline): Improve state file naming conventions for clarity and uniqueness
- Enhanced state file naming to include the processor's sanitized name, ensuring uniqueness when multiple processors are saved in the same directory.
- Updated tests to reflect changes in state file naming, verifying that filenames now include the processor name and step indices to prevent conflicts.
- Added a new test to validate state file naming when using multiple processors, ensuring distinct filenames for each processor's state files.
2025-08-01 08:41:54 +02:00
Adil Zouitine 4ba23ea029 feat(pipeline): Enhance configuration filename handling and state file naming
- Introduced support for custom configuration filenames in the `save_pretrained` method, allowing users to specify a filename instead of the default.
- Improved state file naming to include step indices, preventing conflicts when multiple processors of the same type are saved.
- Added automatic detection for configuration files when loading from a directory, with error handling for multiple files.
- Updated tests to validate new features, including custom filenames and automatic config detection.
2025-08-01 08:41:54 +02:00
Adil Zouitine 409ac0baca chore(doc): address pip install command lerobot that does not exist yet 2025-08-01 08:41:54 +02:00
Adil Zouitine 699363f9fc refactor(pipeline): Enhance state filename generation and profiling method
- Updated state filename generation to use the registry name when available, improving clarity in saved files.
- Modified the profile_steps method to include a warmup_runs parameter, allowing for more controlled performance profiling.
- Ensured consistent conditions during profiling by deep copying transitions for each run, enhancing accuracy in timing results.
2025-08-01 08:41:54 +02:00
Adil Zouitine ae7a54de57 refactor(pipeline): Utilize get_safe_torch_device for device assignment
- Replaced direct torch.device instantiation with get_safe_torch_device to ensure safe device handling.
- This change enhances code readability and maintains consistency in device management across the RobotProcessor class.
2025-08-01 08:41:54 +02:00
Adil Zouitine fb9139b882 chore(pipeline): Move _CFG_NAME along other class member 2025-08-01 08:41:54 +02:00
Adil Zouitine 9fe3a3fb17 feat(pipeline): Add __repr__ method to RobotProcessor for improved readability
- Implemented a __repr__ method in the RobotProcessor class to provide a clear string representation of the processor, including step names and optional parameters like name and seed.
- Added comprehensive tests to validate the __repr__ output for various scenarios, including empty processors, single and multiple steps, custom names, and seed values.
- Ensured that the representation handles long lists of steps with truncation for better readability.
2025-08-01 08:41:54 +02:00
Adil Zouitine 26cb9a24c3 refactor(pipeline): Clarify hook behavior and improve documentation
- Updated the RobotProcessor class to ensure hooks are strictly for observation and do not modify transitions, enhancing clarity and maintainability.
- Refactored hook registration methods to reflect the new behavior, ensuring they accept only functions that do not return modified transitions.
- Enhanced documentation to clearly outline the purpose of hooks and their execution semantics.
- Added tests to verify that hooks are not executed during the step_through method while ensuring they function correctly during the __call__ method.
2025-08-01 08:41:54 +02:00
Adil Zouitine 77106697c3 feat(pipeline): Add hook unregistration functionality and enhance documentation
- Implemented methods to unregister before, after, and reset hooks in the RobotProcessor class, allowing for more flexible hook management.
- Enhanced documentation to clarify hook execution semantics and the implications of modifying transitions within hooks.
- Added comprehensive tests to verify the correct behavior of hook registration and unregistration, including error handling for non-existent hooks.
2025-08-01 08:41:54 +02:00
Adil Zouitine 75bc44c166 refactor(observation_processor): Improve observation processing by using constants and simplifying pixel handling
- Introduced constants for observation keys to enhance readability.
- Streamlined the handling of the "pixels" key by copying observations first and processing images more clearly.
- Updated the environment state and agent position assignments to use the new constants, improving maintainability.
2025-08-01 08:41:54 +02:00
Adil Zouitine f2b79656eb refactor(pipeline): Transition from tuple to dictionary format for EnvTransition
- Updated the EnvTransition structure to use a dictionary format instead of a tuple, enhancing readability and maintainability.
- Replaced instances of TransitionIndex with TransitionKey for accessing transition components.
- Adjusted related processing functions and tests to accommodate the new dictionary format, ensuring consistent handling of transitions across the codebase.
2025-08-01 08:41:53 +02:00
pre-commit-ci[bot] 14c2ece004 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:53 +02:00
Adil Zouitine 35612c61e1 refactor(pipeline): Introduce ComplementaryDataProcessor for handling complementary data in transitions 2025-08-01 08:41:53 +02:00
pre-commit-ci[bot] f7bb3e2d90 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:53 +02:00
Adil Zouitine 1e0d667a22 Apply suggestions from code review
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
Signed-off-by: Adil Zouitine <adilzouitinegm@gmail.com>
2025-08-01 08:41:53 +02:00
Adil Zouitine 33969a0337 refactor(pipeline): Simplify observation and padding data handling in batch transitions 2025-08-01 08:41:53 +02:00
Adil Zouitine fa26290e8c feat(pipeline): Enhance step_through method to support both tuple and dict inputs 2025-08-01 08:41:53 +02:00
Adil Zouitine e9f7f5127b chore(learner): nit comment from copilot 2025-08-01 08:41:53 +02:00
Adil Zouitine 097842c70f chore(normalization): addressing comments from copilot 2025-08-01 08:41:53 +02:00
Adil Zouitine 3b8a3a32a0 feat (overrides): Implement support for loading processors with parameter overrides
- Added the ability to provide non-serializable objects when loading processors from saved configurations using the `overrides` parameter.
- Enhanced error handling for invalid override keys and instantiation errors.
- Updated documentation and examples to illustrate the usage of overrides for both registered and unregistered steps.
- Added comprehensive tests to validate the new functionality and ensure backward compatibility.
2025-08-01 08:41:53 +02:00
Adil Zouitine 1c56779dd9 chore (type): add typing for multiprocess envs 2025-08-01 08:41:53 +02:00
Adil Zouitine 83a4338f8b chore (output format): improves output format 2025-08-01 08:41:53 +02:00
Adil Zouitine 730c7b2f35 fix(test): linting issue 2025-08-01 08:41:53 +02:00
pre-commit-ci[bot] 116059a43e [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:53 +02:00
Adil Zouitine b08149a113 chore (batch handling): Enhance processing components with batch conversion utilities 2025-08-01 08:41:53 +02:00
Adil Zouitine c227107f60 feat (device processor): Implement device processor 2025-08-01 08:41:53 +02:00
Adil Zouitine 01dc289f3d chore(docstring): Improve docstring for NormalizerProcessor 2025-08-01 08:41:53 +02:00
Adil Zouitine 6830ca7645 Refactor normalization components and update tests
- Renamed `ObservationNormalizer` to `NormalizerProcessor` and `ActionUnnormalizer` to `UnnormalizerProcessor` for clarity.
- Consolidated normalization logic for both observations and actions into `NormalizerProcessor` and `UnnormalizerProcessor`.
- Updated tests to reflect the new class names and ensure proper functionality of normalization and unnormalization processes.
- Enhanced handling of missing statistics in normalization processes.
2025-08-01 08:41:52 +02:00
Adil Zouitine ed42c71fc3 fix(test): import issue 2025-08-01 08:41:52 +02:00
Adil Zouitine e0139065bd chore(test): add suggestion made by copilot regarding numpy test 2025-08-01 08:41:52 +02:00
Adil Zouitine e509f255af Update tests/processor/test_observation_processor.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Adil Zouitine <adilzouitinegm@gmail.com>
2025-08-01 08:41:52 +02:00
Adil Zouitine e2fcd140b0 fix(test): policies 2025-08-01 08:41:52 +02:00
Adil Zouitine 2a7a0e6129 fix (test): test factory 2025-08-01 08:41:52 +02:00
Adil Zouitine 9f33791b19 chore (docs): add docstring for processor 2025-08-01 08:41:52 +02:00
Adil Zouitine 453e0a995f Enhance processing architecture with new components
- Added `RenameProcessor` to facilitate key renaming in observations, improving data handling flexibility.
- Updated `__init__.py` to include `RenameProcessor` in module exports.
- Refactored `NormalizationProcessor` and `ObservationNormalizer` to use `rsplit` for better key handling.
- Introduced comprehensive tests for `NormalizationProcessor` and `RenameProcessor` to ensure functionality and robustness.
2025-08-01 08:41:52 +02:00
pre-commit-ci[bot] 8ebf79c494 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:52 +02:00
Adil Zouitine 8774aec304 Add normalization processor and related components
- Introduced `NormalizationProcessor` to handle both observation normalization and action unnormalization.
- Added `ObservationNormalizer` and `ActionUnnormalizer` classes for specific normalization tasks.
- Updated `__init__.py` to include the new `NormalizationProcessor` in the module exports.
- Enhanced `ObservationProcessor` with registration in the `ProcessorStepRegistry` for better modularity.
- Created `RenameProcessor` for renaming keys in observations, improving flexibility in data processing.
2025-08-01 08:41:52 +02:00
pre-commit-ci[bot] ac742c9f0d [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:52 +02:00
Adil Zouitine cd13f1ecfd Add RobotProcessor tutorial to documentation
- Introduced a new tutorial on using RobotProcessor for preprocessing robot data.
- Added a section in the table of contents for easy navigation to the new tutorial.
- The tutorial covers key concepts, real-world scenarios, and practical examples for effective use of the RobotProcessor pipeline.
2025-08-01 08:41:52 +02:00
Adil Zouitine 9aa632968f Refactor processing architecture to use RobotProcessor
- Replaced instances of RobotPipeline with RobotProcessor across the codebase for improved modularity and clarity.
- Introduced ProcessorStepRegistry for better management of processing steps.
- Updated relevant documentation and tests to reflect the new processing structure.
- Enhanced the save/load functionality to support the new processor design.
- Added a model card template for RobotProcessor to facilitate sharing and documentation.
2025-08-01 08:41:52 +02:00
Adil Zouitine 62caaf07b0 Remove redundant tests for None observation and serialization methods in test_observation_processor.py to streamline the test suite and improve maintainability. 2025-08-01 08:41:52 +02:00
Adil Zouitine 3355f04ca6 Refactor observation processing and improve modularity
- Updated `ObservationProcessor` to enhance the modular design for processing observations.
- Cleaned up imports and improved code readability by removing unnecessary lines and comments.
- Ensured backward compatibility while integrating new processing components.
- Added tests to validate the functionality of the updated processing architecture.
2025-08-01 08:41:52 +02:00
pre-commit-ci[bot] 769f531603 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-01 08:41:51 +02:00
Adil Zouitine f6c7287ae7 Refactor observation preprocessing to use a modular pipeline system
- Introduced `RobotPipeline` and `ObservationProcessor` for handling observation transformations.
- Updated `preprocess_observation` to maintain backward compatibility while leveraging the new pipeline.
- Added tests for the new processing components and ensured they match the original functionality.
- Removed hardcoded logic in favor of a more flexible, composable architecture.
2025-08-01 08:41:51 +02:00
28 changed files with 9594 additions and 2171 deletions
+2
@@ -13,6 +13,8 @@
title: Cameras
- local: integrate_hardware
title: Bring Your Own Hardware
- local: processor_tutorial
title: RobotProcessor Pipeline
- local: hilserl
title: Train a Robot with RL
- local: hilserl_sim
+125 -55
@@ -56,27 +56,41 @@ pip install -e ".[hilserl]"
### Understanding Configuration
The training process begins with proper configuration for the HILSerl environment. The configuration class of interest is `HILSerlRobotEnvConfig` in `lerobot/envs/configs.py`. Which is defined as:
The training process begins with proper configuration for the HILSerl environment. The main configuration class is `GymManipulatorConfig` in `lerobot/scripts/rl/gym_manipulator.py`, which contains nested `HILSerlRobotEnvConfig` and `DatasetConfig`. The configuration is organized into focused, nested sub-configs:
<!-- prettier-ignore-start -->
```python
class GymManipulatorConfig:
env: HILSerlRobotEnvConfig # Environment configuration (nested)
dataset: DatasetConfig # Dataset recording/replay configuration (nested)
mode: str | None = None # "record", "replay", or None (for training)
class HILSerlRobotEnvConfig(EnvConfig):
robot: RobotConfig | None = None # Main robot agent (defined in `lerobot/robots`)
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm, (defined in `lerobot/teleoperators`)
wrapper: EnvTransformConfig | None = None # Environment wrapper settings; check `lerobot/scripts/server/gym_manipulator.py`
fps: int = 10 # Control frequency
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm
processor: HILSerlProcessorConfig # Processing pipeline configuration (nested)
name: str = "real_robot" # Environment name
mode: str = None # "record", "replay", or None (for training)
repo_id: str | None = None # LeRobot dataset repository ID
dataset_root: str | None = None # Local dataset root (optional)
task: str = "" # Task identifier
num_episodes: int = 10 # Number of episodes for recording
episode: int = 0 # episode index for replay
device: str = "cuda" # Compute device
push_to_hub: bool = True # Whether to push the recorded datasets to Hub
pretrained_policy_name_or_path: str | None = None # For policy loading
reward_classifier_pretrained_path: str | None = None # For reward model
number_of_steps_after_success: int = 0 # For reward classifier, collect more positive examples after a success to train a classifier
fps: int = 30 # Control frequency
# Nested processor configuration
class HILSerlProcessorConfig:
control_mode: str = "gamepad" # Control mode
observation: ObservationConfig # Observation processing settings
image_preprocessing: ImagePreprocessingConfig # Image crop/resize settings
gripper: GripperConfig # Gripper control and penalty settings
reset: ResetConfig # Environment reset and timing settings
inverse_kinematics: InverseKinematicsConfig # IK processing settings
reward_classifier: RewardClassifierConfig # Reward classifier settings
# Dataset configuration
class DatasetConfig:
repo_id: str # LeRobot dataset repository ID
dataset_root: str | None = None # Local dataset root (optional)
task: str # Task identifier
num_episodes: int # Number of episodes for recording
episode: int # Episode index for replay
push_to_hub: bool # Whether to push datasets to Hub
```
<!-- prettier-ignore-end -->
@@ -130,22 +144,31 @@ With the bounds defined, you can safely collect demonstrations for training. Tra
Create a configuration file for recording demonstrations (or edit an existing one like [env_config_so100.json](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_so100.json)):
1. Set `mode` to `"record"`
2. Specify a unique `repo_id` for your dataset (e.g., "username/task_name")
3. Set `num_episodes` to the number of demonstrations you want to collect
4. Set `crop_params_dict` to `null` initially (we'll determine crops later)
5. Configure `robot`, `cameras`, and other hardware settings
1. Set `mode` to `"record"` at the root level
2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name")
3. Set `num_episodes` in the `dataset` section to the number of demonstrations you want to collect
4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later)
5. Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section
Example configuration section:
```json
"mode": "record",
"repo_id": "username/pick_lift_cube",
"dataset_root": null,
"task": "pick_and_lift",
"num_episodes": 15,
"episode": 0,
"push_to_hub": true
{
"env": {
"type": "gym_manipulator",
"fps": 10
// ... robot, teleop, processor configs ...
},
"dataset": {
"repo_id": "username/pick_lift_cube",
"dataset_root": null,
"task": "pick_and_lift",
"num_episodes": 15,
"episode": 0,
"push_to_hub": true
},
"mode": "record"
}
```
### Using a Teleoperation Device
@@ -191,10 +214,17 @@ The gamepad provides a very convenient way to control the robot and the episode
To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and define the `teleop` section in the configuration file.
```json
{
"env": {
"teleop": {
"type": "gamepad",
"use_gripper": true
"type": "gamepad",
"use_gripper": true
},
"processor": {
"control_mode": "gamepad"
}
}
}
```
<p align="center">
@@ -216,11 +246,18 @@ The SO101 leader arm has reduced gears that allows it to move and track the foll
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file.
```json
{
"env": {
"teleop": {
"type": "so101_leader",
"port": "/dev/tty.usbmodem585A0077921", # check your port number
"use_degrees": true
"type": "so101_leader",
"port": "/dev/tty.usbmodem585A0077921",
"use_degrees": true
},
"processor": {
"control_mode": "leader"
}
}
}
```
In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure.
@@ -251,7 +288,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/e
During recording:
1. The robot will reset to the initial position defined in the configuration file `fixed_reset_joint_positions`
1. The robot will reset to the initial position defined in the configuration file `env.processor.reset.fixed_reset_joint_positions`
2. Complete the task successfully
3. The episode ends with a reward of 1 when you press the "success" button
4. If the time limit is reached, or the fail button is pressed, the episode ends with a reward of 0
@@ -310,11 +347,19 @@ observation.images.front: [180, 250, 120, 150]
Add these crop parameters to your training configuration:
```json
"crop_params_dict": {
"observation.images.side": [180, 207, 180, 200],
"observation.images.front": [180, 250, 120, 150]
},
"resize_size": [128, 128]
{
"env": {
"processor": {
"image_preprocessing": {
"crop_params_dict": {
"observation.images.side": [180, 207, 180, 200],
"observation.images.front": [180, 250, 120, 150]
},
"resize_size": [128, 128]
}
}
}
}
```
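The crop-then-resize arithmetic these parameters imply can be sketched in plain Python, assuming the crop values are ordered `(top, left, height, width)` as the example suggests. This is not the lerobot implementation, just a toy illustration of the transform on a 2D grid.

```python
# Toy sketch of crop-then-resize on a 2D grid (list of rows).
# Assumption: crop params are ordered (top, left, height, width).

def crop(image, top, left, height, width):
    """Return the (height x width) window starting at (top, left)."""
    return [row[left:left + width] for row in image[top:top + height]]


def resize_nearest(image, out_h, out_w):
    """Nearest-neighbor resize of a 2D grid to (out_h x out_w)."""
    in_h, in_w = len(image), len(image[0])
    return [
        [image[(y * in_h) // out_h][(x * in_w) // out_w] for x in range(out_w)]
        for y in range(out_h)
    ]


# A 400x450 dummy "frame" whose pixel value encodes its row index.
frame = [[r for _ in range(450)] for r in range(400)]
cropped = crop(frame, 180, 207, 180, 200)  # params from the example above
small = resize_nearest(cropped, 128, 128)  # resize_size from the example
print(len(small), len(small[0]))  # 128 128
```

In the real pipeline this runs on image tensors per camera key, but the window arithmetic is the same.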
**Recommended image resolution**
@@ -343,26 +388,35 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r
**Key Parameters for Data Collection**
- **mode**: set it to `"record"` to collect a dataset
- **repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
- **num_episodes**: Number of episodes to record
- **number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
- **fps**: Number of frames per second to record
- **push_to_hub**: Whether to push the dataset to the hub
- **mode**: set it to `"record"` to collect a dataset (at root level)
- **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
- **dataset.num_episodes**: Number of episodes to record
- **env.processor.reset.number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
- **env.fps**: Number of frames per second to record
- **dataset.push_to_hub**: Whether to push the dataset to the hub
The `number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
The `env.processor.reset.number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled 1 to train a good classifier.
Example configuration section for data collection:
```json
{
"mode": "record",
"repo_id": "hf_username/dataset_name",
"dataset_root": "data/your_dataset",
"num_episodes": 20,
"push_to_hub": true,
"fps": 10,
"number_of_steps_after_success": 15
"env": {
"type": "gym_manipulator",
"fps": 10,
"processor": {
"reset": {
"number_of_steps_after_success": 15
}
}
},
"dataset": {
"repo_id": "hf_username/dataset_name",
"dataset_root": "data/your_dataset",
"num_episodes": 20,
"push_to_hub": true
},
"mode": "record"
}
```
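The `number_of_steps_after_success` behavior described above can be sketched as an episode loop: once the env reports success, recording continues for N more steps, all labeled as reward=1. This is a hypothetical toy loop, not the actual `gym_manipulator` code.

```python
# Sketch of post-success recording: after the first reward=1, keep
# recording up to N extra steps labeled 1 so the reward classifier
# sees enough positive frames.

def record_episode(step_rewards, number_of_steps_after_success=15):
    """step_rewards: per-step env rewards; returns the labels recorded."""
    labels = []
    for i, r in enumerate(step_rewards):
        labels.append(r)
        if r == 1:
            # Continue for up to N more steps, all labeled as success.
            extra = min(number_of_steps_after_success, len(step_rewards) - i - 1)
            labels.extend([1] * extra)
            break
    return labels


labels = record_episode([0, 0, 0, 1, 0, 0, 0, 0], number_of_steps_after_success=3)
print(labels)  # [0, 0, 0, 1, 1, 1, 1]
```

Without the extra steps, this episode would contribute a single positive frame; with them, the positive class is four frames, which is the imbalance the parameter exists to correct.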
@@ -421,9 +475,17 @@ To use your trained reward classifier, configure the `HILSerlRobotEnvConfig` to
<!-- prettier-ignore-start -->
```python
env_config = HILSerlRobotEnvConfig(
reward_classifier_pretrained_path="path_to_your_pretrained_model",
# Other environment parameters
config = GymManipulatorConfig(
env=HILSerlRobotEnvConfig(
processor=HILSerlProcessorConfig(
reward_classifier=RewardClassifierConfig(
pretrained_path="path_to_your_pretrained_model"
)
),
# Other environment parameters
),
dataset=DatasetConfig(...),
mode=None # For training
)
```
<!-- prettier-ignore-end -->
@@ -432,7 +494,15 @@ or set the argument in the json config file.
```json
{
"reward_classifier_pretrained_path": "path_to_your_pretrained_model"
"env": {
"processor": {
"reward_classifier": {
"pretrained_path": "path_to_your_pretrained_model",
"success_threshold": 0.7,
"success_reward": 1.0
}
}
}
}
```
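Based on the field names, `success_threshold` and `success_reward` plausibly combine as a simple cutoff: the classifier emits a success probability, and the env grants `success_reward` once it crosses the threshold. The semantics below are an assumption inferred from the names, not taken from the lerobot source.

```python
# Hedged sketch of the thresholding implied by the config fields above.

def classify_reward(success_prob, success_threshold=0.7, success_reward=1.0):
    """Map a classifier probability to a sparse reward."""
    return success_reward if success_prob >= success_threshold else 0.0


print(classify_reward(0.85))  # 1.0
print(classify_reward(0.40))  # 0.0
```

Raising `success_threshold` trades false positives (premature episode termination) for false negatives (missed successes), which is why it is exposed as a tunable.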
File diff suppressed because it is too large
+58 -85
@@ -161,35 +161,74 @@ class XarmEnv(EnvConfig):
@dataclass
class VideoRecordConfig:
"""Configuration for video recording in ManiSkill environments."""
enabled: bool = False
record_dir: str = "videos"
trajectory_name: str = "trajectory"
class ImagePreprocessingConfig:
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
resize_size: tuple[int, int] | None = None
@dataclass
class EnvTransformConfig:
"""Configuration for environment wrappers."""
class RewardClassifierConfig:
"""Configuration for reward classification."""
pretrained_path: str | None = None
success_threshold: float = 0.5
success_reward: float = 1.0
@dataclass
class InverseKinematicsConfig:
"""Configuration for inverse kinematics processing."""
urdf_path: str | None = None
target_frame_name: str | None = None
end_effector_bounds: dict[str, list[float]] | None = None
end_effector_step_sizes: dict[str, float] | None = None
max_gripper_pos: float | None = None
@dataclass
class ObservationConfig:
"""Configuration for observation processing."""
# ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
control_mode: str = "gamepad"
display_cameras: bool = False
add_joint_velocity_to_observation: bool = False
add_current_to_observation: bool = False
add_ee_pose_to_observation: bool = False
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
resize_size: tuple[int, int] | None = None
control_time_s: float = 20.0
fixed_reset_joint_positions: Any | None = None
reset_time_s: float = 5.0
display_cameras: bool = False
@dataclass
class GripperConfig:
"""Configuration for gripper control and penalties."""
use_gripper: bool = True
gripper_quantization_threshold: float | None = 0.8
gripper_penalty: float = 0.0
gripper_penalty_in_reward: bool = False
@dataclass
class ResetConfig:
"""Configuration for environment reset behavior."""
fixed_reset_joint_positions: Any | None = None
reset_time_s: float = 5.0
control_time_s: float = 20.0
terminate_on_success: bool = True
number_of_steps_after_success: int = 0
@dataclass
class HILSerlProcessorConfig:
"""Configuration for environment processing pipeline."""
control_mode: str = "gamepad"
observation: ObservationConfig = field(default_factory=ObservationConfig)
image_preprocessing: ImagePreprocessingConfig = field(default_factory=ImagePreprocessingConfig)
gripper: GripperConfig = field(default_factory=GripperConfig)
reset: ResetConfig = field(default_factory=ResetConfig)
inverse_kinematics: InverseKinematicsConfig = field(default_factory=InverseKinematicsConfig)
reward_classifier: RewardClassifierConfig = field(default_factory=RewardClassifierConfig)
@EnvConfig.register_subclass(name="gym_manipulator")
@dataclass
class HILSerlRobotEnvConfig(EnvConfig):
@@ -197,77 +236,11 @@ class HILSerlRobotEnvConfig(EnvConfig):
robot: RobotConfig | None = None
teleop: TeleoperatorConfig | None = None
wrapper: EnvTransformConfig | None = None
fps: int = 10
processor: HILSerlProcessorConfig = field(default_factory=HILSerlProcessorConfig)
name: str = "real_robot"
mode: str | None = None # Either "record", "replay", None
repo_id: str | None = None
dataset_root: str | None = None
task: str | None = ""
num_episodes: int = 10 # only for record mode
episode: int = 0
device: str = "cuda"
push_to_hub: bool = True
pretrained_policy_name_or_path: str | None = None
reward_classifier_pretrained_path: str | None = None
# For the reward classifier, to record more positive examples after a success
number_of_steps_after_success: int = 0
@property
def gym_kwargs(self) -> dict:
return {}
@EnvConfig.register_subclass("hil")
@dataclass
class HILEnvConfig(EnvConfig):
"""Configuration for the HIL environment."""
name: str = "PandaPickCube"
task: str | None = "PandaPickCubeKeyboard-v0"
use_viewer: bool = True
gripper_penalty: float = 0.0
use_gamepad: bool = True
state_dim: int = 18
action_dim: int = 4
fps: int = 100
episode_length: int = 100
video_record: VideoRecordConfig = field(default_factory=VideoRecordConfig)
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
"action": PolicyFeature(type=FeatureType.ACTION, shape=(4,)),
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128)),
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(18,)),
}
)
features_map: dict[str, str] = field(
default_factory=lambda: {
"action": ACTION,
"observation.image": OBS_IMAGE,
"observation.state": OBS_STATE,
}
)
################# args from hilserlrobotenv
reward_classifier_pretrained_path: str | None = None
robot_config: RobotConfig | None = None
teleop_config: TeleoperatorConfig | None = None
wrapper: EnvTransformConfig | None = None
mode: str | None = None # Either "record", "replay", None
repo_id: str | None = None
dataset_root: str | None = None
num_episodes: int = 10 # only for record mode
episode: int = 0
device: str = "cuda"
push_to_hub: bool = True
pretrained_policy_name_or_path: str | None = None
# For the reward classifier, to record more positive examples after a success
number_of_steps_after_success: int = 0
############################
@property
def gym_kwargs(self) -> dict:
return {
"use_viewer": self.use_viewer,
"use_gamepad": self.use_gamepad,
"gripper_penalty": self.gripper_penalty,
}
+21 -45
@@ -16,10 +16,8 @@
import warnings
from typing import Any
import einops
import gymnasium as gym
import numpy as np
import torch
from torch import Tensor
from lerobot.configs.types import FeatureType, PolicyFeature
@@ -28,62 +26,40 @@ from lerobot.utils.utils import get_channel_first_image_shape
def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Tensor]:
# TODO(aliberts, rcadene): refactor this to use features from the environment (no hardcoding)
"""Convert environment observation to LeRobot format observation.
This function uses the new pipeline system internally but maintains
backward compatibility with the original interface.
Args:
observations: Dictionary of observation batches from a Gym vector environment.
Returns:
Dictionary of observation batches with keys renamed to LeRobot format and values as tensors.
"""
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
# Create processor with the vanilla observation processor
processor = RobotProcessor([VanillaObservationProcessor()])
# Create transition dictionary and process
transition = {
TransitionKey.OBSERVATION: observations,
TransitionKey.ACTION: None,
TransitionKey.REWARD: None,
TransitionKey.DONE: None,
TransitionKey.TRUNCATED: None,
TransitionKey.INFO: None,
TransitionKey.COMPLEMENTARY_DATA: None,
}
result = processor(transition)
# Extract and return the processed observation
return result[TransitionKey.OBSERVATION]
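The refactored function delegates all tensor conversion to the processor pipeline. The pattern is easy to sketch in isolation; the following is a minimal stand-in (not the real lerobot API) showing how a `RobotProcessor`-style object threads a transition dict through a list of steps, with a toy renaming step:

```python
from enum import Enum

class TransitionKey(Enum):
    OBSERVATION = "observation"
    ACTION = "action"

class RobotProcessor:
    """Minimal sketch: call each step in order, passing the transition along."""
    def __init__(self, steps):
        self.steps = steps
    def __call__(self, transition):
        for step in self.steps:
            transition = step(transition)
        return transition

def rename_pixels(transition):
    # A toy step: rename "pixels" to "observation.image" without mutating the input.
    obs = dict(transition[TransitionKey.OBSERVATION])
    if "pixels" in obs:
        obs["observation.image"] = obs.pop("pixels")
    new_transition = dict(transition)
    new_transition[TransitionKey.OBSERVATION] = obs
    return new_transition

processor = RobotProcessor([rename_pixels])
result = processor({TransitionKey.OBSERVATION: {"pixels": [[0, 1]]}})
assert "observation.image" in result[TransitionKey.OBSERVATION]
```

Each real step in the pipeline follows the same contract: take an `EnvTransition`, return a new one, never mutate the input in place.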
def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
# TODO(aliberts, rcadene): remove this hardcoding of keys and just use the nested keys as is
# (need to externalize normalization from policies)
policy_features = {}
for key, ft in env_cfg.features.items():
if ft.type is FeatureType.VISUAL:
@@ -0,0 +1,78 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .device_processor import DeviceProcessor
from .hil_processor import (
GripperPenaltyProcessor,
ImageCropResizeProcessor,
InterventionActionProcessor,
TimeLimitProcessor,
)
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor
from .observation_processor import (
ImageProcessor,
StateProcessor,
VanillaObservationProcessor,
)
from .pipeline import (
ActionProcessor,
DoneProcessor,
EnvTransition,
IdentityProcessor,
InfoProcessor,
ObservationProcessor,
ProcessorStep,
ProcessorStepRegistry,
RewardProcessor,
RobotProcessor,
TransitionKey,
TruncatedProcessor,
)
from .rename_processor import RenameProcessor
from .robot_processor import (
InverseKinematicsProcessor,
JointVelocityProcessor,
MotorCurrentProcessor,
)
__all__ = [
"ActionProcessor",
"DeviceProcessor",
"DoneProcessor",
"EnvTransition",
"GripperPenaltyProcessor",
"IdentityProcessor",
"ImageCropResizeProcessor",
"ImageProcessor",
"InfoProcessor",
"InterventionActionProcessor",
"InverseKinematicsProcessor",
"JointVelocityProcessor",
"MotorCurrentProcessor",
"NormalizerProcessor",
"UnnormalizerProcessor",
"ObservationProcessor",
"ProcessorStep",
"ProcessorStepRegistry",
"RenameProcessor",
"RewardProcessor",
"RobotProcessor",
"StateProcessor",
"TimeLimitProcessor",
"TransitionKey",
"TruncatedProcessor",
"VanillaObservationProcessor",
]
@@ -0,0 +1,80 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Any
import torch
from lerobot.configs.types import PolicyFeature
from lerobot.processor.pipeline import EnvTransition, TransitionKey
@dataclass
class DeviceProcessor:
"""Processes transitions by moving tensors to the specified device.
This processor ensures that all tensors in the transition are moved to the
specified device (CPU or GPU) before they are returned.
"""
device: str = "cpu"
def __post_init__(self):
self.non_blocking = "cuda" in self.device
def __call__(self, transition: EnvTransition) -> EnvTransition:
# Create a copy of the transition
new_transition = transition.copy()
# Process observation tensors
observation = transition.get(TransitionKey.OBSERVATION)
if observation is not None:
new_observation = {
k: v.to(self.device, non_blocking=self.non_blocking) if isinstance(v, torch.Tensor) else v
for k, v in observation.items()
}
new_transition[TransitionKey.OBSERVATION] = new_observation
# Process action tensor
action = transition.get(TransitionKey.ACTION)
if action is not None and isinstance(action, torch.Tensor):
new_transition[TransitionKey.ACTION] = action.to(self.device, non_blocking=self.non_blocking)
# Process reward tensor
reward = transition.get(TransitionKey.REWARD)
if reward is not None and isinstance(reward, torch.Tensor):
new_transition[TransitionKey.REWARD] = reward.to(self.device, non_blocking=self.non_blocking)
# Process done tensor
done = transition.get(TransitionKey.DONE)
if done is not None and isinstance(done, torch.Tensor):
new_transition[TransitionKey.DONE] = done.to(self.device, non_blocking=self.non_blocking)
# Process truncated tensor
truncated = transition.get(TransitionKey.TRUNCATED)
if truncated is not None and isinstance(truncated, torch.Tensor):
new_transition[TransitionKey.TRUNCATED] = truncated.to(
self.device, non_blocking=self.non_blocking
)
return new_transition
def get_config(self) -> dict[str, Any]:
"""Return configuration for serialization."""
return {"device": self.device}
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
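`DeviceProcessor` shallow-copies the transition and moves only tensor-valued fields. The copy-then-`.to` pattern can be sketched without torch using a fake tensor class (a stand-in for illustration only; the real processor operates on `torch.Tensor`):

```python
class FakeTensor:
    """Stand-in for torch.Tensor: just records which device it lives on."""
    def __init__(self, device="cpu"):
        self.device = device
    def to(self, device, non_blocking=False):
        return FakeTensor(device)

def move_observation(observation, device):
    # Mirrors DeviceProcessor: copy the dict, move only tensor-like values,
    # using non-blocking transfers for CUDA devices.
    return {
        k: v.to(device, non_blocking="cuda" in device) if isinstance(v, FakeTensor) else v
        for k, v in observation.items()
    }

obs = {"observation.state": FakeTensor("cpu"), "task": "pick cube"}
moved = move_observation(obs, "cuda:0")
assert moved["observation.state"].device == "cuda:0"
assert moved["task"] == "pick cube"  # non-tensors pass through untouched
```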
@@ -0,0 +1,331 @@
import time
from dataclasses import dataclass
from typing import Any
import torch
import torchvision.transforms.functional as F # noqa: N812
from lerobot.configs.types import PolicyFeature
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
@ProcessorStepRegistry.register("image_crop_resize_processor")
class ImageCropResizeProcessor:
"""Crop and resize image observations."""
crop_params_dict: dict[str, tuple[int, int, int, int]]
resize_size: tuple[int, int] = (128, 128)
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
if self.resize_size is None and not self.crop_params_dict:
return transition
new_observation = dict(observation)
# Process all image keys in the observation
for key in observation:
if "image" not in key:
continue
image = observation[key]
device = image.device
if device.type == "mps":
image = image.cpu()
# Crop if crop params are provided for this key
if key in self.crop_params_dict:
crop_params = self.crop_params_dict[key]
image = F.crop(image, *crop_params)
# Always resize
image = F.resize(image, self.resize_size)
image = image.clamp(0.0, 1.0)
new_observation[key] = image.to(device)
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"crop_params_dict": self.crop_params_dict,
"resize_size": self.resize_size,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
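The `crop_params_dict` entries are passed straight to `torchvision.transforms.functional.crop`, whose argument order is `(top, left, height, width)`. A numpy sketch of the equivalent slicing on channel-first images:

```python
import numpy as np

def crop(img, top, left, height, width):
    # Same argument order as torchvision.transforms.functional.crop
    return img[..., top:top + height, left:left + width]

img = np.zeros((3, 100, 120), dtype=np.float32)  # channel-first (C, H, W)
cropped = crop(img, 10, 20, 50, 60)
assert cropped.shape == (3, 50, 60)
```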
@dataclass
@ProcessorStepRegistry.register("time_limit_processor")
class TimeLimitProcessor:
"""Track episode time and enforce time limits."""
max_episode_steps: int
current_step: int = 0
def __call__(self, transition: EnvTransition) -> EnvTransition:
truncated = transition.get(TransitionKey.TRUNCATED)
if truncated is None:
return transition
self.current_step += 1
if self.current_step >= self.max_episode_steps:
truncated = True
new_transition = transition.copy()
new_transition[TransitionKey.TRUNCATED] = truncated
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"max_episode_steps": self.max_episode_steps,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
self.current_step = 0
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
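The time-limit logic is a plain step counter that forces `truncated` once the budget is exhausted. A self-contained sketch of the same behavior:

```python
class TimeLimit:
    """Sketch of the step-counting logic: flip `truncated` once the limit is hit."""
    def __init__(self, max_episode_steps):
        self.max_episode_steps = max_episode_steps
        self.current_step = 0

    def step(self, truncated):
        self.current_step += 1
        if self.current_step >= self.max_episode_steps:
            truncated = True
        return truncated

    def reset(self):
        self.current_step = 0

limit = TimeLimit(max_episode_steps=3)
results = [limit.step(False) for _ in range(3)]
assert results == [False, False, True]
limit.reset()
assert limit.step(False) is False
```

Note that, like the processor above, the counter only resets via `reset()`, so the environment wrapper must call it between episodes.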
@dataclass
@ProcessorStepRegistry.register("gripper_penalty_processor")
class GripperPenaltyProcessor:
penalty: float = -0.01
max_gripper_pos: float = 30.0
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Calculate gripper penalty and add to complementary data."""
action = transition.get(TransitionKey.ACTION)
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if complementary_data is None or action is None:
return transition
raw_joint_positions = complementary_data.get("raw_joint_positions")
if raw_joint_positions is None:
return transition
current_gripper_pos = raw_joint_positions[-1]
gripper_action = action[-1].item()
# Normalize the gripper state to [0, 1] and shift the action from [0, 2] to [-1, 1]
gripper_state_normalized = current_gripper_pos / self.max_gripper_pos
gripper_action_normalized = gripper_action - 1.0
# Penalize commanding the gripper against its current extreme position
gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or (
gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5
)
gripper_penalty = self.penalty * int(gripper_penalty_bool)
# Create new complementary data with penalty info
new_complementary_data = dict(complementary_data)
new_complementary_data["discrete_penalty"] = gripper_penalty
# Create new transition with updated complementary data
new_transition = transition.copy()
new_transition[TransitionKey.COMPLEMENTARY_DATA] = new_complementary_data
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"penalty": self.penalty,
"max_gripper_pos": self.max_gripper_pos,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
"""Reset the processor state."""
self.last_gripper_state = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
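The penalty condition reduces to a small piece of arithmetic. A self-contained restatement, assuming the gripper action is encoded in [0, 2] and shifted to [-1, 1] as in the processor above:

```python
def gripper_penalty(gripper_pos, gripper_action, max_gripper_pos=30.0, penalty=-0.01):
    state_n = gripper_pos / max_gripper_pos   # position normalized to [0, 1]
    action_n = gripper_action - 1.0           # action shifted from [0, 2] to [-1, 1]
    # Penalize commanding the gripper against its current extreme position
    bad = (state_n < 0.5 and action_n > 0.5) or (state_n > 0.75 and action_n < 0.5)
    return penalty * int(bad)

assert gripper_penalty(3.0, 2.0) == -0.01   # near one extreme, pushed hard the other way
assert gripper_penalty(28.0, 0.0) == -0.01  # near the other extreme, pushed back
assert gripper_penalty(3.0, 0.5) == 0.0     # mild command: no penalty
```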
@dataclass
@ProcessorStepRegistry.register("intervention_action_processor")
class InterventionActionProcessor:
"""Handle action intervention based on signals in the transition.
This processor checks for intervention signals in the transition's complementary data
and overrides agent actions when intervention is active.
"""
use_gripper: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
action = transition.get(TransitionKey.ACTION)
if action is None:
return transition
# Get intervention signals from complementary data
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
teleop_action = complementary_data.get("teleop_action", {})
is_intervention = complementary_data.get("is_intervention", False)
terminate_episode = complementary_data.get("terminate_episode", False)
success = complementary_data.get("success", False)
rerecord_episode = complementary_data.get("rerecord_episode", False)
new_transition = transition.copy()
# Override action if intervention is active
if is_intervention and teleop_action:
# Convert teleop_action dict to tensor format
action_list = [
teleop_action.get("delta_x", 0.0),
teleop_action.get("delta_y", 0.0),
teleop_action.get("delta_z", 0.0),
]
if self.use_gripper:
action_list.append(teleop_action.get("gripper", 1.0))
teleop_action_tensor = torch.tensor(action_list, dtype=action.dtype, device=action.device)
new_transition[TransitionKey.ACTION] = teleop_action_tensor
# Handle episode termination
new_transition[TransitionKey.DONE] = bool(terminate_episode)
new_transition[TransitionKey.REWARD] = float(success)
# Update info with intervention metadata
# Update info with intervention metadata (copy to avoid mutating the input transition)
info = dict(new_transition.get(TransitionKey.INFO) or {})
info["is_intervention"] = is_intervention
info["rerecord_episode"] = rerecord_episode
info["next.success"] = success if terminate_episode else info.get("next.success", False)
new_transition[TransitionKey.INFO] = info
# Store the executed action in a copy of the complementary data
new_complementary_data = dict(complementary_data)
new_complementary_data["teleop_action"] = new_transition[TransitionKey.ACTION]
new_transition[TransitionKey.COMPLEMENTARY_DATA] = new_complementary_data
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"use_gripper": self.use_gripper,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
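The override rule is: whenever an intervention is active and a teleop action is present, the teleop deltas replace the agent action. A hedged, dict-based sketch of that rule (lists stand in for the tensors used by the real processor):

```python
def resolve_action(agent_action, teleop_action, is_intervention, use_gripper=False):
    """Sketch of the override rule: teleop wins whenever an intervention is active."""
    if not (is_intervention and teleop_action):
        return agent_action
    action = [
        teleop_action.get("delta_x", 0.0),
        teleop_action.get("delta_y", 0.0),
        teleop_action.get("delta_z", 0.0),
    ]
    if use_gripper:
        action.append(teleop_action.get("gripper", 1.0))
    return action

assert resolve_action([0.1, 0.2, 0.3], {"delta_x": 1.0}, is_intervention=True) == [1.0, 0.0, 0.0]
assert resolve_action([0.1, 0.2, 0.3], {}, is_intervention=False) == [0.1, 0.2, 0.3]
assert resolve_action([0.0] * 4, {"gripper": 2.0}, True, use_gripper=True) == [0.0, 0.0, 0.0, 2.0]
```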
@dataclass
@ProcessorStepRegistry.register("reward_classifier_processor")
class RewardClassifierProcessor:
"""Apply reward classification to image observations.
This processor runs a trained reward classifier on image observations
to predict rewards and success states, potentially terminating episodes
when success is achieved.
"""
pretrained_path: str | None = None
device: str = "cpu"
success_threshold: float = 0.5
success_reward: float = 1.0
terminate_on_success: bool = True
reward_classifier: Any = None
def __post_init__(self):
"""Initialize the reward classifier after dataclass initialization."""
if self.pretrained_path is not None:
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
self.reward_classifier.to(self.device)
self.reward_classifier.eval()
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None or self.reward_classifier is None:
return transition
# Extract images from observation
images = {key: value for key, value in observation.items() if "image" in key}
if not images:
return transition
# Run reward classifier
start_time = time.perf_counter()
with torch.inference_mode():
success = self.reward_classifier.predict_reward(images, threshold=self.success_threshold)
classifier_frequency = 1 / (time.perf_counter() - start_time)
# Calculate reward and termination
reward = transition.get(TransitionKey.REWARD, 0.0)
terminated = transition.get(TransitionKey.DONE, False)
if success == 1.0:
reward = self.success_reward
if self.terminate_on_success:
terminated = True
# Update transition
new_transition = transition.copy()
new_transition[TransitionKey.REWARD] = reward
new_transition[TransitionKey.DONE] = terminated
# Update info with classifier frequency
info = new_transition.get(TransitionKey.INFO, {})
info["reward_classifier_frequency"] = classifier_frequency
new_transition[TransitionKey.INFO] = info
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"device": self.device,
"success_threshold": self.success_threshold,
"success_reward": self.success_reward,
"terminate_on_success": self.terminate_on_success,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
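After the classifier runs, the reward/termination bookkeeping is independent of the model itself. A minimal sketch of that post-classifier logic:

```python
def apply_success(reward, done, success, success_reward=1.0, terminate_on_success=True):
    # Sketch of the post-classifier bookkeeping: promote the reward and
    # optionally terminate the episode when success is detected.
    if success == 1.0:
        reward = success_reward
        if terminate_on_success:
            done = True
    return reward, done

assert apply_success(0.0, False, success=1.0) == (1.0, True)
assert apply_success(0.0, False, success=0.0) == (0.0, False)
assert apply_success(0.0, False, success=1.0, terminate_on_success=False) == (1.0, False)
```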
@@ -0,0 +1,335 @@
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any
import numpy as np
import torch
from torch import Tensor
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
def _convert_stats_to_tensors(stats: dict[str, dict[str, Any]]) -> dict[str, dict[str, Tensor]]:
"""Convert numpy arrays and other types to torch tensors."""
tensor_stats: dict[str, dict[str, Tensor]] = {}
for key, sub in stats.items():
tensor_stats[key] = {}
for stat_name, value in sub.items():
if isinstance(value, np.ndarray):
tensor_val = torch.from_numpy(value.astype(np.float32))
elif isinstance(value, torch.Tensor):
tensor_val = value.to(dtype=torch.float32)
elif isinstance(value, (int, float, list, tuple)):
tensor_val = torch.tensor(value, dtype=torch.float32)
else:
raise TypeError(f"Unsupported type for stats['{key}']['{stat_name}']: {type(value)}")
tensor_stats[key][stat_name] = tensor_val
return tensor_stats
@dataclass
@ProcessorStepRegistry.register(name="normalizer_processor")
class NormalizerProcessor:
"""Normalizes observations and actions in a single processor step.
This processor handles normalization of both observation and action tensors
using either mean/std normalization or min/max scaling to a [-1, 1] range.
For each tensor key in the stats dictionary, the processor will:
- Use mean/std normalization if those statistics are provided: (x - mean) / std
- Use min/max scaling if those statistics are provided: 2 * (x - min) / (max - min) - 1
The processor can be configured to normalize only specific keys by setting
the normalize_keys parameter.
"""
# Features and normalisation map are mandatory to match the design of normalize.py
features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode]
# Pre-computed statistics coming from dataset.meta.stats for instance.
stats: dict[str, dict[str, Any]] | None = None
# Explicit subset of keys to normalise. If ``None`` every key (except
# "action") found in ``stats`` will be normalised. Using a ``set`` makes
# membership checks O(1).
normalize_keys: set[str] | None = None
eps: float = 1e-8
_tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)
@classmethod
def from_lerobot_dataset(
cls,
dataset: LeRobotDataset,
features: dict[str, PolicyFeature],
norm_map: dict[FeatureType, NormalizationMode],
*,
normalize_keys: set[str] | None = None,
eps: float = 1e-8,
) -> NormalizerProcessor:
"""Factory helper that pulls statistics from a :class:`LeRobotDataset`.
The features and norm_map parameters are mandatory to match the design
pattern used in normalize.py.
"""
return cls(
features=features,
norm_map=norm_map,
stats=dataset.meta.stats,
normalize_keys=normalize_keys,
eps=eps,
)
def __post_init__(self):
# Handle deserialization from JSON config
if self.features and isinstance(list(self.features.values())[0], dict):
# Features came from JSON - need to reconstruct PolicyFeature objects
reconstructed_features = {}
for key, ft_dict in self.features.items():
reconstructed_features[key] = PolicyFeature(
type=FeatureType(ft_dict["type"]), shape=tuple(ft_dict["shape"])
)
self.features = reconstructed_features
if self.norm_map and isinstance(list(self.norm_map.keys())[0], str):
# norm_map came from JSON - need to reconstruct enum keys and values
reconstructed_norm_map = {}
for ft_type_str, norm_mode_str in self.norm_map.items():
reconstructed_norm_map[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
self.norm_map = reconstructed_norm_map
# Convert statistics once so we avoid repeated numpy→Tensor conversions
# during runtime.
self.stats = self.stats or {}
self._tensor_stats = _convert_stats_to_tensors(self.stats)
# Ensure *normalize_keys* is a set for fast look-ups and compare by
# value later when returning the configuration.
if self.normalize_keys is not None and not isinstance(self.normalize_keys, set):
self.normalize_keys = set(self.normalize_keys)
def _normalize_obs(self, observation):
if observation is None:
return None
# Decide which keys should be normalised for this call.
if self.normalize_keys is not None:
keys_to_norm = self.normalize_keys
else:
# Use feature map to skip action keys.
keys_to_norm = {k for k, ft in self.features.items() if ft.type is not FeatureType.ACTION}
processed = dict(observation)
for key in keys_to_norm:
if key not in processed or key not in self._tensor_stats:
continue
orig_val = processed[key]
tensor = (
orig_val.to(dtype=torch.float32)
if isinstance(orig_val, torch.Tensor)
else torch.as_tensor(orig_val, dtype=torch.float32)
)
stats = {k: v.to(tensor.device) for k, v in self._tensor_stats[key].items()}
if "mean" in stats and "std" in stats:
mean, std = stats["mean"], stats["std"]
processed[key] = (tensor - mean) / (std + self.eps)
elif "min" in stats and "max" in stats:
min_val, max_val = stats["min"], stats["max"]
processed[key] = 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
return processed
def _normalize_action(self, action):
if action is None or "action" not in self._tensor_stats:
return action
tensor = (
action.to(dtype=torch.float32)
if isinstance(action, torch.Tensor)
else torch.as_tensor(action, dtype=torch.float32)
)
stats = {k: v.to(tensor.device) for k, v in self._tensor_stats["action"].items()}
if "mean" in stats and "std" in stats:
mean, std = stats["mean"], stats["std"]
return (tensor - mean) / (std + self.eps)
if "min" in stats and "max" in stats:
min_val, max_val = stats["min"], stats["max"]
return 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
raise ValueError("Action stats must contain either ('mean','std') or ('min','max')")
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = self._normalize_obs(transition.get(TransitionKey.OBSERVATION))
action = self._normalize_action(transition.get(TransitionKey.ACTION))
# Create a new transition with normalized values
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = observation
new_transition[TransitionKey.ACTION] = action
return new_transition
def get_config(self) -> dict[str, Any]:
config = {
"eps": self.eps,
"features": {
key: {"type": ft.type.value, "shape": ft.shape} for key, ft in self.features.items()
},
"norm_map": {ft_type.value: norm_mode.value for ft_type, norm_mode in self.norm_map.items()},
}
if self.normalize_keys is not None:
# Serialise as a list for YAML / JSON friendliness
config["normalize_keys"] = sorted(self.normalize_keys)
return config
def state_dict(self) -> dict[str, Tensor]:
flat = {}
for key, sub in self._tensor_stats.items():
for stat_name, tensor in sub.items():
flat[f"{key}.{stat_name}"] = tensor
return flat
def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
self._tensor_stats.clear()
for flat_key, tensor in state.items():
key, stat_name = flat_key.rsplit(".", 1)
self._tensor_stats.setdefault(key, {})[stat_name] = tensor
def reset(self):
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
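The two normalization modes described in the docstring are one-liners. A scalar sketch of both formulas, including the `eps` guard against division by zero:

```python
def meanstd_normalize(x, mean, std, eps=1e-8):
    # (x - mean) / std, guarded against std == 0
    return (x - mean) / (std + eps)

def minmax_normalize(x, lo, hi, eps=1e-8):
    # Map [lo, hi] onto [-1, 1], guarded against hi == lo
    return 2 * (x - lo) / (hi - lo + eps) - 1

assert meanstd_normalize(5.0, 5.0, 2.0) == 0.0
assert minmax_normalize(0.0, 0.0, 10.0) == -1.0
assert abs(minmax_normalize(10.0, 0.0, 10.0) - 1.0) < 1e-6
```

Because of `eps`, min/max scaling lands slightly inside [-1, 1] at the upper boundary rather than exactly on it.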
@dataclass
@ProcessorStepRegistry.register(name="unnormalizer_processor")
class UnnormalizerProcessor:
"""Inverse normalisation for observations and actions.
Exactly mirrors :class:`NormalizerProcessor` but applies the inverse
transform.
"""
features: dict[str, PolicyFeature]
norm_map: dict[FeatureType, NormalizationMode]
stats: dict[str, dict[str, Any]] | None = None
eps: float = 1e-8
_tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)
@classmethod
def from_lerobot_dataset(
cls,
dataset: LeRobotDataset,
features: dict[str, PolicyFeature],
norm_map: dict[FeatureType, NormalizationMode],
*,
eps: float = 1e-8,
) -> UnnormalizerProcessor:
return cls(features=features, norm_map=norm_map, stats=dataset.meta.stats, eps=eps)
def __post_init__(self):
# Handle deserialization from JSON config
if self.features and isinstance(list(self.features.values())[0], dict):
# Features came from JSON - need to reconstruct PolicyFeature objects
reconstructed_features = {}
for key, ft_dict in self.features.items():
reconstructed_features[key] = PolicyFeature(
type=FeatureType(ft_dict["type"]), shape=tuple(ft_dict["shape"])
)
self.features = reconstructed_features
if self.norm_map and isinstance(list(self.norm_map.keys())[0], str):
# norm_map came from JSON - need to reconstruct enum keys and values
reconstructed_norm_map = {}
for ft_type_str, norm_mode_str in self.norm_map.items():
reconstructed_norm_map[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
self.norm_map = reconstructed_norm_map
self.stats = self.stats or {}
self._tensor_stats = _convert_stats_to_tensors(self.stats)
def _unnormalize_obs(self, observation):
if observation is None:
return None
keys = [k for k, ft in self.features.items() if ft.type is not FeatureType.ACTION]
processed = dict(observation)
for key in keys:
if key not in processed or key not in self._tensor_stats:
continue
orig_val = processed[key]
tensor = (
orig_val.to(dtype=torch.float32)
if isinstance(orig_val, torch.Tensor)
else torch.as_tensor(orig_val, dtype=torch.float32)
)
stats = {k: v.to(tensor.device) for k, v in self._tensor_stats[key].items()}
if "mean" in stats and "std" in stats:
mean, std = stats["mean"], stats["std"]
processed[key] = tensor * std + mean
elif "min" in stats and "max" in stats:
min_val, max_val = stats["min"], stats["max"]
processed[key] = (tensor + 1) / 2 * (max_val - min_val) + min_val
return processed
def _unnormalize_action(self, action):
if action is None or "action" not in self._tensor_stats:
return action
tensor = (
action.to(dtype=torch.float32)
if isinstance(action, torch.Tensor)
else torch.as_tensor(action, dtype=torch.float32)
)
stats = {k: v.to(tensor.device) for k, v in self._tensor_stats["action"].items()}
if "mean" in stats and "std" in stats:
mean, std = stats["mean"], stats["std"]
return tensor * std + mean
if "min" in stats and "max" in stats:
min_val, max_val = stats["min"], stats["max"]
return (tensor + 1) / 2 * (max_val - min_val) + min_val
raise ValueError("Action stats must contain either ('mean','std') or ('min','max')")
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = self._unnormalize_obs(transition.get(TransitionKey.OBSERVATION))
action = self._unnormalize_action(transition.get(TransitionKey.ACTION))
# Create a new transition with unnormalized values
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = observation
new_transition[TransitionKey.ACTION] = action
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"eps": self.eps,
"features": {
key: {"type": ft.type.value, "shape": ft.shape} for key, ft in self.features.items()
},
"norm_map": {ft_type.value: norm_mode.value for ft_type, norm_mode in self.norm_map.items()},
}
def state_dict(self) -> dict[str, Tensor]:
flat = {}
for key, sub in self._tensor_stats.items():
for stat_name, tensor in sub.items():
flat[f"{key}.{stat_name}"] = tensor
return flat
def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
self._tensor_stats.clear()
for flat_key, tensor in state.items():
key, stat_name = flat_key.rsplit(".", 1)
self._tensor_stats.setdefault(key, {})[stat_name] = tensor
def reset(self):
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
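Since `UnnormalizerProcessor` mirrors `NormalizerProcessor`, a normalize/unnormalize round trip should recover the input up to the `eps` term. A scalar sketch of the min/max pair:

```python
def minmax_normalize(x, lo, hi, eps=1e-8):
    return 2 * (x - lo) / (hi - lo + eps) - 1

def minmax_unnormalize(y, lo, hi):
    # Inverse of the [-1, 1] scaling (without the eps guard, as in the processor)
    return (y + 1) / 2 * (hi - lo) + lo

x = 7.3
y = minmax_normalize(x, 0.0, 10.0)
assert abs(minmax_unnormalize(y, 0.0, 10.0) - x) < 1e-5
```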
@@ -0,0 +1,267 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Any
import einops
import numpy as np
import torch
from torch import Tensor
from lerobot.configs.types import PolicyFeature
from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
class ImageProcessor:
"""Process image observations from environment format to policy format.
Converts images from:
- Channel-last (H, W, C) to channel-first (C, H, W)
- uint8 [0, 255] to float32 [0, 1]
- Adds batch dimension if needed
- Handles both single images and dictionaries of images
"""
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
processed_obs = {}
# Copy all observations first
for key, value in observation.items():
processed_obs[key] = value
# Handle pixels key if present
pixels = observation.get("pixels")
if pixels is not None:
# Remove pixels from processed_obs since we'll replace it with processed images
processed_obs.pop("pixels", None)
# Determine image mapping
if isinstance(pixels, dict):
imgs = {f"{OBS_IMAGES}.{key}": img for key, img in pixels.items()}
else:
imgs = {OBS_IMAGE: pixels}
# Process each image
for imgkey, img in imgs.items():
processed_img = self._process_single_image(img)
processed_obs[imgkey] = processed_img
# Return new transition with processed observation
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs
return new_transition
def _process_single_image(self, img: np.ndarray) -> Tensor:
"""Process a single image array."""
# Convert to tensor
img_tensor = torch.from_numpy(img)
# Add batch dimension if needed
if img_tensor.ndim == 3:
img_tensor = img_tensor.unsqueeze(0)
# Validate image format
_, h, w, c = img_tensor.shape
if not (c < h and c < w):
raise ValueError(f"Expected channel-last images, but got shape {img_tensor.shape}")
if img_tensor.dtype != torch.uint8:
raise ValueError(f"Expected torch.uint8 images, but got {img_tensor.dtype}")
# Convert to channel-first format
img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous()
# Convert to float32 and normalize to [0, 1]
img_tensor = img_tensor.type(torch.float32) / 255.0
return img_tensor
def get_config(self) -> dict[str, Any]:
"""Return configuration for serialization."""
return {}
def state_dict(self) -> dict[str, torch.Tensor]:
"""Return state dictionary (empty for this processor)."""
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load state dictionary (no-op for this processor)."""
pass
def reset(self) -> None:
"""Reset processor state (no-op for this processor)."""
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Transforms:
pixels -> OBS_IMAGE,
observation.pixels -> OBS_IMAGE,
pixels.<cam> -> OBS_IMAGES.<cam>,
observation.pixels.<cam> -> OBS_IMAGES.<cam>
"""
if "pixels" in features:
features[OBS_IMAGE] = features.pop("pixels")
if "observation.pixels" in features:
features[OBS_IMAGE] = features.pop("observation.pixels")
prefixes = ("pixels.", "observation.pixels.")
for key in list(features.keys()):
for p in prefixes:
if key.startswith(p):
suffix = key[len(p) :]
features[f"{OBS_IMAGES}.{suffix}"] = features.pop(key)
break
return features
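The per-image conversion in `_process_single_image` is a batch/transpose/scale sequence. The same transformation, sketched in numpy:

```python
import numpy as np

# Environment frame: channel-last uint8, as produced by most Gym environments.
img = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)

batched = img[None]                                            # (1, 64, 64, 3)
chw = np.transpose(batched, (0, 3, 1, 2)).astype(np.float32) / 255.0

assert chw.shape == (1, 3, 64, 64)
assert chw.dtype == np.float32
assert 0.0 <= chw.min() and chw.max() <= 1.0
```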
@dataclass
class StateProcessor:
"""Process state observations from environment format to policy format.
Handles:
- environment_state -> observation.environment_state
- agent_pos -> observation.state
- Converts numpy arrays to tensors
- Adds batch dimension if needed
"""
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
processed_obs = dict(observation) # Copy existing observations
# Process environment_state
if "environment_state" in observation:
env_state = torch.from_numpy(observation["environment_state"]).float()
if env_state.dim() == 1:
env_state = env_state.unsqueeze(0)
processed_obs[OBS_ENV_STATE] = env_state
# Remove original key
del processed_obs["environment_state"]
# Process agent_pos
if "agent_pos" in observation:
agent_pos = torch.from_numpy(observation["agent_pos"]).float()
if agent_pos.dim() == 1:
agent_pos = agent_pos.unsqueeze(0)
processed_obs[OBS_STATE] = agent_pos
# Remove original key
del processed_obs["agent_pos"]
# Return new transition with processed observation
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs
return new_transition
def get_config(self) -> dict[str, Any]:
"""Return configuration for serialization."""
return {}
def state_dict(self) -> dict[str, torch.Tensor]:
"""Return state dictionary (empty for this processor)."""
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load state dictionary (no-op for this processor)."""
pass
def reset(self) -> None:
"""Reset processor state (no-op for this processor)."""
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Transforms:
environment_state -> OBS_ENV_STATE,
agent_pos -> OBS_STATE,
observation.environment_state -> OBS_ENV_STATE,
observation.agent_pos -> OBS_STATE
"""
pairs = (
("environment_state", OBS_ENV_STATE),
("agent_pos", OBS_STATE),
)
for old, new in pairs:
if old in features:
features[new] = features.pop(old)
prefixed = f"observation.{old}"
if prefixed in features:
features[new] = features.pop(prefixed)
return features
@dataclass
@ProcessorStepRegistry.register(name="observation_processor")
class VanillaObservationProcessor:
"""Complete observation processor that combines image and state processing.
This processor replicates the functionality of the original preprocess_observation
function but in a modular, composable way that fits into the pipeline architecture.
"""
image_processor: ImageProcessor = field(default_factory=ImageProcessor)
state_processor: StateProcessor = field(default_factory=StateProcessor)
def __call__(self, transition: EnvTransition) -> EnvTransition:
# First process images
transition = self.image_processor(transition)
# Then process state
transition = self.state_processor(transition)
return transition
def get_config(self) -> dict[str, Any]:
"""Return configuration for serialization."""
return {
"image_processor": self.image_processor.get_config(),
"state_processor": self.state_processor.get_config(),
}
def state_dict(self) -> dict[str, torch.Tensor]:
"""Return state dictionary."""
state = {}
state.update({f"image_processor.{k}": v for k, v in self.image_processor.state_dict().items()})
state.update({f"state_processor.{k}": v for k, v in self.state_processor.state_dict().items()})
return state
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load state dictionary."""
image_state = {
k.removeprefix("image_processor."): v for k, v in state.items() if k.startswith("image_processor.")
}
state_state = {
k.removeprefix("state_processor."): v for k, v in state.items() if k.startswith("state_processor.")
}
self.image_processor.load_state_dict(image_state)
self.state_processor.load_state_dict(state_state)
def reset(self) -> None:
"""Reset processor state."""
self.image_processor.reset()
self.state_processor.reset()
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
features = self.image_processor.feature_contract(features)
features = self.state_processor.feature_contract(features)
return features
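The per-component state routing in `state_dict`/`load_state_dict` is a prefix namespace. A minimal helper sketch of that routing:

```python
def split_prefixed(state: dict, prefix: str) -> dict:
    """Return the entries under `prefix`, with the prefix stripped exactly once."""
    return {k.removeprefix(prefix): v for k, v in state.items() if k.startswith(prefix)}
```

For example, `split_prefixed(state, "image_processor.")` recovers the image processor's own state dict from the combined one.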
File diff suppressed because it is too large
@@ -0,0 +1,63 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Any
import torch
from lerobot.configs.types import PolicyFeature
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
@ProcessorStepRegistry.register(name="rename_processor")
class RenameProcessor:
"""Rename processor that renames keys in the observation."""
rename_map: dict[str, str] = field(default_factory=dict)
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
processed_obs = {}
for key, value in observation.items():
if key in self.rename_map:
processed_obs[self.rename_map[key]] = value
else:
processed_obs[key] = value
# Create a new transition with the renamed observation
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = processed_obs
return new_transition
def get_config(self) -> dict[str, Any]:
return {"rename_map": self.rename_map}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
"""Transforms:
- Each key in the observation that appears in `rename_map` is renamed to its value.
- Keys not in `rename_map` remain unchanged.
"""
return {self.rename_map.get(k, k): v for k, v in features.items()}
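`__call__` and `feature_contract` share one semantic: keys present in `rename_map` are renamed, everything else passes through. As a standalone function:

```python
def rename_keys(mapping: dict, d: dict) -> dict:
    """Rename keys found in `mapping`; unmapped keys are kept as-is."""
    return {mapping.get(k, k): v for k, v in d.items()}
```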
@@ -0,0 +1,245 @@
from dataclasses import dataclass, field
from typing import Any
import gymnasium as gym
import numpy as np
import torch
from lerobot.configs.types import PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
@ProcessorStepRegistry.register("joint_velocity_processor")
class JointVelocityProcessor:
"""Add joint velocity information to observations.
Computes joint velocities by tracking changes in joint positions over time.
"""
joint_velocity_limits: float = 100.0
dt: float = 1.0 / 10
last_joint_positions: torch.Tensor | None = None
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
# Get current joint positions (assuming they're in observation.state)
current_positions = observation.get("observation.state")
if current_positions is None:
return transition
# Initialize last joint positions if not already set
if self.last_joint_positions is None:
self.last_joint_positions = current_positions.clone()
# Compute velocities
joint_velocities = (current_positions - self.last_joint_positions) / self.dt
self.last_joint_positions = current_positions.clone()
# Extend observation with velocities
extended_state = torch.cat([current_positions, joint_velocities], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation["observation.state"] = extended_state
# Return new transition
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"joint_velocity_limits": self.joint_velocity_limits,
"dt": self.dt,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
self.last_joint_positions = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
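The velocity estimate is a one-step finite difference. A plain-Python sketch of the update, where the zero velocity on the first step mirrors the `last_joint_positions` initialization above:

```python
def finite_difference_velocity(prev, curr, dt):
    """Return (velocity, new_prev); with prev=None the first velocity is zero."""
    if prev is None:
        prev = list(curr)
    vel = [(c - p) / dt for c, p in zip(curr, prev)]
    return vel, list(curr)
```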
@dataclass
@ProcessorStepRegistry.register("current_processor")
class MotorCurrentProcessor:
"""Add motor current information to observations."""
env: gym.Env | None = None
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
# Read the present motor currents directly from the robot bus
present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current")
motor_currents = torch.tensor(
[present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors],
dtype=torch.float32,
).unsqueeze(0)
current_state = observation.get("observation.state")
if current_state is None:
return transition
extended_state = torch.cat([current_state, motor_currents], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation["observation.state"] = extended_state
# Return new transition
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("inverse_kinematics_processor")
class InverseKinematicsProcessor:
"""Convert end-effector space actions to joint space using inverse kinematics.
This processor transforms delta commands in end-effector space (delta_x, delta_y, delta_z)
to joint space commands using forward and inverse kinematics. It maintains the current
end-effector pose and joint positions to compute the transformations.
"""
urdf_path: str
target_frame_name: str = "gripper_link"
end_effector_step_sizes: dict[str, float] = field(default_factory=lambda: {"x": 1.0, "y": 1.0, "z": 1.0})
end_effector_bounds: dict[str, list[float]] | None = None
max_gripper_pos: float = 30.0
# State tracking
current_ee_pos: np.ndarray | None = field(default=None, init=False, repr=False)
current_joint_pos: np.ndarray | None = field(default=None, init=False, repr=False)
kinematics: RobotKinematics | None = field(default=None, init=False, repr=False)
def __post_init__(self):
"""Initialize the kinematics module after dataclass initialization."""
if self.urdf_path:
self.kinematics = RobotKinematics(
urdf_path=self.urdf_path,
target_frame_name=self.target_frame_name,
)
def __call__(self, transition: EnvTransition) -> EnvTransition:
action = transition.get(TransitionKey.ACTION)
if action is None:
return transition
action_np = action.detach().cpu().numpy().squeeze()
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
raw_joint_positions = complementary_data.get("raw_joint_positions")
if raw_joint_positions is None:
return transition  # Cannot proceed without joint positions
current_gripper_pos = raw_joint_positions[-1]
if self.current_joint_pos is None:
self.current_joint_pos = raw_joint_positions
# Calculate current end-effector position using forward kinematics
if self.current_ee_pos is None:
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos)
# Scale deltas by step sizes
delta_ee = np.array(
[
action_np[0] * self.end_effector_step_sizes["x"],
action_np[1] * self.end_effector_step_sizes["y"],
action_np[2] * self.end_effector_step_sizes["z"],
],
dtype=np.float32,
)
# Set desired end-effector position by adding delta
desired_ee_pos = np.eye(4)
desired_ee_pos[:3, :3] = self.current_ee_pos[:3, :3] # Keep orientation
# Add delta to position and clip to bounds
desired_ee_pos[:3, 3] = self.current_ee_pos[:3, 3] + delta_ee
if self.end_effector_bounds is not None:
desired_ee_pos[:3, 3] = np.clip(
desired_ee_pos[:3, 3],
self.end_effector_bounds["min"],
self.end_effector_bounds["max"],
)
# Compute inverse kinematics to get joint positions
target_joint_values = self.kinematics.inverse_kinematics(self.current_joint_pos, desired_ee_pos)
# Update current state
self.current_ee_pos = desired_ee_pos.copy()
self.current_joint_pos = target_joint_values.copy()
# Create new action with joint space commands
gripper_action = current_gripper_pos
if len(action_np) > 3:
# Handle gripper command separately
gripper_command = action_np[3]
# Process gripper command (convert from [0,2] to delta) and discretize
gripper_delta = np.round(gripper_command - 1.0).astype(int) * self.max_gripper_pos
gripper_action = np.clip(current_gripper_pos + gripper_delta, 0, self.max_gripper_pos)
# Write the gripper command into the last joint slot
target_joint_values[-1] = gripper_action
converted_action = torch.from_numpy(target_joint_values).to(action.device).to(action.dtype)
new_transition = transition.copy()
new_transition[TransitionKey.ACTION] = converted_action
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"urdf_path": self.urdf_path,
"target_frame_name": self.target_frame_name,
"end_effector_step_sizes": self.end_effector_step_sizes,
"end_effector_bounds": self.end_effector_bounds,
"max_gripper_pos": self.max_gripper_pos,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
"""Reset the processor state."""
self.current_ee_pos = None
self.current_joint_pos = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
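The gripper branch maps a command in [0, 2] to a clipped absolute target: recenter to [-1, 1], round to a discrete step, scale by `max_gripper_pos`, then clip. A pure-Python sketch of that arithmetic (mirroring the numpy version above):

```python
def gripper_target(command: float, current_pos: float, max_gripper_pos: float = 30.0) -> float:
    """Discretize a [0, 2] gripper command into a clipped absolute position."""
    delta = round(command - 1.0) * max_gripper_pos  # {-1, 0, +1} * step size
    return min(max(current_pos + delta, 0.0), max_gripper_pos)
```

Commands near 2.0 open by one full step, near 0.0 close, and near 1.0 hold the current position.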
@@ -68,10 +68,11 @@ from tqdm import trange
from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.factory import make_env
from lerobot.envs.utils import add_envs_task, check_env_attributes_and_types, preprocess_observation
from lerobot.envs.utils import add_envs_task, check_env_attributes_and_types
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.utils import get_device_from_parameters
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
from lerobot.utils.io_utils import write_video
from lerobot.utils.random_utils import set_seed
from lerobot.utils.utils import (
@@ -128,6 +129,16 @@ def rollout(
if render_callback is not None:
render_callback(env)
# Create observation processing processor
# NOTE: During environment interaction, we skip batch dictionary conversion
# since that format is only needed for loss computation during training.
# Using identity functions to avoid unnecessary format transformations.
obs_processor = RobotProcessor(
[VanillaObservationProcessor()],
to_transition=lambda x: x,
to_output=lambda x: x,
)
all_observations = []
all_actions = []
all_rewards = []
@@ -147,10 +158,13 @@ def rollout(
check_env_attributes_and_types(env)
while not np.all(done):
# Numpy array to tensor and changing dictionary keys to LeRobot policy format.
observation = preprocess_observation(observation)
transition = (observation, None, None, None, None, None, None)
processed_transition = obs_processor(transition)
observation = processed_transition[TransitionKey.OBSERVATION]
if return_observations:
all_observations.append(deepcopy(observation))
# TODO(azouitine): Move this in processor side
observation = {
key: observation[key].to(device, non_blocking=device.type == "cuda") for key in observation
}
@@ -195,7 +209,9 @@ def rollout(
# Track the final observation.
if return_observations:
observation = preprocess_observation(observation)
transition = (observation, None, None, None, None, None, None)
processed_transition = obs_processor(transition)
observation = processed_transition[TransitionKey.OBSERVATION]
all_observations.append(deepcopy(observation))
# Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors.
@@ -62,8 +62,14 @@ from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.policies.factory import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.processor.pipeline import TransitionKey
from lerobot.robots import so100_follower # noqa: F401
from lerobot.scripts.rl.gym_manipulator import make_robot_env
from lerobot.scripts.rl.gym_manipulator import (
create_transition,
make_processors,
make_robot_env,
step_env_and_process_transition,
)
from lerobot.teleoperators import gamepad, so101_leader # noqa: F401
from lerobot.transport import services_pb2, services_pb2_grpc
from lerobot.transport.utils import (
@@ -236,7 +242,8 @@ def act_with_policy(
logging.info("make_env online")
online_env = make_robot_env(cfg=cfg.env)
online_env, teleop_device = make_robot_env(cfg=cfg.env)
env_processor, action_processor = make_processors(online_env, cfg.env)
set_seed(cfg.seed)
device = get_safe_torch_device(cfg.policy.device, log=True)
@@ -257,6 +264,13 @@ def act_with_policy(
assert isinstance(policy, nn.Module)
obs, info = online_env.reset()
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
transition = env_processor(transition)
# NOTE: For the moment we will solely handle the case of a single environment
sum_reward_episode = 0
@@ -274,45 +288,57 @@ def act_with_policy(
logging.info("[ACTOR] Shutting down act_with_policy")
return
if interaction_step >= cfg.policy.online_step_before_learning:
# Time policy inference and check if it meets FPS requirement
with policy_timer:
action = policy.select_action(batch=obs)
policy_fps = policy_timer.fps_last
observation = transition[TransitionKey.OBSERVATION]
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
# Time policy inference and check if it meets FPS requirement
with policy_timer:
# Extract observation from transition for policy
action = policy.select_action(batch=observation)
policy_fps = policy_timer.fps_last
else:
action = online_env.action_space.sample()
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
next_obs, reward, done, truncated, info = online_env.step(action)
# Use the new step function
new_transition, terminate_episode = step_env_and_process_transition(
env=online_env,
transition=transition,
action=action,
teleop_device=teleop_device,
env_processor=env_processor,
action_processor=action_processor,
)
# Extract values from processed transition
next_observation = new_transition[TransitionKey.OBSERVATION]
executed_action = new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
reward = new_transition[TransitionKey.REWARD]
done = new_transition.get(TransitionKey.DONE, False)
truncated = new_transition.get(TransitionKey.TRUNCATED, False)
sum_reward_episode += float(reward)
# Increment total steps counter for intervention rate
episode_total_steps += 1
# NOTE: We override the action if the intervention is True, because the action applied is the intervention action
if "is_intervention" in info and info["is_intervention"]:
# NOTE: The action space for demonstration before hand is with the full action space
# but sometimes for example we want to deactivate the gripper
action = info["action_intervention"]
# Check for intervention from transition info
intervention_info = new_transition[TransitionKey.INFO]
if intervention_info.get("is_intervention", False):
episode_intervention = True
# Increment intervention steps counter
episode_intervention_steps += 1
# Create transition for learner (convert to old format)
list_transition_to_send_to_learner.append(
Transition(
state=obs,
action=action,
state=observation,
action=executed_action,
reward=reward,
next_state=next_obs,
next_state=next_observation,
done=done,
truncated=truncated, # TODO: (azouitine) Handle truncation properly
complementary_info=info,
truncated=truncated,
complementary_info={}, # new_transition[TransitionKey.COMPLEMENTARY_DATA],
)
)
# assign obs to the next obs and continue the rollout
obs = next_obs
# Update transition for next iteration
transition = new_transition
if done or truncated:
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
@@ -347,12 +373,21 @@ def act_with_policy(
)
)
# Reset intervention counters
# Reset intervention counters and environment
sum_reward_episode = 0.0
episode_intervention = False
episode_intervention_steps = 0
episode_total_steps = 0
# Reset environment and processors
obs, info = online_env.reset()
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
transition = env_processor(transition)
if cfg.env.fps is not None:
dt_time = time.perf_counter() - start_time
File diff suppressed because it is too large
@@ -107,6 +107,45 @@ class GamepadTeleop(Teleoperator):
return action_dict
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the gamepad such as intervention status,
episode termination, success indicators, etc.
Returns:
Dictionary containing:
- is_intervention: bool - Whether human is currently intervening
- terminate_episode: bool - Whether to terminate the current episode
- success: bool - Whether the episode was successful
- rerecord_episode: bool - Whether to rerecord the episode
"""
if self.gamepad is None:
return {
"is_intervention": False,
"terminate_episode": False,
"success": False,
"rerecord_episode": False,
}
# Update gamepad state to get fresh inputs
self.gamepad.update()
# Check if intervention is active
is_intervention = self.gamepad.should_intervene()
# Get episode end status
episode_end_status = self.gamepad.get_episode_end_status()
terminate_episode = episode_end_status is not None
success = episode_end_status == "success"
rerecord_episode = episode_end_status == "rerecord_episode"
return {
"is_intervention": is_intervention,
"terminate_episode": terminate_episode,
"success": success,
"rerecord_episode": rerecord_episode,
}
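The status-to-flags mapping above can be isolated into a small function; a sketch:

```python
def episode_flags(episode_end_status):
    """Map an episode-end status string (or None) to the teleop event flags."""
    return {
        "terminate_episode": episode_end_status is not None,
        "success": episode_end_status == "success",
        "rerecord_episode": episode_end_status == "rerecord_episode",
    }
```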
def disconnect(self) -> None:
"""Disconnect from the gamepad."""
if self.gamepad is not None:
@@ -235,3 +235,67 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
action_dict["gripper"] = gripper_action
return action_dict
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the keyboard such as intervention status,
episode termination, success indicators, etc.
Keyboard mappings:
- Any movement keys pressed = intervention active
- 's' key = success (terminate episode successfully)
- 'r' key = rerecord episode (terminate and rerecord)
- 'q' key = quit episode (terminate without success)
Returns:
Dictionary containing:
- is_intervention: bool - Whether human is currently intervening
- terminate_episode: bool - Whether to terminate the current episode
- success: bool - Whether the episode was successful
- rerecord_episode: bool - Whether to rerecord the episode
"""
if not self.is_connected:
return {
"is_intervention": False,
"terminate_episode": False,
"success": False,
"rerecord_episode": False,
}
# Check if any movement keys are currently pressed (indicates intervention)
movement_keys = [
keyboard.Key.up,
keyboard.Key.down,
keyboard.Key.left,
keyboard.Key.right,
keyboard.Key.shift,
keyboard.Key.shift_r,
keyboard.Key.ctrl_r,
keyboard.Key.ctrl_l,
]
is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
# Check for episode control commands from misc_keys_queue
terminate_episode = False
success = False
rerecord_episode = False
# Process any pending misc keys
while not self.misc_keys_queue.empty():
key = self.misc_keys_queue.get_nowait()
if key == "s":
terminate_episode = True
success = True
elif key == "r":
terminate_episode = True
rerecord_episode = True
elif key == "q":
terminate_episode = True
success = False
return {
"is_intervention": is_intervention,
"terminate_episode": terminate_episode,
"success": success,
"rerecord_episode": rerecord_episode,
}
@@ -160,6 +160,18 @@ class Teleoperator(abc.ABC):
"""
pass
@abc.abstractmethod
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the teleoperator such as intervention status,
episode termination, success indicators, etc.
Check the implementation of the gamepad for an example.
Returns:
dict[str, Any]: A dictionary containing control events with keys and values that are specific to the setup.
"""
pass
@abc.abstractmethod
def send_feedback(self, feedback: dict[str, Any]) -> None:
"""
@@ -0,0 +1,195 @@
---
library_name: lerobot
tags:
- robotics
- lerobot
- safetensors
pipeline_tag: robotics
---
# RobotProcessor
## Overview
RobotProcessor is a composable, debuggable post-processing pipeline for robot transitions in the LeRobot framework. It orchestrates an ordered collection of small, functional transforms (steps) that are executed left-to-right on each incoming `EnvTransition`.
## Architecture
The RobotProcessor provides a modular architecture for processing robot environment transitions through a sequence of composable steps. Each step is a callable that accepts a full `EnvTransition` tuple and returns a potentially modified tuple of the same structure.
### EnvTransition Structure
An `EnvTransition` is a 7-tuple containing:
1. **observation**: Current state observation
2. **action**: Action taken (can be None)
3. **reward**: Reward received (float or None)
4. **done**: Episode termination flag (bool or None)
5. **truncated**: Episode truncation flag (bool or None)
6. **info**: Additional information dictionary
7. **complementary_data**: Extra data dictionary
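A transition can be constructed and unpacked positionally, matching the order above:

```python
# Build a transition following the 7-tuple order above.
obs = {"observation.state": [0.1, 0.2]}
transition = (obs, None, 0.0, False, False, {}, {})
observation, action, reward, done, truncated, info, complementary_data = transition
```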
## Key Features
- **Composable Pipeline**: Chain multiple processing steps in a specific order
- **State Persistence**: Save and load processor state using SafeTensors format
- **Hugging Face Hub Integration**: Easy sharing and loading via `save_pretrained()` and `from_pretrained()`
- **Debugging Support**: Step-through functionality to inspect intermediate transformations
- **Hook System**: Before/after step hooks for additional processing or monitoring
- **Device Support**: Move tensor states to different devices (CPU/GPU)
- **Performance Profiling**: Built-in profiling to identify bottlenecks
## Installation
Follow the [installation instructions](https://huggingface.co/docs/lerobot/installation) to install the package.
## Usage
### Basic Example
```python
from lerobot.processor.pipeline import RobotProcessor
from your_steps import ObservationNormalizer, VelocityCalculator
# Create a processor with multiple steps
processor = RobotProcessor(
steps=[
ObservationNormalizer(mean=0, std=1),
VelocityCalculator(window_size=5),
],
name="my_robot_processor",
seed=42
)
# Process a transition
obs, info = env.reset()
transition = (obs, None, 0.0, False, False, info, {})
processed_transition = processor(transition)
# Extract processed observation
processed_obs = processed_transition[0]
```
### Saving and Loading
```python
# Save locally
processor.save_pretrained("./my_processor")
# Push to Hugging Face Hub
processor.push_to_hub("username/my-robot-processor")
# Load from Hub
loaded_processor = RobotProcessor.from_pretrained("username/my-robot-processor")
```
### Debugging with Step-Through
```python
# Inspect intermediate results
for idx, intermediate_transition in enumerate(processor.step_through(transition)):
print(f"After step {idx}: {intermediate_transition[0]}") # Print observation
```
### Using Hooks
```python
# Add monitoring hook
def log_observation(step_idx, transition):
print(f"Step {step_idx}: obs shape = {transition[0].shape}")
return None # Don't modify transition
processor.register_before_step_hook(log_observation)
```
## Creating Custom Steps
To create a custom processor step, implement the `ProcessorStep` protocol:
```python
from lerobot.processor.pipeline import ProcessorStepRegistry, EnvTransition
@ProcessorStepRegistry.register("my_custom_step")
class MyCustomStep:
def __init__(self, param1=1.0):
self.param1 = param1
self.buffer = []
def __call__(self, transition: EnvTransition) -> EnvTransition:
obs, action, reward, done, truncated, info, comp_data = transition
# Process observation
processed_obs = obs * self.param1
return (processed_obs, action, reward, done, truncated, info, comp_data)
def get_config(self) -> dict:
return {"param1": self.param1}
def state_dict(self) -> dict:
# Return only torch.Tensor state
return {}
def load_state_dict(self, state: dict) -> None:
# Load tensor state
pass
def reset(self) -> None:
# Clear buffers at episode boundaries
self.buffer.clear()
```
## Advanced Features
### Device Management
```python
# Move all tensor states to GPU
processor = processor.to("cuda")
# Move to specific device
processor = processor.to(torch.device("cuda:1"))
```
### Performance Profiling
```python
# Profile step execution times
profile_results = processor.profile_steps(transition, num_runs=100)
for step_name, time_ms in profile_results.items():
print(f"{step_name}: {time_ms:.3f} ms")
```
### Processor Slicing
```python
# Get a single step
first_step = processor[0]
# Create a sub-processor with steps 1-3
sub_processor = processor[1:4]
```
## Model Card Specifications
- **Pipeline Tag**: robotics
- **Library**: lerobot
- **Format**: safetensors
- **License**: Apache 2.0
## Limitations
- Steps must maintain the 7-tuple structure of EnvTransition
- All tensor state must be separated from configuration for proper serialization
- Steps are executed sequentially (no parallel processing within a single transition)
## Citation
If you use RobotProcessor in your research, please cite:
```bibtex
@misc{cadene2024lerobot,
author = {Cadene, Remi and Alibert, Simon and Soare, Alexander and Gallouedec, Quentin and Zouitine, Adil and Palma, Steven and Kooijmans, Pepijn and Aractingi, Michel and Shukor, Mustafa and Aubakirova, Dana and Russi, Martino and Capuano, Francesco and Pascale, Caroline and Choghari, Jade and Moss, Jess and Wolf, Thomas},
title = {LeRobot: State-of-the-art Machine Learning for Real-World Robotics in Pytorch},
howpublished = "\url{https://github.com/huggingface/lerobot}",
year = {2024}
}
```
@@ -19,6 +19,7 @@ import traceback
import pytest
from serial import SerialException
from lerobot.configs.types import FeatureType, PolicyFeature
from tests.utils import DEVICE
# Import fixture modules as plugins
@@ -69,3 +70,19 @@ def patch_builtins_input(monkeypatch):
print(text)
monkeypatch.setattr("builtins.input", print_text)
@pytest.fixture
def policy_feature_factory():
"""PolicyFeature factory"""
def _pf(ft: FeatureType, shape: tuple[int, ...]) -> PolicyFeature:
return PolicyFeature(type=ft, shape=shape)
return _pf
def assert_contract_is_typed(features: dict[str, PolicyFeature]) -> None:
assert isinstance(features, dict)
assert all(isinstance(k, str) for k in features.keys())
assert all(isinstance(v, PolicyFeature) for v in features.values())
@@ -22,7 +22,7 @@ from gymnasium.utils.env_checker import check_env
import lerobot
from lerobot.envs.factory import make_env, make_env_config
from lerobot.envs.utils import preprocess_observation
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
from tests.utils import require_env
OBS_TYPES = ["state", "pixels", "pixels_agent_pos"]
@@ -48,7 +48,12 @@ def test_factory(env_name):
cfg = make_env_config(env_name)
env = make_env(cfg, n_envs=1)
obs, _ = env.reset()
obs = preprocess_observation(obs)
# Process observation using processor
obs_processor = RobotProcessor([VanillaObservationProcessor()])
transition = (obs, None, None, None, None, None, None)
processed_transition = obs_processor(transition)
obs = processed_transition[TransitionKey.OBSERVATION]
# test image keys are float32 in range [0,1]
for key in obs:
@@ -30,7 +30,6 @@ from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.datasets.factory import make_dataset
from lerobot.datasets.utils import cycle, dataset_to_policy_features
from lerobot.envs.factory import make_env, make_env_config
from lerobot.envs.utils import preprocess_observation
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies.act.modeling_act import ACTTemporalEnsembler
from lerobot.policies.factory import (
@@ -40,6 +39,7 @@ from lerobot.policies.factory import (
)
from lerobot.policies.normalize import Normalize, Unnormalize
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
from lerobot.utils.random_utils import seeded_context
from tests.artifacts.policies.save_policy_to_safetensors import get_policy_stats
from tests.utils import DEVICE, require_cpu, require_env, require_x86_64_kernel
@@ -185,7 +185,10 @@ def test_policy(ds_repo_id, env_name, env_kwargs, policy_name, policy_kwargs):
observation, _ = env.reset(seed=train_cfg.seed)
# apply transform to normalize the observations
observation = preprocess_observation(observation)
obs_processor = RobotProcessor([VanillaObservationProcessor()])
transition = (observation, None, None, None, None, None, None)
processed_transition = obs_processor(transition)
observation = processed_transition[TransitionKey.OBSERVATION]
# send observation to device/gpu
observation = {key: observation[key].to(DEVICE, non_blocking=True) for key in observation}
+282
@@ -0,0 +1,282 @@
import torch
from lerobot.processor.pipeline import (
RobotProcessor,
TransitionKey,
_default_batch_to_transition,
_default_transition_to_batch,
)
def _dummy_batch():
"""Create a dummy batch using the new format with observation.* and next.* keys."""
return {
"observation.image.left": torch.randn(1, 3, 128, 128),
"observation.image.right": torch.randn(1, 3, 128, 128),
"observation.state": torch.tensor([[0.1, 0.2, 0.3, 0.4]]),
"action": torch.tensor([[0.5]]),
"next.reward": 1.0,
"next.done": False,
"next.truncated": False,
"info": {"key": "value"},
}
def test_observation_grouping_roundtrip():
"""Test that observation.* keys are properly grouped and ungrouped."""
proc = RobotProcessor([])
batch_in = _dummy_batch()
batch_out = proc(batch_in)
# Check that all observation.* keys are preserved
original_obs_keys = {k: v for k, v in batch_in.items() if k.startswith("observation.")}
reconstructed_obs_keys = {k: v for k, v in batch_out.items() if k.startswith("observation.")}
assert set(original_obs_keys.keys()) == set(reconstructed_obs_keys.keys())
# Check tensor values
assert torch.allclose(batch_out["observation.image.left"], batch_in["observation.image.left"])
assert torch.allclose(batch_out["observation.image.right"], batch_in["observation.image.right"])
assert torch.allclose(batch_out["observation.state"], batch_in["observation.state"])
# Check other fields
assert torch.allclose(batch_out["action"], batch_in["action"])
assert batch_out["next.reward"] == batch_in["next.reward"]
assert batch_out["next.done"] == batch_in["next.done"]
assert batch_out["next.truncated"] == batch_in["next.truncated"]
assert batch_out["info"] == batch_in["info"]
def test_batch_to_transition_observation_grouping():
"""Test that _default_batch_to_transition correctly groups observation.* keys."""
batch = {
"observation.image.top": torch.randn(1, 3, 128, 128),
"observation.image.left": torch.randn(1, 3, 128, 128),
"observation.state": [1, 2, 3, 4],
"action": "action_data",
"next.reward": 1.5,
"next.done": True,
"next.truncated": False,
"info": {"episode": 42},
}
transition = _default_batch_to_transition(batch)
# Check observation is a dict with all observation.* keys
assert isinstance(transition[TransitionKey.OBSERVATION], dict)
assert "observation.image.top" in transition[TransitionKey.OBSERVATION]
assert "observation.image.left" in transition[TransitionKey.OBSERVATION]
assert "observation.state" in transition[TransitionKey.OBSERVATION]
# Check values are preserved
assert torch.allclose(
transition[TransitionKey.OBSERVATION]["observation.image.top"], batch["observation.image.top"]
)
assert torch.allclose(
transition[TransitionKey.OBSERVATION]["observation.image.left"], batch["observation.image.left"]
)
assert transition[TransitionKey.OBSERVATION]["observation.state"] == [1, 2, 3, 4]
# Check other fields
assert transition[TransitionKey.ACTION] == "action_data"
assert transition[TransitionKey.REWARD] == 1.5
assert transition[TransitionKey.DONE]
assert not transition[TransitionKey.TRUNCATED]
assert transition[TransitionKey.INFO] == {"episode": 42}
assert transition[TransitionKey.COMPLEMENTARY_DATA] == {}
def test_transition_to_batch_observation_flattening():
"""Test that _default_transition_to_batch correctly flattens observation dict."""
observation_dict = {
"observation.image.top": torch.randn(1, 3, 128, 128),
"observation.image.left": torch.randn(1, 3, 128, 128),
"observation.state": [1, 2, 3, 4],
}
transition = {
TransitionKey.OBSERVATION: observation_dict,
TransitionKey.ACTION: "action_data",
TransitionKey.REWARD: 1.5,
TransitionKey.DONE: True,
TransitionKey.TRUNCATED: False,
TransitionKey.INFO: {"episode": 42},
TransitionKey.COMPLEMENTARY_DATA: {},
}
batch = _default_transition_to_batch(transition)
# Check that observation.* keys are flattened back to batch
assert "observation.image.top" in batch
assert "observation.image.left" in batch
assert "observation.state" in batch
# Check values are preserved
assert torch.allclose(batch["observation.image.top"], observation_dict["observation.image.top"])
assert torch.allclose(batch["observation.image.left"], observation_dict["observation.image.left"])
assert batch["observation.state"] == [1, 2, 3, 4]
# Check other fields are mapped to next.* format
assert batch["action"] == "action_data"
assert batch["next.reward"] == 1.5
assert batch["next.done"]
assert not batch["next.truncated"]
assert batch["info"] == {"episode": 42}
def test_no_observation_keys():
"""Test behavior when there are no observation.* keys."""
batch = {
"action": "action_data",
"next.reward": 2.0,
"next.done": False,
"next.truncated": True,
"info": {"test": "no_obs"},
}
transition = _default_batch_to_transition(batch)
# Observation should be None when no observation.* keys
assert transition[TransitionKey.OBSERVATION] is None
# Check other fields
assert transition[TransitionKey.ACTION] == "action_data"
assert transition[TransitionKey.REWARD] == 2.0
assert not transition[TransitionKey.DONE]
assert transition[TransitionKey.TRUNCATED]
assert transition[TransitionKey.INFO] == {"test": "no_obs"}
# Round trip should work
reconstructed_batch = _default_transition_to_batch(transition)
assert reconstructed_batch["action"] == "action_data"
assert reconstructed_batch["next.reward"] == 2.0
assert not reconstructed_batch["next.done"]
assert reconstructed_batch["next.truncated"]
assert reconstructed_batch["info"] == {"test": "no_obs"}
def test_minimal_batch():
"""Test with minimal batch containing only observation.* and action."""
batch = {"observation.state": "minimal_state", "action": "minimal_action"}
transition = _default_batch_to_transition(batch)
# Check observation
assert transition[TransitionKey.OBSERVATION] == {"observation.state": "minimal_state"}
assert transition[TransitionKey.ACTION] == "minimal_action"
# Check defaults
assert transition[TransitionKey.REWARD] == 0.0
assert not transition[TransitionKey.DONE]
assert not transition[TransitionKey.TRUNCATED]
assert transition[TransitionKey.INFO] == {}
assert transition[TransitionKey.COMPLEMENTARY_DATA] == {}
# Round trip
reconstructed_batch = _default_transition_to_batch(transition)
assert reconstructed_batch["observation.state"] == "minimal_state"
assert reconstructed_batch["action"] == "minimal_action"
assert reconstructed_batch["next.reward"] == 0.0
assert not reconstructed_batch["next.done"]
assert not reconstructed_batch["next.truncated"]
assert reconstructed_batch["info"] == {}
def test_empty_batch():
"""Test behavior with empty batch."""
batch = {}
transition = _default_batch_to_transition(batch)
# All fields should have defaults
assert transition[TransitionKey.OBSERVATION] is None
assert transition[TransitionKey.ACTION] is None
assert transition[TransitionKey.REWARD] == 0.0
assert not transition[TransitionKey.DONE]
assert not transition[TransitionKey.TRUNCATED]
assert transition[TransitionKey.INFO] == {}
assert transition[TransitionKey.COMPLEMENTARY_DATA] == {}
# Round trip
reconstructed_batch = _default_transition_to_batch(transition)
assert reconstructed_batch["action"] is None
assert reconstructed_batch["next.reward"] == 0.0
assert not reconstructed_batch["next.done"]
assert not reconstructed_batch["next.truncated"]
assert reconstructed_batch["info"] == {}
def test_complex_nested_observation():
"""Test with complex nested observation data."""
batch = {
"observation.image.top": {"image": torch.randn(1, 3, 128, 128), "timestamp": 1234567890},
"observation.image.left": {"image": torch.randn(1, 3, 128, 128), "timestamp": 1234567891},
"observation.state": torch.randn(7),
"action": torch.randn(8),
"next.reward": 3.14,
"next.done": False,
"next.truncated": True,
"info": {"episode_length": 200, "success": True},
}
transition = _default_batch_to_transition(batch)
reconstructed_batch = _default_transition_to_batch(transition)
# Check that all observation keys are preserved
original_obs_keys = {k for k in batch if k.startswith("observation.")}
reconstructed_obs_keys = {k for k in reconstructed_batch if k.startswith("observation.")}
assert original_obs_keys == reconstructed_obs_keys
# Check tensor values
assert torch.allclose(batch["observation.state"], reconstructed_batch["observation.state"])
# Check nested dict with tensors
assert torch.allclose(
batch["observation.image.top"]["image"], reconstructed_batch["observation.image.top"]["image"]
)
assert torch.allclose(
batch["observation.image.left"]["image"], reconstructed_batch["observation.image.left"]["image"]
)
# Check action tensor
assert torch.allclose(batch["action"], reconstructed_batch["action"])
# Check other fields
assert batch["next.reward"] == reconstructed_batch["next.reward"]
assert batch["next.done"] == reconstructed_batch["next.done"]
assert batch["next.truncated"] == reconstructed_batch["next.truncated"]
assert batch["info"] == reconstructed_batch["info"]
def test_custom_converter():
"""Test that custom converters can still be used."""
def to_tr(batch):
# Custom converter that modifies the reward
tr = _default_batch_to_transition(batch)
# Double the reward
reward = tr.get(TransitionKey.REWARD, 0.0)
new_tr = tr.copy()
new_tr[TransitionKey.REWARD] = reward * 2 if reward is not None else 0.0
return new_tr
def to_batch(tr):
batch = _default_transition_to_batch(tr)
return batch
processor = RobotProcessor(steps=[], to_transition=to_tr, to_output=to_batch)
batch = {
"observation.state": torch.randn(1, 4),
"action": torch.randn(1, 2),
"next.reward": 1.0,
"next.done": False,
}
result = processor(batch)
# Check the reward was doubled by our custom converter
assert result["next.reward"] == 2.0
assert torch.allclose(result["observation.state"], batch["observation.state"])
assert torch.allclose(result["action"], batch["action"])
+628
@@ -0,0 +1,628 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import Mock
import numpy as np
import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor.normalize_processor import (
NormalizerProcessor,
UnnormalizerProcessor,
_convert_stats_to_tensors,
)
from lerobot.processor.pipeline import RobotProcessor, TransitionKey
def create_transition(
observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
"""Helper to create an EnvTransition dictionary."""
return {
TransitionKey.OBSERVATION: observation,
TransitionKey.ACTION: action,
TransitionKey.REWARD: reward,
TransitionKey.DONE: done,
TransitionKey.TRUNCATED: truncated,
TransitionKey.INFO: info,
TransitionKey.COMPLEMENTARY_DATA: complementary_data,
}
def test_numpy_conversion():
stats = {
"observation.image": {
"mean": np.array([0.5, 0.5, 0.5]),
"std": np.array([0.2, 0.2, 0.2]),
}
}
tensor_stats = _convert_stats_to_tensors(stats)
assert isinstance(tensor_stats["observation.image"]["mean"], torch.Tensor)
assert isinstance(tensor_stats["observation.image"]["std"], torch.Tensor)
assert torch.allclose(tensor_stats["observation.image"]["mean"], torch.tensor([0.5, 0.5, 0.5]))
assert torch.allclose(tensor_stats["observation.image"]["std"], torch.tensor([0.2, 0.2, 0.2]))
def test_tensor_conversion():
stats = {
"action": {
"mean": torch.tensor([0.0, 0.0]),
"std": torch.tensor([1.0, 1.0]),
}
}
tensor_stats = _convert_stats_to_tensors(stats)
assert tensor_stats["action"]["mean"].dtype == torch.float32
assert tensor_stats["action"]["std"].dtype == torch.float32
def test_scalar_conversion():
stats = {
"reward": {
"mean": 0.5,
"std": 0.1,
}
}
tensor_stats = _convert_stats_to_tensors(stats)
assert torch.allclose(tensor_stats["reward"]["mean"], torch.tensor(0.5))
assert torch.allclose(tensor_stats["reward"]["std"], torch.tensor(0.1))
def test_list_conversion():
stats = {
"observation.state": {
"min": [0.0, -1.0, -2.0],
"max": [1.0, 1.0, 2.0],
}
}
tensor_stats = _convert_stats_to_tensors(stats)
assert torch.allclose(tensor_stats["observation.state"]["min"], torch.tensor([0.0, -1.0, -2.0]))
assert torch.allclose(tensor_stats["observation.state"]["max"], torch.tensor([1.0, 1.0, 2.0]))
def test_unsupported_type():
stats = {
"bad_key": {
"mean": "string_value",
}
}
with pytest.raises(TypeError, match="Unsupported type"):
_convert_stats_to_tensors(stats)
# Helper functions to create feature maps and norm maps
def _create_observation_features():
return {
"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)),
"observation.state": PolicyFeature(FeatureType.STATE, (2,)),
}
def _create_observation_norm_map():
return {
FeatureType.VISUAL: NormalizationMode.MEAN_STD,
FeatureType.STATE: NormalizationMode.MIN_MAX,
}
# Fixtures for observation normalization tests using NormalizerProcessor
@pytest.fixture
def observation_stats():
return {
"observation.image": {
"mean": np.array([0.5, 0.5, 0.5]),
"std": np.array([0.2, 0.2, 0.2]),
},
"observation.state": {
"min": np.array([0.0, -1.0]),
"max": np.array([1.0, 1.0]),
},
}
@pytest.fixture
def observation_normalizer(observation_stats):
"""Return a NormalizerProcessor that only has observation stats (no action)."""
features = _create_observation_features()
norm_map = _create_observation_norm_map()
return NormalizerProcessor(features=features, norm_map=norm_map, stats=observation_stats)
def test_mean_std_normalization(observation_normalizer):
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]),
"observation.state": torch.tensor([0.5, 0.0]),
}
transition = create_transition(observation=observation)
normalized_transition = observation_normalizer(transition)
normalized_obs = normalized_transition[TransitionKey.OBSERVATION]
# Check mean/std normalization
expected_image = (torch.tensor([0.7, 0.5, 0.3]) - 0.5) / 0.2
assert torch.allclose(normalized_obs["observation.image"], expected_image)
def test_min_max_normalization(observation_normalizer):
observation = {
"observation.state": torch.tensor([0.5, 0.0]),
}
transition = create_transition(observation=observation)
normalized_transition = observation_normalizer(transition)
normalized_obs = normalized_transition[TransitionKey.OBSERVATION]
# Check min/max normalization to [-1, 1]
# For state[0]: 2 * (0.5 - 0.0) / (1.0 - 0.0) - 1 = 0.0
# For state[1]: 2 * (0.0 - (-1.0)) / (1.0 - (-1.0)) - 1 = 0.0
expected_state = torch.tensor([0.0, 0.0])
assert torch.allclose(normalized_obs["observation.state"], expected_state, atol=1e-6)
def test_selective_normalization(observation_stats):
features = _create_observation_features()
norm_map = _create_observation_norm_map()
normalizer = NormalizerProcessor(
features=features, norm_map=norm_map, stats=observation_stats, normalize_keys={"observation.image"}
)
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]),
"observation.state": torch.tensor([0.5, 0.0]),
}
transition = create_transition(observation=observation)
normalized_transition = normalizer(transition)
normalized_obs = normalized_transition[TransitionKey.OBSERVATION]
# Only image should be normalized
assert torch.allclose(normalized_obs["observation.image"], (torch.tensor([0.7, 0.5, 0.3]) - 0.5) / 0.2)
# State should remain unchanged
assert torch.allclose(normalized_obs["observation.state"], observation["observation.state"])
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_device_compatibility(observation_stats):
features = _create_observation_features()
norm_map = _create_observation_norm_map()
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=observation_stats)
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]).cuda(),
}
transition = create_transition(observation=observation)
normalized_transition = normalizer(transition)
normalized_obs = normalized_transition[TransitionKey.OBSERVATION]
assert normalized_obs["observation.image"].device.type == "cuda"
def test_from_lerobot_dataset():
# Mock dataset
mock_dataset = Mock()
mock_dataset.meta.stats = {
"observation.image": {"mean": [0.5], "std": [0.2]},
"action": {"mean": [0.0], "std": [1.0]},
}
features = {
"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)),
"action": PolicyFeature(FeatureType.ACTION, (1,)),
}
norm_map = {
FeatureType.VISUAL: NormalizationMode.MEAN_STD,
FeatureType.ACTION: NormalizationMode.MEAN_STD,
}
normalizer = NormalizerProcessor.from_lerobot_dataset(mock_dataset, features, norm_map)
# Both observation and action statistics should be present in tensor stats
assert "observation.image" in normalizer._tensor_stats
assert "action" in normalizer._tensor_stats
def test_state_dict_save_load(observation_normalizer):
# Save state
state_dict = observation_normalizer.state_dict()
# Create new normalizer and load state
features = _create_observation_features()
norm_map = _create_observation_norm_map()
new_normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats={})
new_normalizer.load_state_dict(state_dict)
# Test that it works the same
observation = {"observation.image": torch.tensor([0.7, 0.5, 0.3])}
transition = create_transition(observation=observation)
result1 = observation_normalizer(transition)[TransitionKey.OBSERVATION]
result2 = new_normalizer(transition)[TransitionKey.OBSERVATION]
assert torch.allclose(result1["observation.image"], result2["observation.image"])
# Fixtures for ActionUnnormalizer tests
@pytest.fixture
def action_stats_mean_std():
return {
"mean": np.array([0.0, 0.0, 0.0]),
"std": np.array([1.0, 2.0, 0.5]),
}
@pytest.fixture
def action_stats_min_max():
return {
"min": np.array([-1.0, -2.0, 0.0]),
"max": np.array([1.0, 2.0, 1.0]),
}
def _create_action_features():
return {
"action": PolicyFeature(FeatureType.ACTION, (3,)),
}
def _create_action_norm_map_mean_std():
return {
FeatureType.ACTION: NormalizationMode.MEAN_STD,
}
def _create_action_norm_map_min_max():
return {
FeatureType.ACTION: NormalizationMode.MIN_MAX,
}
def test_mean_std_unnormalization(action_stats_mean_std):
features = _create_action_features()
norm_map = _create_action_norm_map_mean_std()
unnormalizer = UnnormalizerProcessor(
features=features, norm_map=norm_map, stats={"action": action_stats_mean_std}
)
normalized_action = torch.tensor([1.0, -0.5, 2.0])
transition = create_transition(action=normalized_action)
unnormalized_transition = unnormalizer(transition)
unnormalized_action = unnormalized_transition[TransitionKey.ACTION]
# action * std + mean
expected = torch.tensor([1.0 * 1.0 + 0.0, -0.5 * 2.0 + 0.0, 2.0 * 0.5 + 0.0])
assert torch.allclose(unnormalized_action, expected)
def test_min_max_unnormalization(action_stats_min_max):
features = _create_action_features()
norm_map = _create_action_norm_map_min_max()
unnormalizer = UnnormalizerProcessor(
features=features, norm_map=norm_map, stats={"action": action_stats_min_max}
)
# Actions in [-1, 1]
normalized_action = torch.tensor([0.0, -1.0, 1.0])
transition = create_transition(action=normalized_action)
unnormalized_transition = unnormalizer(transition)
unnormalized_action = unnormalized_transition[TransitionKey.ACTION]
# Map from [-1, 1] to [min, max]
# (action + 1) / 2 * (max - min) + min
expected = torch.tensor(
[
(0.0 + 1) / 2 * (1.0 - (-1.0)) + (-1.0), # 0.0
(-1.0 + 1) / 2 * (2.0 - (-2.0)) + (-2.0), # -2.0
(1.0 + 1) / 2 * (1.0 - 0.0) + 0.0, # 1.0
]
)
assert torch.allclose(unnormalized_action, expected)
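The inverse mapping written out in the comments above, as a standalone sketch (illustrative, not the `UnnormalizerProcessor` implementation):

```python
def min_max_unnormalize(a, lo, hi):
    """Inverse map from [-1, 1] back to [lo, hi]: (a + 1) / 2 * (hi - lo) + lo."""
    return [(ai + 1.0) / 2.0 * (h - l) + l for ai, l, h in zip(a, lo, hi)]

out = min_max_unnormalize([0.0, -1.0, 1.0], [-1.0, -2.0, 0.0], [1.0, 2.0, 1.0])
# [0.0, -2.0, 1.0], the same expected values as in the test above
```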
def test_numpy_action_input(action_stats_mean_std):
features = _create_action_features()
norm_map = _create_action_norm_map_mean_std()
unnormalizer = UnnormalizerProcessor(
features=features, norm_map=norm_map, stats={"action": action_stats_mean_std}
)
normalized_action = np.array([1.0, -0.5, 2.0], dtype=np.float32)
transition = create_transition(action=normalized_action)
unnormalized_transition = unnormalizer(transition)
unnormalized_action = unnormalized_transition[TransitionKey.ACTION]
assert isinstance(unnormalized_action, torch.Tensor)
expected = torch.tensor([1.0, -1.0, 1.0])
assert torch.allclose(unnormalized_action, expected)
def test_none_action(action_stats_mean_std):
features = _create_action_features()
norm_map = _create_action_norm_map_mean_std()
unnormalizer = UnnormalizerProcessor(
features=features, norm_map=norm_map, stats={"action": action_stats_mean_std}
)
transition = create_transition()
result = unnormalizer(transition)
# Should return transition unchanged
assert result == transition
def test_action_from_lerobot_dataset():
mock_dataset = Mock()
mock_dataset.meta.stats = {"action": {"mean": [0.0], "std": [1.0]}}
features = {"action": PolicyFeature(FeatureType.ACTION, (1,))}
norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD}
unnormalizer = UnnormalizerProcessor.from_lerobot_dataset(mock_dataset, features, norm_map)
assert "mean" in unnormalizer._tensor_stats["action"]
# Fixtures for NormalizerProcessor tests
@pytest.fixture
def full_stats():
return {
"observation.image": {
"mean": np.array([0.5, 0.5, 0.5]),
"std": np.array([0.2, 0.2, 0.2]),
},
"observation.state": {
"min": np.array([0.0, -1.0]),
"max": np.array([1.0, 1.0]),
},
"action": {
"mean": np.array([0.0, 0.0]),
"std": np.array([1.0, 2.0]),
},
}
def _create_full_features():
return {
"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)),
"observation.state": PolicyFeature(FeatureType.STATE, (2,)),
"action": PolicyFeature(FeatureType.ACTION, (2,)),
}
def _create_full_norm_map():
return {
FeatureType.VISUAL: NormalizationMode.MEAN_STD,
FeatureType.STATE: NormalizationMode.MIN_MAX,
FeatureType.ACTION: NormalizationMode.MEAN_STD,
}
@pytest.fixture
def normalizer_processor(full_stats):
features = _create_full_features()
norm_map = _create_full_norm_map()
return NormalizerProcessor(features=features, norm_map=norm_map, stats=full_stats)
def test_combined_normalization(normalizer_processor):
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]),
"observation.state": torch.tensor([0.5, 0.0]),
}
action = torch.tensor([1.0, -0.5])
transition = create_transition(
observation=observation,
action=action,
reward=1.0,
done=False,
truncated=False,
info={},
complementary_data={},
)
processed_transition = normalizer_processor(transition)
# Check normalized observations
processed_obs = processed_transition[TransitionKey.OBSERVATION]
expected_image = (torch.tensor([0.7, 0.5, 0.3]) - 0.5) / 0.2
assert torch.allclose(processed_obs["observation.image"], expected_image)
# Check normalized action
processed_action = processed_transition[TransitionKey.ACTION]
expected_action = torch.tensor([(1.0 - 0.0) / 1.0, (-0.5 - 0.0) / 2.0])
assert torch.allclose(processed_action, expected_action)
# Check other fields remain unchanged
assert processed_transition[TransitionKey.REWARD] == 1.0
assert not processed_transition[TransitionKey.DONE]
def test_processor_from_lerobot_dataset(full_stats):
# Mock dataset
mock_dataset = Mock()
mock_dataset.meta.stats = full_stats
features = _create_full_features()
norm_map = _create_full_norm_map()
processor = NormalizerProcessor.from_lerobot_dataset(
mock_dataset, features, norm_map, normalize_keys={"observation.image"}
)
assert processor.normalize_keys == {"observation.image"}
assert "observation.image" in processor._tensor_stats
assert "action" in processor._tensor_stats
def test_get_config(full_stats):
features = _create_full_features()
norm_map = _create_full_norm_map()
processor = NormalizerProcessor(
features=features, norm_map=norm_map, stats=full_stats, normalize_keys={"observation.image"}, eps=1e-6
)
config = processor.get_config()
expected_config = {
"normalize_keys": ["observation.image"],
"eps": 1e-6,
"features": {
"observation.image": {"type": "VISUAL", "shape": (3, 96, 96)},
"observation.state": {"type": "STATE", "shape": (2,)},
"action": {"type": "ACTION", "shape": (2,)},
},
"norm_map": {
"VISUAL": "MEAN_STD",
"STATE": "MIN_MAX",
"ACTION": "MEAN_STD",
},
}
assert config == expected_config
def test_integration_with_robot_processor(normalizer_processor):
"""Test integration with RobotProcessor pipeline"""
robot_processor = RobotProcessor([normalizer_processor])
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]),
"observation.state": torch.tensor([0.5, 0.0]),
}
action = torch.tensor([1.0, -0.5])
transition = create_transition(
observation=observation,
action=action,
reward=1.0,
done=False,
truncated=False,
info={},
complementary_data={},
)
processed_transition = robot_processor(transition)
# Verify the processing worked
assert isinstance(processed_transition[TransitionKey.OBSERVATION], dict)
assert isinstance(processed_transition[TransitionKey.ACTION], torch.Tensor)
# Edge case tests
def test_empty_observation():
stats = {"observation.image": {"mean": [0.5], "std": [0.2]}}
features = {"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))}
norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
transition = create_transition()
result = normalizer(transition)
assert result == transition
def test_empty_stats():
features = {"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))}
norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats={})
observation = {"observation.image": torch.tensor([0.5])}
transition = create_transition(observation=observation)
result = normalizer(transition)
# Should return observation unchanged since no stats are available
assert torch.allclose(
result[TransitionKey.OBSERVATION]["observation.image"], observation["observation.image"]
)
def test_partial_stats():
"""If statistics are incomplete, the value should pass through unchanged."""
stats = {"observation.image": {"mean": [0.5]}} # Missing std / (min,max)
features = {"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))}
norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
observation = {"observation.image": torch.tensor([0.7])}
transition = create_transition(observation=observation)
processed = normalizer(transition)[TransitionKey.OBSERVATION]
assert torch.allclose(processed["observation.image"], observation["observation.image"])
def test_missing_action_stats_no_error():
mock_dataset = Mock()
mock_dataset.meta.stats = {"observation.image": {"mean": [0.5], "std": [0.2]}}
features = {"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))}
norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD}
processor = UnnormalizerProcessor.from_lerobot_dataset(mock_dataset, features, norm_map)
# The tensor stats should not contain the 'action' key
assert "action" not in processor._tensor_stats
def test_serialization_roundtrip(full_stats):
"""Test that features and norm_map can be serialized and deserialized correctly."""
features = _create_full_features()
norm_map = _create_full_norm_map()
original_processor = NormalizerProcessor(
features=features, norm_map=norm_map, stats=full_stats, normalize_keys={"observation.image"}, eps=1e-6
)
# Get config (serialization)
config = original_processor.get_config()
# Create a new processor from the config (deserialization)
new_processor = NormalizerProcessor(
features=config["features"],
norm_map=config["norm_map"],
stats=full_stats,
normalize_keys=set(config["normalize_keys"]),
eps=config["eps"],
)
# Test that both processors work the same way
observation = {
"observation.image": torch.tensor([0.7, 0.5, 0.3]),
"observation.state": torch.tensor([0.5, 0.0]),
}
action = torch.tensor([1.0, -0.5])
transition = create_transition(
observation=observation,
action=action,
reward=1.0,
done=False,
truncated=False,
info={},
complementary_data={},
)
result1 = original_processor(transition)
result2 = new_processor(transition)
# Compare results
assert torch.allclose(
result1[TransitionKey.OBSERVATION]["observation.image"],
result2[TransitionKey.OBSERVATION]["observation.image"],
)
assert torch.allclose(result1[TransitionKey.ACTION], result2[TransitionKey.ACTION])
# Verify features and norm_map are correctly reconstructed
assert new_processor.features.keys() == original_processor.features.keys()
for key in new_processor.features:
assert new_processor.features[key].type == original_processor.features[key].type
assert new_processor.features[key].shape == original_processor.features[key].shape
assert new_processor.norm_map == original_processor.norm_map
@@ -0,0 +1,501 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pytest
import torch
from lerobot.configs.types import FeatureType
from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
from lerobot.processor import (
ImageProcessor,
StateProcessor,
VanillaObservationProcessor,
)
from lerobot.processor.pipeline import TransitionKey
from tests.conftest import assert_contract_is_typed
def create_transition(
observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
"""Helper to create an EnvTransition dictionary."""
return {
TransitionKey.OBSERVATION: observation,
TransitionKey.ACTION: action,
TransitionKey.REWARD: reward,
TransitionKey.DONE: done,
TransitionKey.TRUNCATED: truncated,
TransitionKey.INFO: info,
TransitionKey.COMPLEMENTARY_DATA: complementary_data,
}
def test_process_single_image():
"""Test processing a single image."""
processor = ImageProcessor()
# Create a mock image (H, W, C) format, uint8
image = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)
observation = {"pixels": image}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that the image was processed correctly
assert "observation.image" in processed_obs
processed_img = processed_obs["observation.image"]
# Check shape: should be (1, 3, 64, 64) - batch, channels, height, width
assert processed_img.shape == (1, 3, 64, 64)
# Check dtype and range
assert processed_img.dtype == torch.float32
assert processed_img.min() >= 0.0
assert processed_img.max() <= 1.0
def test_process_image_dict():
"""Test processing multiple images in a dictionary."""
processor = ImageProcessor()
# Create mock images
image1 = np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8)
image2 = np.random.randint(0, 256, size=(48, 48, 3), dtype=np.uint8)
observation = {"pixels": {"camera1": image1, "camera2": image2}}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that both images were processed
assert "observation.images.camera1" in processed_obs
assert "observation.images.camera2" in processed_obs
# Check shapes
assert processed_obs["observation.images.camera1"].shape == (1, 3, 32, 32)
assert processed_obs["observation.images.camera2"].shape == (1, 3, 48, 48)
def test_process_batched_image():
"""Test processing already batched images."""
processor = ImageProcessor()
# Create a batched image (B, H, W, C)
image = np.random.randint(0, 256, size=(2, 64, 64, 3), dtype=np.uint8)
observation = {"pixels": image}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that batch dimension is preserved
assert processed_obs["observation.image"].shape == (2, 3, 64, 64)
def test_invalid_image_format():
"""Test error handling for invalid image formats."""
processor = ImageProcessor()
# Test wrong channel order (channels first)
image = np.random.randint(0, 256, size=(3, 64, 64), dtype=np.uint8)
observation = {"pixels": image}
transition = create_transition(observation=observation)
with pytest.raises(ValueError, match="Expected channel-last images"):
processor(transition)
def test_invalid_image_dtype():
"""Test error handling for invalid image dtype."""
processor = ImageProcessor()
# Test wrong dtype
image = np.random.rand(64, 64, 3).astype(np.float32)
observation = {"pixels": image}
transition = create_transition(observation=observation)
with pytest.raises(ValueError, match="Expected torch.uint8 images"):
processor(transition)
def test_no_pixels_in_observation():
"""Test processor when no pixels are in observation."""
processor = ImageProcessor()
observation = {"other_data": np.array([1, 2, 3])}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Should preserve other data unchanged
assert "other_data" in processed_obs
np.testing.assert_array_equal(processed_obs["other_data"], np.array([1, 2, 3]))
def test_none_observation():
"""Test processor with None observation."""
processor = ImageProcessor()
transition = create_transition()
result = processor(transition)
assert result == transition
def test_serialization_methods():
"""Test serialization methods."""
processor = ImageProcessor()
# Test get_config
config = processor.get_config()
assert isinstance(config, dict)
# Test state_dict
state = processor.state_dict()
assert isinstance(state, dict)
# Test load_state_dict (should not raise)
processor.load_state_dict(state)
# Test reset (should not raise)
processor.reset()
def test_process_environment_state():
"""Test processing environment_state."""
processor = StateProcessor()
env_state = np.array([1.0, 2.0, 3.0], dtype=np.float32)
observation = {"environment_state": env_state}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that environment_state was renamed and processed
assert "observation.environment_state" in processed_obs
assert "environment_state" not in processed_obs
processed_state = processed_obs["observation.environment_state"]
assert processed_state.shape == (1, 3) # Batch dimension added
assert processed_state.dtype == torch.float32
torch.testing.assert_close(processed_state, torch.tensor([[1.0, 2.0, 3.0]]))
def test_process_agent_pos():
"""Test processing agent_pos."""
processor = StateProcessor()
agent_pos = np.array([0.5, -0.5, 1.0], dtype=np.float32)
observation = {"agent_pos": agent_pos}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that agent_pos was renamed and processed
assert "observation.state" in processed_obs
assert "agent_pos" not in processed_obs
processed_state = processed_obs["observation.state"]
assert processed_state.shape == (1, 3) # Batch dimension added
assert processed_state.dtype == torch.float32
torch.testing.assert_close(processed_state, torch.tensor([[0.5, -0.5, 1.0]]))
def test_process_batched_states():
"""Test processing already batched states."""
processor = StateProcessor()
env_state = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
agent_pos = np.array([[0.5, -0.5], [1.0, -1.0]], dtype=np.float32)
observation = {"environment_state": env_state, "agent_pos": agent_pos}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that batch dimensions are preserved
assert processed_obs["observation.environment_state"].shape == (2, 2)
assert processed_obs["observation.state"].shape == (2, 2)
def test_process_both_states():
"""Test processing both environment_state and agent_pos."""
processor = StateProcessor()
env_state = np.array([1.0, 2.0], dtype=np.float32)
agent_pos = np.array([0.5, -0.5], dtype=np.float32)
observation = {"environment_state": env_state, "agent_pos": agent_pos, "other_data": "keep_me"}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that both states were processed
assert "observation.environment_state" in processed_obs
assert "observation.state" in processed_obs
# Check that original keys were removed
assert "environment_state" not in processed_obs
assert "agent_pos" not in processed_obs
# Check that other data was preserved
assert processed_obs["other_data"] == "keep_me"
def test_no_states_in_observation():
"""Test processor when no states are in observation."""
processor = StateProcessor()
observation = {"other_data": np.array([1, 2, 3])}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Should preserve data unchanged
np.testing.assert_array_equal(processed_obs["other_data"], observation["other_data"])
def test_complete_observation_processing():
"""Test processing a complete observation with both images and states."""
processor = VanillaObservationProcessor()
# Create mock data
image = np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8)
env_state = np.array([1.0, 2.0, 3.0], dtype=np.float32)
agent_pos = np.array([0.5, -0.5, 1.0], dtype=np.float32)
observation = {
"pixels": image,
"environment_state": env_state,
"agent_pos": agent_pos,
"other_data": "preserve_me",
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that image was processed
assert "observation.image" in processed_obs
assert processed_obs["observation.image"].shape == (1, 3, 32, 32)
# Check that states were processed
assert "observation.environment_state" in processed_obs
assert "observation.state" in processed_obs
# Check that original keys were removed
assert "pixels" not in processed_obs
assert "environment_state" not in processed_obs
assert "agent_pos" not in processed_obs
# Check that other data was preserved
assert processed_obs["other_data"] == "preserve_me"
def test_image_only_processing():
"""Test processing observation with only images."""
processor = VanillaObservationProcessor()
image = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)
observation = {"pixels": image}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
assert "observation.image" in processed_obs
assert len(processed_obs) == 1
def test_state_only_processing():
"""Test processing observation with only states."""
processor = VanillaObservationProcessor()
agent_pos = np.array([1.0, 2.0], dtype=np.float32)
observation = {"agent_pos": agent_pos}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
assert "observation.state" in processed_obs
assert "agent_pos" not in processed_obs
def test_empty_observation():
"""Test processing empty observation."""
processor = VanillaObservationProcessor()
observation = {}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
assert processed_obs == {}
def test_custom_sub_processors():
"""Test ObservationProcessor with custom sub-processors."""
image_proc = ImageProcessor()
state_proc = StateProcessor()
processor = VanillaObservationProcessor(image_processor=image_proc, state_processor=state_proc)
# Should use the provided processors
assert processor.image_processor is image_proc
assert processor.state_processor is state_proc
def test_equivalent_to_original_function():
"""Test that ObservationProcessor produces equivalent results to preprocess_observation."""
# Import the original function for comparison
from lerobot.envs.utils import preprocess_observation
processor = VanillaObservationProcessor()
# Create test data similar to what the original function expects
image = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)
env_state = np.array([1.0, 2.0, 3.0], dtype=np.float32)
agent_pos = np.array([0.5, -0.5, 1.0], dtype=np.float32)
observation = {"pixels": image, "environment_state": env_state, "agent_pos": agent_pos}
# Process with original function
original_result = preprocess_observation(observation)
# Process with new processor
transition = create_transition(observation=observation)
processor_result = processor(transition)[TransitionKey.OBSERVATION]
# Compare results
assert set(original_result.keys()) == set(processor_result.keys())
for key in original_result:
torch.testing.assert_close(original_result[key], processor_result[key])
def test_equivalent_with_image_dict():
"""Test equivalence with dictionary of images."""
from lerobot.envs.utils import preprocess_observation
processor = VanillaObservationProcessor()
# Create test data with multiple cameras
image1 = np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8)
image2 = np.random.randint(0, 256, size=(48, 48, 3), dtype=np.uint8)
agent_pos = np.array([1.0, 2.0], dtype=np.float32)
observation = {"pixels": {"cam1": image1, "cam2": image2}, "agent_pos": agent_pos}
# Process with original function
original_result = preprocess_observation(observation)
# Process with new processor
transition = create_transition(observation=observation)
processor_result = processor(transition)[TransitionKey.OBSERVATION]
# Compare results
assert set(original_result.keys()) == set(processor_result.keys())
for key in original_result:
torch.testing.assert_close(original_result[key], processor_result[key])
def test_image_processor_feature_contract_pixels_to_image(policy_feature_factory):
processor = ImageProcessor()
features = {
"pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"keep": policy_feature_factory(FeatureType.ENV, (1,)),
}
out = processor.feature_contract(features.copy())
assert OBS_IMAGE in out and out[OBS_IMAGE] == features["pixels"]
assert "pixels" not in out
assert out["keep"] == features["keep"]
assert_contract_is_typed(out)
def test_image_processor_feature_contract_observation_pixels_to_image(policy_feature_factory):
processor = ImageProcessor()
features = {
"observation.pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"keep": policy_feature_factory(FeatureType.ENV, (1,)),
}
out = processor.feature_contract(features.copy())
assert OBS_IMAGE in out and out[OBS_IMAGE] == features["observation.pixels"]
assert "observation.pixels" not in out
assert out["keep"] == features["keep"]
assert_contract_is_typed(out)
def test_image_processor_feature_contract_multi_camera_and_prefixed(policy_feature_factory):
processor = ImageProcessor()
features = {
"pixels.front": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"pixels.wrist": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"observation.pixels.rear": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"keep": policy_feature_factory(FeatureType.ENV, (7,)),
}
out = processor.feature_contract(features.copy())
assert f"{OBS_IMAGES}.front" in out and out[f"{OBS_IMAGES}.front"] == features["pixels.front"]
assert f"{OBS_IMAGES}.wrist" in out and out[f"{OBS_IMAGES}.wrist"] == features["pixels.wrist"]
assert f"{OBS_IMAGES}.rear" in out and out[f"{OBS_IMAGES}.rear"] == features["observation.pixels.rear"]
assert "pixels.front" not in out and "pixels.wrist" not in out and "observation.pixels.rear" not in out
assert out["keep"] == features["keep"]
assert_contract_is_typed(out)
def test_state_processor_feature_contract_environment_and_agent_pos(policy_feature_factory):
processor = StateProcessor()
features = {
"environment_state": policy_feature_factory(FeatureType.STATE, (3,)),
"agent_pos": policy_feature_factory(FeatureType.STATE, (7,)),
"keep": policy_feature_factory(FeatureType.ENV, (1,)),
}
out = processor.feature_contract(features.copy())
assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["environment_state"]
assert OBS_STATE in out and out[OBS_STATE] == features["agent_pos"]
assert "environment_state" not in out and "agent_pos" not in out
assert out["keep"] == features["keep"]
assert_contract_is_typed(out)
def test_state_processor_feature_contract_prefixed_inputs(policy_feature_factory):
proc = StateProcessor()
features = {
"observation.environment_state": policy_feature_factory(FeatureType.STATE, (2,)),
"observation.agent_pos": policy_feature_factory(FeatureType.STATE, (4,)),
}
out = proc.feature_contract(features.copy())
assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["observation.environment_state"]
assert OBS_STATE in out and out[OBS_STATE] == features["observation.agent_pos"]
assert "environment_state" not in out and "agent_pos" not in out
assert_contract_is_typed(out)
File diff suppressed because it is too large
@@ -0,0 +1,467 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
from pathlib import Path
import numpy as np
import torch
from lerobot.configs.types import FeatureType
from lerobot.processor import ProcessorStepRegistry, RenameProcessor, RobotProcessor, TransitionKey
from tests.conftest import assert_contract_is_typed
def create_transition(
observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
"""Helper to create an EnvTransition dictionary."""
return {
TransitionKey.OBSERVATION: observation,
TransitionKey.ACTION: action,
TransitionKey.REWARD: reward,
TransitionKey.DONE: done,
TransitionKey.TRUNCATED: truncated,
TransitionKey.INFO: info,
TransitionKey.COMPLEMENTARY_DATA: complementary_data,
}
def test_basic_renaming():
"""Test basic key renaming functionality."""
rename_map = {
"old_key1": "new_key1",
"old_key2": "new_key2",
}
processor = RenameProcessor(rename_map=rename_map)
observation = {
"old_key1": torch.tensor([1.0, 2.0]),
"old_key2": np.array([3.0, 4.0]),
"unchanged_key": "keep_me",
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check renamed keys
assert "new_key1" in processed_obs
assert "new_key2" in processed_obs
assert "old_key1" not in processed_obs
assert "old_key2" not in processed_obs
# Check values are preserved
torch.testing.assert_close(processed_obs["new_key1"], torch.tensor([1.0, 2.0]))
np.testing.assert_array_equal(processed_obs["new_key2"], np.array([3.0, 4.0]))
# Check unchanged key is preserved
assert processed_obs["unchanged_key"] == "keep_me"
def test_empty_rename_map():
"""Test processor with empty rename map (should pass through unchanged)."""
processor = RenameProcessor(rename_map={})
observation = {
"key1": torch.tensor([1.0]),
"key2": "value2",
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# All keys should be unchanged
assert processed_obs.keys() == observation.keys()
torch.testing.assert_close(processed_obs["key1"], observation["key1"])
assert processed_obs["key2"] == observation["key2"]
def test_none_observation():
"""Test processor with None observation."""
processor = RenameProcessor(rename_map={"old": "new"})
transition = create_transition()
result = processor(transition)
# Should return transition unchanged
assert result == transition
def test_overlapping_rename():
"""Test renaming when new names might conflict."""
rename_map = {
"a": "b",
"b": "c", # This creates a potential conflict
}
processor = RenameProcessor(rename_map=rename_map)
observation = {
"a": 1,
"b": 2,
"x": 3,
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that renaming happens correctly
assert "a" not in processed_obs
assert processed_obs["b"] == 1 # 'a' renamed to 'b'
assert processed_obs["c"] == 2 # original 'b' renamed to 'c'
assert processed_obs["x"] == 3
def test_partial_rename():
"""Test renaming only some keys."""
rename_map = {
"observation.state": "observation.proprio_state",
"pixels": "observation.image",
}
processor = RenameProcessor(rename_map=rename_map)
observation = {
"observation.state": torch.randn(10),
"pixels": np.random.randint(0, 256, (64, 64, 3), dtype=np.uint8),
"reward": 1.0,
"info": {"episode": 1},
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check renamed keys
assert "observation.proprio_state" in processed_obs
assert "observation.image" in processed_obs
assert "observation.state" not in processed_obs
assert "pixels" not in processed_obs
# Check unchanged keys
assert processed_obs["reward"] == 1.0
assert processed_obs["info"] == {"episode": 1}
def test_get_config():
"""Test configuration serialization."""
rename_map = {
"old1": "new1",
"old2": "new2",
}
processor = RenameProcessor(rename_map=rename_map)
config = processor.get_config()
assert config == {"rename_map": rename_map}
def test_state_dict():
"""Test state dict (should be empty for RenameProcessor)."""
processor = RenameProcessor(rename_map={"old": "new"})
state = processor.state_dict()
assert state == {}
# Load state dict should work even with empty dict
processor.load_state_dict({})
def test_integration_with_robot_processor():
"""Test integration with RobotProcessor pipeline."""
rename_map = {
"agent_pos": "observation.state",
"pixels": "observation.image",
}
rename_processor = RenameProcessor(rename_map=rename_map)
pipeline = RobotProcessor([rename_processor])
observation = {
"agent_pos": np.array([1.0, 2.0, 3.0]),
"pixels": np.zeros((32, 32, 3), dtype=np.uint8),
"other_data": "preserve_me",
}
transition = create_transition(
observation=observation, reward=0.5, done=False, truncated=False, info={}, complementary_data={}
)
result = pipeline(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check renaming worked through pipeline
assert "observation.state" in processed_obs
assert "observation.image" in processed_obs
assert "agent_pos" not in processed_obs
assert "pixels" not in processed_obs
assert processed_obs["other_data"] == "preserve_me"
# Check other transition elements unchanged
assert result[TransitionKey.REWARD] == 0.5
assert result[TransitionKey.DONE] is False
def test_save_and_load_pretrained():
"""Test saving and loading processor with RobotProcessor."""
rename_map = {
"old_state": "observation.state",
"old_image": "observation.image",
}
processor = RenameProcessor(rename_map=rename_map)
pipeline = RobotProcessor([processor], name="TestRenameProcessor")
with tempfile.TemporaryDirectory() as tmp_dir:
# Save pipeline
pipeline.save_pretrained(tmp_dir)
# Check files were created
config_path = Path(tmp_dir) / "testrenameprocessor.json" # Based on name="TestRenameProcessor"
assert config_path.exists()
# No state files should be created for RenameProcessor
state_files = list(Path(tmp_dir).glob("*.safetensors"))
assert len(state_files) == 0
# Load pipeline
loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir)
assert loaded_pipeline.name == "TestRenameProcessor"
assert len(loaded_pipeline) == 1
# Check that loaded processor works correctly
loaded_processor = loaded_pipeline.steps[0]
assert isinstance(loaded_processor, RenameProcessor)
assert loaded_processor.rename_map == rename_map
# Test functionality after loading
observation = {"old_state": [1, 2, 3], "old_image": "image_data"}
transition = create_transition(observation=observation)
result = loaded_pipeline(transition)
processed_obs = result[TransitionKey.OBSERVATION]
assert "observation.state" in processed_obs
assert "observation.image" in processed_obs
assert processed_obs["observation.state"] == [1, 2, 3]
assert processed_obs["observation.image"] == "image_data"
def test_registry_functionality():
"""Test that RenameProcessor is properly registered."""
# Check that it's registered
assert "rename_processor" in ProcessorStepRegistry.list()
# Get from registry
retrieved_class = ProcessorStepRegistry.get("rename_processor")
assert retrieved_class is RenameProcessor
# Create instance from registry
instance = retrieved_class(rename_map={"old": "new"})
assert isinstance(instance, RenameProcessor)
assert instance.rename_map == {"old": "new"}
def test_registry_based_save_load():
"""Test save/load using registry name instead of module path."""
processor = RenameProcessor(rename_map={"key1": "renamed_key1"})
pipeline = RobotProcessor([processor])
with tempfile.TemporaryDirectory() as tmp_dir:
# Save and load
pipeline.save_pretrained(tmp_dir)
# Verify config uses registry name
import json
with open(Path(tmp_dir) / "robotprocessor.json") as f: # Default name is "RobotProcessor"
config = json.load(f)
assert "registry_name" in config["steps"][0]
assert config["steps"][0]["registry_name"] == "rename_processor"
assert "class" not in config["steps"][0] # Should use registry, not module path
# Load should work
loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir)
loaded_processor = loaded_pipeline.steps[0]
assert isinstance(loaded_processor, RenameProcessor)
assert loaded_processor.rename_map == {"key1": "renamed_key1"}
def test_chained_rename_processors():
"""Test multiple RenameProcessors in a pipeline."""
# First processor: rename raw keys to intermediate format
processor1 = RenameProcessor(
rename_map={
"pos": "agent_position",
"img": "camera_image",
}
)
# Second processor: rename to final format
processor2 = RenameProcessor(
rename_map={
"agent_position": "observation.state",
"camera_image": "observation.image",
}
)
pipeline = RobotProcessor([processor1, processor2])
observation = {
"pos": np.array([1.0, 2.0]),
"img": "image_data",
"extra": "keep_me",
}
transition = create_transition(observation=observation)
# Step through to see intermediate results
results = list(pipeline.step_through(transition))
# After first processor
assert "agent_position" in results[1][TransitionKey.OBSERVATION]
assert "camera_image" in results[1][TransitionKey.OBSERVATION]
# After second processor
final_obs = results[2][TransitionKey.OBSERVATION]
assert "observation.state" in final_obs
assert "observation.image" in final_obs
assert final_obs["extra"] == "keep_me"
# Original keys should be gone
assert "pos" not in final_obs
assert "img" not in final_obs
assert "agent_position" not in final_obs
assert "camera_image" not in final_obs
def test_nested_observation_rename():
"""Test renaming with nested observation structures."""
rename_map = {
"observation.images.left": "observation.camera.left_view",
"observation.images.right": "observation.camera.right_view",
"observation.proprio": "observation.proprioception",
}
processor = RenameProcessor(rename_map=rename_map)
observation = {
"observation.images.left": torch.randn(3, 64, 64),
"observation.images.right": torch.randn(3, 64, 64),
"observation.proprio": torch.randn(7),
"observation.gripper": torch.tensor([0.0]), # Not renamed
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check renames
assert "observation.camera.left_view" in processed_obs
assert "observation.camera.right_view" in processed_obs
assert "observation.proprioception" in processed_obs
# Check unchanged key
assert "observation.gripper" in processed_obs
# Check old keys removed
assert "observation.images.left" not in processed_obs
assert "observation.images.right" not in processed_obs
assert "observation.proprio" not in processed_obs
def test_value_types_preserved():
"""Test that various value types are preserved during renaming."""
rename_map = {"old_tensor": "new_tensor", "old_array": "new_array", "old_scalar": "new_scalar"}
processor = RenameProcessor(rename_map=rename_map)
tensor_value = torch.randn(3, 3)
array_value = np.random.rand(2, 2)
observation = {
"old_tensor": tensor_value,
"old_array": array_value,
"old_scalar": 42,
"old_string": "hello",
"old_dict": {"nested": "value"},
"old_list": [1, 2, 3],
}
transition = create_transition(observation=observation)
result = processor(transition)
processed_obs = result[TransitionKey.OBSERVATION]
# Check that values and types are preserved
assert torch.equal(processed_obs["new_tensor"], tensor_value)
assert np.array_equal(processed_obs["new_array"], array_value)
assert processed_obs["new_scalar"] == 42
assert processed_obs["old_string"] == "hello"
assert processed_obs["old_dict"] == {"nested": "value"}
assert processed_obs["old_list"] == [1, 2, 3]
def test_feature_contract_basic_renaming(policy_feature_factory):
processor = RenameProcessor(rename_map={"a": "x", "b": "y"})
features = {
"a": policy_feature_factory(FeatureType.STATE, (2,)),
"b": policy_feature_factory(FeatureType.ACTION, (3,)),
"c": policy_feature_factory(FeatureType.ENV, (1,)),
}
out = processor.feature_contract(features.copy())
# Values preserved and typed
assert out["x"] == features["a"]
assert out["y"] == features["b"]
assert out["c"] == features["c"]
assert_contract_is_typed(out)
# Input not mutated
assert set(features) == {"a", "b", "c"}
def test_feature_contract_overlapping_keys(policy_feature_factory):
# Overlapping renames: both 'a' and 'b' exist. 'a'->'b', 'b'->'c'
processor = RenameProcessor(rename_map={"a": "b", "b": "c"})
features = {
"a": policy_feature_factory(FeatureType.STATE, (1,)),
"b": policy_feature_factory(FeatureType.STATE, (2,)),
}
out = processor.feature_contract(features)
assert set(out) == {"b", "c"}
assert out["b"] == features["a"] # 'a' renamed to'b'
assert out["c"] == features["b"] # 'b' renamed to 'c'
assert_contract_is_typed(out)
def test_feature_contract_chained_processors(policy_feature_factory):
# Chain two rename processors at the contract level
processor1 = RenameProcessor(rename_map={"pos": "agent_position", "img": "camera_image"})
processor2 = RenameProcessor(
rename_map={"agent_position": "observation.state", "camera_image": "observation.image"}
)
pipeline = RobotProcessor([processor1, processor2])
spec = {
"pos": policy_feature_factory(FeatureType.STATE, (7,)),
"img": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
"extra": policy_feature_factory(FeatureType.ENV, (1,)),
}
out = pipeline.feature_contract(initial_features=spec)
assert set(out) == {"observation.state", "observation.image", "extra"}
assert out["observation.state"] == spec["pos"]
assert out["observation.image"] == spec["img"]
assert out["extra"] == spec["extra"]
assert_contract_is_typed(out)