speed up benchmark eval scheduling and docker workflow

This commit is contained in:
Pepijn Kooijmans
2026-03-21 06:09:01 +01:00
parent f60d163588
commit 285c500aef
18 changed files with 1451 additions and 235 deletions
+59
View File
@@ -0,0 +1,59 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Base image shared by the per-benchmark eval images: CUDA runtime, system
# libraries for headless rendering, a Python toolchain, uv, and a virtualenv
# with the project installed under /lerobot.
ARG CUDA_VERSION=12.4.1
ARG OS_VERSION=22.04
FROM nvidia/cuda:${CUDA_VERSION}-base-ubuntu${OS_VERSION}

# Python interpreter version installed from the deadsnakes PPA and used for the venv.
ARG PYTHON_VERSION=3.12

# MUJOCO_GL=egl asks MuJoCo to render through EGL (no display required).
# The venv's bin directory goes first on PATH so `python`/`pip` resolve to it.
ENV DEBIAN_FRONTEND=noninteractive \
    MUJOCO_GL=egl \
    PATH=/lerobot/.venv/bin:$PATH

# Make a RUN step fail when any stage of a pipeline fails. Without pipefail a
# failed or truncated `curl` download piped into `sh` is not reported by the
# pipeline itself (hadolint DL4006).
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

