diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index 9db6d2f25..dcf5db28a 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -31,7 +31,6 @@ from huggingface_hub.errors import HfHubHTTPError from safetensors.torch import load_file, save_file from lerobot.configs.types import PolicyFeature -from lerobot.utils.utils import get_safe_torch_device class TransitionKey(str, Enum): @@ -504,24 +503,6 @@ class RobotProcessor(ModelHubMixin): # Generate README.md from template self._generate_model_card(destination_path) - def to(self, device: str | torch.device): - """Move all tensor states inside each step to device and return self. - - Uses a generic mechanism: fetch each step's state dict, move every tensor - to the target device, and reload it. Only works for steps that implement - both state_dict() and load_state_dict() methods. - """ - device = get_safe_torch_device(device) - - for step in self.steps: - if hasattr(step, "state_dict") and hasattr(step, "load_state_dict"): - state = step.state_dict() - if state: # Only process if there's actual state - moved_state = {k: v.to(device) for k, v in state.items()} - step.load_state_dict(moved_state) - - return self - @classmethod def from_pretrained( cls, source: str, *, config_filename: str | None = None, overrides: dict[str, Any] | None = None diff --git a/tests/processor/test_pipeline.py b/tests/processor/test_pipeline.py index a5f39b7ef..0822b474d 100644 --- a/tests/processor/test_pipeline.py +++ b/tests/processor/test_pipeline.py @@ -21,7 +21,6 @@ from dataclasses import dataclass from pathlib import Path from typing import Any -import numpy as np import pytest import torch import torch.nn as nn @@ -730,182 +729,6 @@ class MockModuleStep(nn.Module): return features -def test_to_device_with_state_dict(): - """Test moving pipeline to device for steps with state_dict.""" - step = MockStepWithTensorState(name="device_test", window_size=5) - pipeline = RobotProcessor([step]) - - # Process some transitions to populate state - for i in range(10): - transition = create_transition(reward=float(i)) - pipeline(transition) - - # Check initial device (should be CPU) - assert step.running_mean.device.type == "cpu" - assert step.running_count.device.type == "cpu" - - # Move to same device (CPU) - result = pipeline.to("cpu") - assert result is pipeline # Check it returns self - assert step.running_mean.device.type == "cpu" - assert step.running_count.device.type == "cpu" - - # Test with torch.device object - result = pipeline.to(torch.device("cpu")) - assert result is pipeline - assert step.running_mean.device.type == "cpu" - - # If CUDA is available, test GPU transfer - if torch.cuda.is_available(): - result = pipeline.to("cuda") - assert result is pipeline - assert step.running_mean.device.type == "cuda" - assert step.running_count.device.type == "cuda" - - # Move back to CPU - pipeline.to("cpu") - assert step.running_mean.device.type == "cpu" - assert step.running_count.device.type == "cpu" - - -def test_to_device_with_module(): - """Test moving pipeline to device for steps that inherit from nn.Module. - - Even though the step inherits from nn.Module, the pipeline will use the - state_dict/load_state_dict approach to move tensors to the device. - """ - module_step = MockModuleStep(input_dim=5, hidden_dim=3) - pipeline = RobotProcessor([module_step]) - - # Process some data - obs = torch.randn(2, 5) - transition = create_transition(observation=obs, reward=1.0) - pipeline(transition) - - # Check initial device - assert module_step.linear.weight.device.type == "cpu" - assert module_step.running_mean.device.type == "cpu" - - # Move to same device - result = pipeline.to("cpu") - assert result is pipeline - assert module_step.linear.weight.device.type == "cpu" - assert module_step.running_mean.device.type == "cpu" - - # If CUDA is available, test GPU transfer - if torch.cuda.is_available(): - result = pipeline.to("cuda:0") - assert result is pipeline - assert module_step.linear.weight.device.type == "cuda" - assert module_step.linear.weight.device.index == 0 - assert module_step.running_mean.device.type == "cuda" - assert module_step.running_mean.device.index == 0 - - # Verify the module still works after transfer - obs_cuda = torch.randn(2, 5, device="cuda:0") - transition = create_transition(observation=obs_cuda, reward=1.0) - pipeline(transition) # Should not raise an error - - -def test_to_device_mixed_steps(): - """Test moving pipeline with various types of steps, all using state_dict approach.""" - module_step = MockModuleStep() - state_dict_step = MockStepWithTensorState() - simple_step = MockStepWithoutOptionalMethods() # No tensor state - - pipeline = RobotProcessor([module_step, state_dict_step, simple_step]) - - # Process some data - for i in range(5): - transition = create_transition(observation=torch.randn(2, 10), reward=float(i)) - pipeline(transition) - - # Check initial state - assert module_step.linear.weight.device.type == "cpu" - assert state_dict_step.running_mean.device.type == "cpu" - - # Move to device - result = pipeline.to("cpu") - assert result is pipeline - - if torch.cuda.is_available(): - pipeline.to("cuda") - assert module_step.linear.weight.device.type == "cuda" - assert module_step.running_mean.device.type == "cuda" - assert state_dict_step.running_mean.device.type == "cuda" - assert state_dict_step.running_count.device.type == "cuda" - - -def test_to_device_empty_state(): - """Test moving pipeline with steps that have empty state_dict.""" - step = MockStep("empty_state") # This step has empty state_dict - pipeline = RobotProcessor([step]) - - # Should not raise an error even with empty state - result = pipeline.to("cpu") - assert result is pipeline - - if torch.cuda.is_available(): - result = pipeline.to("cuda") - assert result is pipeline - - -def test_to_device_preserves_functionality(): - """Test that pipeline functionality is preserved after device transfer.""" - step = MockStepWithTensorState(window_size=3) - pipeline = RobotProcessor([step]) - - # Process initial data - rewards = [1.0, 2.0, 3.0] - for r in rewards: - transition = create_transition(reward=r) - pipeline(transition) - - # Check state before transfer - initial_mean = step.running_mean.clone() - initial_count = step.running_count.clone() - - # Move to device (CPU to CPU in this case, but tests the mechanism) - pipeline.to("cpu") - - # Verify state is preserved - assert torch.allclose(step.running_mean, initial_mean) - assert step.running_count == initial_count - - # Process more data to ensure functionality - transition = create_transition(reward=4.0) - _ = pipeline(transition) - - assert step.running_count == 4 - assert step.running_mean[0] == 4.0 # First slot should have been overwritten with 4.0 - - -def test_to_device_invalid_device(): - """Test error handling for invalid devices.""" - pipeline = RobotProcessor([MockStep()]) - - # Invalid device names should raise an error from PyTorch - with pytest.raises(RuntimeError): - pipeline.to("invalid_device") - - -def test_to_device_chaining(): - """Test that to() returns self for method chaining.""" - step1 = MockStepWithTensorState() - step2 = MockModuleStep() - pipeline = RobotProcessor([step1, step2]) - - # Test chaining - result = pipeline.to("cpu").reset() - assert result is None # reset() returns None - - # Can chain multiple to() calls - result1 = pipeline.to("cpu") - result2 = result1.to("cpu") - assert result1 is pipeline - assert result2 is pipeline - - class MockNonModuleStepWithState: """Mock step that explicitly does NOT inherit from nn.Module but has tensor state. @@ -988,129 +811,6 @@ class MockNonModuleStepWithState: return features -def test_to_device_non_module_class(): - """Test moving pipeline to device for regular classes (non nn.Module) with tensor state. - - This ensures the state_dict/load_state_dict approach works for classes that - don't inherit from nn.Module but still have tensor state to manage. - """ - # Create a non-module step with tensor state - non_module_step = MockNonModuleStepWithState(name="device_test", feature_dim=5) - pipeline = RobotProcessor([non_module_step]) - - # Process some data to populate state - for i in range(3): - obs = torch.randn(2, 5) - transition = create_transition(observation=obs, reward=float(i)) - result = pipeline(transition) - comp_data = result[TransitionKey.COMPLEMENTARY_DATA] - assert f"{non_module_step.name}_steps" in comp_data - - # Verify all tensors are on CPU initially - assert non_module_step.weights.device.type == "cpu" - assert non_module_step.bias.device.type == "cpu" - assert non_module_step.running_stats.device.type == "cpu" - assert non_module_step.step_count.device.type == "cpu" - - # Verify step count - assert non_module_step.step_count.item() == 3 - - # Store initial values for comparison - initial_weights = non_module_step.weights.clone() - initial_bias = non_module_step.bias.clone() - initial_stats = non_module_step.running_stats.clone() - - # Move to same device (CPU) - result = pipeline.to("cpu") - assert result is pipeline - - # Verify tensors are still on CPU and values unchanged - assert non_module_step.weights.device.type == "cpu" - assert torch.allclose(non_module_step.weights, initial_weights) - assert torch.allclose(non_module_step.bias, initial_bias) - assert torch.allclose(non_module_step.running_stats, initial_stats) - - # If CUDA is available, test GPU transfer - if torch.cuda.is_available(): - # Move to GPU - pipeline.to("cuda") - - # Verify all tensors moved to GPU - assert non_module_step.weights.device.type == "cuda" - assert non_module_step.bias.device.type == "cuda" - assert non_module_step.running_stats.device.type == "cuda" - assert non_module_step.step_count.device.type == "cuda" - - # Verify values are preserved - assert torch.allclose(non_module_step.weights.cpu(), initial_weights) - assert torch.allclose(non_module_step.bias.cpu(), initial_bias) - assert torch.allclose(non_module_step.running_stats.cpu(), initial_stats) - assert non_module_step.step_count.item() == 3 - - # Test that step still works on GPU - obs_gpu = torch.randn(2, 5, device="cuda") - transition = create_transition(observation=obs_gpu, reward=1.0) - result = pipeline(transition) - comp_data = result[TransitionKey.COMPLEMENTARY_DATA] - - # Verify processing worked - assert comp_data[f"{non_module_step.name}_steps"] == 4 - - # Move back to CPU - pipeline.to("cpu") - assert non_module_step.weights.device.type == "cpu" - assert non_module_step.step_count.item() == 4 - - -def test_to_device_module_vs_non_module(): - """Test that both nn.Module and non-Module steps work with the same state_dict approach.""" - # Create both types of steps - module_step = MockModuleStep(input_dim=5, hidden_dim=3) - non_module_step = MockNonModuleStepWithState(name="non_module", feature_dim=5) - - # Create pipeline with both - pipeline = RobotProcessor([module_step, non_module_step]) - - # Process some data - obs = torch.randn(2, 5) - transition = create_transition(observation=obs, reward=1.0) - _ = pipeline(transition) - - # Check initial devices - assert module_step.linear.weight.device.type == "cpu" - assert module_step.running_mean.device.type == "cpu" - assert non_module_step.weights.device.type == "cpu" - assert non_module_step.running_stats.device.type == "cpu" - - # Both should have been called - assert module_step.counter == 1 - assert non_module_step.step_count.item() == 1 - - if torch.cuda.is_available(): - # Move to GPU - pipeline.to("cuda") - - # Verify both types of steps moved correctly - assert module_step.linear.weight.device.type == "cuda" - assert module_step.running_mean.device.type == "cuda" - assert non_module_step.weights.device.type == "cuda" - assert non_module_step.running_stats.device.type == "cuda" - - # Process data on GPU - obs_gpu = torch.randn(2, 5, device="cuda") - transition = create_transition(observation=obs_gpu, reward=2.0) - _ = pipeline(transition) - - # Verify both steps processed the data - assert module_step.counter == 2 - assert non_module_step.step_count.item() == 2 - - # Move back to CPU and verify - pipeline.to("cpu") - assert module_step.linear.weight.device.type == "cpu" - assert non_module_step.weights.device.type == "cpu" - - # Tests for overrides functionality @dataclass class MockStepWithNonSerializableParam: @@ -1489,96 +1189,6 @@ def test_from_pretrained_override_error_messages(): assert "registered_mock_step" in error_msg -class MockStepWithMixedState: - """Mock step demonstrating proper separation of tensor and non-tensor state. - - Non-tensor state should go in get_config(), only tensors in state_dict(). - """ - - def __init__(self, name: str = "mixed_state"): - self.name = name - self.tensor_data = torch.randn(5) - self.numpy_data = np.array([1, 2, 3, 4, 5]) # Goes in config - self.scalar_value = 42 # Goes in config - self.list_value = [1, 2, 3] # Goes in config - - def __call__(self, transition: EnvTransition) -> EnvTransition: - # Simple pass-through - return transition - - def state_dict(self) -> dict[str, torch.Tensor]: - """Return ONLY tensor state as per the type contract.""" - return { - "tensor_data": self.tensor_data, - } - - def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: - """Load tensor state only.""" - self.tensor_data = state["tensor_data"] - - def get_config(self) -> dict[str, Any]: - """Non-tensor state goes here.""" - return { - "name": self.name, - "numpy_data": self.numpy_data.tolist(), # Convert to list for JSON serialization - "scalar_value": self.scalar_value, - "list_value": self.list_value, - } - - def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: - # We do not test feature_contract here - return features - - -def test_to_device_with_mixed_state_types(): - """Test that to() only moves tensor state, while non-tensor state remains in config.""" - step = MockStepWithMixedState() - pipeline = RobotProcessor([step]) - - # Store initial values - initial_numpy = step.numpy_data.copy() - initial_scalar = step.scalar_value - initial_list = step.list_value.copy() - - # Check initial state - assert step.tensor_data.device.type == "cpu" - assert isinstance(step.numpy_data, np.ndarray) - assert isinstance(step.scalar_value, int) - assert isinstance(step.list_value, list) - - # Verify state_dict only contains tensors - state = step.state_dict() - assert all(isinstance(v, torch.Tensor) for v in state.values()) - assert "tensor_data" in state - assert "numpy_data" not in state - - # Move to same device - pipeline.to("cpu") - - # Verify tensor moved and non-tensor attributes unchanged - assert step.tensor_data.device.type == "cpu" - assert np.array_equal(step.numpy_data, initial_numpy) - assert step.scalar_value == initial_scalar - assert step.list_value == initial_list - - if torch.cuda.is_available(): - # Move to GPU - pipeline.to("cuda") - - # Only tensor should move to GPU - assert step.tensor_data.device.type == "cuda" - - # Non-tensor values should remain unchanged - assert isinstance(step.numpy_data, np.ndarray) - assert np.array_equal(step.numpy_data, initial_numpy) - assert step.scalar_value == initial_scalar - assert step.list_value == initial_list - - # Move back to CPU - pipeline.to("cpu") - assert step.tensor_data.device.type == "cpu" - - def test_repr_empty_processor(): """Test __repr__ with empty processor.""" pipeline = RobotProcessor()