diff --git a/src/lerobot/processor/device_processor.py b/src/lerobot/processor/device_processor.py
index 39bd1cf11..4188e6208 100644
--- a/src/lerobot/processor/device_processor.py
+++ b/src/lerobot/processor/device_processor.py
@@ -66,9 +66,26 @@ class DeviceProcessor:
         self._target_float_dtype = None

     def _process_tensor(self, tensor: torch.Tensor) -> torch.Tensor:
-        """Process a tensor by moving to device and optionally converting float dtype."""
-        # Move to device first
-        tensor = tensor.to(self.device, non_blocking=self.non_blocking)
+        """Process a tensor by moving it to a device and optionally converting its float dtype.
+
+        If the tensor is already on a GPU and the processor is configured for a GPU,
+        the tensor's existing GPU placement is preserved (useful for multi-GPU
+        training with Accelerate). Otherwise, the tensor is moved to the configured device.
+        """
+        # Determine the target device
+        if tensor.is_cuda and self._device.type == "cuda":
+            # Both the tensor and the target are on GPU: preserve the tensor's placement.
+            # This handles multi-GPU scenarios where Accelerate has already placed
+            # tensors on the correct GPU for each process.
+            target_device = tensor.device
+        else:
+            # Either the tensor is on CPU, or the processor is configured for CPU.
+            # In both cases, use the configured device.
+            target_device = self._device
+
+        # Only move if necessary
+        if tensor.device != target_device:
+            tensor = tensor.to(target_device, non_blocking=self.non_blocking)

         # Convert float dtype if specified and tensor is floating point
         if self._target_float_dtype is not None and tensor.is_floating_point():
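
Reviewer note: the placement rule above reduces to a two-way decision on (tensor device, configured device). The sketch below restates it as a standalone function so the behavior can be sanity-checked on a CPU-only machine; resolve_target_device is a hypothetical name used for illustration only, not part of the patch.

    import torch

    def resolve_target_device(tensor_device: torch.device, configured: torch.device) -> torch.device:
        """Mirror of the branch in DeviceProcessor._process_tensor (illustrative only)."""
        if tensor_device.type == "cuda" and configured.type == "cuda":
            # GPU tensor + GPU-configured processor: keep the existing placement.
            return tensor_device
        # Otherwise the configured device wins (CPU -> configured GPU, or anything -> CPU).
        return configured

    # These asserts run without a GPU, since only device objects are created.
    assert resolve_target_device(torch.device("cuda:1"), torch.device("cuda:0")) == torch.device("cuda:1")
    assert resolve_target_device(torch.device("cpu"), torch.device("cuda:0")) == torch.device("cuda:0")
    assert resolve_target_device(torch.device("cuda:0"), torch.device("cpu")) == torch.device("cpu")
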
+ """ + # Check observation tensors first (most likely to exist) + observation = transition.get(TransitionKey.OBSERVATION) + if observation: + for value in observation.values(): + if isinstance(value, torch.Tensor): + return value.device + + # Check action tensor + action = transition.get(TransitionKey.ACTION) + if isinstance(action, torch.Tensor): + return action.device + + # Check other tensor fields + for key in [TransitionKey.REWARD, TransitionKey.DONE, TransitionKey.TRUNCATED]: + value = transition.get(key) + if isinstance(value, torch.Tensor): + return value.device + + # Check complementary data for tensors + complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA) + if complementary_data: + for value in complementary_data.values(): + if isinstance(value, torch.Tensor): + return value.device + + return None # No tensors found, keep on CPU + def _tokenize_text(self, text: str | list[str]) -> dict[str, torch.Tensor]: """Tokenize text using the configured tokenizer. diff --git a/tests/processor/test_device_processor.py b/tests/processor/test_device_processor.py index 7e30750f4..354eb22ec 100644 --- a/tests/processor/test_device_processor.py +++ b/tests/processor/test_device_processor.py @@ -820,6 +820,143 @@ def test_complementary_data_none(): assert TransitionKey.COMPLEMENTARY_DATA not in result +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_preserves_gpu_placement(): + """Test that DeviceProcessor preserves GPU placement when tensor is already on GPU.""" + processor = DeviceProcessor(device="cuda:0") + + # Create tensors already on GPU + observation = { + "observation.state": torch.randn(10).cuda(), # Already on GPU + "observation.image": torch.randn(3, 224, 224).cuda(), # Already on GPU + } + action = torch.randn(5).cuda() # Already on GPU + + transition = create_transition(observation=observation, action=action) + result = processor(transition) + + # Check that tensors remain on their original GPU + assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda" + assert result[TransitionKey.OBSERVATION]["observation.image"].device.type == "cuda" + assert result[TransitionKey.ACTION].device.type == "cuda" + + # Verify no unnecessary copies were made (same data pointer) + assert torch.equal( + result[TransitionKey.OBSERVATION]["observation.state"], observation["observation.state"] + ) + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_multi_gpu_preservation(): + """Test that DeviceProcessor preserves placement on different GPUs in multi-GPU setup.""" + # Test 1: GPU-to-GPU preservation (cuda:0 config, cuda:1 input) + processor_gpu = DeviceProcessor(device="cuda:0") + + # Create tensors on cuda:1 (simulating Accelerate placement) + cuda1_device = torch.device("cuda:1") + observation = { + "observation.state": torch.randn(10).to(cuda1_device), + "observation.image": torch.randn(3, 224, 224).to(cuda1_device), + } + action = torch.randn(5).to(cuda1_device) + + transition = create_transition(observation=observation, action=action) + result = processor_gpu(transition) + + # Check that tensors remain on cuda:1 (not moved to cuda:0) + assert result[TransitionKey.OBSERVATION]["observation.state"].device == cuda1_device + assert result[TransitionKey.OBSERVATION]["observation.image"].device == cuda1_device + assert result[TransitionKey.ACTION].device == cuda1_device + + # Test 2: GPU-to-CPU should move to CPU (not preserve GPU) + processor_cpu = 
DeviceProcessor(device="cpu") + + transition_gpu = create_transition( + observation={"observation.state": torch.randn(10).cuda()}, action=torch.randn(5).cuda() + ) + result_cpu = processor_cpu(transition_gpu) + + # Check that tensors are moved to CPU + assert result_cpu[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu" + assert result_cpu[TransitionKey.ACTION].device.type == "cpu" + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_multi_gpu_with_cpu_tensors(): + """Test that CPU tensors are moved to configured device even in multi-GPU context.""" + # Processor configured for cuda:1 + processor = DeviceProcessor(device="cuda:1") + + # Mix of CPU and GPU tensors + observation = { + "observation.cpu": torch.randn(10), # CPU tensor + "observation.gpu0": torch.randn(10).cuda(0), # Already on cuda:0 + "observation.gpu1": torch.randn(10).cuda(1), # Already on cuda:1 + } + action = torch.randn(5) # CPU tensor + + transition = create_transition(observation=observation, action=action) + result = processor(transition) + + # CPU tensor should move to configured device (cuda:1) + assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.type == "cuda" + assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.index == 1 + assert result[TransitionKey.ACTION].device.type == "cuda" + assert result[TransitionKey.ACTION].device.index == 1 + + # GPU tensors should stay on their original devices + assert result[TransitionKey.OBSERVATION]["observation.gpu0"].device.index == 0 + assert result[TransitionKey.OBSERVATION]["observation.gpu1"].device.index == 1 + + +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs") +def test_multi_gpu_with_float_dtype(): + """Test float dtype conversion works correctly with multi-GPU preservation.""" + processor = DeviceProcessor(device="cuda:0", float_dtype="float16") + + # Create float tensors on different GPUs + observation = { + "observation.gpu0": torch.randn(5, dtype=torch.float32).cuda(0), + "observation.gpu1": torch.randn(5, dtype=torch.float32).cuda(1), + "observation.cpu": torch.randn(5, dtype=torch.float32), # CPU + } + + transition = create_transition(observation=observation) + result = processor(transition) + + # Check device placement + assert result[TransitionKey.OBSERVATION]["observation.gpu0"].device.index == 0 + assert result[TransitionKey.OBSERVATION]["observation.gpu1"].device.index == 1 + assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.index == 0 # Moved to cuda:0 + + # Check dtype conversion happened for all + assert result[TransitionKey.OBSERVATION]["observation.gpu0"].dtype == torch.float16 + assert result[TransitionKey.OBSERVATION]["observation.gpu1"].dtype == torch.float16 + assert result[TransitionKey.OBSERVATION]["observation.cpu"].dtype == torch.float16 + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_simulated_accelerate_scenario(): + """Test a scenario simulating how Accelerate would use the processor.""" + # Simulate different processes getting different GPU assignments + for gpu_id in range(min(torch.cuda.device_count(), 2)): + # Each "process" has a processor configured for cuda:0 + # but data comes in already placed on the process's GPU + processor = DeviceProcessor(device="cuda:0") + + # Simulate data already placed by Accelerate + device = torch.device(f"cuda:{gpu_id}") + observation = {"observation.state": torch.randn(1, 10).to(device)} + action = 
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+def test_simulated_accelerate_scenario():
+    """Test a scenario simulating how Accelerate would use the processor."""
+    # Simulate different processes getting different GPU assignments
+    for gpu_id in range(min(torch.cuda.device_count(), 2)):
+        # Each "process" has a processor configured for cuda:0,
+        # but data comes in already placed on the process's GPU
+        processor = DeviceProcessor(device="cuda:0")
+
+        # Simulate data already placed by Accelerate
+        device = torch.device(f"cuda:{gpu_id}")
+        observation = {"observation.state": torch.randn(1, 10).to(device)}
+        action = torch.randn(1, 5).to(device)
+
+        transition = create_transition(observation=observation, action=action)
+        result = processor(transition)
+
+        # Verify the data stays on the GPU where Accelerate placed it
+        assert result[TransitionKey.OBSERVATION]["observation.state"].device == device
+        assert result[TransitionKey.ACTION].device == device
+
+
 @pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
 def test_policy_processor_integration():
     """Test integration with policy processors - input on GPU, output on CPU."""
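
Reviewer note: for context on what test_simulated_accelerate_scenario exercises: under Accelerate, every rank runs the same script with the same pipeline config (often pinned to "cuda:0"), while the prepared dataloader yields batches on that rank's own GPU. A rough sketch of that situation, assuming the standard accelerate API; the batch contents are illustrative.

    import torch
    from accelerate import Accelerator

    accelerator = Accelerator()  # rank 0 gets cuda:0, rank 1 gets cuda:1, ...

    # In practice the batch comes from a DataLoader wrapped by
    # accelerator.prepare(), which already yields tensors on accelerator.device.
    batch = {"observation.state": torch.randn(8, 10).to(accelerator.device)}

    # A DeviceProcessor configured with "cuda:0" now leaves this batch on
    # accelerator.device (e.g. cuda:1 on rank 1) instead of pulling it back
    # to cuda:0 and breaking per-rank placement.
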
diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py
index 784b1ce81..802b2edb7 100644
--- a/tests/processor/test_tokenizer_processor.py
+++ b/tests/processor/test_tokenizer_processor.py
@@ -725,3 +725,264 @@ def test_custom_padding_side(mock_auto_tokenizer):
     processor_right(transition)

     assert tracking_tokenizer.padding_side_calls[-1] == "right"
+
+
+@require_package("transformers")
+def test_device_detection_cpu():
+    """Test that tokenized tensors stay on CPU when the other tensors are on CPU."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with CPU tensors
+    observation = {"observation.state": torch.randn(10)}  # CPU tensor
+    action = torch.randn(5)  # CPU tensor
+    transition = create_transition(
+        observation=observation, action=action, complementary_data={"task": "test task"}
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors are on CPU
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device.type == "cpu"
+    assert attention_mask.device.type == "cpu"
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+def test_device_detection_cuda():
+    """Test that tokenized tensors are moved to CUDA when the other tensors are on CUDA."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with CUDA tensors
+    observation = {"observation.state": torch.randn(10).cuda()}  # CUDA tensor
+    action = torch.randn(5).cuda()  # CUDA tensor
+    transition = create_transition(
+        observation=observation, action=action, complementary_data={"task": "test task"}
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors are on CUDA
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device.type == "cuda"
+    assert attention_mask.device.type == "cuda"
+    assert tokens.device.index == 0  # Same device as the input tensors
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
+@require_package("transformers")
+def test_device_detection_multi_gpu():
+    """Test that tokenized tensors match the device in a multi-GPU setup."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Test with tensors on cuda:1
+    device = torch.device("cuda:1")
+    observation = {"observation.state": torch.randn(10).to(device)}
+    action = torch.randn(5).to(device)
+    transition = create_transition(
+        observation=observation, action=action, complementary_data={"task": "multi gpu test"}
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors are on cuda:1
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device == device
+    assert attention_mask.device == device
+
+
+@require_package("transformers")
+def test_device_detection_no_tensors():
+    """Test that tokenized tensors stay on CPU when no other tensors exist."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with no tensors
+    transition = create_transition(
+        observation={"metadata": {"key": "value"}},  # No tensors
+        complementary_data={"task": "no tensor test"},
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors are on CPU (the default)
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device.type == "cpu"
+    assert attention_mask.device.type == "cpu"
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+def test_device_detection_mixed_devices():
+    """Test device detection when tensors are on different devices (uses the first found)."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with mixed devices
+    observation = {
+        "observation.cpu": torch.randn(10),  # CPU
+        "observation.cuda": torch.randn(10).cuda(),  # CUDA
+    }
+    transition = create_transition(
+        observation=observation, complementary_data={"task": "mixed device test"}
+    )
+
+    result = processor(transition)
+
+    # Device detection uses the first tensor found (dict insertion order is
+    # preserved in Python, so the result is deterministic)
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    # Both should be on the same device
+    assert tokens.device == attention_mask.device
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+def test_device_detection_from_action():
+    """Test that the device is detected from the action tensor when no observation tensors exist."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with the action on CUDA but no observation tensors
+    observation = {"metadata": {"key": "value"}}  # No tensors in observation
+    action = torch.randn(5).cuda()
+    transition = create_transition(
+        observation=observation, action=action, complementary_data={"task": "action device test"}
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors match the action's device
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device.type == "cuda"
+    assert attention_mask.device.type == "cuda"
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+def test_device_detection_from_complementary_data():
+    """Test that the device is detected from tensors in complementary_data."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition whose only tensor lives in complementary_data
+    transition = create_transition(
+        observation={"metadata": {"key": "value"}},  # No tensors
+        complementary_data={
+            "task": "comp data test",
+            "index": torch.tensor([42]).cuda(),  # Tensor in complementary_data
+        },
+    )
+
+    result = processor(transition)
+
+    # Check that tokenized tensors match the complementary_data tensor's device
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device.type == "cuda"
+    assert attention_mask.device.type == "cuda"
+
+
+@require_package("transformers")
+def test_device_detection_preserves_dtype():
+    """Test that device detection doesn't affect the dtype of tokenized tensors."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Create a transition with a float16 tensor (to check the dtype isn't affected)
+    observation = {"observation.state": torch.randn(10, dtype=torch.float16)}
+    transition = create_transition(observation=observation, complementary_data={"task": "dtype test"})
+
+    result = processor(transition)
+
+    # Check that tokenized tensors keep their own dtypes (not the input's)
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.dtype == torch.long  # Should remain long
+    assert attention_mask.dtype == torch.bool  # Should be bool (converted in the processor)
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
+def test_integration_with_device_processor(mock_auto_tokenizer):
+    """Test that TokenizerProcessor works correctly with DeviceProcessor in a pipeline."""
+    from lerobot.processor import DeviceProcessor
+
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
+
+    # Create a pipeline with TokenizerProcessor followed by DeviceProcessor
+    tokenizer_processor = TokenizerProcessor(tokenizer_name="test-tokenizer", max_length=6)
+    device_processor = DeviceProcessor(device="cuda:0")
+    robot_processor = RobotProcessor([tokenizer_processor, device_processor])
+
+    # Start with CPU tensors
+    transition = create_transition(
+        observation={"observation.state": torch.randn(10)},  # CPU
+        action=torch.randn(5),  # CPU
+        complementary_data={"task": "pipeline test"},
+    )
+
+    result = robot_processor(transition)
+
+    # All tensors should end up on CUDA (moved by DeviceProcessor)
+    assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
+    assert result[TransitionKey.ACTION].device.type == "cuda"
+
+    # Tokenized tensors should also be on CUDA
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+    assert tokens.device.type == "cuda"
+    assert attention_mask.device.type == "cuda"
+
+
+@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
+@require_package("transformers")
+def test_simulated_accelerate_scenario():
+    """Test a scenario simulating Accelerate with data already on the GPU."""
+    mock_tokenizer = MockTokenizer(vocab_size=100)
+    processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
+
+    # Simulate the Accelerate scenario: the batch is already on the GPU
+    device = torch.device("cuda:0")
+    observation = {
+        "observation.state": torch.randn(1, 10).to(device),  # Batched, on GPU
+        "observation.image": torch.randn(1, 3, 224, 224).to(device),  # Batched, on GPU
+    }
+    action = torch.randn(1, 5).to(device)  # Batched, on GPU
+
+    transition = create_transition(
+        observation=observation,
+        action=action,
+        complementary_data={"task": ["accelerate test"]},  # List for a batched task
+    )
+
+    result = processor(transition)
+
+    # Tokenized tensors should match the GPU placement
+    tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
+    attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
+
+    assert tokens.device == device
+    assert attention_mask.device == device
+    # MockTokenizer squeezes single-item batches, so the shape is (max_length,), not (1, max_length)
+    assert tokens.shape == (10,)
+    assert attention_mask.shape == (10,)
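
Reviewer note: taken together, the two processor changes make step ordering in a pipeline forgiving: tokenizing before the device move works because DeviceProcessor forwards the CPU tokens, and tokenizing after works because TokenizerProcessor now follows the batch's device. A minimal sketch of both orderings, using the constructors as they appear in the tests; the import path is assumed from the tests, and "some-tokenizer" is a placeholder for a real Hugging Face tokenizer name.

    from lerobot.processor import DeviceProcessor, RobotProcessor, TokenizerProcessor

    # Order A: tokenize on CPU first; DeviceProcessor moves the tokens with the batch.
    pipeline_a = RobotProcessor(
        [
            TokenizerProcessor(tokenizer_name="some-tokenizer", max_length=8),
            DeviceProcessor(device="cuda:0"),
        ]
    )

    # Order B: the batch reaches the tokenizer already on GPU; the new
    # _detect_device logic places the tokens alongside it.
    pipeline_b = RobotProcessor(
        [
            DeviceProcessor(device="cuda:0"),
            TokenizerProcessor(tokenizer_name="some-tokenizer", max_length=8),
        ]
    )
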