# System packages, Python from deadsnakes, uv, and the unprivileged user.
# ca-certificates is listed explicitly because the HTTPS download below needs
# it and recommended packages are disabled.
RUN apt-get update && apt-get install -y --no-install-recommends \
        software-properties-common build-essential git curl ca-certificates \
        libglib2.0-0 libgl1-mesa-glx libegl1-mesa libosmesa6 \
        ffmpeg libusb-1.0-0-dev speech-dispatcher libgeos-dev portaudio19-dev \
        cmake pkg-config ninja-build \
    && add-apt-repository -y ppa:deadsnakes/ppa \
    && apt-get update \
    && apt-get install -y --no-install-recommends \
        python${PYTHON_VERSION} \
        python${PYTHON_VERSION}-venv \
        python${PYTHON_VERSION}-dev \
    && curl -LsSf https://astral.sh/uv/install.sh | sh \
    && mv /root/.local/bin/uv /usr/local/bin/uv \
    && useradd --create-home --shell /bin/bash user_lerobot \
    && apt-get clean && rm -rf /var/lib/apt/lists/*

WORKDIR /lerobot
# WORKDIR creates the directory as root; hand it to the unprivileged user so
# the venv and the project can be written there.
RUN chown -R user_lerobot:user_lerobot /lerobot
USER user_lerobot

# Keep model, dataset and compiler caches inside the user's home directory.
ENV HOME=/home/user_lerobot \
    HF_HOME=/home/user_lerobot/.cache/huggingface \
    HF_LEROBOT_HOME=/home/user_lerobot/.cache/huggingface/lerobot \
    TORCH_HOME=/home/user_lerobot/.cache/torch \
    TRITON_CACHE_DIR=/home/user_lerobot/.cache/triton

# `--seed` installs pip into the venv.
RUN uv venv --seed --python python${PYTHON_VERSION}

# Copy only what the package install needs first, so the dependency layer is
# reused from cache when unrelated files in the repository change.
COPY --chown=user_lerobot:user_lerobot setup.py pyproject.toml README.md MANIFEST.in ./
COPY --chown=user_lerobot:user_lerobot src/ src/
RUN uv pip install --no-cache .

# Remaining repository content (everything not excluded by .dockerignore).
COPY --chown=user_lerobot:user_lerobot . .

CMD ["/bin/bash"]
+22
View File
@@ -0,0 +1,22 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The base image is overridable (--build-arg BASE_IMAGE=...) so builds can
# target a pinned tag or digest instead of the mutable local `latest` tag.
ARG BASE_IMAGE=lerobot-eval-base:latest
FROM ${BASE_IMAGE}

# Install the `libero` extra of the project found in the current working
# directory (inherited from the base image), then import the package so a
# broken install fails the build instead of the first eval run.
# NOTE(review): system directories are put ahead of the venv on PATH for these
# commands, presumably so build tools resolve to system binaries - confirm.
RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[libero]" \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import libero"

CMD ["/bin/bash"]
+25
View File
@@ -0,0 +1,25 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The base image is overridable (--build-arg BASE_IMAGE=...) so builds can
# target a pinned tag or digest instead of the mutable local `latest` tag.
ARG BASE_IMAGE=lerobot-eval-base:latest
FROM ${BASE_IMAGE}

# 1. Install the `libero_plus` extra of the project in the working directory.
# 2. Shallow-clone LIBERO-plus and register the checkout on sys.path through a
#    `.pth` file in the venv's site-packages.
# 3. Import the key packages so a broken install fails the build.
# NOTE(review): the checkout lives under /tmp and the clone is not pinned to a
# commit. Running the container with a tmpfs mounted on /tmp would hide the
# checkout and break `import libero`; consider a stable location and a pinned
# revision once callers that may reference /tmp/LIBERO-plus are confirmed.
RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[libero_plus]" \
    && git clone --depth 1 https://github.com/sylvestf/LIBERO-plus.git /tmp/LIBERO-plus \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import pathlib, site; pathlib.Path(site.getsitepackages()[0], 'libero_plus_repo.pth').write_text('/tmp/LIBERO-plus\n')" \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import libero, robosuite, bddl"

CMD ["/bin/bash"]
+22
View File
@@ -0,0 +1,22 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The base image is overridable (--build-arg BASE_IMAGE=...) so builds can
# target a pinned tag or digest instead of the mutable local `latest` tag.
ARG BASE_IMAGE=lerobot-eval-base:latest
FROM ${BASE_IMAGE}

# Install the `metaworld` extra of the project found in the current working
# directory (inherited from the base image), then import the package so a
# broken install fails the build instead of the first eval run.
RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[metaworld]" \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import metaworld"

CMD ["/bin/bash"]
+22
View File
@@ -0,0 +1,22 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The base image is overridable (--build-arg BASE_IMAGE=...) so builds can
# target a pinned tag or digest instead of the mutable local `latest` tag.
ARG BASE_IMAGE=lerobot-eval-base:latest
FROM ${BASE_IMAGE}

# Install the `robocasa` extra of the project found in the current working
# directory (inherited from the base image), then import the package so a
# broken install fails the build instead of the first eval run.
RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[robocasa]" \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import robocasa"

CMD ["/bin/bash"]
+22
View File
@@ -0,0 +1,22 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The base image is overridable (--build-arg BASE_IMAGE=...) so builds can
# target a pinned tag or digest instead of the mutable local `latest` tag.
ARG BASE_IMAGE=lerobot-eval-base:latest
FROM ${BASE_IMAGE}

# Install the `robomme` extra of the project found in the current working
# directory (inherited from the base image), then import the package so a
# broken install fails the build instead of the first eval run.
RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[robomme]" \
    && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \
        /lerobot/.venv/bin/python -c "import robomme"

CMD ["/bin/bash"]
+148 -10
View File
@@ -47,6 +47,116 @@ For multi-GPU training you also need [Accelerate](https://huggingface.co/docs/ac
pip install accelerate
```
## Docker-isolated evaluation (EnvHub)
LeRobot eval now supports running the full eval worker in a Docker container
while keeping policy loading compatible with local checkpoints and local code changes.
Use `lerobot-eval` with `--eval.runtime=docker`:
```bash
lerobot-eval \
--policy.path=outputs/train/my_policy/checkpoints/050000/pretrained_model \
--env.type=libero_plus \
--eval.runtime=docker \
--eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1 \
--eval.n_episodes=10 \
--eval.batch_size=10
```
`eval.docker.envhub_ref` is optional. If omitted, LeRobot resolves a default
image from `env.type`. You can also override the image directly:
```bash
--eval.docker.image=docker://ghcr.io/huggingface/lerobot-eval-libero-plus:latest
```
By default (`eval.docker.use_local_code=true`), the local repository is mounted
in the container and added to `PYTHONPATH`, so edited policy/env code and local
checkpoints continue to work without rebuilding the image for each change.
Common Docker runtime options:
```bash
--eval.docker.pull=true \
--eval.docker.gpus=all \
--eval.docker.shm_size=8g \
--eval.docker.use_local_code=true
```
The benchmark runner supports the same Docker eval path (extra args are
forwarded to each generated `lerobot-eval` call):
```bash
lerobot-benchmark eval \
--benchmarks libero_plus,robocasa \
--hub-user $HF_USER \
--n-episodes 50 \
--eval.runtime=docker \
--eval.docker.pull=true
```
Build benchmark images locally:
```bash
make build-eval-images
```
## Fast single-machine eval tuning
`lerobot-eval` now has three throughput knobs:
- `eval.batch_size`: number of sub-envs per task (inside one vector env).
- `env.max_parallel_tasks`: number of tasks scheduled concurrently.
- `eval.instance_count`: number of full eval instances (process-level sharding).
As a general rule, tune them in this order (the table below lists per-benchmark exceptions, e.g. LIBERO):
1. Increase `eval.batch_size` first for per-task throughput.
2. Then increase `env.max_parallel_tasks` to overlap tasks, while monitoring RAM/VRAM.
3. Optionally increase `eval.instance_count` for process-level parallelism (best with enough CPU/RAM and small models).
The eval logs print the active scheduler mode (`sequential`, `threaded`, or `batched_lazy`) so you can verify the effective concurrency path.
### Suggested starting points
| Benchmark | Conservative | Faster (single GPU) | Notes |
|---|---|---|---|
| `libero` / `libero_plus` | `eval.batch_size=1`, `env.max_parallel_tasks=4` | `eval.batch_size=1`, `env.max_parallel_tasks=16` | For large suite sweeps, increase `max_parallel_tasks` before `batch_size` to avoid MuJoCo memory spikes. |
| `metaworld` | `eval.batch_size=8`, `env.max_parallel_tasks=1` | `eval.batch_size=16`, `env.max_parallel_tasks=2` | Prefer larger per-task vectorization first. |
| `robocasa` | `eval.batch_size=4`, `env.max_parallel_tasks=1` | `eval.batch_size=8`, `env.max_parallel_tasks=2` | Rendering/memory can dominate at high image resolution. |
| `robomme` | `eval.batch_size=4`, `env.max_parallel_tasks=1` | `eval.batch_size=8`, `env.max_parallel_tasks=2` | Start small and scale gradually with task count. |
### Local fast eval recipe
```bash
lerobot-eval \
--policy.path=$HF_USER/smolvla_libero_plus \
--env.type=libero_plus \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--env.max_parallel_tasks=16 \
--eval.instance_count=2 \
--rename_map='{"observation.images.image":"observation.images.camera1","observation.images.image2":"observation.images.camera2"}' \
--output_dir=outputs/eval/smolvla_libero_plus \
--push_to_hub=true
```
### Docker fast eval recipe
```bash
lerobot-eval \
--policy.path=$HF_USER/smolvla_libero_plus \
--env.type=libero_plus \
--eval.runtime=docker \
--eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1 \
--eval.docker.gpus=all \
--eval.docker.shm_size=16g \
--eval.n_episodes=1 \
--eval.batch_size=1 \
--env.max_parallel_tasks=16
```
## Quick start — single benchmark
Train SmolVLA on LIBERO-plus with 4 GPUs for 50 000 steps:
@@ -95,7 +205,7 @@ lerobot-benchmark all \
For each benchmark the runner:
1. Trains a policy on its dataset.
2. Evaluates on every eval task in the benchmark (e.g. 4 suites for LIBERO).
3. Uploads eval results + videos to the Hub.
3. Pushes HF-native `.eval_results` rows (and optional artifacts) to the Hub.
<Tip>
@@ -140,7 +250,9 @@ for SUITE in libero_spatial libero_object libero_goal libero_10; do
--eval.n_episodes=50 \
--eval.batch_size=10 \
--output_dir=outputs/eval/smolvla_libero_plus/$SUITE \
--policy.device=cuda
--policy.device=cuda \
--push_to_hub=true \
--benchmark_dataset_id=lerobot/sim-benchmarks
done
```
@@ -226,28 +338,44 @@ outputs/
Each `eval_info.json` contains per-episode rewards, success rates, and aggregate metrics.
## Uploading eval results to the Hub
## HF Eval Results + Leaderboard
Add `--push-eval-to-hub` to upload evaluation metrics and videos to the policy's
Hub repo after each eval run:
LeRobot publishes benchmark scores using Hugging Face's native
`/.eval_results/*.yaml` format, which powers model-page eval cards and
benchmark leaderboards.
Add `--push-eval-to-hub` to push results after each eval run:
```bash
lerobot-benchmark eval \
--benchmarks libero_plus,robocasa \
--hub-user $HF_USER \
--benchmark-dataset-id lerobot/sim-benchmarks \
--push-eval-to-hub
```
For LIBERO-plus, each suite's results are uploaded to `eval/libero_spatial/`,
`eval/libero_object/`, etc. inside the `$HF_USER/smolvla_libero_plus` model repo.
This writes one or more files under `.eval_results/` in the model repo, for example:
This also works with the `all` subcommand — pass `--push-eval-to-hub` and results
are automatically uploaded after each eval run.
```yaml
- dataset:
id: lerobot/sim-benchmarks
task_id: libero_plus/spatial
value: 82.4
notes: lerobot-eval
```
Notes:
- `--benchmark-dataset-id` points to your consolidated benchmark dataset repo.
- `task_id` values are derived from `env.type` and evaluated suite/task names.
- Eval artifacts (`eval_info.json`, `eval_config.json`, videos) are still uploaded
for provenance, but leaderboard ranking comes from `.eval_results`.
## Passing extra arguments
Any arguments after the recognized flags are forwarded to `lerobot-train` or
`lerobot-eval`. For example, to use PEFT/LoRA during training:
`lerobot-eval`.
Example (training): use PEFT/LoRA during training.
```bash
lerobot-benchmark train \
@@ -258,3 +386,13 @@ lerobot-benchmark train \
--steps 50000 \
--peft.method_type=LORA --peft.r=16
```
Example (evaluation): forward Docker runtime flags to each `lerobot-eval` call.
```bash
lerobot-benchmark eval \
--benchmarks libero_plus \
--hub-user $HF_USER \
--eval.runtime=docker \
--eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1
```
+38 -2
View File
@@ -49,15 +49,51 @@ class WandBConfig:
mode: str | None = None # Allowed values: 'online', 'offline' 'disabled'. Defaults to 'online'
@dataclass
class EvalDockerConfig:
# Docker image to use for evaluation (e.g., "ghcr.io/org/lerobot-eval-libero:latest").
# Takes precedence over eval.docker.envhub_ref.
image: str | None = None
# Optional EnvHub reference to resolve an image, e.g. "envhub://lerobot/libero_plus@v1".
envhub_ref: str | None = None
# If true, mount the local repository and prefer local source code in the container.
use_local_code: bool = True
# Pull the image before running.
pull: bool = True
# Docker --gpus value. Set to None to disable GPU flags and run CPU-only.
gpus: str | None = "all"
# Docker --shm-size value (increase when using larger eval.batch_size values).
shm_size: str = "8g"
@dataclass
class EvalConfig:
n_episodes: int = 50
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
# Number of sub-envs per task inside one VectorEnv. Increase to improve per-task
# inference throughput until GPU or simulator memory saturates.
batch_size: int = 50
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
# Use AsyncVectorEnv (multiprocessing). Prefer SyncVectorEnv unless your environment
# spends significant time in Python-side stepping and can benefit from process parallelism.
use_async_envs: bool = False
# Runtime where evaluation executes: "local" or "docker".
runtime: str = "local"
docker: EvalDockerConfig = field(default_factory=EvalDockerConfig)
# Number of parallel eval script instances to launch for one run.
# instance_count > 1 enables multi-instance task sharding.
instance_count: int = 1
# 0-indexed shard id for this process. Users usually leave this at 0.
# Additional shards are launched automatically by `lerobot-eval` when instance_count > 1.
instance_id: int = 0
def __post_init__(self) -> None:
if self.runtime not in {"local", "docker"}:
raise ValueError(f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker.")
if self.instance_count < 1:
raise ValueError("eval.instance_count must be >= 1.")
if self.instance_id < 0 or self.instance_id >= self.instance_count:
raise ValueError(
f"eval.instance_id must be in [0, {self.instance_count - 1}] (got {self.instance_id})."
)
if self.batch_size > self.n_episodes:
raise ValueError(
"The eval batch size is greater than the number of eval episodes "
+5 -1
View File
@@ -45,6 +45,10 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
fps: int = 30
features: dict[str, PolicyFeature] = field(default_factory=dict)
features_map: dict[str, str] = field(default_factory=dict)
# Upper bound on concurrent task evaluation in `lerobot-eval`.
# - For lazy wrappers (e.g. LIBERO/LIBERO-plus), values >1 can enable chunked
# task batching with one policy forward pass over multiple tasks.
# - For other envs, values >1 use a threaded task scheduler fallback.
max_parallel_tasks: int = 1
disable_env_checker: bool = True
@@ -356,7 +360,7 @@ class LiberoPlusEnv(LiberoEnv):
in experiment configs (`env.type=libero_plus`).
"""
task: str = "libero_spatial"
task: str = "libero_spatial,libero_object,libero_goal,libero_10"
@EnvConfig.register_subclass("robocasa")
+4 -3
View File
@@ -125,7 +125,7 @@ def make_env(
use_async_envs: bool = False,
hub_cache_dir: str | None = None,
trust_remote_code: bool = False,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
) -> dict[str, dict[int, Any]]:
"""Makes a gym vector environment according to the config or Hub reference.
Args:
@@ -143,8 +143,9 @@ def make_env(
ModuleNotFoundError: If the requested env package is not installed
Returns:
dict[str, dict[int, gym.vector.VectorEnv]]:
A mapping from suite name to indexed vectorized environments.
dict[str, dict[int, Any]]:
A mapping from suite name to indexed environments. Values are either
materialized vector envs or lazy wrappers that materialize on first use.
- For multi-task benchmarks (e.g., LIBERO): one entry per suite, and one vec env per task_id.
- For single-task environments: a single suite entry (cfg.type) with task_id=0.
+58
View File
@@ -0,0 +1,58 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import Any
class LazyVectorEnv:
    """Defer vector-env construction until first usage.

    This is useful for benchmarks with many tasks: we can register one env object
    per task without eagerly allocating all simulator/rendering resources.

    The wrapped env is built by calling ``env_cls(factory_fns)`` the first time
    :meth:`materialize` runs, either directly or through access to an attribute
    the wrapper does not define itself. :meth:`close` releases the env; a later
    access builds a fresh one.
    """

    # Attributes stored on the wrapper itself. They are never forwarded to the
    # wrapped env: if one of them is missing (e.g. on an instance created via
    # ``__new__`` by ``copy``/``pickle``), forwarding would call ``materialize``,
    # which reads ``self._env`` again and recurses without bound.
    _OWN_ATTRS = frozenset({"_env_cls", "_factory_fns", "_env"})

    def __init__(self, env_cls: Callable[[Sequence[Callable[[], Any]]], Any], factory_fns: list[Callable]):
        """Store the constructor and the per-sub-env factories without building anything.

        Args:
            env_cls: Callable that receives the list of factories and returns the env.
            factory_fns: Zero-argument callables handed to ``env_cls`` on materialization.
        """
        self._env_cls = env_cls
        self._factory_fns = factory_fns
        self._env = None

    @property
    def env_cls(self) -> Callable[[Sequence[Callable[[], Any]]], Any]:
        """Callable used to build the env from the factory list."""
        return self._env_cls

    @property
    def factory_fns(self) -> list[Callable]:
        """The factory callables passed to ``env_cls`` on materialization."""
        return self._factory_fns

    @property
    def num_factory_fns(self) -> int:
        """Number of factory callables; available without building the env."""
        return len(self._factory_fns)

    def materialize(self):
        """Build the wrapped env on first call and return it; later calls reuse it."""
        if self._env is None:
            self._env = self._env_cls(self._factory_fns)
        return self._env

    def close(self):
        """Close the wrapped env if it was built; a no-op otherwise."""
        # Drop the reference first so a failing close() cannot leave a
        # half-closed env behind to be closed a second time.
        env, self._env = self._env, None
        if env is not None:
            env.close()

    def __getattr__(self, name):
        """Forward unknown public attributes to the (lazily built) wrapped env.

        Raises:
            AttributeError: For the wrapper's own storage attributes and for
                dunder names, so that protocol probes such as
                ``hasattr(obj, "__setstate__")`` neither recurse nor force the
                env to be built.
        """
        if name in self._OWN_ATTRS or (name.startswith("__") and name.endswith("__")):
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
        return getattr(self.materialize(), name)
+196 -20
View File
@@ -75,6 +75,7 @@ if "wand" not in sys.modules:
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
from lerobot.envs.lazy_vec_env import LazyVectorEnv
from lerobot.processor import RobotObservation
_ASSET_DOWNLOAD_INSTRUCTIONS = """\
@@ -104,6 +105,145 @@ def _check_libero_plus_assets() -> None:
raise FileNotFoundError(_ASSET_DOWNLOAD_INSTRUCTIONS.format(assets_dir=assets_dir))
# ---- Perturbation support for LIBERO-Plus -----------------------------------
PERTURBATION_DIMENSIONS = (
"Camera Viewpoints",
"Robot Initial States",
"Language Instructions",
"Light Conditions",
"Background Textures",
"Sensor Noise",
"Objects Layout",
)
PERTURBATION_SHORT_KEYS = {
"Camera Viewpoints": "camera",
"Robot Initial States": "robot",
"Language Instructions": "language",
"Light Conditions": "light",
"Background Textures": "background",
"Sensor Noise": "noise",
"Objects Layout": "layout",
}
def load_task_classification() -> dict:
    """Load task_classification.json shipped with LIBERO-Plus.

    The file is searched in a few locations relative to the ``benchmark_root``
    path reported by ``get_libero_path``; the first existing candidate wins.

    Returns:
        The parsed JSON content.

    Raises:
        FileNotFoundError: If none of the candidate paths exists.
    """
    import json

    benchmark_root = Path(get_libero_path("benchmark_root"))
    candidates = [
        benchmark_root / "benchmark" / "task_classification.json",
        benchmark_root / "task_classification.json",
        benchmark_root.parent / "benchmark" / "task_classification.json",
    ]
    for candidate in candidates:
        if candidate.exists():
            # JSON is UTF-8 by specification; do not depend on the locale's
            # default encoding (non-ASCII text would break under e.g. a C locale).
            with open(candidate, encoding="utf-8") as f:
                return json.load(f)
    raise FileNotFoundError(
        f"task_classification.json not found. Tried: {[str(c) for c in candidates]}"
    )
def _parse_task_id(raw_id):
    """Return ``int(raw_id)``, or None when the value cannot be read as an integer."""
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        return None


def _perturbation_dimension(meta) -> str:
    """Extract the perturbation dimension name from one task's metadata.

    ``perturbation_type`` is preferred over ``category``. Missing, null or empty
    values (and non-dict metadata) all map to ``"unknown"`` so that callers
    always receive a string.
    """
    if not isinstance(meta, dict):
        return "unknown"
    dim = meta.get("perturbation_type") or meta.get("category") or "unknown"
    return dim if isinstance(dim, str) else str(dim)


def build_perturbation_index(suite_name: str) -> dict[int, str]:
    """Return {0-indexed task_id: perturbation_dimension} for *suite_name*.

    Entries whose id cannot be parsed, or that would map to a negative index,
    are skipped. A suite missing from the classification file, or stored in an
    unrecognized shape, yields an empty dict.
    """
    tc = load_task_classification()
    suite_data = tc.get(suite_name, {})
    index: dict[int, str] = {}

    # LIBERO-Plus task_classification.json has appeared in two shapes:
    # 1) dict[suite][task_id_str] -> meta
    # 2) dict[suite] -> list[{id, category, ...}]
    if isinstance(suite_data, list):
        for item in suite_data:
            if not isinstance(item, dict):
                continue
            raw = _parse_task_id(item.get("id"))
            if raw is None:
                continue
            # list-form ids are 1-indexed in current LIBERO-Plus release.
            tid = raw - 1
            if tid < 0:
                continue
            index[tid] = _perturbation_dimension(item)
        return index

    if isinstance(suite_data, dict):
        # Handle both 0-indexed and 1-indexed key conventions: keys are treated
        # as 1-indexed only when every numeric key is >= 1.
        parsed = [(_parse_task_id(key), meta) for key, meta in suite_data.items()]
        numeric_keys = [tid for tid, _ in parsed if tid is not None]
        one_indexed = bool(numeric_keys) and min(numeric_keys) >= 1
        offset = 1 if one_indexed else 0
        for raw, meta in parsed:
            if raw is None:
                continue
            tid = raw - offset
            if tid < 0:
                continue
            index[tid] = _perturbation_dimension(meta)
        return index

    return index
def aggregate_by_perturbation(
    per_task: list[dict], suite_indices: dict[str, dict[int, str]]
) -> dict[str, dict]:
    """Aggregate per-task eval results by perturbation dimension.

    Args:
        per_task: list of {"task_group": str, "task_id": int, "metrics": {...}}
        suite_indices: {suite_name: {task_id: dimension_name}} from build_perturbation_index

    Returns:
        {short_key: {"pc_success": float, "n_episodes": int}} for each perturbation
        dimension that has at least one task. Known dimensions come first (in the
        order of PERTURBATION_SHORT_KEYS), then "unknown", then any dimension that
        is not listed in PERTURBATION_SHORT_KEYS under a key derived from its name.
        A "total" entry over all episodes is appended when any episode was seen.
    """
    dim_successes: dict[str, list] = defaultdict(list)
    for entry in per_task:
        suite_index = suite_indices.get(entry["task_group"], {})
        dim = suite_index.get(entry["task_id"]) or "unknown"
        short = PERTURBATION_SHORT_KEYS.get(dim)
        if short is None:
            # Dimension not in the known table: derive a key from its name.
            # str() guards against non-string values coming from the JSON file.
            short = str(dim).lower().replace(" ", "_")
        dim_successes[short].extend(entry["metrics"].get("successes", []))

    # Known keys first, then "unknown", then every other key that was actually
    # seen, so that unlisted dimensions are reported and counted in "total"
    # instead of being silently dropped. dict.fromkeys de-duplicates in order.
    ordered_keys = dict.fromkeys([*PERTURBATION_SHORT_KEYS.values(), "unknown", *dim_successes])

    results: dict[str, dict] = {}
    all_successes: list = []
    for short_key in ordered_keys:
        if short_key not in dim_successes:
            continue
        s = dim_successes[short_key]
        all_successes.extend(s)
        results[short_key] = {
            "pc_success": float(np.nanmean(s) * 100) if s else float("nan"),
            "n_episodes": len(s),
        }

    if all_successes:
        results["total"] = {
            "pc_success": float(np.nanmean(all_successes) * 100),
            "n_episodes": len(all_successes),
        }
    return results
def _parse_camera_names(camera_name: str | Sequence[str]) -> list[str]:
"""Normalize camera_name into a non-empty list of strings."""
if isinstance(camera_name, str):
@@ -143,26 +283,31 @@ def get_task_init_states(task_suite: Any, i: int) -> np.ndarray:
init_states_dir = Path(get_libero_path("init_states")) / task_suite.tasks[i].problem_folder
init_states_file = task_suite.tasks[i].init_states_file
candidate_names = [init_states_file]
# Some LIBERO-plus task names include a "_table_<n>" suffix while shipped
# init files use the base name without that table suffix.
if "_table_" in init_states_file:
candidate_names.append(re.sub(r"_table_\d+(?=\.pruned_init$|\.init$)", "", init_states_file))
# 1. Direct match
direct = init_states_dir / init_states_file
if direct.exists():
return torch.load(direct, weights_only=False) # nosec B614
for name in candidate_names:
candidate_path = init_states_dir / name
if candidate_path.exists():
return torch.load(candidate_path, weights_only=False) # nosec B614
# 2. LIBERO-Plus perturbation filenames append suffixes like
# _view_0_0_100_0_0_initstate_1, _language_19, _noise_45, _table_1, _tb_9, _add_16
# to the base task name. Instead of regex-matching every variant, find the
# longest existing base file whose stem is a prefix of the perturbation stem.
stem, ext = os.path.splitext(init_states_file)
best_match: Path | None = None
best_len = 0
for candidate in init_states_dir.glob(f"*{ext}"):
cstem = candidate.stem
if stem == cstem or (stem.startswith(cstem) and stem[len(cstem)] == "_"):
if len(cstem) > best_len:
best_len = len(cstem)
best_match = candidate
# Last-resort fallback: pick any file matching the base prefix + extension.
stem, suffix = os.path.splitext(init_states_file)
stem = re.sub(r"_table_\d+$", "", stem)
fallback_matches = sorted(init_states_dir.glob(f"{stem}*{suffix}"))
if fallback_matches:
return torch.load(fallback_matches[0], weights_only=False) # nosec B614
if best_match is not None:
return torch.load(best_match, weights_only=False) # nosec B614
raise FileNotFoundError(
f"Could not find init states for task {i}. Tried {candidate_names} in '{init_states_dir}'."
f"Could not find init states for task {i}. "
f"Tried '{init_states_file}' and prefix matching in '{init_states_dir}'."
)
@@ -259,6 +404,7 @@ class LiberoEnv(gym.Env):
# Load once and keep
self._init_states = get_task_init_states(task_suite, self.task_id) if self.init_states else None
self._reset_stride = n_envs # when performing a reset, append `_reset_stride` to `init_state_id`.
self._init_state_error_warned = False
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
@@ -410,8 +556,21 @@ class LiberoEnv(gym.Env):
self._env.seed(seed)
raw_obs = self._env.reset()
if self.init_states and self._init_states is not None:
raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)])
self.init_state_id += self._reset_stride # Change init_state_id when reset
try:
raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)])
self.init_state_id += self._reset_stride # Change init_state_id when reset
except Exception as exc:
# Some LIBERO-Plus perturbation tasks (notably object-layout variants)
# can have different simulator state dimensions than their base init files.
# Fall back to plain env.reset() instead of aborting the whole evaluation.
self.init_states = False
if not self._init_state_error_warned:
print(
"WARNING: Failed to apply init state for "
f"task_id={self.task_id} ({self.task}). "
f"Falling back to plain reset. Error: {exc}"
)
self._init_state_error_warned = True
# After reset, objects may be unstable (slightly floating, intersecting, etc.).
# Step the simulator with a no-op action for a few frames so everything settles.
@@ -500,6 +659,9 @@ def _make_env_fns(
return fns
_LazyVecEnv = LazyVectorEnv
# ---- Main API ----------------------------------------------------------------
@@ -543,12 +705,23 @@ def create_libero_envs(
print(f"Restricting to task_ids={task_ids_filter}")
out: dict[str, dict[int, Any]] = defaultdict(dict)
total_tasks = 0
for suite_name in suite_names:
suite = _get_suite(suite_name)
total = len(suite.tasks)
selected = _select_task_ids(total, task_ids_filter)
if not selected:
raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).")
total_tasks += len(selected)
lazy = total_tasks > 50
if lazy:
print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)")
for suite_name in suite_names:
suite = _get_suite(suite_name)
total = len(suite.tasks)
selected = _select_task_ids(total, task_ids_filter)
for tid in selected:
fns = _make_env_fns(
@@ -562,8 +735,11 @@ def create_libero_envs(
gym_kwargs=gym_kwargs,
control_mode=control_mode,
)
out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
if lazy:
out[suite_name][tid] = LazyVectorEnv(env_cls, fns)
else:
out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
+11 -5
View File
@@ -25,6 +25,7 @@ import metaworld.policies as policies
import numpy as np
from gymnasium import spaces
from lerobot.envs.lazy_vec_env import LazyVectorEnv
from lerobot.processor import RobotObservation
# ---- Load configuration data from the external JSON file ----
@@ -297,19 +298,24 @@ def create_metaworld_envs(
print(f"Creating Meta-World envs | task_groups={task_groups} | n_envs(per task)={n_envs}")
group_to_tasks = {group: DIFFICULTY_TO_TASKS.get(group, [group]) for group in task_groups}
total_tasks = sum(len(tasks) for tasks in group_to_tasks.values())
lazy = total_tasks > 50
if lazy:
print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)")
out: dict[str, dict[int, Any]] = defaultdict(dict)
for group in task_groups:
# if not in difficulty presets, treat it as a single custom task
tasks = DIFFICULTY_TO_TASKS.get(group, [group])
tasks = group_to_tasks[group]
for tid, task_name in enumerate(tasks):
print(f"Building vec env | group={group} | task_id={tid} | task={task_name}")
if not lazy:
print(f"Building vec env | group={group} | task_id={tid} | task={task_name}")
# build n_envs factories
fns = [(lambda tn=task_name: MetaworldEnv(task=tn, **gym_kwargs)) for _ in range(n_envs)]
out[group][tid] = env_cls(fns)
out[group][tid] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns)
# return a plain dict for consistency
return {group: dict(task_map) for group, task_map in out.items()}
+8 -2
View File
@@ -24,6 +24,7 @@ import gymnasium as gym
import numpy as np
from gymnasium import spaces
from lerobot.envs.lazy_vec_env import LazyVectorEnv
# Action layout (flat 12D, normalized to [-1, 1]):
# [0:3] end_effector_position (delta x, y, z)
@@ -256,8 +257,12 @@ def create_robocasa_envs(
gym_kwargs = dict(gym_kwargs or {})
out: dict[str, dict[int, Any]] = defaultdict(dict)
total_tasks = len(task_list)
lazy = total_tasks > 50
print(f"Creating RoboCasa envs | tasks={task_list} | n_envs(per task)={n_envs} | split={split}")
if lazy:
print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)")
for task in task_list:
fns = _make_env_fns(
task=task,
@@ -267,7 +272,8 @@ def create_robocasa_envs(
episode_length=episode_length,
gym_kwargs=gym_kwargs,
)
out["robocasa"][len(out["robocasa"])] = env_cls(fns)
print(f" Built vec env | task={task} | n_envs={n_envs}")
out["robocasa"][len(out["robocasa"])] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns)
if not lazy:
print(f" Built vec env | task={task} | n_envs={n_envs}")
return {suite: dict(task_map) for suite, task_map in out.items()}
+43 -16
View File
@@ -14,12 +14,16 @@ Install: pip install robomme (or from source: https://github.com/RoboMME/robomm
from __future__ import annotations
from collections.abc import Callable, Sequence
from functools import partial
from typing import Any
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from lerobot.envs.lazy_vec_env import LazyVectorEnv
ROBOMME_TASKS = [
"BinFill", "PickXtimes", "SwingXtimes", "StopCube",
"VideoUnmask", "VideoUnmaskSwap", "ButtonUnmask", "ButtonUnmaskSwap",
@@ -113,6 +117,29 @@ class RoboMMEGymEnv(gym.Env):
}
def _make_env_fns(
*,
task: str,
n_envs: int,
action_space_type: str,
dataset: str,
episode_length: int,
task_id: int,
) -> list[Callable[[], RoboMMEGymEnv]]:
"""Build n_envs factory callables for one RoboMME task id."""
def _make_one(episode_index: int) -> RoboMMEGymEnv:
return RoboMMEGymEnv(
task=task,
action_space_type=action_space_type,
dataset=dataset,
episode_idx=episode_index,
max_steps=episode_length,
)
return [partial(_make_one, task_id + i) for i in range(n_envs)]
def create_robomme_envs(
task: str,
n_envs: int = 1,
@@ -120,35 +147,35 @@ def create_robomme_envs(
dataset: str = "test",
episode_length: int = 300,
task_ids: list[int] | None = None,
env_cls=None,
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Create vectorized RoboMME environments for evaluation.
Returns {suite_name: {task_id: VectorEnv}} matching lerobot's expected format.
"""
if env_cls is None:
env_cls = gym.vector.SyncVectorEnv
if env_cls is None or not callable(env_cls):
raise ValueError("env_cls must be a callable that wraps a list of env factory callables.")
if not isinstance(n_envs, int) or n_envs <= 0:
raise ValueError(f"n_envs must be a positive int; got {n_envs}.")
if task_ids is None:
task_ids = [0]
suite_name = "robomme"
envs_by_task = {}
lazy = len(task_ids) > 50
if lazy:
print(f"Using lazy env creation for {len(task_ids)} tasks (envs created on demand)")
for task_id in task_ids:
def _make_one(ep_idx=task_id):
return RoboMMEGymEnv(
task=task,
action_space_type=action_space_type,
dataset=dataset,
episode_idx=ep_idx,
max_steps=episode_length,
)
vec = env_cls(
[_make_one for _ in range(n_envs)],
autoreset_mode=gym.vector.AutoresetMode.SAME_STEP,
fns = _make_env_fns(
task=task,
n_envs=n_envs,
action_space_type=action_space_type,
dataset=dataset,
episode_length=episode_length,
task_id=task_id,
)
envs_by_task[task_id] = vec
envs_by_task[task_id] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns)
return {suite_name: envs_by_task}
+462 -176
View File
@@ -49,7 +49,9 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig`
import concurrent.futures as cf
import json
import logging
import re
import shutil
import subprocess
import sys
import threading
import time
from collections import defaultdict
@@ -72,6 +74,8 @@ from tqdm import trange
from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.lazy_vec_env import LazyVectorEnv
from lerobot.envs.docker_runtime import run_eval_in_docker
from lerobot.envs.factory import make_env, make_env_pre_post_processors
from lerobot.envs.utils import (
add_envs_task,
@@ -86,6 +90,11 @@ from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.io_utils import write_video
from lerobot.utils.random_utils import set_seed
from lerobot.utils.hf_eval_results import (
build_eval_results_rows,
default_eval_date,
upload_eval_results_yaml,
)
from lerobot.utils.utils import (
get_safe_torch_device,
init_logging,
@@ -521,16 +530,22 @@ def push_eval_to_hub(
output_dir: Path,
info: dict,
env_type: str,
eval_config: dict | None = None,
env_task: str | None,
benchmark_dataset_id: str,
source_url: str | None = None,
notes: str | None = None,
) -> str:
"""Upload eval results, videos, config, and an updated model card to the Hub.
"""Upload eval artifacts and `.eval_results` rows to the Hub.
Args:
repo_id: HF model repo (e.g. "user/my_policy").
output_dir: Local directory containing eval_info.json and videos/.
info: The eval results dict (as returned by eval_policy_all).
env_type: Environment type string (e.g. "libero_plus", "pusht").
eval_config: Serialized EvalPipelineConfig dict (policy + env + eval settings).
env_task: The env task string from eval config.
benchmark_dataset_id: HF dataset id of the consolidated benchmark dataset.
source_url: Optional source URL for `.eval_results` attribution.
notes: Optional setup notes to include in `.eval_results`.
Returns:
URL of the last Hub commit.
@@ -572,96 +587,95 @@ def push_eval_to_hub(
commit_message=f"Upload eval rollout videos for {env_type}",
)
# 4. Update the model card with an eval results table and config summary
_update_model_card_with_eval(api, repo_id, info, env_type, eval_config=eval_config)
# 4. Upload HF-native `.eval_results` rows (canonical leaderboard surface).
rows = build_eval_results_rows(
info=info,
env_type=env_type,
env_task=env_task,
benchmark_dataset_id=benchmark_dataset_id,
source_url=source_url,
notes=notes,
eval_date=default_eval_date(),
)
commit_url = upload_eval_results_yaml(
api=api,
repo_id=repo_id,
rows=rows,
env_type=env_type,
env_task=env_task,
output_dir=output_dir,
)
logging.info(f"Eval results pushed to https://huggingface.co/{repo_id}")
return commit_url
def _format_eval_table(info: dict, env_type: str, eval_config: dict | None = None) -> str:
"""Build a markdown table from eval results, optionally including config."""
lines = [
f"### Evaluation: `{env_type}`\n",
"| Suite | Success Rate (%) | Avg Sum Reward | Episodes |",
"|-------|-----------------|----------------|----------|",
]
per_group = info.get("per_group", {})
for group_name, stats in sorted(per_group.items()):
sr = stats.get("pc_success", float("nan"))
reward = stats.get("avg_sum_reward", float("nan"))
n_ep = stats.get("n_episodes", 0)
lines.append(f"| {group_name} | {sr:.1f} | {reward:.2f} | {n_ep} |")
overall = info.get("overall", {})
if overall:
sr = overall.get("pc_success", float("nan"))
reward = overall.get("avg_sum_reward", float("nan"))
n_ep = overall.get("n_episodes", 0)
lines.append(f"| **Overall** | **{sr:.1f}** | **{reward:.2f}** | **{n_ep}** |")
if eval_config:
lines.append("")
lines.append("<details><summary>Eval configuration</summary>\n")
lines.append("```json")
lines.append(json.dumps(eval_config, indent=2))
lines.append("```\n")
lines.append("</details>")
video_paths = overall.get("video_paths", [])
if video_paths:
lines.append("")
lines.append("<details><summary>Rollout videos</summary>\n")
for vp in video_paths[:10]:
video_name = Path(vp).name
parent = Path(vp).parent.name
lines.append(f"**{parent}/{video_name}**\n")
lines.append(f"![{video_name}](eval/{env_type}/videos/{parent}/{video_name})\n")
lines.append("</details>")
return "\n".join(lines)
def _update_model_card_with_eval(
    api: Any, repo_id: str, info: dict, env_type: str, eval_config: dict | None = None
) -> None:
    """Append or replace the eval section for ``env_type`` in the model card README.

    The section is wrapped in ``<!-- eval-results-{env_type}-start/end -->`` HTML
    comments. If the start marker is already present, every marked span is replaced;
    otherwise the section is appended, preceded by a ``## Evaluation Results`` header
    when the card does not contain one yet. The card is then pushed to ``repo_id``.

    Args:
        api: Not used by this function's body; kept for interface compatibility.
        repo_id: Hub repo whose model card is loaded and pushed.
        info: Eval results dict forwarded to ``_format_eval_table``.
        env_type: Environment type; selects the section markers and commit message.
        eval_config: Optional config dict forwarded to ``_format_eval_table``.
    """
    from huggingface_hub import ModelCard

    try:
        card = ModelCard.load(repo_id)
    except Exception:
        # Best-effort: if the card cannot be loaded, start from an empty one.
        card = ModelCard("")
    content = card.content or ""
    eval_table = _format_eval_table(info, env_type, eval_config=eval_config)
    section_marker_start = f"<!-- eval-results-{env_type}-start -->"
    section_marker_end = f"<!-- eval-results-{env_type}-end -->"
    new_section = f"{section_marker_start}\n{eval_table}\n{section_marker_end}"
    if section_marker_start in content:
        # Use a callable replacement: a replacement *string* would have its backslashes
        # interpreted by `re` (e.g. "\n" or "\u" inside the embedded JSON config), which
        # raises "bad escape" or silently alters the inserted text.
        content = re.sub(
            rf"{re.escape(section_marker_start)}.*?{re.escape(section_marker_end)}",
            lambda _match: new_section,
            content,
            flags=re.DOTALL,
        )
    else:
        eval_header = "\n## Evaluation Results\n\n"
        if "## Evaluation Results" not in content:
            content += eval_header
        content += f"\n{new_section}\n"
    card.content = content
    card.push_to_hub(repo_id, commit_message=f"Update eval results for {env_type}")
@parser.wrap()
def eval_main(cfg: EvalPipelineConfig):
    """Log the config, then run either the multi-instance orchestrator or a single worker.

    The orchestrator path is taken only when ``cfg.eval.instance_count > 1`` and
    ``cfg.eval.instance_id == 0``; every other combination runs a worker directly.
    """
    logging.info(pformat(asdict(cfg)))
    is_orchestrator = cfg.eval.instance_count > 1 and cfg.eval.instance_id == 0
    if not is_orchestrator:
        _run_eval_worker(cfg)
        return
    _orchestrate_multi_instance_eval(cfg)
def _maybe_add_libero_plus_perturbation(info: dict, cfg: EvalPipelineConfig) -> None:
if cfg.env.type != "libero_plus":
return
try:
from lerobot.envs.libero import aggregate_by_perturbation, build_perturbation_index
suite_names = [s.strip() for s in cfg.env.task.split(",") if s.strip()]
suite_indices = {s: build_perturbation_index(s) for s in suite_names}
perturbation_results = aggregate_by_perturbation(info["per_task"], suite_indices)
info["perturbation_results"] = perturbation_results
print("\n=== Perturbation Results ===")
for dim, stats in perturbation_results.items():
print(f" {dim}: {stats['pc_success']:.1f}% ({stats['n_episodes']} episodes)")
except Exception as exc:
# Never fail a finished long-running eval on post-processing.
print(f"WARNING: Failed to compute LIBERO-Plus perturbation breakdown: {exc}")
print("Continuing with per-suite + overall metrics only.")
def _save_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None:
    """Write ``eval_info.json`` and ``eval_config.json`` into ``cfg.output_dir``.

    The directory is created if missing. ``info`` is dumped as-is; the config is
    converted with ``asdict`` and ``_serializable_config`` before being dumped.
    """
    target_dir = Path(cfg.output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    info_path = target_dir / "eval_info.json"
    with info_path.open("w") as info_file:
        json.dump(info, info_file, indent=2)

    # The config is serialized only after the results file has been written.
    config_payload = _serializable_config(asdict(cfg))
    config_path = target_dir / "eval_config.json"
    with config_path.open("w") as config_file:
        json.dump(config_payload, config_file, indent=2)
def _maybe_push_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None:
if not cfg.push_to_hub:
return
repo_id = str(cfg.policy.pretrained_path)
try:
push_eval_to_hub(
repo_id=repo_id,
output_dir=Path(cfg.output_dir),
info=info,
env_type=cfg.env.type,
env_task=cfg.env.task,
benchmark_dataset_id=cfg.benchmark_dataset_id,
source_url=cfg.eval_result_source_url,
notes=cfg.eval_result_notes,
)
except Exception as exc:
logging.warning("Failed to push eval artifacts/results to Hub: %s", exc)
def _run_eval_worker(cfg: EvalPipelineConfig) -> dict:
logging.info(pformat(asdict(cfg)))
if cfg.eval.runtime == "docker":
run_eval_in_docker(cfg)
output_dir = Path(cfg.output_dir)
with open(output_dir / "eval_info.json") as f:
return json.load(f)
# Check device is available
device = get_safe_torch_device(cfg.policy.device, log=True)
@@ -705,50 +719,120 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
)
print("Overall Aggregated Metrics:")
print(info["overall"])
try:
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
instance_count=cfg.eval.instance_count,
instance_id=cfg.eval.instance_id,
)
print("Overall Aggregated Metrics:")
print(info["overall"])
# Print per-suite stats
for task_group, task_group_info in info.items():
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info)
# Close all vec envs
close_envs(envs)
for key, val in info.get("per_group", {}).items():
print(f"\nAggregated Metrics for {key}:")
print(val)
# Save info
output_dir = Path(cfg.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_dir / "eval_info.json", "w") as f:
json.dump(info, f, indent=2)
_maybe_add_libero_plus_perturbation(info, cfg)
finally:
close_envs(envs)
eval_cfg_dict = _serializable_config(asdict(cfg))
with open(output_dir / "eval_config.json", "w") as f:
json.dump(eval_cfg_dict, f, indent=2)
_save_eval_outputs(cfg, info)
_maybe_push_eval_outputs(cfg, info)
if cfg.push_to_hub:
repo_id = str(cfg.policy.pretrained_path)
push_eval_to_hub(
repo_id=repo_id,
output_dir=output_dir,
info=info,
env_type=cfg.env.type,
eval_config=eval_cfg_dict,
)
logging.info("End of eval")
return info
def _orchestrate_multi_instance_eval(cfg: EvalPipelineConfig) -> None:
    """Run a sharded eval: spawn workers for shards 1..N-1, run shard 0 here, then merge.

    Each shard writes into ``<output_dir>/instances/<i>``. Child workers are started with
    the current command line, with ``--eval.instance_id=``, ``--output_dir=`` and
    ``--push_to_hub=`` arguments replaced (only the ``--key=value`` spelling is
    filtered). After all shards finish, their ``eval_info.json`` per-task entries are
    merged and sorted, shard videos are copied into ``<output_dir>/videos`` with the
    recorded paths rewritten, and the merged result is aggregated, saved and optionally
    pushed.

    Raises:
        RuntimeError: If any child worker exits with a non-zero return code.
        BaseException: Whatever spawning or the in-process shard raised, re-raised after
            the already-started child workers have been stopped.
    """
    start_t = time.time()
    root_output_dir = Path(cfg.output_dir)
    instances_root = root_output_dir / "instances"
    instances_root.mkdir(parents=True, exist_ok=True)
    n_instances = cfg.eval.instance_count
    logging.info(f"Launching multi-instance eval with {n_instances} workers.")

    # Spawn workers for shard 1..N-1, run shard 0 in-process.
    child_procs: list[tuple[int, subprocess.Popen]] = []
    argv = [
        arg
        for arg in sys.argv[1:]
        if not arg.startswith("--eval.instance_id=")
        and not arg.startswith("--output_dir=")
        and not arg.startswith("--push_to_hub=")
    ]
    try:
        for i in range(1, n_instances):
            child_output_dir = instances_root / str(i)
            cmd = [
                sys.executable,
                "-m",
                "lerobot.scripts.lerobot_eval",
                *argv,
                f"--eval.instance_id={i}",
                f"--output_dir={child_output_dir}",
                "--push_to_hub=false",
            ]
            logging.info("Starting eval worker %s/%s", i + 1, n_instances)
            child_procs.append((i, subprocess.Popen(cmd)))

        cfg0 = deepcopy(cfg)
        cfg0.eval.instance_id = 0
        cfg0.push_to_hub = False
        cfg0.output_dir = instances_root / "0"
        _run_eval_worker(cfg0)
    except BaseException:
        # Spawning or shard 0 failed (or was interrupted): stop the children that were
        # already started so they are not left running as orphans, then re-raise.
        for _, proc in child_procs:
            if proc.poll() is None:
                proc.terminate()
        for _, proc in child_procs:
            try:
                proc.wait(timeout=30)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        raise

    # Wait for every child before reporting, so no worker is left un-reaped.
    failed = []
    for idx, proc in child_procs:
        rc = proc.wait()
        if rc != 0:
            failed.append((idx, rc))
    if failed:
        raise RuntimeError(f"Multi-instance eval failed for workers: {failed}")

    partial_infos: list[dict] = []
    for i in range(n_instances):
        info_path = instances_root / str(i) / "eval_info.json"
        with open(info_path) as f:
            partial_infos.append(json.load(f))

    merged_per_task = []
    for info in partial_infos:
        merged_per_task.extend(info.get("per_task", []))
    merged_per_task.sort(key=lambda x: (x["task_group"], x["task_id"]))

    # Merge videos from each shard into final output dir.
    merged_videos_dir = root_output_dir / "videos"
    for i in range(n_instances):
        shard_dir = instances_root / str(i)
        shard_videos = shard_dir / "videos"
        if shard_videos.is_dir():
            shutil.copytree(shard_videos, merged_videos_dir, dirs_exist_ok=True)
            # Point recorded paths at the merged location instead of the shard directory.
            old_prefix = str(shard_videos)
            new_prefix = str(merged_videos_dir)
            for entry in merged_per_task:
                paths = entry.get("metrics", {}).get("video_paths", [])
                entry["metrics"]["video_paths"] = [
                    p.replace(old_prefix, new_prefix, 1) if p.startswith(old_prefix) else p for p in paths
                ]

    merged_info = _aggregate_eval_from_per_task(merged_per_task, total_eval_s=time.time() - start_t)
    _maybe_add_libero_plus_perturbation(merged_info, cfg)
    print("Overall Aggregated Metrics:")
    print(merged_info["overall"])
    _save_eval_outputs(cfg, merged_info)
    _maybe_push_eval_outputs(cfg, merged_info)
    logging.info("End of eval")
@@ -763,6 +847,177 @@ class TaskMetrics(TypedDict):
ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths")
def _aggregate_eval_from_per_task(per_task_infos: list[dict], total_eval_s: float) -> dict:
"""Aggregate eval metrics from per-task payloads."""
group_acc: dict[str, dict[str, list]] = defaultdict(lambda: {k: [] for k in ACC_KEYS})
overall: dict[str, list] = {k: [] for k in ACC_KEYS}
def _append(group: str, key: str, value: Any):
if value is None:
return
if isinstance(value, list):
group_acc[group][key].extend(value)
overall[key].extend(value)
else:
group_acc[group][key].append(value)
overall[key].append(value)
for entry in per_task_infos:
group = entry["task_group"]
metrics = entry["metrics"]
_append(group, "sum_rewards", metrics.get("sum_rewards"))
_append(group, "max_rewards", metrics.get("max_rewards"))
_append(group, "successes", metrics.get("successes"))
paths = metrics.get("video_paths", [])
if paths:
group_acc[group]["video_paths"].extend(paths)
overall["video_paths"].extend(paths)
def _agg_from_list(xs: list[float]) -> float:
if not xs:
return float("nan")
arr = np.array(xs, dtype=float)
return float(np.nanmean(arr))
groups_aggregated = {}
for group, acc in group_acc.items():
groups_aggregated[group] = {
"avg_sum_reward": _agg_from_list(acc["sum_rewards"]),
"avg_max_reward": _agg_from_list(acc["max_rewards"]),
"pc_success": _agg_from_list(acc["successes"]) * 100 if acc["successes"] else float("nan"),
"n_episodes": len(acc["sum_rewards"]),
"video_paths": list(acc["video_paths"]),
}
overall_agg = {
"avg_sum_reward": _agg_from_list(overall["sum_rewards"]),
"avg_max_reward": _agg_from_list(overall["max_rewards"]),
"pc_success": _agg_from_list(overall["successes"]) * 100 if overall["successes"] else float("nan"),
"n_episodes": len(overall["sum_rewards"]),
"eval_s": total_eval_s,
"eval_ep_s": total_eval_s / max(1, len(overall["sum_rewards"])),
"video_paths": list(overall["video_paths"]),
}
return {"per_task": per_task_infos, "per_group": groups_aggregated, "overall": overall_agg}
def _eval_task_batch(
    batch: list[tuple[str, int, LazyVectorEnv]],
    policy,
    env_preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    env_postprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction],
    start_seed: int | None,
    max_episodes_rendered: int = 0,
    videos_dir: Path | None = None,
) -> list[tuple[str, int, TaskMetrics]]:
    """Evaluate several tasks with a single combined vector-env rollout.

    The factory callables of every ``LazyVectorEnv`` in ``batch`` are concatenated and
    passed to the ``env_cls`` of the first batch entry, so one ``rollout`` call steps all
    tasks together. Per-env results are then sliced back into per-task metrics.

    Args:
        batch: ``(task_group, task_id, lazy_env)`` triples; entries without factory
            callables are skipped.
        policy: Forwarded to ``rollout``.
        env_preprocessor: Forwarded to ``rollout``.
        env_postprocessor: Forwarded to ``rollout``.
        preprocessor: Forwarded to ``rollout``.
        postprocessor: Forwarded to ``rollout``.
        start_seed: When not ``None``, sub-env ``i`` of the combined env is seeded with
            ``start_seed + i``.
        max_episodes_rendered: Upper bound on the number of episode videos written by
            this call; ``0`` disables rendering.
        videos_dir: Root directory for videos; videos are only written when this is set.

    Returns:
        One ``(task_group, task_id, TaskMetrics)`` tuple per non-empty task, in batch
        order. Returns ``[]`` when no task contributes a factory.
    """
    all_fns: list[Callable] = []
    # (task_group, task_id, start, end): the half-open range of combined-env indices per task.
    task_slices: list[tuple[str, int, int, int]] = []
    offset = 0
    for task_group, task_id, lazy_env in batch:
        fns = lazy_env.factory_fns
        if not fns:
            continue
        start = offset
        offset += len(fns)
        all_fns.extend(fns)
        task_slices.append((task_group, task_id, start, offset))

    if not all_fns:
        return []

    # The vector-env class is taken from the first batch entry only.
    # NOTE(review): presumably all entries share the same env_cls; confirm with callers.
    env_cls = batch[0][2].env_cls
    combined_env = env_cls(all_fns)
    try:
        seeds = None
        if start_seed is not None:
            seeds = list(range(start_seed, start_seed + combined_env.num_envs))

        # One stacked frame array per render callback invocation.
        ep_frames: list[np.ndarray] = []

        def render_frame(env: gym.vector.VectorEnv):
            """Append one frame from each of the first ``n`` sub-envs to ``ep_frames``."""
            if max_episodes_rendered <= 0:
                return
            n = min(max_episodes_rendered, env.num_envs)
            if isinstance(env, gym.vector.SyncVectorEnv):
                ep_frames.append(np.stack([env.envs[i].render() for i in range(n)]))
            elif isinstance(env, gym.vector.AsyncVectorEnv):
                ep_frames.append(np.stack(env.call("render")[:n]))

        rollout_data = rollout(
            env=combined_env,
            policy=policy,
            env_preprocessor=env_preprocessor,
            env_postprocessor=env_postprocessor,
            preprocessor=preprocessor,
            postprocessor=postprocessor,
            seeds=seeds,
            render_callback=render_frame if max_episodes_rendered > 0 else None,
        )

        n_steps = rollout_data["done"].shape[1]
        # Index of the first step flagged done for each env.
        # NOTE(review): argmax yields 0 for an env that never reports done; confirm that
        # rollout always ends with a done flag.
        done_indices = torch.argmax(rollout_data["done"].to(int), dim=1)
        # 1 for step indices <= done_index + 1, 0 afterwards; later steps are excluded
        # from the reductions below.
        mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int()
        batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum")
        batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max")
        batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any")

        video_paths_per_task: dict[tuple[str, int], list[str]] = defaultdict(list)
        if max_episodes_rendered > 0 and ep_frames and videos_dir:
            stacked = np.stack(ep_frames, axis=1)  # (batch, time, h, w, c)
            rendered = 0
            threads = []
            for tg, tid, start_i, end_i in task_slices:
                if rendered >= max_episodes_rendered:
                    break
                task_dir = videos_dir / f"{tg}_{tid}"
                task_dir.mkdir(parents=True, exist_ok=True)
                for env_idx in range(start_i, end_i):
                    if rendered >= max_episodes_rendered:
                        break
                    # Episode numbering restarts at 0 within each task's directory.
                    episode_index = env_idx - start_i
                    video_path = task_dir / f"eval_episode_{episode_index}.mp4"
                    video_paths_per_task[(tg, tid)].append(str(video_path))
                    di = done_indices[env_idx].item()
                    # Each video is written on its own thread, keeping frames up to and
                    # including the done step.
                    thread = threading.Thread(
                        target=write_video,
                        args=(
                            str(video_path),
                            stacked[env_idx, : di + 1],
                            combined_env.unwrapped.metadata["render_fps"],
                        ),
                    )
                    thread.start()
                    threads.append(thread)
                    rendered += 1
            # Wait for all writers before returning the paths.
            for t in threads:
                t.join()

        # Slice the per-env results back into one TaskMetrics per task.
        results: list[tuple[str, int, TaskMetrics]] = []
        for tg, tid, start_i, end_i in task_slices:
            results.append((
                tg,
                tid,
                TaskMetrics(
                    sum_rewards=batch_sum_rewards[start_i:end_i].tolist(),
                    max_rewards=batch_max_rewards[start_i:end_i].tolist(),
                    successes=batch_successes[start_i:end_i].tolist(),
                    video_paths=video_paths_per_task.get((tg, tid), []),
                ),
            ))
        return results
    finally:
        # The combined env is closed whether the rollout succeeded or raised.
        combined_env.close()
def eval_one(
env: gym.vector.VectorEnv,
*,
@@ -807,7 +1062,7 @@ def eval_one(
def run_one(
task_group: str,
task_id: int,
env,
env: Any,
*,
policy,
env_preprocessor,
@@ -851,7 +1106,7 @@ def run_one(
def eval_policy_all(
envs: dict[str, dict[int, gym.vector.VectorEnv]],
envs: dict[str, dict[int, Any]],
policy,
env_preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
env_postprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
@@ -864,6 +1119,8 @@ def eval_policy_all(
return_episode_data: bool = False,
start_seed: int | None = None,
max_parallel_tasks: int = 1,
instance_count: int = 1,
instance_id: int = 0,
) -> dict:
"""
Evaluate a nested `envs` dict: {task_group: {task_id: vec_env}}.
@@ -876,6 +1133,13 @@ def eval_policy_all(
# Flatten envs into list of (task_group, task_id, env)
tasks = [(tg, tid, vec) for tg, group in envs.items() for tid, vec in group.items()]
if instance_count > 1:
total_tasks = len(tasks)
tasks = [task for idx, task in enumerate(tasks) if idx % instance_count == instance_id]
logging.info(
f"Instance shard {instance_id + 1}/{instance_count}: "
f"{len(tasks)}/{total_tasks} tasks assigned."
)
# accumulators: track metrics at both per-group level and across all groups
group_acc: dict[str, dict[str, list]] = defaultdict(lambda: {k: [] for k in ACC_KEYS})
@@ -921,59 +1185,81 @@ def eval_policy_all(
start_seed=start_seed,
)
if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks:
tg, tid, metrics = task_runner(task_group, task_id, env)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
else:
# threaded path: submit all tasks, consume completions on main thread and accumulate there
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2meta = {}
for task_group, task_id, env in tasks:
fut = executor.submit(task_runner, task_group, task_id, env)
fut2meta[fut] = (task_group, task_id)
for fut in cf.as_completed(fut2meta):
tg, tid, metrics = fut.result()
all_lazy = all(isinstance(env, LazyVectorEnv) for _, _, env in tasks)
single_factory_per_task = all(
not isinstance(env, LazyVectorEnv) or env.num_factory_fns == 1 for _, _, env in tasks
)
can_batch = max_parallel_tasks > 1 and all_lazy and single_factory_per_task and n_episodes == 1
if can_batch:
# Multi-task batched path: combine N tasks into one SyncVectorEnv per chunk
# so the policy processes all N observations in a single forward pass per step.
chunk_size = max_parallel_tasks
logging.info(f"Task scheduler mode: batched_lazy (chunk_size={chunk_size})")
n_chunks = (len(tasks) + chunk_size - 1) // chunk_size
rendered_so_far = 0
for chunk_idx in range(n_chunks):
chunk = tasks[chunk_idx * chunk_size : (chunk_idx + 1) * chunk_size]
render_budget = max(0, max_episodes_rendered - rendered_so_far)
logging.info(
f"Batch {chunk_idx + 1}/{n_chunks}: evaluating {len(chunk)} tasks "
f"({chunk_idx * chunk_size + 1}{chunk_idx * chunk_size + len(chunk)}/{len(tasks)})"
)
batch_results = _eval_task_batch(
chunk,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
start_seed=start_seed,
max_episodes_rendered=render_budget,
videos_dir=videos_dir,
)
for tg, tid, metrics in batch_results:
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
rendered_so_far += len(metrics.get("video_paths", []))
# compute aggregated metrics helper (robust to lists/scalars)
def _agg_from_list(xs):
if not xs:
return float("nan")
arr = np.array(xs, dtype=float)
return float(np.nanmean(arr))
if overall["successes"]:
sr = np.nanmean(overall["successes"]) * 100
logging.info(f" running success rate: {sr:.1f}%")
elif max_parallel_tasks <= 1:
logging.info("Task scheduler mode: sequential")
for task_group, task_id, env in tasks:
try:
tg, tid, metrics = task_runner(task_group, task_id, env)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
else:
# Threaded fallback for cases where batched lazy mode cannot be used.
if all_lazy and n_episodes != 1:
logging.info(
"Task scheduler mode: threaded (lazy batching disabled because n_episodes != 1)"
)
elif all_lazy and not single_factory_per_task:
logging.info(
"Task scheduler mode: threaded (lazy batching disabled because eval.batch_size > 1)"
)
else:
logging.info(f"Task scheduler mode: threaded (max_workers={max_parallel_tasks})")
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2meta: dict[cf.Future, tuple[str, int, Any]] = {}
for task_group, task_id, env in tasks:
fut = executor.submit(task_runner, task_group, task_id, env)
fut2meta[fut] = (task_group, task_id, env)
for fut in cf.as_completed(fut2meta):
tg, tid, env = fut2meta[fut]
try:
_, _, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# compute per-group aggregates
groups_aggregated = {}
for group, acc in group_acc.items():
groups_aggregated[group] = {
"avg_sum_reward": _agg_from_list(acc["sum_rewards"]),
"avg_max_reward": _agg_from_list(acc["max_rewards"]),
"pc_success": _agg_from_list(acc["successes"]) * 100 if acc["successes"] else float("nan"),
"n_episodes": len(acc["sum_rewards"]),
"video_paths": list(acc["video_paths"]),
}
# overall aggregates
overall_agg = {
"avg_sum_reward": _agg_from_list(overall["sum_rewards"]),
"avg_max_reward": _agg_from_list(overall["max_rewards"]),
"pc_success": _agg_from_list(overall["successes"]) * 100 if overall["successes"] else float("nan"),
"n_episodes": len(overall["sum_rewards"]),
"eval_s": time.time() - start_t,
"eval_ep_s": (time.time() - start_t) / max(1, len(overall["sum_rewards"])),
"video_paths": list(overall["video_paths"]),
}
return {
"per_task": per_task_infos,
"per_group": groups_aggregated,
"overall": overall_agg,
}
return _aggregate_eval_from_per_task(per_task_infos, total_eval_s=time.time() - start_t)
def main():
+62
View File
@@ -0,0 +1,62 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from lerobot.envs.lazy_vec_env import LazyVectorEnv
class _DummyVectorEnv:
def __init__(self):
self.marker = "ok"
self.closed = False
def close(self):
self.closed = True
def test_lazy_vec_env_materializes_only_on_access():
    """The wrapped env is built on first attribute access and reused afterwards."""
    build_log = []

    def _factory(fns):
        build_log.append(len(fns))
        return _DummyVectorEnv()

    env_fns = [lambda: None, lambda: None]
    wrapper = LazyVectorEnv(_factory, env_fns)

    # Nothing is built at construction time; the factory count is available up front.
    assert build_log == []
    assert wrapper.num_factory_fns == 2

    # First attribute access triggers a single build with both factories.
    assert wrapper.marker == "ok"
    assert build_log == [2]

    # Second access should re-use the same materialized env.
    assert wrapper.marker == "ok"
    assert build_log == [2]
def test_lazy_vec_env_can_rematerialize_after_close():
    """Calling ``materialize()`` after ``close()`` builds a fresh env."""
    build_log = []

    def _factory(fns):
        build_log.append(len(fns))
        return _DummyVectorEnv()

    wrapper = LazyVectorEnv(_factory, [lambda: None])

    wrapper.materialize()
    assert build_log == [1]

    # Closing then materializing again must invoke the factory a second time.
    wrapper.close()
    wrapper.materialize()
    assert build_log == [1, 1]
+244
View File
@@ -0,0 +1,244 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from lerobot.envs.lazy_vec_env import LazyVectorEnv
from lerobot.scripts import lerobot_eval
class _DummyTaskEnv:
def __init__(self):
self.close_calls = 0
def close(self):
self.close_calls += 1
class _TrackedLazyEnv(LazyVectorEnv):
    """``LazyVectorEnv`` with no-op factories that counts ``close()`` calls."""

    def __init__(self, n_factory_fns: int = 1):
        noop_factories = [lambda: None for _ in range(n_factory_fns)]
        super().__init__(lambda fns: None, noop_factories)
        self.close_calls = 0

    def close(self):
        """Count the call, then delegate to ``LazyVectorEnv.close``."""
        self.close_calls += 1
        super().close()
def _fake_metrics():
return {
"sum_rewards": [1.0],
"max_rewards": [1.0],
"successes": [True],
"video_paths": [],
}
def test_eval_policy_all_sequential_closes_envs(monkeypatch):
    """With ``max_parallel_tasks=1`` each env is closed exactly once."""

    def _stub_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _stub_run_one)

    first_env, second_env = _DummyTaskEnv(), _DummyTaskEnv()
    call_kwargs = dict(
        envs={"suite": {0: first_env, 1: second_env}},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=1,
        max_parallel_tasks=1,
    )
    summary = lerobot_eval.eval_policy_all(**call_kwargs)

    assert first_env.close_calls == 1
    assert second_env.close_calls == 1
    assert summary["overall"]["n_episodes"] == 2
def test_eval_policy_all_threaded_fallback_closes_envs(monkeypatch):
    """Non-lazy envs with ``max_parallel_tasks=2`` are each closed exactly once."""

    def _stub_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _stub_run_one)

    first_env, second_env, third_env = _DummyTaskEnv(), _DummyTaskEnv(), _DummyTaskEnv()
    call_kwargs = dict(
        envs={"suite": {0: first_env, 1: second_env, 2: third_env}},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=1,
        max_parallel_tasks=2,
    )
    summary = lerobot_eval.eval_policy_all(**call_kwargs)

    assert first_env.close_calls == 1
    assert second_env.close_calls == 1
    assert third_env.close_calls == 1
    assert summary["overall"]["n_episodes"] == 3
def test_eval_policy_all_uses_batched_lazy_mode(monkeypatch):
    """Three single-factory lazy envs with ``max_parallel_tasks=2`` run as chunks of 2 and 1."""
    seen_chunk_sizes = []

    def _forbidden_run_one(*args, **kwargs):
        raise AssertionError("run_one should not run in batched lazy mode")

    def _stub_eval_task_batch(chunk, **kwargs):  # noqa: ARG001
        seen_chunk_sizes.append(len(chunk))
        results = []
        for tg, tid, _ in chunk:
            results.append((tg, tid, _fake_metrics()))
        return results

    monkeypatch.setattr(lerobot_eval, "run_one", _forbidden_run_one)
    monkeypatch.setattr(lerobot_eval, "_eval_task_batch", _stub_eval_task_batch)

    lazy_envs = {tid: LazyVectorEnv(lambda fns: None, [lambda: None]) for tid in range(3)}
    summary = lerobot_eval.eval_policy_all(
        envs={"suite": lazy_envs},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=1,
        max_parallel_tasks=2,
    )

    assert seen_chunk_sizes == [2, 1]
    assert summary["overall"]["n_episodes"] == 3
def test_eval_policy_all_disables_batched_lazy_when_n_episodes_not_one(monkeypatch):
    """With ``n_episodes=2`` the batched path is skipped and each lazy env is closed once."""

    def _stub_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    def _forbidden_batch(*args, **kwargs):
        raise AssertionError("_eval_task_batch should not run when n_episodes != 1")

    for attr_name, replacement in (("run_one", _stub_run_one), ("_eval_task_batch", _forbidden_batch)):
        monkeypatch.setattr(lerobot_eval, attr_name, replacement)

    first_env, second_env = _TrackedLazyEnv(), _TrackedLazyEnv()
    summary = lerobot_eval.eval_policy_all(
        envs={"suite": {0: first_env, 1: second_env}},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=2,
        max_parallel_tasks=2,
    )

    assert first_env.close_calls == 1
    assert second_env.close_calls == 1
    assert summary["overall"]["n_episodes"] == 2
def test_eval_policy_all_disables_batched_lazy_when_batch_size_above_one(monkeypatch):
    """Check that lazy envs built with ``n_factory_fns=2`` bypass ``_eval_task_batch``.

    ``_eval_task_batch`` is replaced by a function that fails the test if
    called, while ``run_one`` is stubbed to return canned metrics. Both tracked
    lazy envs must end with ``close_calls == 1``.
    """

    def _stub_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    def _forbidden_batch(*args, **kwargs):
        raise AssertionError("_eval_task_batch should not run when eval.batch_size > 1")

    for attr_name, replacement in (
        ("run_one", _stub_run_one),
        ("_eval_task_batch", _forbidden_batch),
    ):
        monkeypatch.setattr(lerobot_eval, attr_name, replacement)

    # NOTE(review): n_factory_fns=2 presumably models eval.batch_size == 2 — confirm
    # against _TrackedLazyEnv.
    tracked_envs = [_TrackedLazyEnv(n_factory_fns=2), _TrackedLazyEnv(n_factory_fns=2)]

    summary = lerobot_eval.eval_policy_all(
        envs={"suite": dict(enumerate(tracked_envs))},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=1,
        max_parallel_tasks=2,
    )

    for tracked_env in tracked_envs:
        assert tracked_env.close_calls == 1
    assert summary["overall"]["n_episodes"] == 2
def test_eval_policy_all_applies_instance_sharding(monkeypatch):
    """Check which task ids run when ``instance_count=2`` and ``instance_id=1``.

    The stubbed ``run_one`` records each task id it is given. Out of task ids
    0..3, only ``[1, 3]`` are expected, in that order, and the overall episode
    count is expected to be 2.
    """
    visited_task_ids = []

    def _recording_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        visited_task_ids.append(task_id)
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _recording_run_one)

    four_tasks = {task_id: _DummyTaskEnv() for task_id in range(4)}

    summary = lerobot_eval.eval_policy_all(
        envs={"suite": four_tasks},
        policy=None,
        env_preprocessor=None,
        env_postprocessor=None,
        preprocessor=None,
        postprocessor=None,
        n_episodes=1,
        max_parallel_tasks=1,
        instance_count=2,
        instance_id=1,
    )

    assert visited_task_ids == [1, 3]
    assert summary["overall"]["n_episodes"] == 2
def test_aggregate_eval_from_per_task_merges_groups_and_overall():
    """Check ``_aggregate_eval_from_per_task`` on two single-episode task entries.

    The entries belong to groups ``"a"`` and ``"b"``. The merged result must
    report two episodes overall, an average summed reward of 2.0, a 50% success
    rate, the ``total_eval_s`` value that was passed in, and both group names
    under ``"per_group"``.
    """
    group_a_entry = {
        "task_group": "a",
        "task_id": 0,
        "metrics": {
            "sum_rewards": [1.0],
            "max_rewards": [2.0],
            "successes": [True],
            "video_paths": ["v0"],
        },
    }
    group_b_entry = {
        "task_group": "b",
        "task_id": 1,
        "metrics": {
            "sum_rewards": [3.0],
            "max_rewards": [4.0],
            "successes": [False],
            "video_paths": [],
        },
    }

    merged = lerobot_eval._aggregate_eval_from_per_task(
        [group_a_entry, group_b_entry], total_eval_s=10.0
    )

    assert merged["overall"]["n_episodes"] == 2
    assert merged["overall"]["avg_sum_reward"] == 2.0
    assert merged["overall"]["pc_success"] == 50.0
    assert merged["overall"]["eval_s"] == 10.0
    assert set(merged["per_group"]) == {"a", "b"}