feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1 (#3274)

* docs(benchmarks): add benchmark integration guide and standardize benchmark docs

Add a comprehensive guide for adding new benchmarks to LeRobot, and
refactor the existing LIBERO and Meta-World docs to follow the new
standardized template.

Made-with: Cursor

* refactor(envs): move dispatch logic from factory into EnvConfig subclasses

Replace hardcoded if/elif chains in factory.py with create_envs() and
get_env_processors() methods on EnvConfig. New benchmarks now only need
to register a config subclass — no factory.py edits required.

Net -23 lines: factory.py shrinks from ~200 to ~70 lines of logic.

Made-with: Cursor

* docs(benchmarks): clean up adding-benchmarks guide for clarity

Rewrite for simpler language, better structure, and easier navigation.
Move quick-reference table to the top, fold eval explanation into
architecture section, condense the doc template to a bulleted outline.

Made-with: Cursor

* fix link

* fix task count

* fix: enable SmolVLA eval on LIBERO with custom camera mappings

- Thread camera_name_mapping from LiberoEnv config through to gym envs
- Sync features_map with camera_name_mapping in LiberoEnv.__post_init__
- Fix render() to use first available camera instead of hardcoded "image"
- Handle non-dict final_info in rollout by falling back to info["is_success"]
- Add use_peft legacy field to SmolVLAConfig for checkpoint compat
- Add defaults to GR00TN15Config init=False fields for transformers 5.3

Made-with: Cursor

* fix: use direct AutoresetMode import for gymnasium compat

Made-with: Cursor

* fix: handle gymnasium < 1.0 without AutoresetMode

Made-with: Cursor

* refactor: revert policy changes, keep env-only camera mapping fixes

- Revert GR00T N1.5 default_factory/default changes (transformers compat)
- Revert SmolVLA use_peft legacy field
- Apply ruff formatting fixes
- camera_name_mapping stays entirely in env/eval layer (no policy changes)

Made-with: Cursor

* Update docs/source/env_processor.mdx

Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>
Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1

LiberoEnv and MetaworldEnv previously allocated GPU resources (EGL context,
OpenGL framebuffer) in __init__, before AsyncVectorEnv's fork(). Worker
processes inherited stale GPU handles, causing EGL_BAD_CONTEXT crashes on
first render.

Fix: defer OffScreenRenderEnv / MT1 construction to _ensure_env(), called on
first reset() or step() inside the worker subprocess. Each worker creates its
own clean context after fork().

Also fixes lerobot_eval.py:170 (add_envs_task TODO): replace with
env.call("task") which works with both SyncVectorEnv and AsyncVectorEnv.

AsyncVectorEnv is now the default for n_envs > 1; auto-downgraded to
SyncVectorEnv when n_envs=1 (no benefit, less overhead).

Expected speedup: ~15-20x for LIBERO Spatial with batch_size=50.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: close envs between tasks to prevent worker process accumulation

eval_policy_all never closed environments after each task completed,
causing AsyncVectorEnv worker processes to accumulate (N_tasks × n_envs).
This led to OOM, BrokenPipeError and EOFError on multi-task benchmarks.

Also fixes:
- AsyncVectorEnv compat in envs/utils.py (use get_attr/call instead of .envs)
- Tuple task handling in tokenizer_processor and lerobot_eval
- _LazyAsyncVectorEnv for deferred worker spawning in LIBERO

Made-with: Cursor

* fix(eval): use task_description instead of task for language conditioning

env.call("task") returns the LIBERO task name with underscores
(e.g. "pick_up_the_black_bowl_...") instead of the natural language
description ("pick up the black bowl ..."). The VLM tokenizes these
completely differently, causing 0.0 reward across all episodes.

Made-with: Cursor

* docs: update adding_benchmarks for async env changes

- Replace add_envs_task reference with env.call("task_description")
- Update use_async_envs default to True
- Add note about lazy GPU init for AsyncVectorEnv compatibility

Made-with: Cursor

* feat(eval): batch_size=auto + faster env loading

- batch_size=0 (default) auto-tunes based on CPU cores, capped by
  n_episodes and 64. Removes the need for users to guess the right
  value. The old batch_size > n_episodes error is replaced by silently
  clamping to n_episodes.
- _LazyAsyncVectorEnv accepts pre-computed spaces so only one temp env
  is created per suite (not per task). For libero_spatial (10 tasks)
  this avoids 9 redundant LiberoEnv instantiations during env setup.

Made-with: Cursor

* docs: add evaluation guide and update benchmarks doc

- New docs/source/evaluation.mdx covering lerobot-eval usage, batch_size
  auto-tuning, AsyncVectorEnv performance, tuning tips, output format,
  multi-task evaluation, and programmatic usage.
- Add evaluation page to _toctree.yml under Benchmarks section.
- Update adding_benchmarks.mdx to reference batch_size auto default and
  link to the evaluation guide.

Made-with: Cursor

* docs(evaluation): remove benchmark table, rename section header

Made-with: Cursor

* perf(eval): shared memory, observation passthrough, task prefetch

- AsyncVectorEnv now uses shared_memory=True for zero-copy observation transfer
- LiberoEnvConfig.gym_kwargs passes observation_height/width to the env
- eval_policy_all prefetches next task's workers while current task runs

Made-with: Cursor

* style: ruff format

Made-with: Cursor

* chore: revert env_processor.mdx changes (not part of this PR)

Made-with: Cursor

* ci(benchmarks): add isolated integration tests for libero and metaworld

Each benchmark gets its own Docker image (lerobot[libero] / lerobot[metaworld]
only) so incompatible dep trees cannot collide. A 1-episode smoke eval runs
per benchmark on GPU runners.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* ci(benchmarks): pin action hashes and use uv sync --locked

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* ci(benchmarks): trigger only on envs/ or lerobot_eval.py changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci): set LIBERO_DATA_FOLDER to bypass interactive stdin prompt

