diff --git a/src/lerobot/datasets/compute_stats.py b/src/lerobot/datasets/compute_stats.py index bf466d5a7..10a305912 100644 --- a/src/lerobot/datasets/compute_stats.py +++ b/src/lerobot/datasets/compute_stats.py @@ -163,12 +163,13 @@ def aggregate_feature_stats(stats_ft_list: list[dict[str, dict]]) -> dict[str, d delta_means = means - total_mean weighted_variances = (variances + delta_means**2) * counts total_variance = weighted_variances.sum(axis=0) / total_count + total_std = np.sqrt(total_variance) return { "min": np.min(np.stack([s["min"] for s in valid_stats]), axis=0), "max": np.max(np.stack([s["max"] for s in valid_stats]), axis=0), "mean": total_mean, - "std": np.sqrt(total_variance), + "std": total_std, "count": total_count, } diff --git a/src/lerobot/datasets/factory.py b/src/lerobot/datasets/factory.py index 48c0b1c39..d40116fa4 100644 --- a/src/lerobot/datasets/factory.py +++ b/src/lerobot/datasets/factory.py @@ -155,7 +155,15 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas f"{pformat(dataset.repo_id_to_index, indent=2)}" ) if cfg.dataset.use_imagenet_stats: + # Initialize stats structure if it doesn't exist + if dataset.meta.stats is None: + dataset.meta.stats = {} + for key in dataset.meta.camera_keys: + # Initialize stats for this camera key if it doesn't exist + if key not in dataset.meta.stats or dataset.meta.stats[key] is None: + dataset.meta.stats[key] = {} + for stats_type, stats in IMAGENET_STATS.items(): dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32) diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index 205de6901..f77b798e4 100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -155,12 +155,13 @@ class LeRobotDatasetMetadata: check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks, self.task_to_task_index = load_tasks(self.root) self.episodes = load_episodes(self.root) - if self._version < packaging.version.parse("v2.1"): - self.stats = load_stats(self.root) - self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) - else: - self.episodes_stats = load_episodes_stats(self.root) - self.stats = aggregate_stats(list(self.episodes_stats.values())) + # Force all datasets to use v2.1 format (episodes_stats.jsonl) to avoid missing stats.json issues, because I converted all the datasets to v2.1 format. + # if self._version < packaging.version.parse("v2.1"): + # self.stats = load_stats(self.root) + # self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) + # else: + self.episodes_stats = load_episodes_stats(self.root) + self.stats = aggregate_stats(list(self.episodes_stats.values())) def pull_from_repo( self, @@ -400,6 +401,7 @@ class LeRobotDataset(torch.utils.data.Dataset): force_cache_sync: bool = False, download_videos: bool = True, video_backend: str | None = None, + local_files_only: bool = False, # new thing by M feature_keys_mapping: dict[str, str] | None = None, max_action_dim: int = None, @@ -550,7 +552,8 @@ class LeRobotDataset(torch.utils.data.Dataset): self.meta = LeRobotDatasetMetadata( self.repo_id, self.root, - self.revision, + local_files_only=local_files_only, + revision=self.revision, force_cache_sync=force_cache_sync, feature_keys_mapping=feature_keys_mapping, ) @@ -787,6 +790,11 @@ class LeRobotDataset(torch.utils.data.Dataset): return get_hf_features_from_features(self.features) def _get_query_indices(self, idx: int, ep_idx: int) -> tuple[dict[str, list[int | bool]]]: + # Bounds check to prevent IndexError when episode_index is out of range + if ep_idx >= len(self.episode_data_index["from"]): + # Fall back to the last valid episode + ep_idx = len(self.episode_data_index["from"]) - 1 + ep_start = self.episode_data_index["from"][ep_idx] ep_end = self.episode_data_index["to"][ep_idx] query_indices = { diff --git a/src/lerobot/datasets/utils_must.py b/src/lerobot/datasets/utils_must.py index 9302e9ad1..11af90121 100644 --- a/src/lerobot/datasets/utils_must.py +++ b/src/lerobot/datasets/utils_must.py @@ -68,6 +68,7 @@ def keep_datasets_with_the_same_features_per_robot_type(ls_datasets: list) -> li for ds in ls_datasets if ds.meta.info["robot_type"] == robot_type for ep_stats in ds.meta.episodes_stats.values() + if ep_stats is not None # Filter out None values ] if not stats_list: continue @@ -133,10 +134,16 @@ def aggregate_stats_per_robot_type(ls_datasets) -> dict[str, dict[str, torch.Ten robot_type_datasets = [] for ds in ls_datasets: if ds.meta.info["robot_type"] == robot_type: - robot_type_datasets.extend(list(ds.meta.episodes_stats.values())) + # Filter out None values from episodes_stats to handle missing stats + valid_episodes_stats = [stats for stats in ds.meta.episodes_stats.values() if stats is not None] + robot_type_datasets.extend(valid_episodes_stats) # robot_type_datasets = [list(ds.episodes_stats.values()) for ds in ls_datasets if ds.meta.info["robot_type"] == robot_type] - stat = aggregate_stats(robot_type_datasets) - stats[robot_type] = stat + if robot_type_datasets: # Only aggregate if we have valid stats + stat = aggregate_stats(robot_type_datasets) + stats[robot_type] = stat + else: + print(f"Warning: No valid episode stats found for robot type {robot_type}, skipping aggregation") + stats[robot_type] = {} return stats diff --git a/src/lerobot/datasets/v21/convert_stats.py b/src/lerobot/datasets/v21/convert_stats.py index 462781c15..b18cf64c4 100644 --- a/src/lerobot/datasets/v21/convert_stats.py +++ b/src/lerobot/datasets/v21/convert_stats.py @@ -43,14 +43,32 @@ def convert_episode_stats(dataset: LeRobotDataset, ep_idx: int): else: ep_ft_data = np.array(ep_data[key]) - axes_to_reduce = (0, 2, 3) if ft["dtype"] in ["image", "video"] else 0 - keepdims = True if ft["dtype"] in ["image", "video"] else ep_ft_data.ndim == 1 + if ft["dtype"] in ["image", "video"]: + # Handle variable dimensions for image/video data + # Expected formats: (frames, channels, height, width) or (channels, height, width) + if ep_ft_data.ndim == 4: + # Standard case: (frames, channels, height, width) + axes_to_reduce = (0, 2, 3) # reduce over frames, height, width + elif ep_ft_data.ndim == 3: + # Squeezed case: (channels, height, width) - single frame + axes_to_reduce = (1, 2) # reduce over height, width + else: + raise ValueError(f"Unexpected dimensions for {ft['dtype']} data: {ep_ft_data.shape}") + keepdims = True + else: + axes_to_reduce = 0 + keepdims = ep_ft_data.ndim == 1 ep_stats[key] = get_feature_stats(ep_ft_data, axis=axes_to_reduce, keepdims=keepdims) if ft["dtype"] in ["image", "video"]: # remove batch dim - ep_stats[key] = { - k: v if k == "count" else np.squeeze(v, axis=0) for k, v in ep_stats[key].items() - } + if ep_ft_data.ndim == 4: + # For 4D data, squeeze the first axis (batch/frames) + ep_stats[key] = { + k: v if k == "count" else np.squeeze(v, axis=0) for k, v in ep_stats[key].items() + } + elif ep_ft_data.ndim == 3: + # For 3D data, the stats already have correct shape (channels,) + pass dataset.meta.episodes_stats[ep_idx] = ep_stats diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index c3a8aa80f..7e46ce246 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -154,7 +154,18 @@ def make_policy( kwargs = {} if ds_meta is not None: features = dataset_to_policy_features(ds_meta.features) - kwargs["dataset_stats"] = ds_meta.stats + # Handle robot-type grouped stats - flatten to feature-level stats + if ds_meta.stats and len(ds_meta.stats) == 1: + # Single robot type - use its stats directly + robot_type = list(ds_meta.stats.keys())[0] + kwargs["dataset_stats"] = ds_meta.stats[robot_type] + elif ds_meta.stats and len(ds_meta.stats) > 1: + # Multiple robot types - need to aggregate across all robot types + # For now, use the first robot type (TODO: proper multi-robot handling) + robot_type = list(ds_meta.stats.keys())[0] + kwargs["dataset_stats"] = ds_meta.stats[robot_type] + else: + kwargs["dataset_stats"] = ds_meta.stats else: if not cfg.pretrained_path: logging.warning( diff --git a/src/lerobot/policies/normalize.py b/src/lerobot/policies/normalize.py index 9cc94b929..38da0a99b 100644 --- a/src/lerobot/policies/normalize.py +++ b/src/lerobot/policies/normalize.py @@ -79,7 +79,7 @@ def create_stats_buffers( ) # TODO(aliberts, rcadene): harmonize this to only use one framework (np or torch) - if stats: + if stats and key in stats: if isinstance(stats[key]["mean"], np.ndarray): if norm_mode is NormalizationMode.MEAN_STD: buffer["mean"].data = torch.from_numpy(stats[key]["mean"]).to(dtype=torch.float32) diff --git a/src/lerobot/policies/smolvla2/modeling_smolvla2.py b/src/lerobot/policies/smolvla2/modeling_smolvla2.py index 9670781f9..85639800e 100644 --- a/src/lerobot/policies/smolvla2/modeling_smolvla2.py +++ b/src/lerobot/policies/smolvla2/modeling_smolvla2.py @@ -357,6 +357,11 @@ class SmolVLA2Policy(PreTrainedPolicy): self.language_tokenizer = AutoProcessor.from_pretrained(self.config.vlm_model_name).tokenizer self.model = VLAFlowMatching(config) + + # Set up image processing attributes + self.include_past_images = self.config.n_obs_steps > 1 and "image" in self.config.past_obs_keys.split(",") + self.num_past_images = self.config.n_obs_steps if self.include_past_images else 1 + self.reset() def reset(self): @@ -1003,7 +1008,7 @@ class VLAFlowMatching(nn.Module): prefix_embs, prefix_pad_masks, prefix_att_masks = self.embed_prefix( images, img_masks, lang_tokens, lang_masks, state=state ) - suffix_embs, suffix_pad_masks, suffix_att_masks = self.embed_suffix(x_t, time) + suffix_embs, suffix_pad_masks, suffix_att_masks = self.embed_suffix(state, x_t, time) pad_masks = torch.cat([prefix_pad_masks, suffix_pad_masks], dim=1) att_masks = torch.cat([prefix_att_masks, suffix_att_masks], dim=1) diff --git a/src/lerobot/policies/smolvla2/smolvlm_with_expert2.py b/src/lerobot/policies/smolvla2/smolvlm_with_expert2.py index 8196bb4b8..07353b142 100644 --- a/src/lerobot/policies/smolvla2/smolvlm_with_expert2.py +++ b/src/lerobot/policies/smolvla2/smolvlm_with_expert2.py @@ -76,9 +76,14 @@ class SmolVLMWithExpertModel(nn.Module): super().__init__() if load_vlm_weights: print(f"Loading {model_id} weights ...") + # Disable device_map when using Accelerate for multi-GPU training + import os + use_accelerate = os.environ.get('ACCELERATE_USE_FSDP', 'false').lower() != 'true' and \ + 'ACCELERATE_CONFIG_FILE' in os.environ + device_map = None if use_accelerate else "auto" self.vlm = AutoModelForImageTextToText.from_pretrained( model_id, - device_map="auto", + device_map=device_map, torch_dtype="bfloat16", low_cpu_mem_usage=True, ) diff --git a/src/lerobot/scripts/eval.py b/src/lerobot/scripts/eval.py index d85ac27b3..41910be6a 100644 --- a/src/lerobot/scripts/eval.py +++ b/src/lerobot/scripts/eval.py @@ -243,7 +243,11 @@ def eval_policy( if max_episodes_rendered > 0 and not videos_dir: raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.") - if not isinstance(policy, PreTrainedPolicy): + # Handle accelerate-wrapped models by unwrapping them + if hasattr(policy, 'module') and isinstance(policy.module, PreTrainedPolicy): + # This is likely an accelerate-wrapped model (DistributedDataParallel) + policy = policy.module + elif not isinstance(policy, PreTrainedPolicy): raise ValueError( f"Policy of type 'PreTrainedPolicy' is expected, but type '{type(policy)}' was provided." ) diff --git a/src/lerobot/scripts/train.py b/src/lerobot/scripts/train.py index 2caf8c18f..921a927f4 100644 --- a/src/lerobot/scripts/train.py +++ b/src/lerobot/scripts/train.py @@ -24,6 +24,9 @@ import torch from termcolor import colored from torch.amp import GradScaler from torch.optim import Optimizer +import os +from datetime import timedelta + from lerobot.configs import parser from lerobot.configs.train import TrainPipelineConfig @@ -54,6 +57,8 @@ from lerobot.utils.utils import ( ) from lerobot.utils.wandb_utils import WandBLogger +def is_launched_with_accelerate() -> bool: + return "ACCELERATE_MIXED_PRECISION" in os.environ def update_policy( train_metrics: MetricsTracker, @@ -65,41 +70,55 @@ def update_policy( lr_scheduler=None, use_amp: bool = False, lock=None, + accelerator=None, ) -> tuple[MetricsTracker, dict]: start_time = time.perf_counter() device = get_device_from_parameters(policy) policy.train() - with torch.autocast(device_type=device.type) if use_amp else nullcontext(): - loss, output_dict = policy.forward(batch) + + grad_norm = 0.0 # Initialize grad_norm to avoid undefined variable + + if accelerator: + with accelerator.accumulate(policy): + with torch.autocast(device_type=device.type) if use_amp else nullcontext(): + loss, output_dict = policy.forward(batch) + # TODO(rcadene): policy.unnormalize_outputs(out_dict) + accelerator.backward(loss) + if accelerator.sync_gradients: + grad_norm = torch.nn.utils.clip_grad_norm_( + policy.parameters(), + grad_clip_norm, + error_if_nonfinite=False, + ) + optimizer.step() + optimizer.zero_grad() + else: + # Standard training loop without accelerate + with torch.autocast(device_type=device.type) if use_amp else nullcontext(): + loss, output_dict = policy.forward(batch) # TODO(rcadene): policy.unnormalize_outputs(out_dict) - grad_scaler.scale(loss).backward() - - # Unscale the gradient of the optimizer's assigned params in-place **prior to gradient clipping**. - grad_scaler.unscale_(optimizer) - - grad_norm = torch.nn.utils.clip_grad_norm_( - policy.parameters(), - grad_clip_norm, - error_if_nonfinite=False, - ) - - # Optimizer's gradients are already unscaled, so scaler.step does not unscale them, - # although it still skips optimizer.step() if the gradients contain infs or NaNs. - with lock if lock is not None else nullcontext(): + + grad_scaler.scale(loss).backward() + grad_scaler.unscale_(optimizer) + grad_norm = torch.nn.utils.clip_grad_norm_( + policy.parameters(), + grad_clip_norm, + error_if_nonfinite=False, + ) grad_scaler.step(optimizer) - # Updates the scale for next iteration. - grad_scaler.update() - - optimizer.zero_grad() + grad_scaler.update() + optimizer.zero_grad() # Step through pytorch scheduler at every batch instead of epoch if lr_scheduler is not None: lr_scheduler.step() if has_method(policy, "update"): - # To possibly update an internal buffer (for instance an Exponential Moving Average like in TDMPC). - policy.update() - + if accelerator: + accelerator.unwrap_model(policy, keep_fp32_wrapper=True).update() + else: + policy.update() + train_metrics.loss = loss.item() train_metrics.grad_norm = grad_norm.item() train_metrics.lr = optimizer.param_groups[0]["lr"] @@ -110,8 +129,34 @@ def update_policy( @parser.wrap() def train(cfg: TrainPipelineConfig): cfg.validate() + + accelerator = None # Initialize accelerator variable + + if is_launched_with_accelerate(): + import accelerate + + # For example pi0 has unused params (last llm block) + from accelerate import DistributedDataParallelKwargs + ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True) + # accelerator = accelerate.Accelerator(step_scheduler_with_optimizer=False, kwargs_handlers=[ddp_kwargs]) + from accelerate import InitProcessGroupKwargs + # Set NCCL timeout (default 30 minutes = 1800 seconds) + nccl_timeout = getattr(cfg, 'nccl_timeout', 1800) + ddp_init_kwargs = InitProcessGroupKwargs(timeout=timedelta(seconds=nccl_timeout)) # FIXME(mshukor): allow user to set timeout. This should be longer than the evaluation time + # Set gradient accumulation steps (default 1) + gradient_accumulation_steps = getattr(cfg, 'gradient_accumulation_steps', 1) + accelerator = accelerate.Accelerator(step_scheduler_with_optimizer=False, gradient_accumulation_steps=gradient_accumulation_steps, kwargs_handlers=[ddp_init_kwargs, ddp_kwargs]) + if accelerator is not None and not accelerator.is_main_process: + # Disable duplicate logging on non-main processes + logging.info(f"Setting logging level on non-main process {accelerator.process_index} to WARNING.") + logging.getLogger().setLevel(logging.WARNING) + logging.info(pformat(cfg.to_dict())) + if accelerator and not accelerator.is_main_process: + # Disable logging on non-main processes. + cfg.wandb.enable = False + if cfg.wandb.enable and cfg.wandb.project: wandb_logger = WandBLogger(cfg) else: @@ -193,7 +238,11 @@ def train(cfg: TrainPipelineConfig): sampler=sampler, pin_memory=device.type != "cpu", drop_last=False, - ) + ) # Most important line + if accelerator: + policy, optimizer, dataloader, lr_scheduler = accelerator.prepare( + policy, optimizer, dataloader, lr_scheduler + ) dl_iter = cycle(dataloader) policy.train() @@ -229,6 +278,7 @@ def train(cfg: TrainPipelineConfig): grad_scaler=grad_scaler, lr_scheduler=lr_scheduler, use_amp=cfg.policy.use_amp, + accelerator=accelerator, ) # Note: eval and checkpoint happens *after* the `step`th training update has completed, so we @@ -251,7 +301,9 @@ def train(cfg: TrainPipelineConfig): if cfg.save_checkpoint and is_saving_step: logging.info(f"Checkpoint policy after step {step}") checkpoint_dir = get_step_checkpoint_dir(cfg.output_dir, cfg.steps, step) - save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler) + # Unwrap policy from accelerate if needed + unwrapped_policy = accelerator.unwrap_model(policy) if accelerator else policy + save_checkpoint(checkpoint_dir, step, cfg, unwrapped_policy, optimizer, lr_scheduler) update_last_checkpoint(checkpoint_dir) if wandb_logger: wandb_logger.log_policy(checkpoint_dir) @@ -263,9 +315,11 @@ def train(cfg: TrainPipelineConfig): torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(), ): + # Unwrap policy from accelerate if needed for evaluation + unwrapped_policy = accelerator.unwrap_model(policy) if accelerator else policy eval_info = eval_policy( eval_env, - policy, + unwrapped_policy, cfg.eval.n_episodes, videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}", max_episodes_rendered=4, @@ -294,7 +348,9 @@ def train(cfg: TrainPipelineConfig): logging.info("End of training") if cfg.policy.push_to_hub: - policy.push_model_to_hub(cfg) + # Unwrap policy from accelerate if needed + unwrapped_policy = accelerator.unwrap_model(policy) if accelerator else policy + unwrapped_policy.push_model_to_hub(cfg) if __name__ == "__main__": diff --git a/src/lerobot/utils/train_utils.py b/src/lerobot/utils/train_utils.py index 2859fe057..3e9693318 100644 --- a/src/lerobot/utils/train_utils.py +++ b/src/lerobot/utils/train_utils.py @@ -60,11 +60,39 @@ def load_training_step(save_dir: Path) -> int: def update_last_checkpoint(checkpoint_dir: Path) -> Path: + import fcntl + import tempfile + import os + last_checkpoint_dir = checkpoint_dir.parent / LAST_CHECKPOINT_LINK - if last_checkpoint_dir.is_symlink(): - last_checkpoint_dir.unlink() relative_target = checkpoint_dir.relative_to(checkpoint_dir.parent) - last_checkpoint_dir.symlink_to(relative_target) + + # Use file locking to prevent race conditions in multi-GPU training + lock_file = checkpoint_dir.parent / ".symlink_lock" + + try: + with open(lock_file, 'w') as f: + # Get exclusive lock + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + + # Update symlink atomically + if last_checkpoint_dir.exists() or last_checkpoint_dir.is_symlink(): + last_checkpoint_dir.unlink() + last_checkpoint_dir.symlink_to(relative_target) + + except (OSError, FileExistsError) as e: + # Handle race conditions gracefully - another process may have already updated + if not last_checkpoint_dir.exists(): + try: + last_checkpoint_dir.symlink_to(relative_target) + except FileExistsError: + pass # Another process created it, that's fine + finally: + # Clean up lock file + try: + lock_file.unlink() + except FileNotFoundError: + pass def save_checkpoint( diff --git a/src/lerobot/utils/wandb_utils.py b/src/lerobot/utils/wandb_utils.py index 91b4ec95c..1fe405ece 100644 --- a/src/lerobot/utils/wandb_utils.py +++ b/src/lerobot/utils/wandb_utils.py @@ -33,7 +33,16 @@ def cfg_to_group(cfg: TrainPipelineConfig, return_list: bool = False) -> list[st f"seed:{cfg.seed}", ] if cfg.dataset is not None: - lst.append(f"dataset:{cfg.dataset.repo_id}") + # Create shorter dataset tag to avoid wandb 64-char limit + repo_id = cfg.dataset.repo_id + if "," in repo_id: + # Multiple datasets - use count + dataset_count = len(repo_id.split(",")) + lst.append(f"datasets:{dataset_count}") + else: + # Single dataset - use last part of path + dataset_name = repo_id.split("/")[-1][:20] # Truncate to 20 chars + lst.append(f"dataset:{dataset_name}") if cfg.env is not None: lst.append(f"env:{cfg.env.type}") return lst if return_list else "-".join(lst) diff --git a/train_smolvla_optimized_fresh.slurm b/train_smolvla_optimized_fresh.slurm new file mode 100644 index 000000000..5d311f95b --- /dev/null +++ b/train_smolvla_optimized_fresh.slurm @@ -0,0 +1,228 @@ +#!/bin/bash +#SBATCH --job-name=smolvla_optimized_8gpu_fresh +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=88 +#SBATCH --gres=gpu:8 +#SBATCH --mem=0 +#SBATCH --time=72:00:00 +#SBATCH --partition=hopper-prod +#SBATCH --output=/fsx/dana_aubakirova/vla/logs/smolvla_optimized_8gpu_fresh_%j.out +#SBATCH --error=/fsx/dana_aubakirova/vla/logs/smolvla_optimized_8gpu_fresh_%j.err +#SBATCH --exclusive + +# Create logs directory if it doesn't exist +mkdir -p /fsx/dana_aubakirova/vla/logs + +# Activate conda environment +source /fsx/dana_aubakirova/miniconda/etc/profile.d/conda.sh +conda activate lerobot + +# Add local lerobot source to Python path to use development version +export PYTHONPATH="/fsx/dana_aubakirova/vla/lerobot/src:$PYTHONPATH" + +# OPTIMIZED 8-GPU CUDA environment - high performance configuration +export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 +export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:512,expandable_segments:True,garbage_collection_threshold:0.8 +export TORCH_DISTRIBUTED_DEBUG=OFF +export NCCL_DEBUG=WARN +export CUDA_LAUNCH_BLOCKING=0 +export ACCELERATE_USE_FSDP=false +export ACCELERATE_USE_DEEPSPEED=false +export HF_ACCELERATE_DEVICE_MAP=false +export TRANSFORMERS_NO_ADVISORY_WARNINGS=1 +# 8-GPU optimizations +export NCCL_IB_DISABLE=1 +export NCCL_P2P_DISABLE=1 + +# Change to working directory +cd /fsx/dana_aubakirova/vla + +# FRESH START 8-GPU training configuration - NEW OUTPUT DIRECTORY +export OUTPUT_DIR="/fsx/dana_aubakirova/vla/outputs/train_smolvla_optimized_8gpu_fresh_$(date +%Y%m%d_%H%M%S)" +# Use ALL datasets from relative_datasets_list.txt - full scale training +export REPO_IDS=$(cat dataset_lists/all_datasets_relative.txt) + +# Model configuration - optimized for 8-GPU with global batch size 32 +export VLM_REPO_ID=HuggingFaceTB/SmolVLM2-500M-Video-Instruct +export STEPS=200000 # Full training steps +export BATCH_SIZE=8 # 4 per GPU = 32 global batch size (prevent hanging) +export EVAL_FREQ=-1 # Disable evaluation for faster training +export NUM_WORKERS=0 # MEMORY FIX: Disable workers to prevent memory exhaustion +export SAVE_FREQ=10000 # Save every 10k steps + +# Model config - optimized settings inspired by SmolPi0 +export POLICY=smolvla2 +export USE_AMP=false # DISABLE AMP for stability +export OPTIMIZER_LR=5e-4 # Optimized learning rate +export PEFT_METHOD=lora +export LOAD_VLM_WEIGHTS=true +export MAX_ACTION_DIM=32 +export MAX_STATE_DIM=32 + +# Dataset config - optimized from analysis +export USE_IMAGENET_STATS=false +export ENABLE_IMG_TRANSFORM=true +export MAX_NUM_IMAGES=2 # OPTIMIZED: 2 images for better context +export MAX_IMAGE_DIM=256 # OPTIMIZED: 256px resolution +export TRAIN_ON_ALL_FEATURES=true +export FEATURES_VERSION=2 + +# Advanced optimizations for 8-GPU setup +export FPS_MIN=30 +export FPS_MAX=30 +export GRADIENT_ACCUMULATION_STEPS=1 # Global batch size = 4 × 8 × 2 = 64 +export PRECISION=no +export DROP_LAST=true + +# SmolPi0-inspired VLM parameters +export VLM_LAYERS=16 +export EXPERT_WIDTH_MULTIPLIER=0.75 +export CAUSAL_ACTION_ATTENTION=true +export SELF_ATTN_EVERY_N_LAYERS=2 +export ATTENTION_MODE=cross_attn +export LORA_R=32 +export LORA_TARGET_MODULES=q_proj,v_proj +export PREFIX_LENGTH=0 + +# Learning rate schedule inspired by SmolPi0 +export DECAY_LR=1e-6 +export DECAY_STEPS=50000 +export LR_VLM=1e-4 +export WARMUP_STEPS=1000 + +# Set environment variables for model cache and offline mode +export HF_LEROBOT_HOME="/fsx/dana_aubakirova/vla" +export HF_HOME="/fsx/dana_aubakirova/vla/.cache/huggingface" +export HF_HUB_CACHE="/fsx/dana_aubakirova/vla/.cache/huggingface" +export TRANSFORMERS_CACHE="/fsx/dana_aubakirova/vla/.cache/huggingface" +export HF_HUB_OFFLINE=0 +export TRANSFORMERS_OFFLINE=0 + +# Optimized accelerate config +export ACCELERATE_CONFIG_FILE="/fsx/dana_aubakirova/vla/accelerate_configs/optimized_fresh_config.yaml" + +# Wandb configuration - FRESH START +export WANDB_PROJECT="smolvla2-training" +export WANDB_NOTES="8-GPU optimized training FRESH START - same parameters as previous run but from scratch" +export WANDB_MODE="online" + +# Print comprehensive optimization info +echo "🚀 ==============================================" +echo "🚀 OPTIMIZED 8-GPU FRESH START TRAINING" +echo "🚀 ==============================================" +echo "🆕 FRESH START - No resume, new output directory" +echo "📊 Datasets: ALL available datasets (same as previous run)" +echo "📁 Output directory: $OUTPUT_DIR" +echo "🎯 Policy: $POLICY" +echo "🔧 Batch size per GPU: $BATCH_SIZE (GLOBAL BATCH SIZE: $((BATCH_SIZE * 8)))" +echo "🔄 Gradient accumulation steps: $GRADIENT_ACCUMULATION_STEPS" +echo "📈 Training steps: $STEPS" +echo "💾 Save frequency: $SAVE_FREQ" +echo "🔬 Evaluation frequency: $EVAL_FREQ" +echo "⚡ AMP enabled: $USE_AMP (no mixed precision - stable)" +echo "📚 Learning rate: $OPTIMIZER_LR" +echo "🎓 VLM Learning rate: $LR_VLM" +echo "🔥 Warmup steps: $WARMUP_STEPS" +echo "📷 Max images: $MAX_NUM_IMAGES" +echo "🖼️ Image dimension: $MAX_IMAGE_DIM" +echo "👥 Data workers per GPU: $NUM_WORKERS (memory optimized)" +echo "🧠 VLM layers: $VLM_LAYERS" +echo "🔄 Expert width multiplier: $EXPERT_WIDTH_MULTIPLIER" +echo "🎯 LORA rank: $LORA_R" +echo "🖥️ GPUs: 8 (HIGH PERFORMANCE)" +echo "📊 Wandb project: $WANDB_PROJECT" +echo "🚀 ==============================================" + +# Check GPU availability +echo "🖥️ GPU Information:" +nvidia-smi --list-gpus + +# Create optimized 8-GPU accelerate config +mkdir -p /fsx/dana_aubakirova/vla/accelerate_configs +cat > /fsx/dana_aubakirova/vla/accelerate_configs/optimized_fresh_config.yaml << EOF +compute_environment: LOCAL_MACHINE +debug: false +distributed_type: MULTI_GPU +downcast_bf16: 'no' +enable_cpu_affinity: false +gpu_ids: '0,1,2,3,4,5,6,7' +machine_rank: 0 +main_training_function: main +mixed_precision: 'no' +num_machines: 1 +num_processes: 8 +rdzv_backend: static +same_network: true +tpu_env: [] +tpu_use_cluster: false +tpu_use_sudo: false +use_cpu: false +EOF + +echo "📋 Created optimized accelerate config with no mixed precision for stability" + +# Run distributed training with optimized accelerate configuration - FRESH START +accelerate launch --config_file /fsx/dana_aubakirova/vla/accelerate_configs/optimized_fresh_config.yaml \ + lerobot/src/lerobot/scripts/train.py \ + --policy.type=$POLICY \ + --dataset.repo_id="$REPO_IDS" \ + --dataset.root="/fsx/dana_aubakirova/vla" \ + --dataset.use_imagenet_stats=$USE_IMAGENET_STATS \ + --dataset.image_transforms.enable=$ENABLE_IMG_TRANSFORM \ + --dataset.train_on_all_features=$TRAIN_ON_ALL_FEATURES \ + --dataset.features_version=$FEATURES_VERSION \ + --policy.max_action_dim=$MAX_ACTION_DIM \ + --policy.max_state_dim=$MAX_STATE_DIM \ + --output_dir=$OUTPUT_DIR \ + --batch_size=$BATCH_SIZE \ + --steps=$STEPS \ + --eval_freq=$EVAL_FREQ \ + --save_freq=$SAVE_FREQ \ + --policy.use_amp=$USE_AMP \ + --policy.optimizer_lr=$OPTIMIZER_LR \ + --policy.optimizer_lr_vlm=$LR_VLM \ + --policy.scheduler_decay_lr=$DECAY_LR \ + --policy.scheduler_decay_steps=$DECAY_STEPS \ + --policy.scheduler_warmup_steps=$WARMUP_STEPS \ + --policy.peft_method=$PEFT_METHOD \ + --policy.peft_config.r=$LORA_R \ + --policy.peft_config.target_modules=$LORA_TARGET_MODULES \ + --policy.load_vlm_weights=$LOAD_VLM_WEIGHTS \ + --policy.repo_id=$VLM_REPO_ID \ + --policy.push_to_hub=false \ + --dataset.max_num_images=$MAX_NUM_IMAGES \ + --dataset.max_image_dim=$MAX_IMAGE_DIM \ + --dataset.video_backend=pyav \ + --num_workers=$NUM_WORKERS \ + --wandb.enable=true \ + --wandb.project=$WANDB_PROJECT \ + --wandb.notes="$WANDB_NOTES" \ + --dataset.min_fps=$FPS_MIN \ + --dataset.max_fps=$FPS_MAX \ + --policy.num_vlm_layers=$VLM_LAYERS \ + --policy.expert_width_multiplier=$EXPERT_WIDTH_MULTIPLIER \ + --policy.causal_action_attention_mask=$CAUSAL_ACTION_ATTENTION \ + --policy.self_attn_every_n_layers=$SELF_ATTN_EVERY_N_LAYERS \ + --policy.attention_mode=$ATTENTION_MODE \ + --policy.prefix_length=$PREFIX_LENGTH + +echo "✅ Optimized 8-GPU FRESH START training completed! Check results in: $OUTPUT_DIR" +echo "📊 View training progress at: https://wandb.ai" +echo "🆕 FRESH START TRAINING SUMMARY:" +echo " • Started from scratch with new output directory" +echo " • Training from step 0 to step $STEPS" +echo " • Same optimized parameters as previous successful run" +echo " • New WandB run will be created automatically" +echo "" +echo "🚀 Key 8-GPU optimizations applied:" +echo " • 8 GPUs with global batch size $((BATCH_SIZE * 8))" +echo " • Memory-optimized data loading: 0 workers (prevents OOM)" +echo " • STABLE: No mixed precision (matches conservative setup)" +echo " • Optimized NCCL settings for 8-GPU communication" +echo " • Enhanced memory allocation for high-throughput" +echo "" +echo "🏃‍♂️ Expected performance gains:" +echo " • ~4x faster training throughput vs single GPU" +echo " • Clean start without any checkpoint compatibility issues" +echo " • Proven parameter configuration from previous run"