diff --git a/docker/Dockerfile.eval-base b/docker/Dockerfile.eval-base new file mode 100644 index 000000000..3b70764ff --- /dev/null +++ b/docker/Dockerfile.eval-base @@ -0,0 +1,59 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ARG CUDA_VERSION=12.4.1 +ARG OS_VERSION=22.04 +FROM nvidia/cuda:${CUDA_VERSION}-base-ubuntu${OS_VERSION} + +ARG PYTHON_VERSION=3.12 + +ENV DEBIAN_FRONTEND=noninteractive \ + MUJOCO_GL=egl \ + PATH=/lerobot/.venv/bin:$PATH + +RUN apt-get update && apt-get install -y --no-install-recommends \ + software-properties-common build-essential git curl \ + libglib2.0-0 libgl1-mesa-glx libegl1-mesa libosmesa6 \ + ffmpeg libusb-1.0-0-dev speech-dispatcher libgeos-dev portaudio19-dev \ + cmake pkg-config ninja-build \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + python${PYTHON_VERSION} \ + python${PYTHON_VERSION}-venv \ + python${PYTHON_VERSION}-dev \ + && curl -LsSf https://astral.sh/uv/install.sh | sh \ + && mv /root/.local/bin/uv /usr/local/bin/uv \ + && useradd --create-home --shell /bin/bash user_lerobot \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +WORKDIR /lerobot +RUN chown -R user_lerobot:user_lerobot /lerobot +USER user_lerobot + +ENV HOME=/home/user_lerobot \ + HF_HOME=/home/user_lerobot/.cache/huggingface \ + HF_LEROBOT_HOME=/home/user_lerobot/.cache/huggingface/lerobot \ + 
TORCH_HOME=/home/user_lerobot/.cache/torch \ + TRITON_CACHE_DIR=/home/user_lerobot/.cache/triton + +RUN uv venv --seed --python python${PYTHON_VERSION} + +COPY --chown=user_lerobot:user_lerobot setup.py pyproject.toml README.md MANIFEST.in ./ +COPY --chown=user_lerobot:user_lerobot src/ src/ +RUN uv pip install --no-cache . + +COPY --chown=user_lerobot:user_lerobot . . + +CMD ["/bin/bash"] diff --git a/docker/Dockerfile.eval-libero b/docker/Dockerfile.eval-libero new file mode 100644 index 000000000..f12ec03ff --- /dev/null +++ b/docker/Dockerfile.eval-libero @@ -0,0 +1,22 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM lerobot-eval-base:latest + +RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[libero]" \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import libero" + +CMD ["/bin/bash"] diff --git a/docker/Dockerfile.eval-libero-plus b/docker/Dockerfile.eval-libero-plus new file mode 100644 index 000000000..ad626cc21 --- /dev/null +++ b/docker/Dockerfile.eval-libero-plus @@ -0,0 +1,25 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM lerobot-eval-base:latest + +RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[libero_plus]" \ + && git clone --depth 1 https://github.com/sylvestf/LIBERO-plus.git /tmp/LIBERO-plus \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import pathlib, site; pathlib.Path(site.getsitepackages()[0], 'libero_plus_repo.pth').write_text('/tmp/LIBERO-plus\n')" \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import libero, robosuite, bddl" + +CMD ["/bin/bash"] diff --git a/docker/Dockerfile.eval-metaworld b/docker/Dockerfile.eval-metaworld new file mode 100644 index 000000000..9a5d0a1e3 --- /dev/null +++ b/docker/Dockerfile.eval-metaworld @@ -0,0 +1,22 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +FROM lerobot-eval-base:latest + +RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[metaworld]" \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import metaworld" + +CMD ["/bin/bash"] diff --git a/docker/Dockerfile.eval-robocasa b/docker/Dockerfile.eval-robocasa new file mode 100644 index 000000000..bf9614673 --- /dev/null +++ b/docker/Dockerfile.eval-robocasa @@ -0,0 +1,22 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM lerobot-eval-base:latest + +RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[robocasa]" \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import robocasa" + +CMD ["/bin/bash"] diff --git a/docker/Dockerfile.eval-robomme b/docker/Dockerfile.eval-robomme new file mode 100644 index 000000000..6027e7e0b --- /dev/null +++ b/docker/Dockerfile.eval-robomme @@ -0,0 +1,22 @@ +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM lerobot-eval-base:latest + +RUN PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -m pip install --no-cache-dir ".[robomme]" \ + && PATH=/usr/bin:/bin:/lerobot/.venv/bin:$PATH \ + /lerobot/.venv/bin/python -c "import robomme" + +CMD ["/bin/bash"] diff --git a/docs/source/benchmark_training.mdx b/docs/source/benchmark_training.mdx index ffe20ae51..7f5d71557 100644 --- a/docs/source/benchmark_training.mdx +++ b/docs/source/benchmark_training.mdx @@ -47,6 +47,116 @@ For multi-GPU training you also need [Accelerate](https://huggingface.co/docs/ac pip install accelerate ``` +## Docker-isolated evaluation (EnvHub) + +LeRobot eval now supports running the full eval worker in a Docker container +while keeping policy loading compatible with local checkpoints and local code changes. + +Use `lerobot-eval` with `--eval.runtime=docker`: + +```bash +lerobot-eval \ + --policy.path=outputs/train/my_policy/checkpoints/050000/pretrained_model \ + --env.type=libero_plus \ + --eval.runtime=docker \ + --eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1 \ + --eval.n_episodes=10 \ + --eval.batch_size=10 +``` + +`eval.docker.envhub_ref` is optional. If omitted, LeRobot resolves a default +image from `env.type`. 
You can also override the image directly: + +```bash +--eval.docker.image=docker://ghcr.io/huggingface/lerobot-eval-libero-plus:latest +``` + +By default (`eval.docker.use_local_code=true`), the local repository is mounted +in the container and added to `PYTHONPATH`, so edited policy/env code and local +checkpoints continue to work without rebuilding the image for each change. + +Common Docker runtime options: + +```bash +--eval.docker.pull=true \ +--eval.docker.gpus=all \ +--eval.docker.shm_size=8g \ +--eval.docker.use_local_code=true +``` + +The benchmark runner supports the same Docker eval path (extra args are +forwarded to each generated `lerobot-eval` call): + +```bash +lerobot-benchmark eval \ + --benchmarks libero_plus,robocasa \ + --hub-user $HF_USER \ + --n-episodes 50 \ + --eval.runtime=docker \ + --eval.docker.pull=true +``` + +Build benchmark images locally: + +```bash +make build-eval-images +``` + +## Fast single-machine eval tuning + +`lerobot-eval` now has two orthogonal throughput knobs: + +- `eval.batch_size`: number of sub-envs per task (inside one vector env). +- `env.max_parallel_tasks`: number of tasks scheduled concurrently. +- `eval.instance_count`: number of full eval instances (process-level sharding). + +Use them in this order: + +1. Increase `eval.batch_size` first for per-task throughput. +2. Then increase `env.max_parallel_tasks` to overlap tasks, while monitoring RAM/VRAM. +3. Optionally increase `eval.instance_count` for process-level parallelism (best with enough CPU/RAM and small models). + +The eval logs print the active scheduler mode (`sequential`, `threaded`, or `batched_lazy`) so you can verify the effective concurrency path. 
+ +### Suggested starting points + +| Benchmark | Conservative | Faster (single GPU) | Notes | +|---|---|---|---| +| `libero` / `libero_plus` | `eval.batch_size=1`, `env.max_parallel_tasks=4` | `eval.batch_size=1`, `env.max_parallel_tasks=16` | For large suite sweeps, increase `max_parallel_tasks` before `batch_size` to avoid MuJoCo memory spikes. | +| `metaworld` | `eval.batch_size=8`, `env.max_parallel_tasks=1` | `eval.batch_size=16`, `env.max_parallel_tasks=2` | Prefer larger per-task vectorization first. | +| `robocasa` | `eval.batch_size=4`, `env.max_parallel_tasks=1` | `eval.batch_size=8`, `env.max_parallel_tasks=2` | Rendering/memory can dominate at high image resolution. | +| `robomme` | `eval.batch_size=4`, `env.max_parallel_tasks=1` | `eval.batch_size=8`, `env.max_parallel_tasks=2` | Start small and scale gradually with task count. | + +### Local fast eval recipe + +```bash +lerobot-eval \ + --policy.path=$HF_USER/smolvla_libero_plus \ + --env.type=libero_plus \ + --eval.n_episodes=1 \ + --eval.batch_size=1 \ + --env.max_parallel_tasks=16 \ + --eval.instance_count=2 \ + --rename_map='{"observation.images.image":"observation.images.camera1","observation.images.image2":"observation.images.camera2"}' \ + --output_dir=outputs/eval/smolvla_libero_plus \ + --push_to_hub=true +``` + +### Docker fast eval recipe + +```bash +lerobot-eval \ + --policy.path=$HF_USER/smolvla_libero_plus \ + --env.type=libero_plus \ + --eval.runtime=docker \ + --eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1 \ + --eval.docker.gpus=all \ + --eval.docker.shm_size=16g \ + --eval.n_episodes=1 \ + --eval.batch_size=1 \ + --env.max_parallel_tasks=16 +``` + ## Quick start — single benchmark Train SmolVLA on LIBERO-plus with 4 GPUs for 50 000 steps: @@ -95,7 +205,7 @@ lerobot-benchmark all \ For each benchmark the runner: 1. Trains a policy on its dataset. 2. Evaluates on every eval task in the benchmark (e.g. 4 suites for LIBERO). -3. Uploads eval results + videos to the Hub. +3. 
Pushes HF-native `.eval_results` rows (and optional artifacts) to the Hub. @@ -140,7 +250,9 @@ for SUITE in libero_spatial libero_object libero_goal libero_10; do --eval.n_episodes=50 \ --eval.batch_size=10 \ --output_dir=outputs/eval/smolvla_libero_plus/$SUITE \ - --policy.device=cuda + --policy.device=cuda \ + --push_to_hub=true \ + --benchmark_dataset_id=lerobot/sim-benchmarks done ``` @@ -226,28 +338,44 @@ outputs/ Each `eval_info.json` contains per-episode rewards, success rates, and aggregate metrics. -## Uploading eval results to the Hub +## HF Eval Results + Leaderboard -Add `--push-eval-to-hub` to upload evaluation metrics and videos to the policy's -Hub repo after each eval run: +LeRobot publishes benchmark scores using Hugging Face's native +`/.eval_results/*.yaml` format, which powers model-page eval cards and +benchmark leaderboards. + +Add `--push-eval-to-hub` to push results after each eval run: ```bash lerobot-benchmark eval \ --benchmarks libero_plus,robocasa \ --hub-user $HF_USER \ + --benchmark-dataset-id lerobot/sim-benchmarks \ --push-eval-to-hub ``` -For LIBERO-plus, each suite's results are uploaded to `eval/libero_spatial/`, -`eval/libero_object/`, etc. inside the `$HF_USER/smolvla_libero_plus` model repo. +This writes one or more files under `.eval_results/` in the model repo, for example: -This also works with the `all` subcommand — pass `--push-eval-to-hub` and results -are automatically uploaded after each eval run. +```yaml +- dataset: + id: lerobot/sim-benchmarks + task_id: libero_plus/spatial + value: 82.4 + notes: lerobot-eval +``` + +Notes: +- `--benchmark-dataset-id` points to your consolidated benchmark dataset repo. +- `task_id` values are derived from `env.type` and evaluated suite/task names. +- Eval artifacts (`eval_info.json`, `eval_config.json`, videos) are still uploaded + for provenance, but leaderboard ranking comes from `.eval_results`. 
## Passing extra arguments Any arguments after the recognized flags are forwarded to `lerobot-train` or -`lerobot-eval`. For example, to use PEFT/LoRA during training: +`lerobot-eval`. + +Example (training): use PEFT/LoRA during training. ```bash lerobot-benchmark train \ @@ -258,3 +386,13 @@ lerobot-benchmark train \ --steps 50000 \ --peft.method_type=LORA --peft.r=16 ``` + +Example (evaluation): forward Docker runtime flags to each `lerobot-eval` call. + +```bash +lerobot-benchmark eval \ + --benchmarks libero_plus \ + --hub-user $HF_USER \ + --eval.runtime=docker \ + --eval.docker.envhub_ref=envhub://lerobot/libero_plus@v1 +``` diff --git a/src/lerobot/configs/default.py b/src/lerobot/configs/default.py index dcb0cbd54..6d3dbe89b 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -49,15 +49,51 @@ class WandBConfig: mode: str | None = None # Allowed values: 'online', 'offline' 'disabled'. Defaults to 'online' +@dataclass +class EvalDockerConfig: + # Docker image to use for evaluation (e.g., "ghcr.io/org/lerobot-eval-libero:latest"). + # Takes precedence over eval.envhub_ref. + image: str | None = None + # Optional EnvHub reference to resolve an image, e.g. "envhub://lerobot/libero_plus@v1". + envhub_ref: str | None = None + # If true, mount the local repository and prefer local source code in the container. + use_local_code: bool = True + # Pull the image before running. + pull: bool = True + # Docker --gpus value. Set to None to disable GPU flags and run CPU-only. + gpus: str | None = "all" + # Docker --shm-size value (increase when using larger eval.batch_size values). + shm_size: str = "8g" + + @dataclass class EvalConfig: n_episodes: int = 50 - # `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv. + # Number of sub-envs per task inside one VectorEnv. Increase to improve per-task + # inference throughput until GPU or simulator memory saturates. 
batch_size: int = 50 - # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). + # Use AsyncVectorEnv (multiprocessing). Prefer SyncVectorEnv unless your environment + # spends significant time in Python-side stepping and can benefit from process parallelism. use_async_envs: bool = False + # Runtime where evaluation executes: "local" or "docker". + runtime: str = "local" + docker: EvalDockerConfig = field(default_factory=EvalDockerConfig) + # Number of parallel eval script instances to launch for one run. + # instance_count > 1 enables multi-instance task sharding. + instance_count: int = 1 + # 0-indexed shard id for this process. Users usually leave this at 0. + # Additional shards are launched automatically by `lerobot-eval` when instance_count > 1. + instance_id: int = 0 def __post_init__(self) -> None: + if self.runtime not in {"local", "docker"}: + raise ValueError(f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker.") + if self.instance_count < 1: + raise ValueError("eval.instance_count must be >= 1.") + if self.instance_id < 0 or self.instance_id >= self.instance_count: + raise ValueError( + f"eval.instance_id must be in [0, {self.instance_count - 1}] (got {self.instance_id})." + ) if self.batch_size > self.n_episodes: raise ValueError( "The eval batch size is greater than the number of eval episodes " diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index bdc16bc73..fbbdcadb5 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -45,6 +45,10 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC): fps: int = 30 features: dict[str, PolicyFeature] = field(default_factory=dict) features_map: dict[str, str] = field(default_factory=dict) + # Upper bound on concurrent task evaluation in `lerobot-eval`. + # - For lazy wrappers (e.g. LIBERO/LIBERO-plus), values >1 can enable chunked + # task batching with one policy forward pass over multiple tasks. 
+ # - For other envs, values >1 use a threaded task scheduler fallback. max_parallel_tasks: int = 1 disable_env_checker: bool = True @@ -356,7 +360,7 @@ class LiberoPlusEnv(LiberoEnv): in experiment configs (`env.type=libero_plus`). """ - task: str = "libero_spatial" + task: str = "libero_spatial,libero_object,libero_goal,libero_10" @EnvConfig.register_subclass("robocasa") diff --git a/src/lerobot/envs/factory.py b/src/lerobot/envs/factory.py index 7471d57b9..757b521a8 100644 --- a/src/lerobot/envs/factory.py +++ b/src/lerobot/envs/factory.py @@ -125,7 +125,7 @@ def make_env( use_async_envs: bool = False, hub_cache_dir: str | None = None, trust_remote_code: bool = False, -) -> dict[str, dict[int, gym.vector.VectorEnv]]: +) -> dict[str, dict[int, Any]]: """Makes a gym vector environment according to the config or Hub reference. Args: @@ -143,8 +143,9 @@ def make_env( ModuleNotFoundError: If the requested env package is not installed Returns: - dict[str, dict[int, gym.vector.VectorEnv]]: - A mapping from suite name to indexed vectorized environments. + dict[str, dict[int, Any]]: + A mapping from suite name to indexed environments. Values are either + materialized vector envs or lazy wrappers that materialize on first use. - For multi-task benchmarks (e.g., LIBERO): one entry per suite, and one vec env per task_id. - For single-task environments: a single suite entry (cfg.type) with task_id=0. diff --git a/src/lerobot/envs/lazy_vec_env.py b/src/lerobot/envs/lazy_vec_env.py new file mode 100644 index 000000000..123606d6c --- /dev/null +++ b/src/lerobot/envs/lazy_vec_env.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from collections.abc import Callable, Sequence +from typing import Any + + +class LazyVectorEnv: + """Defer vector-env construction until first usage. + + This is useful for benchmarks with many tasks: we can register one env object + per task without eagerly allocating all simulator/rendering resources. + """ + + def __init__(self, env_cls: Callable[[Sequence[Callable[[], Any]]], Any], factory_fns: list[Callable]): + self._env_cls = env_cls + self._factory_fns = factory_fns + self._env = None + + @property + def env_cls(self) -> Callable[[Sequence[Callable[[], Any]]], Any]: + return self._env_cls + + @property + def factory_fns(self) -> list[Callable]: + return self._factory_fns + + @property + def num_factory_fns(self) -> int: + return len(self._factory_fns) + + def materialize(self): + if self._env is None: + self._env = self._env_cls(self._factory_fns) + return self._env + + def close(self): + if self._env is not None: + self._env.close() + self._env = None + + def __getattr__(self, name): + return getattr(self.materialize(), name) + diff --git a/src/lerobot/envs/libero.py b/src/lerobot/envs/libero.py index a289728b5..457dad576 100644 --- a/src/lerobot/envs/libero.py +++ b/src/lerobot/envs/libero.py @@ -75,6 +75,7 @@ if "wand" not in sys.modules: from libero.libero import benchmark, get_libero_path from libero.libero.envs import OffScreenRenderEnv +from lerobot.envs.lazy_vec_env import LazyVectorEnv from lerobot.processor import RobotObservation _ASSET_DOWNLOAD_INSTRUCTIONS = """\ @@ -104,6 +105,145 @@ def 
_check_libero_plus_assets() -> None: raise FileNotFoundError(_ASSET_DOWNLOAD_INSTRUCTIONS.format(assets_dir=assets_dir)) +# ---- Perturbation support for LIBERO-Plus ----------------------------------- + +PERTURBATION_DIMENSIONS = ( + "Camera Viewpoints", + "Robot Initial States", + "Language Instructions", + "Light Conditions", + "Background Textures", + "Sensor Noise", + "Objects Layout", +) + +PERTURBATION_SHORT_KEYS = { + "Camera Viewpoints": "camera", + "Robot Initial States": "robot", + "Language Instructions": "language", + "Light Conditions": "light", + "Background Textures": "background", + "Sensor Noise": "noise", + "Objects Layout": "layout", +} + + +def load_task_classification() -> dict: + """Load task_classification.json shipped with LIBERO-Plus.""" + import json + + benchmark_root = Path(get_libero_path("benchmark_root")) + candidates = [ + benchmark_root / "benchmark" / "task_classification.json", + benchmark_root / "task_classification.json", + benchmark_root.parent / "benchmark" / "task_classification.json", + ] + for p in candidates: + if p.exists(): + with open(p) as f: + return json.load(f) + raise FileNotFoundError( + f"task_classification.json not found. Tried: {[str(c) for c in candidates]}" + ) + + +def build_perturbation_index(suite_name: str) -> dict[int, str]: + """Return {0-indexed task_id: perturbation_dimension} for *suite_name*.""" + tc = load_task_classification() + suite_data = tc.get(suite_name, {}) + index: dict[int, str] = {} + + # LIBERO-Plus task_classification.json has appeared in two shapes: + # 1) dict[suite][task_id_str] -> meta + # 2) dict[suite] -> list[{id, category, ...}] + if isinstance(suite_data, list): + for item in suite_data: + if not isinstance(item, dict): + continue + raw_id = item.get("id") + if raw_id is None: + continue + try: + # list-form ids are 1-indexed in current LIBERO-Plus release. 
+ tid = int(raw_id) - 1 + except (TypeError, ValueError): + continue + if tid < 0: + continue + dim = item.get("perturbation_type") or item.get("category", "unknown") + index[tid] = dim + return index + + if isinstance(suite_data, dict): + # Handle both 0-indexed and 1-indexed key conventions. + numeric_keys: list[int] = [] + for k in suite_data: + try: + numeric_keys.append(int(k)) + except (TypeError, ValueError): + continue + one_indexed = bool(numeric_keys) and 0 not in numeric_keys and min(numeric_keys) >= 1 + + for task_id_str, meta in suite_data.items(): + try: + tid = int(task_id_str) + except (TypeError, ValueError): + continue + if one_indexed: + tid -= 1 + if tid < 0: + continue + if isinstance(meta, dict): + dim = meta.get("perturbation_type") or meta.get("category", "unknown") + else: + dim = "unknown" + index[tid] = dim + return index + + return index + + +def aggregate_by_perturbation( + per_task: list[dict], suite_indices: dict[str, dict[int, str]] +) -> dict[str, dict]: + """Aggregate per-task eval results by perturbation dimension. 
+ + Args: + per_task: list of {"task_group": str, "task_id": int, "metrics": {...}} + suite_indices: {suite_name: {task_id: dimension_name}} from build_perturbation_index + + Returns: + {short_key: {"pc_success": float, "n_episodes": int}} for each perturbation dimension + """ + dim_successes: dict[str, list] = defaultdict(list) + for entry in per_task: + suite = entry["task_group"] + tid = entry["task_id"] + idx = suite_indices.get(suite, {}) + dim = idx.get(tid, "unknown") + short = PERTURBATION_SHORT_KEYS.get(dim, dim.lower().replace(" ", "_")) + successes = entry["metrics"].get("successes", []) + dim_successes[short].extend(successes) + + results: dict[str, dict] = {} + all_successes: list = [] + for short_key in list(PERTURBATION_SHORT_KEYS.values()) + ["unknown"]: + if short_key not in dim_successes: + continue + s = dim_successes[short_key] + all_successes.extend(s) + results[short_key] = { + "pc_success": float(np.nanmean(s) * 100) if s else float("nan"), + "n_episodes": len(s), + } + if all_successes: + results["total"] = { + "pc_success": float(np.nanmean(all_successes) * 100), + "n_episodes": len(all_successes), + } + return results + + def _parse_camera_names(camera_name: str | Sequence[str]) -> list[str]: """Normalize camera_name into a non-empty list of strings.""" if isinstance(camera_name, str): @@ -143,26 +283,31 @@ def get_task_init_states(task_suite: Any, i: int) -> np.ndarray: init_states_dir = Path(get_libero_path("init_states")) / task_suite.tasks[i].problem_folder init_states_file = task_suite.tasks[i].init_states_file - candidate_names = [init_states_file] - # Some LIBERO-plus task names include a "_table_" suffix while shipped - # init files use the base name without that table suffix. - if "_table_" in init_states_file: - candidate_names.append(re.sub(r"_table_\d+(?=\.pruned_init$|\.init$)", "", init_states_file)) + # 1. 
Direct match + direct = init_states_dir / init_states_file + if direct.exists(): + return torch.load(direct, weights_only=False) # nosec B614 - for name in candidate_names: - candidate_path = init_states_dir / name - if candidate_path.exists(): - return torch.load(candidate_path, weights_only=False) # nosec B614 + # 2. LIBERO-Plus perturbation filenames append suffixes like + # _view_0_0_100_0_0_initstate_1, _language_19, _noise_45, _table_1, _tb_9, _add_16 + # to the base task name. Instead of regex-matching every variant, find the + # longest existing base file whose stem is a prefix of the perturbation stem. + stem, ext = os.path.splitext(init_states_file) + best_match: Path | None = None + best_len = 0 + for candidate in init_states_dir.glob(f"*{ext}"): + cstem = candidate.stem + if stem == cstem or (stem.startswith(cstem) and stem[len(cstem)] == "_"): + if len(cstem) > best_len: + best_len = len(cstem) + best_match = candidate - # Last-resort fallback: pick any file matching the base prefix + extension. - stem, suffix = os.path.splitext(init_states_file) - stem = re.sub(r"_table_\d+$", "", stem) - fallback_matches = sorted(init_states_dir.glob(f"{stem}*{suffix}")) - if fallback_matches: - return torch.load(fallback_matches[0], weights_only=False) # nosec B614 + if best_match is not None: + return torch.load(best_match, weights_only=False) # nosec B614 raise FileNotFoundError( - f"Could not find init states for task {i}. Tried {candidate_names} in '{init_states_dir}'." + f"Could not find init states for task {i}. " + f"Tried '{init_states_file}' and prefix matching in '{init_states_dir}'." ) @@ -259,6 +404,7 @@ class LiberoEnv(gym.Env): # Load once and keep self._init_states = get_task_init_states(task_suite, self.task_id) if self.init_states else None self._reset_stride = n_envs # when performing a reset, append `_reset_stride` to `init_state_id`. 
+ self._init_state_error_warned = False self.init_state_id = self.episode_index # tie each sub-env to a fixed init state @@ -410,8 +556,21 @@ class LiberoEnv(gym.Env): self._env.seed(seed) raw_obs = self._env.reset() if self.init_states and self._init_states is not None: - raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)]) - self.init_state_id += self._reset_stride # Change init_state_id when reset + try: + raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)]) + self.init_state_id += self._reset_stride # Change init_state_id when reset + except Exception as exc: + # Some LIBERO-Plus perturbation tasks (notably object-layout variants) + # can have different simulator state dimensions than their base init files. + # Fall back to plain env.reset() instead of aborting the whole evaluation. + self.init_states = False + if not self._init_state_error_warned: + print( + "WARNING: Failed to apply init state for " + f"task_id={self.task_id} ({self.task}). " + f"Falling back to plain reset. Error: {exc}" + ) + self._init_state_error_warned = True # After reset, objects may be unstable (slightly floating, intersecting, etc.). # Step the simulator with a no-op action for a few frames so everything settles. 
@@ -500,6 +659,9 @@ def _make_env_fns( return fns +_LazyVecEnv = LazyVectorEnv + + # ---- Main API ---------------------------------------------------------------- @@ -543,12 +705,23 @@ def create_libero_envs( print(f"Restricting to task_ids={task_ids_filter}") out: dict[str, dict[int, Any]] = defaultdict(dict) + total_tasks = 0 for suite_name in suite_names: suite = _get_suite(suite_name) total = len(suite.tasks) selected = _select_task_ids(total, task_ids_filter) if not selected: raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).") + total_tasks += len(selected) + + lazy = total_tasks > 50 + if lazy: + print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)") + + for suite_name in suite_names: + suite = _get_suite(suite_name) + total = len(suite.tasks) + selected = _select_task_ids(total, task_ids_filter) for tid in selected: fns = _make_env_fns( @@ -562,8 +735,11 @@ def create_libero_envs( gym_kwargs=gym_kwargs, control_mode=control_mode, ) - out[suite_name][tid] = env_cls(fns) - print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}") + if lazy: + out[suite_name][tid] = LazyVectorEnv(env_cls, fns) + else: + out[suite_name][tid] = env_cls(fns) + print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}") # return plain dicts for predictability return {suite: dict(task_map) for suite, task_map in out.items()} diff --git a/src/lerobot/envs/metaworld.py b/src/lerobot/envs/metaworld.py index 4d91e002d..c5437f06b 100644 --- a/src/lerobot/envs/metaworld.py +++ b/src/lerobot/envs/metaworld.py @@ -25,6 +25,7 @@ import metaworld.policies as policies import numpy as np from gymnasium import spaces +from lerobot.envs.lazy_vec_env import LazyVectorEnv from lerobot.processor import RobotObservation # ---- Load configuration data from the external JSON file ---- @@ -297,19 +298,24 @@ def create_metaworld_envs( print(f"Creating Meta-World envs | task_groups={task_groups} | 
n_envs(per task)={n_envs}") + group_to_tasks = {group: DIFFICULTY_TO_TASKS.get(group, [group]) for group in task_groups} + total_tasks = sum(len(tasks) for tasks in group_to_tasks.values()) + lazy = total_tasks > 50 + if lazy: + print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)") + out: dict[str, dict[int, Any]] = defaultdict(dict) for group in task_groups: - # if not in difficulty presets, treat it as a single custom task - tasks = DIFFICULTY_TO_TASKS.get(group, [group]) + tasks = group_to_tasks[group] for tid, task_name in enumerate(tasks): - print(f"Building vec env | group={group} | task_id={tid} | task={task_name}") + if not lazy: + print(f"Building vec env | group={group} | task_id={tid} | task={task_name}") # build n_envs factories fns = [(lambda tn=task_name: MetaworldEnv(task=tn, **gym_kwargs)) for _ in range(n_envs)] - - out[group][tid] = env_cls(fns) + out[group][tid] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns) # return a plain dict for consistency return {group: dict(task_map) for group, task_map in out.items()} diff --git a/src/lerobot/envs/robocasa.py b/src/lerobot/envs/robocasa.py index b85f2eb74..50f79273c 100644 --- a/src/lerobot/envs/robocasa.py +++ b/src/lerobot/envs/robocasa.py @@ -24,6 +24,7 @@ import gymnasium as gym import numpy as np from gymnasium import spaces +from lerobot.envs.lazy_vec_env import LazyVectorEnv # Action layout (flat 12D, normalized to [-1, 1]): # [0:3] end_effector_position (delta x, y, z) @@ -256,8 +257,12 @@ def create_robocasa_envs( gym_kwargs = dict(gym_kwargs or {}) out: dict[str, dict[int, Any]] = defaultdict(dict) + total_tasks = len(task_list) + lazy = total_tasks > 50 print(f"Creating RoboCasa envs | tasks={task_list} | n_envs(per task)={n_envs} | split={split}") + if lazy: + print(f"Using lazy env creation for {total_tasks} tasks (envs created on demand)") for task in task_list: fns = _make_env_fns( task=task, @@ -267,7 +272,8 @@ def create_robocasa_envs( 
episode_length=episode_length, gym_kwargs=gym_kwargs, ) - out["robocasa"][len(out["robocasa"])] = env_cls(fns) - print(f" Built vec env | task={task} | n_envs={n_envs}") + out["robocasa"][len(out["robocasa"])] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns) + if not lazy: + print(f" Built vec env | task={task} | n_envs={n_envs}") return {suite: dict(task_map) for suite, task_map in out.items()} diff --git a/src/lerobot/envs/robomme.py b/src/lerobot/envs/robomme.py index e963edf6e..514226300 100644 --- a/src/lerobot/envs/robomme.py +++ b/src/lerobot/envs/robomme.py @@ -14,12 +14,16 @@ Install: pip install robomme (or from source: https://github.com/RoboMME/robomm from __future__ import annotations +from collections.abc import Callable, Sequence +from functools import partial from typing import Any import gymnasium as gym import numpy as np from gymnasium import spaces +from lerobot.envs.lazy_vec_env import LazyVectorEnv + ROBOMME_TASKS = [ "BinFill", "PickXtimes", "SwingXtimes", "StopCube", "VideoUnmask", "VideoUnmaskSwap", "ButtonUnmask", "ButtonUnmaskSwap", @@ -113,6 +117,29 @@ class RoboMMEGymEnv(gym.Env): } +def _make_env_fns( + *, + task: str, + n_envs: int, + action_space_type: str, + dataset: str, + episode_length: int, + task_id: int, +) -> list[Callable[[], RoboMMEGymEnv]]: + """Build n_envs factory callables for one RoboMME task id.""" + + def _make_one(episode_index: int) -> RoboMMEGymEnv: + return RoboMMEGymEnv( + task=task, + action_space_type=action_space_type, + dataset=dataset, + episode_idx=episode_index, + max_steps=episode_length, + ) + + return [partial(_make_one, task_id + i) for i in range(n_envs)] + + def create_robomme_envs( task: str, n_envs: int = 1, @@ -120,35 +147,35 @@ def create_robomme_envs( dataset: str = "test", episode_length: int = 300, task_ids: list[int] | None = None, - env_cls=None, + env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None, ) -> dict[str, dict[int, gym.vector.VectorEnv]]: """Create 
vectorized RoboMME environments for evaluation. Returns {suite_name: {task_id: VectorEnv}} matching lerobot's expected format. """ - if env_cls is None: - env_cls = gym.vector.SyncVectorEnv + if env_cls is None or not callable(env_cls): + raise ValueError("env_cls must be a callable that wraps a list of env factory callables.") + if not isinstance(n_envs, int) or n_envs <= 0: + raise ValueError(f"n_envs must be a positive int; got {n_envs}.") if task_ids is None: task_ids = [0] suite_name = "robomme" envs_by_task = {} + lazy = len(task_ids) > 50 + if lazy: + print(f"Using lazy env creation for {len(task_ids)} tasks (envs created on demand)") for task_id in task_ids: - def _make_one(ep_idx=task_id): - return RoboMMEGymEnv( - task=task, - action_space_type=action_space_type, - dataset=dataset, - episode_idx=ep_idx, - max_steps=episode_length, - ) - - vec = env_cls( - [_make_one for _ in range(n_envs)], - autoreset_mode=gym.vector.AutoresetMode.SAME_STEP, + fns = _make_env_fns( + task=task, + n_envs=n_envs, + action_space_type=action_space_type, + dataset=dataset, + episode_length=episode_length, + task_id=task_id, ) - envs_by_task[task_id] = vec + envs_by_task[task_id] = LazyVectorEnv(env_cls, fns) if lazy else env_cls(fns) return {suite_name: envs_by_task} diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index 3c7a5b9cc..be2b5e220 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -49,7 +49,9 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig` import concurrent.futures as cf import json import logging -import re +import shutil +import subprocess +import sys import threading import time from collections import defaultdict @@ -72,6 +74,8 @@ from tqdm import trange from lerobot.configs import parser from lerobot.configs.eval import EvalPipelineConfig +from lerobot.envs.lazy_vec_env import LazyVectorEnv +from lerobot.envs.docker_runtime import 
run_eval_in_docker from lerobot.envs.factory import make_env, make_env_pre_post_processors from lerobot.envs.utils import ( add_envs_task, @@ -86,6 +90,11 @@ from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.io_utils import write_video from lerobot.utils.random_utils import set_seed +from lerobot.utils.hf_eval_results import ( + build_eval_results_rows, + default_eval_date, + upload_eval_results_yaml, +) from lerobot.utils.utils import ( get_safe_torch_device, init_logging, @@ -521,16 +530,22 @@ def push_eval_to_hub( output_dir: Path, info: dict, env_type: str, - eval_config: dict | None = None, + env_task: str | None, + benchmark_dataset_id: str, + source_url: str | None = None, + notes: str | None = None, ) -> str: - """Upload eval results, videos, config, and an updated model card to the Hub. + """Upload eval artifacts and `.eval_results` rows to the Hub. Args: repo_id: HF model repo (e.g. "user/my_policy"). output_dir: Local directory containing eval_info.json and videos/. info: The eval results dict (as returned by eval_policy_all). env_type: Environment type string (e.g. "libero_plus", "pusht"). - eval_config: Serialized EvalPipelineConfig dict (policy + env + eval settings). + env_task: The env task string from eval config. + benchmark_dataset_id: HF dataset id of the consolidated benchmark dataset. + source_url: Optional source URL for `.eval_results` attribution. + notes: Optional setup notes to include in `.eval_results`. Returns: URL of the last Hub commit. @@ -572,96 +587,95 @@ def push_eval_to_hub( commit_message=f"Upload eval rollout videos for {env_type}", ) - # 4. Update the model card with an eval results table and config summary - _update_model_card_with_eval(api, repo_id, info, env_type, eval_config=eval_config) + # 4. Upload HF-native `.eval_results` rows (canonical leaderboard surface). 
+ rows = build_eval_results_rows( + info=info, + env_type=env_type, + env_task=env_task, + benchmark_dataset_id=benchmark_dataset_id, + source_url=source_url, + notes=notes, + eval_date=default_eval_date(), + ) + commit_url = upload_eval_results_yaml( + api=api, + repo_id=repo_id, + rows=rows, + env_type=env_type, + env_task=env_task, + output_dir=output_dir, + ) logging.info(f"Eval results pushed to https://huggingface.co/{repo_id}") return commit_url -def _format_eval_table(info: dict, env_type: str, eval_config: dict | None = None) -> str: - """Build a markdown table from eval results, optionally including config.""" - lines = [ - f"### Evaluation: `{env_type}`\n", - "| Suite | Success Rate (%) | Avg Sum Reward | Episodes |", - "|-------|-----------------|----------------|----------|", - ] - - per_group = info.get("per_group", {}) - for group_name, stats in sorted(per_group.items()): - sr = stats.get("pc_success", float("nan")) - reward = stats.get("avg_sum_reward", float("nan")) - n_ep = stats.get("n_episodes", 0) - lines.append(f"| {group_name} | {sr:.1f} | {reward:.2f} | {n_ep} |") - - overall = info.get("overall", {}) - if overall: - sr = overall.get("pc_success", float("nan")) - reward = overall.get("avg_sum_reward", float("nan")) - n_ep = overall.get("n_episodes", 0) - lines.append(f"| **Overall** | **{sr:.1f}** | **{reward:.2f}** | **{n_ep}** |") - - if eval_config: - lines.append("") - lines.append("
Eval configuration\n") - lines.append("```json") - lines.append(json.dumps(eval_config, indent=2)) - lines.append("```\n") - lines.append("
") - - video_paths = overall.get("video_paths", []) - if video_paths: - lines.append("") - lines.append("
Rollout videos\n") - for vp in video_paths[:10]: - video_name = Path(vp).name - parent = Path(vp).parent.name - lines.append(f"**{parent}/{video_name}**\n") - lines.append(f"![{video_name}](eval/{env_type}/videos/{parent}/{video_name})\n") - lines.append("
") - - return "\n".join(lines) - - -def _update_model_card_with_eval( - api: Any, repo_id: str, info: dict, env_type: str, eval_config: dict | None = None -) -> None: - """Append or replace the eval section in the model card README.""" - from huggingface_hub import ModelCard - - try: - card = ModelCard.load(repo_id) - except Exception: - card = ModelCard("") - - content = card.content or "" - - eval_table = _format_eval_table(info, env_type, eval_config=eval_config) - - section_marker_start = f"" - section_marker_end = f"" - new_section = f"{section_marker_start}\n{eval_table}\n{section_marker_end}" - - if section_marker_start in content: - content = re.sub( - rf"{re.escape(section_marker_start)}.*?{re.escape(section_marker_end)}", - new_section, - content, - flags=re.DOTALL, - ) - else: - eval_header = "\n## Evaluation Results\n\n" - if "## Evaluation Results" not in content: - content += eval_header - content += f"\n{new_section}\n" - - card.content = content - card.push_to_hub(repo_id, commit_message=f"Update eval results for {env_type}") - - @parser.wrap() def eval_main(cfg: EvalPipelineConfig): logging.info(pformat(asdict(cfg))) + if cfg.eval.instance_count > 1 and cfg.eval.instance_id == 0: + _orchestrate_multi_instance_eval(cfg) + else: + _run_eval_worker(cfg) + + +def _maybe_add_libero_plus_perturbation(info: dict, cfg: EvalPipelineConfig) -> None: + if cfg.env.type != "libero_plus": + return + try: + from lerobot.envs.libero import aggregate_by_perturbation, build_perturbation_index + + suite_names = [s.strip() for s in cfg.env.task.split(",") if s.strip()] + suite_indices = {s: build_perturbation_index(s) for s in suite_names} + perturbation_results = aggregate_by_perturbation(info["per_task"], suite_indices) + info["perturbation_results"] = perturbation_results + print("\n=== Perturbation Results ===") + for dim, stats in perturbation_results.items(): + print(f" {dim}: {stats['pc_success']:.1f}% ({stats['n_episodes']} episodes)") + except Exception as 
exc: + # Never fail a finished long-running eval on post-processing. + print(f"WARNING: Failed to compute LIBERO-Plus perturbation breakdown: {exc}") + print("Continuing with per-suite + overall metrics only.") + + +def _save_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None: + output_dir = Path(cfg.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + with open(output_dir / "eval_info.json", "w") as f: + json.dump(info, f, indent=2) + + eval_cfg_dict = _serializable_config(asdict(cfg)) + with open(output_dir / "eval_config.json", "w") as f: + json.dump(eval_cfg_dict, f, indent=2) + + +def _maybe_push_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None: + if not cfg.push_to_hub: + return + repo_id = str(cfg.policy.pretrained_path) + try: + push_eval_to_hub( + repo_id=repo_id, + output_dir=Path(cfg.output_dir), + info=info, + env_type=cfg.env.type, + env_task=cfg.env.task, + benchmark_dataset_id=cfg.benchmark_dataset_id, + source_url=cfg.eval_result_source_url, + notes=cfg.eval_result_notes, + ) + except Exception as exc: + logging.warning("Failed to push eval artifacts/results to Hub: %s", exc) + + +def _run_eval_worker(cfg: EvalPipelineConfig) -> dict: + logging.info(pformat(asdict(cfg))) + + if cfg.eval.runtime == "docker": + run_eval_in_docker(cfg) + output_dir = Path(cfg.output_dir) + with open(output_dir / "eval_info.json") as f: + return json.load(f) # Check device is available device = get_safe_torch_device(cfg.policy.device, log=True) @@ -705,50 +719,120 @@ def eval_main(cfg: EvalPipelineConfig): # Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments) env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy) - with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): - info = eval_policy_all( - envs=envs, - policy=policy, - env_preprocessor=env_preprocessor, - env_postprocessor=env_postprocessor, - 
preprocessor=preprocessor, - postprocessor=postprocessor, - n_episodes=cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(cfg.output_dir) / "videos", - start_seed=cfg.seed, - max_parallel_tasks=cfg.env.max_parallel_tasks, - ) - print("Overall Aggregated Metrics:") - print(info["overall"]) + try: + with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): + info = eval_policy_all( + envs=envs, + policy=policy, + env_preprocessor=env_preprocessor, + env_postprocessor=env_postprocessor, + preprocessor=preprocessor, + postprocessor=postprocessor, + n_episodes=cfg.eval.n_episodes, + max_episodes_rendered=10, + videos_dir=Path(cfg.output_dir) / "videos", + start_seed=cfg.seed, + max_parallel_tasks=cfg.env.max_parallel_tasks, + instance_count=cfg.eval.instance_count, + instance_id=cfg.eval.instance_id, + ) + print("Overall Aggregated Metrics:") + print(info["overall"]) - # Print per-suite stats - for task_group, task_group_info in info.items(): - print(f"\nAggregated Metrics for {task_group}:") - print(task_group_info) - # Close all vec envs - close_envs(envs) + for key, val in info.get("per_group", {}).items(): + print(f"\nAggregated Metrics for {key}:") + print(val) - # Save info - output_dir = Path(cfg.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - with open(output_dir / "eval_info.json", "w") as f: - json.dump(info, f, indent=2) + _maybe_add_libero_plus_perturbation(info, cfg) + finally: + close_envs(envs) - eval_cfg_dict = _serializable_config(asdict(cfg)) - with open(output_dir / "eval_config.json", "w") as f: - json.dump(eval_cfg_dict, f, indent=2) + _save_eval_outputs(cfg, info) + _maybe_push_eval_outputs(cfg, info) - if cfg.push_to_hub: - repo_id = str(cfg.policy.pretrained_path) - push_eval_to_hub( - repo_id=repo_id, - output_dir=output_dir, - info=info, - env_type=cfg.env.type, - eval_config=eval_cfg_dict, - ) + logging.info("End of eval") + return info + +def 
_orchestrate_multi_instance_eval(cfg: EvalPipelineConfig) -> None: + start_t = time.time() + root_output_dir = Path(cfg.output_dir) + instances_root = root_output_dir / "instances" + instances_root.mkdir(parents=True, exist_ok=True) + + n_instances = cfg.eval.instance_count + logging.info(f"Launching multi-instance eval with {n_instances} workers.") + + # Spawn workers for shard 1..N-1, run shard 0 in-process. + child_procs: list[tuple[int, subprocess.Popen]] = [] + argv = [ + arg + for arg in sys.argv[1:] + if not arg.startswith("--eval.instance_id=") + and not arg.startswith("--output_dir=") + and not arg.startswith("--push_to_hub=") + ] + for i in range(1, n_instances): + child_output_dir = instances_root / str(i) + cmd = [ + sys.executable, + "-m", + "lerobot.scripts.lerobot_eval", + *argv, + f"--eval.instance_id={i}", + f"--output_dir={child_output_dir}", + "--push_to_hub=false", + ] + logging.info("Starting eval worker %s/%s", i + 1, n_instances) + child_procs.append((i, subprocess.Popen(cmd))) + + cfg0 = deepcopy(cfg) + cfg0.eval.instance_id = 0 + cfg0.push_to_hub = False + cfg0.output_dir = instances_root / "0" + _run_eval_worker(cfg0) + + failed = [] + for idx, proc in child_procs: + rc = proc.wait() + if rc != 0: + failed.append((idx, rc)) + if failed: + raise RuntimeError(f"Multi-instance eval failed for workers: {failed}") + + partial_infos: list[dict] = [] + for i in range(n_instances): + info_path = instances_root / str(i) / "eval_info.json" + with open(info_path) as f: + partial_infos.append(json.load(f)) + + merged_per_task = [] + for info in partial_infos: + merged_per_task.extend(info.get("per_task", [])) + merged_per_task.sort(key=lambda x: (x["task_group"], x["task_id"])) + + # Merge videos from each shard into final output dir. 
+ merged_videos_dir = root_output_dir / "videos" + for i in range(n_instances): + shard_dir = instances_root / str(i) + shard_videos = shard_dir / "videos" + if shard_videos.is_dir(): + shutil.copytree(shard_videos, merged_videos_dir, dirs_exist_ok=True) + old_prefix = str(shard_videos) + new_prefix = str(merged_videos_dir) + for entry in merged_per_task: + paths = entry.get("metrics", {}).get("video_paths", []) + entry["metrics"]["video_paths"] = [ + p.replace(old_prefix, new_prefix, 1) if p.startswith(old_prefix) else p for p in paths + ] + + merged_info = _aggregate_eval_from_per_task(merged_per_task, total_eval_s=time.time() - start_t) + _maybe_add_libero_plus_perturbation(merged_info, cfg) + print("Overall Aggregated Metrics:") + print(merged_info["overall"]) + + _save_eval_outputs(cfg, merged_info) + _maybe_push_eval_outputs(cfg, merged_info) logging.info("End of eval") @@ -763,6 +847,177 @@ class TaskMetrics(TypedDict): ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths") +def _aggregate_eval_from_per_task(per_task_infos: list[dict], total_eval_s: float) -> dict: + """Aggregate eval metrics from per-task payloads.""" + group_acc: dict[str, dict[str, list]] = defaultdict(lambda: {k: [] for k in ACC_KEYS}) + overall: dict[str, list] = {k: [] for k in ACC_KEYS} + + def _append(group: str, key: str, value: Any): + if value is None: + return + if isinstance(value, list): + group_acc[group][key].extend(value) + overall[key].extend(value) + else: + group_acc[group][key].append(value) + overall[key].append(value) + + for entry in per_task_infos: + group = entry["task_group"] + metrics = entry["metrics"] + _append(group, "sum_rewards", metrics.get("sum_rewards")) + _append(group, "max_rewards", metrics.get("max_rewards")) + _append(group, "successes", metrics.get("successes")) + paths = metrics.get("video_paths", []) + if paths: + group_acc[group]["video_paths"].extend(paths) + overall["video_paths"].extend(paths) + + def _agg_from_list(xs: 
list[float]) -> float: + if not xs: + return float("nan") + arr = np.array(xs, dtype=float) + return float(np.nanmean(arr)) + + groups_aggregated = {} + for group, acc in group_acc.items(): + groups_aggregated[group] = { + "avg_sum_reward": _agg_from_list(acc["sum_rewards"]), + "avg_max_reward": _agg_from_list(acc["max_rewards"]), + "pc_success": _agg_from_list(acc["successes"]) * 100 if acc["successes"] else float("nan"), + "n_episodes": len(acc["sum_rewards"]), + "video_paths": list(acc["video_paths"]), + } + + overall_agg = { + "avg_sum_reward": _agg_from_list(overall["sum_rewards"]), + "avg_max_reward": _agg_from_list(overall["max_rewards"]), + "pc_success": _agg_from_list(overall["successes"]) * 100 if overall["successes"] else float("nan"), + "n_episodes": len(overall["sum_rewards"]), + "eval_s": total_eval_s, + "eval_ep_s": total_eval_s / max(1, len(overall["sum_rewards"])), + "video_paths": list(overall["video_paths"]), + } + + return {"per_task": per_task_infos, "per_group": groups_aggregated, "overall": overall_agg} + + +def _eval_task_batch( + batch: list[tuple[str, int, LazyVectorEnv]], + policy, + env_preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + env_postprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction], + start_seed: int | None, + max_episodes_rendered: int = 0, + videos_dir: Path | None = None, +) -> list[tuple[str, int, TaskMetrics]]: + """Evaluate N tasks in a single batched rollout for GPU efficiency. + + Each task contributes one sub-env to a combined SyncVectorEnv so the policy + processes all N observations in one forward pass per step. 
+ """ + all_fns: list[Callable] = [] + task_slices: list[tuple[str, int, int, int]] = [] + offset = 0 + for task_group, task_id, lazy_env in batch: + fns = lazy_env.factory_fns + if not fns: + continue + start = offset + offset += len(fns) + all_fns.extend(fns) + task_slices.append((task_group, task_id, start, offset)) + + if not all_fns: + return [] + + env_cls = batch[0][2].env_cls + combined_env = env_cls(all_fns) + + try: + seeds = None + if start_seed is not None: + seeds = list(range(start_seed, start_seed + combined_env.num_envs)) + + ep_frames: list[np.ndarray] = [] + + def render_frame(env: gym.vector.VectorEnv): + if max_episodes_rendered <= 0: + return + n = min(max_episodes_rendered, env.num_envs) + if isinstance(env, gym.vector.SyncVectorEnv): + ep_frames.append(np.stack([env.envs[i].render() for i in range(n)])) + elif isinstance(env, gym.vector.AsyncVectorEnv): + ep_frames.append(np.stack(env.call("render")[:n])) + + rollout_data = rollout( + env=combined_env, + policy=policy, + env_preprocessor=env_preprocessor, + env_postprocessor=env_postprocessor, + preprocessor=preprocessor, + postprocessor=postprocessor, + seeds=seeds, + render_callback=render_frame if max_episodes_rendered > 0 else None, + ) + + n_steps = rollout_data["done"].shape[1] + done_indices = torch.argmax(rollout_data["done"].to(int), dim=1) + mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int() + batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum") + batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max") + batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any") + + video_paths_per_task: dict[tuple[str, int], list[str]] = defaultdict(list) + if max_episodes_rendered > 0 and ep_frames and videos_dir: + stacked = np.stack(ep_frames, axis=1) # (batch, time, h, w, c) + rendered = 0 + threads = [] + for tg, tid, start_i, end_i in task_slices: + if rendered 
>= max_episodes_rendered: + break + task_dir = videos_dir / f"{tg}_{tid}" + task_dir.mkdir(parents=True, exist_ok=True) + for env_idx in range(start_i, end_i): + if rendered >= max_episodes_rendered: + break + episode_index = env_idx - start_i + video_path = task_dir / f"eval_episode_{episode_index}.mp4" + video_paths_per_task[(tg, tid)].append(str(video_path)) + di = done_indices[env_idx].item() + thread = threading.Thread( + target=write_video, + args=( + str(video_path), + stacked[env_idx, : di + 1], + combined_env.unwrapped.metadata["render_fps"], + ), + ) + thread.start() + threads.append(thread) + rendered += 1 + for t in threads: + t.join() + + results: list[tuple[str, int, TaskMetrics]] = [] + for tg, tid, start_i, end_i in task_slices: + results.append(( + tg, + tid, + TaskMetrics( + sum_rewards=batch_sum_rewards[start_i:end_i].tolist(), + max_rewards=batch_max_rewards[start_i:end_i].tolist(), + successes=batch_successes[start_i:end_i].tolist(), + video_paths=video_paths_per_task.get((tg, tid), []), + ), + )) + return results + finally: + combined_env.close() + + def eval_one( env: gym.vector.VectorEnv, *, @@ -807,7 +1062,7 @@ def eval_one( def run_one( task_group: str, task_id: int, - env, + env: Any, *, policy, env_preprocessor, @@ -851,7 +1106,7 @@ def run_one( def eval_policy_all( - envs: dict[str, dict[int, gym.vector.VectorEnv]], + envs: dict[str, dict[int, Any]], policy, env_preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], env_postprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], @@ -864,6 +1119,8 @@ def eval_policy_all( return_episode_data: bool = False, start_seed: int | None = None, max_parallel_tasks: int = 1, + instance_count: int = 1, + instance_id: int = 0, ) -> dict: """ Evaluate a nested `envs` dict: {task_group: {task_id: vec_env}}. 
@@ -876,6 +1133,13 @@ def eval_policy_all( # Flatten envs into list of (task_group, task_id, env) tasks = [(tg, tid, vec) for tg, group in envs.items() for tid, vec in group.items()] + if instance_count > 1: + total_tasks = len(tasks) + tasks = [task for idx, task in enumerate(tasks) if idx % instance_count == instance_id] + logging.info( + f"Instance shard {instance_id + 1}/{instance_count}: " + f"{len(tasks)}/{total_tasks} tasks assigned." + ) # accumulators: track metrics at both per-group level and across all groups group_acc: dict[str, dict[str, list]] = defaultdict(lambda: {k: [] for k in ACC_KEYS}) @@ -921,59 +1185,81 @@ def eval_policy_all( start_seed=start_seed, ) - if max_parallel_tasks <= 1: - # sequential path (single accumulator path on the main thread) - # NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks - for task_group, task_id, env in tasks: - tg, tid, metrics = task_runner(task_group, task_id, env) - _accumulate_to(tg, metrics) - per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) - else: - # threaded path: submit all tasks, consume completions on main thread and accumulate there - with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: - fut2meta = {} - for task_group, task_id, env in tasks: - fut = executor.submit(task_runner, task_group, task_id, env) - fut2meta[fut] = (task_group, task_id) - for fut in cf.as_completed(fut2meta): - tg, tid, metrics = fut.result() + all_lazy = all(isinstance(env, LazyVectorEnv) for _, _, env in tasks) + single_factory_per_task = all( + not isinstance(env, LazyVectorEnv) or env.num_factory_fns == 1 for _, _, env in tasks + ) + can_batch = max_parallel_tasks > 1 and all_lazy and single_factory_per_task and n_episodes == 1 + + if can_batch: + # Multi-task batched path: combine N tasks into one SyncVectorEnv per chunk + # so the policy processes all N observations in a single forward pass per step. 
+ chunk_size = max_parallel_tasks + logging.info(f"Task scheduler mode: batched_lazy (chunk_size={chunk_size})") + n_chunks = (len(tasks) + chunk_size - 1) // chunk_size + rendered_so_far = 0 + for chunk_idx in range(n_chunks): + chunk = tasks[chunk_idx * chunk_size : (chunk_idx + 1) * chunk_size] + render_budget = max(0, max_episodes_rendered - rendered_so_far) + logging.info( + f"Batch {chunk_idx + 1}/{n_chunks}: evaluating {len(chunk)} tasks " + f"({chunk_idx * chunk_size + 1}–{chunk_idx * chunk_size + len(chunk)}/{len(tasks)})" + ) + batch_results = _eval_task_batch( + chunk, + policy=policy, + env_preprocessor=env_preprocessor, + env_postprocessor=env_postprocessor, + preprocessor=preprocessor, + postprocessor=postprocessor, + start_seed=start_seed, + max_episodes_rendered=render_budget, + videos_dir=videos_dir, + ) + for tg, tid, metrics in batch_results: _accumulate_to(tg, metrics) per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + rendered_so_far += len(metrics.get("video_paths", [])) - # compute aggregated metrics helper (robust to lists/scalars) - def _agg_from_list(xs): - if not xs: - return float("nan") - arr = np.array(xs, dtype=float) - return float(np.nanmean(arr)) + if overall["successes"]: + sr = np.nanmean(overall["successes"]) * 100 + logging.info(f" running success rate: {sr:.1f}%") + elif max_parallel_tasks <= 1: + logging.info("Task scheduler mode: sequential") + for task_group, task_id, env in tasks: + try: + tg, tid, metrics = task_runner(task_group, task_id, env) + _accumulate_to(tg, metrics) + per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + finally: + env.close() + else: + # Threaded fallback for cases where batched lazy mode cannot be used. 
+ if all_lazy and n_episodes != 1: + logging.info( + "Task scheduler mode: threaded (lazy batching disabled because n_episodes != 1)" + ) + elif all_lazy and not single_factory_per_task: + logging.info( + "Task scheduler mode: threaded (lazy batching disabled because eval.batch_size > 1)" + ) + else: + logging.info(f"Task scheduler mode: threaded (max_workers={max_parallel_tasks})") + with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: + fut2meta: dict[cf.Future, tuple[str, int, Any]] = {} + for task_group, task_id, env in tasks: + fut = executor.submit(task_runner, task_group, task_id, env) + fut2meta[fut] = (task_group, task_id, env) + for fut in cf.as_completed(fut2meta): + tg, tid, env = fut2meta[fut] + try: + _, _, metrics = fut.result() + _accumulate_to(tg, metrics) + per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + finally: + env.close() - # compute per-group aggregates - groups_aggregated = {} - for group, acc in group_acc.items(): - groups_aggregated[group] = { - "avg_sum_reward": _agg_from_list(acc["sum_rewards"]), - "avg_max_reward": _agg_from_list(acc["max_rewards"]), - "pc_success": _agg_from_list(acc["successes"]) * 100 if acc["successes"] else float("nan"), - "n_episodes": len(acc["sum_rewards"]), - "video_paths": list(acc["video_paths"]), - } - - # overall aggregates - overall_agg = { - "avg_sum_reward": _agg_from_list(overall["sum_rewards"]), - "avg_max_reward": _agg_from_list(overall["max_rewards"]), - "pc_success": _agg_from_list(overall["successes"]) * 100 if overall["successes"] else float("nan"), - "n_episodes": len(overall["sum_rewards"]), - "eval_s": time.time() - start_t, - "eval_ep_s": (time.time() - start_t) / max(1, len(overall["sum_rewards"])), - "video_paths": list(overall["video_paths"]), - } - - return { - "per_task": per_task_infos, - "per_group": groups_aggregated, - "overall": overall_agg, - } + return _aggregate_eval_from_per_task(per_task_infos, total_eval_s=time.time() - 
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from lerobot.envs.lazy_vec_env import LazyVectorEnv


class _DummyVectorEnv:
    """Minimal vector-env stand-in; records whether close() was invoked."""

    def __init__(self):
        self.marker = "ok"
        self.closed = False

    def close(self):
        self.closed = True


def test_lazy_vec_env_materializes_only_on_access():
    """The wrapped env must not be built until an attribute is first touched."""
    build_sizes = []

    def _factory(fns):
        # Record how many per-episode factory fns the builder received.
        build_sizes.append(len(fns))
        return _DummyVectorEnv()

    env = LazyVectorEnv(_factory, [lambda: None, lambda: None])

    # Construction alone must not run the factory.
    assert build_sizes == []
    assert env.num_factory_fns == 2

    # First attribute access materializes exactly one underlying env.
    assert env.marker == "ok"
    assert build_sizes == [2]

    # Repeated access reuses the already-materialized env — no second build.
    assert env.marker == "ok"
    assert build_sizes == [2]


def test_lazy_vec_env_can_rematerialize_after_close():
    """close() drops the wrapped env; a later materialize() builds a fresh one."""
    build_sizes = []

    def _factory(fns):
        build_sizes.append(len(fns))
        return _DummyVectorEnv()

    env = LazyVectorEnv(_factory, [lambda: None])
    env.materialize()
    assert build_sizes == [1]

    env.close()
    env.materialize()
    assert build_sizes == [1, 1]
from lerobot.envs.lazy_vec_env import LazyVectorEnv
from lerobot.scripts import lerobot_eval


class _DummyTaskEnv:
    """Task-env stub that only counts how often close() is invoked."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        self.close_calls += 1


class _TrackedLazyEnv(LazyVectorEnv):
    """LazyVectorEnv subclass that counts close() calls for assertions."""

    def __init__(self, n_factory_fns: int = 1):
        factories = [lambda: None for _ in range(n_factory_fns)]
        super().__init__(lambda fns: None, factories)
        self.close_calls = 0

    def close(self):
        self.close_calls += 1
        super().close()


def _fake_metrics():
    """One successful single-episode result in the shape eval_policy_all expects."""
    return {
        "sum_rewards": [1.0],
        "max_rewards": [1.0],
        "successes": [True],
        "video_paths": [],
    }


def _run_eval(envs, **overrides):
    """Call eval_policy_all with no-op policy/processors; overrides tweak defaults."""
    kwargs = {
        "policy": None,
        "env_preprocessor": None,
        "env_postprocessor": None,
        "preprocessor": None,
        "postprocessor": None,
        "n_episodes": 1,
        "max_parallel_tasks": 1,
    }
    kwargs.update(overrides)
    return lerobot_eval.eval_policy_all(envs=envs, **kwargs)


def test_eval_policy_all_sequential_closes_envs(monkeypatch):
    def _passthrough_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _passthrough_run_one)
    task_envs = [_DummyTaskEnv(), _DummyTaskEnv()]

    result = _run_eval({"suite": dict(enumerate(task_envs))})

    # The sequential path must close every env exactly once.
    assert all(e.close_calls == 1 for e in task_envs)
    assert result["overall"]["n_episodes"] == 2


def test_eval_policy_all_threaded_fallback_closes_envs(monkeypatch):
    def _passthrough_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _passthrough_run_one)
    task_envs = [_DummyTaskEnv() for _ in range(3)]

    result = _run_eval({"suite": dict(enumerate(task_envs))}, max_parallel_tasks=2)

    # The threaded fallback must also close every env exactly once.
    assert all(e.close_calls == 1 for e in task_envs)
    assert result["overall"]["n_episodes"] == 3


def test_eval_policy_all_uses_batched_lazy_mode(monkeypatch):
    def _forbidden_run_one(*args, **kwargs):
        raise AssertionError("run_one should not run in batched lazy mode")

    chunk_sizes = []

    def _record_batch(chunk, **kwargs):  # noqa: ARG001
        chunk_sizes.append(len(chunk))
        return [(tg, tid, _fake_metrics()) for tg, tid, _ in chunk]

    monkeypatch.setattr(lerobot_eval, "run_one", _forbidden_run_one)
    monkeypatch.setattr(lerobot_eval, "_eval_task_batch", _record_batch)

    envs = {"suite": {tid: LazyVectorEnv(lambda fns: None, [lambda: None]) for tid in range(3)}}

    result = _run_eval(envs, max_parallel_tasks=2)

    # Three lazy single-factory tasks at chunk size 2 → chunks of 2 then 1.
    assert chunk_sizes == [2, 1]
    assert result["overall"]["n_episodes"] == 3


def test_eval_policy_all_disables_batched_lazy_when_n_episodes_not_one(monkeypatch):
    def _passthrough_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    def _forbidden_batch(*args, **kwargs):
        raise AssertionError("_eval_task_batch should not run when n_episodes != 1")

    monkeypatch.setattr(lerobot_eval, "run_one", _passthrough_run_one)
    monkeypatch.setattr(lerobot_eval, "_eval_task_batch", _forbidden_batch)

    lazy_envs = [_TrackedLazyEnv(), _TrackedLazyEnv()]

    result = _run_eval({"suite": dict(enumerate(lazy_envs))}, n_episodes=2, max_parallel_tasks=2)

    assert all(e.close_calls == 1 for e in lazy_envs)
    assert result["overall"]["n_episodes"] == 2


def test_eval_policy_all_disables_batched_lazy_when_batch_size_above_one(monkeypatch):
    def _passthrough_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        return task_group, task_id, _fake_metrics()

    def _forbidden_batch(*args, **kwargs):
        raise AssertionError("_eval_task_batch should not run when eval.batch_size > 1")

    monkeypatch.setattr(lerobot_eval, "run_one", _passthrough_run_one)
    monkeypatch.setattr(lerobot_eval, "_eval_task_batch", _forbidden_batch)

    # Two factory fns per task means eval.batch_size > 1, which disables batching.
    lazy_envs = [_TrackedLazyEnv(n_factory_fns=2), _TrackedLazyEnv(n_factory_fns=2)]

    result = _run_eval({"suite": dict(enumerate(lazy_envs))}, max_parallel_tasks=2)

    assert all(e.close_calls == 1 for e in lazy_envs)
    assert result["overall"]["n_episodes"] == 2


def test_eval_policy_all_applies_instance_sharding(monkeypatch):
    executed_task_ids = []

    def _recording_run_one(task_group, task_id, env, **kwargs):  # noqa: ARG001
        executed_task_ids.append(task_id)
        return task_group, task_id, _fake_metrics()

    monkeypatch.setattr(lerobot_eval, "run_one", _recording_run_one)
    envs = {"suite": {tid: _DummyTaskEnv() for tid in range(4)}}

    result = _run_eval(envs, instance_count=2, instance_id=1)

    # Shard 1 of 2 over four tasks takes the odd indices only.
    assert executed_task_ids == [1, 3]
    assert result["overall"]["n_episodes"] == 2


def test_aggregate_eval_from_per_task_merges_groups_and_overall():
    per_task = [
        {
            "task_group": "a",
            "task_id": 0,
            "metrics": {
                "sum_rewards": [1.0],
                "max_rewards": [2.0],
                "successes": [True],
                "video_paths": ["v0"],
            },
        },
        {
            "task_group": "b",
            "task_id": 1,
            "metrics": {
                "sum_rewards": [3.0],
                "max_rewards": [4.0],
                "successes": [False],
                "video_paths": [],
            },
        },
    ]

    merged = lerobot_eval._aggregate_eval_from_per_task(per_task, total_eval_s=10.0)

    assert merged["overall"]["n_episodes"] == 2
    assert merged["overall"]["avg_sum_reward"] == 2.0
    assert merged["overall"]["pc_success"] == 50.0
    assert merged["overall"]["eval_s"] == 10.0
    assert set(merged["per_group"]) == {"a", "b"}