libero/__init__.py calls input() to ask about a custom dataset path,
which raises EOFError when stdin is closed inside Docker. Setting
LIBERO_DATA_FOLDER skips the prompt entirely.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(benchmarks): add CI smoke test step to adding_benchmarks guide

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci): pre-create libero config in Dockerfile to bypass stdin prompt

libero/__init__.py calls input() when ~/.libero/config.yaml is missing.
We write the config at image build time (without importing libero) so
the prompt never fires at runtime. Also trigger CI on pyproject.toml changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci): use shell to create libero config instead of multiline python -c

The multiline RUN python -c "..." was being parsed as Dockerfile
instructions. Use printf to write ~/.libero/config.yaml directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci): point libero config to bundled package init_files

The config was pointing to /tmp/libero_init which doesn't exist.
Use importlib.util.find_spec to locate the hf-libero package directory
and write paths to the actual bundled bddl_files/init_files/assets.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci): add smolvla extra to benchmark Dockerfiles

num2words (required by SmolVLM processor) is declared in lerobot[smolvla],
not lerobot[libero/metaworld]. Install both extras together.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(eval): render_frame covers _LazyAsyncVectorEnv

isinstance(env, AsyncVectorEnv) silently skipped _LazyAsyncVectorEnv,
causing video rendering to produce no frames on the default async path.
Switch to hasattr(env, "call") so any async-compatible env (including
_LazyAsyncVectorEnv) hits the call("render") branch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(envs): remove unused _get_sub_env_attr helper

_get_sub_env_attr was defined but never called anywhere in the codebase.
_sub_env_has_attr (its sibling) is kept — it is actively used in utils.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore: apply prettier formatting to docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(env_processor): remove deprecated add_envs_task from pipeline example

add_envs_task is replaced by env.call("task_description") in this PR.
Remove it from the pipeline walkthrough and renumber the steps (8→7).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(envs): remove __del__ from _LazyAsyncVectorEnv

__del__ is unreliable as a cleanup mechanism. close() is already called
explicitly in the eval loop's finally block, so the finalizer is redundant.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(eval): prefetch next task's workers after close to avoid GPU memory overlap

Previously, next task's AsyncVectorEnv workers were spawned while the
current task was still running, causing both tasks' GPU contexts to coexist.
Moving the prefetch start into the finally block (after env.close()) ensures
workers for task N+1 only spin up once task N has released GPU memory.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(envs): move _LazyAsyncVectorEnv to utils and apply to metaworld

_LazyAsyncVectorEnv lived in libero.py but metaworld had the same OOM
problem: all tasks' AsyncVectorEnv workers were spawned eagerly, wasting
GPU memory for tasks not yet running.

Move the class to envs/utils.py so both environments share it, then apply
the same is_async + lazy wrapping pattern in create_metaworld_envs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore: remove out-of-scope benchmark/CI/docs files from PR

Benchmark CI workflow, Dockerfiles, benchmark docs, evaluation smoke-test
doc, and dispatch tests belong in a separate PR. Scope this PR to the
async env init changes only.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore: restore adding_benchmarks + test_dispatch, drop env_processor changes

- Restore docs/source/adding_benchmarks.mdx (belongs in this PR)
- Restore tests/envs/test_dispatch.py (belongs in this PR)
- Revert docs/source/env_processor.mdx to main (out of scope for this PR)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(adding_benchmarks): remove CI smoke test step (coming in separate PR)

