diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index 9e6ef7a57..fc25a0acd 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -16,7 +16,7 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features +from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features from lerobot.datasets.utils import combine_feature_dicts from lerobot.model.kinematics import RobotKinematics from lerobot.policies.act.modeling_act import ACTPolicy @@ -25,7 +25,7 @@ from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import ( identity_transition, observation_to_transition, - transition_to_robot_action, + transition_to_action, ) from lerobot.record import record_loop from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig @@ -76,7 +76,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline( ), ], to_transition=identity_transition, - to_output=transition_to_robot_action, + to_output=transition_to_action, ) # Build pipeline to convert joint observation to ee pose observation @@ -91,7 +91,7 @@ robot_joints_to_ee_pose_processor = RobotProcessorPipeline( # Build dataset action and gripper features action_ee_and_gripper = aggregate_pipeline_dataset_features( pipeline=robot_ee_to_joints_processor, - initial_features={}, + initial_features=create_initial_features(), use_videos=True, patterns=["action.ee", "action.gripper.pos", "observation.state.gripper.pos"], ) # Get all ee action features + gripper pos action features @@ -99,7 +99,7 @@ action_ee_and_gripper = aggregate_pipeline_dataset_features( # Build dataset observation features obs_ee = aggregate_pipeline_dataset_features( pipeline=robot_joints_to_ee_pose_processor, - 
initial_features=robot.observation_features, + initial_features=create_initial_features(observation=robot.observation_features), use_videos=True, patterns=["observation.state.ee"], ) # Get all ee observation features diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index e2bd5541a..f25835f96 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -17,7 +17,7 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features +from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features from lerobot.datasets.utils import combine_feature_dicts from lerobot.model.kinematics import RobotKinematics from lerobot.processor import RobotProcessorPipeline @@ -25,7 +25,7 @@ from lerobot.processor.converters import ( action_to_transition, identity_transition, observation_to_transition, - transition_to_robot_action, + transition_to_action, ) from lerobot.record import record_loop from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig @@ -107,7 +107,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline( ), ], to_transition=identity_transition, - to_output=transition_to_robot_action, + to_output=transition_to_action, ) # Build pipeline to convert joint observation to ee pose observation @@ -122,7 +122,7 @@ robot_joints_to_ee_pose = RobotProcessorPipeline( # Build dataset ee action features action_ee = aggregate_pipeline_dataset_features( pipeline=phone_to_robot_ee_pose_processor, - initial_features=phone.action_features, + initial_features=create_initial_features(action=phone.action_features), use_videos=True, patterns=["action.ee"], ) @@ -130,7 +130,7 @@ action_ee = aggregate_pipeline_dataset_features( # Get gripper pos action features gripper = aggregate_pipeline_dataset_features( 
pipeline=robot_ee_to_joints_processor, - initial_features={}, + initial_features=create_initial_features(), use_videos=True, patterns=["action.gripper.pos", "observation.state.gripper.pos"], ) @@ -138,7 +138,7 @@ gripper = aggregate_pipeline_dataset_features( # Build dataset ee observation features observation_ee = aggregate_pipeline_dataset_features( pipeline=robot_joints_to_ee_pose, - initial_features=robot.observation_features, + initial_features=create_initial_features(observation=robot.observation_features), use_videos=True, patterns=["observation.state.ee"], ) diff --git a/examples/phone_to_so100/replay.py b/examples/phone_to_so100/replay.py index cb56005cb..ffeaa7c2b 100644 --- a/examples/phone_to_so100/replay.py +++ b/examples/phone_to_so100/replay.py @@ -20,7 +20,7 @@ import time from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.model.kinematics import RobotKinematics from lerobot.processor import RobotProcessorPipeline -from lerobot.processor.converters import action_to_transition, transition_to_robot_action +from lerobot.processor.converters import action_to_transition, transition_to_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( AddRobotObservationAsComplimentaryData, @@ -60,7 +60,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline( ), ], to_transition=action_to_transition, - to_output=transition_to_robot_action, + to_output=transition_to_action, ) robot_ee_to_joints_processor.reset() diff --git a/examples/phone_to_so100/teleoperate.py b/examples/phone_to_so100/teleoperate.py index 6a4f76e74..5be126c32 100644 --- a/examples/phone_to_so100/teleoperate.py +++ b/examples/phone_to_so100/teleoperate.py @@ -17,7 +17,7 @@ import time from lerobot.model.kinematics import RobotKinematics from lerobot.processor import RobotProcessorPipeline -from lerobot.processor.converters import action_to_transition, 
transition_to_robot_action +from lerobot.processor.converters import action_to_transition, transition_to_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( AddRobotObservationAsComplimentaryData, @@ -73,7 +73,7 @@ phone_to_robot_joints_processor = RobotProcessorPipeline( ), ], to_transition=action_to_transition, - to_output=transition_to_robot_action, + to_output=transition_to_action, ) robot.connect() diff --git a/src/lerobot/configs/types.py b/src/lerobot/configs/types.py index 322a7ea9b..e02527840 100644 --- a/src/lerobot/configs/types.py +++ b/src/lerobot/configs/types.py @@ -27,6 +27,11 @@ class FeatureType(str, Enum): LANGUAGE = "LANGUAGE" +class PipelineFeatureType(str, Enum): + ACTION = "ACTION" + OBSERVATION = "OBSERVATION" + + class NormalizationMode(str, Enum): MIN_MAX = "MIN_MAX" MEAN_STD = "MEAN_STD" diff --git a/src/lerobot/datasets/pipeline_features.py b/src/lerobot/datasets/pipeline_features.py index 6b568e893..8918b660d 100644 --- a/src/lerobot/datasets/pipeline_features.py +++ b/src/lerobot/datasets/pipeline_features.py @@ -12,100 +12,130 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from collections.abc import Sequence from typing import Any +from lerobot.configs.types import PipelineFeatureType from lerobot.constants import ACTION, OBS_IMAGES, OBS_STATE from lerobot.datasets.utils import hw_to_dataset_features from lerobot.processor import DataProcessorPipeline +def create_initial_features( + action: dict[str, Any] | None = None, observation: dict[str, Any] | None = None +) -> dict[PipelineFeatureType, dict[str, Any]]: + """ + Creates the initial features dict for the dataset from action and observation specs. + + Args: + action: A dictionary of action feature names to their types/shapes. + observation: A dictionary of observation feature names to their types/shapes.
+ + Returns: + The initial features dictionary structured by PipelineFeatureType. + """ + features = {PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}} + if action: + features[PipelineFeatureType.ACTION] = action + if observation: + features[PipelineFeatureType.OBSERVATION] = observation + return features + + +# Helper to filter state/action keys based on regex patterns. +def should_keep(key: str, patterns: tuple[str]) -> bool: + if patterns is None: + return True + return any(re.search(pat, key) for pat in patterns) + + +def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str: + for prefix in prefixes_to_strip: + if key.startswith(prefix): + return key[len(prefix) :] + return key + + +# Define prefixes to strip from feature keys for clean names. +# Handles both fully qualified (e.g., "action.state") and short (e.g., "state") forms. +PREFIXES_TO_STRIP = tuple( + f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1]) +) + + def aggregate_pipeline_dataset_features( pipeline: DataProcessorPipeline, - initial_features: dict[str, Any], + initial_features: dict[PipelineFeatureType, dict[str, Any]], *, use_videos: bool = True, patterns: Sequence[str] | None = None, ) -> dict[str, dict]: - """Aggregates and filters dataset features based on a data processing pipeline. + """ + Aggregates and filters pipeline features to create a dataset-ready features dictionary. - This function determines the final structure of dataset features after applying a series - of processing steps defined in a pipeline. It starts with an initial set of hardware - features (e.g., camera image shapes), transforms them using the pipeline, and then - filters the results. - - Image features are controlled by the `use_videos` flag, while action and state features - can be selectively included by matching their keys against the provided regex `patterns`. 
- The final output is formatted to be compatible with Hugging Face Datasets feature dictionaries. + This function transforms initial features using the pipeline, categorizes them as action or observations + (image or state), filters them based on `use_videos` and `patterns`, and finally + formats them for use with a Hugging Face LeRobot Dataset. Args: - pipeline (DataProcessorPipeline): The data processing pipeline that defines all - feature transformations. - initial_features (dict[str, Any]): A dictionary of initial hardware features, where - keys are feature names and values are their shapes or types (e.g., camera resolutions). - use_videos (bool): If `True`, includes image/video features in the output. Defaults to `True`. - patterns (Sequence[str] | None): An optional sequence of regular expression patterns. - Only action and state keys that match at least one pattern will be included. If `None`, - all action and state keys are kept. Defaults to `None`. + pipeline: The DataProcessorPipeline to apply. + initial_features: A dictionary of raw feature specs for actions and observations. + use_videos: If False, image features are excluded. + patterns: A sequence of regex patterns to filter action and state features. + Image features are not affected by this filter. Returns: - dict[str, dict]: A dictionary representing the final dataset features, structured for - use with `datasets.Features`. + A dictionary of features formatted for a Hugging Face LeRobot Dataset. """ - import re - - # Gather everything the pipeline features specifies, seeded with hardware cams: all_features = pipeline.transform_features(initial_features) - # Helper to decide which action/state keys survive the `patterns` filter: - def keep(key: str) -> bool: - if patterns is None: - return True - return any(re.search(pat, key) for pat in patterns) + # Intermediate storage for categorized and filtered features. 
+ processed_features: dict[str, dict[str, Any]] = { + "action": {}, + "observation": {}, + } + images_token = OBS_IMAGES.split(".")[-1] - # Start with hardware dict, injecting initial cameras if videos are ON: - hw: dict[str, dict[str, Any]] = {} - if use_videos: - cams = { - name: shape - for name, shape in initial_features.items() - if isinstance(shape, tuple) and len(shape) == 3 - } - if cams: - hw["observation"] = dict(cams) - - # Go over every feature from the pipeline and merge: - for full_key, ty in all_features.items(): - if full_key.startswith(f"{ACTION}."): - # action. - if not keep(full_key): - continue - name = full_key[len(f"{ACTION}.") :] - hw.setdefault(ACTION, {})[name] = ty - - elif full_key.startswith(f"{OBS_STATE}."): - # observation.state. - if not keep(full_key): - continue - name = full_key[len(f"{OBS_STATE}.") :] - hw.setdefault("observation", {})[name] = ty - - elif full_key.startswith(f"{OBS_IMAGES}."): - # observation.images. - # images obey ONLY the use_videos flag, not patterns - if not use_videos: - continue - name = full_key[len(f"{OBS_IMAGES}.") :] - hw.setdefault("observation", {})[name] = ty - - else: - # anything else (e.g. policy-only features) is ignored here + # Iterate through all features transformed by the pipeline. + for ptype, feats in all_features.items(): + if ptype not in [PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION]: continue - out: dict[str, dict] = {} - if ACTION in hw: - out.update(hw_to_dataset_features(hw[ACTION], ACTION, use_videos)) - if "observation" in hw: - out.update(hw_to_dataset_features(hw["observation"], "observation", use_videos)) + for key, value in feats.items(): + # 1. Categorize the feature. + is_action = ptype == PipelineFeatureType.ACTION + # Observations are classified as images if their key matches image-related tokens or if the shape of the feature is 3. + # All other observations are treated as state. 
+ is_image = not is_action and ( + (isinstance(value, tuple) and len(value) == 3) + or ( + key.startswith(f"{OBS_IMAGES}.") + or key.startswith(f"{images_token}.") + or f".{images_token}." in key + ) + ) - return out + # 2. Apply filtering rules. + if is_image and not use_videos: + continue + if not is_image and not should_keep(key, patterns): + continue + + # 3. Add the feature to the appropriate group with a clean name. + name = strip_prefix(key, PREFIXES_TO_STRIP) + if is_action: + processed_features["action"][name] = value + else: + processed_features["observation"][name] = value + + # Convert the processed features into the final dataset format. + dataset_features = {} + if processed_features["action"]: + dataset_features.update(hw_to_dataset_features(processed_features["action"], ACTION, use_videos)) + if processed_features["observation"]: + dataset_features.update( + hw_to_dataset_features(processed_features["observation"], "observation", use_videos) + ) + + return dataset_features diff --git a/src/lerobot/policies/act/processor_act.py b/src/lerobot/policies/act/processor_act.py index 2e17c9a89..76f362464 100644 --- a/src/lerobot/policies/act/processor_act.py +++ b/src/lerobot/policies/act/processor_act.py @@ -23,7 +23,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -58,7 +58,7 @@ def make_act_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), + RenameObservationsProcessorStep(rename_map={}), AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( diff --git a/src/lerobot/policies/diffusion/processor_diffusion.py b/src/lerobot/policies/diffusion/processor_diffusion.py index 3c73e7bc1..324ffea42 100644 --- a/src/lerobot/policies/diffusion/processor_diffusion.py +++ b/src/lerobot/policies/diffusion/processor_diffusion.py 
@@ -24,7 +24,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -67,7 +67,7 @@ def make_diffusion_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), + RenameObservationsProcessorStep(rename_map={}), AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( diff --git a/src/lerobot/policies/pi0/processor_pi0.py b/src/lerobot/policies/pi0/processor_pi0.py index 766b7d7f9..417105208 100644 --- a/src/lerobot/policies/pi0/processor_pi0.py +++ b/src/lerobot/policies/pi0/processor_pi0.py @@ -17,7 +17,7 @@ import torch -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( @@ -29,7 +29,7 @@ from lerobot.processor import ( ProcessorKwargs, ProcessorStep, ProcessorStepRegistry, - RenameProcessorStep, + RenameObservationsProcessorStep, TokenizerProcessorStep, UnnormalizerProcessorStep, ) @@ -77,7 +77,9 @@ class Pi0NewLineProcessor(ComplementaryDataProcessorStep): return new_complementary_data - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ This step does not alter the feature definitions. 
@@ -127,7 +129,7 @@ def make_pi0_pre_post_processors( # Add remaining processors input_steps: list[ProcessorStep] = [ - RenameProcessorStep(rename_map={}), # To mimic the same processor as pretrained one + RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), Pi0NewLineProcessor(), # Add newlines before tokenization for PaliGemma TokenizerProcessorStep( diff --git a/src/lerobot/policies/pi0fast/processor_pi0fast.py b/src/lerobot/policies/pi0fast/processor_pi0fast.py index 62d255686..e6ed9a4d2 100644 --- a/src/lerobot/policies/pi0fast/processor_pi0fast.py +++ b/src/lerobot/policies/pi0fast/processor_pi0fast.py @@ -24,7 +24,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -63,7 +63,7 @@ def make_pi0fast_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), # To mimic the same processor as pretrained one + RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( diff --git a/src/lerobot/policies/sac/processor_sac.py b/src/lerobot/policies/sac/processor_sac.py index 0098f1999..0caeec32a 100644 --- a/src/lerobot/policies/sac/processor_sac.py +++ b/src/lerobot/policies/sac/processor_sac.py @@ -25,7 +25,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -64,7 +64,7 @@ def make_sac_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), + RenameObservationsProcessorStep(rename_map={}), AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( 
diff --git a/src/lerobot/policies/smolvla/processor_smolvla.py b/src/lerobot/policies/smolvla/processor_smolvla.py index 00b479f42..92002ebad 100644 --- a/src/lerobot/policies/smolvla/processor_smolvla.py +++ b/src/lerobot/policies/smolvla/processor_smolvla.py @@ -16,7 +16,7 @@ import torch -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.processor import ( @@ -27,7 +27,7 @@ from lerobot.processor import ( PolicyProcessorPipeline, ProcessorKwargs, ProcessorStepRegistry, - RenameProcessorStep, + RenameObservationsProcessorStep, TokenizerProcessorStep, UnnormalizerProcessorStep, ) @@ -69,7 +69,7 @@ def make_smolvla_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), # To mimic the same processor as pretrained one + RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), SmolVLANewLineProcessor(), TokenizerProcessorStep( @@ -137,5 +137,7 @@ class SmolVLANewLineProcessor(ComplementaryDataProcessorStep): return new_complementary_data - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/policies/tdmpc/processor_tdmpc.py b/src/lerobot/policies/tdmpc/processor_tdmpc.py index 77497bd23..76e7b7ab1 100644 --- a/src/lerobot/policies/tdmpc/processor_tdmpc.py +++ b/src/lerobot/policies/tdmpc/processor_tdmpc.py @@ -24,7 +24,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + 
RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -63,7 +63,7 @@ def make_tdmpc_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), + RenameObservationsProcessorStep(rename_map={}), AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( diff --git a/src/lerobot/policies/vqbet/processor_vqbet.py b/src/lerobot/policies/vqbet/processor_vqbet.py index 08d1de334..41a9d66f8 100644 --- a/src/lerobot/policies/vqbet/processor_vqbet.py +++ b/src/lerobot/policies/vqbet/processor_vqbet.py @@ -25,7 +25,7 @@ from lerobot.processor import ( NormalizerProcessorStep, PolicyProcessorPipeline, ProcessorKwargs, - RenameProcessorStep, + RenameObservationsProcessorStep, UnnormalizerProcessorStep, ) @@ -64,7 +64,7 @@ def make_vqbet_pre_post_processors( postprocessor_kwargs = {} input_steps = [ - RenameProcessorStep(rename_map={}), # Let the possibility to the user to rename the keys + RenameObservationsProcessorStep(rename_map={}), # Let the possibility to the user to rename the keys AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=config.device), NormalizerProcessorStep( diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 89649fd13..66d074eb6 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -54,7 +54,7 @@ from .pipeline import ( RobotProcessorPipeline, TruncatedProcessorStep, ) -from .rename_processor import RenameProcessorStep +from .rename_processor import RenameObservationsProcessorStep from .tokenizer_processor import TokenizerProcessorStep __all__ = [ @@ -85,7 +85,7 @@ __all__ = [ "ProcessorKwargs", "ProcessorStep", "ProcessorStepRegistry", - "RenameProcessorStep", + "RenameObservationsProcessorStep", "RewardClassifierProcessorStep", "RewardProcessorStep", "DataProcessorPipeline", diff --git a/src/lerobot/processor/batch_processor.py b/src/lerobot/processor/batch_processor.py index 
d0956d5f3..64bb1f6f3 100644 --- a/src/lerobot/processor/batch_processor.py +++ b/src/lerobot/processor/batch_processor.py @@ -24,7 +24,7 @@ from dataclasses import dataclass, field from torch import Tensor -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE from .core import EnvTransition @@ -60,7 +60,9 @@ class AddBatchDimensionActionStep(ActionProcessorStep): return action return action.unsqueeze(0) - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Returns the input features unchanged. @@ -116,7 +118,9 @@ class AddBatchDimensionObservationStep(ObservationProcessorStep): observation[key] = value.unsqueeze(0) return observation - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Returns the input features unchanged. @@ -171,7 +175,9 @@ class AddBatchDimensionComplementaryDataStep(ComplementaryDataProcessorStep): complementary_data["task_index"] = task_index_value.unsqueeze(0) return complementary_data - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Returns the input features unchanged. 
@@ -226,7 +232,9 @@ class AddBatchDimensionProcessorStep(ProcessorStep): transition = self.to_batch_complementary_data_processor(transition) return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Returns the input features unchanged. diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py index 566b67402..cdc1f8621 100644 --- a/src/lerobot/processor/converters.py +++ b/src/lerobot/processor/converters.py @@ -25,7 +25,6 @@ import numpy as np import torch from lerobot.constants import ACTION, DONE, OBS_IMAGES, OBS_STATE, REWARD, TRUNCATED -from lerobot.utils.rotation import Rotation from .core import EnvTransition, TransitionKey @@ -291,17 +290,8 @@ def action_to_transition(action: dict[str, Any]) -> EnvTransition: Returns: An `EnvTransition` containing the formatted action. """ - act_dict: dict[str, Any] = {} - for k, v in action.items(): - # Check if the value is a type that should not be converted to a tensor. 
- if isinstance(v, (Rotation, dict)): - act_dict[f"{ACTION}.{k}"] = v - continue - arr = np.array(v) if np.isscalar(v) else v - act_dict[f"{ACTION}.{k}"] = to_tensor(arr) - - return create_transition(observation={}, action=act_dict) + return create_transition(observation={}, action=action) def observation_to_transition(observation: dict[str, Any]) -> EnvTransition: @@ -320,18 +310,12 @@ def observation_to_transition(observation: dict[str, Any]) -> EnvTransition: """ state, images = _split_obs_to_state_and_images(observation) - obs_dict: dict[str, Any] = {} - for k, v in state.items(): - arr = np.array(v) if np.isscalar(v) else v - obs_dict[f"{OBS_STATE}.{k}"] = to_tensor(arr) + image_observations = {f"{OBS_IMAGES}.{cam}": img for cam, img in images.items()} - for cam, img in images.items(): - obs_dict[f"{OBS_IMAGES}.{cam}"] = img - - return create_transition(observation=obs_dict, action={}) + return create_transition(observation={**state, **image_observations}, action={}) -def transition_to_robot_action(transition: EnvTransition) -> dict[str, Any]: +def transition_to_action(transition: EnvTransition) -> dict[str, Any]: """ Extract a raw action dictionary for a robot from an `EnvTransition`. @@ -344,18 +328,7 @@ def transition_to_robot_action(transition: EnvTransition) -> dict[str, Any]: Returns: A dictionary representing the raw robot action. """ - out: dict[str, Any] = {} - action_dict = transition.get(TransitionKey.ACTION) or {} - - if action_dict is None: - return out - - for k, v in action_dict.items(): - if isinstance(k, str) and k.startswith(f"{ACTION}.") and k.endswith((".pos", ".vel")): - out_key = k[len(f"{ACTION}.") :] # Strip the 'action.' prefix. 
- out[out_key] = float(v) - - return out + return transition.get(TransitionKey.ACTION) def merge_transitions(transitions: Sequence[EnvTransition] | EnvTransition) -> EnvTransition: diff --git a/src/lerobot/processor/delta_action_processor.py b/src/lerobot/processor/delta_action_processor.py index 0af982f82..0135705bd 100644 --- a/src/lerobot/processor/delta_action_processor.py +++ b/src/lerobot/processor/delta_action_processor.py @@ -18,8 +18,7 @@ from dataclasses import dataclass from torch import Tensor -from lerobot.configs.types import FeatureType, PolicyFeature -from lerobot.constants import ACTION +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from .pipeline import ActionProcessorStep, ProcessorStepRegistry @@ -47,20 +46,24 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep): # TODO (maractingi): add rotation delta_action = { - f"{ACTION}.delta_x": action[0], - f"{ACTION}.delta_y": action[1], - f"{ACTION}.delta_z": action[2], + "delta_x": action[0], + "delta_y": action[1], + "delta_z": action[2], } if self.use_gripper: - delta_action[f"{ACTION}.gripper"] = action[3] + delta_action["gripper"] = action[3] return delta_action - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features[f"{ACTION}.delta_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.delta_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.delta_z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.ACTION]["delta_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["delta_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["delta_z"] = PolicyFeature(type=FeatureType.ACTION, 
shape=(1,)) if self.use_gripper: - features[f"{ACTION}.gripper"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature( + type=FeatureType.ACTION, shape=(1,) + ) return features @@ -89,10 +92,10 @@ class MapDeltaActionToRobotActionStep(ActionProcessorStep): def action(self, action: dict) -> dict: # NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy # TODO (maractingi): changing this target_xyz naming convention from the teleop_devices - delta_x = action.pop(f"{ACTION}.delta_x", 0.0) - delta_y = action.pop(f"{ACTION}.delta_y", 0.0) - delta_z = action.pop(f"{ACTION}.delta_z", 0.0) - gripper = action.pop(f"{ACTION}.gripper", 1.0) # Default to "stay" (1.0) + delta_x = action.pop("delta_x", 0.0) + delta_y = action.pop("delta_y", 0.0) + delta_z = action.pop("delta_z", 0.0) + gripper = action.pop("gripper", 1.0) # Default to "stay" (1.0) # Determine if the teleoperator is actively providing input # Consider enabled if any significant movement delta is detected @@ -112,31 +115,33 @@ class MapDeltaActionToRobotActionStep(ActionProcessorStep): # Update action with robot target format action = { - f"{ACTION}.enabled": enabled, - f"{ACTION}.target_x": scaled_delta_x, - f"{ACTION}.target_y": scaled_delta_y, - f"{ACTION}.target_z": scaled_delta_z, - f"{ACTION}.target_wx": target_wx, - f"{ACTION}.target_wy": target_wy, - f"{ACTION}.target_wz": target_wz, - f"{ACTION}.gripper": float(gripper), + "enabled": enabled, + "target_x": scaled_delta_x, + "target_y": scaled_delta_y, + "target_z": scaled_delta_z, + "target_wx": target_wx, + "target_wy": target_wy, + "target_wz": target_wz, + "gripper": float(gripper), } return action - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: 
"""Transform features to match output format.""" - features.pop(f"{ACTION}.delta_x", None) - features.pop(f"{ACTION}.delta_y", None) - features.pop(f"{ACTION}.delta_z", None) - features.pop(f"{ACTION}.gripper", None) + features[PipelineFeatureType.ACTION].pop("delta_x", None) + features[PipelineFeatureType.ACTION].pop("delta_y", None) + features[PipelineFeatureType.ACTION].pop("delta_z", None) + features[PipelineFeatureType.ACTION].pop("gripper", None) - features[f"{ACTION}.enabled"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.gripper"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["enabled"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) return features diff 
--git a/src/lerobot/processor/device_processor.py b/src/lerobot/processor/device_processor.py index 5f1a190b7..ffe8a6af7 100644 --- a/src/lerobot/processor/device_processor.py +++ b/src/lerobot/processor/device_processor.py @@ -24,7 +24,7 @@ from typing import Any import torch -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.utils.utils import get_safe_torch_device from .core import EnvTransition, TransitionKey @@ -169,7 +169,9 @@ class DeviceProcessorStep(ProcessorStep): """ return {"device": self.device, "float_dtype": self.float_dtype} - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Returns the input features unchanged. diff --git a/src/lerobot/processor/gym_action_processor.py b/src/lerobot/processor/gym_action_processor.py index e4f57c4d9..af728c3e9 100644 --- a/src/lerobot/processor/gym_action_processor.py +++ b/src/lerobot/processor/gym_action_processor.py @@ -19,7 +19,7 @@ from dataclasses import dataclass import numpy as np import torch -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from .converters import to_tensor from .pipeline import ActionProcessorStep, ProcessorStepRegistry @@ -63,7 +63,9 @@ class Torch2NumpyActionProcessorStep(ActionProcessorStep): return numpy_action - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -87,5 +89,7 @@ class Numpy2TorchActionProcessorStep(ActionProcessorStep): torch_action = to_tensor(action, dtype=None) # Preserve original dtype return 
torch_action - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py index fe12b27e2..17a82edc0 100644 --- a/src/lerobot/processor/hil_processor.py +++ b/src/lerobot/processor/hil_processor.py @@ -24,7 +24,7 @@ import numpy as np import torch import torchvision.transforms.functional as F # noqa: N812 -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import ACTION from lerobot.teleoperators.teleoperator import Teleoperator from lerobot.teleoperators.utils import TeleopEvents @@ -121,7 +121,9 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep): new_complementary_data[TELEOP_ACTION_KEY] = self.teleop_device.get_action() return new_complementary_data - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -161,7 +163,9 @@ class AddTeleopEventsAsInfoStep(InfoProcessorStep): new_info.update(teleop_events) return new_info - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -232,7 +236,9 @@ class ImageCropResizeProcessorStep(ObservationProcessorStep): "resize_size": self.resize_size, } - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: 
dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Updates the image feature shapes in the policy features dictionary if resizing is applied. @@ -244,9 +250,13 @@ class ImageCropResizeProcessorStep(ObservationProcessorStep): """ if self.resize_size is None: return features - for key in features: + for key in features[PipelineFeatureType.OBSERVATION]: if "image" in key: - features[key] = PolicyFeature(type=features[key].type, shape=self.resize_size) + nb_channel = features[PipelineFeatureType.OBSERVATION][key].shape[0] + features[PipelineFeatureType.OBSERVATION][key] = PolicyFeature( + type=features[PipelineFeatureType.OBSERVATION][key].type, + shape=(nb_channel, *self.resize_size), + ) return features @@ -295,7 +305,9 @@ class TimeLimitProcessorStep(TruncatedProcessorStep): """Resets the step counter, typically called at the start of a new episode.""" self.current_step = 0 - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -368,7 +380,9 @@ class GripperPenaltyProcessorStep(ComplementaryDataProcessorStep): """Resets the processor's internal state.""" pass - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -468,7 +482,9 @@ class InterventionActionProcessorStep(ProcessorStep): "terminate_on_success": self.terminate_on_success, } - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, 
PolicyFeature]]: return features @@ -570,5 +586,7 @@ class RewardClassifierProcessorStep(ProcessorStep): "terminate_on_success": self.terminate_on_success, } - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/processor/joint_observations_processor.py b/src/lerobot/processor/joint_observations_processor.py index 81ba66b53..ab3c6ecc1 100644 --- a/src/lerobot/processor/joint_observations_processor.py +++ b/src/lerobot/processor/joint_observations_processor.py @@ -19,7 +19,7 @@ from typing import Any import torch -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_STATE from lerobot.processor.pipeline import ( ObservationProcessorStep, @@ -103,7 +103,9 @@ class JointVelocityProcessorStep(ObservationProcessorStep): """Resets the internal state, clearing the last known joint positions.""" self.last_joint_positions = None - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Updates the `observation.state` feature to reflect the added velocities. @@ -116,12 +118,14 @@ class JointVelocityProcessorStep(ObservationProcessorStep): Returns: The updated policy features dictionary. 
""" - if OBS_STATE in features: - original_feature = features[OBS_STATE] + if OBS_STATE in features[PipelineFeatureType.OBSERVATION]: + original_feature = features[PipelineFeatureType.OBSERVATION][OBS_STATE] # Double the shape to account for positions + velocities new_shape = (original_feature.shape[0] * 2,) + original_feature.shape[1:] - features[OBS_STATE] = PolicyFeature(type=original_feature.type, shape=new_shape) + features[PipelineFeatureType.OBSERVATION][OBS_STATE] = PolicyFeature( + type=original_feature.type, shape=new_shape + ) return features @@ -177,7 +181,9 @@ class MotorCurrentProcessorStep(ObservationProcessorStep): return new_observation - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Updates the `observation.state` feature to reflect the added motor currents. @@ -190,8 +196,8 @@ class MotorCurrentProcessorStep(ObservationProcessorStep): Returns: The updated policy features dictionary. 
""" - if OBS_STATE in features and self.robot is not None: - original_feature = features[OBS_STATE] + if OBS_STATE in features[PipelineFeatureType.OBSERVATION] and self.robot is not None: + original_feature = features[PipelineFeatureType.OBSERVATION][OBS_STATE] # Add motor current dimensions to the original state shape num_motors = 0 if hasattr(self.robot, "bus") and hasattr(self.robot.bus, "motors"): # type: ignore[attr-defined] @@ -199,5 +205,7 @@ class MotorCurrentProcessorStep(ObservationProcessorStep): if num_motors > 0: new_shape = (original_feature.shape[0] + num_motors,) + original_feature.shape[1:] - features[OBS_STATE] = PolicyFeature(type=original_feature.type, shape=new_shape) + features[PipelineFeatureType.OBSERVATION][OBS_STATE] = PolicyFeature( + type=original_feature.type, shape=new_shape + ) return features diff --git a/src/lerobot/processor/migrate_policy_normalization.py b/src/lerobot/processor/migrate_policy_normalization.py index 659a43856..ba5ccaba7 100644 --- a/src/lerobot/processor/migrate_policy_normalization.py +++ b/src/lerobot/processor/migrate_policy_normalization.py @@ -57,7 +57,7 @@ from .batch_processor import AddBatchDimensionProcessorStep from .device_processor import DeviceProcessorStep from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep from .pipeline import PolicyProcessorPipeline -from .rename_processor import RenameProcessorStep +from .rename_processor import RenameObservationsProcessorStep # Policy type to class mapping POLICY_CLASSES = { @@ -482,7 +482,7 @@ def main(): # Create preprocessor with two normalizers (following the pattern from processor factories) preprocessor_steps = [ - RenameProcessorStep(rename_map={}), + RenameObservationsProcessorStep(rename_map={}), NormalizerProcessorStep( features={**input_features, **output_features}, norm_map=norm_map, diff --git a/src/lerobot/processor/normalize_processor.py b/src/lerobot/processor/normalize_processor.py index 9b502e54d..7e9c6b527 
100644 --- a/src/lerobot/processor/normalize_processor.py +++ b/src/lerobot/processor/normalize_processor.py @@ -24,7 +24,7 @@ from typing import Any import torch from torch import Tensor -from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature from lerobot.datasets.lerobot_dataset import LeRobotDataset from .converters import from_tensor_to_numpy, to_tensor @@ -350,7 +350,9 @@ class NormalizerProcessorStep(_NormalizationMixin, ProcessorStep): return new_transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -404,7 +406,9 @@ class UnnormalizerProcessorStep(_NormalizationMixin, ProcessorStep): return new_transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/processor/observation_processor.py b/src/lerobot/processor/observation_processor.py index bcb351669..71fdbbf0d 100644 --- a/src/lerobot/processor/observation_processor.py +++ b/src/lerobot/processor/observation_processor.py @@ -20,7 +20,7 @@ import numpy as np import torch from torch import Tensor -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE from .pipeline import ObservationProcessorStep, ProcessorStepRegistry @@ -128,7 +128,9 @@ class VanillaObservationProcessorStep(ObservationProcessorStep): def observation(self, observation): return 
self._process_observation(observation) - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Transforms feature keys from the Gym standard to the LeRobot standard. @@ -148,6 +150,10 @@ class VanillaObservationProcessorStep(ObservationProcessorStep): Returns: The policy features dictionary with standardized LeRobot keys. """ + # Build a new features mapping keyed by the same FeatureType buckets + # We assume callers already placed features in the correct FeatureType. + new_features: dict[PipelineFeatureType, dict[str, PolicyFeature]] = {ft: {} for ft in features.keys()} + exact_pairs = { "pixels": OBS_IMAGE, "environment_state": OBS_ENV_STATE, @@ -158,29 +164,43 @@ class VanillaObservationProcessorStep(ObservationProcessorStep): "pixels.": f"{OBS_IMAGES}.", } - for key in list(features.keys()): - matched_prefix = False - for old_prefix, new_prefix in prefix_pairs.items(): - prefixed_old = f"observation.{old_prefix}" - if key.startswith(prefixed_old): - suffix = key[len(prefixed_old) :] - features[f"{new_prefix}{suffix}"] = features.pop(key) - matched_prefix = True - break + # Iterate over all incoming feature buckets and normalize/move each entry + for src_ft, bucket in features.items(): + for key, feat in list(bucket.items()): + handled = False - if key.startswith(old_prefix): - suffix = key[len(old_prefix) :] - features[f"{new_prefix}{suffix}"] = features.pop(key) - matched_prefix = True - break - - if matched_prefix: - continue - - for old, new in exact_pairs.items(): - if key == old or key == f"observation.{old}": - if key in features: - features[new] = features.pop(key) + # Prefix-based rules (e.g. 
pixels.cam1 -> OBS_IMAGES.cam1) + for old_prefix, new_prefix in prefix_pairs.items(): + prefixed_old = f"observation.{old_prefix}" + if key.startswith(prefixed_old): + suffix = key[len(prefixed_old) :] + new_key = f"{new_prefix}{suffix}" + new_features[src_ft][new_key] = feat + handled = True break - return features + if key.startswith(old_prefix): + suffix = key[len(old_prefix) :] + new_key = f"{new_prefix}{suffix}" + new_features[src_ft][new_key] = feat + handled = True + break + + if handled: + continue + + # Exact-name rules (pixels, environment_state, agent_pos) + for old, new in exact_pairs.items(): + if key == old or key == f"observation.{old}": + new_key = new + new_features[src_ft][new_key] = feat + handled = True + break + + if handled: + continue + + # Default: keep key in the same source FeatureType bucket + new_features[src_ft][key] = feat + + return new_features diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index f26610e9e..274bc1e72 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -29,7 +29,7 @@ import torch from huggingface_hub import ModelHubMixin, hf_hub_download from safetensors.torch import load_file, save_file -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from .converters import batch_to_transition, create_transition, transition_to_batch from .core import EnvTransition, TransitionKey @@ -169,7 +169,9 @@ class ProcessorStep(ABC): return None @abstractmethod - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -734,12 +736,14 @@ class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]): if not isinstance(step, ProcessorStep): raise TypeError(f"Step {i} ({type(step).__name__}) must 
inherit from ProcessorStep") - def transform_features(self, initial_features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, initial_features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Apply ALL steps in order. Only if a step has a features method, it will be called. We aggregate the dataset features of all steps. """ - features: dict[str, PolicyFeature] = deepcopy(initial_features) + features: dict[PipelineFeatureType, dict[str, PolicyFeature]] = deepcopy(initial_features) for _, step in enumerate(self.steps): out = step.transform_features(features) @@ -1114,5 +1118,7 @@ class IdentityProcessorStep(ProcessorStep): def __call__(self, transition: EnvTransition) -> EnvTransition: return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/processor/rename_processor.py b/src/lerobot/processor/rename_processor.py index f233e1881..6cae5921f 100644 --- a/src/lerobot/processor/rename_processor.py +++ b/src/lerobot/processor/rename_processor.py @@ -17,14 +17,14 @@ from copy import deepcopy from dataclasses import dataclass, field from typing import Any -from lerobot.configs.types import PolicyFeature +from lerobot.configs.types import PipelineFeatureType, PolicyFeature from .pipeline import ObservationProcessorStep, ProcessorStepRegistry @dataclass -@ProcessorStepRegistry.register(name="rename_processor") -class RenameProcessorStep(ObservationProcessorStep): +@ProcessorStepRegistry.register(name="rename_observations_processor") +class RenameObservationsProcessorStep(ObservationProcessorStep): """ A processor step that renames keys in an observation dictionary. 
@@ -53,12 +53,18 @@ class RenameProcessorStep(ObservationProcessorStep): def get_config(self) -> dict[str, Any]: return {"rename_map": self.rename_map} - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """Transforms: - Each key in the observation that appears in `rename_map` is renamed to its value. - Keys not in `rename_map` remain unchanged. """ - return {self.rename_map.get(k, k): v for k, v in features.items()} + new_features: dict[PipelineFeatureType, dict[str, PolicyFeature]] = features.copy() + new_features[PipelineFeatureType.OBSERVATION] = { + self.rename_map.get(k, k): v for k, v in features[PipelineFeatureType.OBSERVATION].items() + } + return new_features def rename_stats(stats: dict[str, dict[str, Any]], rename_map: dict[str, str]) -> dict[str, dict[str, Any]]: diff --git a/src/lerobot/processor/tokenizer_processor.py b/src/lerobot/processor/tokenizer_processor.py index 3ab21ecdd..3698fff9d 100644 --- a/src/lerobot/processor/tokenizer_processor.py +++ b/src/lerobot/processor/tokenizer_processor.py @@ -28,7 +28,7 @@ from typing import TYPE_CHECKING, Any import torch -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_LANGUAGE_ATTENTION_MASK, OBS_LANGUAGE_TOKENS from lerobot.utils.import_utils import _transformers_available @@ -243,7 +243,9 @@ class TokenizerProcessorStep(ObservationProcessorStep): return config - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: """ Adds feature definitions for the language tokens and attention mask. 
@@ -257,12 +259,14 @@ class TokenizerProcessorStep(ObservationProcessorStep): The updated dictionary of policy features. """ # Add a feature for the token IDs if it doesn't already exist - if OBS_LANGUAGE_TOKENS not in features: - features[OBS_LANGUAGE_TOKENS] = PolicyFeature(type=FeatureType.LANGUAGE, shape=(self.max_length,)) + if OBS_LANGUAGE_TOKENS not in features[PipelineFeatureType.OBSERVATION]: + features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_TOKENS] = PolicyFeature( + type=FeatureType.LANGUAGE, shape=(self.max_length,) + ) # Add a feature for the attention mask if it doesn't already exist - if OBS_LANGUAGE_ATTENTION_MASK not in features: - features[OBS_LANGUAGE_ATTENTION_MASK] = PolicyFeature( + if OBS_LANGUAGE_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]: + features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_ATTENTION_MASK] = PolicyFeature( type=FeatureType.LANGUAGE, shape=(self.max_length,) ) diff --git a/src/lerobot/record.py b/src/lerobot/record.py index e7dfafc61..9888c8411 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -88,8 +88,8 @@ from lerobot.processor.converters import ( action_to_transition, identity_transition, observation_to_transition, + transition_to_action, transition_to_dataset_frame, - transition_to_robot_action, ) from lerobot.processor.rename_processor import rename_stats from lerobot.robots import ( # noqa: F401 @@ -263,7 +263,7 @@ def record_loop( or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=identity_transition, - to_output=transition_to_robot_action, + to_output=transition_to_action, ) ) robot_observation_processor: RobotProcessorPipeline[EnvTransition] = ( diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index 0b4451cbe..ae2d01e04 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -48,7 +48,7 @@ from pprint import pformat from lerobot.configs import parser from lerobot.datasets.lerobot_dataset import LeRobotDataset from 
lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline -from lerobot.processor.converters import action_to_transition, transition_to_robot_action +from lerobot.processor.converters import action_to_transition, transition_to_action from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, @@ -98,7 +98,7 @@ def replay(cfg: ReplayConfig): robot_action_processor = cfg.robot_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=action_to_transition, - to_output=transition_to_robot_action, # type: ignore[arg-type] + to_output=transition_to_action, # type: ignore[arg-type] ) # Reset processor diff --git a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py index 0eb5ad5cd..9db737cfa 100644 --- a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py @@ -18,8 +18,8 @@ from dataclasses import dataclass, field import numpy as np -from lerobot.configs.types import FeatureType, PolicyFeature -from lerobot.constants import ACTION, OBS_STATE +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature +from lerobot.constants import OBS_STATE from lerobot.model.kinematics import RobotKinematics from lerobot.processor import ( ActionProcessorStep, @@ -91,13 +91,13 @@ class EEReferenceAndDelta(ActionProcessorStep): # Current pose from FK on measured joints t_curr = self.kinematics.forward_kinematics(q) - enabled = bool(new_action.pop(f"{ACTION}.enabled", 0)) - tx = float(new_action.pop(f"{ACTION}.target_x", 0.0)) - ty = float(new_action.pop(f"{ACTION}.target_y", 0.0)) - tz = float(new_action.pop(f"{ACTION}.target_z", 0.0)) - wx = float(new_action.pop(f"{ACTION}.target_wx", 0.0)) - wy = float(new_action.pop(f"{ACTION}.target_wy", 0.0)) - wz = float(new_action.pop(f"{ACTION}.target_wz", 0.0)) + enabled = bool(new_action.pop("enabled", 0)) + tx = 
float(new_action.pop("target_x", 0.0)) + ty = float(new_action.pop("target_y", 0.0)) + tz = float(new_action.pop("target_z", 0.0)) + wx = float(new_action.pop("target_wx", 0.0)) + wy = float(new_action.pop("target_wy", 0.0)) + wz = float(new_action.pop("target_wz", 0.0)) desired = None @@ -133,12 +133,12 @@ class EEReferenceAndDelta(ActionProcessorStep): # Write action fields pos = desired[:3, 3] tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() - new_action[f"{ACTION}.ee.x"] = float(pos[0]) - new_action[f"{ACTION}.ee.y"] = float(pos[1]) - new_action[f"{ACTION}.ee.z"] = float(pos[2]) - new_action[f"{ACTION}.ee.wx"] = float(tw[0]) - new_action[f"{ACTION}.ee.wy"] = float(tw[1]) - new_action[f"{ACTION}.ee.wz"] = float(tw[2]) + new_action["ee.x"] = float(pos[0]) + new_action["ee.y"] = float(pos[1]) + new_action["ee.z"] = float(pos[2]) + new_action["ee.wx"] = float(tw[0]) + new_action["ee.wy"] = float(tw[1]) + new_action["ee.wz"] = float(tw[2]) self._prev_enabled = enabled return new_action @@ -149,21 +149,23 @@ class EEReferenceAndDelta(ActionProcessorStep): self.reference_ee_pose = None self._command_when_disabled = None - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features.pop(f"{ACTION}.enabled", None) - features.pop(f"{ACTION}.target_x", None) - features.pop(f"{ACTION}.target_y", None) - features.pop(f"{ACTION}.target_z", None) - features.pop(f"{ACTION}.target_wx", None) - features.pop(f"{ACTION}.target_wy", None) - features.pop(f"{ACTION}.target_wz", None) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.ACTION].pop("enabled", None) + features[PipelineFeatureType.ACTION].pop("target_x", None) + features[PipelineFeatureType.ACTION].pop("target_y", None) + features[PipelineFeatureType.ACTION].pop("target_z", None) + features[PipelineFeatureType.ACTION].pop("target_wx", 
None) + features[PipelineFeatureType.ACTION].pop("target_wy", None) + features[PipelineFeatureType.ACTION].pop("target_wz", None) - features[f"{ACTION}.ee.x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.ee.y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.ee.z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.ee.wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.ee.wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.ee.wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["ee.wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) return features @@ -191,12 +193,12 @@ class EEBoundsAndSafety(ActionProcessorStep): _last_twist: np.ndarray | None = field(default=None, init=False, repr=False) def action(self, act: dict) -> dict: - x = act.get(f"{ACTION}.ee.x", None) - y = act.get(f"{ACTION}.ee.y", None) - z = act.get(f"{ACTION}.ee.z", None) - wx = act.get(f"{ACTION}.ee.wx", None) - wy = act.get(f"{ACTION}.ee.wy", None) - wz = act.get(f"{ACTION}.ee.wz", None) + x = act.get("ee.x", None) + y = act.get("ee.y", None) + z = act.get("ee.z", None) + wx = act.get("ee.wx", None) + wy = act.get("ee.wy", None) + wz = act.get("ee.wz", None) if None in (x, y, z, wx, wy, wz): raise ValueError( @@ -220,12 +222,12 @@ class EEBoundsAndSafety(ActionProcessorStep): self._last_pos = pos self._last_twist = twist 
- act[f"{ACTION}.ee.x"] = float(pos[0]) - act[f"{ACTION}.ee.y"] = float(pos[1]) - act[f"{ACTION}.ee.z"] = float(pos[2]) - act[f"{ACTION}.ee.wx"] = float(twist[0]) - act[f"{ACTION}.ee.wy"] = float(twist[1]) - act[f"{ACTION}.ee.wz"] = float(twist[2]) + act["ee.x"] = float(pos[0]) + act["ee.y"] = float(pos[1]) + act["ee.z"] = float(pos[2]) + act["ee.wx"] = float(twist[0]) + act["ee.wy"] = float(twist[1]) + act["ee.wz"] = float(twist[2]) return act def reset(self): @@ -233,9 +235,9 @@ class EEBoundsAndSafety(ActionProcessorStep): self._last_pos = None self._last_twist = None - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - # check if features as f"{ACTION}.ee.{x,y,z,wx,wy,wz}" - + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features @@ -266,12 +268,12 @@ class InverseKinematicsEEToJoints(ProcessorStep): act = new_transition.get(TransitionKey.ACTION) or {} comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA) or {} - x = act.get(f"{ACTION}.ee.x", None) - y = act.get(f"{ACTION}.ee.y", None) - z = act.get(f"{ACTION}.ee.z", None) - wx = act.get(f"{ACTION}.ee.wx", None) - wy = act.get(f"{ACTION}.ee.wy", None) - wz = act.get(f"{ACTION}.ee.wz", None) + x = act.get("ee.x", None) + y = act.get("ee.y", None) + z = act.get("ee.z", None) + wx = act.get("ee.wx", None) + wy = act.get("ee.wy", None) + wz = act.get("ee.wz", None) if None in (x, y, z, wx, wy, wz): return new_transition @@ -303,18 +305,24 @@ class InverseKinematicsEEToJoints(ProcessorStep): if name == "gripper": # TODO(pepijn): Investigate if this is correct # Do we want an observation key in the action field? 
- new_act[f"{ACTION}.gripper.pos"] = float(raw["gripper"]) + new_act["gripper.pos"] = float(raw["gripper"]) else: - new_act[f"{ACTION}.{name}.pos"] = float(q_target[i]) + new_act[f"{name}.pos"] = float(q_target[i]) new_transition[TransitionKey.ACTION] = new_act if not self.initial_guess_current_joints: new_transition[TransitionKey.COMPLEMENTARY_DATA]["reference_joint_positions"] = q_target return new_transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features[f"{ACTION}.gripper.pos"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.ACTION]["gripper.pos"] = PolicyFeature( + type=FeatureType.ACTION, shape=(1,) + ) for name in self.motor_names: - features[f"{ACTION}.{name}.pos"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION][f"{name}.pos"] = PolicyFeature( + type=FeatureType.ACTION, shape=(1,) + ) return features @@ -353,8 +361,8 @@ class GripperVelocityToJoint(ProcessorStep): act = new_transition.get(TransitionKey.ACTION) or {} comp = new_transition.get(TransitionKey.COMPLEMENTARY_DATA) or {} - if f"{ACTION}.gripper" not in act: - raise ValueError(f"Required action key '{ACTION}.gripper' not found in transition") + if "gripper" not in act: + raise ValueError("Required action key 'gripper' not found in transition") if "gripper" not in self.motor_names: raise ValueError( @@ -365,33 +373,39 @@ class GripperVelocityToJoint(ProcessorStep): # Discrete gripper actions are in [0, 1, 2] # 0: open, 1: close, 2: stay # We need to shift them to [-1, 0, 1] and then scale them to clip_max - gripper_action = act.get(f"{ACTION}.gripper", 1.0) + gripper_action = act.get("gripper", 1.0) gripper_action = gripper_action - 1.0 gripper_action *= self.clip_max - act[f"{ACTION}.gripper"] = 
gripper_action + act["gripper"] = gripper_action # Get current gripper position from complementary data raw = comp.get("raw_joint_positions") or {} curr_pos = float(raw.get("gripper")) - # Compute desired gripper position - u = float(act.get(f"{ACTION}.gripper", 0.0)) + # Compute desired gripper velocity + u = float(act.get("gripper", 0.0)) delta = u * float(self.speed_factor) gripper_pos = float(np.clip(curr_pos + delta, self.clip_min, self.clip_max)) new_act = dict(act) - new_act[f"{ACTION}.gripper.pos"] = gripper_pos - new_act.pop(f"{ACTION}.gripper", None) + new_act["gripper.pos"] = gripper_pos + new_act.pop("gripper", None) new_transition[TransitionKey.ACTION] = new_act obs[f"{OBS_STATE}.gripper.pos"] = curr_pos new_transition[TransitionKey.OBSERVATION] = obs return new_transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features.pop(f"{ACTION}.gripper", None) - features[f"{ACTION}.gripper.pos"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{OBS_STATE}.gripper.pos"] = PolicyFeature(type=FeatureType.STATE, shape=(1,)) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.ACTION].pop("gripper", None) + features[PipelineFeatureType.ACTION]["gripper.pos"] = PolicyFeature( + type=FeatureType.ACTION, shape=(1,) + ) + features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.gripper.pos"] = PolicyFeature( + type=FeatureType.STATE, shape=(1,) + ) return features @@ -430,10 +444,14 @@ class ForwardKinematicsJointsToEE(ObservationProcessorStep): obs[f"{OBS_STATE}.ee.wz"] = float(tw[2]) return obs - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We specify the 
dataset features of this step that we want to be stored in the dataset for k in ["x", "y", "z", "wx", "wy", "wz"]: - features[f"{OBS_STATE}.ee.{k}"] = PolicyFeature(type=FeatureType.STATE, shape=(1,)) + features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.ee.{k}"] = PolicyFeature( + type=FeatureType.STATE, shape=(1,) + ) return features @@ -466,5 +484,7 @@ class AddRobotObservationAsComplimentaryData(ComplementaryDataProcessorStep): } return new_comp - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index ff57511dd..44ef73278 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -67,7 +67,7 @@ from lerobot.processor.converters import ( action_to_transition, identity_transition, observation_to_transition, - transition_to_robot_action, + transition_to_action, ) from lerobot.robots import ( # noqa: F401 Robot, @@ -148,7 +148,7 @@ def teleop_loop( or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=identity_transition, - to_output=transition_to_robot_action, # type: ignore[arg-type] + to_output=transition_to_action, # type: ignore[arg-type] ) ) robot_observation_processor: RobotProcessorPipeline[EnvTransition] = ( diff --git a/src/lerobot/teleoperators/phone/phone_processor.py b/src/lerobot/teleoperators/phone/phone_processor.py index 8b3a3d3a7..b2c1b766b 100644 --- a/src/lerobot/teleoperators/phone/phone_processor.py +++ b/src/lerobot/teleoperators/phone/phone_processor.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, field -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.constants import ACTION from lerobot.processor import 
ActionProcessorStep, ProcessorStepRegistry from lerobot.teleoperators.phone.config_phone import PhoneOS @@ -87,18 +87,20 @@ class MapPhoneActionToRobotAction(ActionProcessorStep): act[f"{ACTION}.gripper"] = gripper # Still send gripper action when disabled return act - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features.pop(f"{ACTION}.phone.enabled", None) - features.pop(f"{ACTION}.phone.pos", None) - features.pop(f"{ACTION}.phone.rot", None) - features.pop(f"{ACTION}.phone.raw_inputs", None) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.ACTION].pop("phone.enabled", None) + features[PipelineFeatureType.ACTION].pop("phone.pos", None) + features[PipelineFeatureType.ACTION].pop("phone.rot", None) + features[PipelineFeatureType.ACTION].pop("phone.raw_inputs", None) - features[f"{ACTION}.enabled"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.target_wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) - features[f"{ACTION}.gripper"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["enabled"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_x"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_y"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + 
features[PipelineFeatureType.ACTION]["target_z"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wx"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wy"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["target_wz"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) + features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature(type=FeatureType.ACTION, shape=(1,)) return features diff --git a/tests/conftest.py b/tests/conftest.py index e273da50f..245cde526 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,7 @@ import traceback import pytest from serial import SerialException -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from tests.utils import DEVICE # Import fixture modules as plugins @@ -83,7 +83,9 @@ def policy_feature_factory(): return _pf -def assert_contract_is_typed(features: dict[str, PolicyFeature]) -> None: +def assert_contract_is_typed(features: dict[PipelineFeatureType, dict[str, PolicyFeature]]) -> None: assert isinstance(features, dict) - assert all(isinstance(k, str) for k in features.keys()) - assert all(isinstance(v, PolicyFeature) for v in features.values()) + assert all(isinstance(k, PipelineFeatureType) for k in features.keys()) + assert all(isinstance(v, dict) for v in features.values()) + assert all(all(isinstance(nk, str) for nk in v.keys()) for v in features.values()) + assert all(all(isinstance(nv, PolicyFeature) for nv in v.values()) for v in features.values()) diff --git a/tests/processor/test_act_processor.py b/tests/processor/test_act_processor.py index c577405a8..ef3b72f54 100644 --- a/tests/processor/test_act_processor.py +++ b/tests/processor/test_act_processor.py @@ -29,7 +29,7 @@ from lerobot.processor import ( DataProcessorPipeline, DeviceProcessorStep, 
NormalizerProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -86,7 +86,7 @@ def test_make_act_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 4 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], DeviceProcessorStep) assert isinstance(preprocessor.steps[3], NormalizerProcessorStep) diff --git a/tests/processor/test_converters.py b/tests/processor/test_converters.py index 688e4e17d..ee474c872 100644 --- a/tests/processor/test_converters.py +++ b/tests/processor/test_converters.py @@ -4,111 +4,13 @@ import torch from lerobot.processor import TransitionKey from lerobot.processor.converters import ( - action_to_transition, batch_to_transition, - observation_to_transition, to_tensor, transition_to_batch, transition_to_dataset_frame, - transition_to_robot_action, ) -def test_to_transition_teleop_action_prefix_and_tensor_conversion(): - # Scalars, arrays, and uint8 arrays are all converted to tensors - img = np.zeros((8, 12, 3), dtype=np.uint8) - act = { - "ee.x": 0.5, # scalar to torch tensor - "delta": np.array([1.0, 2.0]), # ndarray to torch tensor - "raw_img": img, # uint8 HWC to torch tensor - } - - tr = action_to_transition(act) - - # Should be an EnvTransition-like dict with ACTION populated - assert isinstance(tr, dict) - assert TransitionKey.ACTION in tr - assert "action.ee.x" in tr[TransitionKey.ACTION] - assert "action.delta" in tr[TransitionKey.ACTION] - assert "action.raw_img" in tr[TransitionKey.ACTION] - - # Types: all values -> torch tensor - assert isinstance(tr[TransitionKey.ACTION]["action.ee.x"], torch.Tensor) - assert tr[TransitionKey.ACTION]["action.ee.x"].item() == pytest.approx(0.5) - - assert isinstance(tr[TransitionKey.ACTION]["action.delta"], 
torch.Tensor) - assert tr[TransitionKey.ACTION]["action.delta"].shape == (2,) - assert torch.allclose(tr[TransitionKey.ACTION]["action.delta"], torch.tensor([1.0, 2.0])) - - assert isinstance(tr[TransitionKey.ACTION]["action.raw_img"], torch.Tensor) - assert tr[TransitionKey.ACTION]["action.raw_img"].dtype == torch.float32 # converted from uint8 - assert tr[TransitionKey.ACTION]["action.raw_img"].shape == (8, 12, 3) - - # Observation is created as empty dict by make_transition - assert TransitionKey.OBSERVATION in tr - assert isinstance(tr[TransitionKey.OBSERVATION], dict) - assert tr[TransitionKey.OBSERVATION] == {} - - -def test_to_transition_robot_observation_state_vs_images_split(): - # Create an observation with mixed content - img = np.full((10, 20, 3), 255, dtype=np.uint8) # image (uint8 HWC) - obs = { - "j1.pos": 10.0, # scalar to state to torch tensor - "j2.pos": np.float32(20.0), # scalar np to state to torch tensor - "image_front": img, # to images passthrough - "flag": np.int32(7), # scalar to state to torch tensor - "arr": np.array([1.5, 2.5]), # vector to state to torch tensor - } - - tr = observation_to_transition(obs) - assert isinstance(tr, dict) - assert TransitionKey.OBSERVATION in tr - - out = tr[TransitionKey.OBSERVATION] - # Check state keys are present and converted to tensors - for k in ("j1.pos", "j2.pos", "flag", "arr"): - key = f"observation.state.{k}" - assert key in out - v = out[key] - if k != "arr": - assert isinstance(v, torch.Tensor) and v.ndim == 0 - else: - assert isinstance(v, torch.Tensor) and v.ndim == 1 and v.shape == (2,) - - # Check image present as is - assert "observation.images.image_front" in out - assert isinstance(out["observation.images.image_front"], np.ndarray) - assert out["observation.images.image_front"].dtype == np.uint8 - assert out["observation.images.image_front"].shape == (10, 20, 3) - - # ACTION should be empty dict by make_transition - assert TransitionKey.ACTION in tr - assert 
isinstance(tr[TransitionKey.ACTION], dict) - assert tr[TransitionKey.ACTION] == {} - - -def test_to_output_robot_action_strips_prefix_and_filters_pos_keys_only(): - # Build a transition with mixed action keys - tr = { - TransitionKey.ACTION: { - "action.j1.pos": 11.0, # keep "j1.pos" - "action.gripper.pos": torch.tensor(33.0), # keep: tensor accepted - "action.ee.x": 0.5, # ignore (doesn't end with .pos) - "misc": "ignore_me", # ignore (no 'action.' prefix) - } - } - - out = transition_to_robot_action(tr) - # Only ".pos" keys with "action." prefix are retained and stripped to base names - assert set(out.keys()) == {"j1.pos", "gripper.pos"} - # Values converted to float - assert isinstance(out["j1.pos"], float) - assert isinstance(out["gripper.pos"], float) - assert out["j1.pos"] == pytest.approx(11.0) - assert out["gripper.pos"] == pytest.approx(33.0) - - def test_transition_to_dataset_frame_merge_and_pack_vectors_and_metadata(): # Fabricate dataset features (as stored in dataset.meta["features"]) features = { diff --git a/tests/processor/test_device_processor.py b/tests/processor/test_device_processor.py index 88bd184fb..113b1adf2 100644 --- a/tests/processor/test_device_processor.py +++ b/tests/processor/test_device_processor.py @@ -18,7 +18,7 @@ import tempfile import pytest import torch -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep, TransitionKey @@ -292,8 +292,10 @@ def test_features(): processor = DeviceProcessorStep(device="cpu") features = { - "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)), - "action": PolicyFeature(type=FeatureType.ACTION, shape=(5,)), + PipelineFeatureType.OBSERVATION: { + "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)) + }, + PipelineFeatureType.ACTION: {"action": PolicyFeature(type=FeatureType.ACTION, shape=(5,))}, } 
result = processor.transform_features(features) diff --git a/tests/processor/test_diffusion_processor.py b/tests/processor/test_diffusion_processor.py index 6e660937c..1e5e93b4d 100644 --- a/tests/processor/test_diffusion_processor.py +++ b/tests/processor/test_diffusion_processor.py @@ -29,7 +29,7 @@ from lerobot.processor import ( DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -89,7 +89,7 @@ def test_make_diffusion_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 4 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], DeviceProcessorStep) assert isinstance(preprocessor.steps[3], NormalizerProcessorStep) diff --git a/tests/processor/test_observation_processor.py b/tests/processor/test_observation_processor.py index ddd705442..0b06ea653 100644 --- a/tests/processor/test_observation_processor.py +++ b/tests/processor/test_observation_processor.py @@ -18,7 +18,7 @@ import numpy as np import pytest import torch -from lerobot.configs.types import FeatureType +from lerobot.configs.types import FeatureType, PipelineFeatureType from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE from lerobot.processor import TransitionKey, VanillaObservationProcessorStep from tests.conftest import assert_contract_is_typed @@ -412,74 +412,130 @@ def test_equivalent_with_image_dict(): def test_image_processor_features_pixels_to_image(policy_feature_factory): processor = VanillaObservationProcessorStep() features = { - "pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - "keep": policy_feature_factory(FeatureType.ENV, (1,)), + PipelineFeatureType.OBSERVATION: { + "pixels": 
policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "keep": policy_feature_factory(FeatureType.ENV, (1,)), + }, } out = processor.transform_features(features.copy()) - assert OBS_IMAGE in out and out[OBS_IMAGE] == features["pixels"] - assert "pixels" not in out - assert out["keep"] == features["keep"] + assert ( + OBS_IMAGE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_IMAGE] + == features[PipelineFeatureType.OBSERVATION]["pixels"] + ) + assert "pixels" not in out[PipelineFeatureType.OBSERVATION] + assert out[PipelineFeatureType.OBSERVATION]["keep"] == features[PipelineFeatureType.OBSERVATION]["keep"] assert_contract_is_typed(out) def test_image_processor_features_observation_pixels_to_image(policy_feature_factory): processor = VanillaObservationProcessorStep() features = { - "observation.pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - "keep": policy_feature_factory(FeatureType.ENV, (1,)), + PipelineFeatureType.OBSERVATION: { + "observation.pixels": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "keep": policy_feature_factory(FeatureType.ENV, (1,)), + }, } out = processor.transform_features(features.copy()) - assert OBS_IMAGE in out and out[OBS_IMAGE] == features["observation.pixels"] - assert "observation.pixels" not in out - assert out["keep"] == features["keep"] + assert ( + OBS_IMAGE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_IMAGE] + == features[PipelineFeatureType.OBSERVATION]["observation.pixels"] + ) + assert "observation.pixels" not in out[PipelineFeatureType.OBSERVATION] + assert out[PipelineFeatureType.OBSERVATION]["keep"] == features[PipelineFeatureType.OBSERVATION]["keep"] assert_contract_is_typed(out) def test_image_processor_features_multi_camera_and_prefixed(policy_feature_factory): processor = VanillaObservationProcessorStep() features = { - "pixels.front": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - 
"pixels.wrist": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - "observation.pixels.rear": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - "keep": policy_feature_factory(FeatureType.ENV, (7,)), + PipelineFeatureType.OBSERVATION: { + "pixels.front": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "pixels.wrist": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "observation.pixels.rear": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "keep": policy_feature_factory(FeatureType.ENV, (7,)), + }, } out = processor.transform_features(features.copy()) - assert f"{OBS_IMAGES}.front" in out and out[f"{OBS_IMAGES}.front"] == features["pixels.front"] - assert f"{OBS_IMAGES}.wrist" in out and out[f"{OBS_IMAGES}.wrist"] == features["pixels.wrist"] - assert f"{OBS_IMAGES}.rear" in out and out[f"{OBS_IMAGES}.rear"] == features["observation.pixels.rear"] - assert "pixels.front" not in out and "pixels.wrist" not in out and "observation.pixels.rear" not in out - assert out["keep"] == features["keep"] + assert ( + f"{OBS_IMAGES}.front" in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.front"] + == features[PipelineFeatureType.OBSERVATION]["pixels.front"] + ) + assert ( + f"{OBS_IMAGES}.wrist" in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.wrist"] + == features[PipelineFeatureType.OBSERVATION]["pixels.wrist"] + ) + assert ( + f"{OBS_IMAGES}.rear" in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.rear"] + == features[PipelineFeatureType.OBSERVATION]["observation.pixels.rear"] + ) + assert ( + "pixels.front" not in out[PipelineFeatureType.OBSERVATION] + and "pixels.wrist" not in out[PipelineFeatureType.OBSERVATION] + and "observation.pixels.rear" not in out[PipelineFeatureType.OBSERVATION] + ) + assert out[PipelineFeatureType.OBSERVATION]["keep"] == 
features[PipelineFeatureType.OBSERVATION]["keep"] assert_contract_is_typed(out) def test_state_processor_features_environment_and_agent_pos(policy_feature_factory): processor = VanillaObservationProcessorStep() features = { - "environment_state": policy_feature_factory(FeatureType.STATE, (3,)), - "agent_pos": policy_feature_factory(FeatureType.STATE, (7,)), - "keep": policy_feature_factory(FeatureType.ENV, (1,)), + PipelineFeatureType.OBSERVATION: { + "environment_state": policy_feature_factory(FeatureType.STATE, (3,)), + "agent_pos": policy_feature_factory(FeatureType.STATE, (7,)), + "keep": policy_feature_factory(FeatureType.ENV, (1,)), + }, } out = processor.transform_features(features.copy()) - assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["environment_state"] - assert OBS_STATE in out and out[OBS_STATE] == features["agent_pos"] - assert "environment_state" not in out and "agent_pos" not in out - assert out["keep"] == features["keep"] + assert ( + OBS_ENV_STATE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_ENV_STATE] + == features[PipelineFeatureType.OBSERVATION]["environment_state"] + ) + assert ( + OBS_STATE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_STATE] + == features[PipelineFeatureType.OBSERVATION]["agent_pos"] + ) + assert ( + "environment_state" not in out[PipelineFeatureType.OBSERVATION] + and "agent_pos" not in out[PipelineFeatureType.OBSERVATION] + ) + assert out[PipelineFeatureType.OBSERVATION]["keep"] == features[PipelineFeatureType.OBSERVATION]["keep"] assert_contract_is_typed(out) def test_state_processor_features_prefixed_inputs(policy_feature_factory): proc = VanillaObservationProcessorStep() features = { - "observation.environment_state": policy_feature_factory(FeatureType.STATE, (2,)), - "observation.agent_pos": policy_feature_factory(FeatureType.STATE, (4,)), + PipelineFeatureType.OBSERVATION: { + "observation.environment_state": 
policy_feature_factory(FeatureType.STATE, (2,)), + "observation.agent_pos": policy_feature_factory(FeatureType.STATE, (4,)), + }, } out = proc.transform_features(features.copy()) - assert OBS_ENV_STATE in out and out[OBS_ENV_STATE] == features["observation.environment_state"] - assert OBS_STATE in out and out[OBS_STATE] == features["observation.agent_pos"] - assert "environment_state" not in out and "agent_pos" not in out + assert ( + OBS_ENV_STATE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_ENV_STATE] + == features[PipelineFeatureType.OBSERVATION]["observation.environment_state"] + ) + assert ( + OBS_STATE in out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION][OBS_STATE] + == features[PipelineFeatureType.OBSERVATION]["observation.agent_pos"] + ) + assert ( + "environment_state" not in out[PipelineFeatureType.OBSERVATION] + and "agent_pos" not in out[PipelineFeatureType.OBSERVATION] + ) assert_contract_is_typed(out) diff --git a/tests/processor/test_pi0_processor.py b/tests/processor/test_pi0_processor.py index 27b354e9d..e83635a48 100644 --- a/tests/processor/test_pi0_processor.py +++ b/tests/processor/test_pi0_processor.py @@ -30,7 +30,7 @@ from lerobot.processor import ( EnvTransition, NormalizerProcessorStep, ProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -115,7 +115,7 @@ def test_make_pi0_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 6 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], Pi0NewLineProcessor) # Step 3 would be TokenizerProcessorStep but it's mocked diff --git a/tests/processor/test_pipeline.py b/tests/processor/test_pipeline.py index f4a4c6e44..1e6db6436 100644 --- 
a/tests/processor/test_pipeline.py +++ b/tests/processor/test_pipeline.py @@ -25,7 +25,7 @@ import pytest import torch import torch.nn as nn -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features from lerobot.processor import ( DataProcessorPipeline, @@ -96,7 +96,9 @@ class MockStep(ProcessorStep): def reset(self) -> None: self.counter = 0 - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -118,7 +120,9 @@ class MockStepWithoutOptionalMethods(ProcessorStep): return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -174,7 +178,9 @@ class MockStepWithTensorState(ProcessorStep): self.running_mean.zero_() self.running_count.zero_() - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -670,7 +676,9 @@ class MockModuleStep(ProcessorStep, nn.Module): self.running_mean.zero_() self.counter = 0 - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not 
test features here return features @@ -752,7 +760,9 @@ class MockNonModuleStepWithState(ProcessorStep): self.step_count.zero_() self.history.clear() - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -807,7 +817,9 @@ class MockStepWithNonSerializableParam(ProcessorStep): def reset(self) -> None: pass - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -846,7 +858,9 @@ class RegisteredMockStep(ProcessorStep): def reset(self) -> None: pass - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -1406,7 +1420,9 @@ def test_state_file_naming_with_registry(): def load_state_dict(self, state): self.state_tensor = state["state_tensor"] - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -1463,7 +1479,9 @@ def test_override_with_nested_config(): def get_config(self): return {"name": self.name, "simple_param": self.simple_param, "nested_config": self.nested_config} - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, 
features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -1557,7 +1575,9 @@ def test_override_with_callables(): def get_config(self): return {"name": self.name} - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -1692,7 +1712,9 @@ def test_override_with_device_strings(): def load_state_dict(self, state): self.buffer = state["buffer"] - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # We do not test features here return features @@ -1805,16 +1827,20 @@ class NonCompliantStep: return transition -class NonCallableStep: +class NonCallableStep(ProcessorStep): """Intentionally non-compliant: missing __call__.""" - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features -def test_construction_rejects_step_without_processorstep(): +def test_construction_rejects_step_without_call(): """Test that DataProcessorPipeline rejects steps that don't inherit from ProcessorStep.""" - with pytest.raises(TypeError, match=r"must inherit from ProcessorStep"): + with pytest.raises( + TypeError, match=r"Can't instantiate abstract class NonCallableStep with abstract method __call_" + ): DataProcessorPipeline([NonCallableStep()]) with pytest.raises(TypeError, match=r"must inherit from ProcessorStep"): @@ -1831,8 +1857,10 @@ 
class FeatureContractAddStep(ProcessorStep): def __call__(self, transition: EnvTransition) -> EnvTransition: return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features[self.key] = self.value + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.OBSERVATION][self.key] = self.value return features @@ -1846,8 +1874,12 @@ class FeatureContractMutateStep(ProcessorStep): def __call__(self, transition: EnvTransition) -> EnvTransition: return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features[self.key] = self.fn(features.get(self.key)) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + features[PipelineFeatureType.OBSERVATION][self.key] = self.fn( + features[PipelineFeatureType.OBSERVATION].get(self.key) + ) return features @@ -1858,7 +1890,9 @@ class FeatureContractBadReturnStep(ProcessorStep): def __call__(self, transition: EnvTransition) -> EnvTransition: return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return ["not-a-dict"] @@ -1871,8 +1905,10 @@ class FeatureContractRemoveStep(ProcessorStep): def __call__(self, transition: EnvTransition) -> EnvTransition: return transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - features.pop(self.key, None) + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + 
features[PipelineFeatureType.OBSERVATION].pop(self.key, None) return features @@ -1884,17 +1920,22 @@ def test_features_orders_and_merges(policy_feature_factory): FeatureContractAddStep("b", policy_feature_factory(FeatureType.ENV, (2,))), ] ) - out = p.transform_features({}) - - assert out["a"].type == FeatureType.STATE and out["a"].shape == (3,) - assert out["b"].type == FeatureType.ENV and out["b"].shape == (2,) + out = p.transform_features({PipelineFeatureType.OBSERVATION: {}}) + assert out[PipelineFeatureType.OBSERVATION]["a"].type == FeatureType.STATE and out[ + PipelineFeatureType.OBSERVATION + ]["a"].shape == (3,) + assert out[PipelineFeatureType.OBSERVATION]["b"].type == FeatureType.ENV and out[ + PipelineFeatureType.OBSERVATION + ]["b"].shape == (2,) assert_contract_is_typed(out) def test_features_respects_initial_without_mutation(policy_feature_factory): initial = { - "seed": policy_feature_factory(FeatureType.STATE, (7,)), - "nested": policy_feature_factory(FeatureType.ENV, (0,)), + PipelineFeatureType.OBSERVATION: { + "seed": policy_feature_factory(FeatureType.STATE, (7,)), + "nested": policy_feature_factory(FeatureType.ENV, (0,)), + } } p = DataProcessorPipeline( [ @@ -1906,11 +1947,11 @@ def test_features_respects_initial_without_mutation(policy_feature_factory): ) out = p.transform_features(initial_features=initial) - assert out["seed"].shape == (8,) - assert out["nested"].shape == (5,) + assert out[PipelineFeatureType.OBSERVATION]["seed"].shape == (8,) + assert out[PipelineFeatureType.OBSERVATION]["nested"].shape == (5,) # Initial dict must be preserved - assert initial["seed"].shape == (7,) - assert initial["nested"].shape == (0,) + assert initial[PipelineFeatureType.OBSERVATION]["seed"].shape == (7,) + assert initial[PipelineFeatureType.OBSERVATION]["nested"].shape == (0,) assert_contract_is_typed(out) @@ -1923,14 +1964,22 @@ def test_features_execution_order_tracking(): def __call__(self, transition: EnvTransition) -> EnvTransition: return 
transition - def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: code = {"A": 1, "B": 2, "C": 3}[self.label] - pf = features.get("order", PolicyFeature(type=FeatureType.ENV, shape=())) - features["order"] = PolicyFeature(type=pf.type, shape=pf.shape + (code,)) + pf = features[PipelineFeatureType.OBSERVATION].get( + "order", PolicyFeature(type=FeatureType.ENV, shape=()) + ) + features[PipelineFeatureType.OBSERVATION]["order"] = PolicyFeature( + type=pf.type, shape=pf.shape + (code,) + ) return features - out = DataProcessorPipeline([Track("A"), Track("B"), Track("C")]).transform_features({}) - assert out["order"].shape == (1, 2, 3) + out = DataProcessorPipeline([Track("A"), Track("B"), Track("C")]).transform_features( + initial_features={PipelineFeatureType.OBSERVATION: {}} + ) + assert out[PipelineFeatureType.OBSERVATION]["order"].shape == (1, 2, 3) def test_features_remove_key(policy_feature_factory): @@ -1940,18 +1989,23 @@ def test_features_remove_key(policy_feature_factory): FeatureContractRemoveStep("a"), ] ) - out = p.transform_features({}) - assert "a" not in out + out = p.transform_features({PipelineFeatureType.OBSERVATION: {}}) + assert "a" not in out[PipelineFeatureType.OBSERVATION] def test_features_remove_from_initial(policy_feature_factory): initial = { - "keep": policy_feature_factory(FeatureType.STATE, (1,)), - "drop": policy_feature_factory(FeatureType.STATE, (1,)), + PipelineFeatureType.OBSERVATION: { + "keep": policy_feature_factory(FeatureType.STATE, (1,)), + "drop": policy_feature_factory(FeatureType.STATE, (1,)), + }, } p = DataProcessorPipeline([FeatureContractRemoveStep("drop")]) out = p.transform_features(initial_features=initial) - assert "drop" not in out and out["keep"] == initial["keep"] + assert ( + "drop" not in 
out[PipelineFeatureType.OBSERVATION] + and out[PipelineFeatureType.OBSERVATION]["keep"] == initial[PipelineFeatureType.OBSERVATION]["keep"] + ) @dataclass @@ -1961,13 +2015,15 @@ class AddActionEEAndJointFeatures(ProcessorStep): def __call__(self, tr): return tr - def transform_features(self, features: dict) -> dict: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # EE features - features["action.ee.x"] = float - features["action.ee.y"] = float + features[PipelineFeatureType.ACTION]["action.ee.x"] = float + features[PipelineFeatureType.ACTION]["action.ee.y"] = float # JOINT features - features["action.j1.pos"] = float - features["action.j2.pos"] = float + features[PipelineFeatureType.ACTION]["action.j1.pos"] = float + features[PipelineFeatureType.ACTION]["action.j2.pos"] = float return features @@ -1981,18 +2037,20 @@ class AddObservationStateFeatures(ProcessorStep): def __call__(self, tr): return tr - def transform_features(self, features: dict) -> dict: + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: # State features (mix EE and a joint state) - features["observation.state.ee.x"] = float - features["observation.state.j1.pos"] = float + features[PipelineFeatureType.OBSERVATION]["observation.state.ee.x"] = float + features[PipelineFeatureType.OBSERVATION]["observation.state.j1.pos"] = float if self.add_front_image: - features["observation.images.front"] = self.front_image_shape + features[PipelineFeatureType.OBSERVATION]["observation.images.front"] = self.front_image_shape return features def test_aggregate_joint_action_only(): rp = DataProcessorPipeline([AddActionEEAndJointFeatures()]) - initial = {"front": (480, 640, 3)} + initial = {PipelineFeatureType.OBSERVATION: {"front": (480, 640, 3)}, PipelineFeatureType.ACTION: {}} out = 
aggregate_pipeline_dataset_features( pipeline=rp, @@ -2014,7 +2072,7 @@ def test_aggregate_ee_action_and_observation_with_videos(): out = aggregate_pipeline_dataset_features( pipeline=rp, - initial_features=initial, + initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}}, use_videos=True, patterns=["action.ee", "observation.state"], ) @@ -2042,7 +2100,7 @@ def test_aggregate_both_action_types(): rp = DataProcessorPipeline([AddActionEEAndJointFeatures()]) out = aggregate_pipeline_dataset_features( pipeline=rp, - initial_features={}, + initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}}, use_videos=True, patterns=["action.ee", "action.j1", "action.j2.pos"], ) @@ -2059,7 +2117,7 @@ def test_aggregate_images_when_use_videos_false(): out = aggregate_pipeline_dataset_features( pipeline=rp, - initial_features=initial, + initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial}, use_videos=False, # expect "image" dtype patterns=None, ) @@ -2076,7 +2134,7 @@ def test_aggregate_images_when_use_videos_true(): out = aggregate_pipeline_dataset_features( pipeline=rp, - initial_features=initial, + initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}}, use_videos=True, patterns=None, ) @@ -2100,7 +2158,7 @@ def test_initial_camera_not_overridden_by_step_image(): out = aggregate_pipeline_dataset_features( pipeline=rp, - initial_features=initial, + initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial}, use_videos=True, patterns=["observation.images.front"], ) diff --git a/tests/processor/test_rename_processor.py b/tests/processor/test_rename_processor.py index 4d345daba..b04ee1c6e 100644 --- a/tests/processor/test_rename_processor.py +++ b/tests/processor/test_rename_processor.py @@ -19,11 +19,11 @@ from pathlib import Path import numpy as np import torch -from lerobot.configs.types import FeatureType 
+from lerobot.configs.types import FeatureType, PipelineFeatureType from lerobot.processor import ( DataProcessorPipeline, ProcessorStepRegistry, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, ) from lerobot.processor.rename_processor import rename_stats @@ -51,7 +51,7 @@ def test_basic_renaming(): "old_key1": "new_key1", "old_key2": "new_key2", } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) observation = { "old_key1": torch.tensor([1.0, 2.0]), @@ -79,7 +79,7 @@ def test_basic_renaming(): def test_empty_rename_map(): """Test processor with empty rename map (should pass through unchanged).""" - processor = RenameProcessorStep(rename_map={}) + processor = RenameObservationsProcessorStep(rename_map={}) observation = { "key1": torch.tensor([1.0]), @@ -98,7 +98,7 @@ def test_empty_rename_map(): def test_none_observation(): """Test processor with None observation.""" - processor = RenameProcessorStep(rename_map={"old": "new"}) + processor = RenameObservationsProcessorStep(rename_map={"old": "new"}) transition = create_transition() result = processor(transition) @@ -113,7 +113,7 @@ def test_overlapping_rename(): "a": "b", "b": "c", # This creates a potential conflict } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) observation = { "a": 1, @@ -138,7 +138,7 @@ def test_partial_rename(): "observation.state": "observation.proprio_state", "pixels": "observation.image", } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) observation = { "observation.state": torch.randn(10), @@ -168,7 +168,7 @@ def test_get_config(): "old1": "new1", "old2": "new2", } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) config = processor.get_config() assert config == 
{"rename_map": rename_map} @@ -176,7 +176,7 @@ def test_get_config(): def test_state_dict(): """Test state dict (should be empty for RenameProcessorStep).""" - processor = RenameProcessorStep(rename_map={"old": "new"}) + processor = RenameObservationsProcessorStep(rename_map={"old": "new"}) state = processor.state_dict() assert state == {} @@ -191,7 +191,7 @@ def test_integration_with_robot_processor(): "agent_pos": "observation.state", "pixels": "observation.image", } - rename_processor = RenameProcessorStep(rename_map=rename_map) + rename_processor = RenameObservationsProcessorStep(rename_map=rename_map) pipeline = DataProcessorPipeline([rename_processor], to_transition=lambda x: x, to_output=lambda x: x) @@ -225,7 +225,7 @@ def test_save_and_load_pretrained(): "old_state": "observation.state", "old_image": "observation.image", } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) pipeline = DataProcessorPipeline([processor], name="TestRenameProcessorStep") with tempfile.TemporaryDirectory() as tmp_dir: @@ -252,7 +252,7 @@ def test_save_and_load_pretrained(): # Check that loaded processor works correctly loaded_processor = loaded_pipeline.steps[0] - assert isinstance(loaded_processor, RenameProcessorStep) + assert isinstance(loaded_processor, RenameObservationsProcessorStep) assert loaded_processor.rename_map == rename_map # Test functionality after loading @@ -271,21 +271,21 @@ def test_save_and_load_pretrained(): def test_registry_functionality(): """Test that RenameProcessorStep is properly registered.""" # Check that it's registered - assert "rename_processor" in ProcessorStepRegistry.list() + assert "rename_observations_processor" in ProcessorStepRegistry.list() # Get from registry - retrieved_class = ProcessorStepRegistry.get("rename_processor") - assert retrieved_class is RenameProcessorStep + retrieved_class = ProcessorStepRegistry.get("rename_observations_processor") + assert 
retrieved_class is RenameObservationsProcessorStep # Create instance from registry instance = retrieved_class(rename_map={"old": "new"}) - assert isinstance(instance, RenameProcessorStep) + assert isinstance(instance, RenameObservationsProcessorStep) assert instance.rename_map == {"old": "new"} def test_registry_based_save_load(): """Test save/load using registry name instead of module path.""" - processor = RenameProcessorStep(rename_map={"key1": "renamed_key1"}) + processor = RenameObservationsProcessorStep(rename_map={"key1": "renamed_key1"}) pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x) with tempfile.TemporaryDirectory() as tmp_dir: @@ -299,20 +299,20 @@ def test_registry_based_save_load(): config = json.load(f) assert "registry_name" in config["steps"][0] - assert config["steps"][0]["registry_name"] == "rename_processor" + assert config["steps"][0]["registry_name"] == "rename_observations_processor" assert "class" not in config["steps"][0] # Should use registry, not module path # Load should work loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir) loaded_processor = loaded_pipeline.steps[0] - assert isinstance(loaded_processor, RenameProcessorStep) + assert isinstance(loaded_processor, RenameObservationsProcessorStep) assert loaded_processor.rename_map == {"key1": "renamed_key1"} def test_chained_rename_processors(): """Test multiple RenameProcessorSteps in a pipeline.""" # First processor: rename raw keys to intermediate format - processor1 = RenameProcessorStep( + processor1 = RenameObservationsProcessorStep( rename_map={ "pos": "agent_position", "img": "camera_image", @@ -320,7 +320,7 @@ def test_chained_rename_processors(): ) # Second processor: rename to final format - processor2 = RenameProcessorStep( + processor2 = RenameObservationsProcessorStep( rename_map={ "agent_position": "observation.state", "camera_image": "observation.image", @@ -365,7 +365,7 @@ def test_nested_observation_rename(): 
"observation.images.right": "observation.camera.right_view", "observation.proprio": "observation.proprioception", } - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) observation = { "observation.images.left": torch.randn(3, 64, 64), @@ -395,7 +395,7 @@ def test_nested_observation_rename(): def test_value_types_preserved(): """Test that various value types are preserved during renaming.""" rename_map = {"old_tensor": "new_tensor", "old_array": "new_array", "old_scalar": "new_scalar"} - processor = RenameProcessorStep(rename_map=rename_map) + processor = RenameObservationsProcessorStep(rename_map=rename_map) tensor_value = torch.randn(3, 3) array_value = np.random.rand(2, 2) @@ -423,59 +423,75 @@ def test_value_types_preserved(): def test_features_basic_renaming(policy_feature_factory): - processor = RenameProcessorStep(rename_map={"a": "x", "b": "y"}) + processor = RenameObservationsProcessorStep(rename_map={"a": "x", "b": "y"}) features = { - "a": policy_feature_factory(FeatureType.STATE, (2,)), - "b": policy_feature_factory(FeatureType.ACTION, (3,)), - "c": policy_feature_factory(FeatureType.ENV, (1,)), + PipelineFeatureType.OBSERVATION: { + "a": policy_feature_factory(FeatureType.VISUAL, (2,)), + "b": policy_feature_factory(FeatureType.VISUAL, (3,)), + "c": policy_feature_factory(FeatureType.VISUAL, (1,)), + }, } out = processor.transform_features(features.copy()) # Values preserved and typed - assert out["x"] == features["a"] - assert out["y"] == features["b"] - assert out["c"] == features["c"] + assert out[PipelineFeatureType.OBSERVATION]["x"] == features[PipelineFeatureType.OBSERVATION]["a"] + assert out[PipelineFeatureType.OBSERVATION]["y"] == features[PipelineFeatureType.OBSERVATION]["b"] + assert out[PipelineFeatureType.OBSERVATION]["c"] == features[PipelineFeatureType.OBSERVATION]["c"] assert_contract_is_typed(out) # Input not mutated - assert set(features) == {"a", "b", "c"} + 
assert set(features[PipelineFeatureType.OBSERVATION]) == {"a", "b", "c"} def test_features_overlapping_keys(policy_feature_factory): # Overlapping renames: both 'a' and 'b' exist. 'a'->'b', 'b'->'c' - processor = RenameProcessorStep(rename_map={"a": "b", "b": "c"}) + processor = RenameObservationsProcessorStep(rename_map={"a": "b", "b": "c"}) features = { - "a": policy_feature_factory(FeatureType.STATE, (1,)), - "b": policy_feature_factory(FeatureType.STATE, (2,)), + PipelineFeatureType.OBSERVATION: { + "a": policy_feature_factory(FeatureType.VISUAL, (1,)), + "b": policy_feature_factory(FeatureType.VISUAL, (2,)), + }, } out = processor.transform_features(features) - assert set(out) == {"b", "c"} - assert out["b"] == features["a"] # 'a' renamed to'b' - assert out["c"] == features["b"] # 'b' renamed to 'c' + assert set(out[PipelineFeatureType.OBSERVATION]) == {"b", "c"} + assert ( + out[PipelineFeatureType.OBSERVATION]["b"] == features[PipelineFeatureType.OBSERVATION]["a"] + ) # 'a' renamed to'b' + assert ( + out[PipelineFeatureType.OBSERVATION]["c"] == features[PipelineFeatureType.OBSERVATION]["b"] + ) # 'b' renamed to 'c' assert_contract_is_typed(out) def test_features_chained_processors(policy_feature_factory): # Chain two rename processors at the contract level - processor1 = RenameProcessorStep(rename_map={"pos": "agent_position", "img": "camera_image"}) - processor2 = RenameProcessorStep( + processor1 = RenameObservationsProcessorStep(rename_map={"pos": "agent_position", "img": "camera_image"}) + processor2 = RenameObservationsProcessorStep( rename_map={"agent_position": "observation.state", "camera_image": "observation.image"} ) pipeline = DataProcessorPipeline([processor1, processor2]) spec = { - "pos": policy_feature_factory(FeatureType.STATE, (7,)), - "img": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), - "extra": policy_feature_factory(FeatureType.ENV, (1,)), + PipelineFeatureType.OBSERVATION: { + "pos": 
policy_feature_factory(FeatureType.VISUAL, (7,)), + "img": policy_feature_factory(FeatureType.VISUAL, (3, 64, 64)), + "extra": policy_feature_factory(FeatureType.VISUAL, (1,)), + }, } out = pipeline.transform_features(initial_features=spec) - assert set(out) == {"observation.state", "observation.image", "extra"} - assert out["observation.state"] == spec["pos"] - assert out["observation.image"] == spec["img"] - assert out["extra"] == spec["extra"] + assert set(out[PipelineFeatureType.OBSERVATION]) == {"observation.state", "observation.image", "extra"} + assert ( + out[PipelineFeatureType.OBSERVATION]["observation.state"] + == spec[PipelineFeatureType.OBSERVATION]["pos"] + ) + assert ( + out[PipelineFeatureType.OBSERVATION]["observation.image"] + == spec[PipelineFeatureType.OBSERVATION]["img"] + ) + assert out[PipelineFeatureType.OBSERVATION]["extra"] == spec[PipelineFeatureType.OBSERVATION]["extra"] assert_contract_is_typed(out) diff --git a/tests/processor/test_sac_processor.py b/tests/processor/test_sac_processor.py index 522411e0f..3e26172c3 100644 --- a/tests/processor/test_sac_processor.py +++ b/tests/processor/test_sac_processor.py @@ -29,7 +29,7 @@ from lerobot.processor import ( DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -91,7 +91,7 @@ def test_make_sac_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 4 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], DeviceProcessorStep) assert isinstance(preprocessor.steps[3], NormalizerProcessorStep) diff --git a/tests/processor/test_smolvla_processor.py b/tests/processor/test_smolvla_processor.py index 5d2e9c7ce..317b0feec 100644 --- 
a/tests/processor/test_smolvla_processor.py +++ b/tests/processor/test_smolvla_processor.py @@ -20,7 +20,7 @@ from unittest.mock import patch import pytest import torch -from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.policies.smolvla.processor_smolvla import ( @@ -33,7 +33,7 @@ from lerobot.processor import ( EnvTransition, NormalizerProcessorStep, ProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -122,7 +122,7 @@ def test_make_smolvla_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 6 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], SmolVLANewLineProcessor) # Step 3 would be TokenizerProcessorStep but it's mocked @@ -400,7 +400,7 @@ def test_smolvla_newline_processor_transform_features(): # Test transform_features features = { - OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)), + PipelineFeatureType.OBSERVATION: {OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,))}, } result = processor.transform_features(features) assert result == features # Should return unchanged diff --git a/tests/processor/test_tdmpc_processor.py b/tests/processor/test_tdmpc_processor.py index bc9ff2bdc..4f08ef289 100644 --- a/tests/processor/test_tdmpc_processor.py +++ b/tests/processor/test_tdmpc_processor.py @@ -29,7 +29,7 @@ from lerobot.processor import ( DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, 
TransitionKey, UnnormalizerProcessorStep, ) @@ -94,7 +94,7 @@ def test_make_tdmpc_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 4 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], DeviceProcessorStep) assert isinstance(preprocessor.steps[3], NormalizerProcessorStep) diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py index 5a3c97240..8055a9d4b 100644 --- a/tests/processor/test_tokenizer_processor.py +++ b/tests/processor/test_tokenizer_processor.py @@ -8,7 +8,7 @@ from unittest.mock import patch import pytest import torch -from lerobot.configs.types import FeatureType, PolicyFeature +from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_LANGUAGE from lerobot.processor import DataProcessorPipeline, TokenizerProcessorStep, TransitionKey from tests.utils import require_package @@ -512,23 +512,27 @@ def test_features_basic(): processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=128) input_features = { - "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)), - "action": PolicyFeature(type=FeatureType.ACTION, shape=(5,)), + PipelineFeatureType.OBSERVATION: { + "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)) + }, + PipelineFeatureType.ACTION: {"action": PolicyFeature(type=FeatureType.ACTION, shape=(5,))}, } output_features = processor.transform_features(input_features) # Check that original features are preserved - assert "observation.state" in output_features - assert "action" in output_features + assert "observation.state" in output_features[PipelineFeatureType.OBSERVATION] + assert "action" in output_features[PipelineFeatureType.ACTION] # Check that tokenized 
features are added - assert f"{OBS_LANGUAGE}.tokens" in output_features - assert f"{OBS_LANGUAGE}.attention_mask" in output_features + assert f"{OBS_LANGUAGE}.tokens" in output_features[PipelineFeatureType.OBSERVATION] + assert f"{OBS_LANGUAGE}.attention_mask" in output_features[PipelineFeatureType.OBSERVATION] # Check feature properties - tokens_feature = output_features[f"{OBS_LANGUAGE}.tokens"] - attention_mask_feature = output_features[f"{OBS_LANGUAGE}.attention_mask"] + tokens_feature = output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"] + attention_mask_feature = output_features[PipelineFeatureType.OBSERVATION][ + f"{OBS_LANGUAGE}.attention_mask" + ] assert tokens_feature.type == FeatureType.LANGUAGE assert tokens_feature.shape == (128,) @@ -542,15 +546,17 @@ def test_features_with_custom_max_length(): mock_tokenizer = MockTokenizer(vocab_size=100) processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=64) - input_features = {} + input_features = {PipelineFeatureType.OBSERVATION: {}} output_features = processor.transform_features(input_features) # Check that features use correct max_length - assert f"{OBS_LANGUAGE}.tokens" in output_features - assert f"{OBS_LANGUAGE}.attention_mask" in output_features + assert f"{OBS_LANGUAGE}.tokens" in output_features[PipelineFeatureType.OBSERVATION] + assert f"{OBS_LANGUAGE}.attention_mask" in output_features[PipelineFeatureType.OBSERVATION] - tokens_feature = output_features[f"{OBS_LANGUAGE}.tokens"] - attention_mask_feature = output_features[f"{OBS_LANGUAGE}.attention_mask"] + tokens_feature = output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"] + attention_mask_feature = output_features[PipelineFeatureType.OBSERVATION][ + f"{OBS_LANGUAGE}.attention_mask" + ] assert tokens_feature.shape == (64,) assert attention_mask_feature.shape == (64,) @@ -563,15 +569,19 @@ def test_features_existing_features(): processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, 
max_length=256) input_features = { - f"{OBS_LANGUAGE}.tokens": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)), - f"{OBS_LANGUAGE}.attention_mask": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)), + PipelineFeatureType.OBSERVATION: { + f"{OBS_LANGUAGE}.tokens": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)), + f"{OBS_LANGUAGE}.attention_mask": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)), + } } output_features = processor.transform_features(input_features) # Should not overwrite existing features - assert output_features[f"{OBS_LANGUAGE}.tokens"].shape == (100,) # Original shape preserved - assert output_features[f"{OBS_LANGUAGE}.attention_mask"].shape == (100,) + assert output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"].shape == ( + 100, + ) # Original shape preserved + assert output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"].shape == (100,) @require_package("transformers") diff --git a/tests/processor/test_vqbet_processor.py b/tests/processor/test_vqbet_processor.py index bc24c5e0f..c05fb15fe 100644 --- a/tests/processor/test_vqbet_processor.py +++ b/tests/processor/test_vqbet_processor.py @@ -29,7 +29,7 @@ from lerobot.processor import ( DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, - RenameProcessorStep, + RenameObservationsProcessorStep, TransitionKey, UnnormalizerProcessorStep, ) @@ -94,7 +94,7 @@ def test_make_vqbet_processor_basic(): # Check steps in preprocessor assert len(preprocessor.steps) == 4 - assert isinstance(preprocessor.steps[0], RenameProcessorStep) + assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep) assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep) assert isinstance(preprocessor.steps[2], DeviceProcessorStep) assert isinstance(preprocessor.steps[3], NormalizerProcessorStep)