feature(pipeline): port tokenizer pipeline for VLA (#1645)

* feat(tokenizer): Introduce TokenizerProcessor for text tokenization

- Added TokenizerProcessor class to handle tokenization of task strings using Hugging Face's AutoTokenizer.
- Supports both string and list inputs, with customizable parameters for task key, output key, and tokenization settings.
- Implemented comprehensive unit tests to validate functionality, including handling of various input scenarios and integration with RobotProcessor.
- Updated types.py to include LANGUAGE feature type and modified __init__.py to register the new processor.

* feat(language): Enhance language processing in TokenizerProcessor

- Added OBS_LANGUAGE constant to define the observation language key.
- Updated TokenizerProcessor to store tokenized task data in the observation dictionary, ensuring compatibility with the new language feature.
- Introduced Pi0NewLineProcessor to append newlines to tasks for proper tokenization.
- Modified tests to validate the integration of language tokens and attention masks in the observation structure.

* feat(tokenizer): Add padding configuration to TokenizerProcessor

- Introduced `padding_side` parameter to the TokenizerProcessor for customizable padding direction.
- Updated the `make_pi0_processor` function to include the new padding configuration.
- Enhanced unit tests to validate the functionality of the `padding_side` parameter in various scenarios.

* feat(processor): Add state management methods to Pi0NewLineProcessor

* feat(normalization): Track normalization and unnormalization info in complementary data

- Updated NormalizerProcessor and UnnormalizerProcessor to accept additional parameters for tracking normalization modes.
- Enhanced the __call__ methods to store normalization and unnormalization information in the complementary data of transitions.
- Added unit tests to verify the correct tracking of normalization info, including scenarios with missing stats and selective normalization keys.

* feat(factory): Add preprocessor and postprocessor overrides to ProcessorConfigKwargs

- Updated ProcessorConfigKwargs to include optional overrides for preprocessor and postprocessor configurations.
- Enhanced the make_processor function to utilize the new overrides, allowing for more flexible processor initialization.

* feat(processors): Integrate RenameProcessor into various processor configurations

- Added RenameProcessor to the input steps of multiple processor functions, including make_act_processor, make_diffusion_processor, make_pi0_processor, make_sac_processor, make_tdmpc_processor, make_vqbet_processor, and make_smolvla_processor.
- Consolidated normalization features from input and output into a single NormalizerProcessor for improved efficiency.
- Updated the input steps to ensure compatibility with the new RenameProcessor integration.

* feat(smolvla): Refactor language processing and introduce new line processor (#1658)

- Removed the prepare_language method and directly accessed language tokens and masks from the batch using the OBS_LANGUAGE constant.
- Added SmolVLANewLineProcessor to ensure tasks end with a newline, enhancing tokenization compatibility.
- Updated the make_smolvla_processor function to include the new line processor and tokenizer processor for improved input handling.

* feture(policies): add device processor (#1659)

* feat(processors): Integrate DeviceProcessor into multiple processor configurations

- Added DeviceProcessor to the input and output steps of various processor functions, including make_act_processor, make_diffusion_processor, make_pi0_processor, make_pi0fast_processor, make_sac_processor, make_tdmpc_processor, make_vqbet_processor, and make_smolvla_processor.
- Enhanced the DeviceProcessor class with state management methods and ensured compatibility with existing processor pipelines.
- Introduced unit tests for DeviceProcessor to validate functionality across different scenarios, including CPU and CUDA operations.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor(pipeline): Remove to() method for device management

- Eliminated the to() method from RobotProcessor, which was responsible for moving tensor states to specified devices.
- Removed associated unit tests that validated the functionality of the to() method across various scenarios.
- Streamlined the pipeline code by focusing on other device management strategies.

* feat(processor): Enhance DeviceProcessor with float dtype conversion

- Added support for optional float dtype conversion in DeviceProcessor, allowing tensors to be converted to specified floating-point types while preserving non-float types.
- Implemented validation for float dtype input and updated the processor's configuration methods to include float dtype.
- Refactored tensor processing logic to streamline device movement and dtype conversion.
- Introduced comprehensive unit tests to validate the new float dtype functionality across various scenarios.

* feat(policies): Add new line processors and update module exports

* feat(processor): Enhance batch and device processors to handle index and task_index fields

- Added logic to ToBatchProcessor for unsqueezing 0D tensors for index and task_index fields, ensuring they are processed as 1D tensors.
- Updated DeviceProcessor to process index and task_index fields in complementary data, preserving their tensor types and ensuring non-tensor fields remain unchanged.
- Enhanced unit tests to validate the correct handling of index and task_index fields across various scenarios, including device compatibility and dtype preservation.
This commit is contained in:
Adil Zouitine
2025-08-05 10:53:08 +02:00
committed by Steven Palma
parent a1734cf575
commit 5326ffe77e
26 changed files with 2776 additions and 232 deletions
+874
View File
@@ -0,0 +1,874 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
import pytest
import torch
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.processor import DeviceProcessor, RobotProcessor
from lerobot.processor.pipeline import TransitionKey
def create_transition(
observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
):
"""Helper function to create a transition dictionary."""
transition = {}
if observation is not None:
transition[TransitionKey.OBSERVATION] = observation
if action is not None:
transition[TransitionKey.ACTION] = action
if reward is not None:
transition[TransitionKey.REWARD] = reward
if done is not None:
transition[TransitionKey.DONE] = done
if truncated is not None:
transition[TransitionKey.TRUNCATED] = truncated
if info is not None:
transition[TransitionKey.INFO] = info
if complementary_data is not None:
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
return transition
def test_basic_functionality():
"""Test basic device processor functionality on CPU."""
processor = DeviceProcessor(device="cpu")
# Create a transition with CPU tensors
observation = {"observation.state": torch.randn(10), "observation.image": torch.randn(3, 224, 224)}
action = torch.randn(5)
reward = torch.tensor(1.0)
done = torch.tensor(False)
truncated = torch.tensor(False)
transition = create_transition(
observation=observation, action=action, reward=reward, done=done, truncated=truncated
)
result = processor(transition)
# Check that all tensors are on CPU
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu"
assert result[TransitionKey.OBSERVATION]["observation.image"].device.type == "cpu"
assert result[TransitionKey.ACTION].device.type == "cpu"
assert result[TransitionKey.REWARD].device.type == "cpu"
assert result[TransitionKey.DONE].device.type == "cpu"
assert result[TransitionKey.TRUNCATED].device.type == "cpu"
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_cuda_functionality():
"""Test device processor functionality on CUDA."""
processor = DeviceProcessor(device="cuda")
# Create a transition with CPU tensors
observation = {"observation.state": torch.randn(10), "observation.image": torch.randn(3, 224, 224)}
action = torch.randn(5)
reward = torch.tensor(1.0)
done = torch.tensor(False)
truncated = torch.tensor(False)
transition = create_transition(
observation=observation, action=action, reward=reward, done=done, truncated=truncated
)
result = processor(transition)
# Check that all tensors are on CUDA
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.image"].device.type == "cuda"
assert result[TransitionKey.ACTION].device.type == "cuda"
assert result[TransitionKey.REWARD].device.type == "cuda"
assert result[TransitionKey.DONE].device.type == "cuda"
assert result[TransitionKey.TRUNCATED].device.type == "cuda"
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_specific_cuda_device():
"""Test device processor with specific CUDA device."""
processor = DeviceProcessor(device="cuda:0")
observation = {"observation.state": torch.randn(10)}
action = torch.randn(5)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.state"].device.index == 0
assert result[TransitionKey.ACTION].device.type == "cuda"
assert result[TransitionKey.ACTION].device.index == 0
def test_non_tensor_values():
"""Test that non-tensor values are preserved."""
processor = DeviceProcessor(device="cpu")
observation = {
"observation.state": torch.randn(10),
"observation.metadata": {"key": "value"}, # Non-tensor data
"observation.list": [1, 2, 3], # Non-tensor data
}
action = torch.randn(5)
info = {"episode": 1, "step": 42}
transition = create_transition(observation=observation, action=action, info=info)
result = processor(transition)
# Check tensors are processed
assert isinstance(result[TransitionKey.OBSERVATION]["observation.state"], torch.Tensor)
assert isinstance(result[TransitionKey.ACTION], torch.Tensor)
# Check non-tensor values are preserved
assert result[TransitionKey.OBSERVATION]["observation.metadata"] == {"key": "value"}
assert result[TransitionKey.OBSERVATION]["observation.list"] == [1, 2, 3]
assert result[TransitionKey.INFO] == {"episode": 1, "step": 42}
def test_none_values():
"""Test handling of None values."""
processor = DeviceProcessor(device="cpu")
# Test with None observation
transition = create_transition(observation=None, action=torch.randn(5))
result = processor(transition)
assert TransitionKey.OBSERVATION not in result
assert result[TransitionKey.ACTION].device.type == "cpu"
# Test with None action
transition = create_transition(observation={"observation.state": torch.randn(10)}, action=None)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu"
assert TransitionKey.ACTION not in result
def test_empty_observation():
"""Test handling of empty observation dictionary."""
processor = DeviceProcessor(device="cpu")
transition = create_transition(observation={}, action=torch.randn(5))
result = processor(transition)
assert result[TransitionKey.OBSERVATION] == {}
assert result[TransitionKey.ACTION].device.type == "cpu"
def test_scalar_tensors():
"""Test handling of scalar tensors."""
processor = DeviceProcessor(device="cpu")
observation = {"observation.scalar": torch.tensor(1.5)}
action = torch.tensor(2.0)
reward = torch.tensor(0.5)
transition = create_transition(observation=observation, action=action, reward=reward)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.scalar"].item() == 1.5
assert result[TransitionKey.ACTION].item() == 2.0
assert result[TransitionKey.REWARD].item() == 0.5
def test_dtype_preservation():
"""Test that tensor dtypes are preserved."""
processor = DeviceProcessor(device="cpu")
observation = {
"observation.float32": torch.randn(5, dtype=torch.float32),
"observation.float64": torch.randn(5, dtype=torch.float64),
"observation.int32": torch.randint(0, 10, (5,), dtype=torch.int32),
"observation.bool": torch.tensor([True, False, True], dtype=torch.bool),
}
action = torch.randn(3, dtype=torch.float16)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.float32"].dtype == torch.float32
assert result[TransitionKey.OBSERVATION]["observation.float64"].dtype == torch.float64
assert result[TransitionKey.OBSERVATION]["observation.int32"].dtype == torch.int32
assert result[TransitionKey.OBSERVATION]["observation.bool"].dtype == torch.bool
assert result[TransitionKey.ACTION].dtype == torch.float16
def test_shape_preservation():
"""Test that tensor shapes are preserved."""
processor = DeviceProcessor(device="cpu")
observation = {
"observation.1d": torch.randn(10),
"observation.2d": torch.randn(5, 10),
"observation.3d": torch.randn(3, 224, 224),
"observation.4d": torch.randn(2, 3, 224, 224),
}
action = torch.randn(2, 5, 3)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.1d"].shape == (10,)
assert result[TransitionKey.OBSERVATION]["observation.2d"].shape == (5, 10)
assert result[TransitionKey.OBSERVATION]["observation.3d"].shape == (3, 224, 224)
assert result[TransitionKey.OBSERVATION]["observation.4d"].shape == (2, 3, 224, 224)
assert result[TransitionKey.ACTION].shape == (2, 5, 3)
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_mixed_devices():
"""Test handling of tensors already on different devices."""
processor = DeviceProcessor(device="cuda")
# Create tensors on different devices
observation = {
"observation.cpu": torch.randn(5), # CPU
"observation.cuda": torch.randn(5).cuda(), # Already on CUDA
}
action = torch.randn(3).cuda() # Already on CUDA
transition = create_transition(observation=observation, action=action)
result = processor(transition)
# All should be on CUDA
assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.cuda"].device.type == "cuda"
assert result[TransitionKey.ACTION].device.type == "cuda"
def test_non_blocking_flag():
"""Test that non_blocking flag is set correctly."""
# CPU processor should have non_blocking=False
cpu_processor = DeviceProcessor(device="cpu")
assert cpu_processor.non_blocking is False
# CUDA processor should have non_blocking=True
cuda_processor = DeviceProcessor(device="cuda")
assert cuda_processor.non_blocking is True
cuda_0_processor = DeviceProcessor(device="cuda:0")
assert cuda_0_processor.non_blocking is True
def test_serialization_methods():
"""Test get_config, state_dict, and load_state_dict methods."""
processor = DeviceProcessor(device="cuda")
# Test get_config
config = processor.get_config()
assert config == {"device": "cuda", "float_dtype": None}
# Test state_dict (should be empty)
state = processor.state_dict()
assert state == {}
# Test load_state_dict (should be no-op)
processor.load_state_dict({})
assert processor.device == "cuda"
# Test reset (should be no-op)
processor.reset()
assert processor.device == "cuda"
def test_feature_contract():
"""Test that feature_contract returns features unchanged."""
processor = DeviceProcessor(device="cpu")
features = {
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)),
"action": PolicyFeature(type=FeatureType.ACTION, shape=(5,)),
}
result = processor.feature_contract(features)
assert result == features
assert result is features # Should return the same object
def test_integration_with_robot_processor():
"""Test integration with RobotProcessor."""
from lerobot.processor import ToBatchProcessor
# Create a pipeline with DeviceProcessor
device_processor = DeviceProcessor(device="cpu")
batch_processor = ToBatchProcessor()
processor = RobotProcessor(steps=[batch_processor, device_processor], name="test_pipeline")
# Create test data
observation = {"observation.state": torch.randn(10)}
action = torch.randn(5)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
# Check that tensors are batched and on correct device
assert result[TransitionKey.OBSERVATION]["observation.state"].shape[0] == 1 # Batched
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu"
assert result[TransitionKey.ACTION].shape[0] == 1 # Batched
assert result[TransitionKey.ACTION].device.type == "cpu"
def test_save_and_load_pretrained():
"""Test saving and loading processor with DeviceProcessor."""
processor = DeviceProcessor(device="cuda:0", float_dtype="float16")
robot_processor = RobotProcessor(steps=[processor], name="device_test_processor")
with tempfile.TemporaryDirectory() as tmpdir:
# Save
robot_processor.save_pretrained(tmpdir)
# Load
loaded_processor = RobotProcessor.from_pretrained(tmpdir)
assert len(loaded_processor.steps) == 1
loaded_device_processor = loaded_processor.steps[0]
assert isinstance(loaded_device_processor, DeviceProcessor)
assert loaded_device_processor.device == "cuda:0"
assert loaded_device_processor.float_dtype == "float16"
def test_registry_functionality():
"""Test that DeviceProcessor is properly registered."""
from lerobot.processor.pipeline import ProcessorStepRegistry
# Check that DeviceProcessor is registered
registered_class = ProcessorStepRegistry.get("device_processor")
assert registered_class is DeviceProcessor
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_performance_with_large_tensors():
"""Test performance with large tensors and non_blocking flag."""
processor = DeviceProcessor(device="cuda")
# Create large tensors
observation = {
"observation.large_image": torch.randn(10, 3, 512, 512), # Large image batch
"observation.features": torch.randn(10, 2048), # Large feature vector
}
action = torch.randn(10, 100) # Large action space
transition = create_transition(observation=observation, action=action)
# Process should not raise any errors
result = processor(transition)
# Verify all tensors are on CUDA
assert result[TransitionKey.OBSERVATION]["observation.large_image"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.features"].device.type == "cuda"
assert result[TransitionKey.ACTION].device.type == "cuda"
def test_reward_done_truncated_types():
"""Test handling of different types for reward, done, and truncated."""
processor = DeviceProcessor(device="cpu")
# Test with scalar values (not tensors)
transition = create_transition(
observation={"observation.state": torch.randn(5)},
action=torch.randn(3),
reward=1.0, # float
done=False, # bool
truncated=True, # bool
)
result = processor(transition)
# Non-tensor values should be preserved as-is
assert result[TransitionKey.REWARD] == 1.0
assert result[TransitionKey.DONE] is False
assert result[TransitionKey.TRUNCATED] is True
# Test with tensor values
transition = create_transition(
observation={"observation.state": torch.randn(5)},
action=torch.randn(3),
reward=torch.tensor(1.0),
done=torch.tensor(False),
truncated=torch.tensor(True),
)
result = processor(transition)
# Tensor values should be moved to device
assert isinstance(result[TransitionKey.REWARD], torch.Tensor)
assert isinstance(result[TransitionKey.DONE], torch.Tensor)
assert isinstance(result[TransitionKey.TRUNCATED], torch.Tensor)
assert result[TransitionKey.REWARD].device.type == "cpu"
assert result[TransitionKey.DONE].device.type == "cpu"
assert result[TransitionKey.TRUNCATED].device.type == "cpu"
def test_complementary_data_preserved():
"""Test that complementary_data is preserved unchanged."""
processor = DeviceProcessor(device="cpu")
complementary_data = {
"task": "pick_object",
"episode_id": 42,
"metadata": {"sensor": "camera_1"},
"observation_is_pad": torch.tensor([False, False, True]), # This should be moved to device
}
transition = create_transition(
observation={"observation.state": torch.randn(5)}, complementary_data=complementary_data
)
result = processor(transition)
# Check that complementary_data is preserved
assert TransitionKey.COMPLEMENTARY_DATA in result
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "pick_object"
assert result[TransitionKey.COMPLEMENTARY_DATA]["episode_id"] == 42
assert result[TransitionKey.COMPLEMENTARY_DATA]["metadata"] == {"sensor": "camera_1"}
# Note: Currently DeviceProcessor doesn't process tensors in complementary_data
# This is intentional as complementary_data is typically metadata
def test_float_dtype_conversion():
"""Test float dtype conversion functionality."""
processor = DeviceProcessor(device="cpu", float_dtype="float16")
# Create tensors of different types
observation = {
"observation.float32": torch.randn(5, dtype=torch.float32),
"observation.float64": torch.randn(5, dtype=torch.float64),
"observation.int32": torch.randint(0, 10, (5,), dtype=torch.int32),
"observation.int64": torch.randint(0, 10, (5,), dtype=torch.int64),
"observation.bool": torch.tensor([True, False, True], dtype=torch.bool),
}
action = torch.randn(3, dtype=torch.float32)
reward = torch.tensor(1.0, dtype=torch.float32)
transition = create_transition(observation=observation, action=action, reward=reward)
result = processor(transition)
# Check that float tensors are converted to float16
assert result[TransitionKey.OBSERVATION]["observation.float32"].dtype == torch.float16
assert result[TransitionKey.OBSERVATION]["observation.float64"].dtype == torch.float16
assert result[TransitionKey.ACTION].dtype == torch.float16
assert result[TransitionKey.REWARD].dtype == torch.float16
# Check that non-float tensors are preserved
assert result[TransitionKey.OBSERVATION]["observation.int32"].dtype == torch.int32
assert result[TransitionKey.OBSERVATION]["observation.int64"].dtype == torch.int64
assert result[TransitionKey.OBSERVATION]["observation.bool"].dtype == torch.bool
def test_float_dtype_none():
"""Test that when float_dtype is None, no dtype conversion occurs."""
processor = DeviceProcessor(device="cpu", float_dtype=None)
observation = {
"observation.float32": torch.randn(5, dtype=torch.float32),
"observation.float64": torch.randn(5, dtype=torch.float64),
"observation.int32": torch.randint(0, 10, (5,), dtype=torch.int32),
}
action = torch.randn(3, dtype=torch.float64)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
# Check that dtypes are preserved when float_dtype is None
assert result[TransitionKey.OBSERVATION]["observation.float32"].dtype == torch.float32
assert result[TransitionKey.OBSERVATION]["observation.float64"].dtype == torch.float64
assert result[TransitionKey.OBSERVATION]["observation.int32"].dtype == torch.int32
assert result[TransitionKey.ACTION].dtype == torch.float64
def test_float_dtype_bfloat16():
"""Test conversion to bfloat16."""
processor = DeviceProcessor(device="cpu", float_dtype="bfloat16")
observation = {"observation.state": torch.randn(5, dtype=torch.float32)}
action = torch.randn(3, dtype=torch.float64)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.state"].dtype == torch.bfloat16
assert result[TransitionKey.ACTION].dtype == torch.bfloat16
def test_float_dtype_float64():
"""Test conversion to float64."""
processor = DeviceProcessor(device="cpu", float_dtype="float64")
observation = {"observation.state": torch.randn(5, dtype=torch.float16)}
action = torch.randn(3, dtype=torch.float32)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
assert result[TransitionKey.OBSERVATION]["observation.state"].dtype == torch.float64
assert result[TransitionKey.ACTION].dtype == torch.float64
def test_float_dtype_invalid():
"""Test that invalid float_dtype raises ValueError."""
with pytest.raises(ValueError, match="Invalid float_dtype 'invalid_dtype'"):
DeviceProcessor(device="cpu", float_dtype="invalid_dtype")
def test_float_dtype_aliases():
"""Test that dtype aliases work correctly."""
# Test 'half' alias for float16
processor_half = DeviceProcessor(device="cpu", float_dtype="half")
assert processor_half._target_float_dtype == torch.float16
# Test 'float' alias for float32
processor_float = DeviceProcessor(device="cpu", float_dtype="float")
assert processor_float._target_float_dtype == torch.float32
# Test 'double' alias for float64
processor_double = DeviceProcessor(device="cpu", float_dtype="double")
assert processor_double._target_float_dtype == torch.float64
def test_float_dtype_with_mixed_tensors():
"""Test float dtype conversion with mixed tensor types."""
processor = DeviceProcessor(device="cpu", float_dtype="float32")
observation = {
"observation.image": torch.randint(0, 255, (3, 64, 64), dtype=torch.uint8), # Should not convert
"observation.state": torch.randn(10, dtype=torch.float64), # Should convert
"observation.mask": torch.tensor([True, False, True], dtype=torch.bool), # Should not convert
"observation.indices": torch.tensor([1, 2, 3], dtype=torch.long), # Should not convert
}
action = torch.randn(5, dtype=torch.float16) # Should convert
transition = create_transition(observation=observation, action=action)
result = processor(transition)
# Check conversions
assert result[TransitionKey.OBSERVATION]["observation.image"].dtype == torch.uint8 # Unchanged
assert result[TransitionKey.OBSERVATION]["observation.state"].dtype == torch.float32 # Converted
assert result[TransitionKey.OBSERVATION]["observation.mask"].dtype == torch.bool # Unchanged
assert result[TransitionKey.OBSERVATION]["observation.indices"].dtype == torch.long # Unchanged
assert result[TransitionKey.ACTION].dtype == torch.float32 # Converted
def test_float_dtype_serialization():
"""Test that float_dtype is properly serialized in get_config."""
processor = DeviceProcessor(device="cuda", float_dtype="float16")
config = processor.get_config()
assert config == {"device": "cuda", "float_dtype": "float16"}
# Test with None float_dtype
processor_none = DeviceProcessor(device="cpu", float_dtype=None)
config_none = processor_none.get_config()
assert config_none == {"device": "cpu", "float_dtype": None}
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_float_dtype_with_cuda():
"""Test float dtype conversion combined with CUDA device."""
processor = DeviceProcessor(device="cuda", float_dtype="float16")
# Create tensors on CPU with different dtypes
observation = {
"observation.float32": torch.randn(5, dtype=torch.float32),
"observation.int64": torch.tensor([1, 2, 3], dtype=torch.int64),
}
action = torch.randn(3, dtype=torch.float64)
transition = create_transition(observation=observation, action=action)
result = processor(transition)
# Check that tensors are on CUDA and float types are converted
assert result[TransitionKey.OBSERVATION]["observation.float32"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.float32"].dtype == torch.float16
assert result[TransitionKey.OBSERVATION]["observation.int64"].device.type == "cuda"
assert result[TransitionKey.OBSERVATION]["observation.int64"].dtype == torch.int64 # Unchanged
assert result[TransitionKey.ACTION].device.type == "cuda"
assert result[TransitionKey.ACTION].dtype == torch.float16
def test_complementary_data_index_fields():
"""Test processing of index and task_index fields in complementary_data."""
processor = DeviceProcessor(device="cpu")
# Create transition with index and task_index in complementary_data
complementary_data = {
"task": ["pick_cube"],
"index": torch.tensor([42], dtype=torch.int64),
"task_index": torch.tensor([3], dtype=torch.int64),
"episode_id": 123, # Non-tensor field
}
transition = create_transition(
observation={"observation.state": torch.randn(1, 7)},
action=torch.randn(1, 4),
complementary_data=complementary_data,
)
result = processor(transition)
# Check that tensors in complementary_data are processed
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
# Check index tensor
assert isinstance(processed_comp_data["index"], torch.Tensor)
assert processed_comp_data["index"].device.type == "cpu"
assert torch.equal(processed_comp_data["index"], complementary_data["index"])
# Check task_index tensor
assert isinstance(processed_comp_data["task_index"], torch.Tensor)
assert processed_comp_data["task_index"].device.type == "cpu"
assert torch.equal(processed_comp_data["task_index"], complementary_data["task_index"])
# Check non-tensor fields remain unchanged
assert processed_comp_data["task"] == ["pick_cube"]
assert processed_comp_data["episode_id"] == 123
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_complementary_data_index_fields_cuda():
"""Test moving index and task_index fields to CUDA."""
processor = DeviceProcessor(device="cuda:0")
# Create CPU tensors
complementary_data = {
"index": torch.tensor([100, 101], dtype=torch.int64),
"task_index": torch.tensor([5], dtype=torch.int64),
}
transition = create_transition(complementary_data=complementary_data)
result = processor(transition)
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
# Check tensors moved to CUDA
assert processed_comp_data["index"].device.type == "cuda"
assert processed_comp_data["index"].device.index == 0
assert processed_comp_data["task_index"].device.type == "cuda"
assert processed_comp_data["task_index"].device.index == 0
def test_complementary_data_without_index_fields():
"""Test that complementary_data without index/task_index fields works correctly."""
processor = DeviceProcessor(device="cpu")
complementary_data = {
"task": ["navigate"],
"episode_id": 456,
}
transition = create_transition(complementary_data=complementary_data)
result = processor(transition)
# Should process without errors and preserve non-tensor fields
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
assert processed_comp_data["task"] == ["navigate"]
assert processed_comp_data["episode_id"] == 456
def test_complementary_data_mixed_tensors():
"""Test complementary_data with mix of tensors and non-tensors."""
processor = DeviceProcessor(device="cpu")
complementary_data = {
"task": ["pick_and_place"],
"index": torch.tensor([42], dtype=torch.int64),
"task_index": torch.tensor([3], dtype=torch.int64),
"metrics": [1.0, 2.0, 3.0], # List, not tensor
"config": {"speed": "fast"}, # Dict
"episode_id": 789, # Int
}
transition = create_transition(complementary_data=complementary_data)
result = processor(transition)
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
# Check tensors are processed
assert isinstance(processed_comp_data["index"], torch.Tensor)
assert isinstance(processed_comp_data["task_index"], torch.Tensor)
# Check non-tensors remain unchanged
assert processed_comp_data["task"] == ["pick_and_place"]
assert processed_comp_data["metrics"] == [1.0, 2.0, 3.0]
assert processed_comp_data["config"] == {"speed": "fast"}
assert processed_comp_data["episode_id"] == 789
def test_complementary_data_float_dtype_conversion():
"""Test that float dtype conversion doesn't affect int tensors in complementary_data."""
processor = DeviceProcessor(device="cpu", float_dtype="float16")
complementary_data = {
"index": torch.tensor([42], dtype=torch.int64),
"task_index": torch.tensor([3], dtype=torch.int64),
"float_tensor": torch.tensor([1.5, 2.5], dtype=torch.float32), # Should be converted
}
transition = create_transition(complementary_data=complementary_data)
result = processor(transition)
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
# Int tensors should keep their dtype
assert processed_comp_data["index"].dtype == torch.int64
assert processed_comp_data["task_index"].dtype == torch.int64
# Float tensor should be converted
assert processed_comp_data["float_tensor"].dtype == torch.float16
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_complementary_data_full_pipeline_cuda():
"""Test full transition with complementary_data on CUDA."""
processor = DeviceProcessor(device="cuda:0", float_dtype="float16")
# Create full transition with mixed CPU tensors
observation = {"observation.state": torch.randn(1, 7, dtype=torch.float32)}
action = torch.randn(1, 4, dtype=torch.float32)
reward = torch.tensor(1.5, dtype=torch.float32)
done = torch.tensor(False)
complementary_data = {
"task": ["reach_target"],
"index": torch.tensor([1000], dtype=torch.int64),
"task_index": torch.tensor([10], dtype=torch.int64),
}
transition = create_transition(
observation=observation,
action=action,
reward=reward,
done=done,
complementary_data=complementary_data,
)
result = processor(transition)
# Check all components moved to CUDA
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
assert result[TransitionKey.ACTION].device.type == "cuda"
assert result[TransitionKey.REWARD].device.type == "cuda"
assert result[TransitionKey.DONE].device.type == "cuda"
# Check complementary_data tensors
processed_comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
assert processed_comp_data["index"].device.type == "cuda"
assert processed_comp_data["task_index"].device.type == "cuda"
# Check float conversion happened for float tensors
assert result[TransitionKey.OBSERVATION]["observation.state"].dtype == torch.float16
assert result[TransitionKey.ACTION].dtype == torch.float16
assert result[TransitionKey.REWARD].dtype == torch.float16
# Check int tensors kept their dtype
assert processed_comp_data["index"].dtype == torch.int64
assert processed_comp_data["task_index"].dtype == torch.int64
def test_complementary_data_empty():
"""Test empty complementary_data handling."""
processor = DeviceProcessor(device="cpu")
transition = create_transition(
observation={"observation.state": torch.randn(1, 7)},
complementary_data={},
)
result = processor(transition)
# Should have empty dict
assert result[TransitionKey.COMPLEMENTARY_DATA] == {}
def test_complementary_data_none():
"""Test None complementary_data handling."""
processor = DeviceProcessor(device="cpu")
transition = create_transition(
observation={"observation.state": torch.randn(1, 7)},
complementary_data=None,
)
result = processor(transition)
# Complementary data should not be in the result (same as input)
assert TransitionKey.COMPLEMENTARY_DATA not in result
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_policy_processor_integration():
"""Test integration with policy processors - input on GPU, output on CPU."""
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor import NormalizerProcessor, ToBatchProcessor, UnnormalizerProcessor
# Create features and stats
features = {
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(10,)),
"action": PolicyFeature(type=FeatureType.ACTION, shape=(5,)),
}
stats = {
"observation.state": {"mean": torch.zeros(10), "std": torch.ones(10)},
"action": {"mean": torch.zeros(5), "std": torch.ones(5)},
}
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD, FeatureType.ACTION: NormalizationMode.MEAN_STD}
# Create input processor (preprocessor) that moves to GPU
input_processor = RobotProcessor(
steps=[
NormalizerProcessor(features=features, norm_map=norm_map, stats=stats),
ToBatchProcessor(),
DeviceProcessor(device="cuda"),
],
name="test_preprocessor",
)
# Create output processor (postprocessor) that moves to CPU
output_processor = RobotProcessor(
steps=[
DeviceProcessor(device="cpu"),
UnnormalizerProcessor(features={"action": features["action"]}, norm_map=norm_map, stats=stats),
],
name="test_postprocessor",
)
# Test data on CPU
observation = {"observation.state": torch.randn(10)}
action = torch.randn(5)
transition = create_transition(observation=observation, action=action)
# Process through input processor
input_result = input_processor(transition)
# Verify tensors are on GPU and batched
assert input_result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
assert input_result[TransitionKey.OBSERVATION]["observation.state"].shape[0] == 1
assert input_result[TransitionKey.ACTION].device.type == "cuda"
assert input_result[TransitionKey.ACTION].shape[0] == 1
# Simulate model output on GPU
model_output = create_transition(action=torch.randn(1, 5).cuda())
# Process through output processor
output_result = output_processor(model_output)
# Verify action is back on CPU and unnormalized
assert output_result[TransitionKey.ACTION].device.type == "cpu"
assert output_result[TransitionKey.ACTION].shape == (1, 5)