Files
lerobot/tests/processor/test_smolvla_processor.py
T

379 lines
13 KiB
Python

#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for SmolVLA policy processor."""
from unittest.mock import patch
import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
from lerobot.policies.smolvla.processor_smolvla import (
SmolVLANewLineProcessor,
make_smolvla_pre_post_processors,
)
from lerobot.processor import (
DeviceProcessor,
NormalizerProcessor,
RenameProcessor,
ToBatchProcessor,
TransitionKey,
UnnormalizerProcessor,
)
def create_transition(observation=None, action=None, **kwargs):
"""Helper function to create a transition dictionary."""
transition = {}
if observation is not None:
transition[TransitionKey.OBSERVATION] = observation
if action is not None:
transition[TransitionKey.ACTION] = action
for key, value in kwargs.items():
if hasattr(TransitionKey, key.upper()):
transition[getattr(TransitionKey, key.upper())] = value
elif key == "complementary_data":
transition[TransitionKey.COMPLEMENTARY_DATA] = value
return transition
def create_default_config():
"""Create a default SmolVLA configuration for testing."""
config = SmolVLAConfig()
config.input_features = {
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(8,)),
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
}
config.output_features = {
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
}
config.normalization_mapping = {
FeatureType.STATE: NormalizationMode.MEAN_STD,
FeatureType.VISUAL: NormalizationMode.IDENTITY,
FeatureType.ACTION: NormalizationMode.MIN_MAX,
}
config.device = "cpu"
config.vlm_model_name = "HuggingFaceTB/SmolVLM-Instruct"
config.pad_language_to = "max_length"
config.tokenizer_max_length = 100
return config
def create_default_stats():
"""Create default dataset statistics for testing."""
return {
OBS_STATE: {"mean": torch.zeros(8), "std": torch.ones(8)},
OBS_IMAGE: {}, # No normalization for images
ACTION: {"min": torch.full((7,), -1.0), "max": torch.ones(7)},
}
def test_make_smolvla_processor_basic():
"""Test basic creation of SmolVLA processor."""
config = create_default_config()
stats = create_default_stats()
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"):
preprocessor, postprocessor = make_smolvla_pre_post_processors(
config,
stats,
preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
)
# Check processor names
assert preprocessor.name == "robot_preprocessor"
assert postprocessor.name == "robot_postprocessor"
# Check steps in preprocessor
assert len(preprocessor.steps) == 6
assert isinstance(preprocessor.steps[0], RenameProcessor)
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
assert isinstance(preprocessor.steps[3], SmolVLANewLineProcessor)
# Step 4 would be TokenizerProcessor but it's mocked
assert isinstance(preprocessor.steps[5], DeviceProcessor)
# Check steps in postprocessor
assert len(postprocessor.steps) == 2
assert isinstance(postprocessor.steps[0], DeviceProcessor)
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
def test_smolvla_newline_processor_single_task():
"""Test SmolVLANewLineProcessor with single task string."""
processor = SmolVLANewLineProcessor()
# Test with task that doesn't have newline
transition = create_transition(complementary_data={"task": "test task"})
result = processor(transition)
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
# Test with task that already has newline
transition = create_transition(complementary_data={"task": "test task\n"})
result = processor(transition)
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
def test_smolvla_newline_processor_list_of_tasks():
"""Test SmolVLANewLineProcessor with list of task strings."""
processor = SmolVLANewLineProcessor()
# Test with list of tasks
tasks = ["task1", "task2\n", "task3"]
transition = create_transition(complementary_data={"task": tasks})
result = processor(transition)
expected = ["task1\n", "task2\n", "task3\n"]
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == expected
def test_smolvla_newline_processor_empty_transition():
"""Test SmolVLANewLineProcessor with empty transition."""
processor = SmolVLANewLineProcessor()
# Test with no complementary_data
transition = create_transition()
result = processor(transition)
assert result == transition
# Test with complementary_data but no task
transition = create_transition(complementary_data={"other": "data"})
result = processor(transition)
assert result == transition
# Test with None task
transition = create_transition(complementary_data={"task": None})
result = processor(transition)
assert result == transition
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_smolvla_processor_cuda():
"""Test SmolVLA processor with CUDA device."""
config = create_default_config()
config.device = "cuda"
stats = create_default_stats()
# Mock the tokenizer processor to act as pass-through
class MockTokenizerProcessor:
def __init__(self, *args, **kwargs):
pass
def __call__(self, transition):
return transition
def state_dict(self):
return {}
def load_state_dict(self, state):
pass
def reset(self):
pass
def get_config(self):
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
def transform_features(self, features):
return features
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
preprocessor, postprocessor = make_smolvla_pre_post_processors(
config,
stats,
preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
)
# Create CPU data
observation = {
OBS_STATE: torch.randn(8),
OBS_IMAGE: torch.randn(3, 224, 224),
}
action = torch.randn(7)
transition = create_transition(observation, action, complementary_data={"task": "test task"})
# Process through preprocessor
processed = preprocessor(transition)
# Check that data is on CUDA
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
assert processed[TransitionKey.ACTION].device.type == "cuda"
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_smolvla_processor_accelerate_scenario():
"""Test SmolVLA processor in simulated Accelerate scenario."""
config = create_default_config()
config.device = "cuda:0"
stats = create_default_stats()
# Mock the tokenizer processor to act as pass-through
class MockTokenizerProcessor:
def __init__(self, *args, **kwargs):
pass
def __call__(self, transition):
return transition
def state_dict(self):
return {}
def load_state_dict(self, state):
pass
def reset(self):
pass
def get_config(self):
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
def transform_features(self, features):
return features
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
preprocessor, postprocessor = make_smolvla_pre_post_processors(
config,
stats,
preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
)
# Simulate Accelerate: data already on GPU and batched
device = torch.device("cuda:0")
observation = {
OBS_STATE: torch.randn(1, 8).to(device),
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
}
action = torch.randn(1, 7).to(device)
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
# Process through preprocessor
processed = preprocessor(transition)
# Check that data stays on same GPU
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
assert processed[TransitionKey.ACTION].device == device
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
def test_smolvla_processor_multi_gpu():
"""Test SmolVLA processor with multi-GPU setup."""
config = create_default_config()
config.device = "cuda:0"
stats = create_default_stats()
# Mock the tokenizer processor to act as pass-through
class MockTokenizerProcessor:
def __init__(self, *args, **kwargs):
pass
def __call__(self, transition):
return transition
def state_dict(self):
return {}
def load_state_dict(self, state):
pass
def reset(self):
pass
def get_config(self):
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
def transform_features(self, features):
return features
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
preprocessor, postprocessor = make_smolvla_pre_post_processors(
config,
stats,
preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
)
# Simulate data on different GPU
device = torch.device("cuda:1")
observation = {
OBS_STATE: torch.randn(1, 8).to(device),
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
}
action = torch.randn(1, 7).to(device)
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
# Process through preprocessor
processed = preprocessor(transition)
# Check that data stays on cuda:1
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
assert processed[TransitionKey.ACTION].device == device
def test_smolvla_processor_without_stats():
"""Test SmolVLA processor creation without dataset statistics."""
config = create_default_config()
# Mock the tokenizer processor
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"):
preprocessor, postprocessor = make_smolvla_pre_post_processors(
config,
dataset_stats=None,
preprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
postprocessor_kwargs={"to_transition": lambda x: x, "to_output": lambda x: x},
)
# Should still create processors
assert preprocessor is not None
assert postprocessor is not None
def test_smolvla_newline_processor_state_dict():
"""Test SmolVLANewLineProcessor state dict methods."""
processor = SmolVLANewLineProcessor()
# Test state_dict (should be empty)
state = processor.state_dict()
assert state == {}
# Test load_state_dict (should do nothing)
processor.load_state_dict({})
# Test reset (should do nothing)
processor.reset()
# Test get_config
config = processor.get_config()
assert config == {}
def test_smolvla_newline_processor_transform_features():
"""Test SmolVLANewLineProcessor transform_features method."""
processor = SmolVLANewLineProcessor()
# Test transform_features
features = {
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
}
result = processor.transform_features(features)
assert result == features # Should return unchanged