mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-23 19:27:08 +00:00
73782447f2
* feat(train): FSDP checkpoint saving * adding docs for FSDP * adding a test for the fsdp checkpoint path * cleanup * fixing final upload to hub * refactored initial implementation to use torch fsdp api and adding new tests
309 lines
12 KiB
Python
309 lines
12 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Multi-GPU Training Tests
|
|
|
|
This module tests multi-GPU training functionality with accelerate.
|
|
These tests are designed to run on machines with 2+ GPUs and are executed
|
|
in the nightly CI workflow.
|
|
|
|
The tests automatically generate accelerate configs and launch training
|
|
with subprocess to properly test the distributed training environment.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import torch
|
|
|
|
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
|
|
|
|
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
|
|
|
|
|
def get_num_available_gpus():
    """Report how many CUDA devices torch can see; 0 when CUDA is not available at all."""
    return torch.cuda.device_count() if torch.cuda.is_available() else 0
|
|
|
|
|
|
def download_dataset(repo_id, episodes):
    """
    Fetch a dataset once, in the test process, before distributed training is launched.

    Constructing a ``LeRobotDataset`` is enough to trigger the download. Doing it up front
    avoids race conditions between the spawned training processes.

    Args:
        repo_id: HuggingFace dataset repository ID
        episodes: List of episode indices to download
    """
    # The instance itself is not needed; only the download side effect of construction matters.
    LeRobotDataset(repo_id, episodes=episodes)
    print(f"Dataset {repo_id} downloaded successfully")
|
|
|
|
|
|
def _write_multi_gpu_config(f, num_processes):
|
|
f.write("compute_environment: LOCAL_MACHINE\n")
|
|
f.write("distributed_type: MULTI_GPU\n")
|
|
f.write("mixed_precision: 'no'\n")
|
|
f.write(f"num_processes: {num_processes}\n")
|
|
f.write("use_cpu: false\n")
|
|
f.write("gpu_ids: all\n")
|
|
f.write("downcast_bf16: 'no'\n")
|
|
f.write("machine_rank: 0\n")
|
|
f.write("main_training_function: main\n")
|
|
f.write("num_machines: 1\n")
|
|
f.write("rdzv_backend: static\n")
|
|
f.write("same_network: true\n")
|
|
|
|
|
|
def _write_fsdp_config(f, num_processes):
|
|
# FSDP1 with FULL_SHARD (ZeRO-3-equivalent) and FULL_STATE_DICT, matching
|
|
# docs/source/multi_gpu_training.mdx. ACT's repeated transformer blocks are the wrap units;
|
|
# fsdp_use_orig_params is required because LeRobot builds the optimizer before prepare().
|
|
f.write("compute_environment: LOCAL_MACHINE\n")
|
|
f.write("distributed_type: FSDP\n")
|
|
f.write("mixed_precision: 'no'\n")
|
|
f.write(f"num_processes: {num_processes}\n")
|
|
f.write("use_cpu: false\n")
|
|
f.write("gpu_ids: all\n")
|
|
f.write("machine_rank: 0\n")
|
|
f.write("main_training_function: main\n")
|
|
f.write("num_machines: 1\n")
|
|
f.write("rdzv_backend: static\n")
|
|
f.write("same_network: true\n")
|
|
f.write("fsdp_config:\n")
|
|
f.write(" fsdp_version: 1\n")
|
|
f.write(" fsdp_sharding_strategy: FULL_SHARD\n")
|
|
f.write(" fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP\n")
|
|
f.write(" fsdp_transformer_layer_cls_to_wrap: ACTEncoderLayer,ACTDecoderLayer\n")
|
|
f.write(" fsdp_use_orig_params: true\n")
|
|
f.write(" fsdp_state_dict_type: FULL_STATE_DICT\n")
|
|
|
|
|
|
def run_accelerate_training(config_args, num_processes=4, temp_dir=None, distributed_type="MULTI_GPU"):
    """
    Run ``lerobot.scripts.lerobot_train`` under ``accelerate launch`` and wait for it to exit.

    A generated accelerate YAML config (DDP or FSDP) is written to
    ``<temp_dir>/accelerate_config.yaml`` and passed via ``--config_file``.

    Args:
        config_args: List of config arguments to pass to lerobot_train.py
        num_processes: Number of processes (GPUs) to use
        temp_dir: Directory the generated accelerate config is written to. If None, a temporary
            directory is created for the duration of the run and removed afterwards.
        distributed_type: "MULTI_GPU" (DDP) or "FSDP" — selects the generated accelerate config.

    Returns:
        subprocess.CompletedProcess result, with stdout/stderr captured as text.

    Raises:
        ValueError: If ``distributed_type`` is neither "MULTI_GPU" nor "FSDP".
    """
    # Validate up front: previously any unrecognised value (e.g. a lowercase "fsdp") silently
    # fell through to the DDP config, letting an "FSDP" test pass without exercising FSDP.
    if distributed_type not in ("MULTI_GPU", "FSDP"):
        raise ValueError(
            f"Unsupported distributed_type {distributed_type!r}; expected 'MULTI_GPU' or 'FSDP'"
        )

    # Path(None) would raise; give callers without a scratch dir a throwaway one instead.
    # The subprocess has finished by the time we return, so the config can be cleaned up.
    if temp_dir is None:
        with tempfile.TemporaryDirectory() as scratch_dir:
            return run_accelerate_training(
                config_args,
                num_processes=num_processes,
                temp_dir=scratch_dir,
                distributed_type=distributed_type,
            )

    config_path = Path(temp_dir) / "accelerate_config.yaml"

    # Write YAML config
    write_config = _write_fsdp_config if distributed_type == "FSDP" else _write_multi_gpu_config
    with open(config_path, "w") as f:
        write_config(f, num_processes)

    cmd = [
        "accelerate",
        "launch",
        "--config_file",
        str(config_path),
        "-m",
        "lerobot.scripts.lerobot_train",
    ] + config_args

    # If the caller/CI already restricted CUDA_VISIBLE_DEVICES, keep that assignment (taking its
    # first num_processes entries) rather than remapping to ids 0..n-1, which may name GPUs that
    # were not assigned to this job. Otherwise expose the first num_processes devices.
    inherited_ids = [
        device.strip() for device in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if device.strip()
    ]
    device_ids = inherited_ids or [str(i) for i in range(num_processes)]
    visible_devices = ",".join(device_ids[:num_processes])

    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        env={**os.environ, "CUDA_VISIBLE_DEVICES": visible_devices},
    )
