Compare commits

19 Commits

Author SHA1 Message Date
Khalil Meftah e069557228 fix(eval): use FeatureType enum comparison instead of string value 2026-06-15 18:50:24 +02:00
Khalil Meftah 58cf6c8710 fix(eval): infer recording features from actual env observations 2026-06-15 18:47:16 +02:00
Khalil Meftah 36470d059e fix(eval): align raw frame keys with dataset schema and fix numpy types 2026-06-15 18:38:12 +02:00
Khalil Meftah 040a1df9d6 fix(datasets): remap absolute indices in __getitem__ for filtered datasets 2026-06-15 18:26:44 +02:00
Khalil Meftah 87ae050b28 Merge branch 'feat/eval-dataset-recording' into test/gs-gym-integration 2026-06-15 17:03:36 +02:00
Khalil Meftah 3bec437d83 Merge branch 'feat/env-plugin-discovery' into test/gs-gym-integration 2026-06-15 17:03:22 +02:00
Khalil Meftah 97f53732bf Merge branch 'fix/logging-stats-robustness' into test/gs-gym-integration 2026-06-15 17:03:03 +02:00
Khalil Meftah b31837ffeb Merge branch 'feat/pretrained-revision' into test/gs-gym-integration 2026-06-15 17:02:51 +02:00
Khalil Meftah fd822287e4 Merge branch 'fix/offline-policy-inference' into test/gs-gym-integration 2026-06-15 17:02:37 +02:00
Khalil Meftah 7e2d7024c4 Merge branch 'feat/offline-validation' into test/gs-gym-integration 2026-06-15 17:02:03 +02:00
Khalil Meftah 240393d238 feat(eval): record eval rollouts as raw LeRobot datasets
- Record raw env observations inline during rollout(), before
preprocess_observation() transforms them. Uses LeRobotDataset.create()
with add_frame()/save_episode().

- Supports vectorized envs: each env in the batch records independently,
with save_episode() called per env on termination. Each task gets its
own dataset under output_dir/recordings/{task_group}_{task_id}/.

Enabled via --eval.recording=true; disabled by default.
2026-06-15 16:12:25 +02:00
Khalil Meftah 6407a244c0 feat(envs): add generic observation passthrough
- Add generic observation passthrough in preprocess_observation() for
unhandled ndarray/tensor keys, replacing the pattern of adding per-env
hardcoded key handlers. Extra keys are forwarded as observation.<key>
and can be shaped by env-specific ProcessorSteps via get_env_processors().
2026-06-15 14:17:59 +02:00
Khalil Meftah 0511c12b8f feat(envs): add env plugin discovery
- Add 'lerobot_env_' to third-party plugin discovery prefixes, completing
the plugin system for all component types (robots, cameras, teleoperators,
policies, and now environments). External packages named lerobot_env_*
can self-register EnvConfig subclasses on import, enabling --env.type=
resolution without lerobot code changes.
2026-06-15 14:13:12 +02:00
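A rough sketch of the prefix-based discovery this commit describes, not the actual lerobot implementation. Only the `lerobot_env_` prefix comes from the commit message; the other prefix spellings, the helper name `find_plugin_names`, and the package names are hypothetical and only illustrate the idea. A real implementation would presumably also import each match so that its EnvConfig subclass can register itself.

```python
# Only "lerobot_env_" is taken from the commit message; the remaining
# prefixes are assumed spellings used purely for illustration.
PLUGIN_PREFIXES = (
    "lerobot_robot_",
    "lerobot_camera_",
    "lerobot_teleoperator_",
    "lerobot_policy_",
    "lerobot_env_",
)


def find_plugin_names(
    installed: list[str], prefixes: tuple[str, ...] = PLUGIN_PREFIXES
) -> list[str]:
    """Return installed module names matching a plugin prefix (hypothetical helper)."""
    # str.startswith accepts a tuple and matches if any prefix applies.
    return sorted(name for name in installed if name.startswith(prefixes))


# Made-up package names standing in for whatever is installed.
installed = ["numpy", "lerobot_env_gs_gym", "lerobot", "lerobot_policy_demo"]
plugins = find_plugin_names(installed)
print(plugins)
```

Matching on the package name alone is what lets an external package take part without any change to lerobot itself.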
Khalil Meftah 0efa3dc874 fix(stats): handle scalar stats robustly
- Wrap cast_stats_to_numpy with np.atleast_1d to prevent 0-d arrays
from scalar stats causing shape mismatches downstream.
2026-06-15 12:28:18 +02:00
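A minimal illustration of the shape problem this fix targets, assuming only NumPy. The stat values are made up; the point is that `np.array` on a scalar gives a 0-d array, while `np.atleast_1d` promotes it to shape `(1,)` and leaves higher-dimensional input alone.

```python
import numpy as np

# Hypothetical scalar stat, e.g. a mean stored as a plain float.
scalar_stat = 0.5

before = np.array(scalar_stat)                # 0-d array, shape ()
after = np.atleast_1d(np.array(scalar_stat))  # 1-d array, shape (1,)
print(before.shape, after.shape)

# Stats that are already 1-d (or higher) keep their shape.
vector_stat = np.atleast_1d(np.array([0.1, 0.2, 0.3]))
print(vector_stat.shape)
```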
Khalil Meftah 949f4fcbe9 fix(logging): batch wandb metrics
- Batch all metrics into a single wandb.log() call instead of one per
key, reducing API overhead.

- Add support for list-valued metrics by expanding them to indexed keys (e.g.
metric_0, metric_1).
2026-06-15 12:25:06 +02:00
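A simplified sketch of the batching and list expansion described here, written as a pure function so it runs without wandb. `build_wandb_batch` is a hypothetical name, and unlike the real logger it drops unsupported types silently instead of logging a warning and does not handle the custom step key.

```python
def build_wandb_batch(metrics: dict, mode: str = "train") -> dict:
    """Flatten a metrics dict into one payload for a single log call (hypothetical helper)."""
    batch = {}
    for key, value in metrics.items():
        if isinstance(value, list):
            # Expand list-valued metrics to indexed keys: key_0, key_1, ...
            for i, elem in enumerate(value):
                if isinstance(elem, (int, float)):
                    batch[f"{mode}/{key}_{i}"] = elem
            continue
        if isinstance(value, (int, float, str)):
            batch[f"{mode}/{key}"] = value
    return batch


# Made-up metrics for illustration.
payload = build_wandb_batch({"loss": 0.25, "per_task_success": [1.0, 0.0]})
print(payload)
```

The resulting dict would then be passed to one `wandb.log()` call rather than one call per key.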
Khalil Meftah 0d1d5e0a86 feat(hub): add pretrained_revision to pin Hub model versions
- Add pretrained_revision field to PreTrainedConfig (policies) and
RewardModelConfig (reward models), and thread it through make_policy(),
make_pre_post_processors(), and make_reward_model() so that weights and
processor configs can be loaded from a specific Hub commit, branch, or
tag. Defaults to None (latest version, preserving current behavior).
Dataset and env hub loading already supported revision pinning.
2026-06-15 11:58:57 +02:00
Khalil Meftah 84abfe5c60 fix(policies): support offline batch inference for ACT and Diffusion
- Guard ACT's KL divergence computation against None latent params to
prevent crashes during eval when use_vae is set but the forward path
returns no VAE outputs.
- Add offline batch fallback to Diffusion's predict_action_chunk() so
it works with dataloader batches (empty queues) in addition to the
existing online rollout path (populated queues). This enables batched
action prediction for offline evaluation.
2026-06-15 11:35:06 +02:00
Khalil Meftah 2201401c99 feat(training): add inline offline validation with train/eval split
- Add eval_split config for balanced per-task holdout
- Add eval_steps for periodic inline eval loss computation
- Add max_eval_samples to cap eval cost
2026-06-14 21:29:54 +02:00
Khalil Meftah 64773e7b22 refactor(training): rename eval_freq to env_eval_freq
- Rename eval_freq to env_eval_freq to distinguish sim environment evaluation from offline loss evaluation.
2026-06-14 14:19:25 +02:00
37 changed files with 418 additions and 660 deletions
+3 -3
@@ -167,9 +167,9 @@ jobs:
# ── LIBERO TRAIN+EVAL SMOKE ──────────────────────────────────────────────
# Train SmolVLA for 1 step (batch_size=1, dataset episode 0 only) then
# immediately runs eval inside the training loop (eval_freq=1, 1 episode).
# immediately runs eval inside the training loop (env_eval_freq=1, 1 episode).
# Tests the full train→eval-within-training pipeline end-to-end.
- name: Run Libero train+eval smoke (1 step, eval_freq=1)
- name: Run Libero train+eval smoke (1 step, env_eval_freq=1)
if: env.HF_USER_TOKEN != ''
run: |
docker run --name libero-train-smoke --gpus all \
@@ -196,7 +196,7 @@ jobs:
--output_dir=/tmp/train-smoke \
--steps=1 \
--batch_size=1 \
--eval_freq=1 \
--env_eval_freq=1 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--eval.use_async_envs=false \
+4 -4
@@ -58,7 +58,7 @@ test-act-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=4 \
--eval_freq=2 \
--env_eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_freq=2 \
@@ -96,7 +96,7 @@ test-diffusion-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=2 \
--eval_freq=2 \
--env_eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_checkpoint=true \
@@ -126,7 +126,7 @@ test-tdmpc-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=2 \
--eval_freq=2 \
--env_eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_checkpoint=true \
@@ -161,7 +161,7 @@ test-smolvla-ete-train:
--dataset.episodes="[0]" \
--batch_size=2 \
--steps=4 \
--eval_freq=2 \
--env_eval_freq=2 \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--save_freq=2 \
+1 -1
@@ -719,7 +719,7 @@ Example configuration for training the [reward classifier](https://huggingface.c
"num_workers": 4,
"steps": 5000,
"log_freq": 10,
"eval_freq": 1000,
"env_eval_freq": 1000,
"save_freq": 1000,
"save_checkpoint": true,
"seed": 2,
+1 -1
@@ -143,7 +143,7 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval_freq=1000
--env_eval_freq=1000
```
## Reproducing published results
+1 -1
@@ -173,7 +173,7 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval_freq=1000
--env_eval_freq=1000
```
## Relationship to LIBERO
+2 -2
@@ -120,11 +120,11 @@ lerobot-train \
--batch_size=4 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval_freq=1000
--env_eval_freq=1000
```
## Practical tips
- Use the one-hot task conditioning for multi-task training (MT10/MT50 conventions) so policies have explicit task context.
- Inspect the dataset task descriptions and the `info["is_success"]` keys when writing post-processing or logging so your success metrics line up with the benchmark.
- Adjust `batch_size`, `steps`, and `eval_freq` to match your compute budget.
- Adjust `batch_size`, `steps`, and `env_eval_freq` to match your compute budget.
+2 -2
@@ -103,7 +103,7 @@ accelerate launch \
--batch_size=32 \
--num_workers=4 \
--log_freq=20 \
--eval_freq=-1 \
--env_eval_freq=-1 \
--save_checkpoint=true \
--save_freq=2000
```
@@ -142,7 +142,7 @@ accelerate launch \
--batch_size=32 \
--num_workers=4 \
--log_freq=20 \
--eval_freq=-1 \
--env_eval_freq=-1 \
--save_checkpoint=true \
--save_freq=2000
```
+1 -1
@@ -314,7 +314,7 @@ lerobot-train \
--steps=30000 \
--save_freq=1000 \
--log_freq=100 \
--eval_freq=1000 \
--env_eval_freq=1000 \
--policy.type=multi_task_dit \
--policy.device=cuda \
--policy.horizon=32 \
+1 -1
@@ -166,7 +166,7 @@ lerobot-train \
--output_dir=./outputs/smolvla_robocasa_CloseFridge \
--steps=100000 \
--batch_size=4 \
--eval_freq=5000 \
--env_eval_freq=5000 \
--eval.batch_size=1 \
--eval.n_episodes=5 \
--save_freq=10000
+1 -1
@@ -165,7 +165,7 @@ lerobot-train \
--output_dir=./outputs/smolvla_vlabench_primitive \
--steps=100000 \
--batch_size=4 \
--eval_freq=5000 \
--env_eval_freq=5000 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--save_freq=10000
@@ -1,79 +0,0 @@
#!/usr/bin/env python
"""Convert a legacy LeRobot checkpoint to the current processor-pipeline format.
Older hub checkpoints (e.g. ``lerobot/act_aloha_sim_insertion_human``) bake
normalization stats into the model weights and do not ship
``policy_preprocessor.json`` / ``policy_postprocessor.json``. Current ``main``
loads those processor configs from the checkpoint, so eval/rollout fail with
``FileNotFoundError: Could not find 'policy_preprocessor.json'``.
This script rebuilds the processors from the training dataset's stats and saves
a pipeline-format checkpoint locally that ``lerobot-eval`` can consume directly.
Usage:
python examples/onnx/convert_legacy_checkpoint.py \
--policy-path=lerobot/act_aloha_sim_insertion_human \
--dataset-repo-id=lerobot/aloha_sim_insertion_human \
--output-dir=outputs/converted/act_aloha_sim_insertion_human
Then:
lerobot-eval \
--policy.path=outputs/converted/act_aloha_sim_insertion_human \
--env.type=aloha --env.task=AlohaInsertion-v0 \
--eval.batch_size=10 --eval.n_episodes=50 \
--eval.use_async_envs=false --policy.device=cuda
"""
import argparse
from pathlib import Path
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.utils.constants import (
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--policy-path", required=True, help="Legacy checkpoint repo id or local dir")
parser.add_argument(
"--dataset-repo-id",
required=True,
help="Training dataset repo id, used only for normalization stats",
)
parser.add_argument("--output-dir", required=True, help="Where to save the converted checkpoint")
parser.add_argument("--device", default="cpu", help="Device for building the policy (cpu is fine)")
args = parser.parse_args()
out = Path(args.output_dir)
out.mkdir(parents=True, exist_ok=True)
print(f"[1/4] Loading dataset stats from '{args.dataset_repo_id}' (metadata only)...")
ds_meta = LeRobotDatasetMetadata(args.dataset_repo_id)
print(f"[2/4] Loading policy weights from '{args.policy_path}'...")
cfg = PreTrainedConfig.from_pretrained(args.policy_path)
cfg.pretrained_path = args.policy_path
cfg.device = args.device
policy = make_policy(cfg, ds_meta=ds_meta)
print("[3/4] Building processors from dataset stats...")
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy.config,
dataset_stats=ds_meta.stats,
)
print(f"[4/4] Saving pipeline-format checkpoint to '{out}'...")
policy.save_pretrained(out)
preprocessor.save_pretrained(out, config_filename=f"{POLICY_PREPROCESSOR_DEFAULT_NAME}.json")
postprocessor.save_pretrained(out, config_filename=f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json")
print(f"\nDone. Converted checkpoint at: {out}")
print("Eval it with --policy.path=" + str(out))
if __name__ == "__main__":
main()
-179
@@ -1,179 +0,0 @@
#!/usr/bin/env python
"""Evaluate an ACT policy in sim with either the PyTorch or ONNX network.
The ONNX backend swaps only ``policy.model`` (ResNet + transformer + action head)
with an onnxruntime session. Everything else - the LeRobot processor pipeline
(normalization), the action queue, and the gym env - is identical, so any
difference in success rate is attributable to the network backend alone.
Run both backends with the same seed to compare:
python examples/onnx/eval_act_onnx.py \
--policy-path=lerobot/act_aloha_sim_transfer_cube_human \
--task=AlohaTransferCube-v0 \
--backend=torch --n-episodes=50 --batch-size=10 --device=cuda
python examples/onnx/eval_act_onnx.py \
--policy-path=lerobot/act_aloha_sim_transfer_cube_human \
--task=AlohaTransferCube-v0 \
--onnx=outputs/onnx/act_transfer_cube.onnx \
--backend=onnx --n-episodes=50 --batch-size=10 --device=cuda
"""
import argparse
from pathlib import Path
import numpy as np
import torch
from torch import nn
from lerobot.envs.factory import make_env, make_env_config, make_env_pre_post_processors
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.scripts.lerobot_eval import eval_policy
from lerobot.utils.constants import OBS_ENV_STATE, OBS_IMAGES, OBS_STATE
from lerobot.utils.random_utils import set_seed
class ONNXACTModel(nn.Module):
"""Drop-in replacement for ``ACTPolicy.model`` backed by onnxruntime."""
def __init__(self, onnx_path: str, image_keys: list[str], has_state: bool, has_env_state: bool, device: str):
super().__init__()
import onnxruntime as ort
providers = (
["CUDAExecutionProvider", "CPUExecutionProvider"]
if str(device).startswith("cuda")
else ["CPUExecutionProvider"]
)
so = ort.SessionOptions()
so.log_severity_level = 3
self.sess = ort.InferenceSession(onnx_path, sess_options=so, providers=providers)
self.image_keys = image_keys
self.has_state = has_state
self.has_env_state = has_env_state
print(f"[onnx] providers in use: {self.sess.get_providers()}")
def forward(self, batch: dict):
if self.has_state:
state = batch[OBS_STATE]
else:
state = batch[OBS_ENV_STATE]
ref = state
ort_inputs = {"state": state.detach().cpu().numpy().astype(np.float32)}
images = batch[OBS_IMAGES]
for i, img in enumerate(images):
ort_inputs[f"image_{i}"] = img.detach().cpu().numpy().astype(np.float32)
out = self.sess.run(None, ort_inputs)[0]
actions = torch.from_numpy(out).to(ref.device, dtype=ref.dtype)
return actions, None
def load_stats_from_checkpoint(policy_path: str, input_features, output_features) -> dict:
"""Recover MEAN_STD stats baked into a legacy ACT checkpoint's safetensors buffers.
Legacy checkpoints store normalization as buffers like
``normalize_inputs.buffer_observation_state.{mean,std}``. We map those back to
feature names so we can rebuild the processor pipeline without the dataset.
"""
from safetensors.torch import load_file
p = Path(policy_path)
if p.is_dir():
st_path = p / "model.safetensors"
else:
from huggingface_hub import hf_hub_download
st_path = Path(hf_hub_download(policy_path, "model.safetensors"))
sd = load_file(str(st_path))
stats: dict = {}
for feat in list(input_features) + list(output_features):
buf = "buffer_" + feat.replace(".", "_")
for prefix in ("normalize_inputs", "normalize_targets", "unnormalize_outputs"):
mkey, skey = f"{prefix}.{buf}.mean", f"{prefix}.{buf}.std"
if mkey in sd and skey in sd:
stats[feat] = {"mean": sd[mkey].numpy(), "std": sd[skey].numpy()}
break
return stats
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--policy-path", required=True)
parser.add_argument("--task", required=True, help="e.g. AlohaTransferCube-v0")
parser.add_argument("--env-type", default="aloha")
parser.add_argument("--backend", choices=["torch", "onnx"], default="torch")
parser.add_argument("--onnx", default=None, help="Path to .onnx (required for --backend=onnx)")
parser.add_argument("--n-episodes", type=int, default=50)
parser.add_argument("--batch-size", type=int, default=10)
parser.add_argument("--device", default="cuda")
parser.add_argument("--seed", type=int, default=1000)
args = parser.parse_args()
if args.backend == "onnx" and not args.onnx:
raise SystemExit("--backend=onnx requires --onnx=<path>")
device = "cuda" if (args.device == "cuda" and torch.cuda.is_available()) else "cpu"
set_seed(args.seed)
print(f"[1/4] Loading ACT policy from '{args.policy_path}'...")
policy = ACTPolicy.from_pretrained(args.policy_path)
policy.config.device = device
policy.eval()
policy.to(device)
cfg = policy.config
if args.backend == "onnx":
image_keys = list(cfg.image_features)
has_state = cfg.robot_state_feature is not None
has_env_state = cfg.env_state_feature is not None
print(f"[2/4] Swapping policy.model with ONNX backend ({args.onnx})")
policy.model = ONNXACTModel(args.onnx, image_keys, has_state, has_env_state, device)
policy.to(device)
else:
print("[2/4] Using PyTorch backend")
print("[3/4] Building processors and environment...")
stats = load_stats_from_checkpoint(args.policy_path, cfg.input_features, cfg.output_features)
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg,
dataset_stats=stats,
preprocessor_overrides={"device_processor": {"device": device}},
)
env_cfg = make_env_config(args.env_type, task=args.task)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=env_cfg, policy_cfg=cfg)
env_groups = make_env(env_cfg, n_envs=args.batch_size, use_async_envs=False)
# make_env returns {task_group: {idx: VectorEnv}}; grab the single env.
first_group = next(iter(env_groups.values()))
env = next(iter(first_group.values()))
print(f"[4/4] Evaluating backend='{args.backend}' for {args.n_episodes} episodes (seed={args.seed})...")
with torch.no_grad():
info = eval_policy(
env=env,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=args.n_episodes,
start_seed=args.seed,
)
agg = info["aggregated"]
print("\n==== RESULT ====")
print(f"backend : {args.backend}")
print(f"task : {args.task}")
print(f"n_episodes : {args.n_episodes}")
print(f"pc_success : {agg['pc_success']:.1f}%")
print(f"avg_max_reward: {agg['avg_max_reward']:.4f}")
print(f"eval_ep_s : {agg['eval_ep_s']:.2f}s")
env.close()
if __name__ == "__main__":
main()
-133
@@ -1,133 +0,0 @@
#!/usr/bin/env python
"""Export an ACT policy's network to ONNX and verify numerical parity.
Only the inference network is exported (ResNet backbone + transformer enc/dec +
action head). The VAE encoder is training-only and the inference latent is zeros,
so the exported graph is a pure function of (state, images) -> action_chunk.
Normalization stays in the LeRobot processor pipeline (outside ONNX).
Usage:
python examples/onnx/export_act.py \
--policy-path=outputs/converted/act_aloha_sim_transfer_cube_human \
--output=outputs/onnx/act_transfer_cube.onnx
"""
import argparse
from pathlib import Path
import numpy as np
import torch
from torch import nn
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.utils.constants import OBS_ENV_STATE, OBS_IMAGES, OBS_STATE
class ACTExportWrapper(nn.Module):
"""Tensor-in/tensor-out wrapper around ACT's inference network."""
def __init__(self, model: nn.Module, image_keys: list[str], has_state: bool, has_env_state: bool):
super().__init__()
self.model = model
self.image_keys = image_keys
self.has_state = has_state
self.has_env_state = has_env_state
def forward(self, state: torch.Tensor, *images: torch.Tensor) -> torch.Tensor:
batch: dict = {}
if self.has_state:
batch[OBS_STATE] = state
if self.has_env_state:
# Convention: when env_state is used it is passed as `state`.
batch[OBS_ENV_STATE] = state
batch[OBS_IMAGES] = list(images)
actions, _ = self.model(batch)
return actions
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--policy-path", required=True, help="Converted ACT checkpoint dir or repo id")
parser.add_argument("--output", required=True, help="Output .onnx path")
parser.add_argument("--opset", type=int, default=17)
parser.add_argument("--atol", type=float, default=1e-3)
parser.add_argument("--device", default="cpu")
args = parser.parse_args()
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
print(f"[1/4] Loading ACT policy from '{args.policy_path}'...")
policy = ACTPolicy.from_pretrained(args.policy_path)
policy.eval()
policy.to(args.device)
cfg = policy.config
image_keys = list(cfg.image_features)
has_state = cfg.robot_state_feature is not None
has_env_state = cfg.env_state_feature is not None
state_dim = (cfg.robot_state_feature or cfg.env_state_feature).shape[0]
print(f" image_keys={image_keys} state_dim={state_dim} "
f"chunk_size={cfg.chunk_size} action_dim={cfg.action_feature.shape[0]}")
wrapper = ACTExportWrapper(policy.model, image_keys, has_state, has_env_state).eval().to(args.device)
# Build example inputs (batch size 1) from the config feature shapes.
state_example = torch.randn(1, state_dim, device=args.device)
image_examples = [
torch.rand(1, *cfg.image_features[k].shape, device=args.device) for k in image_keys
]
example_inputs = (state_example, *image_examples)
input_names = ["state"] + [f"image_{i}" for i in range(len(image_keys))]
output_names = ["action_chunk"]
dynamic_axes = {name: {0: "batch"} for name in input_names + output_names}
print(f"[2/4] Exporting to ONNX (opset {args.opset}) -> {out}")
torch.onnx.export(
wrapper,
example_inputs,
str(out),
input_names=input_names,
output_names=output_names,
dynamic_axes=dynamic_axes,
opset_version=args.opset,
do_constant_folding=True,
dynamo=False,
)
print("[3/4] Running parity check (torch vs onnxruntime)...")
import onnxruntime as ort
providers = ["CPUExecutionProvider"]
so = ort.SessionOptions()
so.log_severity_level = 3
sess = ort.InferenceSession(str(out), sess_options=so, providers=providers)
# Fresh random inputs for the check.
state_check = torch.randn(2, state_dim, device=args.device)
image_check = [torch.rand(2, *cfg.image_features[k].shape, device=args.device) for k in image_keys]
with torch.no_grad():
torch_out = wrapper(state_check, *image_check).cpu().numpy()
ort_inputs = {"state": state_check.cpu().numpy()}
for i, img in enumerate(image_check):
ort_inputs[f"image_{i}"] = img.cpu().numpy()
ort_out = sess.run(None, ort_inputs)[0]
max_abs = float(np.max(np.abs(torch_out - ort_out)))
mean_abs = float(np.mean(np.abs(torch_out - ort_out)))
print(f" shapes: torch={torch_out.shape} onnx={ort_out.shape}")
print(f" max_abs_diff={max_abs:.3e} mean_abs_diff={mean_abs:.3e} (atol={args.atol:.0e})")
ok = max_abs <= args.atol
print(f"[4/4] Parity: {'PASS' if ok else 'FAIL'}")
if not ok:
raise SystemExit(f"Parity check failed: max_abs_diff {max_abs:.3e} > atol {args.atol:.0e}")
print(f"\nDone. ONNX model at: {out}")
if __name__ == "__main__":
main()
@@ -54,7 +54,6 @@ from typing import Any
import pyarrow as pa
import pyarrow.parquet as pq
from lerobot.datasets.io_utils import write_table_one_row_group_per_episode
from lerobot.datasets.language import (
EVENT_ONLY_STYLES,
LANGUAGE_EVENTS,
@@ -275,11 +274,12 @@ class LanguageColumnsWriter:
new_table = self._materialize_table(
table, per_row_persistent, per_row_events, drop_old=self.drop_existing_subtask_index
)
# Re-emit one row group per episode (a bulk pq.write_table would collapse
# them into one). Write to a sibling tmp path and atomically rename so a
# crash mid-write can't leave a half-written shard.
# Atomic replace: write to a sibling tmp path and rename so a crash
# mid-write can't leave a half-written shard that ``pq.read_table``
# would then fail to open. ``Path.replace`` is atomic on POSIX +
# Windows when source and target sit on the same filesystem.
tmp_path = path.with_suffix(path.suffix + ".tmp")
write_table_one_row_group_per_episode(new_table, tmp_path)
pq.write_table(new_table, tmp_path)
tmp_path.replace(path)
def _materialize_table(
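The write-then-rename pattern in the hunk above can be sketched in isolation as follows. This is an illustration using plain bytes instead of a parquet table; `atomic_write_bytes` and the file names are hypothetical, and the atomicity relies on the temporary file sitting next to the target on the same filesystem, as the code comment notes.

```python
import tempfile
from pathlib import Path


def atomic_write_bytes(path: Path, payload: bytes) -> None:
    """Write payload to a sibling tmp file, then rename it over path (hypothetical helper)."""
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    tmp_path.write_bytes(payload)
    # Path.replace overwrites the target; readers see either the old
    # file or the new one, never a partially written file.
    tmp_path.replace(path)


with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "shard.parquet"
    target.write_bytes(b"old")
    atomic_write_bytes(target, b"new")
    result = target.read_bytes()
    leftover = (Path(tmp_dir) / "shard.parquet.tmp").exists()
    print(result, leftover)
```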
+17 -9
@@ -180,24 +180,32 @@ class WandBLogger:
self._wandb_custom_step_key.add(new_custom_key)
self._wandb.define_metric(new_custom_key, hidden=True)
batch_data = {}
for k, v in d.items():
# Skip the custom step key here, it's added to the batch below.
if custom_step_key is not None and k == custom_step_key:
continue
if isinstance(v, list):
for i, elem in enumerate(v):
if isinstance(elem, (int | float)):
batch_data[f"{mode}/{k}_{i}"] = elem
continue
if not isinstance(v, (int | float | str)):
logging.warning(
f'WandB logging of key "{k}" was ignored as its type "{type(v)}" is not handled by this wrapper.'
)
continue
# Do not log the custom step key itself.
if self._wandb_custom_step_key is not None and k in self._wandb_custom_step_key:
continue
batch_data[f"{mode}/{k}"] = v
if batch_data:
if custom_step_key is not None:
value_custom_step = d[custom_step_key]
data = {f"{mode}/{k}": v, f"{mode}/{custom_step_key}": value_custom_step}
self._wandb.log(data)
continue
self._wandb.log(data={f"{mode}/{k}": v}, step=step)
batch_data[f"{mode}/{custom_step_key}"] = d[custom_step_key]
self._wandb.log(batch_data)
else:
self._wandb.log(data=batch_data, step=step)
def log_video(self, video_path: str, step: int, mode: str = "train"):
if mode not in {"train", "eval"}:
+4
@@ -39,6 +39,8 @@ class DatasetConfig:
# This reduces memory and speeds up DataLoader IPC. The training pipeline handles the conversion.
return_uint8: bool = False
streaming: bool = False
# Fraction of episodes held out per task for offline evaluation (0.0 = disabled).
eval_split: float = 0.0
def __post_init__(self) -> None:
if self.episodes is not None:
@@ -73,6 +75,8 @@ class EvalConfig:
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True
# Whether to record eval rollouts as a LeRobot v3.0 dataset on disk.
recording: bool = False
def __post_init__(self) -> None:
if self.batch_size == 0:
+2
@@ -79,6 +79,8 @@ class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC): # type: igno
# Either the repo ID of a model hosted on the Hub or a path to a directory containing weights
# saved using `Policy.save_pretrained`. If not provided, the policy is initialized from scratch.
pretrained_path: Path | None = None
# Optional Hub revision (commit hash, branch, or tag) to pin the pretrained model version.
pretrained_revision: str | None = None
def __post_init__(self) -> None:
if not self.device or not is_torch_device_available(self.device):
+2
@@ -56,6 +56,8 @@ class RewardModelConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC):
device: str | None = None
pretrained_path: str | None = None
# Optional Hub revision (commit hash, branch, or tag) to pin the pretrained reward model version.
pretrained_revision: str | None = None
push_to_hub: bool = False
repo_id: str | None = None
+6 -1
@@ -100,8 +100,13 @@ class TrainPipelineConfig(HubMixin):
prefetch_factor: int = 4
persistent_workers: bool = True
steps: int = 100_000
eval_freq: int = 20_000
# Run policy in the simulation environment every N steps to measure reward/success (0 = disabled).
env_eval_freq: int = 20_000
log_freq: int = 200
# Compute eval loss on held-out episodes every N steps (0 = disabled). Requires eval_split > 0.
eval_steps: int = 0
# Cap on total eval samples, split uniformly across tasks (0 = use all held-out data).
max_eval_samples: int = 0
tolerance_s: float = 1e-4
save_checkpoint: bool = True
# Checkpoint is saved every `save_freq` training iterations and after the last training step.
+2 -1
@@ -35,7 +35,7 @@ from .dataset_tools import (
remove_feature,
split_dataset,
)
from .factory import make_dataset, resolve_delta_timestamps
from .factory import make_dataset, make_train_eval_datasets, resolve_delta_timestamps
from .image_writer import safe_stop_image_writer
from .io_utils import load_episodes, write_stats
from .language import (
@@ -89,6 +89,7 @@ __all__ = [
"get_feature_stats",
"load_episodes",
"make_dataset",
"make_train_eval_datasets",
"merge_datasets",
"modify_features",
"modify_tasks",
-9
@@ -32,7 +32,6 @@ from .feature_utils import features_equal_for_merge, get_hf_features_from_featur
from .io_utils import (
get_file_size_in_mb,
get_parquet_file_size_in_mb,
to_parquet_one_row_group_per_episode,
to_parquet_with_hf_images,
write_info,
write_stats,
@@ -552,7 +551,6 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
aggr_root=dst_meta.root,
hf_features=hf_features,
concatenate=concatenate_data,
one_row_group_per_episode=True,
)
# Record the mapping from source to actual destination
@@ -630,7 +628,6 @@ def append_or_create_parquet_file(
aggr_root: Path = None,
hf_features: datasets.Features | None = None,
concatenate: bool = True,
one_row_group_per_episode: bool = False,
) -> tuple[dict[str, int], tuple[int, int]]:
"""Appends data to an existing parquet file or creates a new one based on size constraints.
@@ -648,8 +645,6 @@ def append_or_create_parquet_file(
aggr_root: Root path for the aggregated dataset.
hf_features: Optional HuggingFace Features schema for proper image typing.
concatenate: When False, always rotate to a new file instead of appending to the current one.
one_row_group_per_episode: True for DATA parquet (emit one row group per episode); False for
the episodes-metadata parquet (already one row per episode).
Returns:
tuple: (updated_idx, (dst_chunk, dst_file)) where updated_idx is the index dict
@@ -662,8 +657,6 @@ def append_or_create_parquet_file(
dst_path.parent.mkdir(parents=True, exist_ok=True)
if contains_images:
to_parquet_with_hf_images(df, dst_path, features=hf_features)
elif one_row_group_per_episode:
to_parquet_one_row_group_per_episode(df, dst_path)
else:
df.to_parquet(dst_path)
return idx, (dst_chunk, dst_file)
@@ -690,8 +683,6 @@ def append_or_create_parquet_file(
if contains_images:
to_parquet_with_hf_images(final_df, target_path, features=hf_features)
elif one_row_group_per_episode:
to_parquet_one_row_group_per_episode(final_df, target_path)
else:
final_df.to_parquet(target_path)
+79
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
from pprint import pformat
import torch
@@ -130,3 +131,81 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
return dataset
def make_train_eval_datasets(
cfg: TrainPipelineConfig,
) -> tuple[LeRobotDataset | MultiLeRobotDataset, LeRobotDataset | None]:
"""Create train and optional eval datasets by splitting episodes based on eval_split.
The last ceil(n_episodes * eval_split) episodes per task are held out for evaluation.
If eval_split == 0.0, returns (full_dataset, None).
"""
full_dataset = make_dataset(cfg)
if cfg.dataset.eval_split == 0.0:
return full_dataset, None
base_episodes = (
full_dataset.episodes if full_dataset.episodes is not None else list(range(full_dataset.num_episodes))
)
episode_tasks = full_dataset.meta.episodes["tasks"]
task_to_episodes: dict[str, list[int]] = {}
for ep_idx in base_episodes:
task_key = episode_tasks[ep_idx][0] if episode_tasks[ep_idx] else ""
task_to_episodes.setdefault(task_key, []).append(ep_idx)
train_episodes, eval_episodes = [], []
for eps in task_to_episodes.values():
n_eval = math.ceil(len(eps) * cfg.dataset.eval_split)
train_episodes.extend(eps[: len(eps) - n_eval])
eval_episodes.extend(eps[len(eps) - n_eval :])
if not train_episodes:
raise ValueError(
f"eval_split={cfg.dataset.eval_split} leaves 0 training episodes from {len(base_episodes)} total."
)
logging.info(
f"Train/eval split: {len(train_episodes)} train, {len(eval_episodes)} eval "
f"(eval_split={cfg.dataset.eval_split}, {len(task_to_episodes)} tasks)"
)
delta_timestamps = resolve_delta_timestamps(cfg.trainable_config, full_dataset.meta)
train_image_transforms = (
ImageTransforms(cfg.dataset.image_transforms) if cfg.dataset.image_transforms.enable else None
)
train_dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
episodes=train_episodes,
delta_timestamps=delta_timestamps,
image_transforms=train_image_transforms,
revision=cfg.dataset.revision,
video_backend=cfg.dataset.video_backend,
return_uint8=True,
tolerance_s=cfg.tolerance_s,
)
eval_dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
episodes=eval_episodes,
delta_timestamps=delta_timestamps,
image_transforms=None,
revision=cfg.dataset.revision,
video_backend=cfg.dataset.video_backend,
return_uint8=True,
tolerance_s=cfg.tolerance_s,
)
if cfg.dataset.use_imagenet_stats:
for ds in (train_dataset, eval_dataset):
for key in ds.meta.camera_keys:
for stats_type, stats in IMAGENET_STATS.items():
ds.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
return train_dataset, eval_dataset
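The per-task hold-out rule described in the `make_train_eval_datasets` docstring can be sketched in isolation. This is a minimal illustration and not the factory code itself: `split_episodes_per_task` and the `episode_tasks` mapping are hypothetical names, and only the `ceil`-based tail split mirrors the hunk above.

```python
import math


def split_episodes_per_task(
    episode_tasks: dict[int, str], eval_split: float
) -> tuple[list[int], list[int]]:
    """Hold out the last ceil(n * eval_split) episodes of every task (hypothetical helper)."""
    task_to_episodes: dict[str, list[int]] = {}
    for ep_idx, task in episode_tasks.items():
        task_to_episodes.setdefault(task, []).append(ep_idx)

    train, held_out = [], []
    for eps in task_to_episodes.values():
        n_eval = math.ceil(len(eps) * eval_split)
        # Slice from len(eps) - n_eval rather than -n_eval: eps[-0:] would return everything.
        train.extend(eps[: len(eps) - n_eval])
        held_out.extend(eps[len(eps) - n_eval :])
    return train, held_out


episode_tasks = {0: "pick", 1: "pick", 2: "pick", 3: "place", 4: "place"}
train, held_out = split_episodes_per_task(episode_tasks, eval_split=0.2)
print(train, held_out)  # [0, 1, 3] [2, 4]
```

Because of the `ceil`, any non-zero `eval_split` holds out at least one episode per task, so a task with a single episode would contribute nothing to training.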
+10 -39
@@ -20,7 +20,6 @@ import datasets
import numpy as np
import pandas
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pa_ds
import pyarrow.parquet as pq
import torch
@@ -154,7 +153,7 @@ def cast_stats_to_numpy(stats: dict) -> dict[str, dict[str, np.ndarray]]:
Returns:
dict: The statistics dictionary with values cast to numpy arrays.
"""
stats = {key: np.array(value) for key, value in flatten_dict(stats).items()}
stats = {key: np.atleast_1d(np.array(value)) for key, value in flatten_dict(stats).items()}
return unflatten_dict(stats)
@@ -271,49 +270,21 @@ def hf_transform_to_torch(items_dict: dict[str, list[Any]]) -> dict[str, list[to
return items_dict
def write_table_one_row_group_per_episode(table: pa.Table, path: Path) -> None:
"""Write ``table`` with one parquet row group per episode (in episode order).
Keeps shards random-access friendly (``read_row_group(i)`` fetches episode i),
mirroring the recording writer. ``table`` must carry a contiguous
``episode_index`` column.
"""
episode_index = table.column("episode_index").to_numpy(zero_copy_only=False)
starts = np.concatenate(([0], np.nonzero(np.diff(episode_index))[0] + 1))
writer = pq.ParquetWriter(str(path), table.schema, compression="snappy", use_dictionary=True)
try:
for start, stop in zip(starts, np.append(starts[1:], len(episode_index)), strict=True):
writer.write_table(table.slice(start, stop - start)) # one episode -> one row group
finally:
writer.close()
def to_parquet_with_hf_images(
df: pandas.DataFrame, path: Path, features: datasets.Features | None = None
) -> None:
"""Write a DataFrame with HF-encoded images to parquet, one row group per episode.
"""Write a pandas DataFrame that contains images encoded by HF datasets to parquet.
The resulting file can be loaded by HF datasets, which then returns correctly formatted images.
Images are embedded into the arrow table first (``ParquetWriter.write_table``
does not embed external image files like ``Dataset.to_parquet`` does).
``features`` types image columns as ``Image()`` in the parquet schema.
Args:
df: DataFrame to write to parquet.
path: Path to write the parquet file.
features: Optional HuggingFace Features schema. If provided, ensures image columns
are properly typed as Image() in the parquet schema.
"""
# TODO(qlhoest): replace this weird syntax by `df.to_parquet(path)` only
ds = datasets.Dataset.from_dict(df.to_dict(orient="list"), features=features)
ds = embed_images(ds)
table = ds.with_format("arrow")[:]
if "episode_index" in table.column_names:
write_table_one_row_group_per_episode(table, path)
else:
# No episode boundaries to align row groups to — keep a single write.
pq.write_table(table, str(path))
def to_parquet_one_row_group_per_episode(df: pandas.DataFrame, path: Path) -> None:
"""Write a (non-image) DataFrame to parquet with one row group per episode."""
table = pa.Table.from_pandas(df, preserve_index=False)
if "episode_index" in table.column_names:
write_table_one_row_group_per_episode(table, path)
else:
pq.write_table(table, str(path))
ds.to_parquet(path)
def item_to_torch(item: dict) -> dict:
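The row-group boundaries that `write_table_one_row_group_per_episode` derives with `np.diff` can be illustrated without NumPy or PyArrow. The sketch below is an assumption-labelled, plain-Python equivalent of the boundary computation only; `episode_slices` is a hypothetical name and no parquet file is written.

```python
def episode_slices(episode_index: list[int]) -> list[tuple[int, int]]:
    """Return (start, stop) row ranges, one per contiguous run of equal episode_index."""
    if not episode_index:
        return []
    # A new run starts wherever the value differs from the previous row.
    starts = [0] + [
        i for i in range(1, len(episode_index)) if episode_index[i] != episode_index[i - 1]
    ]
    stops = starts[1:] + [len(episode_index)]
    return list(zip(starts, stops))


print(episode_slices([0, 0, 0, 0, 1, 1, 2, 2, 2]))  # [(0, 4), (4, 6), (6, 9)]
```

Each `(start, stop)` pair corresponds to one `table.slice(start, stop - start)` call in the helper, which is why the column has to be contiguous: an episode whose rows are interleaved with another would be split into several row groups.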
+2
@@ -474,6 +474,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
if reader.hf_dataset is None:
# One-shot load after finalize()
reader.load_and_activate()
if reader._absolute_to_relative_idx is not None and idx in reader._absolute_to_relative_idx:
idx = reader._absolute_to_relative_idx[idx]
return reader.get_item(idx)
def select_columns(self, column_names: str | list[str]):
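The index remapping added to `__getitem__` can be pictured with a toy container. This is only a sketch under stated assumptions: `FilteredFrames` is hypothetical and stores strings instead of frames, while the `_absolute_to_relative_idx` lookup follows the same "remap if present, otherwise pass through" rule as the hunk.

```python
class FilteredFrames:
    """Toy stand-in for a dataset that keeps only some absolute frame indices."""

    def __init__(self, frames: list[str], kept_absolute: list[int]) -> None:
        self._rows = [frames[i] for i in kept_absolute]
        # Absolute index in the full dataset -> position in the filtered rows.
        self._absolute_to_relative_idx = {
            abs_i: rel_i for rel_i, abs_i in enumerate(kept_absolute)
        }

    def __getitem__(self, idx: int) -> str:
        # Remap only when idx is a known absolute index; otherwise use it as-is.
        if idx in self._absolute_to_relative_idx:
            idx = self._absolute_to_relative_idx[idx]
        return self._rows[idx]


frames = [f"f{i}" for i in range(6)]
ds = FilteredFrames(frames, kept_absolute=[3, 4, 5])
print(ds[4], ds[0])  # f4 f3
```

One consequence of this rule is that an index present in the map is always read as absolute, so when the absolute and relative ranges overlap, callers would need to stick to a single convention.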
+3 -5
@@ -70,21 +70,19 @@ def aggregate_pipeline_dataset_features(
initial_features: dict[PipelineFeatureType, dict[str, Any]],
*,
use_videos: bool = True,
exclude_images: bool = False,
patterns: Sequence[str] | None = None,
) -> dict[str, dict]:
"""
Aggregates and filters pipeline features to create a dataset-ready features dictionary.
This function transforms initial features using the pipeline, categorizes them as action or observations
(image or state), filters them based on `exclude_images` and `patterns`, and finally
(image or state), filters them based on `use_videos` and `patterns`, and finally
formats them for use with a Hugging Face LeRobot Dataset.
Args:
pipeline: The DataProcessorPipeline to apply.
initial_features: A dictionary of raw feature specs for actions and observations.
use_videos: Controls the storage dtype for image features. If True, images are stored as "video"; if False, they are stored as "image".
exclude_images: If True, image features are dropped entirely from the output.
use_videos: If False, image features are excluded.
patterns: A sequence of regex patterns to filter action and state features.
Image features are not affected by this filter.
@@ -122,7 +120,7 @@ def aggregate_pipeline_dataset_features(
)
# 2. Apply filtering rules.
if is_image and exclude_images:
if is_image and not use_videos:
continue
if not is_image and not should_keep(key, compiled_patterns):
continue
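The two filtering rules in this hunk (drop image features on a flag, keep other features only if they match a pattern) can be sketched as one predicate. This is a hedged illustration: `keep_feature` and `drop_images` are hypothetical names, and matching with `re.search` is an assumption, since the body of `should_keep` is not shown in the diff.

```python
from __future__ import annotations

import re
from collections.abc import Sequence


def keep_feature(
    key: str, is_image: bool, *, drop_images: bool, patterns: Sequence[str] | None
) -> bool:
    """Decide whether a feature survives filtering (hypothetical helper)."""
    if is_image:
        # Image features ignore `patterns`; only the flag can drop them.
        return not drop_images
    if patterns is None:
        return True
    # Assumption: a feature is kept when any pattern matches somewhere in the key.
    return any(re.search(p, key) for p in patterns)


print(keep_feature("observation.images.front", True, drop_images=False, patterns=[r"^action"]))
print(keep_feature("observation.state.gripper", False, drop_images=False, patterns=[r"^action"]))
```

Here `drop_images` stands for whichever condition the chosen variant of the hunk uses, either `exclude_images` or `not use_videos`.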
+20
@@ -126,6 +126,26 @@ def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Ten
if "camera_obs" in observations:
return_observations[f"{OBS_STR}.camera_obs"] = observations["camera_obs"]
# Pass through any remaining ndarray/tensor keys not already handled above,
# so env plugins can expose extra observation keys via get_env_processors().
_handled = {"pixels", "environment_state", "agent_pos", "robot_state", "policy", "camera_obs"}
for key, value in observations.items():
if key in _handled:
continue
target = f"{OBS_STR}.{key}"
if target in return_observations:
continue
if isinstance(value, np.ndarray):
val = torch.from_numpy(value).float()
if val.dim() == 1:
val = val.unsqueeze(0)
return_observations[target] = val
elif isinstance(value, Tensor):
val = value.float()
if val.dim() == 1:
val = val.unsqueeze(0)
return_observations[target] = val
return return_observations
+1 -1
@@ -148,7 +148,7 @@ class ACTPolicy(PreTrainedPolicy):
l1_loss = (abs_err * valid_mask).sum() / num_valid.clamp_min(1)
loss_dict = {"l1_loss": l1_loss.item()}
if self.config.use_vae:
if self.config.use_vae and log_sigma_x2_hat is not None:
# Calculate Dₖₗ(latent_pdf || standard_normal). Note: After computing the KL-divergence for
# each dimension independently, we sum over the latent dimension to get the total
# KL-divergence per batch element, then take the mean over the batch.
@@ -101,11 +101,23 @@ class DiffusionPolicy(PreTrainedPolicy):
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
"""Predict a chunk of actions given environment observations."""
# stack n latest observations from the queue
batch = {k: torch.stack(list(self._queues[k]), dim=1) for k in batch if k in self._queues}
actions = self.diffusion.generate_actions(batch, noise=noise)
"""Predict a chunk of actions given environment observations.
Supports two modes:
- Online (queues populated via select_action): stacks observations from internal queues.
- Offline (empty queues, e.g. dataloader batch): uses the batch directly.
"""
queues_populated = any(len(q) > 0 for q in self._queues.values())
if queues_populated:
batch = {k: torch.stack(list(self._queues[k]), dim=1) for k in batch if k in self._queues}
else:
batch = dict(batch)
if self.config.image_features:
for key in self.config.image_features:
if batch[key].ndim == 4:
batch[key] = batch[key].unsqueeze(1)
batch[OBS_IMAGES] = torch.stack([batch[key] for key in self.config.image_features], dim=-4)
actions = self.diffusion.generate_actions(batch, noise=noise)
return actions
@torch.no_grad()
+4
@@ -252,6 +252,7 @@ class ProcessorConfigKwargs(TypedDict, total=False):
def make_pre_post_processors(
policy_cfg: PreTrainedConfig,
pretrained_path: str | None = None,
pretrained_revision: str | None = None,
**kwargs: Unpack[ProcessorConfigKwargs],
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
@@ -309,6 +310,7 @@ def make_pre_post_processors(
overrides=kwargs.get("preprocessor_overrides", {}),
to_transition=batch_to_transition,
to_output=transition_to_batch,
revision=pretrained_revision,
)
postprocessor = PolicyProcessorPipeline.from_pretrained(
pretrained_model_name_or_path=pretrained_path,
@@ -318,6 +320,7 @@ def make_pre_post_processors(
overrides=kwargs.get("postprocessor_overrides", {}),
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
revision=pretrained_revision,
)
_reconnect_relative_absolute_steps(preprocessor, postprocessor)
return preprocessor, postprocessor
@@ -557,6 +560,7 @@ def make_policy(
# Load a pretrained policy and override the config if needed (for example, if there are inference-time
# hyperparameters that we want to vary).
kwargs["pretrained_name_or_path"] = cfg.pretrained_path
kwargs["revision"] = cfg.pretrained_revision
policy = policy_cls.from_pretrained(**kwargs)
elif cfg.pretrained_path and cfg.use_peft:
# Load a pretrained PEFT model on top of the policy. The pretrained path points to the folder/repo
+1
@@ -124,6 +124,7 @@ def make_reward_model(cfg: RewardModelConfig, **kwargs) -> PreTrainedRewardModel
if cfg.pretrained_path:
kwargs["pretrained_name_or_path"] = cfg.pretrained_path
kwargs["revision"] = cfg.pretrained_revision
reward_model = reward_cls.from_pretrained(**kwargs)
else:
reward_model = reward_cls(**kwargs)
+159 -19
@@ -72,8 +72,9 @@ from termcolor import colored
from torch import Tensor, nn
from tqdm import trange
from lerobot.configs import parser
from lerobot.configs import FeatureType, parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.envs import (
check_env_attributes_and_types,
close_envs,
@@ -84,7 +85,7 @@ from lerobot.envs import (
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.processor import PolicyProcessorPipeline
from lerobot.types import PolicyAction
from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD
from lerobot.utils.constants import ACTION, DONE, OBS_IMAGE, OBS_IMAGES, OBS_STR, REWARD
from lerobot.utils.device_utils import get_safe_torch_device
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.io_utils import write_video
@@ -95,6 +96,81 @@ from lerobot.utils.utils import (
)
def _env_features_to_dataset_features(env_features: dict, raw_obs: dict | None = None) -> dict:
"""Convert EnvConfig.features (PolicyFeature objects) to the plain dict format for LeRobotDataset.create().
If raw_obs is provided, visual feature shapes are inferred from the actual observation
to avoid mismatches between the env config and the real observation resolution.
"""
features = {}
for key, ft in env_features.items():
if ft.type is FeatureType.VISUAL:
shape = tuple(ft.shape)
if raw_obs is not None and key in raw_obs and isinstance(raw_obs[key], np.ndarray):
shape = raw_obs[key].shape[1:] # strip batch dim
elif raw_obs is not None and "pixels" in raw_obs:
pixels = raw_obs["pixels"]
if isinstance(pixels, dict):
for cam_name, img in pixels.items():
if key == f"{OBS_IMAGES}.{cam_name}" or key == cam_name:
shape = img.shape[1:] # strip batch dim
elif key in ("pixels", OBS_IMAGE):
shape = pixels.shape[1:] # strip batch dim
features[key] = {"dtype": "video", "shape": shape, "names": ["height", "width", "channel"]}
else:
shape = tuple(ft.shape)
if raw_obs is not None and key in raw_obs and isinstance(raw_obs[key], np.ndarray):
shape = raw_obs[key].shape[1:] # strip batch dim
features[key] = {"dtype": "float32", "shape": shape, "names": None}
features["next.reward"] = {"dtype": "float32", "shape": (1,), "names": None}
features["next.success"] = {"dtype": "bool", "shape": (1,), "names": None}
features["next.done"] = {"dtype": "bool", "shape": (1,), "names": None}
return features
def _build_raw_frame(
raw_obs: dict,
env_idx: int,
action: np.ndarray,
reward: float,
success: bool,
done: bool,
task: str,
env_features: dict,
) -> dict:
"""Build a dataset frame from raw env observations for one env index.
Keys in the frame match the keys in env_features so they align with the
dataset schema created by _env_features_to_dataset_features().
"""
frame: dict[str, Any] = {}
for key in env_features:
if key == ACTION:
continue
if "pixels" in raw_obs and isinstance(raw_obs["pixels"], dict):
for cam_name, img in raw_obs["pixels"].items():
candidate = f"{OBS_IMAGES}.{cam_name}"
if candidate == key:
frame[key] = img[env_idx]
if key in frame:
continue
if "pixels" in raw_obs and not isinstance(raw_obs["pixels"], dict) and key in ("pixels", OBS_IMAGE):
frame[key] = raw_obs["pixels"][env_idx]
continue
raw_key = key
if raw_key in raw_obs and isinstance(raw_obs[raw_key], np.ndarray):
val = raw_obs[raw_key][env_idx]
if val.dtype == np.float64:
val = val.astype(np.float32)
frame[key] = val
frame[ACTION] = action
frame["next.reward"] = np.atleast_1d(np.float32(reward))
frame["next.success"] = np.atleast_1d(np.bool_(success))
frame["next.done"] = np.atleast_1d(np.bool_(done))
frame["task"] = task
return frame
def rollout(
env: gym.vector.VectorEnv,
policy: PreTrainedPolicy,
@@ -105,6 +181,7 @@ def rollout(
seeds: list[int] | None = None,
return_observations: bool = False,
render_callback: Callable[[gym.vector.VectorEnv], None] | None = None,
recording_dataset: Any | None = None,
) -> dict:
"""Run a batched policy rollout once through a batch of environments.
@@ -145,6 +222,14 @@ def rollout(
if render_callback is not None:
render_callback(env)
raw_observation = deepcopy(observation) if recording_dataset is not None else None
task_desc = ""
if recording_dataset is not None:
try:
task_desc = list(env.call("task_description"))[0]
except (AttributeError, NotImplementedError):
task_desc = ""
all_observations = []
all_actions = []
all_rewards = []
@@ -217,6 +302,26 @@ def rollout(
else:
successes = [False] * env.num_envs
if recording_dataset is not None and raw_observation is not None:
prev_done = done.copy()
for env_idx in range(env.num_envs):
if prev_done[env_idx]:
continue
frame = _build_raw_frame(
raw_observation,
env_idx,
action_numpy[env_idx],
reward[env_idx],
successes[env_idx],
bool(terminated[env_idx] | truncated[env_idx]),
task_desc,
recording_dataset.features,
)
recording_dataset.add_frame(frame)
if terminated[env_idx] or truncated[env_idx]:
recording_dataset.save_episode()
raw_observation = deepcopy(observation)
# Keep track of which environments are done so far.
# Mark the episode as done if we reach the maximum step limit.
# This ensures that the rollout always terminates cleanly at `max_steps`,
@@ -273,6 +378,7 @@ def eval_policy(
videos_dir: Path | None = None,
return_episode_data: bool = False,
start_seed: int | None = None,
recording_dataset: Any | None = None,
) -> dict:
"""
Args:
@@ -361,6 +467,7 @@ def eval_policy(
seeds=list(seeds) if seeds else None,
return_observations=return_episode_data,
render_callback=render_frame if max_episodes_rendered > 0 else None,
recording_dataset=recording_dataset,
)
# Figure out where in each rollout sequence the first done condition was encountered (results after
@@ -563,6 +670,10 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
recording_dir = Path(cfg.output_dir) / "recordings" if cfg.eval.recording else None
max_episodes_rendered = 0 if cfg.eval.recording else 10
videos_dir = None if cfg.eval.recording else Path(cfg.output_dir) / "videos"
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
@@ -572,10 +683,13 @@ def eval_main(cfg: EvalPipelineConfig):
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=False,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
recording_dir=recording_dir,
env_features=cfg.env.features if cfg.eval.recording else None,
)
print("Overall Aggregated Metrics:")
print(info["overall"])
@@ -618,6 +732,7 @@ def eval_one(
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
recording_dataset: Any | None = None,
) -> TaskMetrics:
"""Evaluates one task_id of one suite using the provided vec env."""
@@ -635,6 +750,7 @@ def eval_one(
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dataset=recording_dataset,
)
per_episode = task_result["per_episode"]
@@ -661,6 +777,8 @@ def run_one(
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
recording_dir: Path | None = None,
env_features: dict | None = None,
):
"""
Run eval_one for a single (task_group, task_id, env).
@@ -672,21 +790,39 @@ def run_one(
task_videos_dir = videos_dir / f"{task_group}_{task_id}"
task_videos_dir.mkdir(parents=True, exist_ok=True)
# Call the existing eval_one (assumed to return TaskMetrics-like dict)
metrics = eval_one(
env,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
# ensure we always provide video_paths key to simplify accumulation
recording_dataset = None
if recording_dir is not None and env_features is not None:
task_recording_dir = recording_dir / f"{task_group}_{task_id}"
fps = env.unwrapped.metadata.get("render_fps", 30)
sample_obs, _ = env.reset()
features = _env_features_to_dataset_features(env_features, raw_obs=sample_obs)
recording_dataset = LeRobotDataset.create(
repo_id=f"eval_{task_group}_{task_id}",
fps=fps,
features=features,
root=str(task_recording_dir),
use_videos=True,
)
try:
metrics = eval_one(
env,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dataset=recording_dataset,
)
finally:
if recording_dataset is not None:
recording_dataset.finalize()
if max_episodes_rendered > 0:
metrics.setdefault("video_paths", [])
return task_group, task_id, metrics
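The `try`/`finally` around `eval_one` in `run_one` exists so that the recording is finalized even when evaluation fails partway. A minimal sketch of that pattern, assuming a hypothetical `FakeRecorder` in place of the real dataset object:

```python
class FakeRecorder:
    """Hypothetical stand-in for a recording dataset; only tracks whether finalize() ran."""

    def __init__(self) -> None:
        self.finalized = False

    def finalize(self) -> None:
        self.finalized = True


def run_with_recording(recorder, evaluate):
    """Run evaluate(recorder) and finalize the recorder even if evaluation raises."""
    try:
        return evaluate(recorder)
    finally:
        if recorder is not None:
            recorder.finalize()


def failing_eval(recorder):
    raise RuntimeError("env crashed mid-rollout")


recorder = FakeRecorder()
try:
    run_with_recording(recorder, failing_eval)
except RuntimeError:
    pass
print(recorder.finalized)  # True
```

The exception still propagates to the caller; the `finally` block only guarantees that whatever was recorded before the failure gets flushed.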
@@ -702,6 +838,8 @@ def eval_policy_all(
n_episodes: int,
*,
max_episodes_rendered: int = 0,
recording_dir: Path | None = None,
env_features: dict | None = None,
videos_dir: Path | None = None,
return_episode_data: bool = False,
start_seed: int | None = None,
@@ -761,6 +899,8 @@ def eval_policy_all(
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dir=recording_dir,
env_features=env_features,
)
if max_parallel_tasks <= 1:
+55 -6
@@ -45,7 +45,8 @@ from lerobot.common.train_utils import (
from lerobot.common.wandb_utils import WandBLogger
from lerobot.configs import parser
from lerobot.configs.train import TrainPipelineConfig
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state, make_dataset
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state
from lerobot.datasets.factory import make_train_eval_datasets
from lerobot.envs import close_envs, make_env, make_env_pre_post_processors
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
@@ -244,19 +245,19 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
# LeRobotDataset skips its snapshot_download when try_load() succeeds, so no rank re-downloads.
if is_main_process:
logging.info("Creating dataset")
dataset = make_dataset(cfg)
dataset, eval_dataset = make_train_eval_datasets(cfg)
accelerator.wait_for_everyone()
# Other ranks read from the shared copy populated by the main process.
if not is_main_process:
dataset = make_dataset(cfg)
dataset, eval_dataset = make_train_eval_datasets(cfg)
# Create the environment used to evaluate checkpoints during training on simulation data.
# For real-world data no environment is needed: evaluation runs outside train.py,
# via eval.py with the gym_dora environment and dora-rs.
eval_env = None
if cfg.eval_freq > 0 and cfg.env is not None and is_main_process:
if cfg.env_eval_freq > 0 and cfg.env is not None and is_main_process:
logging.info("Creating env")
eval_env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
@@ -345,6 +346,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
pretrained_path=processor_pretrained_path,
pretrained_revision=getattr(cfg.policy, "pretrained_revision", None),
**processor_kwargs,
)
@@ -455,6 +457,31 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
persistent_workers=cfg.persistent_workers and cfg.num_workers > 0,
)
# Build eval dataloader if a held-out split exists
eval_dataloader = None
if eval_dataset is not None:
eval_ds = eval_dataset
if cfg.max_eval_samples > 0 and hasattr(eval_dataset, "hf_dataset"):
task_indices = eval_dataset.hf_dataset["task_index"]
unique_tasks = sorted(set(task_indices))
per_task = max(1, cfg.max_eval_samples // len(unique_tasks))
selected: list[int] = []
for t in unique_tasks:
frames = [i for i, ti in enumerate(task_indices) if ti == t][:per_task]
selected.extend(frames)
eval_ds = torch.utils.data.Subset(eval_dataset, selected)
eval_collate_fn = lerobot_collate_fn if dataset.meta.has_language_columns else None
eval_dataloader = torch.utils.data.DataLoader(
eval_ds,
batch_size=cfg.batch_size,
shuffle=False,
num_workers=cfg.num_workers,
pin_memory=device.type == "cuda",
drop_last=False,
collate_fn=eval_collate_fn,
)
# Prepare everything with accelerator
accelerator.wait_for_everyone()
policy, optimizer, dataloader, lr_scheduler = accelerator.prepare(
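The per-task subsampling that builds `eval_ds` can be examined on a plain list of task indices. This sketch copies the selection logic from the hunk into a hypothetical `select_per_task` function so its edge cases are easier to see; the input list stands in for `eval_dataset.hf_dataset["task_index"]`.

```python
def select_per_task(task_indices: list[int], max_samples: int) -> list[int]:
    """Pick the first `max_samples // n_tasks` frames of each task (at least one per task)."""
    unique_tasks = sorted(set(task_indices))
    per_task = max(1, max_samples // len(unique_tasks))
    selected: list[int] = []
    for t in unique_tasks:
        frames = [i for i, ti in enumerate(task_indices) if ti == t][:per_task]
        selected.extend(frames)
    return selected


task_indices = [0, 0, 0, 1, 1, 2]
print(select_per_task(task_indices, max_samples=6))  # [0, 1, 3, 4, 5]
print(select_per_task(task_indices, max_samples=2))  # [0, 3, 5]
```

The second call shows that `max(1, ...)` can return more frames than `max_samples` when there are more tasks than requested samples, while the integer division can return fewer when tasks are unevenly sized.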
@@ -534,7 +561,8 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
train_tracker.step()
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0
is_env_eval_step = cfg.env_eval_freq > 0 and step % cfg.env_eval_freq == 0
is_eval_step = cfg.eval_steps > 0 and eval_dataloader is not None and step % cfg.eval_steps == 0
if is_log_step:
# Collective reduce must run on every rank, before the main-process gate below.
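After this hunk the training loop distinguishes environment rollouts (`env_eval_freq`) from held-out loss evaluation (`eval_steps`). The gating can be sketched as a pure function; `step_flags` is a hypothetical helper and the parameters simply mirror the config fields named in the diff.

```python
def step_flags(
    step: int,
    *,
    log_freq: int,
    env_eval_freq: int,
    eval_steps: int,
    has_eval_dataloader: bool,
) -> dict[str, bool]:
    """Compute which periodic actions fire at `step`; a frequency <= 0 disables the action."""
    return {
        "log": log_freq > 0 and step % log_freq == 0,
        "env_eval": env_eval_freq > 0 and step % env_eval_freq == 0,
        "offline_eval": eval_steps > 0 and has_eval_dataloader and step % eval_steps == 0,
    }


flags = step_flags(100, log_freq=50, env_eval_freq=-1, eval_steps=100, has_eval_dataloader=True)
print(flags)  # {'log': True, 'env_eval': False, 'offline_eval': True}
```

With `--env_eval_freq=-1`, as used in the multi-GPU tests, simulator rollouts are skipped entirely, and the held-out loss runs only when an eval dataloader was built.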
@@ -557,6 +585,27 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
wandb_logger.log_dict(wandb_log_dict, step)
train_tracker.reset_averages()
if is_eval_step:
policy.eval()
eval_loss_sum = 0.0
n_eval_batches = 0
with torch.no_grad(), accelerator.autocast():
for eval_batch in eval_dataloader:
for cam_key in dataset.meta.camera_keys:
if cam_key in eval_batch and eval_batch[cam_key].dtype == torch.uint8:
eval_batch[cam_key] = eval_batch[cam_key].to(dtype=torch.float32) / 255.0
eval_batch = preprocessor(eval_batch)
loss, _ = policy.forward(eval_batch)
eval_loss_sum += loss.item()
n_eval_batches += 1
eval_loss = eval_loss_sum / max(n_eval_batches, 1)
policy.train()
if is_main_process:
logging.info(f"step {step}: eval_loss={eval_loss:.4f}")
if wandb_logger:
wandb_logger.log_dict({"eval_loss": eval_loss}, step=step, mode="eval")
if cfg.save_checkpoint and is_saving_step:
if is_main_process:
logging.info(f"Checkpoint policy after step {step}")
@@ -579,7 +628,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
accelerator.wait_for_everyone()
if cfg.env and is_eval_step:
if cfg.env and is_env_eval_step:
if is_main_process:
step_id = get_step_identifier(step, cfg.steps)
logging.info(f"Eval policy at step {step}")
+8 -2
@@ -216,9 +216,15 @@ def register_third_party_plugins() -> None:
This function uses `importlib.metadata` to find packages installed in the environment
(including editable installs) starting with 'lerobot_robot_', 'lerobot_camera_',
'lerobot_teleoperator_', or 'lerobot_policy_' and imports them.
'lerobot_teleoperator_', 'lerobot_policy_', or 'lerobot_env_' and imports them.
"""
prefixes = ("lerobot_robot_", "lerobot_camera_", "lerobot_teleoperator_", "lerobot_policy_")
prefixes = (
"lerobot_robot_",
"lerobot_camera_",
"lerobot_teleoperator_",
"lerobot_policy_",
"lerobot_env_",
)
imported: list[str] = []
failed: list[str] = []
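Prefix-based plugin discovery comes down to a `str.startswith` check against a tuple of prefixes. The sketch below shows only that filtering step on a hard-coded list of names; `plugin_candidates` is a hypothetical helper, and in the real function the names would come from `importlib.metadata` rather than a literal list.

```python
PLUGIN_PREFIXES = (
    "lerobot_robot_",
    "lerobot_camera_",
    "lerobot_teleoperator_",
    "lerobot_policy_",
    "lerobot_env_",
)


def plugin_candidates(package_names: list[str]) -> list[str]:
    """Return the names that start with one of the plugin prefixes, sorted for stable output."""
    # str.startswith accepts a tuple, so one call covers every prefix.
    return sorted(name for name in package_names if name.startswith(PLUGIN_PREFIXES))


installed = ["numpy", "lerobot_env_gs_gym", "lerobot", "lerobot_policy_demo", "lerobot_envs"]
print(plugin_candidates(installed))  # ['lerobot_env_gs_gym', 'lerobot_policy_demo']
```

The trailing underscore in each prefix matters: the made-up name `lerobot_envs` does not match `lerobot_env_` and is left alone.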
-73
@@ -28,7 +28,6 @@ import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])")
import pandas as pd # noqa: E402
import pyarrow.parquet as pq # noqa: E402
from lerobot.annotations.steerable_pipeline.reader import iter_episodes # noqa: E402
@@ -345,78 +344,6 @@ def test_annotation_metadata_sync_allows_non_streaming_load(
assert len(dataset) == 24
def _build_packed_dataset(root: Path, episode_lengths: list[int], *, fps: int = 10) -> Path:
"""Pack several episodes into a single shard (vs build_annotation_dataset's one-per-file),
so the writer's rewrite must re-emit one row group per episode instead of collapsing them."""
from lerobot.datasets.io_utils import write_tasks
from lerobot.utils.io_utils import write_json
data_dir = root / "data" / "chunk-000"
data_dir.mkdir(parents=True, exist_ok=True)
episode_index, frame_index, timestamp, task_index, subtask_index = [], [], [], [], []
for ep, length in enumerate(episode_lengths):
episode_index += [ep] * length
frame_index += list(range(length))
timestamp += [round(i / fps, 6) for i in range(length)]
task_index += [0] * length
subtask_index += [0] * length # legacy column the writer must drop
pd.DataFrame(
{
"episode_index": episode_index,
"frame_index": frame_index,
"timestamp": timestamp,
"task_index": task_index,
"subtask_index": subtask_index,
}
).to_parquet(data_dir / "file-000.parquet", index=False)
tasks_df = pd.DataFrame({"task_index": [0]}, index=pd.Index(["do the thing"], name="task"))
write_tasks(tasks_df, root)
write_json(
{"codebase_version": "v3.1", "fps": fps, "features": {}, "total_episodes": len(episode_lengths)},
root / "meta" / "info.json",
)
return root
def test_writer_one_row_group_per_episode(tmp_path: Path) -> None:
"""Rewriting a packed shard must keep one row group per episode, not collapse
every episode into a single giant row group."""
episode_lengths = [4, 6, 5] # unequal lengths, all in one shard
root = _build_packed_dataset(tmp_path / "ds", episode_lengths)
shard = root / "data" / "chunk-000" / "file-000.parquet"
assert pq.ParquetFile(shard).metadata.num_row_groups == 1, "fixture should start collapsed"
staging_dir = tmp_path / "stage"
for ep in range(len(episode_lengths)):
_stage_episode(
staging_dir,
ep,
plan=[
{
"role": "assistant",
"content": f"subtask for ep {ep}",
"style": "subtask",
"timestamp": 0.0,
"tool_calls": None,
}
],
)
records = list(iter_episodes(root))
LanguageColumnsWriter().write_all(records, staging_dir, root)
# One row group per episode, with row counts matching the episode lengths.
md = pq.ParquetFile(shard).metadata
assert md.num_row_groups == len(episode_lengths)
assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == episode_lengths
# Language columns are still present after the per-episode rewrite.
table = pq.read_table(shard)
assert "language_persistent" in table.column_names
assert "language_events" in table.column_names
def test_speech_atom_shape_matches_plan_spec() -> None:
atom = speech_atom(2.5, "I'm cleaning up!")
assert atom["role"] == "assistant"
-55
@@ -32,26 +32,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from tests.fixtures.constants import DUMMY_REPO_ID
def assert_data_shards_one_row_group_per_episode(root):
"""Every aggregated DATA shard must have exactly one parquet row group per episode."""
import pyarrow.parquet as pq
shards = sorted((root / "data").rglob("*.parquet"))
assert shards, f"no data shards found under {root}/data"
n_episodes = 0
for shard in shards:
pf = pq.ParquetFile(shard)
episodes = pf.read(columns=["episode_index"]).column("episode_index").to_pylist()
assert pf.metadata.num_row_groups == len(set(episodes)), shard
for i in range(pf.metadata.num_row_groups):
rg_episodes = set(
pf.read_row_group(i, columns=["episode_index"]).column("episode_index").to_pylist()
)
assert len(rg_episodes) == 1, f"{shard} row group {i} spans episodes {rg_episodes}"
n_episodes += len(set(episodes))
return n_episodes
def assert_episode_and_frame_counts(aggr_ds, expected_episodes, expected_frames):
"""Test that total number of episodes and frames are correctly aggregated."""
assert aggr_ds.num_episodes == expected_episodes, (
@@ -586,41 +566,6 @@ def assert_image_frames_integrity(aggr_ds, ds_0, ds_1):
)
@pytest.mark.parametrize("use_videos", [True, False], ids=["video", "image"])
def test_aggregate_one_row_group_per_episode(tmp_path, lerobot_dataset_factory, use_videos):
"""Aggregated DATA shards keep one row group per episode (not one collapsed group).
Covers both the non-image (``df.to_parquet``) and image
(``to_parquet_with_hf_images``) write branches, including the merge-into-
existing-file branch via a low file-size threshold that forces packing.
"""
ds_0 = lerobot_dataset_factory(
root=tmp_path / "rg_0",
repo_id=f"{DUMMY_REPO_ID}_rg_0",
total_episodes=3,
total_frames=60,
use_videos=use_videos,
)
ds_1 = lerobot_dataset_factory(
root=tmp_path / "rg_1",
repo_id=f"{DUMMY_REPO_ID}_rg_1",
total_episodes=4,
total_frames=80,
use_videos=use_videos,
)
aggr_root = tmp_path / "rg_aggr"
aggregate_datasets(
repo_ids=[ds_0.repo_id, ds_1.repo_id],
roots=[ds_0.root, ds_1.root],
aggr_repo_id=f"{DUMMY_REPO_ID}_rg_aggr",
aggr_root=aggr_root,
)
n_episodes = assert_data_shards_one_row_group_per_episode(aggr_root)
assert n_episodes == ds_0.num_episodes + ds_1.num_episodes
def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
"""Test aggregation of image-based datasets preserves HuggingFace Image schema.
+3 -21
@@ -2370,32 +2370,14 @@ def test_aggregate_images_when_use_videos_false():
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
use_videos=False, # images kept, stored as "image" dtype
use_videos=False, # expect "image" dtype
patterns=None,
)
key = f"{OBS_IMAGES}.back"
key_front = f"{OBS_IMAGES}.front"
assert key in out
assert key_front in out
assert out[key]["dtype"] == "image"
assert out[key_front]["dtype"] == "image"
assert out[key]["shape"] == initial["back"]
def test_aggregate_images_excluded():
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
initial = {"back": (480, 640, 3)}
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
exclude_images=True,
patterns=None,
)
assert f"{OBS_IMAGES}.back" not in out
assert f"{OBS_IMAGES}.front" not in out
assert key not in out
assert key_front not in out
def test_aggregate_images_when_use_videos_true():
+2 -2
@@ -134,7 +134,7 @@ class TestMultiGPUTraining:
f"--output_dir={output_dir}",
"--batch_size=4",
"--steps=10",
"--eval_freq=-1",
"--env_eval_freq=-1",
"--log_freq=5",
"--save_freq=10",
"--seed=42",
@@ -177,7 +177,7 @@ class TestMultiGPUTraining:
f"--output_dir={output_dir}",
"--batch_size=4",
"--steps=20",
"--eval_freq=-1",
"--env_eval_freq=-1",
"--log_freq=5",
"--save_freq=10",
"--seed=42",