refactor(pipeline): Remove to() method for device management

- Eliminated the to() method from RobotProcessor, which was responsible for moving tensor states to specified devices.
- Removed associated unit tests that validated the functionality of the to() method across various scenarios.
- Streamlined the pipeline code by focusing on other device management strategies.
This commit is contained in:
AdilZouitine
2025-08-03 19:03:27 +02:00
committed by Adil Zouitine
parent 41959389b6
commit 5595887fd0
2 changed files with 0 additions and 409 deletions
-390
View File
@@ -21,7 +21,6 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import pytest
import torch
import torch.nn as nn
@@ -730,182 +729,6 @@ class MockModuleStep(nn.Module):
return features
def test_to_device_with_state_dict():
"""Test moving pipeline to device for steps with state_dict."""
step = MockStepWithTensorState(name="device_test", window_size=5)
pipeline = RobotProcessor([step])
# Process some transitions to populate state
for i in range(10):
transition = create_transition(reward=float(i))
pipeline(transition)
# Check initial device (should be CPU)
assert step.running_mean.device.type == "cpu"
assert step.running_count.device.type == "cpu"
# Move to same device (CPU)
result = pipeline.to("cpu")
assert result is pipeline # Check it returns self
assert step.running_mean.device.type == "cpu"
assert step.running_count.device.type == "cpu"
# Test with torch.device object
result = pipeline.to(torch.device("cpu"))
assert result is pipeline
assert step.running_mean.device.type == "cpu"
# If CUDA is available, test GPU transfer
if torch.cuda.is_available():
result = pipeline.to("cuda")
assert result is pipeline
assert step.running_mean.device.type == "cuda"
assert step.running_count.device.type == "cuda"
# Move back to CPU
pipeline.to("cpu")
assert step.running_mean.device.type == "cpu"
assert step.running_count.device.type == "cpu"
def test_to_device_with_module():
"""Test moving pipeline to device for steps that inherit from nn.Module.
Even though the step inherits from nn.Module, the pipeline will use the
state_dict/load_state_dict approach to move tensors to the device.
"""
module_step = MockModuleStep(input_dim=5, hidden_dim=3)
pipeline = RobotProcessor([module_step])
# Process some data
obs = torch.randn(2, 5)
transition = create_transition(observation=obs, reward=1.0)
pipeline(transition)
# Check initial device
assert module_step.linear.weight.device.type == "cpu"
assert module_step.running_mean.device.type == "cpu"
# Move to same device
result = pipeline.to("cpu")
assert result is pipeline
assert module_step.linear.weight.device.type == "cpu"
assert module_step.running_mean.device.type == "cpu"
# If CUDA is available, test GPU transfer
if torch.cuda.is_available():
result = pipeline.to("cuda:0")
assert result is pipeline
assert module_step.linear.weight.device.type == "cuda"
assert module_step.linear.weight.device.index == 0
assert module_step.running_mean.device.type == "cuda"
assert module_step.running_mean.device.index == 0
# Verify the module still works after transfer
obs_cuda = torch.randn(2, 5, device="cuda:0")
transition = create_transition(observation=obs_cuda, reward=1.0)
pipeline(transition) # Should not raise an error
def test_to_device_mixed_steps():
"""Test moving pipeline with various types of steps, all using state_dict approach."""
module_step = MockModuleStep()
state_dict_step = MockStepWithTensorState()
simple_step = MockStepWithoutOptionalMethods() # No tensor state
pipeline = RobotProcessor([module_step, state_dict_step, simple_step])
# Process some data
for i in range(5):
transition = create_transition(observation=torch.randn(2, 10), reward=float(i))
pipeline(transition)
# Check initial state
assert module_step.linear.weight.device.type == "cpu"
assert state_dict_step.running_mean.device.type == "cpu"
# Move to device
result = pipeline.to("cpu")
assert result is pipeline
if torch.cuda.is_available():
pipeline.to("cuda")
assert module_step.linear.weight.device.type == "cuda"
assert module_step.running_mean.device.type == "cuda"
assert state_dict_step.running_mean.device.type == "cuda"
assert state_dict_step.running_count.device.type == "cuda"
def test_to_device_empty_state():
"""Test moving pipeline with steps that have empty state_dict."""
step = MockStep("empty_state") # This step has empty state_dict
pipeline = RobotProcessor([step])
# Should not raise an error even with empty state
result = pipeline.to("cpu")
assert result is pipeline
if torch.cuda.is_available():
result = pipeline.to("cuda")
assert result is pipeline
def test_to_device_preserves_functionality():
"""Test that pipeline functionality is preserved after device transfer."""
step = MockStepWithTensorState(window_size=3)
pipeline = RobotProcessor([step])
# Process initial data
rewards = [1.0, 2.0, 3.0]
for r in rewards:
transition = create_transition(reward=r)
pipeline(transition)
# Check state before transfer
initial_mean = step.running_mean.clone()
initial_count = step.running_count.clone()
# Move to device (CPU to CPU in this case, but tests the mechanism)
pipeline.to("cpu")
# Verify state is preserved
assert torch.allclose(step.running_mean, initial_mean)
assert step.running_count == initial_count
# Process more data to ensure functionality
transition = create_transition(reward=4.0)
_ = pipeline(transition)
assert step.running_count == 4
assert step.running_mean[0] == 4.0 # First slot should have been overwritten with 4.0
def test_to_device_invalid_device():
"""Test error handling for invalid devices."""
pipeline = RobotProcessor([MockStep()])
# Invalid device names should raise an error from PyTorch
with pytest.raises(RuntimeError):
pipeline.to("invalid_device")
def test_to_device_chaining():
"""Test that to() returns self for method chaining."""
step1 = MockStepWithTensorState()
step2 = MockModuleStep()
pipeline = RobotProcessor([step1, step2])
# Test chaining
result = pipeline.to("cpu").reset()
assert result is None # reset() returns None
# Can chain multiple to() calls
result1 = pipeline.to("cpu")
result2 = result1.to("cpu")
assert result1 is pipeline
assert result2 is pipeline
class MockNonModuleStepWithState:
"""Mock step that explicitly does NOT inherit from nn.Module but has tensor state.
@@ -988,129 +811,6 @@ class MockNonModuleStepWithState:
return features
def test_to_device_non_module_class():
"""Test moving pipeline to device for regular classes (non nn.Module) with tensor state.
This ensures the state_dict/load_state_dict approach works for classes that
don't inherit from nn.Module but still have tensor state to manage.
"""
# Create a non-module step with tensor state
non_module_step = MockNonModuleStepWithState(name="device_test", feature_dim=5)
pipeline = RobotProcessor([non_module_step])
# Process some data to populate state
for i in range(3):
obs = torch.randn(2, 5)
transition = create_transition(observation=obs, reward=float(i))
result = pipeline(transition)
comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
assert f"{non_module_step.name}_steps" in comp_data
# Verify all tensors are on CPU initially
assert non_module_step.weights.device.type == "cpu"
assert non_module_step.bias.device.type == "cpu"
assert non_module_step.running_stats.device.type == "cpu"
assert non_module_step.step_count.device.type == "cpu"
# Verify step count
assert non_module_step.step_count.item() == 3
# Store initial values for comparison
initial_weights = non_module_step.weights.clone()
initial_bias = non_module_step.bias.clone()
initial_stats = non_module_step.running_stats.clone()
# Move to same device (CPU)
result = pipeline.to("cpu")
assert result is pipeline
# Verify tensors are still on CPU and values unchanged
assert non_module_step.weights.device.type == "cpu"
assert torch.allclose(non_module_step.weights, initial_weights)
assert torch.allclose(non_module_step.bias, initial_bias)
assert torch.allclose(non_module_step.running_stats, initial_stats)
# If CUDA is available, test GPU transfer
if torch.cuda.is_available():
# Move to GPU
pipeline.to("cuda")
# Verify all tensors moved to GPU
assert non_module_step.weights.device.type == "cuda"
assert non_module_step.bias.device.type == "cuda"
assert non_module_step.running_stats.device.type == "cuda"
assert non_module_step.step_count.device.type == "cuda"
# Verify values are preserved
assert torch.allclose(non_module_step.weights.cpu(), initial_weights)
assert torch.allclose(non_module_step.bias.cpu(), initial_bias)
assert torch.allclose(non_module_step.running_stats.cpu(), initial_stats)
assert non_module_step.step_count.item() == 3
# Test that step still works on GPU
obs_gpu = torch.randn(2, 5, device="cuda")
transition = create_transition(observation=obs_gpu, reward=1.0)
result = pipeline(transition)
comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
# Verify processing worked
assert comp_data[f"{non_module_step.name}_steps"] == 4
# Move back to CPU
pipeline.to("cpu")
assert non_module_step.weights.device.type == "cpu"
assert non_module_step.step_count.item() == 4
def test_to_device_module_vs_non_module():
"""Test that both nn.Module and non-Module steps work with the same state_dict approach."""
# Create both types of steps
module_step = MockModuleStep(input_dim=5, hidden_dim=3)
non_module_step = MockNonModuleStepWithState(name="non_module", feature_dim=5)
# Create pipeline with both
pipeline = RobotProcessor([module_step, non_module_step])
# Process some data
obs = torch.randn(2, 5)
transition = create_transition(observation=obs, reward=1.0)
_ = pipeline(transition)
# Check initial devices
assert module_step.linear.weight.device.type == "cpu"
assert module_step.running_mean.device.type == "cpu"
assert non_module_step.weights.device.type == "cpu"
assert non_module_step.running_stats.device.type == "cpu"
# Both should have been called
assert module_step.counter == 1
assert non_module_step.step_count.item() == 1
if torch.cuda.is_available():
# Move to GPU
pipeline.to("cuda")
# Verify both types of steps moved correctly
assert module_step.linear.weight.device.type == "cuda"
assert module_step.running_mean.device.type == "cuda"
assert non_module_step.weights.device.type == "cuda"
assert non_module_step.running_stats.device.type == "cuda"
# Process data on GPU
obs_gpu = torch.randn(2, 5, device="cuda")
transition = create_transition(observation=obs_gpu, reward=2.0)
_ = pipeline(transition)
# Verify both steps processed the data
assert module_step.counter == 2
assert non_module_step.step_count.item() == 2
# Move back to CPU and verify
pipeline.to("cpu")
assert module_step.linear.weight.device.type == "cpu"
assert non_module_step.weights.device.type == "cpu"
# Tests for overrides functionality
@dataclass
class MockStepWithNonSerializableParam:
@@ -1489,96 +1189,6 @@ def test_from_pretrained_override_error_messages():
assert "registered_mock_step" in error_msg
class MockStepWithMixedState:
"""Mock step demonstrating proper separation of tensor and non-tensor state.
Non-tensor state should go in get_config(), only tensors in state_dict().
"""
def __init__(self, name: str = "mixed_state"):
self.name = name
self.tensor_data = torch.randn(5)
self.numpy_data = np.array([1, 2, 3, 4, 5]) # Goes in config
self.scalar_value = 42 # Goes in config
self.list_value = [1, 2, 3] # Goes in config
def __call__(self, transition: EnvTransition) -> EnvTransition:
# Simple pass-through
return transition
def state_dict(self) -> dict[str, torch.Tensor]:
"""Return ONLY tensor state as per the type contract."""
return {
"tensor_data": self.tensor_data,
}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
"""Load tensor state only."""
self.tensor_data = state["tensor_data"]
def get_config(self) -> dict[str, Any]:
"""Non-tensor state goes here."""
return {
"name": self.name,
"numpy_data": self.numpy_data.tolist(), # Convert to list for JSON serialization
"scalar_value": self.scalar_value,
"list_value": self.list_value,
}
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
# We do not test feature_contract here
return features
def test_to_device_with_mixed_state_types():
"""Test that to() only moves tensor state, while non-tensor state remains in config."""
step = MockStepWithMixedState()
pipeline = RobotProcessor([step])
# Store initial values
initial_numpy = step.numpy_data.copy()
initial_scalar = step.scalar_value
initial_list = step.list_value.copy()
# Check initial state
assert step.tensor_data.device.type == "cpu"
assert isinstance(step.numpy_data, np.ndarray)
assert isinstance(step.scalar_value, int)
assert isinstance(step.list_value, list)
# Verify state_dict only contains tensors
state = step.state_dict()
assert all(isinstance(v, torch.Tensor) for v in state.values())
assert "tensor_data" in state
assert "numpy_data" not in state
# Move to same device
pipeline.to("cpu")
# Verify tensor moved and non-tensor attributes unchanged
assert step.tensor_data.device.type == "cpu"
assert np.array_equal(step.numpy_data, initial_numpy)
assert step.scalar_value == initial_scalar
assert step.list_value == initial_list
if torch.cuda.is_available():
# Move to GPU
pipeline.to("cuda")
# Only tensor should move to GPU
assert step.tensor_data.device.type == "cuda"
# Non-tensor values should remain unchanged
assert isinstance(step.numpy_data, np.ndarray)
assert np.array_equal(step.numpy_data, initial_numpy)
assert step.scalar_value == initial_scalar
assert step.list_value == initial_list
# Move back to CPU
pipeline.to("cpu")
assert step.tensor_data.device.type == "cpu"
def test_repr_empty_processor():
"""Test __repr__ with empty processor."""
pipeline = RobotProcessor()