Step 7 (Dockerfile + benchmark_tests.yml CI job) and its table rows are
out of scope for this PR. The CI infrastructure will be added on top in a
follow-up PR.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(envs): remove unused add_envs_task

Replaced by env.call("task_description") in lerobot_eval.py. No callers
remain in the codebase.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* style: fix prettier formatting in env_processor.mdx

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(eval): catch AttributeError and NotImplementedError explicitly for task description

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(envs): use forkserver context and close envs in test to prevent deadlock

AsyncVectorEnv with default fork context leaks worker processes between
test_policy parametrized cases; subsequent env creation deadlocks because
new forked workers inherit stale pipe FDs from previous test's leaked workers.

- configs.py: pass context="forkserver" to AsyncVectorEnv (matches _LazyAsyncVectorEnv)
- test_policies.py: call close_envs(envs) at end of test_policy to clean up workers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(envs): default use_async_envs=False in create_envs and make_env

Tests that call make_env(n_envs=2) without passing use_async_envs were
getting AsyncVectorEnv, whose forked workers can't resolve gym namespaces
registered at runtime. Default to False (sync) so existing tests pass.

lerobot_eval.py explicitly passes cfg.eval.use_async_envs, so the CLI
async behaviour (controlled by EvalConfig.use_async_envs) is unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-09 10:29:20 +02:00
committed by GitHub
parent 5de7aa5a4f
commit 919184d6f8
13 changed files with 331 additions and 159 deletions
+6 -4
View File
@@ -26,7 +26,7 @@ During evaluation, data moves through four stages:
1. gym.Env ──→ raw observations (numpy dicts)
2. Preprocessing ──→ standard LeRobot keys + task description
(preprocess_observation, add_envs_task in envs/utils.py)
(preprocess_observation in envs/utils.py, env.call("task_description"))
3. Processors ──→ env-specific then policy-specific transforms
(env_preprocessor, policy_preprocessor)
@@ -161,6 +161,8 @@ class MyBenchmarkEnv(gym.Env):
...
```
**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern.
Also provide a factory function that returns the nested dict structure:
```python
@@ -207,7 +209,7 @@ class MyBenchmarkEnvConfig(EnvConfig):
def gym_kwargs(self) -> dict:
return {"obs_type": self.obs_type, "render_mode": self.render_mode}
def create_envs(self, n_envs: int, use_async_envs: bool = False):
def create_envs(self, n_envs: int, use_async_envs: bool = True):
"""Override for multi-task benchmarks or custom env creation."""
from lerobot.envs.<benchmark> import create_<benchmark>_envs
return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...)
@@ -299,7 +301,7 @@ After completing the steps above, confirm that everything works:
1. **Install** — `pip install -e ".[mybenchmark]"` and verify the dependency group installs cleanly.
2. **Smoke test env creation** — call `make_env()` with your config in Python, check that the returned dict has the expected `{suite: {task_id: VectorEnv}}` shape, and that `reset()` returns observations with the right keys.
3. **Run a full eval** — `lerobot-eval --env.type=<name> --env.task=<task> --eval.n_episodes=1 --eval.batch_size=1 --policy.path=<any_compatible_policy>` to exercise the full pipeline end-to-end.
3. **Run a full eval** — `lerobot-eval --env.type=<name> --env.task=<task> --eval.n_episodes=1 --policy.path=<any_compatible_policy>` to exercise the full pipeline end-to-end. (`batch_size` defaults to auto-tuning based on CPU cores; pass `--eval.batch_size=1` to force a single environment.)
4. **Check success detection** — verify that `info["is_success"]` flips to `True` when the task is actually completed. This is what the eval loop uses to compute success rates.
## Writing a benchmark doc page
@@ -311,7 +313,7 @@ Each benchmark `.mdx` page should include:
- **Overview image or GIF.**
- **Available tasks** — table of task suites with counts and brief descriptions.
- **Installation** — `pip install -e ".[<benchmark>]"` plus any extra steps (env vars, system packages).
- **Evaluation** — recommended `lerobot-eval` command with `n_episodes` and `batch_size` for reproducible results. Include single-task and multi-task examples if applicable.
- **Evaluation** — recommended `lerobot-eval` command with `n_episodes` for reproducible results. `batch_size` defaults to auto; only specify it if needed. Include single-task and multi-task examples if applicable.
- **Policy inputs and outputs** — observation keys with shapes, action space description.
- **Recommended evaluation episodes** — how many episodes per task is standard.
- **Training** — example `lerobot-train` command.
+60 -35
View File
@@ -88,7 +88,7 @@ policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats)
The same policy can work with different environment processors, and the same environment processor can work with different policies:
```python
````python
# Use SmolVLA policy with LIBERO environment
# Use SmolVLA policy with LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
@@ -102,7 +102,20 @@ libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
policy_cfg=act_cfg,
)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)
```
```python
# Use SmolVLA policy with LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
env_cfg=libero_cfg,
policy_cfg=smolvla_cfg,
)
smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg)
# Or use ACT policy with the same LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
env_cfg=libero_cfg,
policy_cfg=act_cfg,
)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)
### 3. **Easier Experimentation**
@@ -132,7 +145,7 @@ class LiberoVelocityProcessorStep(ObservationProcessorStep):
state = torch.cat([eef_pos, eef_axisangle, eef_vel,
gripper_pos, gripper_vel], dim=-1) # 14D
return state
```
````
### 4. **Cleaner Environment Code**
@@ -157,7 +170,7 @@ observation = {
### Factory Function
The `make_env_pre_post_processors` function delegates to `env_cfg.get_env_processors()`:
The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:
```python
from lerobot.envs.factory import make_env_pre_post_processors
@@ -165,30 +178,46 @@ from lerobot.envs.configs import LiberoEnv, PushtEnv
# For LIBERO: Returns LiberoProcessorStep in preprocessor
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg, policy_cfg)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)
# For other environments: Returns identity processors (no-op)
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg, policy_cfg)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
```
### How It Works
Each `EnvConfig` subclass can override `get_env_processors()` to return benchmark-specific
processor pipelines. The base class returns identity (no-op) processors by default.
### Implementation in `envs/factory.py`
```python
# In your EnvConfig subclass:
def get_env_processors(self):
from lerobot.processor.pipeline import PolicyProcessorPipeline
return (
PolicyProcessorPipeline(steps=[MyProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
)
```
def make_env_pre_post_processors(
env_cfg: EnvConfig,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
"""
Create preprocessor and postprocessor pipelines for environment observations.
The factory function `make_env_pre_post_processors` simply delegates to this method,
with a special case for `XVLAConfig` policies which override the env processors entirely.
Args:
env_cfg: The configuration of the environment.
Returns:
A tuple containing:
- preprocessor: Pipeline that processes environment observations
- postprocessor: Pipeline that processes environment outputs
"""
# For LIBERO environments, add the LiberoProcessorStep to preprocessor
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
else:
# For all other environments, return an identity preprocessor
preprocessor = PolicyProcessorPipeline(steps=[])
# Postprocessor is currently identity for all environments
# Future: Could add environment-specific action transformations
postprocessor = PolicyProcessorPipeline(steps=[])
return preprocessor, postprocessor
```
### Integration in Evaluation
@@ -209,10 +238,7 @@ def eval_main(cfg: EvalPipelineConfig):
)
# Create environment processors (NEW!)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(
env_cfg=cfg.env,
policy_cfg=cfg.policy,
)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env)
# Run evaluation with both processor types
eval_policy_all(
@@ -319,19 +345,18 @@ class MyEnvProcessorStep(ObservationProcessorStep):
### 2. Update Your `EnvConfig` Subclass
```python
# In src/lerobot/envs/configs.py
@EnvConfig.register_subclass("myenv")
@dataclass
class MyEnvConfig(EnvConfig):
# ... task/features/gym kwargs ...
# In src/lerobot/envs/factory.py
def get_env_processors(self):
from lerobot.processor.pipeline import PolicyProcessorPipeline
def make_env_pre_post_processors(env_cfg: EnvConfig):
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type:
preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()])
else:
preprocessor = PolicyProcessorPipeline(steps=[])
return (
PolicyProcessorPipeline(steps=[MyEnvProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
)
postprocessor = PolicyProcessorPipeline(steps=[])
return preprocessor, postprocessor
```
### 3. Use in Evaluation
+1 -1
View File
@@ -2,7 +2,7 @@
Meta-World is an open-source simulation benchmark for **multi-task and meta reinforcement learning** in continuous-control robotic manipulation. It bundles 50 diverse manipulation tasks using everyday objects and a common tabletop Sawyer arm, providing a standardized playground to test whether algorithms can learn many different tasks and generalize quickly to new ones.
- Paper: [Meta-World: A Benchmark and Evaluation for Multi-Task and Meta Reinforcement Learning](https://arxiv.org/abs/1910.10897)
- Paper: [Meta-World: A Benchmark and Evaluation for Multi-Task and Meta Reinforcement Learning paper](https://arxiv.org/abs/1910.10897)
- GitHub: [Farama-Foundation/Metaworld](https://github.com/Farama-Foundation/Metaworld)
- Project website: [metaworld.farama.org](https://metaworld.farama.org)
+17 -10
View File
@@ -65,20 +65,27 @@ class WandBConfig:
class EvalConfig:
n_episodes: int = 50
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
batch_size: int = 50
# Set to 0 for auto-tuning based on available CPU cores and n_episodes.
batch_size: int = 0
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
use_async_envs: bool = False
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True
def __post_init__(self) -> None:
if self.batch_size == 0:
self.batch_size = self._auto_batch_size()
if self.batch_size > self.n_episodes:
raise ValueError(
"The eval batch size is greater than the number of eval episodes "
f"({self.batch_size} > {self.n_episodes}). As a result, {self.batch_size} "
f"eval environments will be instantiated, but only {self.n_episodes} will be used. "
"This might significantly slow down evaluation. To fix this, you should update your command "
f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
)
self.batch_size = self.n_episodes
def _auto_batch_size(self) -> int:
"""Pick batch_size based on CPU cores, capped by n_episodes."""
import math
import os
cpu_cores = os.cpu_count() or 4
# Each async env worker needs ~1 core; leave headroom for main process + inference.
by_cpu = max(1, math.floor(cpu_cores * 0.7))
return min(by_cpu, self.n_episodes, 64)
@dataclass
+24 -6
View File
@@ -44,6 +44,13 @@ from lerobot.utils.constants import (
)
def _make_vec_env_cls(use_async: bool, n_envs: int):
"""Return the right VectorEnv constructor."""
if use_async and n_envs > 1:
return gym.vector.AsyncVectorEnv
return gym.vector.SyncVectorEnv
@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
task: str | None = None
@@ -80,8 +87,9 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
"""Create {suite: {task_id: VectorEnv}}.
Default: single-task env via gym.make(). Multi-task benchmarks override.
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
"""
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
if self.gym_id not in gym_registry:
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
@@ -101,12 +109,17 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
def _make_one():
return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs)
extra_kwargs: dict = {}
if env_cls is gym.vector.AsyncVectorEnv:
extra_kwargs["context"] = "forkserver"
try:
from gymnasium.vector import AutoresetMode
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP)
vec = env_cls(
[_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP, **extra_kwargs
)
except ImportError:
vec = env_cls([_make_one for _ in range(n_envs)])
vec = env_cls([_make_one for _ in range(n_envs)], **extra_kwargs)
return {self.type: {0: vec}}
def get_env_processors(self):
@@ -394,7 +407,12 @@ class LiberoEnv(EnvConfig):
@property
def gym_kwargs(self) -> dict:
kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode}
kwargs: dict[str, Any] = {
"obs_type": self.obs_type,
"render_mode": self.render_mode,
"observation_height": self.observation_height,
"observation_width": self.observation_width,
}
if self.task_ids is not None:
kwargs["task_ids"] = self.task_ids
return kwargs
@@ -404,7 +422,7 @@ class LiberoEnv(EnvConfig):
if self.task is None:
raise ValueError("LiberoEnv requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_libero_envs(
task=self.task,
n_envs=n_envs,
@@ -473,7 +491,7 @@ class MetaworldEnv(EnvConfig):
if self.task is None:
raise ValueError("MetaWorld requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_metaworld_envs(
task=self.task,
n_envs=n_envs,
+51 -19
View File
@@ -29,6 +29,7 @@ from gymnasium import spaces
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
from lerobot.envs.utils import _LazyAsyncVectorEnv
from lerobot.types import RobotObservation
@@ -150,7 +151,17 @@ class LiberoEnv(gym.Env):
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._env = self._make_envs_task(task_suite, self.task_id)
# Extract task metadata without allocating GPU resources (safe before fork).
task = task_suite.get_task(task_id)
self.task = task.name
self.task_description = task.language
self._task_bddl_file = os.path.join(
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
)
self._env: OffScreenRenderEnv | None = (
None # deferred — created on first reset() inside the worker subprocess
)
default_steps = 500
self._max_episode_steps = (
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
@@ -221,29 +232,33 @@ class LiberoEnv(gym.Env):
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
)
def _ensure_env(self) -> None:
"""Create the underlying OffScreenRenderEnv on first use.
Called inside the worker subprocess after fork(), so each worker gets
its own clean EGL context rather than inheriting a stale one from the
parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv).
"""
if self._env is not None:
return
env = OffScreenRenderEnv(
bddl_file_name=self._task_bddl_file,
camera_heights=self.observation_height,
camera_widths=self.observation_width,
)
env.reset()
self._env = env
def render(self):
self._ensure_env()
raw_obs = self._env.env._get_observations()
pixels = self._format_raw_obs(raw_obs)["pixels"]
image = next(iter(pixels.values()))
image = image[::-1, ::-1] # flip both H and W for visualization
return image
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
task = task_suite.get_task(task_id)
self.task = task.name
self.task_description = task.language
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
env_args = {
"bddl_file_name": task_bddl_file,
"camera_heights": self.observation_height,
"camera_widths": self.observation_width,
}
env = OffScreenRenderEnv(**env_args)
env.reset()
return env
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
images = {}
for camera_name in self.camera_name:
image = raw_obs[camera_name]
@@ -295,6 +310,7 @@ class LiberoEnv(gym.Env):
)
def reset(self, seed=None, **kwargs):
self._ensure_env()
super().reset(seed=seed)
self._env.seed(seed)
raw_obs = self._env.reset()
@@ -321,6 +337,8 @@ class LiberoEnv(gym.Env):
return observation, info
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
self._ensure_env()
assert self._env is not None
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
@@ -345,7 +363,8 @@ class LiberoEnv(gym.Env):
return observation, reward, terminated, truncated, info
def close(self):
self._env.close()
if self._env is not None:
self._env.close()
def _make_env_fns(
@@ -428,6 +447,8 @@ def create_libero_envs(
if task_ids_filter is not None:
print(f"Restricting to task_ids={task_ids_filter}")
is_async = env_cls is gym.vector.AsyncVectorEnv
out: dict[str, dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names:
suite = _get_suite(suite_name)
@@ -436,6 +457,11 @@ def create_libero_envs(
if not selected:
raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).")
# All tasks in a suite share identical observation/action spaces.
# Probe once and reuse to avoid creating a temp env per task.
cached_obs_space: spaces.Space | None = None
cached_act_space: spaces.Space | None = None
for tid in selected:
fns = _make_env_fns(
suite=suite,
@@ -449,8 +475,14 @@ def create_libero_envs(
control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
)
out[suite_name][tid] = env_cls(fns)
if is_async:
lazy = _LazyAsyncVectorEnv(fns, cached_obs_space, cached_act_space)
if cached_obs_space is None:
cached_obs_space = lazy.observation_space
cached_act_space = lazy.action_space
out[suite_name][tid] = lazy
else:
out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
+38 -18
View File
@@ -25,6 +25,7 @@ import metaworld.policies as policies
import numpy as np
from gymnasium import spaces
from lerobot.envs.utils import _LazyAsyncVectorEnv
from lerobot.types import RobotObservation
# ---- Load configuration data from the external JSON file ----
@@ -97,8 +98,9 @@ class MetaworldEnv(gym.Env):
self.visualization_height = visualization_height
self.camera_name = camera_name
self._env = self._make_envs_task(self.task)
self._max_episode_steps = self._env.max_path_length
self._env_name = self.task # already stripped of "metaworld-" prefix above
self._env = None # deferred — created on first reset() inside the worker subprocess
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
self.task_description = TASK_DESCRIPTIONS[self.task]
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
@@ -136,6 +138,24 @@ class MetaworldEnv(gym.Env):
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
def _ensure_env(self) -> None:
"""Create the underlying MetaWorld env on first use.
Called inside the worker subprocess after fork(), so each worker gets
its own clean rendering context rather than inheriting a stale one from
the parent process (which causes crashes with AsyncVectorEnv).
"""
if self._env is not None:
return
mt1 = metaworld.MT1(self._env_name, seed=42)
env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name)
env.set_task(mt1.train_tasks[0])
if self.camera_name == "corner2":
env.model.cam_pos[2] = [0.75, 0.075, 0.7]
env.reset()
env._freeze_rand_vec = False # otherwise no randomization
self._env = env
def render(self) -> np.ndarray:
"""
Render the current environment frame.
@@ -143,26 +163,13 @@ class MetaworldEnv(gym.Env):
Returns:
np.ndarray: The rendered RGB image from the environment.
"""
self._ensure_env()
image = self._env.render()
if self.camera_name == "corner2":
# Images from this camera are flipped — correct them
image = np.flip(image, (0, 1))
return image
def _make_envs_task(self, env_name: str):
mt1 = metaworld.MT1(env_name, seed=42)
env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name)
env.set_task(mt1.train_tasks[0])
if self.camera_name == "corner2":
env.model.cam_pos[2] = [
0.75,
0.075,
0.7,
] # corner2 position, similar to https://arxiv.org/pdf/2206.14244
env.reset()
env._freeze_rand_vec = False # otherwise no randomization
return env
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
image = None
if self._env is not None:
@@ -209,6 +216,7 @@ class MetaworldEnv(gym.Env):
observation (RobotObservation): The initial formatted observation.
info (Dict[str, Any]): Additional info about the reset state.
"""
self._ensure_env()
super().reset(seed=seed)
raw_obs, info = self._env.reset(seed=seed)
@@ -232,6 +240,7 @@ class MetaworldEnv(gym.Env):
truncated (bool): Whether the episode was truncated due to a time limit.
info (Dict[str, Any]): Additional environment info.
"""
self._ensure_env()
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
@@ -263,7 +272,8 @@ class MetaworldEnv(gym.Env):
return observation, reward, terminated, truncated, info
def close(self):
self._env.close()
if self._env is not None:
self._env.close()
# ---- Main API ----------------------------------------------------------------
@@ -297,6 +307,9 @@ def create_metaworld_envs(
print(f"Creating Meta-World envs | task_groups={task_groups} | n_envs(per task)={n_envs}")
is_async = env_cls is gym.vector.AsyncVectorEnv
cached_obs_space = None
cached_act_space = None
out: dict[str, dict[int, Any]] = defaultdict(dict)
for group in task_groups:
@@ -309,7 +322,14 @@ def create_metaworld_envs(
# build n_envs factories
fns = [(lambda tn=task_name: MetaworldEnv(task=tn, **gym_kwargs)) for _ in range(n_envs)]
out[group][tid] = env_cls(fns)
if is_async:
lazy = _LazyAsyncVectorEnv(fns, cached_obs_space, cached_act_space)
if cached_obs_space is None:
cached_obs_space = lazy.observation_space
cached_act_space = lazy.action_space
out[group][tid] = lazy
else:
out[group][tid] = env_cls(fns)
# return a plain dict for consistency
return {group: dict(task_map) for group, task_map in out.items()}
+65 -45
View File
@@ -16,7 +16,7 @@
import importlib.util
import os
import warnings
from collections.abc import Mapping, Sequence
from collections.abc import Callable, Mapping, Sequence
from functools import singledispatch
from typing import Any
@@ -29,7 +29,6 @@ from torch import Tensor
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.envs.configs import EnvConfig
from lerobot.types import RobotObservation
from lerobot.utils.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE, OBS_STR
from lerobot.utils.utils import get_channel_first_image_shape
@@ -130,59 +129,80 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
return policy_features
def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool:
first_type = type(env.envs[0]) # Get type of first env
return all(type(e) is first_type for e in env.envs) # Fast type check
def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool:
try:
env.get_attr(attr)
return True
except (AttributeError, Exception):
return False
class _LazyAsyncVectorEnv:
"""Defers AsyncVectorEnv creation until first use.
Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker
processes, all of which allocate EGL/GPU resources immediately. Since tasks
are evaluated sequentially, only one task's workers need to be alive at a
time. This wrapper stores the factory functions and creates the real
AsyncVectorEnv on first reset()/step()/call(), keeping peak process count = n_envs.
"""
def __init__(
self,
env_fns: list[Callable],
observation_space=None,
action_space=None,
):
self._env_fns = env_fns
self._env: gym.vector.AsyncVectorEnv | None = None
self.num_envs = len(env_fns)
if observation_space is not None and action_space is not None:
self.observation_space = observation_space
self.action_space = action_space
else:
tmp = env_fns[0]()
self.observation_space = tmp.observation_space
self.action_space = tmp.action_space
tmp.close()
self.single_observation_space = self.observation_space
self.single_action_space = self.action_space
def _ensure(self) -> None:
if self._env is None:
self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver", shared_memory=True)
def reset(self, **kwargs):
self._ensure()
return self._env.reset(**kwargs)
def step(self, actions):
self._ensure()
return self._env.step(actions)
def call(self, name, *args, **kwargs):
self._ensure()
return self._env.call(name, *args, **kwargs)
def get_attr(self, name):
self._ensure()
return self._env.get_attr(name)
def close(self) -> None:
if self._env is not None:
self._env.close()
self._env = None
def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None:
with warnings.catch_warnings():
warnings.simplefilter("once", UserWarning) # Apply filter only in this function
warnings.simplefilter("once", UserWarning)
if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")):
if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")):
warnings.warn(
"The environment does not have 'task_description' and 'task'. Some policies require these features.",
UserWarning,
stacklevel=2,
)
if not are_all_envs_same_type(env):
warnings.warn(
"The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.",
UserWarning,
stacklevel=2,
)
def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation:
"""Adds task feature to the observation dict with respect to the first environment attribute."""
if hasattr(env.envs[0], "task_description"):
task_result = env.call("task_description")
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task_description to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task_description result must be strings")
observation["task"] = task_result
elif hasattr(env.envs[0], "task"):
task_result = env.call("task")
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task result must be strings")
observation["task"] = task_result
else: # For envs without language instructions, e.g. aloha transfer cube and etc.
num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)]
return observation
def _close_single_env(env: Any) -> None:
+2 -2
View File
@@ -136,8 +136,8 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Standardize to a list of strings for the tokenizer
if isinstance(task, str):
return [task]
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
return task
elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task):
return list(task)
return None
+39 -17
View File
@@ -73,7 +73,6 @@ from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.factory import make_env, make_env_pre_post_processors
from lerobot.envs.utils import (
add_envs_task,
check_env_attributes_and_types,
close_envs,
preprocess_observation,
@@ -166,9 +165,15 @@ def rollout(
if return_observations:
all_observations.append(deepcopy(observation))
# Infer "task" from attributes of environments.
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
observation = add_envs_task(env, observation)
# Infer "task" from sub-environments (prefer natural language description).
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
try:
observation["task"] = list(env.call("task_description"))
except (AttributeError, NotImplementedError):
try:
observation["task"] = list(env.call("task"))
except (AttributeError, NotImplementedError):
observation["task"] = [""] * env.num_envs
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
observation = env_preprocessor(observation)
@@ -318,8 +323,9 @@ def eval_policy(
n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs)
if isinstance(env, gym.vector.SyncVectorEnv):
ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)])) # noqa: B023
elif isinstance(env, gym.vector.AsyncVectorEnv):
elif hasattr(env, "call"):
# Here we must render all frames and discard any we don't need.
# Covers AsyncVectorEnv and _LazyAsyncVectorEnv (which wraps one).
ep_frames.append(np.stack(env.call("render")[:n_to_render_now]))
if max_episodes_rendered > 0:
@@ -521,7 +527,7 @@ def eval_main(cfg: EvalPipelineConfig):
logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}")
logging.info("Making environment.")
logging.info(f"Making environment (batch_size={cfg.eval.batch_size}, async={cfg.eval.use_async_envs}).")
envs = make_env(
cfg.env,
n_envs=cfg.eval.batch_size,
@@ -755,23 +761,39 @@ def eval_policy_all(
)
if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks:
tg, tid, metrics = task_runner(task_group, task_id, env)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
prefetch_thread: threading.Thread | None = None
for i, (task_group, task_id, env) in enumerate(tasks):
if prefetch_thread is not None:
prefetch_thread.join()
prefetch_thread = None
try:
tg, tid, metrics = task_runner(task_group, task_id, env)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# Prefetch next task's workers *after* closing current env to prevent
# GPU memory overlap between consecutive tasks.
if i + 1 < len(tasks):
next_env = tasks[i + 1][2]
if hasattr(next_env, "_ensure"):
prefetch_thread = threading.Thread(target=next_env._ensure, daemon=True)
prefetch_thread.start()
else:
# threaded path: submit all tasks, consume completions on main thread and accumulate there
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2meta = {}
for task_group, task_id, env in tasks:
fut = executor.submit(task_runner, task_group, task_id, env)
fut2meta[fut] = (task_group, task_id)
fut2meta[fut] = (task_group, task_id, env)
for fut in cf.as_completed(fut2meta):
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
tg, tid, env = fut2meta[fut]
try:
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# compute aggregated metrics helper (robust to lists/scalars)
def _agg_from_list(xs):
+1 -1
View File
@@ -90,7 +90,7 @@ def test_base_create_envs():
envs = _Env().create_envs(n_envs=2)
assert "_dispatch_base_test" in envs
env = envs["_dispatch_base_test"][0]
assert isinstance(env, gym.vector.SyncVectorEnv)
assert isinstance(env, gym.vector.VectorEnv)
assert env.num_envs == 2
env.close()
finally:
+3 -1
View File
@@ -31,7 +31,7 @@ from lerobot.datasets.factory import make_dataset
from lerobot.datasets.feature_utils import dataset_to_policy_features
from lerobot.datasets.utils import cycle
from lerobot.envs.factory import make_env, make_env_config
from lerobot.envs.utils import preprocess_observation
from lerobot.envs.utils import close_envs, preprocess_observation
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies.act.configuration_act import ACTConfig
from lerobot.policies.act.modeling_act import ACTTemporalEnsembler
@@ -224,6 +224,8 @@ def test_policy(ds_repo_id, env_name, env_kwargs, policy_name, policy_kwargs):
# Test step through policy
env.step(action)
close_envs(envs)
# TODO(rcadene, aliberts): This test is quite end-to-end. Move this test in test_optimizer?
def test_act_backbone_lr():
@@ -189,6 +189,30 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer):
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
"""Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
mock_tokenizer = MockTokenizer(vocab_size=100)
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
transition = create_transition(
observation={"state": torch.tensor([1.0, 2.0])},
action=torch.tensor([0.1, 0.2]),
complementary_data={"task": ("pick up cube", "place on table")},
)
result = processor(transition)
observation = result[TransitionKey.OBSERVATION]
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
assert tokens.shape == (2, 8)
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_keys(mock_auto_tokenizer):