diff --git a/tests/processor/test_act_processor.py b/tests/processor/test_act_processor.py index ef3b72f54..8c663a4c1 100644 --- a/tests/processor/test_act_processor.py +++ b/tests/processor/test_act_processor.py @@ -33,19 +33,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -105,8 +93,8 @@ def test_act_processor_normalization(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data @@ -139,8 +127,8 @@ def test_act_processor_cuda(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -173,8 +161,8 @@ def test_act_processor_accelerate_scenario(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU @@ -198,7 +186,11 @@ def test_act_processor_multi_gpu(): config.device = "cuda:0" stats = create_default_stats() - preprocessor, postprocessor = make_act_pre_post_processors(config, stats) + preprocessor, postprocessor = make_act_pre_post_processors( + config, + stats, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + ) # Simulate data on different GPU (like in multi-GPU training) device = torch.device("cuda:1") @@ -218,7 +210,12 @@ def test_act_processor_without_stats(): """Test ACT processor creation without dataset statistics.""" config = create_default_config() - preprocessor, postprocessor = make_act_pre_post_processors(config, dataset_stats=None) + preprocessor, postprocessor = make_act_pre_post_processors( + config, + dataset_stats=None, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + ) # Should still create processors, but normalization won't have stats assert preprocessor is not None @@ -241,8 +238,8 @@ def test_act_processor_save_and_load(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) with tempfile.TemporaryDirectory() as tmpdir: @@ -251,7 +248,7 @@ def test_act_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -274,8 +271,8 @@ def test_act_processor_device_placement_preservation(): preprocessor, _ = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Process CPU data @@ -299,8 +296,8 @@ def test_act_processor_mixed_precision(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Replace DeviceProcessorStep with one that uses float16 @@ -344,8 +341,8 @@ def test_act_processor_batch_consistency(): preprocessor, postprocessor = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test single sample (unbatched) @@ -376,7 +373,7 @@ def test_act_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_act_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer diff --git a/tests/processor/test_batch_processor.py b/tests/processor/test_batch_processor.py index 568bfa5c4..1258a29d2 100644 --- a/tests/processor/test_batch_processor.py +++ b/tests/processor/test_batch_processor.py @@ -28,21 +28,7 @@ from lerobot.processor import ( ProcessorStepRegistry, TransitionKey, ) - - -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info, - TransitionKey.COMPLEMENTARY_DATA: complementary_data, - } +from lerobot.processor.converters import create_transition, identity_transition def test_state_1d_to_2d(): @@ -248,7 +234,9 @@ def test_mixed_observation(): def test_integration_with_robot_processor(): """Test AddBatchDimensionProcessorStep integration with RobotProcessor.""" to_batch_processor = AddBatchDimensionProcessorStep() - pipeline = DataProcessorPipeline([to_batch_processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [to_batch_processor], to_transition=identity_transition, to_output=identity_transition + ) # Create unbatched observation observation = { @@ -289,7 +277,7 @@ def test_save_and_load_pretrained(): """Test saving and loading AddBatchDimensionProcessorStep with RobotProcessor.""" processor = AddBatchDimensionProcessorStep() pipeline = DataProcessorPipeline( - [processor], name="BatchPipeline", to_transition=lambda x: x, to_output=lambda x: x + [processor], name="BatchPipeline", to_transition=identity_transition, to_output=identity_transition ) with tempfile.TemporaryDirectory() as tmp_dir: @@ -302,7 +290,7 @@ def test_save_and_load_pretrained(): # Load pipeline loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, to_transition=identity_transition, to_output=identity_transition ) assert loaded_pipeline.name == "BatchPipeline" @@ -330,12 +318,14 @@ def test_registry_functionality(): def test_registry_based_save_load(): """Test saving and loading using registry name.""" processor = AddBatchDimensionProcessorStep() - pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [processor], to_transition=identity_transition, to_output=identity_transition + ) with tempfile.TemporaryDirectory() as tmp_dir: pipeline.save_pretrained(tmp_dir) loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, to_transition=identity_transition, to_output=identity_transition ) # Verify the loaded processor works @@ -703,7 +693,7 @@ def test_complementary_data_none(): transition = create_transition(complementary_data=None) result = processor(transition) - assert result[TransitionKey.COMPLEMENTARY_DATA] is None + assert result[TransitionKey.COMPLEMENTARY_DATA] == {} def test_complementary_data_empty(): diff --git a/tests/processor/test_classifier_processor.py b/tests/processor/test_classifier_processor.py index 1c9118bd1..75c65a4dc 100644 --- a/tests/processor/test_classifier_processor.py +++ b/tests/processor/test_classifier_processor.py @@ -31,19 +31,7 @@ from lerobot.processor import ( NormalizerProcessorStep, TransitionKey, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -105,8 +93,8 @@ def test_classifier_processor_normalization(): preprocessor, postprocessor = make_classifier_processor( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data @@ -136,8 +124,8 @@ def test_classifier_processor_cuda(): preprocessor, postprocessor = make_classifier_processor( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -174,8 +162,8 @@ def test_classifier_processor_accelerate_scenario(): preprocessor, postprocessor = make_classifier_processor( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU @@ -255,7 +243,7 @@ def test_classifier_processor_save_and_load(): # Create new processors with EnvTransition input/output preprocessor = DataProcessorPipeline( - factory_preprocessor.steps, to_transition=lambda x: x, to_output=lambda x: x + factory_preprocessor.steps, to_transition=identity_transition, to_output=identity_transition ) with tempfile.TemporaryDirectory() as tmpdir: @@ -264,7 +252,7 @@ def test_classifier_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -300,7 +288,9 @@ def test_classifier_processor_mixed_precision(): modified_steps.append(step) # Create new processors with EnvTransition input/output - preprocessor = DataProcessorPipeline(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) + preprocessor = DataProcessorPipeline( + modified_steps, to_transition=identity_transition, to_output=identity_transition + ) # Create test data observation = { @@ -327,8 +317,8 @@ def test_classifier_processor_batch_data(): preprocessor, postprocessor = make_classifier_processor( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with batched data @@ -357,8 +347,8 @@ def test_classifier_processor_postprocessor_identity(): preprocessor, postprocessor = make_classifier_processor( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data for postprocessor diff --git a/tests/processor/test_converters.py b/tests/processor/test_converters.py index ee474c872..f7301fde6 100644 --- a/tests/processor/test_converters.py +++ b/tests/processor/test_converters.py @@ -5,6 +5,7 @@ import torch from lerobot.processor import TransitionKey from lerobot.processor.converters import ( batch_to_transition, + create_transition, to_tensor, transition_to_batch, transition_to_dataset_frame, @@ -283,21 +284,6 @@ def test_to_tensor_unsupported_type(): to_tensor(object()) -def create_transition( - observation=None, action=None, reward=0.0, done=False, truncated=False, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info if info is not None else {}, - TransitionKey.COMPLEMENTARY_DATA: complementary_data if complementary_data is not None else {}, - } - - def test_batch_to_transition_with_index_fields(): """Test that batch_to_transition handles index and task_index fields correctly.""" diff --git a/tests/processor/test_device_processor.py b/tests/processor/test_device_processor.py index 113b1adf2..4baf2dc03 100644 --- a/tests/processor/test_device_processor.py +++ b/tests/processor/test_device_processor.py @@ -20,28 +20,7 @@ import torch from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep, TransitionKey - - -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - if reward is not None: - transition[TransitionKey.REWARD] = reward - if done is not None: - transition[TransitionKey.DONE] = done - if truncated is not None: - transition[TransitionKey.TRUNCATED] = truncated - if info is not None: - transition[TransitionKey.INFO] = info - if complementary_data is not None: - transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data - return transition +from lerobot.processor.converters import create_transition, identity_transition def test_basic_functionality(): @@ -147,14 +126,14 @@ def test_none_values(): # Test with None observation transition = create_transition(observation=None, action=torch.randn(5)) result = processor(transition) - assert TransitionKey.OBSERVATION not in result + assert result[TransitionKey.OBSERVATION] is None assert result[TransitionKey.ACTION].device.type == "cpu" # Test with None action transition = create_transition(observation={"observation.state": torch.randn(10)}, action=None) result = processor(transition) assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu" - assert TransitionKey.ACTION not in result + assert result[TransitionKey.ACTION] is None def test_empty_observation(): @@ -315,8 +294,8 @@ def test_integration_with_robot_processor(): processor = DataProcessorPipeline( steps=[batch_processor, device_processor], name="test_pipeline", - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Create test data @@ -823,7 +802,7 @@ def test_complementary_data_none(): result = processor(transition) # Complementary data should not be in the result (same as input) - assert TransitionKey.COMPLEMENTARY_DATA not in result + assert result[TransitionKey.COMPLEMENTARY_DATA] == {} @pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") @@ -995,8 +974,8 @@ def test_policy_processor_integration(): DeviceProcessorStep(device="cuda"), ], name="test_preprocessor", - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Create output processor (postprocessor) that moves to CPU @@ -1006,8 +985,8 @@ def test_policy_processor_integration(): UnnormalizerProcessorStep(features={ACTION: features[ACTION]}, norm_map=norm_map, stats=stats), ], name="test_postprocessor", - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Test data on CPU diff --git a/tests/processor/test_diffusion_processor.py b/tests/processor/test_diffusion_processor.py index 1e5e93b4d..5163997a7 100644 --- a/tests/processor/test_diffusion_processor.py +++ b/tests/processor/test_diffusion_processor.py @@ -33,19 +33,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -108,8 +96,8 @@ def test_diffusion_processor_with_images(): preprocessor, postprocessor = make_diffusion_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data with images @@ -139,8 +127,8 @@ def test_diffusion_processor_cuda(): preprocessor, postprocessor = make_diffusion_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -177,8 +165,8 @@ def test_diffusion_processor_accelerate_scenario(): preprocessor, postprocessor = make_diffusion_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU @@ -258,7 +246,7 @@ def test_diffusion_processor_save_and_load(): # Create new processors with EnvTransition input/output preprocessor = DataProcessorPipeline( - factory_preprocessor.steps, to_transition=lambda x: x, to_output=lambda x: x + factory_preprocessor.steps, to_transition=identity_transition, to_output=identity_transition ) with tempfile.TemporaryDirectory() as tmpdir: @@ -267,7 +255,7 @@ def test_diffusion_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -314,7 +302,9 @@ def test_diffusion_processor_mixed_precision(): modified_steps.append(step) # Create new processors with EnvTransition input/output - preprocessor = DataProcessorPipeline(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) + preprocessor = DataProcessorPipeline( + modified_steps, to_transition=identity_transition, to_output=identity_transition + ) # Create test data observation = { @@ -341,8 +331,8 @@ def test_diffusion_processor_identity_normalization(): preprocessor, postprocessor = make_diffusion_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data @@ -370,8 +360,8 @@ def test_diffusion_processor_batch_consistency(): preprocessor, postprocessor = make_diffusion_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with different batch sizes @@ -423,7 +413,9 @@ def test_diffusion_processor_bfloat16_device_float32_normalizer(): modified_steps.append(step) # Create new processor with modified steps - preprocessor = DataProcessorPipeline(modified_steps, to_transition=lambda x: x, to_output=lambda x: x) + preprocessor = DataProcessorPipeline( + modified_steps, to_transition=identity_transition, to_output=identity_transition + ) # Verify initial normalizer configuration normalizer_step = modified_steps[3] # NormalizerProcessorStep diff --git a/tests/processor/test_normalize_processor.py b/tests/processor/test_normalize_processor.py index 25ad66860..dcb450cd1 100644 --- a/tests/processor/test_normalize_processor.py +++ b/tests/processor/test_normalize_processor.py @@ -28,22 +28,7 @@ from lerobot.processor import ( UnnormalizerProcessorStep, hotswap_stats, ) -from lerobot.processor.converters import to_tensor - - -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info, - TransitionKey.COMPLEMENTARY_DATA: complementary_data, - } +from lerobot.processor.converters import create_transition, identity_transition, to_tensor def test_numpy_conversion(): @@ -509,7 +494,7 @@ def test_get_config(full_stats): def test_integration_with_robot_processor(normalizer_processor): """Test integration with RobotProcessor pipeline""" robot_processor = DataProcessorPipeline( - [normalizer_processor], to_transition=lambda x: x, to_output=lambda x: x + [normalizer_processor], to_transition=identity_transition, to_output=identity_transition ) observation = { @@ -1322,7 +1307,7 @@ def test_hotswap_stats_functional_test(): # Create original processor normalizer = NormalizerProcessorStep(features=features, norm_map=norm_map, stats=initial_stats) original_processor = DataProcessorPipeline( - steps=[normalizer], to_transition=lambda x: x, to_output=lambda x: x + steps=[normalizer], to_transition=identity_transition, to_output=identity_transition ) # Process with original stats diff --git a/tests/processor/test_observation_processor.py b/tests/processor/test_observation_processor.py index 0b06ea653..f21c9f662 100644 --- a/tests/processor/test_observation_processor.py +++ b/tests/processor/test_observation_processor.py @@ -21,24 +21,10 @@ import torch from lerobot.configs.types import FeatureType, PipelineFeatureType from lerobot.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE from lerobot.processor import TransitionKey, VanillaObservationProcessorStep +from lerobot.processor.converters import create_transition from tests.conftest import assert_contract_is_typed -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info, - TransitionKey.COMPLEMENTARY_DATA: complementary_data, - } - - def test_process_single_image(): """Test processing a single image.""" processor = VanillaObservationProcessorStep() diff --git a/tests/processor/test_pi0_processor.py b/tests/processor/test_pi0_processor.py index e83635a48..1745e1779 100644 --- a/tests/processor/test_pi0_processor.py +++ b/tests/processor/test_pi0_processor.py @@ -34,6 +34,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) +from lerobot.processor.converters import create_transition, identity_transition class MockTokenizerProcessorStep(ProcessorStep): @@ -52,21 +53,6 @@ class MockTokenizerProcessorStep(ProcessorStep): return features -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - elif key == "complementary_data": - transition[TransitionKey.COMPLEMENTARY_DATA] = value - return transition - - def create_default_config(): """Create a default PI0 configuration for testing.""" config = PI0Config() @@ -105,8 +91,8 @@ def test_make_pi0_processor_basic(): preprocessor, postprocessor = make_pi0_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Check processor names @@ -209,8 +195,8 @@ def test_pi0_processor_cuda(): preprocessor, postprocessor = make_pi0_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -264,8 +250,8 @@ def test_pi0_processor_accelerate_scenario(): preprocessor, postprocessor = make_pi0_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU and batched @@ -320,8 +306,8 @@ def test_pi0_processor_multi_gpu(): preprocessor, postprocessor = make_pi0_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate data on different GPU @@ -351,8 +337,8 @@ def test_pi0_processor_without_stats(): preprocessor, postprocessor = make_pi0_pre_post_processors( config, dataset_stats=None, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Should still create processors @@ -390,8 +376,8 @@ def test_pi0_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_pi0_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer diff --git a/tests/processor/test_pipeline.py b/tests/processor/test_pipeline.py index 1e6db6436..3fe483df5 100644 --- a/tests/processor/test_pipeline.py +++ b/tests/processor/test_pipeline.py @@ -34,24 +34,10 @@ from lerobot.processor import ( ProcessorStepRegistry, TransitionKey, ) +from lerobot.processor.converters import create_transition, identity_transition from tests.conftest import assert_contract_is_typed -def create_transition( - observation=None, action=None, reward=0.0, done=False, truncated=False, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info if info is not None else {}, - TransitionKey.COMPLEMENTARY_DATA: complementary_data if complementary_data is not None else {}, - } - - @dataclass class MockStep(ProcessorStep): """Mock pipeline step for testing - demonstrates best practices. @@ -187,7 +173,7 @@ class MockStepWithTensorState(ProcessorStep): def test_empty_pipeline(): """Test pipeline with no steps.""" - pipeline = DataProcessorPipeline([], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([], to_transition=identity_transition, to_output=identity_transition) transition = create_transition() result = pipeline(transition) @@ -199,7 +185,7 @@ def test_empty_pipeline(): def test_single_step_pipeline(): """Test pipeline with a single step.""" step = MockStep("test_step") - pipeline = DataProcessorPipeline([step], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline([step], to_transition=identity_transition, to_output=identity_transition) transition = create_transition() result = pipeline(transition) @@ -216,7 +202,9 @@ def test_multiple_steps_pipeline(): """Test pipeline with multiple steps.""" step1 = MockStep("step1") step2 = MockStep("step2") - pipeline = DataProcessorPipeline([step1, step2], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [step1, step2], to_transition=identity_transition, to_output=identity_transition + ) transition = create_transition() result = pipeline(transition) @@ -569,7 +557,7 @@ def test_step_without_optional_methods(): """Test pipeline with steps that don't implement optional methods.""" step = MockStepWithoutOptionalMethods(multiplier=3.0) pipeline = DataProcessorPipeline( - [step], to_transition=lambda x: x, to_output=lambda x: x + [step], to_transition=identity_transition, to_output=identity_transition ) # Identity for EnvTransition input/output transition = create_transition(reward=2.0) @@ -900,7 +888,7 @@ def test_from_pretrained_with_overrides(): } loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, overrides=overrides, to_transition=identity_transition, to_output=identity_transition ) # Verify the pipeline was loaded correctly @@ -938,7 +926,7 @@ def test_from_pretrained_with_partial_overrides(): # The current implementation applies overrides to ALL steps with the same class name # Both steps will get the override loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, overrides=overrides, to_transition=identity_transition, to_output=identity_transition ) transition = create_transition(reward=1.0) @@ -997,7 +985,7 @@ def test_from_pretrained_registered_step_override(): overrides = {"registered_mock_step": {"value": 999, "device": "cuda"}} loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, overrides=overrides, to_transition=identity_transition, to_output=identity_transition ) # Test that overrides were applied @@ -1027,7 +1015,7 @@ def test_from_pretrained_mixed_registered_and_unregistered(): } loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, overrides=overrides, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, overrides=overrides, to_transition=identity_transition, to_output=identity_transition ) # Test both steps @@ -1050,7 +1038,7 @@ def test_from_pretrained_no_overrides(): # Load without overrides loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, to_transition=identity_transition, to_output=identity_transition ) assert len(loaded_pipeline) == 1 @@ -1072,7 +1060,7 @@ def test_from_pretrained_empty_overrides(): # Load with empty overrides loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, overrides={}, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, overrides={}, to_transition=identity_transition, to_output=identity_transition ) assert len(loaded_pipeline) == 1 @@ -1496,8 +1484,8 @@ def test_override_with_nested_config(): loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"complex_config_step": {"nested_config": {"level1": {"level2": "overridden"}}}}, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Test that override worked @@ -1600,8 +1588,8 @@ def test_override_with_callables(): loaded = DataProcessorPipeline.from_pretrained( tmp_dir, overrides={"callable_step": {"transform_fn": double_values}}, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Test it works @@ -1869,7 +1857,7 @@ class FeatureContractMutateStep(ProcessorStep): """Mutates a PolicyFeature""" key: str = "a" - fn: Callable[[PolicyFeature | None], PolicyFeature] = lambda x: x # noqa: E731 + fn: Callable[[PolicyFeature | None], PolicyFeature] = identity_transition # noqa: E731 def __call__(self, transition: EnvTransition) -> EnvTransition: return transition diff --git a/tests/processor/test_rename_processor.py b/tests/processor/test_rename_processor.py index b04ee1c6e..54b47f8c8 100644 --- a/tests/processor/test_rename_processor.py +++ b/tests/processor/test_rename_processor.py @@ -26,25 +26,11 @@ from lerobot.processor import ( RenameObservationsProcessorStep, TransitionKey, ) +from lerobot.processor.converters import create_transition, identity_transition from lerobot.processor.rename_processor import rename_stats from tests.conftest import assert_contract_is_typed -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper to create an EnvTransition dictionary.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info, - TransitionKey.COMPLEMENTARY_DATA: complementary_data, - } - - def test_basic_renaming(): """Test basic key renaming functionality.""" rename_map = { @@ -193,7 +179,9 @@ def test_integration_with_robot_processor(): } rename_processor = RenameObservationsProcessorStep(rename_map=rename_map) - pipeline = DataProcessorPipeline([rename_processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [rename_processor], to_transition=identity_transition, to_output=identity_transition + ) observation = { "agent_pos": np.array([1.0, 2.0, 3.0]), @@ -244,7 +232,7 @@ def test_save_and_load_pretrained(): # Load pipeline loaded_pipeline = DataProcessorPipeline.from_pretrained( - tmp_dir, to_transition=lambda x: x, to_output=lambda x: x + tmp_dir, to_transition=identity_transition, to_output=identity_transition ) assert loaded_pipeline.name == "TestRenameProcessorStep" @@ -286,7 +274,9 @@ def test_registry_functionality(): def test_registry_based_save_load(): """Test save/load using registry name instead of module path.""" processor = RenameObservationsProcessorStep(rename_map={"key1": "renamed_key1"}) - pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x) + pipeline = DataProcessorPipeline( + [processor], to_transition=identity_transition, to_output=identity_transition + ) with tempfile.TemporaryDirectory() as tmp_dir: # Save and load @@ -328,7 +318,7 @@ def test_chained_rename_processors(): ) pipeline = DataProcessorPipeline( - [processor1, processor2], to_transition=lambda x: x, to_output=lambda x: x + [processor1, processor2], to_transition=identity_transition, to_output=identity_transition ) observation = { diff --git a/tests/processor/test_sac_processor.py b/tests/processor/test_sac_processor.py index 3e26172c3..fb18c1d06 100644 --- a/tests/processor/test_sac_processor.py +++ b/tests/processor/test_sac_processor.py @@ -33,19 +33,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -81,8 +69,8 @@ def test_make_sac_processor_basic(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Check processor names @@ -110,8 +98,8 @@ def test_sac_processor_normalization_modes(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data @@ -146,8 +134,8 @@ def test_sac_processor_cuda(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -180,8 +168,8 @@ def test_sac_processor_accelerate_scenario(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU @@ -208,8 +196,8 @@ def test_sac_processor_multi_gpu(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate data on different GPU @@ -237,14 +225,14 @@ def test_sac_processor_without_stats(): preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Should still create processors @@ -268,8 +256,8 @@ def test_sac_processor_save_and_load(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) with tempfile.TemporaryDirectory() as tmpdir: @@ -278,7 +266,7 @@ def test_sac_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -302,8 +290,8 @@ def test_sac_processor_mixed_precision(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Replace DeviceProcessorStep with one that uses float16 @@ -347,8 +335,8 @@ def test_sac_processor_batch_data(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with batched data @@ -373,8 +361,8 @@ def test_sac_processor_edge_cases(): preprocessor, postprocessor = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with empty observation @@ -401,8 +389,8 @@ def test_sac_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_sac_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer diff --git a/tests/processor/test_smolvla_processor.py b/tests/processor/test_smolvla_processor.py index 317b0feec..f2dd0156f 100644 --- a/tests/processor/test_smolvla_processor.py +++ b/tests/processor/test_smolvla_processor.py @@ -37,6 +37,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) +from lerobot.processor.converters import create_transition, identity_transition class MockTokenizerProcessorStep(ProcessorStep): @@ -55,21 +56,6 @@ class MockTokenizerProcessorStep(ProcessorStep): return features -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - elif key == "complementary_data": - transition[TransitionKey.COMPLEMENTARY_DATA] = value - return transition - - def create_default_config(): """Create a default SmolVLA configuration for testing.""" config = SmolVLAConfig() @@ -112,8 +98,8 @@ def test_make_smolvla_processor_basic(): preprocessor, postprocessor = make_smolvla_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Check processor names @@ -218,8 +204,8 @@ def test_smolvla_processor_cuda(): preprocessor, postprocessor = make_smolvla_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -275,8 +261,8 @@ def test_smolvla_processor_accelerate_scenario(): preprocessor, postprocessor = make_smolvla_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU and batched @@ -333,8 +319,8 @@ def test_smolvla_processor_multi_gpu(): preprocessor, postprocessor = make_smolvla_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate data on different GPU @@ -366,8 +352,8 @@ def test_smolvla_processor_without_stats(): preprocessor, postprocessor = make_smolvla_pre_post_processors( config, dataset_stats=None, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Should still create processors @@ -419,8 +405,8 @@ def test_smolvla_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_smolvla_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer diff --git a/tests/processor/test_tdmpc_processor.py b/tests/processor/test_tdmpc_processor.py index 4f08ef289..660fe10ea 100644 --- a/tests/processor/test_tdmpc_processor.py +++ b/tests/processor/test_tdmpc_processor.py @@ -33,19 +33,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -84,8 +72,8 @@ def test_make_tdmpc_processor_basic(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Check processor names @@ -113,8 +101,8 @@ def test_tdmpc_processor_normalization(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data @@ -151,8 +139,8 @@ def test_tdmpc_processor_cuda(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -189,8 +177,8 @@ def test_tdmpc_processor_accelerate_scenario(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU @@ -221,8 +209,8 @@ def test_tdmpc_processor_multi_gpu(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate data on different GPU @@ -254,14 +242,14 @@ def test_tdmpc_processor_without_stats(): preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Should still create processors @@ -288,8 +276,8 @@ def test_tdmpc_processor_save_and_load(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) with tempfile.TemporaryDirectory() as tmpdir: @@ -298,7 +286,7 @@ def test_tdmpc_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -326,8 +314,8 @@ def test_tdmpc_processor_mixed_precision(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Replace DeviceProcessorStep with one that uses float16 @@ -375,8 +363,8 @@ def test_tdmpc_processor_batch_data(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with batched data @@ -405,8 +393,8 @@ def test_tdmpc_processor_edge_cases(): preprocessor, postprocessor = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with only state observation (no image) @@ -437,7 +425,7 @@ def test_tdmpc_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_tdmpc_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py index 8055a9d4b..8c2a2d834 100644 --- a/tests/processor/test_tokenizer_processor.py +++ b/tests/processor/test_tokenizer_processor.py @@ -11,24 +11,10 @@ import torch from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.constants import OBS_LANGUAGE from lerobot.processor import DataProcessorPipeline, TokenizerProcessorStep, TransitionKey +from lerobot.processor.converters import create_transition, identity_transition from tests.utils import require_package -def create_transition( - observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None -): - """Helper function to create test transitions.""" - return { - TransitionKey.OBSERVATION: observation, - TransitionKey.ACTION: action, - TransitionKey.REWARD: reward, - TransitionKey.DONE: done, - TransitionKey.TRUNCATED: truncated, - TransitionKey.INFO: info, - TransitionKey.COMPLEMENTARY_DATA: complementary_data, - } - - class MockTokenizer: """Mock tokenizer for testing that mimics transformers tokenizer interface.""" @@ -389,7 +375,7 @@ def test_integration_with_robot_processor(mock_auto_tokenizer): tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6) robot_processor = DataProcessorPipeline( - [tokenizer_processor], to_transition=lambda x: x, to_output=lambda x: x + [tokenizer_processor], to_transition=identity_transition, to_output=identity_transition ) transition = create_transition( @@ -429,7 +415,7 @@ def test_save_and_load_pretrained_with_tokenizer_name(mock_auto_tokenizer): ) robot_processor = DataProcessorPipeline( - [original_processor], to_transition=lambda x: x, to_output=lambda x: x + [original_processor], to_transition=identity_transition, to_output=identity_transition ) with tempfile.TemporaryDirectory() as temp_dir: @@ -438,7 +424,7 @@ def test_save_and_load_pretrained_with_tokenizer_name(mock_auto_tokenizer): # Load processor - tokenizer will be recreated from saved config loaded_processor = DataProcessorPipeline.from_pretrained( - temp_dir, to_transition=lambda x: x, to_output=lambda x: x + temp_dir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -464,7 +450,7 @@ def test_save_and_load_pretrained_with_tokenizer_object(): ) robot_processor = DataProcessorPipeline( - [original_processor], to_transition=lambda x: x, to_output=lambda x: x + [original_processor], to_transition=identity_transition, to_output=identity_transition ) with tempfile.TemporaryDirectory() as temp_dir: @@ -475,8 +461,8 @@ def test_save_and_load_pretrained_with_tokenizer_object(): loaded_processor = DataProcessorPipeline.from_pretrained( temp_dir, overrides={"tokenizer_processor": {"tokenizer": mock_tokenizer}}, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Test that loaded processor works @@ -979,7 +965,9 @@ def test_integration_with_device_processor(mock_auto_tokenizer): tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6) device_processor = DeviceProcessorStep(device="cuda:0") robot_processor = DataProcessorPipeline( - [tokenizer_processor, device_processor], to_transition=lambda x: x, to_output=lambda x: x + [tokenizer_processor, device_processor], + to_transition=identity_transition, + to_output=identity_transition, ) # Start with CPU tensors diff --git a/tests/processor/test_vqbet_processor.py b/tests/processor/test_vqbet_processor.py index c05fb15fe..0d773993e 100644 --- a/tests/processor/test_vqbet_processor.py +++ b/tests/processor/test_vqbet_processor.py @@ -33,19 +33,7 @@ from lerobot.processor import ( TransitionKey, UnnormalizerProcessorStep, ) - - -def create_transition(observation=None, action=None, **kwargs): - """Helper function to create a transition dictionary.""" - transition = {} - if observation is not None: - transition[TransitionKey.OBSERVATION] = observation - if action is not None: - transition[TransitionKey.ACTION] = action - for key, value in kwargs.items(): - if hasattr(TransitionKey, key.upper()): - transition[getattr(TransitionKey, key.upper())] = value - return transition +from lerobot.processor.converters import create_transition, identity_transition def create_default_config(): @@ -84,8 +72,8 @@ def test_make_vqbet_processor_basic(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Check processor names @@ -113,8 +101,8 @@ def test_vqbet_processor_with_images(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create test data with images and states @@ -144,8 +132,8 @@ def test_vqbet_processor_cuda(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Create CPU data @@ -182,8 +170,8 @@ def test_vqbet_processor_accelerate_scenario(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate Accelerate: data already on GPU and batched @@ -214,8 +202,8 @@ def test_vqbet_processor_multi_gpu(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Simulate data on different GPU @@ -247,14 +235,14 @@ def test_vqbet_processor_without_stats(): preprocessor = DataProcessorPipeline( factory_preprocessor.steps, name=factory_preprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) postprocessor = DataProcessorPipeline( factory_postprocessor.steps, name=factory_postprocessor.name, - to_transition=lambda x: x, - to_output=lambda x: x, + to_transition=identity_transition, + to_output=identity_transition, ) # Should still create processors @@ -281,8 +269,8 @@ def test_vqbet_processor_save_and_load(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) with tempfile.TemporaryDirectory() as tmpdir: @@ -291,7 +279,7 @@ def test_vqbet_processor_save_and_load(): # Load preprocessor loaded_preprocessor = DataProcessorPipeline.from_pretrained( - tmpdir, to_transition=lambda x: x, to_output=lambda x: x + tmpdir, to_transition=identity_transition, to_output=identity_transition ) # Test that loaded processor works @@ -319,8 +307,8 @@ def test_vqbet_processor_mixed_precision(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Replace DeviceProcessorStep with one that uses float16 @@ -368,8 +356,8 @@ def test_vqbet_processor_large_batch(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Test with large batch @@ -398,8 +386,8 @@ def test_vqbet_processor_sequential_processing(): preprocessor, postprocessor = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Process multiple samples sequentially @@ -432,8 +420,8 @@ def test_vqbet_processor_bfloat16_device_float32_normalizer(): preprocessor, _ = make_vqbet_pre_post_processors( config, stats, - preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, - postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x}, + preprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, + postprocessor_kwargs={"to_transition": identity_transition, "to_output": identity_transition}, ) # Modify the pipeline to use bfloat16 device processor with float32 normalizer