diff --git a/accelerate_configs/1gpu_config.yaml b/accelerate_configs/1gpu_config.yaml
new file mode 100644
index 000000000..156f60d6e
--- /dev/null
+++ b/accelerate_configs/1gpu_config.yaml
@@ -0,0 +1,11 @@
+compute_environment: LOCAL_MACHINE
+debug: false
+distributed_type: NO
+downcast_bf16: 'no'
+enable_cpu_affinity: false
+machine_rank: 0
+main_training_function: main
+mixed_precision: 'no'
+num_machines: 1
+num_processes: 1
+use_cpu: false
diff --git a/accelerate_configs/2gpu_config_safe.yaml b/accelerate_configs/2gpu_config_safe.yaml
new file mode 100644
index 000000000..6a4f83f11
--- /dev/null
+++ b/accelerate_configs/2gpu_config_safe.yaml
@@ -0,0 +1,18 @@
+compute_environment: LOCAL_MACHINE
+debug: false
+distributed_type: MULTI_GPU
+downcast_bf16: 'no'
+enable_cpu_affinity: false
+gpu_ids: all
+machine_rank: 0
+main_training_function: main
+mixed_precision: 'no'
+num_machines: 1
+num_processes: 2
+rdzv_backend: static
+same_network: true
+tpu_env: []
+tpu_use_cluster: false
+tpu_use_sudo: false
+use_cpu: false
+dynamo_backend: "no"
diff --git a/src/lerobot/scripts/eval.py b/src/lerobot/scripts/eval.py
index 13d30c686..2821d4c9b 100644
--- a/src/lerobot/scripts/eval.py
+++ b/src/lerobot/scripts/eval.py
@@ -243,7 +243,11 @@ def eval_policy(
     if max_episodes_rendered > 0 and not videos_dir:
         raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.")
 
-    if not isinstance(policy, PreTrainedPolicy):
+    # Handle accelerate-wrapped models by unwrapping them
+    if hasattr(policy, 'module') and isinstance(policy.module, PreTrainedPolicy):
+        # This is likely an accelerate-wrapped model (DistributedDataParallel)
+        policy = policy.module
+    elif not isinstance(policy, PreTrainedPolicy):
         raise ValueError(
             f"Policy of type 'PreTrainedPolicy' is expected, but type '{type(policy)}' was provided."
         )
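
Note: the eval.py change above works because torch's DistributedDataParallel (which accelerate wraps the policy in for multi-GPU runs) exposes the original model under `.module`. A minimal standalone sketch of the same unwrapping pattern; the helper name `unwrap_policy` is illustrative and not part of this patch:

    import torch
    from torch.nn.parallel import DistributedDataParallel

    def unwrap_policy(policy: torch.nn.Module) -> torch.nn.Module:
        # DDP keeps the wrapped model under `.module`; plain modules pass through unchanged.
        return policy.module if isinstance(policy, DistributedDataParallel) else policy
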
diff --git a/src/lerobot/scripts/train.py b/src/lerobot/scripts/train.py
index 235352cd8..2b57f74a2 100644
--- a/src/lerobot/scripts/train.py
+++ b/src/lerobot/scripts/train.py
@@ -16,6 +16,7 @@ import logging
 import time
 from contextlib import nullcontext
+from functools import partial
 from pprint import pformat
 from typing import Any
 
@@ -23,6 +24,8 @@ import torch
 from termcolor import colored
 from torch.amp import GradScaler
 from torch.optim import Optimizer
+import os
+from datetime import timedelta
 
 from lerobot.configs import parser
 from lerobot.configs.train import TrainPipelineConfig
@@ -52,6 +55,8 @@ from lerobot.utils.utils import (
 )
 from lerobot.utils.wandb_utils import WandBLogger
 
+def is_launched_with_accelerate() -> bool:
+    return "ACCELERATE_MIXED_PRECISION" in os.environ
 
 def update_policy(
     train_metrics: MetricsTracker,
@@ -59,36 +64,65 @@ def update_policy(
     batch: Any,
     optimizer: Optimizer,
     grad_clip_norm: float,
-    grad_scaler: GradScaler,
+    grad_scaler: GradScaler | None,
     lr_scheduler=None,
     use_amp: bool = False,
     lock=None,
+    accelerator=None,
 ) -> tuple[MetricsTracker, dict]:
     start_time = time.perf_counter()
     device = get_device_from_parameters(policy)
     policy.train()
-    with torch.autocast(device_type=device.type) if use_amp else nullcontext():
-        loss, output_dict = policy.forward(batch)
+
+    grad_norm = 0.0  # Initialize grad_norm to avoid undefined variable
+
+    if accelerator:
+        with accelerator.accumulate(policy):
+            with torch.autocast(device_type=device.type) if use_amp else nullcontext():
+                loss, output_dict = policy.forward(batch)
+                # TODO(rcadene): policy.unnormalize_outputs(out_dict)
+            accelerator.backward(loss)
+            if accelerator.sync_gradients:
+                grad_norm = torch.nn.utils.clip_grad_norm_(
+                    policy.parameters(),
+                    grad_clip_norm,
+                    error_if_nonfinite=False,
+                )
+            optimizer.step()
+            optimizer.zero_grad()
+    else:
+        # Standard training loop without accelerate
+        with torch.autocast(device_type=device.type) if use_amp else nullcontext():
+            loss, output_dict = policy.forward(batch)
         # TODO(rcadene): policy.unnormalize_outputs(out_dict)
-    grad_scaler.scale(loss).backward()
+
+        if grad_scaler is not None:
+            grad_scaler.scale(loss).backward()
+            # Unscale the gradient of the optimizer's assigned params in-place **prior to gradient clipping**.
+            grad_scaler.unscale_(optimizer)
+            grad_norm = torch.nn.utils.clip_grad_norm_(
+                policy.parameters(),
+                grad_clip_norm,
+                error_if_nonfinite=False,
+            )
+            # Optimizer's gradients are already unscaled, so scaler.step does not unscale them,
+            # although it still skips optimizer.step() if the gradients contain infs or NaNs.
+            with lock if lock is not None else nullcontext():
+                grad_scaler.step(optimizer)
+            # Updates the scale for next iteration.
+            grad_scaler.update()
+        else:
+            # Without GradScaler (fallback)
+            loss.backward()
+            grad_norm = torch.nn.utils.clip_grad_norm_(
+                policy.parameters(),
+                grad_clip_norm,
+                error_if_nonfinite=False,
+            )
+            with lock if lock is not None else nullcontext():
+                optimizer.step()
 
-    # Unscale the gradient of the optimizer's assigned params in-place **prior to gradient clipping**.
-    grad_scaler.unscale_(optimizer)
-
-    grad_norm = torch.nn.utils.clip_grad_norm_(
-        policy.parameters(),
-        grad_clip_norm,
-        error_if_nonfinite=False,
-    )
-
-    # Optimizer's gradients are already unscaled, so scaler.step does not unscale them,
-    # although it still skips optimizer.step() if the gradients contain infs or NaNs.
-    with lock if lock is not None else nullcontext():
-        grad_scaler.step(optimizer)
-    # Updates the scale for next iteration.
-    grad_scaler.update()
-
-    optimizer.zero_grad()
+    optimizer.zero_grad()
 
     # Step through pytorch scheduler at every batch instead of epoch
     if lr_scheduler is not None:
@@ -99,7 +133,7 @@
         policy.update()
 
     train_metrics.loss = loss.item()
-    train_metrics.grad_norm = grad_norm.item()
+    train_metrics.grad_norm = grad_norm.item() if isinstance(grad_norm, torch.Tensor) else grad_norm
     train_metrics.lr = optimizer.param_groups[0]["lr"]
     train_metrics.update_s = time.perf_counter() - start_time
     return train_metrics, output_dict
@@ -108,8 +142,33 @@
 @parser.wrap()
 def train(cfg: TrainPipelineConfig):
     cfg.validate()
+
+    accelerator = None
+    if is_launched_with_accelerate():
+        import accelerate
+
+        # For example pi0 has unused params (last llm block)
+        from accelerate import DistributedDataParallelKwargs
+        ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True)
+        # accelerator = accelerate.Accelerator(step_scheduler_with_optimizer=False, kwargs_handlers=[ddp_kwargs])
+        from accelerate import InitProcessGroupKwargs
+        # Set NCCL timeout (default 30 minutes = 1800 seconds)
+        nccl_timeout = getattr(cfg, 'nccl_timeout', 1800)
+        ddp_init_kwargs = InitProcessGroupKwargs(timeout=timedelta(seconds=nccl_timeout))  # FIXME(mshukor): allow user to set timeout. This should be longer than the evaluation time
+        # Set gradient accumulation steps (default 1)
+        gradient_accumulation_steps = getattr(cfg, 'gradient_accumulation_steps', 1)
+        accelerator = accelerate.Accelerator(step_scheduler_with_optimizer=False, gradient_accumulation_steps=gradient_accumulation_steps, kwargs_handlers=[ddp_init_kwargs, ddp_kwargs])
+    if accelerator is not None and not accelerator.is_main_process:
+        # Disable duplicate logging on non-main processes
+        logging.info(f"Setting logging level on non-main process {accelerator.process_index} to WARNING.")
+        logging.getLogger().setLevel(logging.WARNING)
+
     logging.info(pformat(cfg.to_dict()))
 
+    if accelerator and not accelerator.is_main_process:
+        # Disable logging on non-main processes.
+        cfg.wandb.enable = False
+
     if cfg.wandb.enable and cfg.wandb.project:
         wandb_logger = WandBLogger(cfg)
     else:
@@ -143,7 +202,8 @@ def train(cfg: TrainPipelineConfig):
     logging.info("Creating optimizer and scheduler")
     optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy)
-    grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp)
+    # Only use GradScaler when not using accelerate (accelerate handles mixed precision internally)
+    grad_scaler = None if accelerator else GradScaler(device.type, enabled=cfg.policy.use_amp)
 
     step = 0  # number of policy updates (forward + backward + optim)
@@ -185,6 +245,11 @@
     )
     dl_iter = cycle(dataloader)
 
+    # Prepare models for accelerate if using multi-GPU
+    if accelerator:
+        policy, optimizer, dataloader = accelerator.prepare(policy, optimizer, dataloader)
+        dl_iter = cycle(dataloader)
+
     policy.train()
 
     train_metrics = {
@@ -205,9 +270,10 @@
         batch = next(dl_iter)
         train_tracker.dataloading_s = time.perf_counter() - start_time
 
-        for key in batch:
-            if isinstance(batch[key], torch.Tensor):
-                batch[key] = batch[key].to(device, non_blocking=device.type == "cuda")
+        if not accelerator:
+            for key in batch:
+                if isinstance(batch[key], torch.Tensor):
+                    batch[key] = batch[key].to(device, non_blocking=device.type == "cuda")
 
         train_tracker, output_dict = update_policy(
             train_tracker,
@@ -218,6 +284,7 @@
             grad_scaler=grad_scaler,
             lr_scheduler=lr_scheduler,
             use_amp=cfg.policy.use_amp,
+            accelerator=accelerator,
         )
 
         # Note: eval and checkpoint happens *after* the `step`th training update has completed, so we
@@ -237,15 +304,17 @@
                 wandb_logger.log_dict(wandb_log_dict, step)
             train_tracker.reset_averages()
 
-        if cfg.save_checkpoint and is_saving_step:
+        if cfg.save_checkpoint and is_saving_step and (not accelerator or accelerator.is_main_process):
             logging.info(f"Checkpoint policy after step {step}")
             checkpoint_dir = get_step_checkpoint_dir(cfg.output_dir, cfg.steps, step)
-            save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler)
+            # Unwrap model for accelerate
+            policy_to_save = accelerator.unwrap_model(policy) if accelerator else policy
+            save_checkpoint(checkpoint_dir, step, cfg, policy_to_save, optimizer, lr_scheduler)
             update_last_checkpoint(checkpoint_dir)
             if wandb_logger:
                 wandb_logger.log_policy(checkpoint_dir)
 
-        if cfg.env and is_eval_step:
+        if cfg.env and is_eval_step and (not accelerator or accelerator.is_main_process):
             step_id = get_step_identifier(step, cfg.steps)
             logging.info(f"Eval policy at step {step}")
             with (
@@ -254,7 +323,7 @@
             ):
                 eval_info = eval_policy(
                     eval_env,
-                    policy,
+                    accelerator.unwrap_model(policy) if accelerator else policy,
                     cfg.eval.n_episodes,
                     videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
                     max_episodes_rendered=4,
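
Note: the accelerate branch of update_policy above follows the standard Accelerator pattern (prepare, accumulate, backward, step), and is_launched_with_accelerate() only enables it when the script runs under `accelerate launch` (which exports ACCELERATE_MIXED_PRECISION). A minimal self-contained sketch of that update pattern, assuming a toy module and optimizer rather than a LeRobot policy:

    import torch
    from accelerate import Accelerator

    accelerator = Accelerator(gradient_accumulation_steps=1)
    model = torch.nn.Linear(8, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    model, optimizer = accelerator.prepare(model, optimizer)

    batch = torch.randn(4, 8, device=accelerator.device)
    with accelerator.accumulate(model):
        loss = model(batch).pow(2).mean()
        accelerator.backward(loss)  # replaces loss.backward(); handles scaling and DDP gradient sync
        if accelerator.sync_gradients:  # True only on steps where gradients are actually applied
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        optimizer.zero_grad()
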
diff --git a/src/lerobot/utils/train_utils.py b/src/lerobot/utils/train_utils.py
index 2859fe057..3e9693318 100644
--- a/src/lerobot/utils/train_utils.py
+++ b/src/lerobot/utils/train_utils.py
@@ -60,11 +60,39 @@ def load_training_step(save_dir: Path) -> int:
 
 
 def update_last_checkpoint(checkpoint_dir: Path) -> Path:
+    import fcntl
+    import tempfile
+    import os
+
     last_checkpoint_dir = checkpoint_dir.parent / LAST_CHECKPOINT_LINK
-    if last_checkpoint_dir.is_symlink():
-        last_checkpoint_dir.unlink()
     relative_target = checkpoint_dir.relative_to(checkpoint_dir.parent)
-    last_checkpoint_dir.symlink_to(relative_target)
+
+    # Use file locking to prevent race conditions in multi-GPU training
+    lock_file = checkpoint_dir.parent / ".symlink_lock"
+
+    try:
+        with open(lock_file, 'w') as f:
+            # Get exclusive lock
+            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
+
+            # Update symlink atomically
+            if last_checkpoint_dir.exists() or last_checkpoint_dir.is_symlink():
+                last_checkpoint_dir.unlink()
+            last_checkpoint_dir.symlink_to(relative_target)
+
+    except (OSError, FileExistsError) as e:
+        # Handle race conditions gracefully - another process may have already updated
+        if not last_checkpoint_dir.exists():
+            try:
+                last_checkpoint_dir.symlink_to(relative_target)
+            except FileExistsError:
+                pass  # Another process created it, that's fine
+    finally:
+        # Clean up lock file
+        try:
+            lock_file.unlink()
+        except FileNotFoundError:
+            pass
 
 
 def save_checkpoint(
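
Note: update_last_checkpoint above serializes concurrent checkpoint writers with an advisory fcntl lock. A minimal sketch of that locking pattern in isolation; the lock path is illustrative:

    import fcntl
    from pathlib import Path

    lock_path = Path("/tmp/example.lock")
    with open(lock_path, "w") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)  # blocks until the exclusive lock is acquired
        # ... critical section: at most one process executes this at a time ...
    # the lock is released when the file object is closed
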
===" +echo "Check outputs in: $OUTPUT_DIR" diff --git a/test_accelerate_2gpu.slurm b/test_accelerate_2gpu.slurm new file mode 100644 index 000000000..b2a22fccb --- /dev/null +++ b/test_accelerate_2gpu.slurm @@ -0,0 +1,67 @@ +#!/bin/bash +#SBATCH --job-name=test_accelerate +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=16 +#SBATCH --gres=gpu:2 +#SBATCH --time=1:00:00 +#SBATCH --partition=hopper-prod +#SBATCH --output=/fsx/dana_aubakirova/vla/logs/test_accelerate_%j.out +#SBATCH --error=/fsx/dana_aubakirova/vla/logs/test_accelerate_%j.err + +# Create logs directory if it doesn't exist +mkdir -p /fsx/dana_aubakirova/vla/pr/lerobot/logs + +# Activate conda environment +source /fsx/dana_aubakirova/miniconda3/etc/profile.d/conda.sh +conda activate multi + +# 2-GPU Test CUDA environment +export CUDA_VISIBLE_DEVICES=0,1 +export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:128,expandable_segments:True +export TORCH_DISTRIBUTED_DEBUG=OFF +export NCCL_DEBUG=INFO +export CUDA_LAUNCH_BLOCKING=0 +export ACCELERATE_USE_FSDP=false +export ACCELERATE_USE_DEEPSPEED=false +export HF_ACCELERATE_DEVICE_MAP=false +export TRANSFORMERS_NO_ADVISORY_WARNINGS=1 +export SAFETENSORS_FAST_GPU=1 +export HF_HUB_ENABLE_HF_TRANSFER=1 +export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True +export ACCELERATE_TORCH_DEVICE_MAP_AUTO=false + +# Change to working directory +cd /fsx/dana_aubakirova/vla/pr/lerobot + +echo "=== Testing Accelerate Multi-GPU Training with SmolVLA ===" +echo "Dataset: lerobot/svla_so100_sorting" +echo "GPUs: 2" +echo "Steps: 100 (for quick test)" +echo "Job ID: $SLURM_JOB_ID" +echo "" + +# Set output directory with job ID +export OUTPUT_DIR="outputs/test_accelerate_2gpu_job_${SLURM_JOB_ID}" + +echo "Output directory: $OUTPUT_DIR" +echo "" + +# Test accelerate training +accelerate launch --config_file accelerate_configs/2gpu_config_safe.yaml -m lerobot.scripts.train \ + --policy.type=smolvla \ + --policy.push_to_hub=false \ + --dataset.repo_id=lerobot/svla_so100_sorting \ + --dataset.video_backend=pyav \ + --steps=100 \ + --save_freq=50 \ + --log_freq=5 \ + --batch_size=2 \ + --num_workers=0 \ + --output_dir=$OUTPUT_DIR \ + --wandb.enable=false + +echo "" +echo "=== Training completed! 
===" +echo "Check logs and outputs in: $OUTPUT_DIR" +echo "Job ID: $SLURM_JOB_ID" diff --git a/test_direct_1gpu_local.sh b/test_direct_1gpu_local.sh new file mode 100755 index 000000000..2f3ec787b --- /dev/null +++ b/test_direct_1gpu_local.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +echo "=== Direct 1-GPU Training Test with SmolVLA (no accelerate) ===" +echo "Environment: multi" +echo "GPU: 1" +echo "Steps: 50 (quick local test)" +echo "" + +# Activate conda environment +source /fsx/dana_aubakirova/miniconda3/etc/profile.d/conda.sh +conda activate multi + +# Set CUDA environment for 1 GPU +export CUDA_VISIBLE_DEVICES=0 +export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:128,expandable_segments:True +export TORCH_DISTRIBUTED_DEBUG=OFF +export CUDA_LAUNCH_BLOCKING=0 +export TRANSFORMERS_NO_ADVISORY_WARNINGS=1 + +# Change to working directory +cd /fsx/dana_aubakirova/vla/pr/lerobot + +# Set output directory with timestamp +export OUTPUT_DIR="outputs/test_direct_1gpu_local_$(date +%Y%m%d_%H%M%S)" + +echo "Output directory: $OUTPUT_DIR" +echo "" + +# Test direct training with 1 GPU (no accelerate) +python -m lerobot.scripts.train \ + --policy.path=lerobot/smolvla_base \ + --policy.push_to_hub=false \ + --dataset.repo_id=lerobot/svla_so100_sorting \ + --dataset.video_backend=pyav \ + --steps=50 \ + --save_freq=25 \ + --log_freq=5 \ + --batch_size=1 \ + --num_workers=0 \ + --output_dir=$OUTPUT_DIR \ + --wandb.enable=false + +echo "" +echo "=== Training completed! ===" +echo "Check outputs in: $OUTPUT_DIR"