|
|
|
|
|
|
@pytest.mark.skipif(
    get_num_available_gpus() < 2,
    reason="Multi-GPU tests require at least 2 GPUs",
)
class TestMultiGPUTraining:
    """Test suite for multi-GPU training functionality."""

    @staticmethod
    def _base_config_args(output_dir, steps):
        """
        Build the lerobot_train CLI arguments shared by every test in this suite.

        Trains ACT on episode 0 of lerobot/pusht, without eval or hub upload, saving every 10 steps.

        Args:
            output_dir: Directory lerobot_train writes its outputs and checkpoints to.
            steps: Total number of training steps.

        Returns:
            List of ``--key=value`` argument strings.
        """
        return [
            "--dataset.repo_id=lerobot/pusht",
            "--dataset.episodes=[0]",
            "--policy.type=act",
            "--policy.device=cuda",
            "--policy.push_to_hub=false",
            f"--output_dir={output_dir}",
            "--batch_size=4",
            f"--steps={steps}",
            "--eval_freq=-1",
            "--log_freq=5",
            "--save_freq=10",
            "--seed=42",
            "--num_workers=0",
        ]

    @staticmethod
    def _finished_training(result):
        """Return True if the training subprocess printed "End of training" on stdout or stderr."""
        return "End of training" in result.stdout or "End of training" in result.stderr

    def test_basic_multi_gpu_training(self):
        """
        Test that basic multi-GPU training runs successfully.
        Verifies that the training completes without errors.
        """
        # Pre-download dataset to avoid race conditions
        download_dataset("lerobot/pusht", episodes=[0])

        # Use up to 4 GPUs but never more than are present. The class only requires 2, and ranks
        # beyond the visible device count would have no GPU to bind to.
        num_processes = min(4, get_num_available_gpus())

        with tempfile.TemporaryDirectory() as temp_dir:
            output_dir = Path(temp_dir) / "outputs"
            config_args = self._base_config_args(output_dir, steps=10)

            result = run_accelerate_training(config_args, num_processes=num_processes, temp_dir=temp_dir)

            # Check that training completed successfully
            assert result.returncode == 0, (
                f"Multi-GPU training failed with return code {result.returncode}\n"
                f"STDOUT:\n{result.stdout}\n"
                f"STDERR:\n{result.stderr}"
            )

            # Verify checkpoint was saved
            checkpoints_dir = output_dir / "checkpoints"
            assert checkpoints_dir.exists(), "Checkpoints directory was not created"

            # Verify that training completed
            assert self._finished_training(result)

    def test_checkpoint_saving_multi_gpu(self):
        """
        Test that checkpoints are correctly saved during multi-GPU training.
        Only the main process (rank 0) should save checkpoints.
        """
        # Pre-download dataset to avoid race conditions
        download_dataset("lerobot/pusht", episodes=[0])

        with tempfile.TemporaryDirectory() as temp_dir:
            output_dir = Path(temp_dir) / "outputs"
            config_args = self._base_config_args(output_dir, steps=20)

            result = run_accelerate_training(config_args, num_processes=2, temp_dir=temp_dir)

            assert result.returncode == 0, (
                f"Training failed:\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
            )

            # Verify checkpoint directory exists
            checkpoints_dir = output_dir / "checkpoints"
            assert checkpoints_dir.exists(), "Checkpoints directory not created"

            # With steps=20 and save_freq=10 there should be checkpoints at step 10 and step 20.
            # "last" is excluded from the count so only per-step checkpoints are counted.
            checkpoint_dirs = [d for d in checkpoints_dir.iterdir() if d.is_dir()]
            step_checkpoint_dirs = [d for d in checkpoint_dirs if d.name != "last"]
            assert len(step_checkpoint_dirs) >= 2, (
                f"Expected checkpoints at steps 10 and 20, found "
                f"{sorted(d.name for d in step_checkpoint_dirs)}"
            )

            # Verify checkpoint contents
            for checkpoint_dir in checkpoint_dirs:
                # Check for model files
                model_files = list(checkpoint_dir.rglob("*.safetensors"))
                assert len(model_files) > 0, f"No model files in checkpoint {checkpoint_dir}"

                # Check for training state
                training_state_dir = checkpoint_dir / "training_state"
                assert training_state_dir.exists(), f"No training state in checkpoint {checkpoint_dir}"

                # Verify optimizer state exists
                optimizer_state = training_state_dir / "optimizer_state.safetensors"
                assert optimizer_state.exists(), f"No optimizer state in checkpoint {checkpoint_dir}"

    def test_fsdp_optimizer_save_and_resume(self):
        """
        Test that FSDP saves the (gathered) optimizer state and can resume from it.

        Trains a few steps under FSDP, verifies the gathered optimizer state is written next to the
        rest of the training state, then resumes from the checkpoint for more steps and checks it
        completes without shape/key errors in the FSDP optimizer load path.
        """
        # Pre-download dataset to avoid race conditions
        download_dataset("lerobot/pusht", episodes=[0])

        with tempfile.TemporaryDirectory() as temp_dir:
            output_dir = Path(temp_dir) / "outputs"
            config_args = self._base_config_args(output_dir, steps=10)

            result = run_accelerate_training(
                config_args, num_processes=2, temp_dir=temp_dir, distributed_type="FSDP"
            )
            assert result.returncode == 0, (
                f"FSDP training failed:\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
            )

            # The gathered optimizer state must be written under FSDP (proves the save collective ran),
            # in the same safetensors format as single-GPU training.
            training_state_dir = output_dir / "checkpoints" / "last" / "training_state"
            optimizer_state = training_state_dir / "optimizer_state.safetensors"
            optimizer_param_groups = training_state_dir / "optimizer_param_groups.json"
            assert optimizer_state.exists(), f"FSDP optimizer state not saved in {training_state_dir}"
            assert optimizer_param_groups.exists(), (
                f"FSDP optimizer param groups not saved in {training_state_dir}"
            )

            # Resume from the checkpoint for more steps. A successful run proves load_fsdp_optimizer
            # accepts the saved state and reshards it without shape/key errors.
            resume_config = output_dir / "checkpoints" / "last" / "pretrained_model" / "train_config.json"
            resume_args = [
                f"--config_path={resume_config}",
                "--resume=true",
                "--steps=20",
            ]
            resume_result = run_accelerate_training(
                resume_args, num_processes=2, temp_dir=temp_dir, distributed_type="FSDP"
            )
            assert resume_result.returncode == 0, (
                f"FSDP resume failed:\nSTDOUT:\n{resume_result.stdout}\n\nSTDERR:\n{resume_result.stderr}"
            )
            assert self._finished_training(resume_result)
|