diff --git a/tests/processor/test_act_processor.py b/tests/processor/test_act_processor.py new file mode 100644 index 000000000..03fa35a2b --- /dev/null +++ b/tests/processor/test_act_processor.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for ACT policy processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_STATE +from lerobot.policies.act.configuration_act import ACTConfig +from lerobot.policies.act.processor_act import make_act_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + RobotProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default ACT configuration for testing.""" + config = ACTConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(7,)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(4,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.ACTION: NormalizationMode.MEAN_STD, + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(7), "std": torch.ones(7)}, + ACTION: {"mean": torch.zeros(4), "std": torch.ones(4)}, + } + + +def test_make_act_processor_basic(): + """Test basic creation of ACT processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 4 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_act_processor_normalization(): + """Test that ACT processor correctly normalizes and unnormalizes data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Create test data + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is normalized and batched + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7) + assert processed[TransitionKey.ACTION].shape == (1, 4) + + # Process action through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is unnormalized + assert postprocessed[TransitionKey.ACTION].shape == (1, 4) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_act_processor_cuda(): + """Test ACT processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Create CPU data + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_act_processor_accelerate_scenario(): + """Test ACT processor in simulated Accelerate scenario (data already on GPU).""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Simulate Accelerate: data already on GPU + device = torch.device("cuda:0") + observation = {OBS_STATE: torch.randn(1, 7).to(device)} # Already batched and on GPU + action = torch.randn(1, 4).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU (not moved unnecessarily) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_act_processor_multi_gpu(): + """Test ACT processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Simulate data on different GPU (like in multi-GPU training) + device = torch.device("cuda:1") + observation = {OBS_STATE: torch.randn(1, 7).to(device)} + action = torch.randn(1, 4).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 (not moved to cuda:0) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_act_processor_without_stats(): + """Test ACT processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_act_processor(config, dataset_stats=None) + + # Should still create processors, but normalization won't have stats + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work (but won't normalize without stats) + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_act_processor_save_and_load(): + """Test saving and loading ACT processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7) + assert processed[TransitionKey.ACTION].shape == (1, 4) + + +def test_act_processor_device_placement_preservation(): + """Test that ACT processor preserves device placement correctly.""" + config = create_default_config() + stats = create_default_stats() + + # Test with CPU config + config.device = "cpu" + preprocessor, _ = make_act_processor(config, stats) + + # Process CPU data + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cpu" + assert processed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_act_processor_mixed_precision(): + """Test ACT processor with mixed precision (float16).""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Modify the device processor to use float16 + preprocessor, postprocessor = make_act_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = {OBS_STATE: torch.randn(7, dtype=torch.float32)} + action = torch.randn(4, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_act_processor_batch_consistency(): + """Test that ACT processor handles different batch sizes correctly.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_act_processor(config, stats) + + # Test single sample (unbatched) + observation = {OBS_STATE: torch.randn(7)} + action = torch.randn(4) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == 1 # Batched + + # Test already batched data + observation_batched = {OBS_STATE: torch.randn(8, 7)} # Batch of 8 + action_batched = torch.randn(8, 4) + transition_batched = create_transition(observation_batched, action_batched) + + processed_batched = preprocessor(transition_batched) + assert processed_batched[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == 8 + assert processed_batched[TransitionKey.ACTION].shape[0] == 8 diff --git a/tests/processor/test_classifier_processor.py b/tests/processor/test_classifier_processor.py new file mode 100644 index 000000000..ddc873984 --- /dev/null +++ b/tests/processor/test_classifier_processor.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for Reward Classifier processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import OBS_IMAGE, OBS_STATE +from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig +from lerobot.policies.sac.reward_model.processor_classifier import make_classifier_processor +from lerobot.processor import DeviceProcessor, IdentityProcessor, NormalizerProcessor, RobotProcessor +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default Reward Classifier configuration for testing.""" + config = RewardClassifierConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + "reward": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), # Classifier output + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.IDENTITY, # No normalization for classifier output + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)}, + OBS_IMAGE: {}, # No normalization for images + "reward": {}, # No normalization for classifier output + } + + +def test_make_classifier_processor_basic(): + """Test basic creation of Classifier processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Check processor names + assert preprocessor.name == "classifier_preprocessor" + assert postprocessor.name == "classifier_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 3 + assert isinstance(preprocessor.steps[0], NormalizerProcessor) # For input features + assert isinstance(preprocessor.steps[1], NormalizerProcessor) # For output features + assert isinstance(preprocessor.steps[2], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], IdentityProcessor) + + +def test_classifier_processor_normalization(): + """Test that Classifier processor correctly normalizes data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Create test data + observation = { + OBS_STATE: torch.randn(10), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(1) # Dummy action/reward + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is processed + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (10,) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1,) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_classifier_processor_cuda(): + """Test Classifier processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(10), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(1) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + reward_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(reward_transition) + + # Check that output is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_classifier_processor_accelerate_scenario(): + """Test Classifier processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Simulate Accelerate: data already on GPU + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(10).to(device), + OBS_IMAGE: torch.randn(3, 224, 224).to(device), + } + action = torch.randn(1).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_classifier_processor_multi_gpu(): + """Test Classifier processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(10).to(device), + OBS_IMAGE: torch.randn(3, 224, 224).to(device), + } + action = torch.randn(1).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_classifier_processor_without_stats(): + """Test Classifier processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_classifier_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work + observation = { + OBS_STATE: torch.randn(10), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(1) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_classifier_processor_save_and_load(): + """Test saving and loading Classifier processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = { + OBS_STATE: torch.randn(10), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(1) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (10,) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1,) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_classifier_processor_mixed_precision(): + """Test Classifier processor with mixed precision.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Create processor + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = { + OBS_STATE: torch.randn(10, dtype=torch.float32), + OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32), + } + action = torch.randn(1, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_classifier_processor_batch_data(): + """Test Classifier processor with batched data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Test with batched data + batch_size = 16 + observation = { + OBS_STATE: torch.randn(batch_size, 10), + OBS_IMAGE: torch.randn(batch_size, 3, 224, 224), + } + action = torch.randn(batch_size, 1) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that batch dimension is preserved + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 10) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (batch_size, 1) + + +def test_classifier_processor_postprocessor_identity(): + """Test that Classifier postprocessor uses IdentityProcessor correctly.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_classifier_processor(config, stats) + + # Create test data for postprocessor + reward = torch.tensor([[0.8], [0.3], [0.9]]) # Batch of rewards/predictions + transition = create_transition(action=reward) + + # Process through postprocessor + processed = postprocessor(transition) + + # IdentityProcessor should leave values unchanged (except device) + assert torch.allclose(processed[TransitionKey.ACTION].cpu(), reward.cpu()) + assert processed[TransitionKey.ACTION].device.type == "cpu" diff --git a/tests/processor/test_diffusion_processor.py b/tests/processor/test_diffusion_processor.py new file mode 100644 index 000000000..4b029d64c --- /dev/null +++ b/tests/processor/test_diffusion_processor.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for Diffusion policy processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE +from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig +from lerobot.policies.diffusion.processor_diffusion import make_diffusion_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + RobotProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default Diffusion configuration for testing.""" + config = DiffusionConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(7,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(7), "std": torch.ones(7)}, + OBS_IMAGE: {}, # No normalization for images + ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)}, + } + + +def test_make_diffusion_processor_basic(): + """Test basic creation of Diffusion processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 4 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_diffusion_processor_with_images(): + """Test Diffusion processor with image observations.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Create test data with images + observation = { + OBS_STATE: torch.randn(7), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is batched + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 6) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_diffusion_processor_cuda(): + """Test Diffusion processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(7), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_diffusion_processor_accelerate_scenario(): + """Test Diffusion processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Simulate Accelerate: data already on GPU + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(1, 7).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 6).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_diffusion_processor_multi_gpu(): + """Test Diffusion processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(1, 7).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 6).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_diffusion_processor_without_stats(): + """Test Diffusion processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_diffusion_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work + observation = { + OBS_STATE: torch.randn(7), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_diffusion_processor_save_and_load(): + """Test saving and loading Diffusion processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = { + OBS_STATE: torch.randn(7), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 6) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_diffusion_processor_mixed_precision(): + """Test Diffusion processor with mixed precision.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Create processor + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = { + OBS_STATE: torch.randn(7, dtype=torch.float32), + OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32), + } + action = torch.randn(6, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_diffusion_processor_identity_normalization(): + """Test that images with IDENTITY normalization are not normalized.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Create test data + image_value = torch.rand(3, 224, 224) * 255 # Large values + observation = { + OBS_STATE: torch.randn(7), + OBS_IMAGE: image_value.clone(), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Image should not be normalized (IDENTITY mode) + # Just batched + assert torch.allclose(processed[TransitionKey.OBSERVATION][OBS_IMAGE][0], image_value, rtol=1e-5) + + +def test_diffusion_processor_batch_consistency(): + """Test Diffusion processor with different batch sizes.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_diffusion_processor(config, stats) + + # Test with different batch sizes + for batch_size in [1, 8, 32]: + observation = { + OBS_STATE: torch.randn(batch_size, 7) if batch_size > 1 else torch.randn(7), + OBS_IMAGE: torch.randn(batch_size, 3, 224, 224) if batch_size > 1 else torch.randn(3, 224, 224), + } + action = torch.randn(batch_size, 6) if batch_size > 1 else torch.randn(6) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + + # Check correct batch size + expected_batch = batch_size if batch_size > 1 else 1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == expected_batch + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape[0] == expected_batch + assert processed[TransitionKey.ACTION].shape[0] == expected_batch diff --git a/tests/processor/test_pi0_processor.py b/tests/processor/test_pi0_processor.py new file mode 100644 index 000000000..41a41ca2c --- /dev/null +++ b/tests/processor/test_pi0_processor.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for PI0 policy processor.""" + +from unittest.mock import patch + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE +from lerobot.policies.pi0.configuration_pi0 import PI0Config +from lerobot.policies.pi0.processor_pi0 import Pi0NewLineProcessor, make_pi0_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + elif key == "complementary_data": + transition[TransitionKey.COMPLEMENTARY_DATA] = value + return transition + + +def create_default_config(): + """Create a default PI0 configuration for testing.""" + config = PI0Config() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + config.tokenizer_max_length = 128 + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)}, + OBS_IMAGE: {}, # No normalization for images + ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)}, + } + + +def test_make_pi0_processor_basic(): + """Test basic creation of PI0 processor.""" + config = create_default_config() + stats = create_default_stats() + + with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor"): + preprocessor, postprocessor = make_pi0_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 6 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], Pi0NewLineProcessor) + # Step 4 would be TokenizerProcessor but it's mocked + assert isinstance(preprocessor.steps[5], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_pi0_newline_processor_single_task(): + """Test Pi0NewLineProcessor with single task string.""" + processor = Pi0NewLineProcessor() + + # Test with task that doesn't have newline + transition = create_transition(complementary_data={"task": "test task"}) + result = processor(transition) + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n" + + # Test with task that already has newline + transition = create_transition(complementary_data={"task": "test task\n"}) + result = processor(transition) + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n" + + +def test_pi0_newline_processor_list_of_tasks(): + """Test Pi0NewLineProcessor with list of task strings.""" + processor = Pi0NewLineProcessor() + + # Test with list of tasks + tasks = ["task1", "task2\n", "task3"] + transition = create_transition(complementary_data={"task": tasks}) + result = processor(transition) + expected = ["task1\n", "task2\n", "task3\n"] + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == expected + + +def test_pi0_newline_processor_empty_transition(): + """Test Pi0NewLineProcessor with empty transition.""" + processor = Pi0NewLineProcessor() + + # Test with no complementary_data + transition = create_transition() + result = processor(transition) + assert result == transition + + # Test with complementary_data but no task + transition = create_transition(complementary_data={"other": "data"}) + result = processor(transition) + assert result == transition + + # Test with None task + transition = create_transition(complementary_data={"task": None}) + result = processor(transition) + assert result == transition + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_pi0_processor_cuda(): + """Test PI0 processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "google/paligemma-3b-pt-224"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_pi0_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(10), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action, complementary_data={"task": "test task"}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_pi0_processor_accelerate_scenario(): + """Test PI0 processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "google/paligemma-3b-pt-224"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_pi0_processor(config, stats) + + # Simulate Accelerate: data already on GPU and batched + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(1, 10).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 6).to(device) + transition = create_transition(observation, action, complementary_data={"task": ["test task"]}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_pi0_processor_multi_gpu(): + """Test PI0 processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "google/paligemma-3b-pt-224"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_pi0_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(1, 10).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 6).to(device) + transition = create_transition(observation, action, complementary_data={"task": ["test task"]}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_pi0_processor_without_stats(): + """Test PI0 processor creation without dataset statistics.""" + config = create_default_config() + + # Mock the tokenizer processor + with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor"): + preprocessor, postprocessor = make_pi0_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + +def test_pi0_newline_processor_state_dict(): + """Test Pi0NewLineProcessor state dict methods.""" + processor = Pi0NewLineProcessor() + + # Test state_dict (should be empty) + state = processor.state_dict() + assert state == {} + + # Test load_state_dict (should do nothing) + processor.load_state_dict({}) + + # Test reset (should do nothing) + processor.reset() + + # Test get_config + config = processor.get_config() + assert config == {} diff --git a/tests/processor/test_sac_processor.py b/tests/processor/test_sac_processor.py new file mode 100644 index 000000000..f0825acb4 --- /dev/null +++ b/tests/processor/test_sac_processor.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for SAC policy processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_STATE +from lerobot.policies.sac.configuration_sac import SACConfig +from lerobot.policies.sac.processor_sac import make_sac_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + RobotProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default SAC configuration for testing.""" + config = SACConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(5,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)}, + ACTION: {"min": torch.full((5,), -1.0), "max": torch.ones(5)}, + } + + +def test_make_sac_processor_basic(): + """Test basic creation of SAC processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 4 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_sac_processor_normalization_modes(): + """Test that SAC processor correctly handles different normalization modes.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Create test data + observation = {OBS_STATE: torch.randn(10) * 2} # Larger values to test normalization + action = torch.rand(5) * 2 - 1 # Range [-1, 1] + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is normalized and batched + # State should be mean-std normalized + # Action should be min-max normalized to [-1, 1] + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10) + assert processed[TransitionKey.ACTION].shape == (1, 5) + + # Process action through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is unnormalized (but still batched) + assert postprocessed[TransitionKey.ACTION].shape == (1, 5) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_sac_processor_cuda(): + """Test SAC processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Create CPU data + observation = {OBS_STATE: torch.randn(10)} + action = torch.randn(5) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_sac_processor_accelerate_scenario(): + """Test SAC processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Simulate Accelerate: data already on GPU + device = torch.device("cuda:0") + observation = {OBS_STATE: torch.randn(10).to(device)} + action = torch.randn(5).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_sac_processor_multi_gpu(): + """Test SAC processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = {OBS_STATE: torch.randn(10).to(device)} + action = torch.randn(5).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_sac_processor_without_stats(): + """Test SAC processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_sac_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work + observation = {OBS_STATE: torch.randn(10)} + action = torch.randn(5) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_sac_processor_save_and_load(): + """Test saving and loading SAC processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = {OBS_STATE: torch.randn(10)} + action = torch.randn(5) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10) + assert processed[TransitionKey.ACTION].shape == (1, 5) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_sac_processor_mixed_precision(): + """Test SAC processor with mixed precision.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Create processor + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = {OBS_STATE: torch.randn(10, dtype=torch.float32)} + action = torch.randn(5, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_sac_processor_batch_data(): + """Test SAC processor with batched data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Test with batched data + batch_size = 32 + observation = {OBS_STATE: torch.randn(batch_size, 10)} + action = torch.randn(batch_size, 5) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that batch dimension is preserved + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 10) + assert processed[TransitionKey.ACTION].shape == (batch_size, 5) + + +def test_sac_processor_edge_cases(): + """Test SAC processor with edge cases.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_sac_processor(config, stats) + + # Test with empty observation + transition = create_transition(observation={}, action=torch.randn(5)) + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION] == {} + assert processed[TransitionKey.ACTION].shape == (1, 5) + + # Test with None action + transition = create_transition(observation={OBS_STATE: torch.randn(10)}, action=None) + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10) + # When action is None, it may still be present with None value + assert TransitionKey.ACTION not in processed or processed[TransitionKey.ACTION] is None diff --git a/tests/processor/test_smolvla_processor.py b/tests/processor/test_smolvla_processor.py new file mode 100644 index 000000000..be538b017 --- /dev/null +++ b/tests/processor/test_smolvla_processor.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for SmolVLA policy processor.""" + +from unittest.mock import patch + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE +from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig +from lerobot.policies.smolvla.processor_smolvla import SmolVLANewLineProcessor, make_smolvla_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + elif key == "complementary_data": + transition[TransitionKey.COMPLEMENTARY_DATA] = value + return transition + + +def create_default_config(): + """Create a default SmolVLA configuration for testing.""" + config = SmolVLAConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(8,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + config.vlm_model_name = "HuggingFaceTB/SmolVLM-Instruct" + config.pad_language_to = "max_length" + config.tokenizer_max_length = 100 + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(8), "std": torch.ones(8)}, + OBS_IMAGE: {}, # No normalization for images + ACTION: {"min": torch.full((7,), -1.0), "max": torch.ones(7)}, + } + + +def test_make_smolvla_processor_basic(): + """Test basic creation of SmolVLA processor.""" + config = create_default_config() + stats = create_default_stats() + + with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"): + preprocessor, postprocessor = make_smolvla_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 6 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], SmolVLANewLineProcessor) + # Step 4 would be TokenizerProcessor but it's mocked + assert isinstance(preprocessor.steps[5], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_smolvla_newline_processor_single_task(): + """Test SmolVLANewLineProcessor with single task string.""" + processor = SmolVLANewLineProcessor() + + # Test with task that doesn't have newline + transition = create_transition(complementary_data={"task": "test task"}) + result = processor(transition) + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n" + + # Test with task that already has newline + transition = create_transition(complementary_data={"task": "test task\n"}) + result = processor(transition) + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n" + + +def test_smolvla_newline_processor_list_of_tasks(): + """Test SmolVLANewLineProcessor with list of task strings.""" + processor = SmolVLANewLineProcessor() + + # Test with list of tasks + tasks = ["task1", "task2\n", "task3"] + transition = create_transition(complementary_data={"task": tasks}) + result = processor(transition) + expected = ["task1\n", "task2\n", "task3\n"] + assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == expected + + +def test_smolvla_newline_processor_empty_transition(): + """Test SmolVLANewLineProcessor with empty transition.""" + processor = SmolVLANewLineProcessor() + + # Test with no complementary_data + transition = create_transition() + result = processor(transition) + assert result == transition + + # Test with complementary_data but no task + transition = create_transition(complementary_data={"other": "data"}) + result = processor(transition) + assert result == transition + + # Test with None task + transition = create_transition(complementary_data={"task": None}) + result = processor(transition) + assert result == transition + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_smolvla_processor_cuda(): + """Test SmolVLA processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_smolvla_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action, complementary_data={"task": "test task"}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_smolvla_processor_accelerate_scenario(): + """Test SmolVLA processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_smolvla_processor(config, stats) + + # Simulate Accelerate: data already on GPU and batched + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(1, 8).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 7).to(device) + transition = create_transition(observation, action, complementary_data={"task": ["test task"]}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_smolvla_processor_multi_gpu(): + """Test SmolVLA processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + # Mock the tokenizer processor to act as pass-through + class MockTokenizerProcessor: + def __init__(self, *args, **kwargs): + pass + + def __call__(self, transition): + return transition + + def state_dict(self): + return {} + + def load_state_dict(self, state): + pass + + def reset(self): + pass + + def get_config(self): + return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"} + + def transform_features(self, features): + return features + + with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor): + preprocessor, postprocessor = make_smolvla_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(1, 8).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 7).to(device) + transition = create_transition(observation, action, complementary_data={"task": ["test task"]}) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_smolvla_processor_without_stats(): + """Test SmolVLA processor creation without dataset statistics.""" + config = create_default_config() + + # Mock the tokenizer processor + with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"): + preprocessor, postprocessor = make_smolvla_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + +def test_smolvla_newline_processor_state_dict(): + """Test SmolVLANewLineProcessor state dict methods.""" + processor = SmolVLANewLineProcessor() + + # Test state_dict (should be empty) + state = processor.state_dict() + assert state == {} + + # Test load_state_dict (should do nothing) + processor.load_state_dict({}) + + # Test reset (should do nothing) + processor.reset() + + # Test get_config + config = processor.get_config() + assert config == {} + + +def test_smolvla_newline_processor_transform_features(): + """Test SmolVLANewLineProcessor transform_features method.""" + processor = SmolVLANewLineProcessor() + + # Test transform_features + features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)), + } + result = processor.transform_features(features) + assert result == features # Should return unchanged diff --git a/tests/processor/test_tdmpc_processor.py b/tests/processor/test_tdmpc_processor.py new file mode 100644 index 000000000..c6bac6442 --- /dev/null +++ b/tests/processor/test_tdmpc_processor.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for TDMPC policy processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE +from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig +from lerobot.policies.tdmpc.processor_tdmpc import make_tdmpc_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + RobotProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default TDMPC configuration for testing.""" + config = TDMPCConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(12,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(12), "std": torch.ones(12)}, + OBS_IMAGE: {}, # No normalization for images + ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)}, + } + + +def test_make_tdmpc_processor_basic(): + """Test basic creation of TDMPC processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 4 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_tdmpc_processor_normalization(): + """Test that TDMPC processor correctly normalizes and unnormalizes data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Create test data + observation = { + OBS_STATE: torch.randn(12), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is processed and batched + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 6) + + # Process action through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is unnormalized (but still batched) + assert postprocessed[TransitionKey.ACTION].shape == (1, 6) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_tdmpc_processor_cuda(): + """Test TDMPC processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(12), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_tdmpc_processor_accelerate_scenario(): + """Test TDMPC processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Simulate Accelerate: data already on GPU + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(12).to(device), + OBS_IMAGE: torch.randn(3, 224, 224).to(device), + } + action = torch.randn(6).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_tdmpc_processor_multi_gpu(): + """Test TDMPC processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(12).to(device), + OBS_IMAGE: torch.randn(3, 224, 224).to(device), + } + action = torch.randn(6).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_tdmpc_processor_without_stats(): + """Test TDMPC processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_tdmpc_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work + observation = { + OBS_STATE: torch.randn(12), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_tdmpc_processor_save_and_load(): + """Test saving and loading TDMPC processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = { + OBS_STATE: torch.randn(12), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(6) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 6) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_tdmpc_processor_mixed_precision(): + """Test TDMPC processor with mixed precision.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Create processor + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = { + OBS_STATE: torch.randn(12, dtype=torch.float32), + OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32), + } + action = torch.randn(6, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_tdmpc_processor_batch_data(): + """Test TDMPC processor with batched data.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Test with batched data + batch_size = 64 + observation = { + OBS_STATE: torch.randn(batch_size, 12), + OBS_IMAGE: torch.randn(batch_size, 3, 224, 224), + } + action = torch.randn(batch_size, 6) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that batch dimension is preserved + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 12) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (batch_size, 6) + + +def test_tdmpc_processor_edge_cases(): + """Test TDMPC processor with edge cases.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_tdmpc_processor(config, stats) + + # Test with only state observation (no image) + observation = {OBS_STATE: torch.randn(12)} + action = torch.randn(6) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12) + assert OBS_IMAGE not in processed[TransitionKey.OBSERVATION] + + # Test with only image observation (no state) + observation = {OBS_IMAGE: torch.randn(3, 224, 224)} + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert OBS_STATE not in processed[TransitionKey.OBSERVATION] diff --git a/tests/processor/test_vqbet_processor.py b/tests/processor/test_vqbet_processor.py new file mode 100644 index 000000000..6369d92d1 --- /dev/null +++ b/tests/processor/test_vqbet_processor.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for VQBeT policy processor.""" + +import tempfile + +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE +from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig +from lerobot.policies.vqbet.processor_vqbet import make_vqbet_processor +from lerobot.processor import ( + DeviceProcessor, + NormalizerProcessor, + RenameProcessor, + RobotProcessor, + ToBatchProcessor, + UnnormalizerProcessor, +) +from lerobot.processor.pipeline import TransitionKey + + +def create_transition(observation=None, action=None, **kwargs): + """Helper function to create a transition dictionary.""" + transition = {} + if observation is not None: + transition[TransitionKey.OBSERVATION] = observation + if action is not None: + transition[TransitionKey.ACTION] = action + for key, value in kwargs.items(): + if hasattr(TransitionKey, key.upper()): + transition[getattr(TransitionKey, key.upper())] = value + return transition + + +def create_default_config(): + """Create a default VQBeT configuration for testing.""" + config = VQBeTConfig() + config.input_features = { + OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(8,)), + OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), + } + config.output_features = { + ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)), + } + config.normalization_mapping = { + FeatureType.STATE: NormalizationMode.MEAN_STD, + FeatureType.VISUAL: NormalizationMode.IDENTITY, + FeatureType.ACTION: NormalizationMode.MIN_MAX, + } + config.device = "cpu" + return config + + +def create_default_stats(): + """Create default dataset statistics for testing.""" + return { + OBS_STATE: {"mean": torch.zeros(8), "std": torch.ones(8)}, + OBS_IMAGE: {}, # No normalization for images + ACTION: {"min": torch.full((7,), -1.0), "max": torch.ones(7)}, + } + + +def test_make_vqbet_processor_basic(): + """Test basic creation of VQBeT processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Check processor names + assert preprocessor.name == "robot_preprocessor" + assert postprocessor.name == "robot_postprocessor" + + # Check steps in preprocessor + assert len(preprocessor.steps) == 4 + assert isinstance(preprocessor.steps[0], RenameProcessor) + assert isinstance(preprocessor.steps[1], NormalizerProcessor) + assert isinstance(preprocessor.steps[2], ToBatchProcessor) + assert isinstance(preprocessor.steps[3], DeviceProcessor) + + # Check steps in postprocessor + assert len(postprocessor.steps) == 2 + assert isinstance(postprocessor.steps[0], DeviceProcessor) + assert isinstance(postprocessor.steps[1], UnnormalizerProcessor) + + +def test_vqbet_processor_with_images(): + """Test VQBeT processor with image and state observations.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Create test data with images and states + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is batched + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 7) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_vqbet_processor_cuda(): + """Test VQBeT processor with CUDA device.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Create CPU data + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is on CUDA + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda" + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda" + assert processed[TransitionKey.ACTION].device.type == "cuda" + + # Process through postprocessor + action_transition = create_transition(action=processed[TransitionKey.ACTION]) + postprocessed = postprocessor(action_transition) + + # Check that action is back on CPU + assert postprocessed[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_vqbet_processor_accelerate_scenario(): + """Test VQBeT processor in simulated Accelerate scenario.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Simulate Accelerate: data already on GPU and batched + device = torch.device("cuda:0") + observation = { + OBS_STATE: torch.randn(1, 8).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 7).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on same GPU + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_vqbet_processor_multi_gpu(): + """Test VQBeT processor with multi-GPU setup.""" + config = create_default_config() + config.device = "cuda:0" + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Simulate data on different GPU + device = torch.device("cuda:1") + observation = { + OBS_STATE: torch.randn(1, 8).to(device), + OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), + } + action = torch.randn(1, 7).to(device) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data stays on cuda:1 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device + assert processed[TransitionKey.ACTION].device == device + + +def test_vqbet_processor_without_stats(): + """Test VQBeT processor creation without dataset statistics.""" + config = create_default_config() + + preprocessor, postprocessor = make_vqbet_processor(config, dataset_stats=None) + + # Should still create processors + assert preprocessor is not None + assert postprocessor is not None + + # Process should still work + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + assert processed is not None + + +def test_vqbet_processor_save_and_load(): + """Test saving and loading VQBeT processor.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + with tempfile.TemporaryDirectory() as tmpdir: + # Save preprocessor + preprocessor.save_pretrained(tmpdir) + + # Load preprocessor + loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir) + + # Test that loaded processor works + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action) + + processed = loaded_preprocessor(transition) + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (1, 7) + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_vqbet_processor_mixed_precision(): + """Test VQBeT processor with mixed precision.""" + config = create_default_config() + config.device = "cuda" + stats = create_default_stats() + + # Create processor + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Replace DeviceProcessor with one that uses float16 + for i, step in enumerate(preprocessor.steps): + if isinstance(step, DeviceProcessor): + preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16") + + # Create test data + observation = { + OBS_STATE: torch.randn(8, dtype=torch.float32), + OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32), + } + action = torch.randn(7, dtype=torch.float32) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that data is converted to float16 + assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16 + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16 + assert processed[TransitionKey.ACTION].dtype == torch.float16 + + +def test_vqbet_processor_large_batch(): + """Test VQBeT processor with large batch sizes.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Test with large batch + batch_size = 128 + observation = { + OBS_STATE: torch.randn(batch_size, 8), + OBS_IMAGE: torch.randn(batch_size, 3, 224, 224), + } + action = torch.randn(batch_size, 7) + transition = create_transition(observation, action) + + # Process through preprocessor + processed = preprocessor(transition) + + # Check that batch dimension is preserved + assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 8) + assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224) + assert processed[TransitionKey.ACTION].shape == (batch_size, 7) + + +def test_vqbet_processor_sequential_processing(): + """Test VQBeT processor with sequential data processing.""" + config = create_default_config() + stats = create_default_stats() + + preprocessor, postprocessor = make_vqbet_processor(config, stats) + + # Process multiple samples sequentially + results = [] + for _ in range(5): + observation = { + OBS_STATE: torch.randn(8), + OBS_IMAGE: torch.randn(3, 224, 224), + } + action = torch.randn(7) + transition = create_transition(observation, action) + + processed = preprocessor(transition) + results.append(processed) + + # Check that all results are consistent + for result in results: + assert result[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8) + assert result[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224) + assert result[TransitionKey.ACTION].shape == (1, 7)