From 8c796b39f502c7cd326e73e6f848ccea153d4511 Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Wed, 3 Sep 2025 17:13:16 +0200 Subject: [PATCH] chore(processor): rename RobotProcessor -> DataProcessorPipeline (#1850) --- examples/phone_to_so100/evaluate.py | 6 +- examples/phone_to_so100/record.py | 8 +- examples/phone_to_so100/replay.py | 4 +- examples/phone_to_so100/teleoperate.py | 4 +- src/lerobot/datasets/pipeline_features.py | 4 +- src/lerobot/policies/act/processor_act.py | 8 +- .../policies/diffusion/processor_diffusion.py | 8 +- src/lerobot/policies/factory.py | 8 +- src/lerobot/policies/pi0/processor_pi0.py | 8 +- .../policies/pi0fast/processor_pi0fast.py | 8 +- src/lerobot/policies/sac/processor_sac.py | 8 +- .../sac/reward_model/processor_classifier.py | 8 +- .../policies/smolvla/processor_smolvla.py | 8 +- src/lerobot/policies/tdmpc/processor_tdmpc.py | 8 +- src/lerobot/policies/vqbet/processor_vqbet.py | 8 +- src/lerobot/processor/__init__.py | 4 +- .../processor/migrate_policy_normalization.py | 6 +- src/lerobot/processor/normalize_processor.py | 6 +- src/lerobot/processor/pipeline.py | 40 +-- src/lerobot/record.py | 18 +- src/lerobot/replay.py | 6 +- src/lerobot/scripts/rl/gym_manipulator.py | 20 +- src/lerobot/teleoperate.py | 20 +- src/lerobot/utils/control_utils.py | 6 +- src/lerobot/utils/train_utils.py | 6 +- tests/processor/test_act_processor.py | 4 +- tests/processor/test_batch_conversion.py | 6 +- tests/processor/test_batch_processor.py | 12 +- tests/processor/test_classifier_processor.py | 8 +- tests/processor/test_device_processor.py | 12 +- tests/processor/test_diffusion_processor.py | 8 +- tests/processor/test_normalize_processor.py | 26 +- tests/processor/test_pipeline.py | 246 +++++++++--------- tests/processor/test_rename_processor.py | 20 +- tests/processor/test_sac_processor.py | 8 +- tests/processor/test_tdmpc_processor.py | 8 +- tests/processor/test_tokenizer_processor.py | 20 +- tests/processor/test_vqbet_processor.py | 8 +- 38 
files changed, 326 insertions(+), 298 deletions(-) diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index baef233c5..67fa86978 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -21,7 +21,7 @@ from lerobot.datasets.utils import merge_features from lerobot.model.kinematics import RobotKinematics from lerobot.policies.act.modeling_act import ACTPolicy from lerobot.policies.factory import make_pre_post_processors -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline from lerobot.processor.converters import ( to_output_robot_action, to_transition_robot_observation, @@ -65,7 +65,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = RobotProcessor( +robot_ee_to_joints_processor = DataProcessorPipeline( steps=[ AddRobotObservationAsComplimentaryData(robot=robot), InverseKinematicsEEToJoints( @@ -79,7 +79,7 @@ robot_ee_to_joints_processor = RobotProcessor( ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose_processor = RobotProcessor( +robot_joints_to_ee_pose_processor = DataProcessorPipeline( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index bde2ebc2d..ee72ca5d2 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -20,7 +20,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features from lerobot.datasets.utils import merge_features from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline from lerobot.processor.converters import ( to_output_robot_action, 
to_transition_robot_observation, @@ -73,7 +73,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert phone action to ee pose action -phone_to_robot_ee_pose_processor = RobotProcessor( +phone_to_robot_ee_pose_processor = DataProcessorPipeline( steps=[ MapPhoneActionToRobotAction(platform=teleop_config.phone_os), AddRobotObservationAsComplimentaryData(robot=robot), @@ -93,7 +93,7 @@ phone_to_robot_ee_pose_processor = RobotProcessor( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = RobotProcessor( +robot_ee_to_joints_processor = DataProcessorPipeline( steps=[ InverseKinematicsEEToJoints( kinematics=kinematics_solver, @@ -110,7 +110,7 @@ robot_ee_to_joints_processor = RobotProcessor( ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose = RobotProcessor( +robot_joints_to_ee_pose = DataProcessorPipeline( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], diff --git a/examples/phone_to_so100/replay.py b/examples/phone_to_so100/replay.py index f7c0d395f..fe2facb5a 100644 --- a/examples/phone_to_so100/replay.py +++ b/examples/phone_to_so100/replay.py @@ -19,7 +19,7 @@ import time from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( @@ -50,7 +50,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = RobotProcessor( +robot_ee_to_joints_processor = DataProcessorPipeline( steps=[ AddRobotObservationAsComplimentaryData(robot=robot), 
InverseKinematicsEEToJoints( diff --git a/examples/phone_to_so100/teleoperate.py b/examples/phone_to_so100/teleoperate.py index e937265a6..d7cc4a457 100644 --- a/examples/phone_to_so100/teleoperate.py +++ b/examples/phone_to_so100/teleoperate.py @@ -16,7 +16,7 @@ import time from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( @@ -49,7 +49,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert phone action to ee pose action to joint action -phone_to_robot_joints_processor = RobotProcessor( +phone_to_robot_joints_processor = DataProcessorPipeline( steps=[ MapPhoneActionToRobotAction(platform=teleop_config.phone_os), AddRobotObservationAsComplimentaryData(robot=robot), diff --git a/src/lerobot/datasets/pipeline_features.py b/src/lerobot/datasets/pipeline_features.py index 98e936a24..4f0aa5704 100644 --- a/src/lerobot/datasets/pipeline_features.py +++ b/src/lerobot/datasets/pipeline_features.py @@ -17,11 +17,11 @@ from typing import Any from lerobot.constants import ACTION, OBS_IMAGES, OBS_STATE from lerobot.datasets.utils import hw_to_dataset_features -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline def aggregate_pipeline_dataset_features( - pipeline: RobotProcessor, + pipeline: DataProcessorPipeline, initial_features: dict[str, Any], *, use_videos: bool = True, diff --git a/src/lerobot/policies/act/processor_act.py b/src/lerobot/policies/act/processor_act.py index 00ffbef9d..0d7e466f3 100644 --- a/src/lerobot/policies/act/processor_act.py +++ b/src/lerobot/policies/act/processor_act.py @@ -18,11 +18,11 @@ import torch from lerobot.constants import 
POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_NAME from lerobot.policies.act.configuration_act import ACTConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -33,7 +33,7 @@ def make_act_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_act_pre_post_processors( ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/diffusion/processor_diffusion.py b/src/lerobot/policies/diffusion/processor_diffusion.py index b459349d0..ba57f6dba 100644 --- a/src/lerobot/policies/diffusion/processor_diffusion.py +++ b/src/lerobot/policies/diffusion/processor_diffusion.py @@ -19,11 +19,11 @@ import torch from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_NAME from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -34,7 +34,7 @@ def make_diffusion_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, 
DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_diffusion_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index 0bb91eeac..15c2a3521 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -38,7 +38,7 @@ from lerobot.policies.sac.reward_model.configuration_classifier import RewardCla from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig -from lerobot.processor import ProcessorKwargs, RobotProcessor +from lerobot.processor import DataProcessorPipeline, ProcessorKwargs def get_policy_class(name: str) -> type[PreTrainedPolicy]: @@ -122,7 +122,7 @@ def make_pre_post_processors( policy_cfg: PreTrainedConfig, pretrained_path: str | None = None, **kwargs: Unpack[ProcessorConfigKwargs], -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: """Make a processor instance for a given policy type. This function creates the appropriate processor configuration based on the policy type. 
@@ -146,14 +146,14 @@ def make_pre_post_processors( postprocessor_kwargs = kwargs.get("postprocessor_kwargs", {}) return ( - RobotProcessor.from_pretrained( + DataProcessorPipeline.from_pretrained( pretrained_model_name_or_path=pretrained_path, config_filename=kwargs.get("preprocessor_config_filename", "robot_preprocessor.json"), overrides=kwargs.get("preprocessor_overrides", {}), to_transition=preprocessor_kwargs.get("to_transition"), to_output=preprocessor_kwargs.get("to_output"), ), - RobotProcessor.from_pretrained( + DataProcessorPipeline.from_pretrained( pretrained_model_name_or_path=pretrained_path, config_filename=kwargs.get("postprocessor_config_filename", "robot_postprocessor.json"), overrides=kwargs.get("postprocessor_overrides", {}), diff --git a/src/lerobot/policies/pi0/processor_pi0.py b/src/lerobot/policies/pi0/processor_pi0.py index 56854273c..52d3ccf3d 100644 --- a/src/lerobot/policies/pi0/processor_pi0.py +++ b/src/lerobot/policies/pi0/processor_pi0.py @@ -22,13 +22,13 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( ComplementaryDataProcessor, + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, ProcessorStep, ProcessorStepRegistry, RenameProcessor, - RobotProcessor, ToBatchProcessor, TokenizerProcessor, UnnormalizerProcessor, @@ -72,7 +72,7 @@ def make_pi0_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -105,12 +105,12 @@ def make_pi0_pre_post_processors( ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, 
**preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/pi0fast/processor_pi0fast.py b/src/lerobot/policies/pi0fast/processor_pi0fast.py index 951cd3381..a4c971626 100644 --- a/src/lerobot/policies/pi0fast/processor_pi0fast.py +++ b/src/lerobot/policies/pi0fast/processor_pi0fast.py @@ -19,11 +19,11 @@ import torch from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_NAME from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -34,7 +34,7 @@ def make_pi0fast_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_pi0fast_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/sac/processor_sac.py b/src/lerobot/policies/sac/processor_sac.py index 1ba8600ab..0632693ba 100644 --- a/src/lerobot/policies/sac/processor_sac.py +++ b/src/lerobot/policies/sac/processor_sac.py @@ -20,11 +20,11 @@ import torch from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_NAME from lerobot.policies.sac.configuration_sac import SACConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, 
NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -35,7 +35,7 @@ def make_sac_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -58,12 +58,12 @@ def make_sac_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/sac/reward_model/processor_classifier.py b/src/lerobot/policies/sac/reward_model/processor_classifier.py index a7fe49cb2..ea1a0e219 100644 --- a/src/lerobot/policies/sac/reward_model/processor_classifier.py +++ b/src/lerobot/policies/sac/reward_model/processor_classifier.py @@ -17,11 +17,11 @@ import torch from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, IdentityProcessor, NormalizerProcessor, ProcessorKwargs, - RobotProcessor, ) @@ -30,7 +30,7 @@ def make_classifier_processor( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -48,12 +48,12 @@ def make_classifier_processor( output_steps = [DeviceProcessor(device="cpu"), IdentityProcessor()] return ( - RobotProcessor( + 
DataProcessorPipeline( steps=input_steps, name="classifier_preprocessor", **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name="classifier_postprocessor", **postprocessor_kwargs, diff --git a/src/lerobot/policies/smolvla/processor_smolvla.py b/src/lerobot/policies/smolvla/processor_smolvla.py index f95908d22..a8bcb4fd9 100644 --- a/src/lerobot/policies/smolvla/processor_smolvla.py +++ b/src/lerobot/policies/smolvla/processor_smolvla.py @@ -21,12 +21,12 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.processor import ( ComplementaryDataProcessor, + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, ProcessorStepRegistry, RenameProcessor, - RobotProcessor, ToBatchProcessor, TokenizerProcessor, UnnormalizerProcessor, @@ -38,7 +38,7 @@ def make_smolvla_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -68,12 +68,12 @@ def make_smolvla_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/tdmpc/processor_tdmpc.py b/src/lerobot/policies/tdmpc/processor_tdmpc.py index 846b32200..660c66b72 100644 --- a/src/lerobot/policies/tdmpc/processor_tdmpc.py +++ b/src/lerobot/policies/tdmpc/processor_tdmpc.py @@ -19,11 +19,11 @@ import torch from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, 
PREPROCESSOR_DEFAULT_NAME from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -34,7 +34,7 @@ def make_tdmpc_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_tdmpc_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/vqbet/processor_vqbet.py b/src/lerobot/policies/vqbet/processor_vqbet.py index 83ec4f0a0..f92a4e88e 100644 --- a/src/lerobot/policies/vqbet/processor_vqbet.py +++ b/src/lerobot/policies/vqbet/processor_vqbet.py @@ -20,11 +20,11 @@ import torch from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_NAME from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, ProcessorKwargs, RenameProcessor, - RobotProcessor, ToBatchProcessor, UnnormalizerProcessor, ) @@ -35,7 +35,7 @@ def make_vqbet_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[RobotProcessor, RobotProcessor]: +) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: if preprocessor_kwargs is None: 
preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -58,12 +58,12 @@ def make_vqbet_pre_post_processors( ), ] return ( - RobotProcessor( + DataProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - RobotProcessor( + DataProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index cbb5de664..8f093d59b 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -41,6 +41,7 @@ from .observation_processor import VanillaObservationProcessor from .pipeline import ( ActionProcessor, ComplementaryDataProcessor, + DataProcessorPipeline, DoneProcessor, IdentityProcessor, InfoProcessor, @@ -49,7 +50,6 @@ from .pipeline import ( ProcessorStep, ProcessorStepRegistry, RewardProcessor, - RobotProcessor, TruncatedProcessor, ) from .rename_processor import RenameProcessor @@ -85,7 +85,7 @@ __all__ = [ "RenameProcessor", "RewardClassifierProcessor", "RewardProcessor", - "RobotProcessor", + "DataProcessorPipeline", "TimeLimitProcessor", "ToBatchProcessor", "TokenizerProcessor", diff --git a/src/lerobot/processor/migrate_policy_normalization.py b/src/lerobot/processor/migrate_policy_normalization.py index d5b186bab..1530f0aaa 100644 --- a/src/lerobot/processor/migrate_policy_normalization.py +++ b/src/lerobot/processor/migrate_policy_normalization.py @@ -50,7 +50,7 @@ from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature from .batch_processor import ToBatchProcessor from .device_processor import DeviceProcessor from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor -from .pipeline import RobotProcessor +from .pipeline import DataProcessorPipeline from .rename_processor import RenameProcessor # Policy type to class mapping @@ -422,14 +422,14 @@ def main(): ToBatchProcessor(), DeviceProcessor(device=policy_config.device), ] - preprocessor 
= RobotProcessor(steps=preprocessor_steps, name="robot_preprocessor") + preprocessor = DataProcessorPipeline(steps=preprocessor_steps, name="robot_preprocessor") # Create postprocessor with unnormalizer for outputs only postprocessor_steps = [ DeviceProcessor(device="cpu"), UnnormalizerProcessor(features=output_features, norm_map=norm_map, stats=stats), ] - postprocessor = RobotProcessor(steps=postprocessor_steps, name="robot_postprocessor") + postprocessor = DataProcessorPipeline(steps=postprocessor_steps, name="robot_postprocessor") # Determine hub repo ID if pushing to hub if args.push_to_hub: diff --git a/src/lerobot/processor/normalize_processor.py b/src/lerobot/processor/normalize_processor.py index af2a810f5..f962ca18f 100644 --- a/src/lerobot/processor/normalize_processor.py +++ b/src/lerobot/processor/normalize_processor.py @@ -12,7 +12,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from .converters import to_tensor from .core import EnvTransition, TransitionKey -from .pipeline import ProcessorStep, ProcessorStepRegistry, RobotProcessor +from .pipeline import DataProcessorPipeline, ProcessorStep, ProcessorStepRegistry @dataclass @@ -252,7 +252,9 @@ class UnnormalizerProcessor(_NormalizationMixin, ProcessorStep): return features -def hotswap_stats(robot_processor: RobotProcessor, stats: dict[str, dict[str, Any]]) -> RobotProcessor: +def hotswap_stats( + robot_processor: DataProcessorPipeline, stats: dict[str, dict[str, Any]] +) -> DataProcessorPipeline: """ Replaces normalization statistics in a RobotProcessor pipeline. diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index ebe7d6afd..b6cc0271c 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -117,7 +117,7 @@ class ProcessorStep(ABC): A step is any callable accepting a full `EnvTransition` dict and returning a (possibly modified) dict of the same structure. 
Implementers are encouraged—but not required—to expose the optional helper methods - listed below. When present, these hooks let `RobotProcessor` + listed below. When present, these hooks let `DataProcessorPipeline` automatically serialise the step's configuration and learnable state using a safe-to-share JSON + SafeTensors format. @@ -175,14 +175,14 @@ class ProcessorStep(ABC): class ProcessorKwargs(TypedDict, total=False): - """Keyword arguments for RobotProcessor constructor.""" + """Keyword arguments for DataProcessorPipeline constructor.""" to_transition: Callable[[dict[str, Any]], EnvTransition] | None to_output: Callable[[EnvTransition], Any] | None @dataclass -class RobotProcessor(ModelHubMixin, Generic[TOutput]): +class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]): """ Composable, debuggable post-processing processor for robot transitions. @@ -196,7 +196,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): Args: steps: Ordered list of processing steps executed on every call. Defaults to empty list. name: Human-readable identifier that is persisted inside the JSON config. - Defaults to "RobotProcessor". + Defaults to "DataProcessorPipeline". to_transition: Function to convert batch dict to EnvTransition dict. Defaults to _default_batch_to_transition. to_output: Function to convert EnvTransition dict to the desired output format of type TOutput. 
@@ -210,18 +210,20 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): Type Safety Examples: ```python # Default behavior - returns batch dict - processor: RobotProcessor[dict[str, Any]] = RobotProcessor(steps=[some_step1, some_step2]) + processor: DataProcessorPipeline[dict[str, Any]] = DataProcessorPipeline( + steps=[some_step1, some_step2] + ) result: dict[str, Any] = processor(batch_data) # Type checker knows this is a dict # For EnvTransition output, explicitly specify identity function - transition_processor: RobotProcessor[EnvTransition] = RobotProcessor( + transition_processor: DataProcessorPipeline[EnvTransition] = DataProcessorPipeline( steps=[some_step1, some_step2], to_output=lambda x: x, # Identity function ) result: EnvTransition = transition_processor(batch_data) # Type checker knows this is EnvTransition # For custom output types - processor: RobotProcessor[str] = RobotProcessor( + processor: DataProcessorPipeline[str] = DataProcessorPipeline( steps=[custom_step], to_output=lambda t: f"Processed {len(t)} keys" ) result: str = processor(batch_data) # Type checker knows this is str @@ -243,7 +245,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): """ steps: Sequence[ProcessorStep] = field(default_factory=list) - name: str = "RobotProcessor" + name: str = "DataProcessorPipeline" to_transition: Callable[[dict[str, Any]], EnvTransition] = field(default=batch_to_transition, repr=False) to_output: Callable[[EnvTransition], TOutput] = field( @@ -419,7 +421,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): to_transition: Callable[[dict[str, Any]], EnvTransition] | None = None, to_output: Callable[[EnvTransition], TOutput] | None = None, **kwargs, - ) -> RobotProcessor[TOutput]: + ) -> DataProcessorPipeline[TOutput]: """Load a serialized processor from source (local path or Hugging Face Hub identifier). 
Args: @@ -440,7 +442,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): Use identity function (lambda x: x) for EnvTransition output. Returns: - A RobotProcessor[TOutput] instance loaded from the saved configuration. + A DataProcessorPipeline[TOutput] instance loaded from the saved configuration. Raises: ImportError: If a processor step class cannot be loaded or imported. @@ -450,12 +452,12 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): Examples: Basic loading: ```python - processor = RobotProcessor.from_pretrained("path/to/processor") + processor = DataProcessorPipeline.from_pretrained("path/to/processor") ``` Loading specific config file: ```python - processor = RobotProcessor.from_pretrained( + processor = DataProcessorPipeline.from_pretrained( "username/multi-processor-repo", config_filename="preprocessor.json" ) ``` @@ -465,14 +467,14 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): import gym env = gym.make("CartPole-v1") - processor = RobotProcessor.from_pretrained( + processor = DataProcessorPipeline.from_pretrained( "username/cartpole-processor", overrides={"ActionRepeatStep": {"env": env}} ) ``` Multiple overrides: ```python - processor = RobotProcessor.from_pretrained( + processor = DataProcessorPipeline.from_pretrained( "path/to/processor", overrides={ "CustomStep": {"param1": "new_value"}, @@ -656,7 +658,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): return cls( steps=steps, - name=loaded_config.get("name", "RobotProcessor"), + name=loaded_config.get("name", "DataProcessorPipeline"), to_transition=to_transition or batch_to_transition, # Cast is necessary here: Same type-checker limitation as above. # When to_output is None, we use the default which returns dict[str, Any]. 
@@ -668,13 +670,13 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): """Return the number of steps in the processor.""" return len(self.steps) - def __getitem__(self, idx: int | slice) -> ProcessorStep | RobotProcessor[TOutput]: + def __getitem__(self, idx: int | slice) -> ProcessorStep | DataProcessorPipeline[TOutput]: """Indexing helper exposing underlying steps. * ``int`` – returns the idx-th ProcessorStep. - * ``slice`` – returns a new RobotProcessor with the sliced steps. + * ``slice`` – returns a new DataProcessorPipeline with the sliced steps. """ if isinstance(idx, slice): - return RobotProcessor( + return DataProcessorPipeline( steps=self.steps[idx], name=self.name, to_transition=self.to_transition, @@ -745,7 +747,7 @@ class RobotProcessor(ModelHubMixin, Generic[TOutput]): parts = [f"name='{self.name}'", steps_repr] - return f"RobotProcessor({', '.join(parts)})" + return f"DataProcessorPipeline({', '.join(parts)})" def __post_init__(self): for i, step in enumerate(self.steps): diff --git a/src/lerobot/record.py b/src/lerobot/record.py index 33a6bb98c..8fc5a43a6 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -76,7 +76,7 @@ from lerobot.datasets.utils import hw_to_dataset_features from lerobot.datasets.video_utils import VideoEncodingManager from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import IdentityProcessor, RobotProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, IdentityProcessor, TransitionKey from lerobot.processor.converters import ( to_output_robot_action, to_transition_robot_observation, @@ -235,22 +235,22 @@ def record_loop( dataset: LeRobotDataset | None = None, teleop: Teleoperator | list[Teleoperator] | None = None, policy: PreTrainedPolicy | None = None, - preprocessor: RobotProcessor | None = None, - postprocessor: RobotProcessor | None = None, + preprocessor: 
DataProcessorPipeline | None = None, + postprocessor: DataProcessorPipeline | None = None, control_time_s: int | None = None, - teleop_action_processor: RobotProcessor | None = None, # runs after teleop - robot_action_processor: RobotProcessor | None = None, # runs before robot - robot_observation_processor: RobotProcessor | None = None, # runs after robot + teleop_action_processor: DataProcessorPipeline | None = None, # runs after teleop + robot_action_processor: DataProcessorPipeline | None = None, # runs before robot + robot_observation_processor: DataProcessorPipeline | None = None, # runs after robot single_task: str | None = None, display_data: bool = False, ): - teleop_action_processor = teleop_action_processor or RobotProcessor( + teleop_action_processor = teleop_action_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr ) - robot_action_processor = robot_action_processor or RobotProcessor( + robot_action_processor = robot_action_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=lambda tr: tr, to_output=to_output_robot_action ) - robot_observation_processor = robot_observation_processor or RobotProcessor( + robot_observation_processor = robot_observation_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr ) diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index 88148f7a7..8656c4792 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -47,7 +47,7 @@ from pprint import pformat from lerobot.configs import parser from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.processor import IdentityProcessor, RobotProcessor +from lerobot.processor import DataProcessorPipeline, IdentityProcessor from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action from lerobot.robots import ( # noqa: F401 Robot, @@ 
-85,7 +85,7 @@ class ReplayConfig: # Use vocal synthesis to read events. play_sounds: bool = True # Optional processor for actions before sending to robot - robot_action_processor: RobotProcessor | None = None + robot_action_processor: DataProcessorPipeline | None = None @parser.wrap() @@ -94,7 +94,7 @@ def replay(cfg: ReplayConfig): logging.info(pformat(asdict(cfg))) # Initialize robot action processor with default if not provided - robot_action_processor = cfg.robot_action_processor or RobotProcessor( + robot_action_processor = cfg.robot_action_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=to_output_robot_action, # type: ignore[arg-type] diff --git a/src/lerobot/scripts/rl/gym_manipulator.py b/src/lerobot/scripts/rl/gym_manipulator.py index 7e76689c6..ccc9be2de 100644 --- a/src/lerobot/scripts/rl/gym_manipulator.py +++ b/src/lerobot/scripts/rl/gym_manipulator.py @@ -31,6 +31,7 @@ from lerobot.model.kinematics import RobotKinematics from lerobot.processor import ( AddTeleopActionAsComplimentaryData, AddTeleopEventsAsInfo, + DataProcessorPipeline, DeviceProcessor, EnvTransition, GripperPenaltyProcessor, @@ -42,7 +43,6 @@ from lerobot.processor import ( MotorCurrentProcessor, Numpy2TorchActionProcessor, RewardClassifierProcessor, - RobotProcessor, TimeLimitProcessor, ToBatchProcessor, Torch2NumpyActionProcessor, @@ -374,7 +374,9 @@ def make_processors( DeviceProcessor(device=device), ] - return RobotProcessor(steps=env_pipeline_steps), RobotProcessor(steps=action_pipeline_steps) + return DataProcessorPipeline(steps=env_pipeline_steps), DataProcessorPipeline( + steps=action_pipeline_steps + ) # Full processor pipeline for real robot environment # Get robot and motor information for kinematics @@ -486,15 +488,15 @@ def make_processors( ] action_pipeline_steps.extend(inverse_kinematics_steps) - return RobotProcessor(steps=env_pipeline_steps), RobotProcessor(steps=action_pipeline_steps) + return 
DataProcessorPipeline(steps=env_pipeline_steps), DataProcessorPipeline(steps=action_pipeline_steps) def step_env_and_process_transition( env: gym.Env, transition: EnvTransition, action: torch.Tensor, - env_processor: RobotProcessor, - action_processor: RobotProcessor, + env_processor: DataProcessorPipeline, + action_processor: DataProcessorPipeline, ): """ Execute one step with processor pipeline. @@ -543,8 +545,8 @@ def step_env_and_process_transition( def control_loop( env: gym.Env, - env_processor: RobotProcessor, - action_processor: RobotProcessor, + env_processor: DataProcessorPipeline, + action_processor: DataProcessorPipeline, teleop_device: Teleoperator, cfg: GymManipulatorConfig, ) -> None: @@ -698,7 +700,9 @@ def control_loop( dataset.push_to_hub() -def replay_trajectory(env: gym.Env, action_processor: RobotProcessor, cfg: GymManipulatorConfig) -> None: +def replay_trajectory( + env: gym.Env, action_processor: DataProcessorPipeline, cfg: GymManipulatorConfig +) -> None: """Replay recorded trajectory on robot environment.""" assert cfg.dataset.replay_episode is not None, "Replay episode must be provided for replay" diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index 97b34b310..1b77b92d0 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -61,7 +61,7 @@ import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 from lerobot.configs import parser -from lerobot.processor import IdentityProcessor, RobotProcessor +from lerobot.processor import DataProcessorPipeline, IdentityProcessor from lerobot.processor.converters import ( to_output_robot_action, to_transition_robot_observation, @@ -104,9 +104,9 @@ class TeleoperateConfig: # Display all cameras on screen display_data: bool = False # Optional processors for data transformation - teleop_action_processor: RobotProcessor | 
None = None # runs after teleop - robot_action_processor: RobotProcessor | None = None # runs before robot - robot_observation_processor: RobotProcessor | None = None # runs after robot + teleop_action_processor: DataProcessorPipeline | None = None # runs after teleop + robot_action_processor: DataProcessorPipeline | None = None # runs before robot + robot_observation_processor: DataProcessorPipeline | None = None # runs after robot def teleop_loop( @@ -115,20 +115,20 @@ def teleop_loop( fps: int, display_data: bool = False, duration: float | None = None, - teleop_action_processor: RobotProcessor | None = None, - robot_action_processor: RobotProcessor | None = None, - robot_observation_processor: RobotProcessor | None = None, + teleop_action_processor: DataProcessorPipeline | None = None, + robot_action_processor: DataProcessorPipeline | None = None, + robot_observation_processor: DataProcessorPipeline | None = None, ): # Initialize processors with defaults if not provided - teleop_action_processor = teleop_action_processor or RobotProcessor( + teleop_action_processor = teleop_action_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr ) - robot_action_processor = robot_action_processor or RobotProcessor( + robot_action_processor = robot_action_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=lambda tr: tr, to_output=to_output_robot_action, # type: ignore[arg-type] ) - robot_observation_processor = robot_observation_processor or RobotProcessor( + robot_observation_processor = robot_observation_processor or DataProcessorPipeline( steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr ) diff --git a/src/lerobot/utils/control_utils.py b/src/lerobot/utils/control_utils.py index d8c7c9d57..5553d56f2 100644 --- a/src/lerobot/utils/control_utils.py +++ b/src/lerobot/utils/control_utils.py @@ -31,7 +31,7 @@ from termcolor 
import colored from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.utils import DEFAULT_FEATURES from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import RobotProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, TransitionKey from lerobot.robots import Robot @@ -102,8 +102,8 @@ def predict_action( observation: dict[str, np.ndarray], policy: PreTrainedPolicy, device: torch.device, - preprocessor: RobotProcessor, - postprocessor: RobotProcessor, + preprocessor: DataProcessorPipeline, + postprocessor: DataProcessorPipeline, use_amp: bool, task: str | None = None, robot_type: str | None = None, diff --git a/src/lerobot/utils/train_utils.py b/src/lerobot/utils/train_utils.py index 25cdeb23b..10eab8741 100644 --- a/src/lerobot/utils/train_utils.py +++ b/src/lerobot/utils/train_utils.py @@ -32,7 +32,7 @@ from lerobot.datasets.utils import load_json, write_json from lerobot.optim.optimizers import load_optimizer_state, save_optimizer_state from lerobot.optim.schedulers import load_scheduler_state, save_scheduler_state from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import RobotProcessor +from lerobot.processor import DataProcessorPipeline from lerobot.utils.random_utils import load_rng_state, save_rng_state @@ -75,8 +75,8 @@ def save_checkpoint( policy: PreTrainedPolicy, optimizer: Optimizer, scheduler: LRScheduler | None = None, - preprocessor: RobotProcessor | None = None, - postprocessor: RobotProcessor | None = None, + preprocessor: DataProcessorPipeline | None = None, + postprocessor: DataProcessorPipeline | None = None, ) -> None: """This function creates the following directory structure: diff --git a/tests/processor/test_act_processor.py b/tests/processor/test_act_processor.py index f405da419..56cc1b9b4 100644 --- a/tests/processor/test_act_processor.py +++ b/tests/processor/test_act_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import 
ACTION, OBS_STATE from lerobot.policies.act.configuration_act import ACTConfig from lerobot.policies.act.processor_act import make_act_pre_post_processors from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, RenameProcessor, - RobotProcessor, ToBatchProcessor, TransitionKey, UnnormalizerProcessor, @@ -250,7 +250,7 @@ def test_act_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x ) diff --git a/tests/processor/test_batch_conversion.py b/tests/processor/test_batch_conversion.py index 9be80821a..8d1f5e20e 100644 --- a/tests/processor/test_batch_conversion.py +++ b/tests/processor/test_batch_conversion.py @@ -1,6 +1,6 @@ import torch -from lerobot.processor import RobotProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, TransitionKey from lerobot.processor.converters import batch_to_transition, transition_to_batch @@ -20,7 +20,7 @@ def _dummy_batch(): def test_observation_grouping_roundtrip(): """Test that observation.* keys are properly grouped and ungrouped.""" - proc = RobotProcessor([]) + proc = DataProcessorPipeline([]) batch_in = _dummy_batch() batch_out = proc(batch_in) @@ -261,7 +261,7 @@ def test_custom_converter(): batch = transition_to_batch(tr) return batch - processor = RobotProcessor(steps=[], to_transition=to_tr, to_output=to_batch) + processor = DataProcessorPipeline(steps=[], to_transition=to_tr, to_output=to_batch) batch = { "observation.state": torch.randn(1, 4), diff --git a/tests/processor/test_batch_processor.py b/tests/processor/test_batch_processor.py index 681ee9a60..9b3a7fa1e 100644 --- a/tests/processor/test_batch_processor.py +++ b/tests/processor/test_batch_processor.py @@ -22,7 +22,7 @@ import pytest import torch from lerobot.constants import OBS_ENV_STATE, 
OBS_IMAGE, OBS_IMAGES, OBS_STATE -from lerobot.processor import ProcessorStepRegistry, RobotProcessor, ToBatchProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, ProcessorStepRegistry, ToBatchProcessor, TransitionKey def create_transition( @@ -243,7 +243,7 @@ def test_mixed_observation(): def test_integration_with_robot_processor(): """Test ToBatchProcessor integration with RobotProcessor.""" to_batch_processor = ToBatchProcessor() - pipeline = RobotProcessor([to_batch_processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([to_batch_processor], to_transition=lambda x: x, to_output=lambda x: x) # Create unbatched observation observation = { @@ -283,7 +283,7 @@ def test_serialization_methods(): def test_save_and_load_pretrained(): """Test saving and loading ToBatchProcessor with RobotProcessor.""" processor = ToBatchProcessor() - pipeline = RobotProcessor( + pipeline = DataProcessorPipeline( [processor], name="BatchPipeline", to_transition=lambda x: x, to_output=lambda x: x ) @@ -296,7 +296,7 @@ def test_save_and_load_pretrained(): assert config_path.exists() # Load pipeline - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -325,11 +325,11 @@ def test_registry_functionality(): def test_registry_based_save_load(): """Test saving and loading using registry name.""" processor = ToBatchProcessor() - pipeline = RobotProcessor([processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, to_transition=lambda x: x, to_output=lambda x: x ) diff --git 
a/tests/processor/test_classifier_processor.py b/tests/processor/test_classifier_processor.py index 1ec62eda4..e369d11ea 100644 --- a/tests/processor/test_classifier_processor.py +++ b/tests/processor/test_classifier_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import OBS_IMAGE, OBS_STATE from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig from lerobot.policies.sac.reward_model.processor_classifier import make_classifier_processor from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, IdentityProcessor, NormalizerProcessor, - RobotProcessor, TransitionKey, ) @@ -254,7 +254,7 @@ def test_classifier_processor_save_and_load(): factory_preprocessor, factory_postprocessor = make_classifier_processor(config, stats) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor( + preprocessor = DataProcessorPipeline( factory_preprocessor.steps, to_transition=lambda x: x, to_output=lambda x: x ) @@ -263,7 +263,7 @@ def test_classifier_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -300,7 +300,7 @@ def test_classifier_processor_mixed_precision(): modified_steps.append(step) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) + preprocessor = DataProcessorPipeline(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) # Create test data observation = { diff --git a/tests/processor/test_device_processor.py b/tests/processor/test_device_processor.py index 0482b4916..b9906b720 100644 --- a/tests/processor/test_device_processor.py +++ b/tests/processor/test_device_processor.py @@ -19,7 +19,7 @@ import pytest import torch from lerobot.configs.types import 
FeatureType, PolicyFeature -from lerobot.processor import DeviceProcessor, RobotProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, DeviceProcessor, TransitionKey def create_transition( @@ -310,7 +310,7 @@ def test_integration_with_robot_processor(): device_processor = DeviceProcessor(device="cpu") batch_processor = ToBatchProcessor() - processor = RobotProcessor( + processor = DataProcessorPipeline( steps=[batch_processor, device_processor], name="test_pipeline", to_transition=lambda x: x, @@ -336,14 +336,14 @@ def test_save_and_load_pretrained(): """Test saving and loading processor with DeviceProcessor.""" device = "cuda:0" if torch.cuda.is_available() else "cpu" processor = DeviceProcessor(device=device, float_dtype="float16") - robot_processor = RobotProcessor(steps=[processor], name="device_test_processor") + robot_processor = DataProcessorPipeline(steps=[processor], name="device_test_processor") with tempfile.TemporaryDirectory() as tmpdir: # Save robot_processor.save_pretrained(tmpdir) # Load - loaded_processor = RobotProcessor.from_pretrained(tmpdir) + loaded_processor = DataProcessorPipeline.from_pretrained(tmpdir) assert len(loaded_processor.steps) == 1 loaded_device_processor = loaded_processor.steps[0] @@ -982,7 +982,7 @@ def test_policy_processor_integration(): norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD, FeatureType.ACTION: NormalizationMode.MEAN_STD} # Create input processor (preprocessor) that moves to GPU - input_processor = RobotProcessor( + input_processor = DataProcessorPipeline( steps=[ NormalizerProcessor(features=features, norm_map=norm_map, stats=stats), ToBatchProcessor(), @@ -994,7 +994,7 @@ def test_policy_processor_integration(): ) # Create output processor (postprocessor) that moves to CPU - output_processor = RobotProcessor( + output_processor = DataProcessorPipeline( steps=[ DeviceProcessor(device="cpu"), UnnormalizerProcessor(features={ACTION: features[ACTION]}, norm_map=norm_map, 
stats=stats), diff --git a/tests/processor/test_diffusion_processor.py b/tests/processor/test_diffusion_processor.py index 0efe0e5e1..0ebca478a 100644 --- a/tests/processor/test_diffusion_processor.py +++ b/tests/processor/test_diffusion_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig from lerobot.policies.diffusion.processor_diffusion import make_diffusion_pre_post_processors from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, RenameProcessor, - RobotProcessor, ToBatchProcessor, TransitionKey, UnnormalizerProcessor, @@ -257,7 +257,7 @@ def test_diffusion_processor_save_and_load(): factory_preprocessor, factory_postprocessor = make_diffusion_pre_post_processors(config, stats) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor( + preprocessor = DataProcessorPipeline( factory_preprocessor.steps, to_transition=lambda x: x, to_output=lambda x: x ) @@ -266,7 +266,7 @@ def test_diffusion_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -303,7 +303,7 @@ def test_diffusion_processor_mixed_precision(): modified_steps.append(step) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) + preprocessor = DataProcessorPipeline(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) # Create test data observation = { diff --git a/tests/processor/test_normalize_processor.py b/tests/processor/test_normalize_processor.py index 816a9f36a..4fa22e23b 100644 --- a/tests/processor/test_normalize_processor.py +++ b/tests/processor/test_normalize_processor.py @@ -21,9 
+21,9 @@ import torch from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature from lerobot.processor import ( + DataProcessorPipeline, IdentityProcessor, NormalizerProcessor, - RobotProcessor, TransitionKey, UnnormalizerProcessor, hotswap_stats, @@ -508,7 +508,9 @@ def test_get_config(full_stats): def test_integration_with_robot_processor(normalizer_processor): """Test integration with RobotProcessor pipeline""" - robot_processor = RobotProcessor([normalizer_processor], to_transition=lambda x: x, to_output=lambda x: x) + robot_processor = DataProcessorPipeline( + [normalizer_processor], to_transition=lambda x: x, to_output=lambda x: x + ) observation = { "observation.image": torch.tensor([0.7, 0.5, 0.3]), @@ -1009,7 +1011,7 @@ def test_hotswap_stats_basic_functionality(): identity = IdentityProcessor() # Create robot processor - robot_processor = RobotProcessor(steps=[normalizer, unnormalizer, identity]) + robot_processor = DataProcessorPipeline(steps=[normalizer, unnormalizer, identity]) # Hotswap stats new_processor = hotswap_stats(robot_processor, new_stats) @@ -1046,7 +1048,7 @@ def test_hotswap_stats_deep_copy(): norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD} normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) - original_processor = RobotProcessor(steps=[normalizer]) + original_processor = DataProcessorPipeline(steps=[normalizer]) # Store reference to original stats original_stats_reference = original_processor.steps[0].stats @@ -1089,7 +1091,7 @@ def test_hotswap_stats_only_affects_normalizer_steps(): unnormalizer = UnnormalizerProcessor(features=features, norm_map=norm_map, stats=stats) identity = IdentityProcessor() - robot_processor = RobotProcessor(steps=[normalizer, identity, unnormalizer]) + robot_processor = DataProcessorPipeline(steps=[normalizer, identity, unnormalizer]) # Hotswap stats new_processor = hotswap_stats(robot_processor, new_stats) @@ -1116,7 +1118,7 @@ def 
test_hotswap_stats_empty_stats(): norm_map = {FeatureType.VISUAL: NormalizationMode.MEAN_STD} normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) - robot_processor = RobotProcessor(steps=[normalizer]) + robot_processor = DataProcessorPipeline(steps=[normalizer]) # Hotswap with empty stats new_processor = hotswap_stats(robot_processor, empty_stats) @@ -1133,7 +1135,7 @@ def test_hotswap_stats_no_normalizer_steps(): } # Create processor with only identity steps - robot_processor = RobotProcessor(steps=[IdentityProcessor(), IdentityProcessor()]) + robot_processor = DataProcessorPipeline(steps=[IdentityProcessor(), IdentityProcessor()]) # Hotswap stats - should work without error new_processor = hotswap_stats(robot_processor, stats) @@ -1172,7 +1174,7 @@ def test_hotswap_stats_preserves_other_attributes(): normalize_observation_keys=normalize_observation_keys, eps=eps, ) - robot_processor = RobotProcessor(steps=[normalizer]) + robot_processor = DataProcessorPipeline(steps=[normalizer]) # Hotswap stats new_processor = hotswap_stats(robot_processor, new_stats) @@ -1215,7 +1217,7 @@ def test_hotswap_stats_multiple_normalizer_types(): unnormalizer1 = UnnormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) unnormalizer2 = UnnormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) - robot_processor = RobotProcessor(steps=[normalizer1, unnormalizer1, normalizer2, unnormalizer2]) + robot_processor = DataProcessorPipeline(steps=[normalizer1, unnormalizer1, normalizer2, unnormalizer2]) # Hotswap stats new_processor = hotswap_stats(robot_processor, new_stats) @@ -1263,7 +1265,7 @@ def test_hotswap_stats_with_different_data_types(): } normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) - robot_processor = RobotProcessor(steps=[normalizer]) + robot_processor = DataProcessorPipeline(steps=[normalizer]) # Hotswap stats new_processor = 
hotswap_stats(robot_processor, new_stats) @@ -1319,7 +1321,9 @@ def test_hotswap_stats_functional_test(): # Create original processor normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=initial_stats) - original_processor = RobotProcessor(steps=[normalizer], to_transition=lambda x: x, to_output=lambda x: x) + original_processor = DataProcessorPipeline( + steps=[normalizer], to_transition=lambda x: x, to_output=lambda x: x + ) # Process with original stats original_result = original_processor(transition) diff --git a/tests/processor/test_pipeline.py b/tests/processor/test_pipeline.py index e1b61e0df..2bc7991fc 100644 --- a/tests/processor/test_pipeline.py +++ b/tests/processor/test_pipeline.py @@ -27,7 +27,7 @@ import torch.nn as nn from lerobot.configs.types import FeatureType, PolicyFeature from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features -from lerobot.processor import EnvTransition, ProcessorStepRegistry, RobotProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, EnvTransition, ProcessorStepRegistry, TransitionKey from tests.conftest import assert_contract_is_typed @@ -175,7 +175,7 @@ class MockStepWithTensorState: def test_empty_pipeline(): """Test pipeline with no steps.""" - pipeline = RobotProcessor([], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([], to_transition=lambda x: x, to_output=lambda x: x) transition = create_transition() result = pipeline(transition) @@ -187,7 +187,7 @@ def test_empty_pipeline(): def test_single_step_pipeline(): """Test pipeline with a single step.""" step = MockStep("test_step") - pipeline = RobotProcessor([step], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([step], to_transition=lambda x: x, to_output=lambda x: x) transition = create_transition() result = pipeline(transition) @@ -204,7 +204,7 @@ def test_multiple_steps_pipeline(): """Test pipeline with multiple 
steps.""" step1 = MockStep("step1") step2 = MockStep("step2") - pipeline = RobotProcessor([step1, step2], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([step1, step2], to_transition=lambda x: x, to_output=lambda x: x) transition = create_transition() result = pipeline(transition) @@ -216,7 +216,7 @@ def test_multiple_steps_pipeline(): def test_invalid_transition_format(): """Test pipeline with invalid transition format.""" - pipeline = RobotProcessor([MockStep()]) + pipeline = DataProcessorPipeline([MockStep()]) # Test with wrong type (tuple instead of dict) with pytest.raises(ValueError, match="EnvTransition must be a dictionary"): @@ -231,7 +231,7 @@ def test_step_through(): """Test step_through method with dict input.""" step1 = MockStep("step1") step2 = MockStep("step2") - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) transition = create_transition() @@ -252,7 +252,7 @@ def test_step_through_with_dict(): """Test step_through method with dict input.""" step1 = MockStep("step1") step2 = MockStep("step2") - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) batch = { "observation.image": None, @@ -291,7 +291,7 @@ def test_step_through_with_dict(): def test_step_through_no_hooks(): """Test that step_through doesn't execute hooks.""" step = MockStep("test_step") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) hook_calls = [] @@ -326,7 +326,7 @@ def test_indexing(): """Test pipeline indexing.""" step1 = MockStep("step1") step2 = MockStep("step2") - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) # Test integer indexing assert pipeline[0] is step1 @@ -334,7 +334,7 @@ def test_indexing(): # Test slice indexing sub_pipeline = pipeline[0:1] - assert isinstance(sub_pipeline, RobotProcessor) + assert isinstance(sub_pipeline, DataProcessorPipeline) assert len(sub_pipeline) 
== 1 assert sub_pipeline[0] is step1 @@ -342,7 +342,7 @@ def test_indexing(): def test_hooks(): """Test before/after step hooks.""" step = MockStep("test_step") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) before_calls = [] after_calls = [] @@ -366,7 +366,7 @@ def test_hooks(): def test_unregister_hooks(): """Test unregistering hooks from the pipeline.""" step = MockStep("test_step") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) # Test before_step_hook before_calls = [] @@ -405,7 +405,7 @@ def test_unregister_hooks(): def test_unregister_nonexistent_hook(): """Test error handling when unregistering hooks that don't exist.""" - pipeline = RobotProcessor([MockStep()]) + pipeline = DataProcessorPipeline([MockStep()]) def some_hook(idx: int, transition: EnvTransition): pass @@ -423,7 +423,7 @@ def test_unregister_nonexistent_hook(): def test_multiple_hooks_and_selective_unregister(): """Test registering multiple hooks and selectively unregistering them.""" - pipeline = RobotProcessor([MockStep("step1"), MockStep("step2")]) + pipeline = DataProcessorPipeline([MockStep("step1"), MockStep("step2")]) calls_1 = [] calls_2 = [] @@ -469,7 +469,7 @@ def test_multiple_hooks_and_selective_unregister(): def test_hook_execution_order_documentation(): """Test and document that hooks are executed sequentially in registration order.""" - pipeline = RobotProcessor([MockStep("step")]) + pipeline = DataProcessorPipeline([MockStep("step")]) execution_order = [] @@ -521,7 +521,7 @@ def test_save_and_load_pretrained(): step1.counter = 5 step2.counter = 10 - pipeline = RobotProcessor([step1, step2], name="TestPipeline") + pipeline = DataProcessorPipeline([step1, step2], name="TestPipeline") with tempfile.TemporaryDirectory() as tmp_dir: # Save pipeline @@ -543,7 +543,7 @@ def test_save_and_load_pretrained(): assert config["steps"][1]["config"]["counter"] == 10 # Load pipeline - loaded_pipeline = 
RobotProcessor.from_pretrained(tmp_dir) + loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir) assert loaded_pipeline.name == "TestPipeline" assert len(loaded_pipeline) == 2 @@ -556,7 +556,7 @@ def test_save_and_load_pretrained(): def test_step_without_optional_methods(): """Test pipeline with steps that don't implement optional methods.""" step = MockStepWithoutOptionalMethods(multiplier=3.0) - pipeline = RobotProcessor( + pipeline = DataProcessorPipeline( [step], to_transition=lambda x: x, to_output=lambda x: x ) # Identity for EnvTransition input/output @@ -571,14 +571,14 @@ def test_step_without_optional_methods(): # Save/load should work even without optional methods with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) - loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir) + loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir) assert len(loaded_pipeline) == 1 def test_mixed_json_and_tensor_state(): """Test step with both JSON attributes and tensor state.""" step = MockStepWithTensorState(name="stats", learning_rate=0.05, window_size=5) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) # Process some transitions with rewards for i in range(10): @@ -594,13 +594,13 @@ def test_mixed_json_and_tensor_state(): pipeline.save_pretrained(tmp_dir) # Check that both config and state files were created - config_path = Path(tmp_dir) / "robotprocessor.json" # Default name is "RobotProcessor" - state_path = Path(tmp_dir) / "robotprocessor_step_0.safetensors" + config_path = Path(tmp_dir) / "dataprocessorpipeline.json" # Default name is "DataProcessorPipeline" + state_path = Path(tmp_dir) / "dataprocessorpipeline_step_0.safetensors" assert config_path.exists() assert state_path.exists() # Load and verify - loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir) + loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir) loaded_step = loaded_pipeline.steps[0] # Check JSON attributes were 
restored @@ -861,7 +861,7 @@ def test_from_pretrained_with_overrides(): env_step = MockStepWithNonSerializableParam(name="env_step", multiplier=2.0) registered_step = RegisteredMockStep(value=100, device="cpu") - pipeline = RobotProcessor([env_step, registered_step], name="TestOverrides") + pipeline = DataProcessorPipeline([env_step, registered_step], name="TestOverrides") with tempfile.TemporaryDirectory() as tmp_dir: # Save the pipeline @@ -879,7 +879,7 @@ def test_from_pretrained_with_overrides(): "registered_mock_step": {"device": "cuda", "value": 200}, } - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x ) @@ -907,7 +907,7 @@ def test_from_pretrained_with_partial_overrides(): step1 = MockStepWithNonSerializableParam(name="step1", multiplier=1.0) step2 = MockStepWithNonSerializableParam(name="step2", multiplier=2.0) - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -917,7 +917,7 @@ def test_from_pretrained_with_partial_overrides(): # The current implementation applies overrides to ALL steps with the same class name # Both steps will get the override - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x ) @@ -933,7 +933,7 @@ def test_from_pretrained_with_partial_overrides(): def test_from_pretrained_invalid_override_key(): """Test that invalid override keys raise KeyError.""" step = MockStepWithNonSerializableParam() - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -942,13 +942,13 @@ def test_from_pretrained_invalid_override_key(): overrides = 
{"NonExistentStep": {"param": "value"}} with pytest.raises(KeyError, match="Override keys.*do not match any step"): - RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) def test_from_pretrained_multiple_invalid_override_keys(): """Test that multiple invalid override keys are reported.""" step = MockStepWithNonSerializableParam() - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -957,7 +957,7 @@ def test_from_pretrained_multiple_invalid_override_keys(): overrides = {"NonExistentStep1": {"param": "value1"}, "NonExistentStep2": {"param": "value2"}} with pytest.raises(KeyError) as exc_info: - RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) error_msg = str(exc_info.value) assert "NonExistentStep1" in error_msg @@ -968,7 +968,7 @@ def test_from_pretrained_multiple_invalid_override_keys(): def test_from_pretrained_registered_step_override(): """Test overriding registered steps using registry names.""" registered_step = RegisteredMockStep(value=50, device="cpu") - pipeline = RobotProcessor([registered_step]) + pipeline = DataProcessorPipeline([registered_step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -976,7 +976,7 @@ def test_from_pretrained_registered_step_override(): # Override using registry name overrides = {"registered_mock_step": {"value": 999, "device": "cuda"}} - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x ) @@ -994,7 +994,7 @@ def test_from_pretrained_mixed_registered_and_unregistered(): unregistered_step = MockStepWithNonSerializableParam(name="unregistered", multiplier=1.0) registered_step = 
RegisteredMockStep(value=10, device="cpu") - pipeline = RobotProcessor([unregistered_step, registered_step]) + pipeline = DataProcessorPipeline([unregistered_step, registered_step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1006,7 +1006,7 @@ def test_from_pretrained_mixed_registered_and_unregistered(): "registered_mock_step": {"value": 777}, } - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x ) @@ -1023,13 +1023,13 @@ def test_from_pretrained_mixed_registered_and_unregistered(): def test_from_pretrained_no_overrides(): """Test that from_pretrained works without overrides (backward compatibility).""" step = MockStepWithNonSerializableParam(name="no_override", multiplier=3.0) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Load without overrides - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -1045,13 +1045,13 @@ def test_from_pretrained_no_overrides(): def test_from_pretrained_empty_overrides(): """Test that from_pretrained works with empty overrides dict.""" step = MockStepWithNonSerializableParam(multiplier=2.0) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Load with empty overrides - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={}, to_transition=lambda x: x, to_output=lambda x: x ) @@ -1067,7 +1067,7 @@ def test_from_pretrained_empty_overrides(): def test_from_pretrained_override_instantiation_error(): """Test that instantiation errors with overrides 
are properly reported.""" step = MockStepWithNonSerializableParam(multiplier=1.0) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1080,13 +1080,13 @@ def test_from_pretrained_override_instantiation_error(): } with pytest.raises(ValueError, match="Failed to instantiate processor step"): - RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) def test_from_pretrained_with_state_and_overrides(): """Test that overrides work correctly with steps that have tensor state.""" step = MockStepWithTensorState(name="tensor_step", learning_rate=0.01, window_size=5) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) # Process some data to create state for i in range(10): @@ -1104,7 +1104,7 @@ def test_from_pretrained_with_state_and_overrides(): } } - loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) loaded_step = loaded_pipeline.steps[0] # Check that config overrides were applied @@ -1123,7 +1123,7 @@ def test_from_pretrained_override_error_messages(): """Test that error messages for override failures are helpful.""" step1 = MockStepWithNonSerializableParam(name="step1") step2 = RegisteredMockStep() - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1132,7 +1132,7 @@ def test_from_pretrained_override_error_messages(): overrides = {"WrongStepName": {"param": "value"}} with pytest.raises(KeyError) as exc_info: - RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) error_msg = str(exc_info.value) assert "WrongStepName" in error_msg @@ -1143,20 
+1143,20 @@ def test_from_pretrained_override_error_messages(): def test_repr_empty_processor(): """Test __repr__ with empty processor.""" - pipeline = RobotProcessor() + pipeline = DataProcessorPipeline() repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=0: [])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=0: [])" assert repr_str == expected def test_repr_single_step(): """Test __repr__ with single step.""" step = MockStep("test_step") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=1: [MockStep])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=1: [MockStep])" assert repr_str == expected @@ -1164,18 +1164,18 @@ def test_repr_multiple_steps_under_limit(): """Test __repr__ with 2-3 steps (all shown).""" step1 = MockStep("step1") step2 = MockStepWithoutOptionalMethods() - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=2: [MockStep, MockStepWithoutOptionalMethods])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=2: [MockStep, MockStepWithoutOptionalMethods])" assert repr_str == expected # Test with 3 steps (boundary case) step3 = MockStepWithTensorState() - pipeline = RobotProcessor([step1, step2, step3]) + pipeline = DataProcessorPipeline([step1, step2, step3]) repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=3: [MockStep, MockStepWithoutOptionalMethods, MockStepWithTensorState])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=3: [MockStep, MockStepWithoutOptionalMethods, MockStepWithTensorState])" assert repr_str == expected @@ -1187,30 +1187,30 @@ def test_repr_many_steps_truncated(): step4 = MockModuleStep() step5 = MockNonModuleStepWithState() - pipeline 
= RobotProcessor([step1, step2, step3, step4, step5]) + pipeline = DataProcessorPipeline([step1, step2, step3, step4, step5]) repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=5: [MockStep, MockStepWithoutOptionalMethods, ..., MockNonModuleStepWithState])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=5: [MockStep, MockStepWithoutOptionalMethods, ..., MockNonModuleStepWithState])" assert repr_str == expected def test_repr_with_custom_name(): """Test __repr__ with custom processor name.""" step = MockStep("test_step") - pipeline = RobotProcessor([step], name="CustomProcessor") + pipeline = DataProcessorPipeline([step], name="CustomProcessor") repr_str = repr(pipeline) - expected = "RobotProcessor(name='CustomProcessor', steps=1: [MockStep])" + expected = "DataProcessorPipeline(name='CustomProcessor', steps=1: [MockStep])" assert repr_str == expected def test_repr_with_seed(): """Test __repr__ with seed parameter.""" step = MockStep("test_step") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) repr_str = repr(pipeline) - expected = "RobotProcessor(name='RobotProcessor', steps=1: [MockStep])" + expected = "DataProcessorPipeline(name='DataProcessorPipeline', steps=1: [MockStep])" assert repr_str == expected @@ -1218,20 +1218,22 @@ def test_repr_with_custom_name_and_seed(): """Test __repr__ with both custom name and seed.""" step1 = MockStep("step1") step2 = MockStepWithoutOptionalMethods() - pipeline = RobotProcessor([step1, step2], name="MyProcessor") + pipeline = DataProcessorPipeline([step1, step2], name="MyProcessor") repr_str = repr(pipeline) - expected = "RobotProcessor(name='MyProcessor', steps=2: [MockStep, MockStepWithoutOptionalMethods])" + expected = ( + "DataProcessorPipeline(name='MyProcessor', steps=2: [MockStep, MockStepWithoutOptionalMethods])" + ) assert repr_str == expected def test_repr_without_seed(): """Test __repr__ when seed is explicitly None (should not 
show seed).""" step = MockStep("test_step") - pipeline = RobotProcessor([step], name="TestProcessor") + pipeline = DataProcessorPipeline([step], name="TestProcessor") repr_str = repr(pipeline) - expected = "RobotProcessor(name='TestProcessor', steps=1: [MockStep])" + expected = "DataProcessorPipeline(name='TestProcessor', steps=1: [MockStep])" assert repr_str == expected @@ -1242,10 +1244,10 @@ def test_repr_various_step_types(): step3 = MockModuleStep() step4 = MockNonModuleStepWithState() - pipeline = RobotProcessor([step1, step2, step3, step4], name="MixedSteps") + pipeline = DataProcessorPipeline([step1, step2, step3, step4], name="MixedSteps") repr_str = repr(pipeline) - expected = "RobotProcessor(name='MixedSteps', steps=4: [MockStep, MockStepWithTensorState, ..., MockNonModuleStepWithState])" + expected = "DataProcessorPipeline(name='MixedSteps', steps=4: [MockStep, MockStepWithTensorState, ..., MockNonModuleStepWithState])" assert repr_str == expected @@ -1256,10 +1258,10 @@ def test_repr_edge_case_long_names(): step3 = MockStepWithTensorState() step4 = MockNonModuleStepWithState() - pipeline = RobotProcessor([step1, step2, step3, step4], name="LongNames") + pipeline = DataProcessorPipeline([step1, step2, step3, step4], name="LongNames") repr_str = repr(pipeline) - expected = "RobotProcessor(name='LongNames', steps=4: [MockStepWithNonSerializableParam, MockStepWithoutOptionalMethods, ..., MockNonModuleStepWithState])" + expected = "DataProcessorPipeline(name='LongNames', steps=4: [MockStepWithNonSerializableParam, MockStepWithoutOptionalMethods, ..., MockNonModuleStepWithState])" assert repr_str == expected @@ -1267,7 +1269,7 @@ def test_repr_edge_case_long_names(): def test_save_with_custom_config_filename(): """Test saving processor with custom config filename.""" step = MockStep("test") - pipeline = RobotProcessor([step], name="TestProcessor") + pipeline = DataProcessorPipeline([step], name="TestProcessor") with tempfile.TemporaryDirectory() as tmp_dir: 
# Save with custom filename @@ -1283,16 +1285,18 @@ def test_save_with_custom_config_filename(): assert config["name"] == "TestProcessor" # Load with specific filename - loaded = RobotProcessor.from_pretrained(tmp_dir, config_filename="my_custom_config.json") + loaded = DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="my_custom_config.json") assert loaded.name == "TestProcessor" def test_multiple_processors_same_directory(): """Test saving multiple processors to the same directory with different config files.""" # Create different processors - preprocessor = RobotProcessor([MockStep("pre1"), MockStep("pre2")], name="preprocessor") + preprocessor = DataProcessorPipeline([MockStep("pre1"), MockStep("pre2")], name="preprocessor") - postprocessor = RobotProcessor([MockStepWithoutOptionalMethods(multiplier=0.5)], name="postprocessor") + postprocessor = DataProcessorPipeline( + [MockStepWithoutOptionalMethods(multiplier=0.5)], name="postprocessor" + ) with tempfile.TemporaryDirectory() as tmp_dir: # Save both to same directory @@ -1304,8 +1308,8 @@ def test_multiple_processors_same_directory(): assert (Path(tmp_dir) / "postprocessor.json").exists() # Load them back - loaded_pre = RobotProcessor.from_pretrained(tmp_dir, config_filename="preprocessor.json") - loaded_post = RobotProcessor.from_pretrained(tmp_dir, config_filename="postprocessor.json") + loaded_pre = DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="preprocessor.json") + loaded_post = DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="postprocessor.json") assert loaded_pre.name == "preprocessor" assert loaded_post.name == "postprocessor" @@ -1316,20 +1320,20 @@ def test_multiple_processors_same_directory(): def test_auto_detect_single_config(): """Test automatic config detection when there's only one JSON file.""" step = MockStepWithTensorState() - pipeline = RobotProcessor([step], name="SingleConfig") + pipeline = DataProcessorPipeline([step], name="SingleConfig") 
with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Load without specifying config_filename - loaded = RobotProcessor.from_pretrained(tmp_dir) + loaded = DataProcessorPipeline.from_pretrained(tmp_dir) assert loaded.name == "SingleConfig" def test_error_multiple_configs_no_filename(): """Test error when multiple configs exist and no filename specified.""" - proc1 = RobotProcessor([MockStep()], name="processor1") - proc2 = RobotProcessor([MockStep()], name="processor2") + proc1 = DataProcessorPipeline([MockStep()], name="processor1") + proc2 = DataProcessorPipeline([MockStep()], name="processor2") with tempfile.TemporaryDirectory() as tmp_dir: proc1.save_pretrained(tmp_dir) @@ -1337,7 +1341,7 @@ def test_error_multiple_configs_no_filename(): # Should raise error with pytest.raises(ValueError, match="Multiple .json files found"): - RobotProcessor.from_pretrained(tmp_dir) + DataProcessorPipeline.from_pretrained(tmp_dir) def test_state_file_naming_with_indices(): @@ -1347,7 +1351,7 @@ def test_state_file_naming_with_indices(): step2 = MockStepWithTensorState(name="norm2", window_size=10) step3 = MockModuleStep(input_dim=5) - pipeline = RobotProcessor([step1, step2, step3]) + pipeline = DataProcessorPipeline([step1, step2, step3]) # Process some data to create state for i in range(5): @@ -1363,9 +1367,9 @@ def test_state_file_naming_with_indices(): # Files should be named with pipeline name prefix and indices expected_names = [ - "robotprocessor_step_0.safetensors", - "robotprocessor_step_1.safetensors", - "robotprocessor_step_2.safetensors", + "dataprocessorpipeline_step_0.safetensors", + "dataprocessorpipeline_step_1.safetensors", + "dataprocessorpipeline_step_2.safetensors", ] actual_names = [f.name for f in state_files] assert actual_names == expected_names @@ -1404,7 +1408,7 @@ def test_state_file_naming_with_registry(): # Create pipeline with registered steps step1 = TestStatefulStep(1) step2 = TestStatefulStep(2) - pipeline = 
RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1415,8 +1419,8 @@ def test_state_file_naming_with_registry(): # Should include pipeline name, index and registry name expected_names = [ - "robotprocessor_step_0_test_stateful_step.safetensors", - "robotprocessor_step_1_test_stateful_step.safetensors", + "dataprocessorpipeline_step_0_test_stateful_step.safetensors", + "dataprocessorpipeline_step_1_test_stateful_step.safetensors", ] actual_names = [f.name for f in state_files] assert actual_names == expected_names @@ -1459,13 +1463,13 @@ def test_override_with_nested_config(): try: step = ComplexConfigStep() - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Load with nested override - loaded = RobotProcessor.from_pretrained( + loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"complex_config_step": {"nested_config": {"level1": {"level2": "overridden"}}}}, to_transition=lambda x: x, @@ -1483,13 +1487,13 @@ def test_override_with_nested_config(): def test_override_preserves_defaults(): """Test that overrides only affect specified parameters.""" step = MockStepWithNonSerializableParam(name="test", multiplier=2.0) - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Override only one parameter - loaded = RobotProcessor.from_pretrained( + loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={ "MockStepWithNonSerializableParam": { @@ -1507,7 +1511,7 @@ def test_override_preserves_defaults(): def test_override_type_validation(): """Test that type errors in overrides are caught properly.""" step = MockStepWithTensorState(learning_rate=0.01) - pipeline = RobotProcessor([step]) + pipeline = 
DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1520,7 +1524,7 @@ def test_override_type_validation(): } with pytest.raises(ValueError, match="Failed to instantiate"): - RobotProcessor.from_pretrained(tmp_dir, overrides=overrides) + DataProcessorPipeline.from_pretrained(tmp_dir, overrides=overrides) def test_override_with_callables(): @@ -1553,7 +1557,7 @@ def test_override_with_callables(): try: step = CallableStep() - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1567,7 +1571,7 @@ def test_override_with_callables(): return x # Load with callable override - loaded = RobotProcessor.from_pretrained( + loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"callable_step": {"transform_fn": double_values}}, to_transition=lambda x: x, @@ -1586,13 +1590,13 @@ def test_override_multiple_same_class_warning(): """Test behavior when multiple steps of same class exist.""" step1 = MockStepWithNonSerializableParam(name="step1", multiplier=1.0) step2 = MockStepWithNonSerializableParam(name="step2", multiplier=2.0) - pipeline = RobotProcessor([step1, step2]) + pipeline = DataProcessorPipeline([step1, step2]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Override affects all instances of the class - loaded = RobotProcessor.from_pretrained( + loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"MockStepWithNonSerializableParam": {"multiplier": 10.0}} ) @@ -1608,7 +1612,7 @@ def test_override_multiple_same_class_warning(): def test_config_filename_special_characters(): """Test config filenames with special characters are sanitized.""" # Processor name with special characters - pipeline = RobotProcessor([MockStep()], name="My/Processor\\With:Special*Chars") + pipeline = DataProcessorPipeline([MockStep()], 
name="My/Processor\\With:Special*Chars") with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) @@ -1626,10 +1630,10 @@ def test_state_file_naming_with_multiple_processors(): """Test that state files are properly prefixed with pipeline names to avoid conflicts.""" # Create two processors with state step1 = MockStepWithTensorState(name="norm", window_size=5) - preprocessor = RobotProcessor([step1], name="PreProcessor") + preprocessor = DataProcessorPipeline([step1], name="PreProcessor") step2 = MockStepWithTensorState(name="norm", window_size=10) - postprocessor = RobotProcessor([step2], name="PostProcessor") + postprocessor = DataProcessorPipeline([step2], name="PostProcessor") # Process some data to create state for i in range(3): @@ -1649,8 +1653,8 @@ def test_state_file_naming_with_multiple_processors(): assert (Path(tmp_dir) / "postprocessor_step_0.safetensors").exists() # Load both back and verify they work correctly - loaded_pre = RobotProcessor.from_pretrained(tmp_dir, config_filename="preprocessor.json") - loaded_post = RobotProcessor.from_pretrained(tmp_dir, config_filename="postprocessor.json") + loaded_pre = DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="preprocessor.json") + loaded_post = DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="postprocessor.json") assert loaded_pre.name == "PreProcessor" assert loaded_post.name == "PostProcessor" @@ -1688,14 +1692,14 @@ def test_override_with_device_strings(): try: step = DeviceAwareStep(device="cpu") - pipeline = RobotProcessor([step]) + pipeline = DataProcessorPipeline([step]) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Override device if torch.cuda.is_available(): - loaded = RobotProcessor.from_pretrained( + loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"device_aware_step": {"device": "cuda:0"}} ) @@ -1714,16 +1718,16 @@ def test_from_pretrained_nonexistent_path(): # Test with an invalid 
repo ID (too many slashes) - caught by HF validation with pytest.raises(HFValidationError): - RobotProcessor.from_pretrained("/path/that/does/not/exist") + DataProcessorPipeline.from_pretrained("/path/that/does/not/exist") # Test with a non-existent but valid Hub repo format with pytest.raises((FileNotFoundError, HfHubHTTPError)): - RobotProcessor.from_pretrained("nonexistent-user/nonexistent-repo") + DataProcessorPipeline.from_pretrained("nonexistent-user/nonexistent-repo") # Test with a local directory that exists but has no config files with tempfile.TemporaryDirectory() as tmp_dir: with pytest.raises(FileNotFoundError, match="No .json configuration files found"): - RobotProcessor.from_pretrained(tmp_dir) + DataProcessorPipeline.from_pretrained(tmp_dir) def test_save_load_with_custom_converter_functions(): @@ -1752,13 +1756,15 @@ def test_save_load_with_custom_converter_functions(): } # Create processor with custom converters - pipeline = RobotProcessor([MockStep()], to_transition=custom_to_transition, to_output=custom_to_output) + pipeline = DataProcessorPipeline( + [MockStep()], to_transition=custom_to_transition, to_output=custom_to_output + ) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) # Load - should use default converters - loaded = RobotProcessor.from_pretrained(tmp_dir) + loaded = DataProcessorPipeline.from_pretrained(tmp_dir) # Verify it uses default converters by checking with standard batch format batch = { @@ -1792,7 +1798,7 @@ class NonCallableStep: def test_construction_rejects_step_without_call(): with pytest.raises(TypeError, match=r"must define __call__"): - RobotProcessor([NonCallableStep()]) + DataProcessorPipeline([NonCallableStep()]) @dataclass @@ -1851,7 +1857,7 @@ class FeatureContractRemoveStep: def test_features_orders_and_merges(policy_feature_factory): - p = RobotProcessor( + p = DataProcessorPipeline( [ FeatureContractAddStep("a", policy_feature_factory(FeatureType.STATE, (1,))), 
FeatureContractMutateStep("a", lambda v: PolicyFeature(type=v.type, shape=(3,))), @@ -1870,7 +1876,7 @@ def test_features_respects_initial_without_mutation(policy_feature_factory): "seed": policy_feature_factory(FeatureType.STATE, (7,)), "nested": policy_feature_factory(FeatureType.ENV, (0,)), } - p = RobotProcessor( + p = DataProcessorPipeline( [ FeatureContractMutateStep("seed", lambda v: PolicyFeature(type=v.type, shape=(v.shape[0] + 1,))), FeatureContractMutateStep( @@ -1903,12 +1909,12 @@ def test_features_execution_order_tracking(): features["order"] = PolicyFeature(type=pf.type, shape=pf.shape + (code,)) return features - out = RobotProcessor([Track("A"), Track("B"), Track("C")]).transform_features({}) + out = DataProcessorPipeline([Track("A"), Track("B"), Track("C")]).transform_features({}) assert out["order"].shape == (1, 2, 3) def test_features_remove_key(policy_feature_factory): - p = RobotProcessor( + p = DataProcessorPipeline( [ FeatureContractAddStep("a", policy_feature_factory(FeatureType.STATE, (1,))), FeatureContractRemoveStep("a"), @@ -1923,7 +1929,7 @@ def test_features_remove_from_initial(policy_feature_factory): "keep": policy_feature_factory(FeatureType.STATE, (1,)), "drop": policy_feature_factory(FeatureType.STATE, (1,)), } - p = RobotProcessor([FeatureContractRemoveStep("drop")]) + p = DataProcessorPipeline([FeatureContractRemoveStep("drop")]) out = p.transform_features(initial_features=initial) assert "drop" not in out and out["keep"] == initial["keep"] @@ -1965,7 +1971,7 @@ class AddObservationStateFeatures: def test_aggregate_joint_action_only(): - rp = RobotProcessor([AddActionEEAndJointFeatures()]) + rp = DataProcessorPipeline([AddActionEEAndJointFeatures()]) initial = {"front": (480, 640, 3)} out = aggregate_pipeline_dataset_features( @@ -1983,7 +1989,7 @@ def test_aggregate_joint_action_only(): def test_aggregate_ee_action_and_observation_with_videos(): - rp = RobotProcessor([AddActionEEAndJointFeatures(), 
AddObservationStateFeatures()]) + rp = DataProcessorPipeline([AddActionEEAndJointFeatures(), AddObservationStateFeatures()]) initial = {"front": (480, 640, 3), "side": (720, 1280, 3)} out = aggregate_pipeline_dataset_features( @@ -2013,7 +2019,7 @@ def test_aggregate_ee_action_and_observation_with_videos(): def test_aggregate_both_action_types(): - rp = RobotProcessor([AddActionEEAndJointFeatures()]) + rp = DataProcessorPipeline([AddActionEEAndJointFeatures()]) out = aggregate_pipeline_dataset_features( pipeline=rp, initial_features={}, @@ -2028,7 +2034,7 @@ def test_aggregate_both_action_types(): def test_aggregate_images_when_use_videos_false(): - rp = RobotProcessor([AddObservationStateFeatures(add_front_image=True)]) + rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)]) initial = {"back": (480, 640, 3)} out = aggregate_pipeline_dataset_features( @@ -2045,7 +2051,7 @@ def test_aggregate_images_when_use_videos_false(): def test_aggregate_images_when_use_videos_true(): - rp = RobotProcessor([AddObservationStateFeatures(add_front_image=True)]) + rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)]) initial = {"back": (480, 640, 3)} out = aggregate_pipeline_dataset_features( @@ -2067,7 +2073,9 @@ def test_aggregate_images_when_use_videos_true(): def test_initial_camera_not_overridden_by_step_image(): # Step explicitly sets a different front image shape; initial has another shape. # aggregate_pipeline_dataset_features should keep the step's value (setdefault behavior on initial cams). 
- rp = RobotProcessor([AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))]) + rp = DataProcessorPipeline( + [AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))] + ) initial = {"front": (480, 640, 3)} # should NOT override the step-provided (240, 320, 3) out = aggregate_pipeline_dataset_features( diff --git a/tests/processor/test_rename_processor.py b/tests/processor/test_rename_processor.py index 6f8dfd3c5..a8c29e973 100644 --- a/tests/processor/test_rename_processor.py +++ b/tests/processor/test_rename_processor.py @@ -21,9 +21,9 @@ import torch from lerobot.configs.types import FeatureType from lerobot.processor import ( + DataProcessorPipeline, ProcessorStepRegistry, RenameProcessor, - RobotProcessor, TransitionKey, ) from lerobot.processor.rename_processor import rename_stats @@ -193,7 +193,7 @@ def test_integration_with_robot_processor(): } rename_processor = RenameProcessor(rename_map=rename_map) - pipeline = RobotProcessor([rename_processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([rename_processor], to_transition=lambda x: x, to_output=lambda x: x) observation = { "agent_pos": np.array([1.0, 2.0, 3.0]), @@ -226,7 +226,7 @@ def test_save_and_load_pretrained(): "old_image": "observation.image", } processor = RenameProcessor(rename_map=rename_map) - pipeline = RobotProcessor([processor], name="TestRenameProcessor") + pipeline = DataProcessorPipeline([processor], name="TestRenameProcessor") with tempfile.TemporaryDirectory() as tmp_dir: # Save pipeline @@ -241,7 +241,7 @@ def test_save_and_load_pretrained(): assert len(state_files) == 0 # Load pipeline - loaded_pipeline = RobotProcessor.from_pretrained( + loaded_pipeline = DataProcessorPipeline.from_pretrained( tmp_dir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -284,7 +284,7 @@ def test_registry_functionality(): def test_registry_based_save_load(): """Test save/load using registry name 
instead of module path.""" processor = RenameProcessor(rename_map={"key1": "renamed_key1"}) - pipeline = RobotProcessor([processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x) with tempfile.TemporaryDirectory() as tmp_dir: # Save and load @@ -293,7 +293,7 @@ # Verify config uses registry name import json - with open(Path(tmp_dir) / "robotprocessor.json") as f: # Default name is "RobotProcessor" + with open(Path(tmp_dir) / "dataprocessorpipeline.json") as f: # Default name is "DataProcessorPipeline" config = json.load(f) assert "registry_name" in config["steps"][0] @@ -301,7 +301,7 @@ assert "class" not in config["steps"][0] # Should use registry, not module path # Load should work - loaded_pipeline = RobotProcessor.from_pretrained(tmp_dir) + loaded_pipeline = DataProcessorPipeline.from_pretrained(tmp_dir) loaded_processor = loaded_pipeline.steps[0] assert isinstance(loaded_processor, RenameProcessor) assert loaded_processor.rename_map == {"key1": "renamed_key1"} @@ -325,7 +325,9 @@ } ) - pipeline = RobotProcessor([processor1, processor2], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [processor1, processor2], to_transition=lambda x: x, to_output=lambda x: x + ) observation = { "pos": np.array([1.0, 2.0]), @@ -459,7 +461,7 @@ processor2 = RenameProcessor( rename_map={"agent_position": "observation.state", "camera_image": "observation.image"} ) - pipeline = RobotProcessor([processor1, processor2]) + pipeline = DataProcessorPipeline([processor1, processor2]) spec = { "pos": policy_feature_factory(FeatureType.STATE, (7,)), diff --git a/tests/processor/test_sac_processor.py b/tests/processor/test_sac_processor.py index c2e50f6c5..ca15bd55d 100644 ---
a/tests/processor/test_sac_processor.py +++ b/tests/processor/test_sac_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import ACTION, OBS_STATE from lerobot.policies.sac.configuration_sac import SACConfig from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, RenameProcessor, - RobotProcessor, ToBatchProcessor, TransitionKey, UnnormalizerProcessor, @@ -234,13 +234,13 @@ def test_sac_processor_without_stats(): factory_preprocessor, factory_postprocessor = make_sac_pre_post_processors(config, dataset_stats=None) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor( + preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, to_transition=lambda x: x, to_output=lambda x: x, ) - postprocessor = RobotProcessor( + postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, to_transition=lambda x: x, @@ -277,7 +277,7 @@ def test_sac_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x ) diff --git a/tests/processor/test_tdmpc_processor.py b/tests/processor/test_tdmpc_processor.py index 6e2259e83..b9be43ec4 100644 --- a/tests/processor/test_tdmpc_processor.py +++ b/tests/processor/test_tdmpc_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.policies.tdmpc.processor_tdmpc import make_tdmpc_pre_post_processors from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, RenameProcessor, - RobotProcessor, ToBatchProcessor, TransitionKey, UnnormalizerProcessor, @@ -251,13 +251,13 @@ 
def test_tdmpc_processor_without_stats(): factory_preprocessor, factory_postprocessor = make_tdmpc_pre_post_processors(config, dataset_stats=None) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor( + preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, to_transition=lambda x: x, to_output=lambda x: x, ) - postprocessor = RobotProcessor( + postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, to_transition=lambda x: x, @@ -297,7 +297,7 @@ def test_tdmpc_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x ) diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py index 022767de3..6a518a250 100644 --- a/tests/processor/test_tokenizer_processor.py +++ b/tests/processor/test_tokenizer_processor.py @@ -10,7 +10,7 @@ import torch from lerobot.configs.types import FeatureType, PolicyFeature from lerobot.constants import OBS_LANGUAGE -from lerobot.processor import RobotProcessor, TokenizerProcessor, TransitionKey +from lerobot.processor import DataProcessorPipeline, TokenizerProcessor, TransitionKey from tests.utils import require_package @@ -388,7 +388,9 @@ def test_integration_with_robot_processor(mock_auto_tokenizer): mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer tokenizer_processor = TokenizerProcessor(tokenizer_name="test-tokenizer", max_length=6) - robot_processor = RobotProcessor([tokenizer_processor], to_transition=lambda x: x, to_output=lambda x: x) + robot_processor = DataProcessorPipeline( + [tokenizer_processor], to_transition=lambda x: x, to_output=lambda x: x + ) transition = create_transition( observation={"state": torch.tensor([1.0, 2.0])}, @@ -426,14 +428,16 @@ 
def test_save_and_load_pretrained_with_tokenizer_name(mock_auto_tokenizer): tokenizer_name="test-tokenizer", max_length=32, task_key="instruction" ) - robot_processor = RobotProcessor([original_processor], to_transition=lambda x: x, to_output=lambda x: x) + robot_processor = DataProcessorPipeline( + [original_processor], to_transition=lambda x: x, to_output=lambda x: x + ) with tempfile.TemporaryDirectory() as temp_dir: # Save processor robot_processor.save_pretrained(temp_dir) # Load processor - tokenizer will be recreated from saved config - loaded_processor = RobotProcessor.from_pretrained( + loaded_processor = DataProcessorPipeline.from_pretrained( temp_dir, to_transition=lambda x: x, to_output=lambda x: x ) @@ -457,14 +461,16 @@ def test_save_and_load_pretrained_with_tokenizer_object(): original_processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=32, task_key="instruction") - robot_processor = RobotProcessor([original_processor], to_transition=lambda x: x, to_output=lambda x: x) + robot_processor = DataProcessorPipeline( + [original_processor], to_transition=lambda x: x, to_output=lambda x: x + ) with tempfile.TemporaryDirectory() as temp_dir: # Save processor robot_processor.save_pretrained(temp_dir) # Load processor with tokenizer override (since tokenizer object wasn't saved) - loaded_processor = RobotProcessor.from_pretrained( + loaded_processor = DataProcessorPipeline.from_pretrained( temp_dir, overrides={"tokenizer_processor": {"tokenizer": mock_tokenizer}}, to_transition=lambda x: x, @@ -956,7 +962,7 @@ def test_integration_with_device_processor(mock_auto_tokenizer): # Create pipeline with TokenizerProcessor then DeviceProcessor tokenizer_processor = TokenizerProcessor(tokenizer_name="test-tokenizer", max_length=6) device_processor = DeviceProcessor(device="cuda:0") - robot_processor = RobotProcessor( + robot_processor = DataProcessorPipeline( [tokenizer_processor, device_processor], to_transition=lambda x: x, to_output=lambda x: x ) 
diff --git a/tests/processor/test_vqbet_processor.py b/tests/processor/test_vqbet_processor.py index bb57d8cd0..6f7e21b41 100644 --- a/tests/processor/test_vqbet_processor.py +++ b/tests/processor/test_vqbet_processor.py @@ -25,10 +25,10 @@ from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig from lerobot.policies.vqbet.processor_vqbet import make_vqbet_pre_post_processors from lerobot.processor import ( + DataProcessorPipeline, DeviceProcessor, NormalizerProcessor, RenameProcessor, - RobotProcessor, ToBatchProcessor, TransitionKey, UnnormalizerProcessor, @@ -244,13 +244,13 @@ def test_vqbet_processor_without_stats(): factory_preprocessor, factory_postprocessor = make_vqbet_pre_post_processors(config, dataset_stats=None) # Create new processors with EnvTransition input/output - preprocessor = RobotProcessor( + preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, to_transition=lambda x: x, to_output=lambda x: x, ) - postprocessor = RobotProcessor( + postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, to_transition=lambda x: x, @@ -290,7 +290,7 @@ def test_vqbet_processor_save_and_load(): preprocessor.save_pretrained(tmpdir) # Load preprocessor - loaded_preprocessor = RobotProcessor.from_pretrained( + loaded_preprocessor = DataProcessorPipeline.from_pretrained( tmpdir, to_transition=lambda x: x, to_output=lambda x: x )