Compare commits

..

1 Commits

Author SHA1 Message Date
Khalil Meftah 84abfe5c60 fix(policies): support offline batch inference for ACT and Diffusion
- Guard ACT's KL divergence computation against None latent params to
prevent crashes during eval when use_vae is set but the forward path
returns no VAE outputs.
- Add offline batch fallback to Diffusion's predict_action_chunk() so
it works with dataloader batches (empty queues) in addition to the
existing online rollout path (populated queues). This enables batched
action prediction for offline evaluation.
2026-06-15 11:35:06 +02:00
19 changed files with 44 additions and 171 deletions
+3 -3
View File
@@ -167,9 +167,9 @@ jobs:
# ── LIBERO TRAIN+EVAL SMOKE ──────────────────────────────────────────────
# Train SmolVLA for 1 step (batch_size=1, dataset episode 0 only) then
# immediately runs eval inside the training loop (env_eval_freq=1, 1 episode).
# immediately runs eval inside the training loop (eval_freq=1, 1 episode).
# Tests the full train→eval-within-training pipeline end-to-end.
- name: Run Libero train+eval smoke (1 step, env_eval_freq=1)
- name: Run Libero train+eval smoke (1 step, eval_freq=1)
if: env.HF_USER_TOKEN != ''
run: |
docker run --name libero-train-smoke --gpus all \
@@ -196,7 +196,7 @@ jobs:
--output_dir=/tmp/train-smoke \
--steps=1 \
--batch_size=1 \
--env_eval_freq=1 \
--eval_freq=1 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--eval.use_async_envs=false \
+4 -4
View File
@@ -58,7 +58,7 @@ test-act-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=4 \
--env_eval_freq=2 \
--eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_freq=2 \
@@ -96,7 +96,7 @@ test-diffusion-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=2 \
--env_eval_freq=2 \
--eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_checkpoint=true \
@@ -126,7 +126,7 @@ test-tdmpc-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=2 \
--env_eval_freq=2 \
--eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_checkpoint=true \
@@ -161,7 +161,7 @@ test-smolvla-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=4 \
--env_eval_freq=2 \
--eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_freq=2 \
+1 -1
View File
@@ -719,7 +719,7 @@ Example configuration for training the [reward classifier](https://huggingface.c
"num_workers": 4,
"steps": 5000,
"log_freq": 10,
"env_eval_freq": 1000,
"eval_freq": 1000,
"save_freq": 1000,
"save_checkpoint": true,
"seed": 2,
+1 -1
View File
@@ -143,7 +143,7 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--env_eval_freq=1000
--eval_freq=1000
```
## Reproducing published results
+1 -1
View File
@@ -173,7 +173,7 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--env_eval_freq=1000
--eval_freq=1000
```
## Relationship to LIBERO
+2 -2
View File
@@ -120,11 +120,11 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--env_eval_freq=1000
--eval_freq=1000
```
## Practical tips
- Use the one-hot task conditioning for multi-task training (MT10/MT50 conventions) so policies have explicit task context.
- Inspect the dataset task descriptions and the `info["is_success"]` keys when writing post-processing or logging so your success metrics line up with the benchmark.
- Adjust `batch_size`, `steps`, and `env_eval_freq` to match your compute budget.
- Adjust `batch_size`, `steps`, and `eval_freq` to match your compute budget.
+2 -2
View File
@@ -103,7 +103,7 @@ accelerate launch \
--batch_size=32 \
--num_workers=4 \
--log_freq=20 \
--env_eval_freq=-1 \
--eval_freq=-1 \
--save_checkpoint=true \
--save_freq=2000
```
@@ -142,7 +142,7 @@ accelerate launch \
--batch_size=32 \
--num_workers=4 \
--log_freq=20 \
--env_eval_freq=-1 \
--eval_freq=-1 \
--save_checkpoint=true \
--save_freq=2000
```
+1 -1
View File
@@ -314,7 +314,7 @@ lerobot-train \
--steps=30000 \
--save_freq=1000 \
--log_freq=100 \
--env_eval_freq=1000 \
--eval_freq=1000 \
--policy.type=multi_task_dit \
--policy.device=cuda \
--policy.horizon=32 \
+1 -1
View File
@@ -166,7 +166,7 @@ lerobot-train \
--output_dir=./outputs/smolvla_robocasa_CloseFridge \
--steps=100000 \
--batch_size=4 \
--env_eval_freq=5000 \
--eval_freq=5000 \
--eval.batch_size=1 \
--eval.n_episodes=5 \
--save_freq=10000
+1 -1
View File
@@ -165,7 +165,7 @@ lerobot-train \
--output_dir=./outputs/smolvla_vlabench_primitive \
--steps=100000 \
--batch_size=4 \
--env_eval_freq=5000 \
--eval_freq=5000 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--save_freq=10000
-2
View File
@@ -39,8 +39,6 @@ class DatasetConfig:
# This reduces memory and speeds up DataLoader IPC. The training pipeline handles the conversion.
return_uint8: bool = False
streaming: bool = False
# Fraction of episodes held out per task for offline evaluation (0.0 = disabled).
eval_split: float = 0.0
def __post_init__(self) -> None:
if self.episodes is not None:
+1 -6
View File
@@ -100,13 +100,8 @@ class TrainPipelineConfig(HubMixin):
prefetch_factor: int = 4
persistent_workers: bool = True
steps: int = 100_000
# Run policy in the simulation environment every N steps to measure reward/success (0 = disabled).
env_eval_freq: int = 20_000
eval_freq: int = 20_000
log_freq: int = 200
# Compute eval loss on held-out episodes every N steps (0 = disabled). Requires eval_split > 0.
eval_steps: int = 0
# Cap on total eval samples, split uniformly across tasks (0 = use all held-out data).
max_eval_samples: int = 0
tolerance_s: float = 1e-4
save_checkpoint: bool = True
# Checkpoint is saved every `save_freq` training iterations and after the last training step.
+1 -2
View File
@@ -35,7 +35,7 @@ from .dataset_tools import (
remove_feature,
split_dataset,
)
from .factory import make_dataset, make_train_eval_datasets, resolve_delta_timestamps
from .factory import make_dataset, resolve_delta_timestamps
from .image_writer import safe_stop_image_writer
from .io_utils import load_episodes, write_stats
from .language import (
@@ -89,7 +89,6 @@ __all__ = [
"get_feature_stats",
"load_episodes",
"make_dataset",
"make_train_eval_datasets",
"merge_datasets",
"modify_features",
"modify_tasks",
-79
View File
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
from pprint import pformat
import torch
@@ -131,81 +130,3 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
return dataset
def make_train_eval_datasets(
cfg: TrainPipelineConfig,
) -> tuple[LeRobotDataset | MultiLeRobotDataset, LeRobotDataset | None]:
"""Create train and optional eval datasets by splitting episodes based on eval_split.
The last ceil(n_episodes * eval_split) episodes per task are held out for evaluation.
If eval_split == 0.0, returns (full_dataset, None).
"""
full_dataset = make_dataset(cfg)
if cfg.dataset.eval_split == 0.0:
return full_dataset, None
base_episodes = (
full_dataset.episodes if full_dataset.episodes is not None else list(range(full_dataset.num_episodes))
)
episode_tasks = full_dataset.meta.episodes["tasks"]
task_to_episodes: dict[str, list[int]] = {}
for ep_idx in base_episodes:
task_key = episode_tasks[ep_idx][0] if episode_tasks[ep_idx] else ""
task_to_episodes.setdefault(task_key, []).append(ep_idx)
train_episodes, eval_episodes = [], []
for eps in task_to_episodes.values():
n_eval = math.ceil(len(eps) * cfg.dataset.eval_split)
train_episodes.extend(eps[: len(eps) - n_eval])
eval_episodes.extend(eps[len(eps) - n_eval :])
if not train_episodes:
raise ValueError(
f"eval_split={cfg.dataset.eval_split} leaves 0 training episodes from {len(base_episodes)} total."
)
logging.info(
f"Train/eval split: {len(train_episodes)} train, {len(eval_episodes)} eval "
f"(eval_split={cfg.dataset.eval_split}, {len(task_to_episodes)} tasks)"
)
delta_timestamps = resolve_delta_timestamps(cfg.trainable_config, full_dataset.meta)
train_image_transforms = (
ImageTransforms(cfg.dataset.image_transforms) if cfg.dataset.image_transforms.enable else None
)
train_dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
episodes=train_episodes,
delta_timestamps=delta_timestamps,
image_transforms=train_image_transforms,
revision=cfg.dataset.revision,
video_backend=cfg.dataset.video_backend,
return_uint8=True,
tolerance_s=cfg.tolerance_s,
)
eval_dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
episodes=eval_episodes,
delta_timestamps=delta_timestamps,
image_transforms=None,
revision=cfg.dataset.revision,
video_backend=cfg.dataset.video_backend,
return_uint8=True,
tolerance_s=cfg.tolerance_s,
)
if cfg.dataset.use_imagenet_stats:
for ds in (train_dataset, eval_dataset):
for key in ds.meta.camera_keys:
for stats_type, stats in IMAGENET_STATS.items():
ds.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
return train_dataset, eval_dataset
-2
View File
@@ -474,8 +474,6 @@ class LeRobotDataset(torch.utils.data.Dataset):
if reader.hf_dataset is None:
# One-shot load after finalize()
reader.load_and_activate()
if reader._absolute_to_relative_idx is not None and idx in reader._absolute_to_relative_idx:
idx = reader._absolute_to_relative_idx[idx]
return reader.get_item(idx)
def select_columns(self, column_names: str | list[str]):
+1 -1
View File
@@ -148,7 +148,7 @@ class ACTPolicy(PreTrainedPolicy):
l1_loss = (abs_err * valid_mask).sum() / num_valid.clamp_min(1)
loss_dict = {"l1_loss": l1_loss.item()}
if self.config.use_vae:
if self.config.use_vae and log_sigma_x2_hat is not None:
# Calculate Dₖₗ(latent_pdf || standard_normal). Note: After computing the KL-divergence for
# each dimension independently, we sum over the latent dimension to get the total
# KL-divergence per batch element, then take the mean over the batch.
@@ -101,11 +101,23 @@ class DiffusionPolicy(PreTrainedPolicy):
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
"""Predict a chunk of actions given environment observations."""
# stack n latest observations from the queue
batch = {k: torch.stack(list(self._queues[k]), dim=1) for k in batch if k in self._queues}
actions = self.diffusion.generate_actions(batch, noise=noise)
"""Predict a chunk of actions given environment observations.
Supports two modes:
- Online (queues populated via select_action): stacks observations from internal queues.
- Offline (empty queues, e.g. dataloader batch): uses the batch directly.
"""
queues_populated = any(len(q) > 0 for q in self._queues.values())
if queues_populated:
batch = {k: torch.stack(list(self._queues[k]), dim=1) for k in batch if k in self._queues}
else:
batch = dict(batch)
if self.config.image_features:
for key in self.config.image_features:
if batch[key].ndim == 4:
batch[key] = batch[key].unsqueeze(1)
batch[OBS_IMAGES] = torch.stack([batch[key] for key in self.config.image_features], dim=-4)
actions = self.diffusion.generate_actions(batch, noise=noise)
return actions
@torch.no_grad()
+6 -56
View File
@@ -45,8 +45,7 @@ from lerobot.common.train_utils import (
from lerobot.common.wandb_utils import WandBLogger
from lerobot.configs import parser
from lerobot.configs.train import TrainPipelineConfig
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state
from lerobot.datasets.factory import make_train_eval_datasets
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state, make_dataset
from lerobot.envs import close_envs, make_env, make_env_pre_post_processors
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
@@ -245,19 +244,19 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
# LeRobotDataset skips its snapshot_download when try_load() succeeds, so no rank re-downloads.
if is_main_process:
logging.info("Creating dataset")
dataset, eval_dataset = make_train_eval_datasets(cfg)
dataset = make_dataset(cfg)
accelerator.wait_for_everyone()
# Other ranks read from the shared copy populated by the main process.
if not is_main_process:
dataset, eval_dataset = make_train_eval_datasets(cfg)
dataset = make_dataset(cfg)
# Create environment used for evaluating checkpoints during training on simulation data.
# On real-world data, no need to create an environment as evaluations are done outside train.py,
# using the eval.py instead, with gym_dora environment and dora-rs.
eval_env = None
if cfg.env_eval_freq > 0 and cfg.env is not None and is_main_process:
if cfg.eval_freq > 0 and cfg.env is not None and is_main_process:
logging.info("Creating env")
eval_env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
@@ -456,33 +455,6 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
persistent_workers=cfg.persistent_workers and cfg.num_workers > 0,
)
# Build eval dataloader if a held-out split exists
eval_dataloader = None
if eval_dataset is not None:
eval_ds = eval_dataset
if cfg.max_eval_samples > 0 and hasattr(eval_dataset, "hf_dataset"):
task_arr = eval_dataset.hf_dataset.data.column("task_index").to_numpy()
unique_tasks = sorted(set(task_arr.tolist()))
per_task = max(1, cfg.max_eval_samples // len(unique_tasks))
selected: list[int] = []
for t in unique_tasks:
frames = (task_arr == t).nonzero()[0][:per_task]
selected.extend(frames.tolist())
eval_ds = torch.utils.data.Subset(eval_dataset, selected)
eval_collate_fn = lerobot_collate_fn if dataset.meta.has_language_columns else None
eval_dataloader = torch.utils.data.DataLoader(
eval_ds,
batch_size=cfg.batch_size,
shuffle=False,
num_workers=cfg.num_workers,
pin_memory=device.type == "cuda",
drop_last=False,
collate_fn=eval_collate_fn,
prefetch_factor=cfg.prefetch_factor if cfg.num_workers > 0 else None,
persistent_workers=cfg.persistent_workers and cfg.num_workers > 0,
)
# Prepare everything with accelerator
accelerator.wait_for_everyone()
policy, optimizer, dataloader, lr_scheduler = accelerator.prepare(
@@ -562,8 +534,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
train_tracker.step()
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
is_env_eval_step = cfg.env_eval_freq > 0 and step % cfg.env_eval_freq == 0
is_eval_step = cfg.eval_steps > 0 and eval_dataloader is not None and step % cfg.eval_steps == 0
is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0
if is_log_step:
# Collective reduce must run on every rank, before the main-process gate below.
@@ -586,27 +557,6 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
wandb_logger.log_dict(wandb_log_dict, step)
train_tracker.reset_averages()
if is_eval_step:
policy.eval()
eval_loss_sum = 0.0
n_eval_batches = 0
with torch.no_grad(), accelerator.autocast():
for eval_batch in eval_dataloader:
for cam_key in dataset.meta.camera_keys:
if cam_key in eval_batch and eval_batch[cam_key].dtype == torch.uint8:
eval_batch[cam_key] = eval_batch[cam_key].to(dtype=torch.float32) / 255.0
eval_batch = preprocessor(eval_batch)
loss, _ = policy.forward(eval_batch)
eval_loss_sum += loss.item()
n_eval_batches += 1
eval_loss = eval_loss_sum / max(n_eval_batches, 1)
policy.train()
if is_main_process:
logging.info(f"step {step}: eval_loss={eval_loss:.4f}")
if wandb_logger:
wandb_logger.log_dict({"eval_loss": eval_loss}, step=step, mode="eval")
if cfg.save_checkpoint and is_saving_step:
if is_main_process:
logging.info(f"Checkpoint policy after step {step}")
@@ -629,7 +579,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
accelerator.wait_for_everyone()
if cfg.env and is_env_eval_step:
if cfg.env and is_eval_step:
if is_main_process:
step_id = get_step_identifier(step, cfg.steps)
logging.info(f"Eval policy at step {step}")
+2 -2
View File
@@ -134,7 +134,7 @@ class TestMultiGPUTraining:
f"--output_dir={output_dir}",
"--batch_size=4",
"--steps=10",
"--env_eval_freq=-1",
"--eval_freq=-1",
"--log_freq=5",
"--save_freq=10",
"--seed=42",
@@ -177,7 +177,7 @@ class TestMultiGPUTraining:
f"--output_dir={output_dir}",
"--batch_size=4",
"--steps=20",
"--env_eval_freq=-1",
"--eval_freq=-1",
"--log_freq=5",
"--save_freq=10",
"--seed=42",