mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 08:39:49 +00:00
Compare commits
56 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b2d7eecdb4 | |||
| 0710f3a0f1 | |||
| 9effc5214f | |||
| b292dbbc55 | |||
| f49280e89b | |||
| ff38a51df9 | |||
| cfa672129e | |||
| e6e1edfd74 | |||
| 384101731e | |||
| 1fdbecad3c | |||
| 2c4e888c7f | |||
| 5ced72e6b8 | |||
| 907023f9f7 | |||
| 4ba23ea029 | |||
| 409ac0baca | |||
| 699363f9fc | |||
| ae7a54de57 | |||
| fb9139b882 | |||
| 9fe3a3fb17 | |||
| 26cb9a24c3 | |||
| 77106697c3 | |||
| 75bc44c166 | |||
| f2b79656eb | |||
| 14c2ece004 | |||
| 35612c61e1 | |||
| f7bb3e2d90 | |||
| 1e0d667a22 | |||
| 33969a0337 | |||
| fa26290e8c | |||
| e9f7f5127b | |||
| 097842c70f | |||
| 3b8a3a32a0 | |||
| 1c56779dd9 | |||
| 83a4338f8b | |||
| 730c7b2f35 | |||
| 116059a43e | |||
| b08149a113 | |||
| c227107f60 | |||
| 01dc289f3d | |||
| 6830ca7645 | |||
| ed42c71fc3 | |||
| e0139065bd | |||
| e509f255af | |||
| e2fcd140b0 | |||
| 2a7a0e6129 | |||
| 9f33791b19 | |||
| 453e0a995f | |||
| 8ebf79c494 | |||
| 8774aec304 | |||
| ac742c9f0d | |||
| cd13f1ecfd | |||
| 9aa632968f | |||
| 62caaf07b0 | |||
| 3355f04ca6 | |||
| 769f531603 | |||
| f6c7287ae7 |
@@ -13,6 +13,8 @@
|
||||
title: Cameras
|
||||
- local: integrate_hardware
|
||||
title: Bring Your Own Hardware
|
||||
- local: processor_tutorial
|
||||
title: RobotProcessor Pipeline
|
||||
- local: hilserl
|
||||
title: Train a Robot with RL
|
||||
- local: hilserl_sim
|
||||
|
||||
+125
-55
@@ -56,27 +56,41 @@ pip install -e ".[hilserl]"
|
||||
|
||||
### Understanding Configuration
|
||||
|
||||
The training process begins with proper configuration for the HILSerl environment. The configuration class of interest is `HILSerlRobotEnvConfig` in `lerobot/envs/configs.py`. Which is defined as:
|
||||
The training process begins with proper configuration for the HILSerl environment. The main configuration class is `GymManipulatorConfig` in `lerobot/scripts/rl/gym_manipulator.py`, which contains nested `HILSerlRobotEnvConfig` and `DatasetConfig`. The configuration is organized into focused, nested sub-configs:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
class GymManipulatorConfig:
|
||||
env: HILSerlRobotEnvConfig # Environment configuration (nested)
|
||||
dataset: DatasetConfig # Dataset recording/replay configuration (nested)
|
||||
mode: str | None = None # "record", "replay", or None (for training)
|
||||
|
||||
class HILSerlRobotEnvConfig(EnvConfig):
|
||||
robot: RobotConfig | None = None # Main robot agent (defined in `lerobot/robots`)
|
||||
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm, (defined in `lerobot/teleoperators`)
|
||||
wrapper: EnvTransformConfig | None = None # Environment wrapper settings; check `lerobot/scripts/server/gym_manipulator.py`
|
||||
fps: int = 10 # Control frequency
|
||||
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm
|
||||
processor: HILSerlProcessorConfig # Processing pipeline configuration (nested)
|
||||
name: str = "real_robot" # Environment name
|
||||
mode: str = None # "record", "replay", or None (for training)
|
||||
repo_id: str | None = None # LeRobot dataset repository ID
|
||||
dataset_root: str | None = None # Local dataset root (optional)
|
||||
task: str = "" # Task identifier
|
||||
num_episodes: int = 10 # Number of episodes for recording
|
||||
episode: int = 0 # episode index for replay
|
||||
device: str = "cuda" # Compute device
|
||||
push_to_hub: bool = True # Whether to push the recorded datasets to Hub
|
||||
pretrained_policy_name_or_path: str | None = None # For policy loading
|
||||
reward_classifier_pretrained_path: str | None = None # For reward model
|
||||
number_of_steps_after_success: int = 0 # For reward classifier, collect more positive examples after a success to train a classifier
|
||||
fps: int = 30 # Control frequency
|
||||
|
||||
# Nested processor configuration
|
||||
class HILSerlProcessorConfig:
|
||||
control_mode: str = "gamepad" # Control mode
|
||||
observation: ObservationConfig # Observation processing settings
|
||||
image_preprocessing: ImagePreprocessingConfig # Image crop/resize settings
|
||||
gripper: GripperConfig # Gripper control and penalty settings
|
||||
reset: ResetConfig # Environment reset and timing settings
|
||||
inverse_kinematics: InverseKinematicsConfig # IK processing settings
|
||||
reward_classifier: RewardClassifierConfig # Reward classifier settings
|
||||
|
||||
# Dataset configuration
|
||||
class DatasetConfig:
|
||||
repo_id: str # LeRobot dataset repository ID
|
||||
dataset_root: str | None = None # Local dataset root (optional)
|
||||
task: str # Task identifier
|
||||
num_episodes: int # Number of episodes for recording
|
||||
episode: int # Episode index for replay
|
||||
push_to_hub: bool # Whether to push datasets to Hub
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
@@ -130,22 +144,31 @@ With the bounds defined, you can safely collect demonstrations for training. Tra
|
||||
|
||||
Create a configuration file for recording demonstrations (or edit an existing one like [env_config_so100.json](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_so100.json)):
|
||||
|
||||
1. Set `mode` to `"record"`
|
||||
2. Specify a unique `repo_id` for your dataset (e.g., "username/task_name")
|
||||
3. Set `num_episodes` to the number of demonstrations you want to collect
|
||||
4. Set `crop_params_dict` to `null` initially (we'll determine crops later)
|
||||
5. Configure `robot`, `cameras`, and other hardware settings
|
||||
1. Set `mode` to `"record"` at the root level
|
||||
2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name")
|
||||
3. Set `num_episodes` in the `dataset` section to the number of demonstrations you want to collect
|
||||
4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later)
|
||||
5. Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section
|
||||
|
||||
Example configuration section:
|
||||
|
||||
```json
|
||||
"mode": "record",
|
||||
"repo_id": "username/pick_lift_cube",
|
||||
"dataset_root": null,
|
||||
"task": "pick_and_lift",
|
||||
"num_episodes": 15,
|
||||
"episode": 0,
|
||||
"push_to_hub": true
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"fps": 10
|
||||
// ... robot, teleop, processor configs ...
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "username/pick_lift_cube",
|
||||
"dataset_root": null,
|
||||
"task": "pick_and_lift",
|
||||
"num_episodes": 15,
|
||||
"episode": 0,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record"
|
||||
}
|
||||
```
|
||||
|
||||
### Using a Teleoperation Device
|
||||
@@ -191,10 +214,17 @@ The gamepad provides a very convenient way to control the robot and the episode
|
||||
To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and define the `teleop` section in the configuration file.
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"teleop": {
|
||||
"type": "gamepad",
|
||||
"use_gripper": true
|
||||
"type": "gamepad",
|
||||
"use_gripper": true
|
||||
},
|
||||
"processor": {
|
||||
"control_mode": "gamepad"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<p align="center">
|
||||
@@ -216,11 +246,18 @@ The SO101 leader arm has reduced gears that allows it to move and track the foll
|
||||
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file.
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"teleop": {
|
||||
"type": "so101_leader",
|
||||
"port": "/dev/tty.usbmodem585A0077921", # check your port number
|
||||
"use_degrees": true
|
||||
"type": "so101_leader",
|
||||
"port": "/dev/tty.usbmodem585A0077921",
|
||||
"use_degrees": true
|
||||
},
|
||||
"processor": {
|
||||
"control_mode": "leader"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure.
|
||||
@@ -251,7 +288,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/e
|
||||
|
||||
During recording:
|
||||
|
||||
1. The robot will reset to the initial position defined in the configuration file `fixed_reset_joint_positions`
|
||||
1. The robot will reset to the initial position defined in the configuration file `env.processor.reset.fixed_reset_joint_positions`
|
||||
2. Complete the task successfully
|
||||
3. The episode ends with a reward of 1 when you press the "success" button
|
||||
4. If the time limit is reached, or the fail button is pressed, the episode ends with a reward of 0
|
||||
@@ -310,11 +347,19 @@ observation.images.front: [180, 250, 120, 150]
|
||||
Add these crop parameters to your training configuration:
|
||||
|
||||
```json
|
||||
"crop_params_dict": {
|
||||
"observation.images.side": [180, 207, 180, 200],
|
||||
"observation.images.front": [180, 250, 120, 150]
|
||||
},
|
||||
"resize_size": [128, 128]
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"image_preprocessing": {
|
||||
"crop_params_dict": {
|
||||
"observation.images.side": [180, 207, 180, 200],
|
||||
"observation.images.front": [180, 250, 120, 150]
|
||||
},
|
||||
"resize_size": [128, 128]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Recommended image resolution**
|
||||
@@ -343,26 +388,35 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r
|
||||
|
||||
**Key Parameters for Data Collection**
|
||||
|
||||
- **mode**: set it to `"record"` to collect a dataset
|
||||
- **repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
|
||||
- **num_episodes**: Number of episodes to record
|
||||
- **number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
|
||||
- **fps**: Number of frames per second to record
|
||||
- **push_to_hub**: Whether to push the dataset to the hub
|
||||
- **mode**: set it to `"record"` to collect a dataset (at root level)
|
||||
- **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
|
||||
- **dataset.num_episodes**: Number of episodes to record
|
||||
- **env.processor.reset.number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
|
||||
- **env.fps**: Number of frames per second to record
|
||||
- **dataset.push_to_hub**: Whether to push the dataset to the hub
|
||||
|
||||
The `number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
|
||||
The `env.processor.reset.number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
|
||||
|
||||
Example configuration section for data collection:
|
||||
|
||||
```json
|
||||
{
|
||||
"mode": "record",
|
||||
"repo_id": "hf_username/dataset_name",
|
||||
"dataset_root": "data/your_dataset",
|
||||
"num_episodes": 20,
|
||||
"push_to_hub": true,
|
||||
"fps": 10,
|
||||
"number_of_steps_after_success": 15
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"fps": 10,
|
||||
"processor": {
|
||||
"reset": {
|
||||
"number_of_steps_after_success": 15
|
||||
}
|
||||
}
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "hf_username/dataset_name",
|
||||
"dataset_root": "data/your_dataset",
|
||||
"num_episodes": 20,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record"
|
||||
}
|
||||
```
|
||||
|
||||
@@ -421,9 +475,17 @@ To use your trained reward classifier, configure the `HILSerlRobotEnvConfig` to
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
env_config = HILSerlRobotEnvConfig(
|
||||
reward_classifier_pretrained_path="path_to_your_pretrained_trained_model",
|
||||
# Other environment parameters
|
||||
config = GymManipulatorConfig(
|
||||
env=HILSerlRobotEnvConfig(
|
||||
processor=HILSerlProcessorConfig(
|
||||
reward_classifier=RewardClassifierConfig(
|
||||
pretrained_path="path_to_your_pretrained_trained_model"
|
||||
)
|
||||
),
|
||||
# Other environment parameters
|
||||
),
|
||||
dataset=DatasetConfig(...),
|
||||
mode=None # For training
|
||||
)
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
@@ -432,7 +494,15 @@ or set the argument in the json config file.
|
||||
|
||||
```json
|
||||
{
|
||||
"reward_classifier_pretrained_path": "path_to_your_pretrained_model"
|
||||
"env": {
|
||||
"processor": {
|
||||
"reward_classifier": {
|
||||
"pretrained_path": "path_to_your_pretrained_model",
|
||||
"success_threshold": 0.7,
|
||||
"success_reward": 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
+58
-85
@@ -161,35 +161,74 @@ class XarmEnv(EnvConfig):
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoRecordConfig:
|
||||
"""Configuration for video recording in ManiSkill environments."""
|
||||
|
||||
enabled: bool = False
|
||||
record_dir: str = "videos"
|
||||
trajectory_name: str = "trajectory"
|
||||
class ImagePreprocessingConfig:
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
|
||||
resize_size: tuple[int, int] | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnvTransformConfig:
|
||||
"""Configuration for environment wrappers."""
|
||||
class RewardClassifierConfig:
|
||||
"""Configuration for reward classification."""
|
||||
|
||||
pretrained_path: str | None = None
|
||||
success_threshold: float = 0.5
|
||||
success_reward: float = 1.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class InverseKinematicsConfig:
|
||||
"""Configuration for inverse kinematics processing."""
|
||||
|
||||
urdf_path: str | None = None
|
||||
target_frame_name: str | None = None
|
||||
end_effector_bounds: dict[str, list[float]] | None = None
|
||||
end_effector_step_sizes: dict[str, float] | None = None
|
||||
max_gripper_pos: float | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObservationConfig:
|
||||
"""Configuration for observation processing."""
|
||||
|
||||
# ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
|
||||
control_mode: str = "gamepad"
|
||||
display_cameras: bool = False
|
||||
add_joint_velocity_to_observation: bool = False
|
||||
add_current_to_observation: bool = False
|
||||
add_ee_pose_to_observation: bool = False
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
|
||||
resize_size: tuple[int, int] | None = None
|
||||
control_time_s: float = 20.0
|
||||
fixed_reset_joint_positions: Any | None = None
|
||||
reset_time_s: float = 5.0
|
||||
display_cameras: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class GripperConfig:
|
||||
"""Configuration for gripper control and penalties."""
|
||||
|
||||
use_gripper: bool = True
|
||||
gripper_quantization_threshold: float | None = 0.8
|
||||
gripper_penalty: float = 0.0
|
||||
gripper_penalty_in_reward: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResetConfig:
|
||||
"""Configuration for environment reset behavior."""
|
||||
|
||||
fixed_reset_joint_positions: Any | None = None
|
||||
reset_time_s: float = 5.0
|
||||
control_time_s: float = 20.0
|
||||
terminate_on_success: bool = True
|
||||
number_of_steps_after_success: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class HILSerlProcessorConfig:
|
||||
"""Configuration for environment processing pipeline."""
|
||||
|
||||
control_mode: str = "gamepad"
|
||||
observation: ObservationConfig = field(default_factory=ObservationConfig)
|
||||
image_preprocessing: ImagePreprocessingConfig = field(default_factory=ImagePreprocessingConfig)
|
||||
gripper: GripperConfig = field(default_factory=GripperConfig)
|
||||
reset: ResetConfig = field(default_factory=ResetConfig)
|
||||
inverse_kinematics: InverseKinematicsConfig = field(default_factory=InverseKinematicsConfig)
|
||||
reward_classifier: RewardClassifierConfig = field(default_factory=RewardClassifierConfig)
|
||||
|
||||
|
||||
@EnvConfig.register_subclass(name="gym_manipulator")
|
||||
@dataclass
|
||||
class HILSerlRobotEnvConfig(EnvConfig):
|
||||
@@ -197,77 +236,11 @@ class HILSerlRobotEnvConfig(EnvConfig):
|
||||
|
||||
robot: RobotConfig | None = None
|
||||
teleop: TeleoperatorConfig | None = None
|
||||
wrapper: EnvTransformConfig | None = None
|
||||
fps: int = 10
|
||||
processor: HILSerlProcessorConfig = field(default_factory=HILSerlProcessorConfig)
|
||||
|
||||
name: str = "real_robot"
|
||||
mode: str | None = None # Either "record", "replay", None
|
||||
repo_id: str | None = None
|
||||
dataset_root: str | None = None
|
||||
task: str | None = ""
|
||||
num_episodes: int = 10 # only for record mode
|
||||
episode: int = 0
|
||||
device: str = "cuda"
|
||||
push_to_hub: bool = True
|
||||
pretrained_policy_name_or_path: str | None = None
|
||||
reward_classifier_pretrained_path: str | None = None
|
||||
# For the reward classifier, to record more positive examples after a success
|
||||
number_of_steps_after_success: int = 0
|
||||
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {}
|
||||
|
||||
|
||||
@EnvConfig.register_subclass("hil")
|
||||
@dataclass
|
||||
class HILEnvConfig(EnvConfig):
|
||||
"""Configuration for the HIL environment."""
|
||||
|
||||
name: str = "PandaPickCube"
|
||||
task: str | None = "PandaPickCubeKeyboard-v0"
|
||||
use_viewer: bool = True
|
||||
gripper_penalty: float = 0.0
|
||||
use_gamepad: bool = True
|
||||
state_dim: int = 18
|
||||
action_dim: int = 4
|
||||
fps: int = 100
|
||||
episode_length: int = 100
|
||||
video_record: VideoRecordConfig = field(default_factory=VideoRecordConfig)
|
||||
features: dict[str, PolicyFeature] = field(
|
||||
default_factory=lambda: {
|
||||
"action": PolicyFeature(type=FeatureType.ACTION, shape=(4,)),
|
||||
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128)),
|
||||
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(18,)),
|
||||
}
|
||||
)
|
||||
features_map: dict[str, str] = field(
|
||||
default_factory=lambda: {
|
||||
"action": ACTION,
|
||||
"observation.image": OBS_IMAGE,
|
||||
"observation.state": OBS_STATE,
|
||||
}
|
||||
)
|
||||
################# args from hilserlrobotenv
|
||||
reward_classifier_pretrained_path: str | None = None
|
||||
robot_config: RobotConfig | None = None
|
||||
teleop_config: TeleoperatorConfig | None = None
|
||||
wrapper: EnvTransformConfig | None = None
|
||||
mode: str | None = None # Either "record", "replay", None
|
||||
repo_id: str | None = None
|
||||
dataset_root: str | None = None
|
||||
num_episodes: int = 10 # only for record mode
|
||||
episode: int = 0
|
||||
device: str = "cuda"
|
||||
push_to_hub: bool = True
|
||||
pretrained_policy_name_or_path: str | None = None
|
||||
# For the reward classifier, to record more positive examples after a success
|
||||
number_of_steps_after_success: int = 0
|
||||
############################
|
||||
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {
|
||||
"use_viewer": self.use_viewer,
|
||||
"use_gamepad": self.use_gamepad,
|
||||
"gripper_penalty": self.gripper_penalty,
|
||||
}
|
||||
|
||||
+21
-45
@@ -16,10 +16,8 @@
|
||||
import warnings
|
||||
from typing import Any
|
||||
|
||||
import einops
|
||||
import gymnasium as gym
|
||||
import numpy as np
|
||||
import torch
|
||||
from torch import Tensor
|
||||
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
@@ -28,62 +26,40 @@ from lerobot.utils.utils import get_channel_first_image_shape
|
||||
|
||||
|
||||
def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Tensor]:
|
||||
# TODO(aliberts, rcadene): refactor this to use features from the environment (no hardcoding)
|
||||
"""Convert environment observation to LeRobot format observation.
|
||||
|
||||
This function uses the new pipeline system internally but maintains
|
||||
backward compatibility with the original interface.
|
||||
|
||||
Args:
|
||||
observation: Dictionary of observation batches from a Gym vector environment.
|
||||
Returns:
|
||||
Dictionary of observation batches with keys renamed to LeRobot format and values as tensors.
|
||||
"""
|
||||
# map to expected inputs for the policy
|
||||
return_observations = {}
|
||||
if "pixels" in observations:
|
||||
if isinstance(observations["pixels"], dict):
|
||||
imgs = {f"observation.images.{key}": img for key, img in observations["pixels"].items()}
|
||||
else:
|
||||
imgs = {"observation.image": observations["pixels"]}
|
||||
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
|
||||
|
||||
for imgkey, img in imgs.items():
|
||||
# TODO(aliberts, rcadene): use transforms.ToTensor()?
|
||||
img = torch.from_numpy(img)
|
||||
# Create processor with observation processor
|
||||
processor = RobotProcessor([VanillaObservationProcessor()])
|
||||
|
||||
# When preprocessing observations in a non-vectorized environment, we need to add a batch dimension.
|
||||
# This is the case for human-in-the-loop RL where there is only one environment.
|
||||
if img.ndim == 3:
|
||||
img = img.unsqueeze(0)
|
||||
# sanity check that images are channel last
|
||||
_, h, w, c = img.shape
|
||||
assert c < h and c < w, f"expect channel last images, but instead got {img.shape=}"
|
||||
# Create transition dictionary and process
|
||||
transition = {
|
||||
TransitionKey.OBSERVATION: observations,
|
||||
TransitionKey.ACTION: None,
|
||||
TransitionKey.REWARD: None,
|
||||
TransitionKey.DONE: None,
|
||||
TransitionKey.TRUNCATED: None,
|
||||
TransitionKey.INFO: None,
|
||||
TransitionKey.COMPLEMENTARY_DATA: None,
|
||||
}
|
||||
result = processor(transition)
|
||||
|
||||
# sanity check that images are uint8
|
||||
assert img.dtype == torch.uint8, f"expect torch.uint8, but instead {img.dtype=}"
|
||||
|
||||
# convert to channel first of type float32 in range [0,1]
|
||||
img = einops.rearrange(img, "b h w c -> b c h w").contiguous()
|
||||
img = img.type(torch.float32)
|
||||
img /= 255
|
||||
|
||||
return_observations[imgkey] = img
|
||||
|
||||
if "environment_state" in observations:
|
||||
env_state = torch.from_numpy(observations["environment_state"]).float()
|
||||
if env_state.dim() == 1:
|
||||
env_state = env_state.unsqueeze(0)
|
||||
|
||||
return_observations["observation.environment_state"] = env_state
|
||||
|
||||
# TODO(rcadene): enable pixels only baseline with `obs_type="pixels"` in environment by removing
|
||||
agent_pos = torch.from_numpy(observations["agent_pos"]).float()
|
||||
if agent_pos.dim() == 1:
|
||||
agent_pos = agent_pos.unsqueeze(0)
|
||||
return_observations["observation.state"] = agent_pos
|
||||
|
||||
return return_observations
|
||||
# Extract and return the processed observation
|
||||
return result[TransitionKey.OBSERVATION]
|
||||
|
||||
|
||||
def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
|
||||
# TODO(aliberts, rcadene): remove this hardcoding of keys and just use the nested keys as is
|
||||
# (need to also refactor preprocess_observation and externalize normalization from policies)
|
||||
# (need to externalize normalization from policies)
|
||||
policy_features = {}
|
||||
for key, ft in env_cfg.features.items():
|
||||
if ft.type is FeatureType.VISUAL:
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from .device_processor import DeviceProcessor
|
||||
from .hil_processor import (
|
||||
GripperPenaltyProcessor,
|
||||
ImageCropResizeProcessor,
|
||||
InterventionActionProcessor,
|
||||
TimeLimitProcessor,
|
||||
)
|
||||
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor
|
||||
from .observation_processor import (
|
||||
ImageProcessor,
|
||||
StateProcessor,
|
||||
VanillaObservationProcessor,
|
||||
)
|
||||
from .pipeline import (
|
||||
ActionProcessor,
|
||||
DoneProcessor,
|
||||
EnvTransition,
|
||||
IdentityProcessor,
|
||||
InfoProcessor,
|
||||
ObservationProcessor,
|
||||
ProcessorStep,
|
||||
ProcessorStepRegistry,
|
||||
RewardProcessor,
|
||||
RobotProcessor,
|
||||
TransitionKey,
|
||||
TruncatedProcessor,
|
||||
)
|
||||
from .rename_processor import RenameProcessor
|
||||
from .robot_processor import (
|
||||
InverseKinematicsProcessor,
|
||||
JointVelocityProcessor,
|
||||
MotorCurrentProcessor,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"ActionProcessor",
|
||||
"DeviceProcessor",
|
||||
"DoneProcessor",
|
||||
"EnvTransition",
|
||||
"GripperPenaltyProcessor",
|
||||
"IdentityProcessor",
|
||||
"ImageCropResizeProcessor",
|
||||
"ImageProcessor",
|
||||
"InfoProcessor",
|
||||
"InterventionActionProcessor",
|
||||
"InverseKinematicsProcessor",
|
||||
"JointVelocityProcessor",
|
||||
"MotorCurrentProcessor",
|
||||
"NormalizerProcessor",
|
||||
"UnnormalizerProcessor",
|
||||
"ObservationProcessor",
|
||||
"ProcessorStep",
|
||||
"ProcessorStepRegistry",
|
||||
"RenameProcessor",
|
||||
"RewardProcessor",
|
||||
"RobotProcessor",
|
||||
"StateProcessor",
|
||||
"TimeLimitProcessor",
|
||||
"TransitionKey",
|
||||
"TruncatedProcessor",
|
||||
"VanillaObservationProcessor",
|
||||
]
|
||||
@@ -0,0 +1,80 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.processor.pipeline import EnvTransition, TransitionKey
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeviceProcessor:
|
||||
"""Processes transitions by moving tensors to the specified device.
|
||||
|
||||
This processor ensures that all tensors in the transition are moved to the
|
||||
specified device (CPU or GPU) before they are returned.
|
||||
"""
|
||||
|
||||
device: str = "cpu"
|
||||
|
||||
def __post_init__(self):
|
||||
self.non_blocking = "cuda" in self.device
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
# Create a copy of the transition
|
||||
new_transition = transition.copy()
|
||||
|
||||
# Process observation tensors
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is not None:
|
||||
new_observation = {
|
||||
k: v.to(self.device, non_blocking=self.non_blocking) if isinstance(v, torch.Tensor) else v
|
||||
for k, v in observation.items()
|
||||
}
|
||||
new_transition[TransitionKey.OBSERVATION] = new_observation
|
||||
|
||||
# Process action tensor
|
||||
action = transition.get(TransitionKey.ACTION)
|
||||
if action is not None and isinstance(action, torch.Tensor):
|
||||
new_transition[TransitionKey.ACTION] = action.to(self.device, non_blocking=self.non_blocking)
|
||||
|
||||
# Process reward tensor
|
||||
reward = transition.get(TransitionKey.REWARD)
|
||||
if reward is not None and isinstance(reward, torch.Tensor):
|
||||
new_transition[TransitionKey.REWARD] = reward.to(self.device, non_blocking=self.non_blocking)
|
||||
|
||||
# Process done tensor
|
||||
done = transition.get(TransitionKey.DONE)
|
||||
if done is not None and isinstance(done, torch.Tensor):
|
||||
new_transition[TransitionKey.DONE] = done.to(self.device, non_blocking=self.non_blocking)
|
||||
|
||||
# Process truncated tensor
|
||||
truncated = transition.get(TransitionKey.TRUNCATED)
|
||||
if truncated is not None and isinstance(truncated, torch.Tensor):
|
||||
new_transition[TransitionKey.TRUNCATED] = truncated.to(
|
||||
self.device, non_blocking=self.non_blocking
|
||||
)
|
||||
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
"""Return configuration for serialization."""
|
||||
return {"device": self.device}
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
@@ -0,0 +1,331 @@
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
import torchvision.transforms.functional as F # noqa: N812
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register("image_crop_resize_processor")
class ImageCropResizeProcessor:
    """Crop and resize image observations."""

    # Per-key crop rectangles as (top, left, height, width); observation keys
    # without an entry here are only resized.
    crop_params_dict: dict[str, tuple[int, int, int, int]]
    resize_size: tuple[int, int] = (128, 128)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Return a transition whose image observations are cropped/resized and clamped to [0, 1]."""
        observation = transition.get(TransitionKey.OBSERVATION)
        if observation is None:
            return transition

        # Nothing to do when neither cropping nor resizing is configured.
        if self.resize_size is None and not self.crop_params_dict:
            return transition

        new_observation = dict(observation)

        # Only keys containing "image" are treated as image tensors.
        image_keys = [key for key in observation if "image" in key]
        for key in image_keys:
            img = observation[key]
            original_device = img.device
            # torchvision ops run on CPU for Apple MPS tensors; move back afterwards.
            if original_device.type == "mps":
                img = img.cpu()

            crop_params = self.crop_params_dict.get(key)
            if crop_params is not None:
                img = F.crop(img, *crop_params)

            # Resize is applied unconditionally, then values are clamped to [0, 1].
            img = F.resize(img, self.resize_size).clamp(0.0, 1.0)
            new_observation[key] = img.to(original_device)

        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = new_observation
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration for this step."""
        return {
            "crop_params_dict": self.crop_params_dict,
            "resize_size": self.resize_size,
        }

    def state_dict(self) -> dict[str, torch.Tensor]:
        """This step carries no tensor state."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """No tensor state to restore."""
        pass

    def reset(self) -> None:
        """Stateless: nothing to reset between episodes."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Feature keys are passed through unchanged."""
        return features
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register("time_limit_processor")
class TimeLimitProcessor:
    """Track episode time and enforce time limits."""

    # Maximum number of environment steps before the episode is truncated.
    max_episode_steps: int
    # Steps elapsed in the current episode; reset() returns it to zero.
    current_step: int = 0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Advance the step counter and force truncation once the limit is hit."""
        truncated = transition.get(TransitionKey.TRUNCATED)
        # Transitions without a truncation flag pass through untouched and do
        # not advance the counter.
        if truncated is None:
            return transition

        self.current_step += 1
        limit_reached = self.current_step >= self.max_episode_steps

        new_transition = transition.copy()
        new_transition[TransitionKey.TRUNCATED] = True if limit_reached else truncated
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration for this step."""
        return {"max_episode_steps": self.max_episode_steps}

    def state_dict(self) -> dict[str, torch.Tensor]:
        """This step carries no tensor state."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """No tensor state to restore."""
        pass

    def reset(self) -> None:
        """Restart the per-episode step counter."""
        self.current_step = 0

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Feature keys are passed through unchanged."""
        return features
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register("gripper_penalty_processor")
class GripperPenaltyProcessor:
    """Attach a small penalty when the gripper command fights its current state.

    The penalty is written to ``complementary_data["discrete_penalty"]`` for a
    downstream consumer; the transition's reward field itself is not modified.
    """

    # Penalty value (typically negative) applied on an inconsistent gripper command.
    penalty: float = -0.01
    # Joint position treated as fully open; used to normalize the raw position.
    max_gripper_pos: float = 30.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Calculate gripper penalty and add it to complementary data."""
        action = transition.get(TransitionKey.ACTION)
        complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)

        if complementary_data is None or action is None:
            return transition

        # Bug fix: guard the container before indexing. The previous code did
        # `complementary_data.get("raw_joint_positions", None)[-1]`, which
        # raises TypeError (None[-1]) when the key is absent instead of
        # passing the transition through.
        raw_joint_positions = complementary_data.get("raw_joint_positions")
        if raw_joint_positions is None:
            return transition
        # Last joint is the gripper position.
        current_gripper_pos = raw_joint_positions[-1]
        if current_gripper_pos is None:
            return transition

        gripper_action = action[-1].item()

        # Normalize state to ~[0, 1]; center the action around 0.
        # NOTE(review): `gripper_action - 1.0` suggests a discrete gripper
        # coding with 1.0 as the neutral command (the teleop default is 1.0
        # elsewhere in this file) — confirm against the action space.
        # (A dead assignment `gripper_action / self.max_gripper_pos` that was
        # immediately overwritten has been removed.)
        gripper_state_normalized = current_gripper_pos / self.max_gripper_pos
        gripper_action_normalized = gripper_action - 1.0

        # Penalize trying to open a near-closed gripper and trying to close a
        # mostly-open one.
        gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or (
            gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5
        )
        gripper_penalty = self.penalty * int(gripper_penalty_bool)

        # Attach the penalty on a copy so the caller's dict is not mutated.
        new_complementary_data = dict(complementary_data)
        new_complementary_data["discrete_penalty"] = gripper_penalty

        new_transition = transition.copy()
        new_transition[TransitionKey.COMPLEMENTARY_DATA] = new_complementary_data
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration for this step."""
        return {
            "penalty": self.penalty,
            "max_gripper_pos": self.max_gripper_pos,
        }

    def state_dict(self) -> dict[str, torch.Tensor]:
        """This step carries no tensor state."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """No tensor state to restore."""
        pass

    def reset(self) -> None:
        """Reset the processor state."""
        # NOTE(review): this attribute is never read anywhere in this class;
        # kept for backward compatibility with the original behavior.
        self.last_gripper_state = None

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Feature keys are passed through unchanged."""
        return features
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register("intervention_action_processor")
class InterventionActionProcessor:
    """Handle action intervention based on signals in the transition.

    This processor checks for intervention signals in the transition's complementary data
    and overrides agent actions when intervention is active. It also promotes the
    intervention/termination/success flags into DONE, REWARD, and INFO on every call.
    """

    # Whether teleop actions carry a 4th gripper component in addition to the xyz deltas.
    use_gripper: bool = False

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition.get(TransitionKey.ACTION)
        # Without an action there is nothing to override; pass through untouched.
        if action is None:
            return transition

        # Get intervention signals from complementary data (all optional, with defaults).
        complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
        teleop_action = complementary_data.get("teleop_action", {})
        is_intervention = complementary_data.get("is_intervention", False)
        terminate_episode = complementary_data.get("terminate_episode", False)
        success = complementary_data.get("success", False)
        rerecord_episode = complementary_data.get("rerecord_episode", False)

        new_transition = transition.copy()

        # Override action if intervention is active and a teleop action is present.
        if is_intervention and teleop_action:
            # Convert teleop_action dict to tensor format (xyz deltas, default 0.0).
            action_list = [
                teleop_action.get("delta_x", 0.0),
                teleop_action.get("delta_y", 0.0),
                teleop_action.get("delta_z", 0.0),
            ]
            if self.use_gripper:
                # Default 1.0 — presumably the neutral "stay" gripper command; verify against teleoperator.
                action_list.append(teleop_action.get("gripper", 1.0))

            # Match the agent action's dtype/device so downstream steps see a consistent tensor.
            teleop_action_tensor = torch.tensor(action_list, dtype=action.dtype, device=action.device)
            new_transition[TransitionKey.ACTION] = teleop_action_tensor

        # DONE and REWARD are overwritten unconditionally on every call: DONE
        # mirrors the terminate flag and REWARD encodes success as 0.0/1.0.
        new_transition[TransitionKey.DONE] = bool(terminate_episode)
        new_transition[TransitionKey.REWARD] = float(success)

        # Update info with intervention metadata; "next.success" only changes
        # when the episode is being terminated.
        info = new_transition.get(TransitionKey.INFO, {})
        info["is_intervention"] = is_intervention
        info["rerecord_episode"] = rerecord_episode
        info["next.success"] = success if terminate_episode else info.get("next.success", False)
        new_transition[TransitionKey.INFO] = info
        # NOTE(review): this writes into the complementary-data dict obtained
        # through a (presumably shallow) transition.copy(), so the input
        # transition's dict is likely mutated in place as well — confirm that
        # aliasing is intended.
        new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] = new_transition[
            TransitionKey.ACTION
        ]

        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration for this step."""
        return {
            "use_gripper": self.use_gripper,
        }

    def state_dict(self) -> dict[str, torch.Tensor]:
        """This step carries no tensor state."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """No tensor state to restore."""
        pass

    def reset(self) -> None:
        """Stateless: nothing to reset between episodes."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Feature keys are passed through unchanged."""
        return features
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register("reward_classifier_processor")
class RewardClassifierProcessor:
    """Apply reward classification to image observations.

    This processor runs a trained reward classifier on image observations
    to predict rewards and success states, potentially terminating episodes
    when success is achieved.
    """

    # Hub/disk path of the trained classifier; when None the step is a no-op.
    pretrained_path: str | None = None
    # Device the classifier is moved to after loading.
    device: str = "cpu"
    # Probability threshold passed to predict_reward.
    success_threshold: float = 0.5
    # Reward assigned when the classifier predicts success.
    success_reward: float = 1.0
    # Whether predicted success also sets DONE.
    terminate_on_success: bool = True

    # Loaded classifier instance (populated in __post_init__; not serialized).
    reward_classifier: Any = None

    def __post_init__(self):
        """Initialize the reward classifier after dataclass initialization."""
        if self.pretrained_path is not None:
            # Imported lazily so the module stays importable without the policy package.
            from lerobot.policies.sac.reward_model.modeling_classifier import Classifier

            self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
            self.reward_classifier.to(self.device)
            self.reward_classifier.eval()

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = transition.get(TransitionKey.OBSERVATION)
        # Without an observation or a loaded classifier, pass through untouched.
        if observation is None or self.reward_classifier is None:
            return transition

        # Extract images from observation (any key containing "image").
        images = {key: value for key, value in observation.items() if "image" in key}

        if not images:
            return transition

        # Run reward classifier, timing the call to report its effective frequency.
        start_time = time.perf_counter()
        with torch.inference_mode():
            success = self.reward_classifier.predict_reward(images, threshold=self.success_threshold)

        classifier_frequency = 1 / (time.perf_counter() - start_time)

        # Calculate reward and termination, preserving prior values on non-success.
        reward = transition.get(TransitionKey.REWARD, 0.0)
        terminated = transition.get(TransitionKey.DONE, False)

        if success == 1.0:
            reward = self.success_reward
            if self.terminate_on_success:
                terminated = True

        # Update transition
        new_transition = transition.copy()
        new_transition[TransitionKey.REWARD] = reward
        new_transition[TransitionKey.DONE] = terminated

        # Update info with classifier frequency
        info = new_transition.get(TransitionKey.INFO, {})
        info["reward_classifier_frequency"] = classifier_frequency
        new_transition[TransitionKey.INFO] = info

        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration for this step.

        NOTE(review): pretrained_path is deliberately(?) not serialized here,
        so a processor rebuilt from this config has no classifier — confirm
        this is the intended round-trip behavior.
        """
        return {
            "device": self.device,
            "success_threshold": self.success_threshold,
            "success_reward": self.success_reward,
            "terminate_on_success": self.terminate_on_success,
        }

    def state_dict(self) -> dict[str, torch.Tensor]:
        """This step carries no tensor state (classifier weights live in the model)."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """No tensor state to restore."""
        pass

    def reset(self) -> None:
        """Stateless between episodes: nothing to reset."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Feature keys are passed through unchanged."""
        return features
|
||||
@@ -0,0 +1,335 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from torch import Tensor
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
|
||||
|
||||
def _convert_stats_to_tensors(stats: dict[str, dict[str, Any]]) -> dict[str, dict[str, Tensor]]:
|
||||
"""Convert numpy arrays and other types to torch tensors."""
|
||||
tensor_stats: dict[str, dict[str, Tensor]] = {}
|
||||
for key, sub in stats.items():
|
||||
tensor_stats[key] = {}
|
||||
for stat_name, value in sub.items():
|
||||
if isinstance(value, np.ndarray):
|
||||
tensor_val = torch.from_numpy(value.astype(np.float32))
|
||||
elif isinstance(value, torch.Tensor):
|
||||
tensor_val = value.to(dtype=torch.float32)
|
||||
elif isinstance(value, (int, float, list, tuple)):
|
||||
tensor_val = torch.tensor(value, dtype=torch.float32)
|
||||
else:
|
||||
raise TypeError(f"Unsupported type for stats['{key}']['{stat_name}']: {type(value)}")
|
||||
tensor_stats[key][stat_name] = tensor_val
|
||||
return tensor_stats
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register(name="normalizer_processor")
class NormalizerProcessor:
    """Normalizes observations and actions in a single processor step.

    This processor handles normalization of both observation and action tensors
    using either mean/std normalization or min/max scaling to a [-1, 1] range.

    For each tensor key in the stats dictionary, the processor will:
    - Use mean/std normalization if those statistics are provided: (x - mean) / std
    - Use min/max scaling if those statistics are provided: 2 * (x - min) / (max - min) - 1

    The processor can be configured to normalize only specific keys by setting
    the normalize_keys parameter.

    NOTE(review): ``norm_map`` is stored and serialized but never consulted in
    the normalization math below — the mode per key is decided solely by which
    statistics ("mean"/"std" vs "min"/"max") are present. Confirm whether the
    map is meant to gate the mode as in normalize.py.
    """

    # Features and normalisation map are mandatory to match the design of normalize.py
    features: dict[str, PolicyFeature]
    norm_map: dict[FeatureType, NormalizationMode]

    # Pre-computed statistics coming from dataset.meta.stats for instance.
    stats: dict[str, dict[str, Any]] | None = None

    # Explicit subset of keys to normalise. If ``None`` every key (except
    # "action") found in ``stats`` will be normalised. Using a ``set`` makes
    # membership checks O(1).
    normalize_keys: set[str] | None = None

    # Added to denominators to avoid division by zero.
    eps: float = 1e-8

    # Tensor-converted copy of ``stats``; rebuilt in __post_init__ and via load_state_dict.
    _tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def from_lerobot_dataset(
        cls,
        dataset: LeRobotDataset,
        features: dict[str, PolicyFeature],
        norm_map: dict[FeatureType, NormalizationMode],
        *,
        normalize_keys: set[str] | None = None,
        eps: float = 1e-8,
    ) -> NormalizerProcessor:
        """Factory helper that pulls statistics from a :class:`LeRobotDataset`.

        The features and norm_map parameters are mandatory to match the design
        pattern used in normalize.py.
        """

        return cls(
            features=features,
            norm_map=norm_map,
            stats=dataset.meta.stats,
            normalize_keys=normalize_keys,
            eps=eps,
        )

    def __post_init__(self):
        # Handle deserialization from JSON config: plain dicts/strings coming
        # from get_config() are rebuilt into PolicyFeature objects and enums.
        if self.features and isinstance(list(self.features.values())[0], dict):
            # Features came from JSON - need to reconstruct PolicyFeature objects
            reconstructed_features = {}
            for key, ft_dict in self.features.items():
                reconstructed_features[key] = PolicyFeature(
                    type=FeatureType(ft_dict["type"]), shape=tuple(ft_dict["shape"])
                )
            self.features = reconstructed_features

        if self.norm_map and isinstance(list(self.norm_map.keys())[0], str):
            # norm_map came from JSON - need to reconstruct enum keys and values
            reconstructed_norm_map = {}
            for ft_type_str, norm_mode_str in self.norm_map.items():
                reconstructed_norm_map[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
            self.norm_map = reconstructed_norm_map

        # Convert statistics once so we avoid repeated numpy→Tensor conversions
        # during runtime.
        self.stats = self.stats or {}
        self._tensor_stats = _convert_stats_to_tensors(self.stats)

        # Ensure *normalize_keys* is a set for fast look-ups and compare by
        # value later when returning the configuration.
        if self.normalize_keys is not None and not isinstance(self.normalize_keys, set):
            self.normalize_keys = set(self.normalize_keys)

    def _normalize_obs(self, observation):
        """Return a copy of *observation* with the selected keys normalized.

        Keys without stats, and keys whose stats contain neither
        ("mean","std") nor ("min","max"), are silently left untouched —
        unlike _normalize_action, which raises in that case.
        """
        if observation is None:
            return None

        # Decide which keys should be normalised for this call.
        if self.normalize_keys is not None:
            keys_to_norm = self.normalize_keys
        else:
            # Use feature map to skip action keys.
            keys_to_norm = {k for k, ft in self.features.items() if ft.type is not FeatureType.ACTION}

        processed = dict(observation)
        for key in keys_to_norm:
            if key not in processed or key not in self._tensor_stats:
                continue

            orig_val = processed[key]
            # Coerce to a float32 tensor (handles numpy arrays and Python sequences).
            tensor = (
                orig_val.to(dtype=torch.float32)
                if isinstance(orig_val, torch.Tensor)
                else torch.as_tensor(orig_val, dtype=torch.float32)
            )
            # Move the cached stats to the value's device for the arithmetic.
            stats = {k: v.to(tensor.device) for k, v in self._tensor_stats[key].items()}

            if "mean" in stats and "std" in stats:
                mean, std = stats["mean"], stats["std"]
                processed[key] = (tensor - mean) / (std + self.eps)
            elif "min" in stats and "max" in stats:
                min_val, max_val = stats["min"], stats["max"]
                processed[key] = 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
        return processed

    def _normalize_action(self, action):
        """Normalize the action tensor using the "action" entry of the stats.

        Raises ValueError when "action" stats exist but contain neither a
        mean/std pair nor a min/max pair.
        """
        if action is None or "action" not in self._tensor_stats:
            return action

        tensor = (
            action.to(dtype=torch.float32)
            if isinstance(action, torch.Tensor)
            else torch.as_tensor(action, dtype=torch.float32)
        )
        stats = {k: v.to(tensor.device) for k, v in self._tensor_stats["action"].items()}
        if "mean" in stats and "std" in stats:
            mean, std = stats["mean"], stats["std"]
            return (tensor - mean) / (std + self.eps)
        if "min" in stats and "max" in stats:
            min_val, max_val = stats["min"], stats["max"]
            return 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
        raise ValueError("Action stats must contain either ('mean','std') or ('min','max')")

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = self._normalize_obs(transition.get(TransitionKey.OBSERVATION))
        action = self._normalize_action(transition.get(TransitionKey.ACTION))

        # Create a new transition with normalized values
        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = observation
        new_transition[TransitionKey.ACTION] = action
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration.

        NOTE(review): ``stats`` is not included here; the statistics travel
        through state_dict()/load_state_dict() instead — confirm that every
        save path persists the state dict as well.
        """
        config = {
            "eps": self.eps,
            "features": {
                key: {"type": ft.type.value, "shape": ft.shape} for key, ft in self.features.items()
            },
            "norm_map": {ft_type.value: norm_mode.value for ft_type, norm_mode in self.norm_map.items()},
        }
        if self.normalize_keys is not None:
            # Serialise as a list for YAML / JSON friendliness
            config["normalize_keys"] = sorted(self.normalize_keys)
        return config

    def state_dict(self) -> dict[str, Tensor]:
        """Flatten the nested tensor stats into '<key>.<stat_name>' entries."""
        flat = {}
        for key, sub in self._tensor_stats.items():
            for stat_name, tensor in sub.items():
                flat[f"{key}.{stat_name}"] = tensor
        return flat

    def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
        """Rebuild the nested tensor stats from the flat '<key>.<stat_name>' form."""
        self._tensor_stats.clear()
        for flat_key, tensor in state.items():
            # rsplit keeps dots inside the observation key intact.
            key, stat_name = flat_key.rsplit(".", 1)
            self._tensor_stats.setdefault(key, {})[stat_name] = tensor

    def reset(self):
        """Stateless between episodes: nothing to reset."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Normalization does not rename or reshape features."""
        return features
|
||||
|
||||
|
||||
@dataclass
@ProcessorStepRegistry.register(name="unnormalizer_processor")
class UnnormalizerProcessor:
    """Inverse normalisation for observations and actions.

    Exactly mirrors :class:`NormalizerProcessor` but applies the inverse
    transform: x * std + mean for mean/std stats, and
    (x + 1) / 2 * (max - min) + min for min/max stats.

    NOTE(review): unlike the forward processor, the inverse does not add
    ``eps`` to the scale factors, so a normalize→unnormalize round trip is
    only approximate — confirm this asymmetry is intended.
    """

    # Mandatory feature map / normalization map, mirroring NormalizerProcessor.
    features: dict[str, PolicyFeature]
    norm_map: dict[FeatureType, NormalizationMode]
    # Pre-computed statistics, e.g. dataset.meta.stats.
    stats: dict[str, dict[str, Any]] | None = None
    # Kept for config symmetry with NormalizerProcessor (unused in the math below).
    eps: float = 1e-8

    # Tensor-converted copy of ``stats``; rebuilt in __post_init__ and via load_state_dict.
    _tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def from_lerobot_dataset(
        cls,
        dataset: LeRobotDataset,
        features: dict[str, PolicyFeature],
        norm_map: dict[FeatureType, NormalizationMode],
        *,
        eps: float = 1e-8,
    ) -> UnnormalizerProcessor:
        """Factory helper that pulls statistics from a :class:`LeRobotDataset`."""
        return cls(features=features, norm_map=norm_map, stats=dataset.meta.stats, eps=eps)

    def __post_init__(self):
        # Handle deserialization from JSON config: plain dicts/strings coming
        # from get_config() are rebuilt into PolicyFeature objects and enums.
        if self.features and isinstance(list(self.features.values())[0], dict):
            # Features came from JSON - need to reconstruct PolicyFeature objects
            reconstructed_features = {}
            for key, ft_dict in self.features.items():
                reconstructed_features[key] = PolicyFeature(
                    type=FeatureType(ft_dict["type"]), shape=tuple(ft_dict["shape"])
                )
            self.features = reconstructed_features

        if self.norm_map and isinstance(list(self.norm_map.keys())[0], str):
            # norm_map came from JSON - need to reconstruct enum keys and values
            reconstructed_norm_map = {}
            for ft_type_str, norm_mode_str in self.norm_map.items():
                reconstructed_norm_map[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
            self.norm_map = reconstructed_norm_map

        # Convert statistics once up front to avoid per-call conversions.
        self.stats = self.stats or {}
        self._tensor_stats = _convert_stats_to_tensors(self.stats)

    def _unnormalize_obs(self, observation):
        """Return a copy of *observation* with non-action keys unnormalized.

        Keys without stats, or whose stats contain neither ("mean","std") nor
        ("min","max"), are silently left untouched.
        """
        if observation is None:
            return None
        # All non-action features are candidates for unnormalization.
        keys = [k for k, ft in self.features.items() if ft.type is not FeatureType.ACTION]
        processed = dict(observation)
        for key in keys:
            if key not in processed or key not in self._tensor_stats:
                continue
            orig_val = processed[key]
            # Coerce to a float32 tensor (handles numpy arrays and Python sequences).
            tensor = (
                orig_val.to(dtype=torch.float32)
                if isinstance(orig_val, torch.Tensor)
                else torch.as_tensor(orig_val, dtype=torch.float32)
            )
            # Move the cached stats to the value's device for the arithmetic.
            stats = {k: v.to(tensor.device) for k, v in self._tensor_stats[key].items()}
            if "mean" in stats and "std" in stats:
                mean, std = stats["mean"], stats["std"]
                processed[key] = tensor * std + mean
            elif "min" in stats and "max" in stats:
                min_val, max_val = stats["min"], stats["max"]
                processed[key] = (tensor + 1) / 2 * (max_val - min_val) + min_val
        return processed

    def _unnormalize_action(self, action):
        """Unnormalize the action tensor using the "action" entry of the stats.

        Raises ValueError when "action" stats exist but contain neither a
        mean/std pair nor a min/max pair.
        """
        if action is None or "action" not in self._tensor_stats:
            return action
        tensor = (
            action.to(dtype=torch.float32)
            if isinstance(action, torch.Tensor)
            else torch.as_tensor(action, dtype=torch.float32)
        )
        stats = {k: v.to(tensor.device) for k, v in self._tensor_stats["action"].items()}
        if "mean" in stats and "std" in stats:
            mean, std = stats["mean"], stats["std"]
            return tensor * std + mean
        if "min" in stats and "max" in stats:
            min_val, max_val = stats["min"], stats["max"]
            return (tensor + 1) / 2 * (max_val - min_val) + min_val
        raise ValueError("Action stats must contain either ('mean','std') or ('min','max')")

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = self._unnormalize_obs(transition.get(TransitionKey.OBSERVATION))
        action = self._unnormalize_action(transition.get(TransitionKey.ACTION))

        # Create a new transition with unnormalized values
        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = observation
        new_transition[TransitionKey.ACTION] = action
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Serializable configuration (stats travel via state_dict, not here)."""
        return {
            "eps": self.eps,
            "features": {
                key: {"type": ft.type.value, "shape": ft.shape} for key, ft in self.features.items()
            },
            "norm_map": {ft_type.value: norm_mode.value for ft_type, norm_mode in self.norm_map.items()},
        }

    def state_dict(self) -> dict[str, Tensor]:
        """Flatten the nested tensor stats into '<key>.<stat_name>' entries."""
        flat = {}
        for key, sub in self._tensor_stats.items():
            for stat_name, tensor in sub.items():
                flat[f"{key}.{stat_name}"] = tensor
        return flat

    def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
        """Rebuild the nested tensor stats from the flat '<key>.<stat_name>' form."""
        self._tensor_stats.clear()
        for flat_key, tensor in state.items():
            # rsplit keeps dots inside the observation key intact.
            key, stat_name = flat_key.rsplit(".", 1)
            self._tensor_stats.setdefault(key, {})[stat_name] = tensor

    def reset(self):
        """Stateless between episodes: nothing to reset."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Unnormalization does not rename or reshape features."""
        return features
|
||||
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import einops
|
||||
import numpy as np
|
||||
import torch
|
||||
from torch import Tensor
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
|
||||
|
||||
@dataclass
class ImageProcessor:
    """Process image observations from environment format to policy format.

    Converts images from:
    - Channel-last (H, W, C) to channel-first (C, H, W)
    - uint8 [0, 255] to float32 [0, 1]
    - Adds batch dimension if needed
    - Handles both single images and dictionaries of images
    """

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = transition.get(TransitionKey.OBSERVATION)

        if observation is None:
            return transition

        processed_obs = {}

        # Copy all observations first
        for key, value in observation.items():
            processed_obs[key] = value

        # Handle pixels key if present; other keys are passed through untouched.
        pixels = observation.get("pixels")
        if pixels is not None:
            # Remove pixels from processed_obs since we'll replace it with processed images
            processed_obs.pop("pixels", None)
            # Determine image mapping: a dict of cameras maps to OBS_IMAGES.<cam>,
            # a single array maps to OBS_IMAGE.
            if isinstance(pixels, dict):
                imgs = {f"{OBS_IMAGES}.{key}": img for key, img in pixels.items()}
            else:
                imgs = {OBS_IMAGE: pixels}

            # Process each image
            for imgkey, img in imgs.items():
                processed_img = self._process_single_image(img)
                processed_obs[imgkey] = processed_img

        # Return new transition with processed observation
        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = processed_obs
        return new_transition

    def _process_single_image(self, img: np.ndarray) -> Tensor:
        """Convert one channel-last uint8 numpy image to a batched float32 CHW tensor.

        Raises ValueError for non-channel-last shapes or non-uint8 dtypes.
        """
        # Convert to tensor
        img_tensor = torch.from_numpy(img)

        # Add batch dimension if needed
        if img_tensor.ndim == 3:
            img_tensor = img_tensor.unsqueeze(0)

        # Validate image format: heuristic check that the channel count is the
        # smallest dimension, i.e. the layout really is (B, H, W, C).
        _, h, w, c = img_tensor.shape
        if not (c < h and c < w):
            raise ValueError(f"Expected channel-last images, but got shape {img_tensor.shape}")

        if img_tensor.dtype != torch.uint8:
            raise ValueError(f"Expected torch.uint8 images, but got {img_tensor.dtype}")

        # Convert to channel-first format
        img_tensor = einops.rearrange(img_tensor, "b h w c -> b c h w").contiguous()

        # Convert to float32 and normalize to [0, 1]
        img_tensor = img_tensor.type(torch.float32) / 255.0

        return img_tensor

    def get_config(self) -> dict[str, Any]:
        """Return configuration for serialization."""
        return {}

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Return state dictionary (empty for this processor)."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """Load state dictionary (no-op for this processor)."""
        pass

    def reset(self) -> None:
        """Reset processor state (no-op for this processor)."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Transforms:
        pixels -> OBS_IMAGE,
        observation.pixels -> OBS_IMAGE,
        pixels.<cam> -> OBS_IMAGES.<cam>,
        observation.pixels.<cam> -> OBS_IMAGES.<cam>
        """
        if "pixels" in features:
            features[OBS_IMAGE] = features.pop("pixels")
        if "observation.pixels" in features:
            features[OBS_IMAGE] = features.pop("observation.pixels")

        # Iterate over a snapshot of the keys because entries are popped in the loop.
        prefixes = ("pixels.", "observation.pixels.")
        for key in list(features.keys()):
            for p in prefixes:
                if key.startswith(p):
                    suffix = key[len(p) :]
                    features[f"{OBS_IMAGES}.{suffix}"] = features.pop(key)
                    break
        return features
|
||||
|
||||
|
||||
@dataclass
class StateProcessor:
    """Process state observations from environment format to policy format.

    Handles:
    - environment_state -> observation.environment_state
    - agent_pos -> observation.state
    - Converts numpy arrays to tensors
    - Adds batch dimension if needed
    """

    # Env-native key -> canonical policy key, applied in this order.
    _KEY_PAIRS = (
        ("environment_state", OBS_ENV_STATE),
        ("agent_pos", OBS_STATE),
    )

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        observation = transition.get(TransitionKey.OBSERVATION)

        if observation is None:
            return transition

        processed_obs = dict(observation)  # Copy existing observations

        # Translate each env-native state key into its canonical name,
        # converting numpy arrays to batched float tensors along the way.
        for source_key, target_key in self._KEY_PAIRS:
            if source_key not in observation:
                continue
            state = torch.from_numpy(observation[source_key]).float()
            if state.dim() == 1:
                state = state.unsqueeze(0)
            processed_obs[target_key] = state
            # Drop the env-native key once translated.
            del processed_obs[source_key]

        # Return new transition with the processed observation.
        new_transition = transition.copy()
        new_transition[TransitionKey.OBSERVATION] = processed_obs
        return new_transition

    def get_config(self) -> dict[str, Any]:
        """Return configuration for serialization."""
        return {}

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Return state dictionary (empty for this processor)."""
        return {}

    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
        """Load state dictionary (no-op for this processor)."""
        pass

    def reset(self) -> None:
        """Reset processor state (no-op for this processor)."""
        pass

    def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
        """Transforms:
        environment_state -> OBS_ENV_STATE,
        agent_pos -> OBS_STATE,
        observation.environment_state -> OBS_ENV_STATE,
        observation.agent_pos -> OBS_STATE
        """
        for old, new in self._KEY_PAIRS:
            # Bare key first, then the "observation."-prefixed variant (which
            # therefore wins when both are present, matching original order).
            for candidate in (old, f"observation.{old}"):
                if candidate in features:
                    features[new] = features.pop(candidate)
        return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register(name="observation_processor")
|
||||
class VanillaObservationProcessor:
|
||||
"""Complete observation processor that combines image and state processing.
|
||||
|
||||
This processor replicates the functionality of the original preprocess_observation
|
||||
function but in a modular, composable way that fits into the pipeline architecture.
|
||||
"""
|
||||
|
||||
image_processor: ImageProcessor = field(default_factory=ImageProcessor)
|
||||
state_processor: StateProcessor = field(default_factory=StateProcessor)
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
# First process images
|
||||
transition = self.image_processor(transition)
|
||||
# Then process state
|
||||
transition = self.state_processor(transition)
|
||||
return transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
"""Return configuration for serialization."""
|
||||
return {
|
||||
"image_processor": self.image_processor.get_config(),
|
||||
"state_processor": self.state_processor.get_config(),
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
"""Return state dictionary."""
|
||||
state = {}
|
||||
state.update({f"image_processor.{k}": v for k, v in self.image_processor.state_dict().items()})
|
||||
state.update({f"state_processor.{k}": v for k, v in self.state_processor.state_dict().items()})
|
||||
return state
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
"""Load state dictionary."""
|
||||
image_state = {
|
||||
k.replace("image_processor.", ""): v for k, v in state.items() if k.startswith("image_processor.")
|
||||
}
|
||||
state_state = {
|
||||
k.replace("state_processor.", ""): v for k, v in state.items() if k.startswith("state_processor.")
|
||||
}
|
||||
|
||||
self.image_processor.load_state_dict(image_state)
|
||||
self.state_processor.load_state_dict(state_state)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset processor state."""
|
||||
self.image_processor.reset()
|
||||
self.state_processor.reset()
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
features = self.image_processor.feature_contract(features)
|
||||
features = self.state_processor.feature_contract(features)
|
||||
return features
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register(name="rename_processor")
|
||||
class RenameProcessor:
|
||||
"""Rename processor that renames keys in the observation."""
|
||||
|
||||
rename_map: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is None:
|
||||
return transition
|
||||
|
||||
processed_obs = {}
|
||||
for key, value in observation.items():
|
||||
if key in self.rename_map:
|
||||
processed_obs[self.rename_map[key]] = value
|
||||
else:
|
||||
processed_obs[key] = value
|
||||
|
||||
# Create a new transition with the renamed observation
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.OBSERVATION] = processed_obs
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {"rename_map": self.rename_map}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
"""Transforms:
|
||||
- Each key in the observation that appears in `rename_map` is renamed to its value.
|
||||
- Keys not in `rename_map` remain unchanged.
|
||||
"""
|
||||
return {self.rename_map.get(k, k): v for k, v in features.items()}
|
||||
@@ -0,0 +1,245 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import gymnasium as gym
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("joint_velocity_processor")
|
||||
class JointVelocityProcessor:
|
||||
"""Add joint velocity information to observations.
|
||||
|
||||
Computes joint velocities by tracking changes in joint positions over time.
|
||||
"""
|
||||
|
||||
joint_velocity_limits: float = 100.0
|
||||
dt: float = 1.0 / 10
|
||||
|
||||
last_joint_positions: torch.Tensor | None = None
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is None:
|
||||
return transition
|
||||
|
||||
# Get current joint positions (assuming they're in observation.state)
|
||||
current_positions = observation.get("observation.state")
|
||||
if current_positions is None:
|
||||
return transition
|
||||
|
||||
# Initialize last joint positions if not already set
|
||||
if self.last_joint_positions is None:
|
||||
self.last_joint_positions = current_positions.clone()
|
||||
|
||||
# Compute velocities
|
||||
joint_velocities = (current_positions - self.last_joint_positions) / self.dt
|
||||
self.last_joint_positions = current_positions.clone()
|
||||
|
||||
# Extend observation with velocities
|
||||
extended_state = torch.cat([current_positions, joint_velocities], dim=-1)
|
||||
|
||||
# Create new observation dict
|
||||
new_observation = dict(observation)
|
||||
new_observation["observation.state"] = extended_state
|
||||
|
||||
# Return new transition
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.OBSERVATION] = new_observation
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"joint_velocity_limits": self.joint_velocity_limits,
|
||||
"dt": self.dt,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
self.last_joint_positions = None
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("current_processor")
|
||||
class MotorCurrentProcessor:
|
||||
"""Add motor current information to observations."""
|
||||
|
||||
env: gym.Env = None
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is None:
|
||||
return transition
|
||||
|
||||
# Get current values from complementary_data (where robot state would be stored)
|
||||
present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current")
|
||||
motor_currents = torch.tensor(
|
||||
[present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors],
|
||||
dtype=torch.float32,
|
||||
).unsqueeze(0)
|
||||
|
||||
current_state = observation.get("observation.state")
|
||||
if current_state is None:
|
||||
return transition
|
||||
|
||||
extended_state = torch.cat([current_state, motor_currents], dim=-1)
|
||||
|
||||
# Create new observation dict
|
||||
new_observation = dict(observation)
|
||||
new_observation["observation.state"] = extended_state
|
||||
|
||||
# Return new transition
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.OBSERVATION] = new_observation
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
pass
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("inverse_kinematics_processor")
|
||||
class InverseKinematicsProcessor:
|
||||
"""Convert end-effector space actions to joint space using inverse kinematics.
|
||||
|
||||
This processor transforms delta commands in end-effector space (delta_x, delta_y, delta_z)
|
||||
to joint space commands using forward and inverse kinematics. It maintains the current
|
||||
end-effector pose and joint positions to compute the transformations.
|
||||
"""
|
||||
|
||||
urdf_path: str
|
||||
target_frame_name: str = "gripper_link"
|
||||
end_effector_step_sizes: dict[str, float] = field(default_factory=lambda: {"x": 1.0, "y": 1.0, "z": 1.0})
|
||||
end_effector_bounds: dict[str, list[float]] | None = None
|
||||
max_gripper_pos: float = 30.0
|
||||
|
||||
# State tracking
|
||||
current_ee_pos: np.ndarray | None = field(default=None, init=False, repr=False)
|
||||
current_joint_pos: np.ndarray | None = field(default=None, init=False, repr=False)
|
||||
kinematics: RobotKinematics | None = field(default=None, init=False, repr=False)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Initialize the kinematics module after dataclass initialization."""
|
||||
if self.urdf_path:
|
||||
self.kinematics = RobotKinematics(
|
||||
urdf_path=self.urdf_path,
|
||||
target_frame_name=self.target_frame_name,
|
||||
)
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
action = transition.get(TransitionKey.ACTION)
|
||||
if action is None:
|
||||
return transition
|
||||
|
||||
action_np = action.detach().cpu().numpy().squeeze()
|
||||
|
||||
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
raw_joint_positions = complementary_data.get("raw_joint_positions")
|
||||
current_gripper_pos = raw_joint_positions[-1]
|
||||
if self.current_joint_pos is None:
|
||||
self.current_joint_pos = raw_joint_positions
|
||||
|
||||
# Initialize end-effector position if not available
|
||||
if self.current_joint_pos is None:
|
||||
return transition # Cannot proceed without joint positions
|
||||
|
||||
# Calculate current end-effector position using forward kinematics
|
||||
if self.current_ee_pos is None:
|
||||
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos)
|
||||
|
||||
# Scale deltas by step sizes
|
||||
delta_ee = np.array(
|
||||
[
|
||||
action_np[0] * self.end_effector_step_sizes["x"],
|
||||
action_np[1] * self.end_effector_step_sizes["y"],
|
||||
action_np[2] * self.end_effector_step_sizes["z"],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Set desired end-effector position by adding delta
|
||||
desired_ee_pos = np.eye(4)
|
||||
desired_ee_pos[:3, :3] = self.current_ee_pos[:3, :3] # Keep orientation
|
||||
|
||||
# Add delta to position and clip to bounds
|
||||
desired_ee_pos[:3, 3] = self.current_ee_pos[:3, 3] + delta_ee
|
||||
if self.end_effector_bounds is not None:
|
||||
desired_ee_pos[:3, 3] = np.clip(
|
||||
desired_ee_pos[:3, 3],
|
||||
self.end_effector_bounds["min"],
|
||||
self.end_effector_bounds["max"],
|
||||
)
|
||||
|
||||
# Compute inverse kinematics to get joint positions
|
||||
target_joint_values = self.kinematics.inverse_kinematics(self.current_joint_pos, desired_ee_pos)
|
||||
|
||||
# Update current state
|
||||
self.current_ee_pos = desired_ee_pos.copy()
|
||||
self.current_joint_pos = target_joint_values.copy()
|
||||
|
||||
# Create new action with joint space commands
|
||||
gripper_action = current_gripper_pos
|
||||
if len(action_np) > 3:
|
||||
# Handle gripper command separately
|
||||
gripper_command = action_np[3]
|
||||
|
||||
# Process gripper command (convert from [0,2] to delta) and discretize
|
||||
gripper_delta = np.round(gripper_command - 1.0).astype(int) * self.max_gripper_pos
|
||||
gripper_action = np.clip(current_gripper_pos + gripper_delta, 0, self.max_gripper_pos)
|
||||
|
||||
# Combine joint positions and gripper
|
||||
target_joint_values[-1] = gripper_action
|
||||
|
||||
converted_action = torch.from_numpy(target_joint_values).to(action.device).to(action.dtype)
|
||||
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.ACTION] = converted_action
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"urdf_path": self.urdf_path,
|
||||
"target_frame_name": self.target_frame_name,
|
||||
"end_effector_step_sizes": self.end_effector_step_sizes,
|
||||
"end_effector_bounds": self.end_effector_bounds,
|
||||
"max_gripper_pos": self.max_gripper_pos,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the processor state."""
|
||||
self.current_ee_pos = None
|
||||
self.current_joint_pos = None
|
||||
|
||||
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
@@ -68,10 +68,11 @@ from tqdm import trange
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.envs.factory import make_env
|
||||
from lerobot.envs.utils import add_envs_task, check_env_attributes_and_types, preprocess_observation
|
||||
from lerobot.envs.utils import add_envs_task, check_env_attributes_and_types
|
||||
from lerobot.policies.factory import make_policy
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy
|
||||
from lerobot.policies.utils import get_device_from_parameters
|
||||
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
|
||||
from lerobot.utils.io_utils import write_video
|
||||
from lerobot.utils.random_utils import set_seed
|
||||
from lerobot.utils.utils import (
|
||||
@@ -128,6 +129,16 @@ def rollout(
|
||||
if render_callback is not None:
|
||||
render_callback(env)
|
||||
|
||||
# Create observation processing processor
|
||||
# NOTE: During environment interaction, we skip batch dictionary conversion
|
||||
# since that format is only needed for loss computation during training.
|
||||
# Using identity functions to avoid unnecessary format transformations.
|
||||
obs_processor = RobotProcessor(
|
||||
[VanillaObservationProcessor()],
|
||||
to_transition=lambda x: x,
|
||||
to_output=lambda x: x,
|
||||
)
|
||||
|
||||
all_observations = []
|
||||
all_actions = []
|
||||
all_rewards = []
|
||||
@@ -147,10 +158,13 @@ def rollout(
|
||||
check_env_attributes_and_types(env)
|
||||
while not np.all(done):
|
||||
# Numpy array to tensor and changing dictionary keys to LeRobot policy format.
|
||||
observation = preprocess_observation(observation)
|
||||
transition = (observation, None, None, None, None, None, None)
|
||||
processed_transition = obs_processor(transition)
|
||||
observation = processed_transition[TransitionKey.OBSERVATION]
|
||||
if return_observations:
|
||||
all_observations.append(deepcopy(observation))
|
||||
|
||||
# TODO(azouitine): Move this in processor side
|
||||
observation = {
|
||||
key: observation[key].to(device, non_blocking=device.type == "cuda") for key in observation
|
||||
}
|
||||
@@ -195,7 +209,9 @@ def rollout(
|
||||
|
||||
# Track the final observation.
|
||||
if return_observations:
|
||||
observation = preprocess_observation(observation)
|
||||
transition = (observation, None, None, None, None, None, None)
|
||||
processed_transition = obs_processor(transition)
|
||||
observation = processed_transition[TransitionKey.OBSERVATION]
|
||||
all_observations.append(deepcopy(observation))
|
||||
|
||||
# Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors.
|
||||
|
||||
@@ -62,8 +62,14 @@ from lerobot.configs import parser
|
||||
from lerobot.configs.train import TrainRLServerPipelineConfig
|
||||
from lerobot.policies.factory import make_policy
|
||||
from lerobot.policies.sac.modeling_sac import SACPolicy
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
from lerobot.robots import so100_follower # noqa: F401
|
||||
from lerobot.scripts.rl.gym_manipulator import make_robot_env
|
||||
from lerobot.scripts.rl.gym_manipulator import (
|
||||
create_transition,
|
||||
make_processors,
|
||||
make_robot_env,
|
||||
step_env_and_process_transition,
|
||||
)
|
||||
from lerobot.teleoperators import gamepad, so101_leader # noqa: F401
|
||||
from lerobot.transport import services_pb2, services_pb2_grpc
|
||||
from lerobot.transport.utils import (
|
||||
@@ -236,7 +242,8 @@ def act_with_policy(
|
||||
|
||||
logging.info("make_env online")
|
||||
|
||||
online_env = make_robot_env(cfg=cfg.env)
|
||||
online_env, teleop_device = make_robot_env(cfg=cfg.env)
|
||||
env_processor, action_processor = make_processors(online_env, cfg.env)
|
||||
|
||||
set_seed(cfg.seed)
|
||||
device = get_safe_torch_device(cfg.policy.device, log=True)
|
||||
@@ -257,6 +264,13 @@ def act_with_policy(
|
||||
assert isinstance(policy, nn.Module)
|
||||
|
||||
obs, info = online_env.reset()
|
||||
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
|
||||
env_processor.reset()
|
||||
action_processor.reset()
|
||||
|
||||
# Process initial observation
|
||||
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
|
||||
transition = env_processor(transition)
|
||||
|
||||
# NOTE: For the moment we will solely handle the case of a single environment
|
||||
sum_reward_episode = 0
|
||||
@@ -274,45 +288,57 @@ def act_with_policy(
|
||||
logging.info("[ACTOR] Shutting down act_with_policy")
|
||||
return
|
||||
|
||||
if interaction_step >= cfg.policy.online_step_before_learning:
|
||||
# Time policy inference and check if it meets FPS requirement
|
||||
with policy_timer:
|
||||
action = policy.select_action(batch=obs)
|
||||
policy_fps = policy_timer.fps_last
|
||||
observation = transition[TransitionKey.OBSERVATION]
|
||||
|
||||
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
|
||||
# Time policy inference and check if it meets FPS requirement
|
||||
with policy_timer:
|
||||
# Extract observation from transition for policy
|
||||
action = policy.select_action(batch=observation)
|
||||
policy_fps = policy_timer.fps_last
|
||||
|
||||
else:
|
||||
action = online_env.action_space.sample()
|
||||
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
|
||||
|
||||
next_obs, reward, done, truncated, info = online_env.step(action)
|
||||
# Use the new step function
|
||||
new_transition, terminate_episode = step_env_and_process_transition(
|
||||
env=online_env,
|
||||
transition=transition,
|
||||
action=action,
|
||||
teleop_device=teleop_device,
|
||||
env_processor=env_processor,
|
||||
action_processor=action_processor,
|
||||
)
|
||||
|
||||
# Extract values from processed transition
|
||||
next_observation = new_transition[TransitionKey.OBSERVATION]
|
||||
executed_action = new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
|
||||
reward = new_transition[TransitionKey.REWARD]
|
||||
done = new_transition.get(TransitionKey.DONE, False)
|
||||
truncated = new_transition.get(TransitionKey.TRUNCATED, False)
|
||||
|
||||
sum_reward_episode += float(reward)
|
||||
# Increment total steps counter for intervention rate
|
||||
episode_total_steps += 1
|
||||
|
||||
# NOTE: We override the action if the intervention is True, because the action applied is the intervention action
|
||||
if "is_intervention" in info and info["is_intervention"]:
|
||||
# NOTE: The action space for demonstration before hand is with the full action space
|
||||
# but sometimes for example we want to deactivate the gripper
|
||||
action = info["action_intervention"]
|
||||
# Check for intervention from transition info
|
||||
intervention_info = new_transition[TransitionKey.INFO]
|
||||
if intervention_info.get("is_intervention", False):
|
||||
episode_intervention = True
|
||||
# Increment intervention steps counter
|
||||
episode_intervention_steps += 1
|
||||
|
||||
# Create transition for learner (convert to old format)
|
||||
list_transition_to_send_to_learner.append(
|
||||
Transition(
|
||||
state=obs,
|
||||
action=action,
|
||||
state=observation,
|
||||
action=executed_action,
|
||||
reward=reward,
|
||||
next_state=next_obs,
|
||||
next_state=next_observation,
|
||||
done=done,
|
||||
truncated=truncated, # TODO: (azouitine) Handle truncation properly
|
||||
complementary_info=info,
|
||||
truncated=truncated,
|
||||
complementary_info={}, # new_transition[TransitionKey.COMPLEMENTARY_DATA],
|
||||
)
|
||||
)
|
||||
# assign obs to the next obs and continue the rollout
|
||||
obs = next_obs
|
||||
|
||||
# Update transition for next iteration
|
||||
transition = new_transition
|
||||
|
||||
if done or truncated:
|
||||
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
|
||||
@@ -347,12 +373,21 @@ def act_with_policy(
|
||||
)
|
||||
)
|
||||
|
||||
# Reset intervention counters
|
||||
# Reset intervention counters and environment
|
||||
sum_reward_episode = 0.0
|
||||
episode_intervention = False
|
||||
episode_intervention_steps = 0
|
||||
episode_total_steps = 0
|
||||
|
||||
# Reset environment and processors
|
||||
obs, info = online_env.reset()
|
||||
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
|
||||
env_processor.reset()
|
||||
action_processor.reset()
|
||||
|
||||
# Process initial observation
|
||||
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
|
||||
transition = env_processor(transition)
|
||||
|
||||
if cfg.env.fps is not None:
|
||||
dt_time = time.perf_counter() - start_time
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -107,6 +107,45 @@ class GamepadTeleop(Teleoperator):
|
||||
|
||||
return action_dict
|
||||
|
||||
def get_teleop_events(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get extra control events from the gamepad such as intervention status,
|
||||
episode termination, success indicators, etc.
|
||||
|
||||
Returns:
|
||||
Dictionary containing:
|
||||
- is_intervention: bool - Whether human is currently intervening
|
||||
- terminate_episode: bool - Whether to terminate the current episode
|
||||
- success: bool - Whether the episode was successful
|
||||
- rerecord_episode: bool - Whether to rerecord the episode
|
||||
"""
|
||||
if self.gamepad is None:
|
||||
return {
|
||||
"is_intervention": False,
|
||||
"terminate_episode": False,
|
||||
"success": False,
|
||||
"rerecord_episode": False,
|
||||
}
|
||||
|
||||
# Update gamepad state to get fresh inputs
|
||||
self.gamepad.update()
|
||||
|
||||
# Check if intervention is active
|
||||
is_intervention = self.gamepad.should_intervene()
|
||||
|
||||
# Get episode end status
|
||||
episode_end_status = self.gamepad.get_episode_end_status()
|
||||
terminate_episode = episode_end_status is not None
|
||||
success = episode_end_status == "success"
|
||||
rerecord_episode = episode_end_status == "rerecord_episode"
|
||||
|
||||
return {
|
||||
"is_intervention": is_intervention,
|
||||
"terminate_episode": terminate_episode,
|
||||
"success": success,
|
||||
"rerecord_episode": rerecord_episode,
|
||||
}
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Disconnect from the gamepad."""
|
||||
if self.gamepad is not None:
|
||||
|
||||
@@ -235,3 +235,67 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
action_dict["gripper"] = gripper_action
|
||||
|
||||
return action_dict
|
||||
|
||||
def get_teleop_events(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get extra control events from the keyboard such as intervention status,
|
||||
episode termination, success indicators, etc.
|
||||
|
||||
Keyboard mappings:
|
||||
- Any movement keys pressed = intervention active
|
||||
- 's' key = success (terminate episode successfully)
|
||||
- 'r' key = rerecord episode (terminate and rerecord)
|
||||
- 'q' key = quit episode (terminate without success)
|
||||
|
||||
Returns:
|
||||
Dictionary containing:
|
||||
- is_intervention: bool - Whether human is currently intervening
|
||||
- terminate_episode: bool - Whether to terminate the current episode
|
||||
- success: bool - Whether the episode was successful
|
||||
- rerecord_episode: bool - Whether to rerecord the episode
|
||||
"""
|
||||
if not self.is_connected:
|
||||
return {
|
||||
"is_intervention": False,
|
||||
"terminate_episode": False,
|
||||
"success": False,
|
||||
"rerecord_episode": False,
|
||||
}
|
||||
|
||||
# Check if any movement keys are currently pressed (indicates intervention)
|
||||
movement_keys = [
|
||||
keyboard.Key.up,
|
||||
keyboard.Key.down,
|
||||
keyboard.Key.left,
|
||||
keyboard.Key.right,
|
||||
keyboard.Key.shift,
|
||||
keyboard.Key.shift_r,
|
||||
keyboard.Key.ctrl_r,
|
||||
keyboard.Key.ctrl_l,
|
||||
]
|
||||
is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
|
||||
|
||||
# Check for episode control commands from misc_keys_queue
|
||||
terminate_episode = False
|
||||
success = False
|
||||
rerecord_episode = False
|
||||
|
||||
# Process any pending misc keys
|
||||
while not self.misc_keys_queue.empty():
|
||||
key = self.misc_keys_queue.get_nowait()
|
||||
if key == "s":
|
||||
terminate_episode = True
|
||||
success = True
|
||||
elif key == "r":
|
||||
terminate_episode = True
|
||||
rerecord_episode = True
|
||||
elif key == "q":
|
||||
terminate_episode = True
|
||||
success = False
|
||||
|
||||
return {
|
||||
"is_intervention": is_intervention,
|
||||
"terminate_episode": terminate_episode,
|
||||
"success": success,
|
||||
"rerecord_episode": rerecord_episode,
|
||||
}
|
||||
|
||||
@@ -160,6 +160,18 @@ class Teleoperator(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_teleop_events(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get extra control events from the teleoperator such as intervention status,
|
||||
episode termination, success indicators, etc.
|
||||
Check the implementation of the gamepad for an example.
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: A dictionary containing control events with keys and values that are specific to the setup.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_feedback(self, feedback: dict[str, Any]) -> None:
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
---
|
||||
library_name: lerobot
|
||||
tags:
|
||||
- robotics
|
||||
- lerobot
|
||||
- safetensors
|
||||
pipeline_tag: robotics
|
||||
---
|
||||
|
||||
# RobotProcessor
|
||||
|
||||
## Overview
|
||||
|
||||
RobotProcessor is a composable, debuggable post-processing pipeline for robot transitions in the LeRobot framework. It orchestrates an ordered collection of small, functional transforms (steps) that are executed left-to-right on each incoming `EnvTransition`.
|
||||
|
||||
## Architecture
|
||||
|
||||
The RobotProcessor provides a modular architecture for processing robot environment transitions through a sequence of composable steps. Each step is a callable that accepts a full `EnvTransition` tuple and returns a potentially modified tuple of the same structure.
|
||||
|
||||
### EnvTransition Structure
|
||||
|
||||
An `EnvTransition` is a 7-tuple containing:
|
||||
|
||||
1. **observation**: Current state observation
|
||||
2. **action**: Action taken (can be None)
|
||||
3. **reward**: Reward received (float or None)
|
||||
4. **done**: Episode termination flag (bool or None)
|
||||
5. **truncated**: Episode truncation flag (bool or None)
|
||||
6. **info**: Additional information dictionary
|
||||
7. **complementary_data**: Extra data dictionary
|
||||
|
||||
## Key Features
|
||||
|
||||
- **Composable Pipeline**: Chain multiple processing steps in a specific order
|
||||
- **State Persistence**: Save and load processor state using SafeTensors format
|
||||
- **Hugging Face Hub Integration**: Easy sharing and loading via `save_pretrained()` and `from_pretrained()`
|
||||
- **Debugging Support**: Step-through functionality to inspect intermediate transformations
|
||||
- **Hook System**: Before/after step hooks for additional processing or monitoring
|
||||
- **Device Support**: Move tensor states to different devices (CPU/GPU)
|
||||
- **Performance Profiling**: Built-in profiling to identify bottlenecks
|
||||
|
||||
## Installation
|
||||
|
||||
Follow the [installation instructions](https://huggingface.co/docs/lerobot/installation) to install the package.
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Example
|
||||
|
||||
```python
|
||||
from lerobot.processor.pipeline import RobotProcessor
|
||||
from your_steps import ObservationNormalizer, VelocityCalculator

# Create a processor with multiple steps
processor = RobotProcessor(
    steps=[
        ObservationNormalizer(mean=0, std=1),
        VelocityCalculator(window_size=5),
    ],
    name="my_robot_processor",
    seed=42,
)

# Process a transition
obs, info = env.reset()
transition = (obs, None, 0.0, False, False, info, {})
processed_transition = processor(transition)

# Extract processed observation
processed_obs = processed_transition[0]
```

### Saving and Loading

```python
# Save locally
processor.save_pretrained("./my_processor")

# Push to Hugging Face Hub
processor.push_to_hub("username/my-robot-processor")

# Load from Hub
loaded_processor = RobotProcessor.from_pretrained("username/my-robot-processor")
```

### Debugging with Step-Through

```python
# Inspect intermediate results
for idx, intermediate_transition in enumerate(processor.step_through(transition)):
    print(f"After step {idx}: {intermediate_transition[0]}")  # Print observation
```

### Using Hooks

```python
# Add monitoring hook
def log_observation(step_idx, transition):
    print(f"Step {step_idx}: obs shape = {transition[0].shape}")
    return None  # Don't modify transition

processor.register_before_step_hook(log_observation)
```

## Creating Custom Steps

To create a custom processor step, implement the `ProcessorStep` protocol:

```python
from lerobot.processor.pipeline import ProcessorStepRegistry, EnvTransition

@ProcessorStepRegistry.register("my_custom_step")
class MyCustomStep:
    def __init__(self, param1=1.0):
        self.param1 = param1
        self.buffer = []

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        obs, action, reward, done, truncated, info, comp_data = transition
        # Process observation
        processed_obs = obs * self.param1
        return (processed_obs, action, reward, done, truncated, info, comp_data)

    def get_config(self) -> dict:
        return {"param1": self.param1}

    def state_dict(self) -> dict:
        # Return only torch.Tensor state
        return {}

    def load_state_dict(self, state: dict) -> None:
        # Load tensor state
        pass

    def reset(self) -> None:
        # Clear buffers at episode boundaries
        self.buffer.clear()
```

## Advanced Features

### Device Management

```python
# Move all tensor states to GPU
processor = processor.to("cuda")

# Move to specific device
processor = processor.to(torch.device("cuda:1"))
```

### Performance Profiling

```python
# Profile step execution times
profile_results = processor.profile_steps(transition, num_runs=100)
for step_name, time_ms in profile_results.items():
    print(f"{step_name}: {time_ms:.3f} ms")
```

### Processor Slicing

```python
# Get a single step
first_step = processor[0]

# Create a sub-processor with steps 1-3
sub_processor = processor[1:4]
```

## Model Card Specifications

- **Pipeline Tag**: robotics
- **Library**: lerobot
- **Format**: safetensors
- **License**: Apache 2.0

## Limitations

- Steps must maintain the 7-tuple structure of EnvTransition
- All tensor state must be separated from configuration for proper serialization
- Steps are executed sequentially (no parallel processing within a single transition)

## Citation

If you use RobotProcessor in your research, please cite:

```bibtex
@misc{cadene2024lerobot,
    author = {Cadene, Remi and Alibert, Simon and Soare, Alexander and Gallouedec, Quentin and Zouitine, Adil and Palma, Steven and Kooijmans, Pepijn and Aractingi, Michel and Shukor, Mustafa and Aubakirova, Dana and Russi, Martino and Capuano, Francesco and Pascale, Caroline and Choghari, Jade and Moss, Jess and Wolf, Thomas},
    title = {LeRobot: State-of-the-art Machine Learning for Real-World Robotics in Pytorch},
    howpublished = "\url{https://github.com/huggingface/lerobot}",
    year = {2024}
}
```
|
||||
@@ -19,6 +19,7 @@ import traceback
|
||||
import pytest
|
||||
from serial import SerialException
|
||||
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from tests.utils import DEVICE
|
||||
|
||||
# Import fixture modules as plugins
|
||||
@@ -69,3 +70,19 @@ def patch_builtins_input(monkeypatch):
|
||||
print(text)
|
||||
|
||||
monkeypatch.setattr("builtins.input", print_text)
|
||||
|
||||
|
||||
@pytest.fixture
def policy_feature_factory():
    """Fixture returning a factory that builds `PolicyFeature` instances."""

    def _make(ft: FeatureType, shape: tuple[int, ...]) -> PolicyFeature:
        return PolicyFeature(type=ft, shape=shape)

    return _make
|
||||
|
||||
|
||||
def assert_contract_is_typed(features: dict[str, PolicyFeature]) -> None:
    """Check that `features` is a dict mapping str keys to PolicyFeature values."""
    assert isinstance(features, dict)
    for key, value in features.items():
        assert isinstance(key, str)
        assert isinstance(value, PolicyFeature)
|
||||
|
||||
@@ -22,7 +22,7 @@ from gymnasium.utils.env_checker import check_env
|
||||
|
||||
import lerobot
|
||||
from lerobot.envs.factory import make_env, make_env_config
|
||||
from lerobot.envs.utils import preprocess_observation
|
||||
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
|
||||
from tests.utils import require_env
|
||||
|
||||
OBS_TYPES = ["state", "pixels", "pixels_agent_pos"]
|
||||
@@ -48,7 +48,12 @@ def test_factory(env_name):
|
||||
cfg = make_env_config(env_name)
|
||||
env = make_env(cfg, n_envs=1)
|
||||
obs, _ = env.reset()
|
||||
obs = preprocess_observation(obs)
|
||||
|
||||
# Process observation using processor
|
||||
obs_processor = RobotProcessor([VanillaObservationProcessor()])
|
||||
transition = (obs, None, None, None, None, None, None)
|
||||
processed_transition = obs_processor(transition)
|
||||
obs = processed_transition[TransitionKey.OBSERVATION]
|
||||
|
||||
# test image keys are float32 in range [0,1]
|
||||
for key in obs:
|
||||
|
||||
@@ -30,7 +30,6 @@ from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.datasets.factory import make_dataset
|
||||
from lerobot.datasets.utils import cycle, dataset_to_policy_features
|
||||
from lerobot.envs.factory import make_env, make_env_config
|
||||
from lerobot.envs.utils import preprocess_observation
|
||||
from lerobot.optim.factory import make_optimizer_and_scheduler
|
||||
from lerobot.policies.act.modeling_act import ACTTemporalEnsembler
|
||||
from lerobot.policies.factory import (
|
||||
@@ -40,6 +39,7 @@ from lerobot.policies.factory import (
|
||||
)
|
||||
from lerobot.policies.normalize import Normalize, Unnormalize
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy
|
||||
from lerobot.processor import RobotProcessor, TransitionKey, VanillaObservationProcessor
|
||||
from lerobot.utils.random_utils import seeded_context
|
||||
from tests.artifacts.policies.save_policy_to_safetensors import get_policy_stats
|
||||
from tests.utils import DEVICE, require_cpu, require_env, require_x86_64_kernel
|
||||
@@ -185,7 +185,10 @@ def test_policy(ds_repo_id, env_name, env_kwargs, policy_name, policy_kwargs):
|
||||
observation, _ = env.reset(seed=train_cfg.seed)
|
||||
|
||||
# apply transform to normalize the observations
|
||||
observation = preprocess_observation(observation)
|
||||
obs_processor = RobotProcessor([VanillaObservationProcessor()])
|
||||
transition = (observation, None, None, None, None, None, None)
|
||||
processed_transition = obs_processor(transition)
|
||||
observation = processed_transition[TransitionKey.OBSERVATION]
|
||||
|
||||
# send observation to device/gpu
|
||||
observation = {key: observation[key].to(DEVICE, non_blocking=True) for key in observation}
|
||||
|
||||
@@ -0,0 +1,282 @@
|
||||
import torch
|
||||
|
||||
from lerobot.processor.pipeline import (
|
||||
RobotProcessor,
|
||||
TransitionKey,
|
||||
_default_batch_to_transition,
|
||||
_default_transition_to_batch,
|
||||
)
|
||||
|
||||
|
||||
def _dummy_batch():
|
||||
"""Create a dummy batch using the new format with observation.* and next.* keys."""
|
||||
return {
|
||||
"observation.image.left": torch.randn(1, 3, 128, 128),
|
||||
"observation.image.right": torch.randn(1, 3, 128, 128),
|
||||
"observation.state": torch.tensor([[0.1, 0.2, 0.3, 0.4]]),
|
||||
"action": torch.tensor([[0.5]]),
|
||||
"next.reward": 1.0,
|
||||
"next.done": False,
|
||||
"next.truncated": False,
|
||||
"info": {"key": "value"},
|
||||
}
|
||||
|
||||
|
||||
def test_observation_grouping_roundtrip():
    """An empty pipeline must return the batch unchanged, observation.* keys included."""
    batch_in = _dummy_batch()
    batch_out = RobotProcessor([])(batch_in)

    # Same set of observation.* keys on both sides.
    obs_keys_in = {k for k in batch_in if k.startswith("observation.")}
    obs_keys_out = {k for k in batch_out if k.startswith("observation.")}
    assert obs_keys_in == obs_keys_out

    # Tensor-valued entries survive unchanged.
    for key in ("observation.image.left", "observation.image.right", "observation.state", "action"):
        assert torch.allclose(batch_out[key], batch_in[key])

    # Scalar / dict entries survive as well.
    for key in ("next.reward", "next.done", "next.truncated", "info"):
        assert batch_out[key] == batch_in[key]
|
||||
|
||||
|
||||
def test_batch_to_transition_observation_grouping():
    """_default_batch_to_transition must collect all observation.* keys into one dict."""
    batch = {
        "observation.image.top": torch.randn(1, 3, 128, 128),
        "observation.image.left": torch.randn(1, 3, 128, 128),
        "observation.state": [1, 2, 3, 4],
        "action": "action_data",
        "next.reward": 1.5,
        "next.done": True,
        "next.truncated": False,
        "info": {"episode": 42},
    }

    tr = _default_batch_to_transition(batch)
    obs = tr[TransitionKey.OBSERVATION]

    # The observation slot is a dict holding every observation.* entry.
    assert isinstance(obs, dict)
    for key in ("observation.image.top", "observation.image.left", "observation.state"):
        assert key in obs

    # Values are carried over untouched.
    assert torch.allclose(obs["observation.image.top"], batch["observation.image.top"])
    assert torch.allclose(obs["observation.image.left"], batch["observation.image.left"])
    assert obs["observation.state"] == [1, 2, 3, 4]

    # Remaining transition slots map from their batch counterparts.
    assert tr[TransitionKey.ACTION] == "action_data"
    assert tr[TransitionKey.REWARD] == 1.5
    assert tr[TransitionKey.DONE]
    assert not tr[TransitionKey.TRUNCATED]
    assert tr[TransitionKey.INFO] == {"episode": 42}
    assert tr[TransitionKey.COMPLEMENTARY_DATA] == {}
|
||||
|
||||
|
||||
def test_transition_to_batch_observation_flattening():
    """_default_transition_to_batch must flatten the observation dict back into the batch."""
    obs = {
        "observation.image.top": torch.randn(1, 3, 128, 128),
        "observation.image.left": torch.randn(1, 3, 128, 128),
        "observation.state": [1, 2, 3, 4],
    }
    tr = {
        TransitionKey.OBSERVATION: obs,
        TransitionKey.ACTION: "action_data",
        TransitionKey.REWARD: 1.5,
        TransitionKey.DONE: True,
        TransitionKey.TRUNCATED: False,
        TransitionKey.INFO: {"episode": 42},
        TransitionKey.COMPLEMENTARY_DATA: {},
    }

    batch = _default_transition_to_batch(tr)

    # Every observation.* key reappears at the top level of the batch.
    for key in obs:
        assert key in batch
    assert torch.allclose(batch["observation.image.top"], obs["observation.image.top"])
    assert torch.allclose(batch["observation.image.left"], obs["observation.image.left"])
    assert batch["observation.state"] == [1, 2, 3, 4]

    # Non-observation slots land under their batch names (next.* for env feedback).
    assert batch["action"] == "action_data"
    assert batch["next.reward"] == 1.5
    assert batch["next.done"]
    assert not batch["next.truncated"]
    assert batch["info"] == {"episode": 42}
|
||||
|
||||
|
||||
def test_no_observation_keys():
    """Without observation.* keys, the transition's observation slot is None."""
    batch = {
        "action": "action_data",
        "next.reward": 2.0,
        "next.done": False,
        "next.truncated": True,
        "info": {"test": "no_obs"},
    }

    tr = _default_batch_to_transition(batch)

    assert tr[TransitionKey.OBSERVATION] is None
    assert tr[TransitionKey.ACTION] == "action_data"
    assert tr[TransitionKey.REWARD] == 2.0
    assert not tr[TransitionKey.DONE]
    assert tr[TransitionKey.TRUNCATED]
    assert tr[TransitionKey.INFO] == {"test": "no_obs"}

    # The inverse conversion restores the original batch fields.
    rebuilt = _default_transition_to_batch(tr)
    assert rebuilt["action"] == "action_data"
    assert rebuilt["next.reward"] == 2.0
    assert not rebuilt["next.done"]
    assert rebuilt["next.truncated"]
    assert rebuilt["info"] == {"test": "no_obs"}
|
||||
|
||||
|
||||
def test_minimal_batch():
    """A batch with only observation.state and action gets defaults for everything else."""
    tr = _default_batch_to_transition({"observation.state": "minimal_state", "action": "minimal_action"})

    assert tr[TransitionKey.OBSERVATION] == {"observation.state": "minimal_state"}
    assert tr[TransitionKey.ACTION] == "minimal_action"

    # Missing fields fall back to their neutral defaults.
    assert tr[TransitionKey.REWARD] == 0.0
    assert not tr[TransitionKey.DONE]
    assert not tr[TransitionKey.TRUNCATED]
    assert tr[TransitionKey.INFO] == {}
    assert tr[TransitionKey.COMPLEMENTARY_DATA] == {}

    # Round trip back to a batch preserves the data and the defaults.
    rebuilt = _default_transition_to_batch(tr)
    assert rebuilt["observation.state"] == "minimal_state"
    assert rebuilt["action"] == "minimal_action"
    assert rebuilt["next.reward"] == 0.0
    assert not rebuilt["next.done"]
    assert not rebuilt["next.truncated"]
    assert rebuilt["info"] == {}
|
||||
|
||||
|
||||
def test_empty_batch():
    """An empty batch converts to a transition made entirely of defaults."""
    tr = _default_batch_to_transition({})

    assert tr[TransitionKey.OBSERVATION] is None
    assert tr[TransitionKey.ACTION] is None
    assert tr[TransitionKey.REWARD] == 0.0
    assert not tr[TransitionKey.DONE]
    assert not tr[TransitionKey.TRUNCATED]
    assert tr[TransitionKey.INFO] == {}
    assert tr[TransitionKey.COMPLEMENTARY_DATA] == {}

    # Round trip yields a batch of defaults as well.
    rebuilt = _default_transition_to_batch(tr)
    assert rebuilt["action"] is None
    assert rebuilt["next.reward"] == 0.0
    assert not rebuilt["next.done"]
    assert not rebuilt["next.truncated"]
    assert rebuilt["info"] == {}
|
||||
|
||||
|
||||
def test_complex_nested_observation():
    """Round-trip a batch whose observation entries are nested dicts with tensors."""
    batch = {
        "observation.image.top": {"image": torch.randn(1, 3, 128, 128), "timestamp": 1234567890},
        "observation.image.left": {"image": torch.randn(1, 3, 128, 128), "timestamp": 1234567891},
        "observation.state": torch.randn(7),
        "action": torch.randn(8),
        "next.reward": 3.14,
        "next.done": False,
        "next.truncated": True,
        "info": {"episode_length": 200, "success": True},
    }

    rebuilt = _default_transition_to_batch(_default_batch_to_transition(batch))

    # Key sets of observation.* entries are identical after the round trip.
    assert {k for k in batch if k.startswith("observation.")} == {
        k for k in rebuilt if k.startswith("observation.")
    }

    # Plain tensors survive.
    assert torch.allclose(batch["observation.state"], rebuilt["observation.state"])
    assert torch.allclose(batch["action"], rebuilt["action"])

    # Tensors nested inside dicts survive as well.
    for cam in ("observation.image.top", "observation.image.left"):
        assert torch.allclose(batch[cam]["image"], rebuilt[cam]["image"])

    # Scalar env feedback and info are untouched.
    for key in ("next.reward", "next.done", "next.truncated", "info"):
        assert batch[key] == rebuilt[key]
|
||||
|
||||
|
||||
def test_custom_converter():
    """Custom to_transition / to_output hooks can replace the default converters."""

    def doubling_to_transition(batch):
        # Default conversion, except the reward is doubled on the way in.
        tr = _default_batch_to_transition(batch).copy()
        reward = tr.get(TransitionKey.REWARD, 0.0)
        tr[TransitionKey.REWARD] = reward * 2 if reward is not None else 0.0
        return tr

    def passthrough_to_batch(tr):
        return _default_transition_to_batch(tr)

    processor = RobotProcessor(steps=[], to_transition=doubling_to_transition, to_output=passthrough_to_batch)

    batch = {
        "observation.state": torch.randn(1, 4),
        "action": torch.randn(1, 2),
        "next.reward": 1.0,
        "next.done": False,
    }
    result = processor(batch)

    # The custom converter doubled the reward; everything else is unchanged.
    assert result["next.reward"] == 2.0
    assert torch.allclose(result["observation.state"], batch["observation.state"])
    assert torch.allclose(result["action"], batch["action"])
|
||||
@@ -0,0 +1,628 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from unittest.mock import Mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.processor.normalize_processor import (
|
||||
NormalizerProcessor,
|
||||
UnnormalizerProcessor,
|
||||
_convert_stats_to_tensors,
|
||||
)
|
||||
from lerobot.processor.pipeline import RobotProcessor, TransitionKey
|
||||
|
||||
|
||||
def create_transition(
    observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
    """Assemble an EnvTransition dict; unspecified slots default to None."""
    keys = (
        TransitionKey.OBSERVATION,
        TransitionKey.ACTION,
        TransitionKey.REWARD,
        TransitionKey.DONE,
        TransitionKey.TRUNCATED,
        TransitionKey.INFO,
        TransitionKey.COMPLEMENTARY_DATA,
    )
    values = (observation, action, reward, done, truncated, info, complementary_data)
    return dict(zip(keys, values))
|
||||
|
||||
|
||||
def test_numpy_conversion():
    """Numpy stats are converted to torch tensors with matching values."""
    stats = {"observation.image": {"mean": np.array([0.5, 0.5, 0.5]), "std": np.array([0.2, 0.2, 0.2])}}
    converted = _convert_stats_to_tensors(stats)

    for name, expected in (("mean", [0.5, 0.5, 0.5]), ("std", [0.2, 0.2, 0.2])):
        value = converted["observation.image"][name]
        assert isinstance(value, torch.Tensor)
        assert torch.allclose(value, torch.tensor(expected))
|
||||
|
||||
|
||||
def test_tensor_conversion():
    """Tensor stats come out as float32 tensors."""
    stats = {"action": {"mean": torch.tensor([0.0, 0.0]), "std": torch.tensor([1.0, 1.0])}}
    converted = _convert_stats_to_tensors(stats)

    assert converted["action"]["mean"].dtype == torch.float32
    assert converted["action"]["std"].dtype == torch.float32
|
||||
|
||||
|
||||
def test_scalar_conversion():
    """Python scalars become 0-d tensors with the same value."""
    converted = _convert_stats_to_tensors({"reward": {"mean": 0.5, "std": 0.1}})

    assert torch.allclose(converted["reward"]["mean"], torch.tensor(0.5))
    assert torch.allclose(converted["reward"]["std"], torch.tensor(0.1))
|
||||
|
||||
|
||||
def test_list_conversion():
    """Plain Python lists are converted to tensors element-wise."""
    converted = _convert_stats_to_tensors(
        {"observation.state": {"min": [0.0, -1.0, -2.0], "max": [1.0, 1.0, 2.0]}}
    )

    assert torch.allclose(converted["observation.state"]["min"], torch.tensor([0.0, -1.0, -2.0]))
    assert torch.allclose(converted["observation.state"]["max"], torch.tensor([1.0, 1.0, 2.0]))
|
||||
|
||||
|
||||
def test_unsupported_type():
    """Non-numeric stat values must raise a TypeError."""
    with pytest.raises(TypeError, match="Unsupported type"):
        _convert_stats_to_tensors({"bad_key": {"mean": "string_value"}})
|
||||
|
||||
|
||||
# Helper functions to create feature maps and norm maps
|
||||
def _create_observation_features():
    """Feature map (image + state) shared by the observation-normalization tests."""
    image = PolicyFeature(FeatureType.VISUAL, (3, 96, 96))
    state = PolicyFeature(FeatureType.STATE, (2,))
    return {"observation.image": image, "observation.state": state}
|
||||
|
||||
|
||||
def _create_observation_norm_map():
    """Normalization modes: mean/std for images, min/max for state."""
    modes = {}
    modes[FeatureType.VISUAL] = NormalizationMode.MEAN_STD
    modes[FeatureType.STATE] = NormalizationMode.MIN_MAX
    return modes
|
||||
|
||||
|
||||
# Fixtures for observation normalisation tests using NormalizerProcessor
|
||||
@pytest.fixture
def observation_stats():
    """Dataset-style stats: mean/std for the image, min/max for the state."""
    image_stats = {"mean": np.array([0.5, 0.5, 0.5]), "std": np.array([0.2, 0.2, 0.2])}
    state_stats = {"min": np.array([0.0, -1.0]), "max": np.array([1.0, 1.0])}
    return {"observation.image": image_stats, "observation.state": state_stats}
|
||||
|
||||
|
||||
@pytest.fixture
def observation_normalizer(observation_stats):
    """NormalizerProcessor with observation stats only (no action stats)."""
    return NormalizerProcessor(
        features=_create_observation_features(),
        norm_map=_create_observation_norm_map(),
        stats=observation_stats,
    )
|
||||
|
||||
|
||||
def test_mean_std_normalization(observation_normalizer):
    """Image observations are normalized as (x - mean) / std."""
    obs = {
        "observation.image": torch.tensor([0.7, 0.5, 0.3]),
        "observation.state": torch.tensor([0.5, 0.0]),
    }
    result = observation_normalizer(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    expected_image = (torch.tensor([0.7, 0.5, 0.3]) - 0.5) / 0.2
    assert torch.allclose(result["observation.image"], expected_image)
|
||||
|
||||
|
||||
def test_min_max_normalization(observation_normalizer):
    """State observations are mapped to [-1, 1] via min/max scaling."""
    obs = {"observation.state": torch.tensor([0.5, 0.0])}
    result = observation_normalizer(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # 2 * (x - min) / (max - min) - 1 gives 0.0 for both components here.
    assert torch.allclose(result["observation.state"], torch.tensor([0.0, 0.0]), atol=1e-6)
|
||||
|
||||
|
||||
def test_selective_normalization(observation_stats):
    """normalize_keys restricts normalization to the listed keys."""
    normalizer = NormalizerProcessor(
        features=_create_observation_features(),
        norm_map=_create_observation_norm_map(),
        stats=observation_stats,
        normalize_keys={"observation.image"},
    )

    obs = {
        "observation.image": torch.tensor([0.7, 0.5, 0.3]),
        "observation.state": torch.tensor([0.5, 0.0]),
    }
    result = normalizer(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # The image is normalized; the state passes through untouched.
    assert torch.allclose(result["observation.image"], (torch.tensor([0.7, 0.5, 0.3]) - 0.5) / 0.2)
    assert torch.allclose(result["observation.state"], obs["observation.state"])
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_device_compatibility(observation_stats):
    """Normalized outputs stay on the device of the input tensors."""
    normalizer = NormalizerProcessor(
        features=_create_observation_features(),
        norm_map=_create_observation_norm_map(),
        stats=observation_stats,
    )
    obs = {"observation.image": torch.tensor([0.7, 0.5, 0.3]).cuda()}

    result = normalizer(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    assert result["observation.image"].device.type == "cuda"
|
||||
|
||||
|
||||
def test_from_lerobot_dataset():
    """from_lerobot_dataset pulls both observation and action stats off the dataset."""
    mock_dataset = Mock()
    mock_dataset.meta.stats = {
        "observation.image": {"mean": [0.5], "std": [0.2]},
        "action": {"mean": [0.0], "std": [1.0]},
    }

    features = {
        "observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)),
        "action": PolicyFeature(FeatureType.ACTION, (1,)),
    }
    norm_map = {
        FeatureType.VISUAL: NormalizationMode.MEAN_STD,
        FeatureType.ACTION: NormalizationMode.MEAN_STD,
    }

    normalizer = NormalizerProcessor.from_lerobot_dataset(mock_dataset, features, norm_map)

    # Both observation and action statistics must land in the tensor stats.
    for key in ("observation.image", "action"):
        assert key in normalizer._tensor_stats
|
||||
|
||||
|
||||
def test_state_dict_save_load(observation_normalizer):
    """A normalizer restored via load_state_dict behaves like the original."""
    snapshot = observation_normalizer.state_dict()

    restored = NormalizerProcessor(
        features=_create_observation_features(), norm_map=_create_observation_norm_map(), stats={}
    )
    restored.load_state_dict(snapshot)

    transition = create_transition(observation={"observation.image": torch.tensor([0.7, 0.5, 0.3])})
    out_original = observation_normalizer(transition)[TransitionKey.OBSERVATION]
    out_restored = restored(transition)[TransitionKey.OBSERVATION]

    assert torch.allclose(out_original["observation.image"], out_restored["observation.image"])
|
||||
|
||||
|
||||
# Fixtures for ActionUnnormalizer tests
|
||||
@pytest.fixture
def action_stats_mean_std():
    """Mean/std stats for a 3-dof action."""
    return {"mean": np.array([0.0, 0.0, 0.0]), "std": np.array([1.0, 2.0, 0.5])}
|
||||
|
||||
|
||||
@pytest.fixture
def action_stats_min_max():
    """Min/max stats for a 3-dof action."""
    return {"min": np.array([-1.0, -2.0, 0.0]), "max": np.array([1.0, 2.0, 1.0])}
|
||||
|
||||
|
||||
def _create_action_features():
    """Single 3-dof action feature."""
    return {"action": PolicyFeature(FeatureType.ACTION, (3,))}
|
||||
|
||||
|
||||
def _create_action_norm_map_mean_std():
    """Actions normalized with mean/std."""
    return {FeatureType.ACTION: NormalizationMode.MEAN_STD}
|
||||
|
||||
|
||||
def _create_action_norm_map_min_max():
    """Actions normalized with min/max."""
    return {FeatureType.ACTION: NormalizationMode.MIN_MAX}
|
||||
|
||||
|
||||
def test_mean_std_unnormalization(action_stats_mean_std):
    """Unnormalization inverts mean/std scaling: x * std + mean."""
    unnormalizer = UnnormalizerProcessor(
        features=_create_action_features(),
        norm_map=_create_action_norm_map_mean_std(),
        stats={"action": action_stats_mean_std},
    )

    out = unnormalizer(create_transition(action=torch.tensor([1.0, -0.5, 2.0])))
    restored = out[TransitionKey.ACTION]

    # [1*1+0, -0.5*2+0, 2*0.5+0]
    assert torch.allclose(restored, torch.tensor([1.0, -1.0, 1.0]))
|
||||
|
||||
|
||||
def test_min_max_unnormalization(action_stats_min_max):
    """Unnormalization maps actions from [-1, 1] back to [min, max]."""
    unnormalizer = UnnormalizerProcessor(
        features=_create_action_features(),
        norm_map=_create_action_norm_map_min_max(),
        stats={"action": action_stats_min_max},
    )

    out = unnormalizer(create_transition(action=torch.tensor([0.0, -1.0, 1.0])))
    restored = out[TransitionKey.ACTION]

    # (x + 1) / 2 * (max - min) + min, per component: [0.0, -2.0, 1.0].
    assert torch.allclose(restored, torch.tensor([0.0, -2.0, 1.0]))
|
||||
|
||||
|
||||
def test_numpy_action_input(action_stats_mean_std):
    """Numpy actions are accepted and come back as torch tensors."""
    unnormalizer = UnnormalizerProcessor(
        features=_create_action_features(),
        norm_map=_create_action_norm_map_mean_std(),
        stats={"action": action_stats_mean_std},
    )

    out = unnormalizer(create_transition(action=np.array([1.0, -0.5, 2.0], dtype=np.float32)))
    restored = out[TransitionKey.ACTION]

    assert isinstance(restored, torch.Tensor)
    assert torch.allclose(restored, torch.tensor([1.0, -1.0, 1.0]))
|
||||
|
||||
|
||||
def test_none_action(action_stats_mean_std):
    """A transition without an action passes through unchanged."""
    unnormalizer = UnnormalizerProcessor(
        features=_create_action_features(),
        norm_map=_create_action_norm_map_mean_std(),
        stats={"action": action_stats_mean_std},
    )

    transition = create_transition()
    assert unnormalizer(transition) == transition
|
||||
|
||||
|
||||
def test_action_from_lerobot_dataset():
    """Dataset stats are converted into the processor's tensor stats."""
    fake_dataset = Mock()
    fake_dataset.meta.stats = {"action": {"mean": [0.0], "std": [1.0]}}

    features = {"action": PolicyFeature(FeatureType.ACTION, (1,))}
    norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD}

    unnormalizer = UnnormalizerProcessor.from_lerobot_dataset(fake_dataset, features, norm_map)
    assert "mean" in unnormalizer._tensor_stats["action"]
|
||||
|
||||
|
||||
# Fixtures for NormalizerProcessor tests
|
||||
@pytest.fixture
def full_stats():
    """Normalization statistics covering an image, a state and an action feature."""
    image_stats = {"mean": np.array([0.5, 0.5, 0.5]), "std": np.array([0.2, 0.2, 0.2])}
    state_stats = {"min": np.array([0.0, -1.0]), "max": np.array([1.0, 1.0])}
    action_stats = {"mean": np.array([0.0, 0.0]), "std": np.array([1.0, 2.0])}
    return {
        "observation.image": image_stats,
        "observation.state": state_stats,
        "action": action_stats,
    }
|
||||
|
||||
|
||||
def _create_full_features():
    """Feature definitions matching the ``full_stats`` fixture."""
    specs = {
        "observation.image": (FeatureType.VISUAL, (3, 96, 96)),
        "observation.state": (FeatureType.STATE, (2,)),
        "action": (FeatureType.ACTION, (2,)),
    }
    return {key: PolicyFeature(ftype, shape) for key, (ftype, shape) in specs.items()}
|
||||
|
||||
|
||||
def _create_full_norm_map():
    """Normalization mode per feature type: mean/std for visual and action, min/max for state."""
    modes = [
        (FeatureType.VISUAL, NormalizationMode.MEAN_STD),
        (FeatureType.STATE, NormalizationMode.MIN_MAX),
        (FeatureType.ACTION, NormalizationMode.MEAN_STD),
    ]
    return dict(modes)
|
||||
|
||||
|
||||
@pytest.fixture
def normalizer_processor(full_stats):
    """NormalizerProcessor wired with the full feature set and stats."""
    return NormalizerProcessor(
        features=_create_full_features(),
        norm_map=_create_full_norm_map(),
        stats=full_stats,
    )
|
||||
|
||||
|
||||
def test_combined_normalization(normalizer_processor):
    """Images, states and actions are all normalized in a single pass."""
    raw_image = torch.tensor([0.7, 0.5, 0.3])
    transition = create_transition(
        observation={
            "observation.image": raw_image,
            "observation.state": torch.tensor([0.5, 0.0]),
        },
        action=torch.tensor([1.0, -0.5]),
        reward=1.0,
        done=False,
        truncated=False,
        info={},
        complementary_data={},
    )

    result = normalizer_processor(transition)

    # Image: mean/std normalization with mean=0.5, std=0.2.
    obs = result[TransitionKey.OBSERVATION]
    assert torch.allclose(obs["observation.image"], (raw_image - 0.5) / 0.2)

    # Action: mean/std normalization with mean=(0, 0), std=(1, 2).
    assert torch.allclose(
        result[TransitionKey.ACTION], torch.tensor([(1.0 - 0.0) / 1.0, (-0.5 - 0.0) / 2.0])
    )

    # Scalar fields pass through untouched.
    assert result[TransitionKey.REWARD] == 1.0
    assert not result[TransitionKey.DONE]
||||
|
||||
|
||||
def test_processor_from_lerobot_dataset(full_stats):
    """Building from a dataset honors an explicit ``normalize_keys`` subset."""
    fake_dataset = Mock()
    fake_dataset.meta.stats = full_stats

    processor = NormalizerProcessor.from_lerobot_dataset(
        fake_dataset,
        _create_full_features(),
        _create_full_norm_map(),
        normalize_keys={"observation.image"},
    )

    assert processor.normalize_keys == {"observation.image"}
    # Stats are converted both for the selected key and for the action.
    assert "observation.image" in processor._tensor_stats
    assert "action" in processor._tensor_stats
|
||||
|
||||
|
||||
def test_get_config(full_stats):
    """get_config serializes normalize_keys, eps, features and norm_map."""
    processor = NormalizerProcessor(
        features=_create_full_features(),
        norm_map=_create_full_norm_map(),
        stats=full_stats,
        normalize_keys={"observation.image"},
        eps=1e-6,
    )

    serialized = processor.get_config()

    assert serialized == {
        "normalize_keys": ["observation.image"],
        "eps": 1e-6,
        "features": {
            "observation.image": {"type": "VISUAL", "shape": (3, 96, 96)},
            "observation.state": {"type": "STATE", "shape": (2,)},
            "action": {"type": "ACTION", "shape": (2,)},
        },
        "norm_map": {"VISUAL": "MEAN_STD", "STATE": "MIN_MAX", "ACTION": "MEAN_STD"},
    }
|
||||
|
||||
|
||||
def test_integration_with_robot_processor(normalizer_processor):
    """The normalizer composes cleanly inside a RobotProcessor pipeline."""
    pipeline = RobotProcessor([normalizer_processor])

    transition = create_transition(
        observation={
            "observation.image": torch.tensor([0.7, 0.5, 0.3]),
            "observation.state": torch.tensor([0.5, 0.0]),
        },
        action=torch.tensor([1.0, -0.5]),
        reward=1.0,
        done=False,
        truncated=False,
        info={},
        complementary_data={},
    )

    result = pipeline(transition)

    # The pipeline still yields a dict observation and a tensor action.
    assert isinstance(result[TransitionKey.OBSERVATION], dict)
    assert isinstance(result[TransitionKey.ACTION], torch.Tensor)
|
||||
|
||||
|
||||
# Edge case tests
|
||||
def test_empty_observation():
    """A transition carrying no observation passes through unchanged."""
    normalizer = NormalizerProcessor(
        features={"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))},
        norm_map={FeatureType.VISUAL: NormalizationMode.MEAN_STD},
        stats={"observation.image": {"mean": [0.5], "std": [0.2]}},
    )

    blank = create_transition()
    assert normalizer(blank) == blank
|
||||
|
||||
|
||||
def test_empty_stats():
    """Without any statistics, values are passed through unchanged."""
    normalizer = NormalizerProcessor(
        features={"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))},
        norm_map={FeatureType.VISUAL: NormalizationMode.MEAN_STD},
        stats={},
    )
    pixel = torch.tensor([0.5])
    transition = create_transition(observation={"observation.image": pixel})

    result = normalizer(transition)
    assert torch.allclose(result[TransitionKey.OBSERVATION]["observation.image"], pixel)
|
||||
|
||||
|
||||
def test_partial_stats():
    """If statistics are incomplete, the value should pass through unchanged."""
    incomplete = {"observation.image": {"mean": [0.5]}}  # no std and no (min, max)
    normalizer = NormalizerProcessor(
        features={"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))},
        norm_map={FeatureType.VISUAL: NormalizationMode.MEAN_STD},
        stats=incomplete,
    )
    value = torch.tensor([0.7])
    transition = create_transition(observation={"observation.image": value})

    obs_out = normalizer(transition)[TransitionKey.OBSERVATION]
    assert torch.allclose(obs_out["observation.image"], value)
|
||||
|
||||
|
||||
def test_missing_action_stats_no_error():
    """Datasets without action stats must not raise; the key is simply absent."""
    fake_dataset = Mock()
    fake_dataset.meta.stats = {"observation.image": {"mean": [0.5], "std": [0.2]}}

    processor = UnnormalizerProcessor.from_lerobot_dataset(
        fake_dataset,
        {"observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96))},
        {FeatureType.VISUAL: NormalizationMode.MEAN_STD},
    )
    assert "action" not in processor._tensor_stats
|
||||
|
||||
|
||||
def test_serialization_roundtrip(full_stats):
    """Features and norm_map survive a get_config serialize/deserialize round trip."""
    features = _create_full_features()
    norm_map = _create_full_norm_map()
    original = NormalizerProcessor(
        features=features,
        norm_map=norm_map,
        stats=full_stats,
        normalize_keys={"observation.image"},
        eps=1e-6,
    )

    # Serialize, then rebuild a processor from the resulting config.
    config = original.get_config()
    rebuilt = NormalizerProcessor(
        features=config["features"],
        norm_map=config["norm_map"],
        stats=full_stats,
        normalize_keys=set(config["normalize_keys"]),
        eps=config["eps"],
    )

    # Both processors must transform a transition identically.
    transition = create_transition(
        observation={
            "observation.image": torch.tensor([0.7, 0.5, 0.3]),
            "observation.state": torch.tensor([0.5, 0.0]),
        },
        action=torch.tensor([1.0, -0.5]),
        reward=1.0,
        done=False,
        truncated=False,
        info={},
        complementary_data={},
    )
    out_original = original(transition)
    out_rebuilt = rebuilt(transition)

    assert torch.allclose(
        out_original[TransitionKey.OBSERVATION]["observation.image"],
        out_rebuilt[TransitionKey.OBSERVATION]["observation.image"],
    )
    assert torch.allclose(out_original[TransitionKey.ACTION], out_rebuilt[TransitionKey.ACTION])

    # Features and norm_map must be reconstructed faithfully.
    assert rebuilt.features.keys() == original.features.keys()
    for name in rebuilt.features:
        assert rebuilt.features[name].type == original.features[name].type
        assert rebuilt.features[name].shape == original.features[name].shape
    assert rebuilt.norm_map == original.norm_map
|
||||
@@ -0,0 +1,501 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType
|
||||
from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
|
||||
from lerobot.processor import (
|
||||
ImageProcessor,
|
||||
StateProcessor,
|
||||
VanillaObservationProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
from tests.conftest import assert_contract_is_typed
|
||||
|
||||
|
||||
def create_transition(
    observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
    """Helper to create an EnvTransition dictionary; omitted fields default to None."""
    return dict(
        (
            (TransitionKey.OBSERVATION, observation),
            (TransitionKey.ACTION, action),
            (TransitionKey.REWARD, reward),
            (TransitionKey.DONE, done),
            (TransitionKey.TRUNCATED, truncated),
            (TransitionKey.INFO, info),
            (TransitionKey.COMPLEMENTARY_DATA, complementary_data),
        )
    )
|
||||
|
||||
|
||||
def test_process_single_image():
    """A single HWC uint8 image becomes a batched, float CHW tensor in [0, 1]."""
    processor = ImageProcessor()
    frame = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)

    result = processor(create_transition(observation={"pixels": frame}))
    obs = result[TransitionKey.OBSERVATION]

    assert "observation.image" in obs
    img = obs["observation.image"]

    # (batch, channels, height, width), float32, scaled into [0, 1].
    assert img.shape == (1, 3, 64, 64)
    assert img.dtype == torch.float32
    assert img.min() >= 0.0
    assert img.max() <= 1.0
|
||||
|
||||
|
||||
def test_process_image_dict():
    """Each camera in a pixels dict gets its own observation.images.<name> key."""
    processor = ImageProcessor()
    cameras = {
        "camera1": np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8),
        "camera2": np.random.randint(0, 256, size=(48, 48, 3), dtype=np.uint8),
    }

    result = processor(create_transition(observation={"pixels": cameras}))
    obs = result[TransitionKey.OBSERVATION]

    assert "observation.images.camera1" in obs
    assert "observation.images.camera2" in obs
    assert obs["observation.images.camera1"].shape == (1, 3, 32, 32)
    assert obs["observation.images.camera2"].shape == (1, 3, 48, 48)
|
||||
|
||||
|
||||
def test_process_batched_image():
    """Images that already carry a batch dimension keep it."""
    processor = ImageProcessor()
    batch = np.random.randint(0, 256, size=(2, 64, 64, 3), dtype=np.uint8)

    result = processor(create_transition(observation={"pixels": batch}))

    assert result[TransitionKey.OBSERVATION]["observation.image"].shape == (2, 3, 64, 64)
|
||||
|
||||
|
||||
def test_invalid_image_format():
    """Channel-first input is rejected with a clear error."""
    processor = ImageProcessor()
    chw_image = np.random.randint(0, 256, size=(3, 64, 64), dtype=np.uint8)
    transition = create_transition(observation={"pixels": chw_image})

    with pytest.raises(ValueError, match="Expected channel-last images"):
        processor(transition)
|
||||
|
||||
|
||||
def test_invalid_image_dtype():
    """Float input is rejected: the processor expects uint8 pixels."""
    processor = ImageProcessor()
    float_image = np.random.rand(64, 64, 3).astype(np.float32)
    transition = create_transition(observation={"pixels": float_image})

    with pytest.raises(ValueError, match="Expected torch.uint8 images"):
        processor(transition)
|
||||
|
||||
|
||||
def test_no_pixels_in_observation():
    """Non-image entries pass through untouched when no pixels are present."""
    processor = ImageProcessor()
    payload = np.array([1, 2, 3])

    result = processor(create_transition(observation={"other_data": payload}))
    obs = result[TransitionKey.OBSERVATION]

    assert "other_data" in obs
    np.testing.assert_array_equal(obs["other_data"], payload)
|
||||
|
||||
|
||||
def test_none_observation():
    """A transition with no observation is returned as-is."""
    processor = ImageProcessor()
    blank = create_transition()
    assert processor(blank) == blank
|
||||
|
||||
|
||||
def test_serialization_methods():
    """get_config/state_dict return dicts; load_state_dict/reset don't raise."""
    processor = ImageProcessor()

    assert isinstance(processor.get_config(), dict)

    state = processor.state_dict()
    assert isinstance(state, dict)

    # Restoring state and resetting are both expected to be safe no-ops here.
    processor.load_state_dict(state)
    processor.reset()
|
||||
|
||||
|
||||
def test_process_environment_state():
    """environment_state is renamed, batched and converted to float32."""
    processor = StateProcessor()
    raw = np.array([1.0, 2.0, 3.0], dtype=np.float32)

    result = processor(create_transition(observation={"environment_state": raw}))
    obs = result[TransitionKey.OBSERVATION]

    # Renamed under the observation.* namespace; original key removed.
    assert "observation.environment_state" in obs
    assert "environment_state" not in obs

    state = obs["observation.environment_state"]
    assert state.shape == (1, 3)  # batch dimension prepended
    assert state.dtype == torch.float32
    torch.testing.assert_close(state, torch.tensor([[1.0, 2.0, 3.0]]))
|
||||
|
||||
|
||||
def test_process_agent_pos():
    """agent_pos is renamed to observation.state, batched and converted."""
    processor = StateProcessor()
    raw = np.array([0.5, -0.5, 1.0], dtype=np.float32)

    result = processor(create_transition(observation={"agent_pos": raw}))
    obs = result[TransitionKey.OBSERVATION]

    # Renamed under the observation.* namespace; original key removed.
    assert "observation.state" in obs
    assert "agent_pos" not in obs

    state = obs["observation.state"]
    assert state.shape == (1, 3)  # batch dimension prepended
    assert state.dtype == torch.float32
    torch.testing.assert_close(state, torch.tensor([[0.5, -0.5, 1.0]]))
|
||||
|
||||
|
||||
def test_process_batched_states():
    """States that already have a batch dimension keep their shape."""
    processor = StateProcessor()
    observation = {
        "environment_state": np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32),
        "agent_pos": np.array([[0.5, -0.5], [1.0, -1.0]], dtype=np.float32),
    }

    obs = processor(create_transition(observation=observation))[TransitionKey.OBSERVATION]

    assert obs["observation.environment_state"].shape == (2, 2)
    assert obs["observation.state"].shape == (2, 2)
|
||||
|
||||
|
||||
def test_process_both_states():
    """Both state keys are renamed while unrelated entries survive."""
    processor = StateProcessor()
    observation = {
        "environment_state": np.array([1.0, 2.0], dtype=np.float32),
        "agent_pos": np.array([0.5, -0.5], dtype=np.float32),
        "other_data": "keep_me",
    }

    obs = processor(create_transition(observation=observation))[TransitionKey.OBSERVATION]

    # Renamed keys exist; the originals are gone.
    assert "observation.environment_state" in obs
    assert "observation.state" in obs
    assert "environment_state" not in obs
    assert "agent_pos" not in obs

    # Unrelated data is preserved.
    assert obs["other_data"] == "keep_me"
|
||||
|
||||
|
||||
def test_no_states_in_observation():
    """Test processor when no states are in observation.

    Observations without state keys must pass through unchanged.
    """
    processor = StateProcessor()

    observation = {"other_data": np.array([1, 2, 3])}
    transition = create_transition(observation=observation)

    result = processor(transition)
    processed_obs = result[TransitionKey.OBSERVATION]

    # Compare keys and the contained array explicitly: handing whole dicts to
    # np.testing.assert_array_equal relies on fragile object-array coercion.
    assert processed_obs.keys() == observation.keys()
    np.testing.assert_array_equal(processed_obs["other_data"], observation["other_data"])
|
||||
|
||||
|
||||
def test_complete_observation_processing():
    """Images and both states are processed together; extras are preserved."""
    processor = VanillaObservationProcessor()

    observation = {
        "pixels": np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8),
        "environment_state": np.array([1.0, 2.0, 3.0], dtype=np.float32),
        "agent_pos": np.array([0.5, -0.5, 1.0], dtype=np.float32),
        "other_data": "preserve_me",
    }

    obs = processor(create_transition(observation=observation))[TransitionKey.OBSERVATION]

    # The image is converted to a batched CHW tensor.
    assert "observation.image" in obs
    assert obs["observation.image"].shape == (1, 3, 32, 32)

    # Both states are renamed under the observation.* namespace.
    assert "observation.environment_state" in obs
    assert "observation.state" in obs

    # The raw keys no longer exist.
    for raw_key in ("pixels", "environment_state", "agent_pos"):
        assert raw_key not in obs

    # Unrelated data is left alone.
    assert obs["other_data"] == "preserve_me"
|
||||
|
||||
|
||||
def test_image_only_processing():
    """An image-only observation yields exactly one processed key."""
    processor = VanillaObservationProcessor()
    frame = np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8)

    obs = processor(create_transition(observation={"pixels": frame}))[TransitionKey.OBSERVATION]

    assert "observation.image" in obs
    assert len(obs) == 1
|
||||
|
||||
|
||||
def test_state_only_processing():
    """A state-only observation renames agent_pos to observation.state."""
    processor = VanillaObservationProcessor()
    pos = np.array([1.0, 2.0], dtype=np.float32)

    obs = processor(create_transition(observation={"agent_pos": pos}))[TransitionKey.OBSERVATION]

    assert "observation.state" in obs
    assert "agent_pos" not in obs
|
||||
|
||||
|
||||
def test_empty_observation():
    """An empty observation dict stays empty."""
    processor = VanillaObservationProcessor()

    result = processor(create_transition(observation={}))

    assert result[TransitionKey.OBSERVATION] == {}
|
||||
|
||||
|
||||
def test_custom_sub_processors():
    """Explicitly supplied sub-processors are used verbatim (identity check)."""
    image_sub = ImageProcessor()
    state_sub = StateProcessor()

    processor = VanillaObservationProcessor(image_processor=image_sub, state_processor=state_sub)

    assert processor.image_processor is image_sub
    assert processor.state_processor is state_sub
|
||||
|
||||
|
||||
def test_equivalent_to_original_function():
    """VanillaObservationProcessor matches preprocess_observation's output."""
    # Imported locally to keep the dependency scoped to this comparison test.
    from lerobot.envs.utils import preprocess_observation

    processor = VanillaObservationProcessor()

    observation = {
        "pixels": np.random.randint(0, 256, size=(64, 64, 3), dtype=np.uint8),
        "environment_state": np.array([1.0, 2.0, 3.0], dtype=np.float32),
        "agent_pos": np.array([0.5, -0.5, 1.0], dtype=np.float32),
    }

    legacy = preprocess_observation(observation)
    modern = processor(create_transition(observation=observation))[TransitionKey.OBSERVATION]

    assert set(legacy.keys()) == set(modern.keys())
    for key, legacy_value in legacy.items():
        torch.testing.assert_close(legacy_value, modern[key])
|
||||
|
||||
|
||||
def test_equivalent_with_image_dict():
    """Equivalence with preprocess_observation also holds for multi-camera input."""
    from lerobot.envs.utils import preprocess_observation

    processor = VanillaObservationProcessor()

    observation = {
        "pixels": {
            "cam1": np.random.randint(0, 256, size=(32, 32, 3), dtype=np.uint8),
            "cam2": np.random.randint(0, 256, size=(48, 48, 3), dtype=np.uint8),
        },
        "agent_pos": np.array([1.0, 2.0], dtype=np.float32),
    }

    legacy = preprocess_observation(observation)
    modern = processor(create_transition(observation=observation))[TransitionKey.OBSERVATION]

    assert set(legacy.keys()) == set(modern.keys())
    for key, legacy_value in legacy.items():
        torch.testing.assert_close(legacy_value, modern[key])
|
||||
|
||||
|
||||
def test_image_processor_feature_contract_pixels_to_image(policy_feature_factory):
    """feature_contract moves 'pixels' to OBS_IMAGE and keeps other entries."""
    processor = ImageProcessor()
    features = {
        "pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "keep": policy_feature_factory(FeatureType.ENV, (1,)),
    }

    out = processor.feature_contract(features.copy())

    assert "pixels" not in out
    assert OBS_IMAGE in out and out[OBS_IMAGE] == features["pixels"]
    assert out["keep"] == features["keep"]
    assert_contract_is_typed(out)
|
||||
|
||||
|
||||
def test_image_processor_feature_contract_observation_pixels_to_image(policy_feature_factory):
    """feature_contract also accepts the 'observation.pixels' spelling."""
    processor = ImageProcessor()
    features = {
        "observation.pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "keep": policy_feature_factory(FeatureType.ENV, (1,)),
    }

    out = processor.feature_contract(features.copy())

    assert "observation.pixels" not in out
    assert OBS_IMAGE in out and out[OBS_IMAGE] == features["observation.pixels"]
    assert out["keep"] == features["keep"]
    assert_contract_is_typed(out)
|
||||
|
||||
|
||||
def test_image_processor_feature_contract_multi_camera_and_prefixed(policy_feature_factory):
    """Named cameras map to OBS_IMAGES.<camera>, with or without the prefix."""
    processor = ImageProcessor()
    features = {
        "pixels.front": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "pixels.wrist": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "observation.pixels.rear": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "keep": policy_feature_factory(FeatureType.ENV, (7,)),
    }

    out = processor.feature_contract(features.copy())

    expected_renames = {
        "pixels.front": f"{OBS_IMAGES}.front",
        "pixels.wrist": f"{OBS_IMAGES}.wrist",
        "observation.pixels.rear": f"{OBS_IMAGES}.rear",
    }
    for old_key, new_key in expected_renames.items():
        assert new_key in out and out[new_key] == features[old_key]
        assert old_key not in out

    assert out["keep"] == features["keep"]
    assert_contract_is_typed(out)
|
||||
|
||||
|
||||
def test_state_processor_feature_contract_environment_and_agent_pos(policy_feature_factory):
    """environment_state and agent_pos map to their canonical OBS_* names."""
    processor = StateProcessor()
    features = {
        "environment_state": policy_feature_factory(FeatureType.STATE, (3,)),
        "agent_pos": policy_feature_factory(FeatureType.STATE, (7,)),
        "keep": policy_feature_factory(FeatureType.ENV, (1,)),
    }

    out = processor.feature_contract(features.copy())

    assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["environment_state"]
    assert OBS_STATE in out and out[OBS_STATE] == features["agent_pos"]
    assert "environment_state" not in out
    assert "agent_pos" not in out
    assert out["keep"] == features["keep"]
    assert_contract_is_typed(out)
|
||||
|
||||
|
||||
def test_state_processor_feature_contract_prefixed_inputs(policy_feature_factory):
    """observation.-prefixed state keys are also mapped to the OBS_* names."""
    processor = StateProcessor()
    features = {
        "observation.environment_state": policy_feature_factory(FeatureType.STATE, (2,)),
        "observation.agent_pos": policy_feature_factory(FeatureType.STATE, (4,)),
    }

    out = processor.feature_contract(features.copy())

    assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["observation.environment_state"]
    assert OBS_STATE in out and out[OBS_STATE] == features["observation.agent_pos"]
    assert "environment_state" not in out
    assert "agent_pos" not in out
    assert_contract_is_typed(out)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,467 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType
|
||||
from lerobot.processor import ProcessorStepRegistry, RenameProcessor, RobotProcessor, TransitionKey
|
||||
from tests.conftest import assert_contract_is_typed
|
||||
|
||||
|
||||
def create_transition(
    observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
    """Helper to create an EnvTransition dictionary; absent fields stay None."""
    transition = {}
    transition[TransitionKey.OBSERVATION] = observation
    transition[TransitionKey.ACTION] = action
    transition[TransitionKey.REWARD] = reward
    transition[TransitionKey.DONE] = done
    transition[TransitionKey.TRUNCATED] = truncated
    transition[TransitionKey.INFO] = info
    transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
    return transition
|
||||
|
||||
|
||||
def test_basic_renaming():
    """Mapped keys are renamed, values preserved, unmapped keys untouched."""
    processor = RenameProcessor(
        rename_map={
            "old_key1": "new_key1",
            "old_key2": "new_key2",
        }
    )

    tensor_value = torch.tensor([1.0, 2.0])
    array_value = np.array([3.0, 4.0])
    transition = create_transition(
        observation={
            "old_key1": tensor_value,
            "old_key2": array_value,
            "unchanged_key": "keep_me",
        }
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]

    # Old names are gone; new names exist.
    assert "new_key1" in obs and "new_key2" in obs
    assert "old_key1" not in obs and "old_key2" not in obs

    # Values ride along unchanged.
    torch.testing.assert_close(obs["new_key1"], tensor_value)
    np.testing.assert_array_equal(obs["new_key2"], array_value)

    # Keys absent from the map are preserved.
    assert obs["unchanged_key"] == "keep_me"
|
||||
|
||||
|
||||
def test_empty_rename_map():
    """An empty rename map acts as a pure pass-through."""
    processor = RenameProcessor(rename_map={})

    obs = {"key1": torch.tensor([1.0]), "key2": "value2"}
    out = processor(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # Same key set, same values.
    assert out.keys() == obs.keys()
    torch.testing.assert_close(out["key1"], obs["key1"])
    assert out["key2"] == obs["key2"]
|
||||
|
||||
|
||||
def test_none_observation():
    """A transition carrying no observation is returned unchanged."""
    processor = RenameProcessor(rename_map={"old": "new"})

    transition = create_transition()

    # Nothing to rename, so the transition comes back as-is.
    assert processor(transition) == transition
|
||||
|
||||
|
||||
def test_overlapping_rename():
    """Overlapping renames ('a'->'b' while 'b'->'c') do not clobber each other."""
    processor = RenameProcessor(rename_map={"a": "b", "b": "c"})

    out = processor(create_transition(observation={"a": 1, "b": 2, "x": 3}))[TransitionKey.OBSERVATION]

    assert "a" not in out
    assert out["b"] == 1  # 'a' landed on 'b'
    assert out["c"] == 2  # the original 'b' moved to 'c'
    assert out["x"] == 3  # unmapped key untouched
|
||||
|
||||
|
||||
def test_partial_rename():
    """Only mapped keys are renamed; unrelated entries survive untouched."""
    processor = RenameProcessor(
        rename_map={
            "observation.state": "observation.proprio_state",
            "pixels": "observation.image",
        }
    )

    obs = {
        "observation.state": torch.randn(10),
        "pixels": np.random.randint(0, 256, (64, 64, 3), dtype=np.uint8),
        "reward": 1.0,
        "info": {"episode": 1},
    }
    out = processor(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # Mapped keys appear under their new names only.
    assert "observation.proprio_state" in out
    assert "observation.image" in out
    assert "observation.state" not in out
    assert "pixels" not in out

    # Everything else is left alone.
    assert out["reward"] == 1.0
    assert out["info"] == {"episode": 1}
|
||||
|
||||
|
||||
def test_get_config():
    """get_config round-trips the rename map for serialization."""
    mapping = {"old1": "new1", "old2": "new2"}
    processor = RenameProcessor(rename_map=mapping)

    assert processor.get_config() == {"rename_map": mapping}
|
||||
|
||||
|
||||
def test_state_dict():
    """RenameProcessor carries no tensor state: its state dict is empty and loadable."""
    processor = RenameProcessor(rename_map={"old": "new"})

    assert processor.state_dict() == {}

    # Loading the empty state dict it produced must be a no-op, not an error.
    processor.load_state_dict({})
|
||||
|
||||
|
||||
def test_integration_with_robot_processor():
    """Renaming behaves the same when the step runs inside a RobotProcessor pipeline."""
    rename_step = RenameProcessor(
        rename_map={"agent_pos": "observation.state", "pixels": "observation.image"}
    )
    pipeline = RobotProcessor([rename_step])

    obs = {
        "agent_pos": np.array([1.0, 2.0, 3.0]),
        "pixels": np.zeros((32, 32, 3), dtype=np.uint8),
        "other_data": "preserve_me",
    }
    result = pipeline(
        create_transition(
            observation=obs, reward=0.5, done=False, truncated=False, info={}, complementary_data={}
        )
    )
    out = result[TransitionKey.OBSERVATION]

    # Renaming went through the pipeline.
    assert "observation.state" in out
    assert "observation.image" in out
    assert "agent_pos" not in out
    assert "pixels" not in out
    assert out["other_data"] == "preserve_me"

    # The rest of the transition is untouched by the pipeline.
    assert result[TransitionKey.REWARD] == 0.5
    assert result[TransitionKey.DONE] is False
|
||||
|
||||
|
||||
def test_save_and_load_pretrained():
    """save_pretrained / from_pretrained round-trips a rename-only pipeline."""
    mapping = {"old_state": "observation.state", "old_image": "observation.image"}
    pipeline = RobotProcessor([RenameProcessor(rename_map=mapping)], name="TestRenameProcessor")

    with tempfile.TemporaryDirectory() as tmp_dir:
        pipeline.save_pretrained(tmp_dir)

        # The config file name is derived from the pipeline name.
        assert (Path(tmp_dir) / "testrenameprocessor.json").exists()

        # A stateless step writes no safetensors files.
        assert list(Path(tmp_dir).glob("*.safetensors")) == []

        restored = RobotProcessor.from_pretrained(tmp_dir)

    assert restored.name == "TestRenameProcessor"
    assert len(restored) == 1

    step = restored.steps[0]
    assert isinstance(step, RenameProcessor)
    assert step.rename_map == mapping

    # The reloaded pipeline still renames correctly.
    transition = create_transition(observation={"old_state": [1, 2, 3], "old_image": "image_data"})
    out = restored(transition)[TransitionKey.OBSERVATION]

    assert "observation.state" in out
    assert "observation.image" in out
    assert out["observation.state"] == [1, 2, 3]
    assert out["observation.image"] == "image_data"
|
||||
|
||||
|
||||
def test_registry_functionality():
    """RenameProcessor is registered and constructible through the registry."""
    # The name must appear in the registry listing.
    assert "rename_processor" in ProcessorStepRegistry.list()

    # Lookup resolves to the exact class object.
    resolved = ProcessorStepRegistry.get("rename_processor")
    assert resolved is RenameProcessor

    # An instance built from the resolved class behaves like a direct construction.
    instance = resolved(rename_map={"old": "new"})
    assert isinstance(instance, RenameProcessor)
    assert instance.rename_map == {"old": "new"}
|
||||
|
||||
|
||||
def test_registry_based_save_load():
    """Serialized step configs reference the registry name, not a module path."""
    import json

    pipeline = RobotProcessor([RenameProcessor(rename_map={"key1": "renamed_key1"})])

    with tempfile.TemporaryDirectory() as tmp_dir:
        pipeline.save_pretrained(tmp_dir)

        # The default pipeline name yields "robotprocessor.json".
        with open(Path(tmp_dir) / "robotprocessor.json") as f:
            config = json.load(f)

        step_cfg = config["steps"][0]
        assert "registry_name" in step_cfg
        assert step_cfg["registry_name"] == "rename_processor"
        # A registry reference replaces the module-path "class" entry.
        assert "class" not in step_cfg

        restored = RobotProcessor.from_pretrained(tmp_dir)

    step = restored.steps[0]
    assert isinstance(step, RenameProcessor)
    assert step.rename_map == {"key1": "renamed_key1"}
|
||||
|
||||
|
||||
def test_chained_rename_processors():
    """Two rename steps compose; step_through exposes the intermediate naming."""
    # Stage 1: raw keys -> intermediate names.
    stage_one = RenameProcessor(rename_map={"pos": "agent_position", "img": "camera_image"})
    # Stage 2: intermediate names -> final observation keys.
    stage_two = RenameProcessor(
        rename_map={"agent_position": "observation.state", "camera_image": "observation.image"}
    )
    pipeline = RobotProcessor([stage_one, stage_two])

    transition = create_transition(
        observation={"pos": np.array([1.0, 2.0]), "img": "image_data", "extra": "keep_me"}
    )

    # step_through yields the transition after each stage.
    results = list(pipeline.step_through(transition))

    # After stage one the intermediate names are in place.
    mid_obs = results[1][TransitionKey.OBSERVATION]
    assert "agent_position" in mid_obs
    assert "camera_image" in mid_obs

    # After stage two only the final names remain.
    final_obs = results[2][TransitionKey.OBSERVATION]
    assert "observation.state" in final_obs
    assert "observation.image" in final_obs
    assert final_obs["extra"] == "keep_me"

    # Neither the raw nor the intermediate names survive to the end.
    for stale in ("pos", "img", "agent_position", "camera_image"):
        assert stale not in final_obs
|
||||
|
||||
|
||||
def test_nested_observation_rename():
    """Dotted (nested-style) observation keys rename like any flat key."""
    processor = RenameProcessor(
        rename_map={
            "observation.images.left": "observation.camera.left_view",
            "observation.images.right": "observation.camera.right_view",
            "observation.proprio": "observation.proprioception",
        }
    )

    obs = {
        "observation.images.left": torch.randn(3, 64, 64),
        "observation.images.right": torch.randn(3, 64, 64),
        "observation.proprio": torch.randn(7),
        "observation.gripper": torch.tensor([0.0]),  # deliberately not in the map
    }
    out = processor(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # Mapped keys show up under their new dotted names.
    for renamed in (
        "observation.camera.left_view",
        "observation.camera.right_view",
        "observation.proprioception",
    ):
        assert renamed in out

    # The unmapped key is preserved.
    assert "observation.gripper" in out

    # The original mapped names are removed.
    for old in ("observation.images.left", "observation.images.right", "observation.proprio"):
        assert old not in out
|
||||
|
||||
|
||||
def test_value_types_preserved():
    """Renaming moves references only; values of every type survive unchanged."""
    processor = RenameProcessor(
        rename_map={"old_tensor": "new_tensor", "old_array": "new_array", "old_scalar": "new_scalar"}
    )

    tensor_value = torch.randn(3, 3)
    array_value = np.random.rand(2, 2)
    obs = {
        "old_tensor": tensor_value,
        "old_array": array_value,
        "old_scalar": 42,
        "old_string": "hello",
        "old_dict": {"nested": "value"},
        "old_list": [1, 2, 3],
    }
    out = processor(create_transition(observation=obs))[TransitionKey.OBSERVATION]

    # Renamed entries keep identical contents.
    assert torch.equal(out["new_tensor"], tensor_value)
    assert np.array_equal(out["new_array"], array_value)
    assert out["new_scalar"] == 42

    # Unmapped entries keep both their names and their values.
    assert out["old_string"] == "hello"
    assert out["old_dict"] == {"nested": "value"}
    assert out["old_list"] == [1, 2, 3]
|
||||
|
||||
|
||||
def test_feature_contract_basic_renaming(policy_feature_factory):
    """feature_contract renames feature keys without mutating the caller's dict."""
    processor = RenameProcessor(rename_map={"a": "x", "b": "y"})
    features = {
        "a": policy_feature_factory(FeatureType.STATE, (2,)),
        "b": policy_feature_factory(FeatureType.ACTION, (3,)),
        "c": policy_feature_factory(FeatureType.ENV, (1,)),
    }

    out = processor.feature_contract(features.copy())

    # Renamed keys carry the original feature values; unmapped keys pass through.
    assert out["x"] == features["a"]
    assert out["y"] == features["b"]
    assert out["c"] == features["c"]

    assert_contract_is_typed(out)

    # The input dict was copied, so its key set is unchanged.
    assert set(features) == {"a", "b", "c"}
|
||||
|
||||
|
||||
def test_feature_contract_overlapping_keys(policy_feature_factory):
    """Overlapping contract renames ('a'->'b', 'b'->'c') resolve without clobbering."""
    processor = RenameProcessor(rename_map={"a": "b", "b": "c"})
    features = {
        "a": policy_feature_factory(FeatureType.STATE, (1,)),
        "b": policy_feature_factory(FeatureType.STATE, (2,)),
    }

    out = processor.feature_contract(features)

    assert set(out) == {"b", "c"}
    assert out["b"] == features["a"]  # 'a' renamed to 'b'
    assert out["c"] == features["b"]  # 'b' renamed to 'c'
    assert_contract_is_typed(out)
|
||||
|
||||
|
||||
def test_feature_contract_chained_processors(policy_feature_factory):
    """Contract-level renaming composes across pipeline steps."""
    first = RenameProcessor(rename_map={"pos": "agent_position", "img": "camera_image"})
    second = RenameProcessor(
        rename_map={"agent_position": "observation.state", "camera_image": "observation.image"}
    )
    pipeline = RobotProcessor([first, second])

    spec = {
        "pos": policy_feature_factory(FeatureType.STATE, (7,)),
        "img": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)),
        "extra": policy_feature_factory(FeatureType.ENV, (1,)),
    }

    out = pipeline.feature_contract(initial_features=spec)

    # Final names reflect both rename stages; unmapped key passes through.
    assert set(out) == {"observation.state", "observation.image", "extra"}
    assert out["observation.state"] == spec["pos"]
    assert out["observation.image"] == spec["img"]
    assert out["extra"] == spec["extra"]
    assert_contract_is_typed(out)
|
||||
Reference in New Issue
Block a user