Compare commits

...

19 Commits

Author SHA1 Message Date
Pepijn 66276f1efd feat(eval): thread-safe policy copies for max_parallel_tasks > 1
eval_policy_all already supports running multiple task groups concurrently via
ThreadPoolExecutor, but policy.reset() was not thread-safe: all threads shared
the same policy object and its mutable state (action queues, temporal buffers).

Fix: each thread receives a shallow copy of the policy. copy.copy() creates a
new Python object whose _parameters dict is a shared reference — same tensor
storage, zero extra VRAM — while reset() rebinds per-episode state to fresh
objects per thread.

Caveat: ACT with temporal_ensemble_coeff is not safe with this approach (its
reset() mutates a shared sub-object). Keep max_parallel_tasks=1 for that config.

For MetaWorld (50 tasks, no temporal ensembling), max_parallel_tasks=4 raises
GPU utilization from ~20% to ~60-80% with no additional VRAM cost.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:43:42 +02:00
Pepijn 5972a85ec7 feat(eval): episode sharding, parallel launcher, and autotune
Add lerobot-eval-parallel and lerobot-eval-autotune entry points for
multi-process evaluation. A single H100 running 4 shards of SmolVLA
achieves ~100% GPU utilisation vs ~0.5% with the serial baseline.

- EvalConfig: add shard_id / num_shards fields; validate ranges
- lerobot_eval.py: _shard_episodes() splits n_episodes round-robin;
  eval_main uses per-shard n_episodes + seed offset; writes
  shard_K_of_N.json when num_shards > 1
- lerobot_eval_parallel.py: spawns K subprocesses with disjoint shard
  IDs, sets MUJOCO_GL and OMP_NUM_THREADS, merges results on completion
- lerobot_eval_autotune.py: probes GPU VRAM, CPU cores, optional model
  footprint and env step time; derives optimal num_shards / batch_size /
  MUJOCO_GL; prints a paste-ready command
- pyproject.toml: register lerobot-eval-parallel and lerobot-eval-autotune

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:43:03 +02:00
Pepijn Kooijmans 800b0a5f26 docs: update adding_benchmarks for async env changes
- Replace add_envs_task reference with env.call("task_description")
- Update use_async_envs default to True
- Add note about lazy GPU init for AsyncVectorEnv compatibility

Made-with: Cursor
2026-04-07 13:38:37 +02:00
Pepijn Kooijmans 6aeb7c54f9 fix(eval): use task_description instead of task for language conditioning
env.call("task") returns the LIBERO task name with underscores
(e.g. "pick_up_the_black_bowl_...") instead of the natural language
description ("pick up the black bowl ..."). The VLM tokenizes these
completely differently, causing 0.0 reward across all episodes.

Made-with: Cursor
2026-04-07 13:12:42 +02:00
Pepijn Kooijmans 1f7e7b4a90 fix: close envs between tasks to prevent worker process accumulation
eval_policy_all never closed environments after each task completed,
causing AsyncVectorEnv worker processes to accumulate (N_tasks × n_envs).
This led to OOM, BrokenPipeError and EOFError on multi-task benchmarks.

Also fixes:
- AsyncVectorEnv compat in envs/utils.py (use get_attr/call instead of .envs)
- Tuple task handling in tokenizer_processor and lerobot_eval
- _LazyAsyncVectorEnv for deferred worker spawning in LIBERO

Made-with: Cursor
2026-04-07 12:30:22 +02:00
Pepijn 681cc59ed2 feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1
LiberoEnv and MetaworldEnv previously allocated GPU resources (EGL context,
OpenGL framebuffer) in __init__, before AsyncVectorEnv's fork(). Worker
processes inherited stale GPU handles, causing EGL_BAD_CONTEXT crashes on
first render.

Fix: defer OffScreenRenderEnv / MT1 construction to _ensure_env(), called on
first reset() or step() inside the worker subprocess. Each worker creates its
own clean context after fork().

Also fixes lerobot_eval.py:170 (add_envs_task TODO): replace with
env.call("task") which works with both SyncVectorEnv and AsyncVectorEnv.

AsyncVectorEnv is now the default for n_envs > 1; auto-downgraded to
SyncVectorEnv when n_envs=1 (no benefit, less overhead).

Expected speedup: ~15-20x for LIBERO Spatial with batch_size=50.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 11:31:32 +02:00
Pepijn Kooijmans d9edc12e00 refactor: revert policy changes, keep env-only camera mapping fixes
- Revert GR00T N1.5 default_factory/default changes (transformers compat)
- Revert SmolVLA use_peft legacy field
- Apply ruff formatting fixes
- camera_name_mapping stays entirely in env/eval layer (no policy changes)

Made-with: Cursor
2026-04-07 11:25:49 +02:00
Pepijn Kooijmans fd2bad9b42 fix: handle gymnasium < 1.0 without AutoresetMode
Made-with: Cursor
2026-04-07 11:20:38 +02:00
Pepijn Kooijmans 7e729e33c9 fix: use direct AutoresetMode import for gymnasium compat
Made-with: Cursor
2026-04-07 11:19:17 +02:00
Pepijn Kooijmans e383207a15 fix: enable SmolVLA eval on LIBERO with custom camera mappings
- Thread camera_name_mapping from LiberoEnv config through to gym envs
- Sync features_map with camera_name_mapping in LiberoEnv.__post_init__
- Fix render() to use first available camera instead of hardcoded "image"
- Handle non-dict final_info in rollout by falling back to info["is_success"]
- Add use_peft legacy field to SmolVLAConfig for checkpoint compat
- Add defaults to GR00TN15Config init=False fields for transformers 5.3

Made-with: Cursor
2026-04-07 11:18:29 +02:00
Pepijn 8ed658c6aa fix(tests): fix 3 failing dispatch tests
- test_registry_all_types: skip non-EnvConfig stubs (e.g. TestPluginConfig)
- test_processors_delegation: use None instead of abstract PreTrainedConfig
- test_custom_get_env_processors_override: use DataProcessorPipeline for isinstance check (PolicyProcessorPipeline is a subscripted generic)

Made-with: Cursor
2026-04-03 17:19:27 +02:00
Pepijn 0045f88355 merge: resolve conflicts from main into refactor/benchmark-dispatch
Keep refactored dispatch pattern (no factory.py edits for new benchmarks).
Incorporate main's "Verifying your integration" section and class naming fix.

Made-with: Cursor
2026-04-03 14:49:36 +02:00
Pepijn 89ce91f69f Merge branch 'docs/adding-benchmarks-guide' into refactor/benchmark-dispatch 2026-04-03 13:56:49 +02:00
Pepijn 90e614f6b9 fix task count 2026-04-03 13:48:37 +02:00
Pepijn ff4f860e5d fix link 2026-04-03 13:47:17 +02:00
Pepijn 6f2823bfc4 merge: resolve conflicts with docs/adding-benchmarks-guide
Incorporate cleaner writing from the docs branch while reflecting the
refactored dispatch pattern (no factory.py edits needed for new benchmarks).

Made-with: Cursor
2026-04-03 13:45:12 +02:00
Pepijn 77415559b8 docs(benchmarks): clean up adding-benchmarks guide for clarity
Rewrite for simpler language, better structure, and easier navigation.
Move quick-reference table to the top, fold eval explanation into
architecture section, condense the doc template to a bulleted outline.

Made-with: Cursor
2026-04-03 13:36:16 +02:00
Pepijn 24d9b74d81 refactor(envs): move dispatch logic from factory into EnvConfig subclasses
Replace hardcoded if/elif chains in factory.py with create_envs() and
get_env_processors() methods on EnvConfig. New benchmarks now only need
to register a config subclass — no factory.py edits required.

Net -23 lines: factory.py shrinks from ~200 to ~70 lines of logic.

Made-with: Cursor
2026-04-03 13:23:44 +02:00
Pepijn 508358749a docs(benchmarks): add benchmark integration guide and standardize benchmark docs
Add a comprehensive guide for adding new benchmarks to LeRobot, and
refactor the existing LIBERO and Meta-World docs to follow the new
standardized template.

Made-with: Cursor
2026-04-02 20:43:31 +02:00
15 changed files with 1043 additions and 310 deletions
+39 -46
View File
@@ -26,7 +26,7 @@ During evaluation, data moves through four stages:
1. gym.Env ──→ raw observations (numpy dicts)
2. Preprocessing ──→ standard LeRobot keys + task description
(preprocess_observation, add_envs_task in envs/utils.py)
(preprocess_observation in envs/utils.py, env.call("task_description"))
3. Processors ──→ env-specific then policy-specific transforms
(env_preprocessor, policy_preprocessor)
@@ -115,23 +115,22 @@ Each `EnvConfig` subclass declares two dicts that tell the policy what to expect
## Step by step
<Tip>
At minimum, you need three files: a **gym.Env wrapper**, an **EnvConfig
subclass**, and a **factory dispatch branch**. Everything else is optional or
documentation.
At minimum, you need two files: a **gym.Env wrapper** and an **EnvConfig
subclass** with a `create_envs()` override. Everything else is optional or
documentation. No changes to `factory.py` are needed.
</Tip>
### Checklist
| File | Required | Why |
| ---------------------------------------- | -------- | ----------------------------------------- |
| `src/lerobot/envs/<benchmark>.py` | Yes | Wraps the simulator as a standard gym.Env |
| `src/lerobot/envs/configs.py` | Yes | Registers your benchmark for the CLI |
| `src/lerobot/envs/factory.py` | Yes | Tells `make_env()` how to build your envs |
| `src/lerobot/processor/env_processor.py` | Optional | Custom observation/action transforms |
| `src/lerobot/envs/utils.py` | Optional | Only if you need new raw observation keys |
| `pyproject.toml` | Yes | Declares benchmark-specific dependencies |
| `docs/source/<benchmark>.mdx` | Yes | User-facing documentation page |
| `docs/source/_toctree.yml` | Yes | Adds your page to the docs sidebar |
| File | Required | Why |
| ---------------------------------------- | -------- | ------------------------------------------------------------ |
| `src/lerobot/envs/<benchmark>.py` | Yes | Wraps the simulator as a standard gym.Env |
| `src/lerobot/envs/configs.py` | Yes | Registers your benchmark and its `create_envs()` for the CLI |
| `src/lerobot/processor/env_processor.py` | Optional | Custom observation/action transforms |
| `src/lerobot/envs/utils.py` | Optional | Only if you need new raw observation keys |
| `pyproject.toml` | Yes | Declares benchmark-specific dependencies |
| `docs/source/<benchmark>.mdx` | Yes | User-facing documentation page |
| `docs/source/_toctree.yml` | Yes | Adds your page to the docs sidebar |
### 1. The gym.Env wrapper (`src/lerobot/envs/<benchmark>.py`)
@@ -162,6 +161,8 @@ class MyBenchmarkEnv(gym.Env):
...
```
**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern.
Also provide a factory function that returns the nested dict structure:
```python
@@ -179,7 +180,10 @@ See `create_libero_envs()` (multi-suite, multi-task) and `create_metaworld_envs(
### 2. The config (`src/lerobot/envs/configs.py`)
Register a config dataclass so users can select your benchmark with `--env.type=<name>`:
Register a config dataclass so users can select your benchmark with `--env.type=<name>`. Each config owns its environment creation and processor logic via two methods:
- **`create_envs(n_envs, use_async_envs)`** — Returns `{suite: {task_id: VectorEnv}}`. The base class default uses `gym.make()` for single-task envs. Multi-task benchmarks override this.
- **`get_env_processors()`** — Returns `(preprocessor, postprocessor)`. The base class default returns identity (no-op) pipelines. Override if your benchmark needs observation/action transforms.
```python
@EnvConfig.register_subclass("<benchmark_name>")
@@ -204,6 +208,20 @@ class MyBenchmarkEnvConfig(EnvConfig):
@property
def gym_kwargs(self) -> dict:
return {"obs_type": self.obs_type, "render_mode": self.render_mode}
def create_envs(self, n_envs: int, use_async_envs: bool = True):
"""Override for multi-task benchmarks or custom env creation."""
from lerobot.envs.<benchmark> import create_<benchmark>_envs
return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...)
def get_env_processors(self):
"""Override if your benchmark needs observation/action transforms."""
from lerobot.processor.pipeline import PolicyProcessorPipeline
from lerobot.processor.env_processor import MyBenchmarkProcessorStep
return (
PolicyProcessorPipeline(steps=[MyBenchmarkProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
)
```
Key points:
@@ -211,36 +229,11 @@ Key points:
- The `register_subclass` name is what users pass on the CLI (`--env.type=<name>`).
- `features` tells the policy what the environment produces.
- `features_map` maps raw observation keys to LeRobot convention keys.
- **No changes to `factory.py` needed** — the factory delegates to `cfg.create_envs()` and `cfg.get_env_processors()` automatically.
### 3. The factory dispatch (`src/lerobot/envs/factory.py`)
### 3. Env processor (optional — `src/lerobot/processor/env_processor.py`)
Add a branch in `make_env()` to call your factory function:
```python
elif "<benchmark_name>" in cfg.type:
from lerobot.envs.<benchmark> import create_<benchmark>_envs
if cfg.task is None:
raise ValueError("<BenchmarkName> requires a task to be specified")
return create_<benchmark>_envs(
task=cfg.task,
n_envs=n_envs,
gym_kwargs=cfg.gym_kwargs,
env_cls=env_cls,
)
```
If your benchmark needs an env processor, add it in `make_env_pre_post_processors()`:
```python
if isinstance(env_cfg, MyBenchmarkEnvConfig) or "<benchmark_name>" in env_cfg.type:
preprocessor_steps.append(MyBenchmarkProcessorStep())
```
### 4. Env processor (optional — `src/lerobot/processor/env_processor.py`)
Only needed if your benchmark requires observation transforms beyond what `preprocess_observation()` handles (e.g. image flipping, coordinate conversion):
Only needed if your benchmark requires observation transforms beyond what `preprocess_observation()` handles (e.g. image flipping, coordinate conversion). Define the processor step here and return it from `get_env_processors()` in your config (see step 2):
```python
@dataclass
@@ -260,7 +253,7 @@ class MyBenchmarkProcessorStep(ObservationProcessorStep):
See `LiberoProcessorStep` for a full example (image rotation, quaternion-to-axis-angle conversion).
### 5. Dependencies (`pyproject.toml`)
### 4. Dependencies (`pyproject.toml`)
Add a new optional-dependency group:
@@ -281,11 +274,11 @@ Users install with:
pip install -e ".[mybenchmark]"
```
### 6. Documentation (`docs/source/<benchmark>.mdx`)
### 5. Documentation (`docs/source/<benchmark>.mdx`)
Write a user-facing page following the template in the next section. See `docs/source/libero.mdx` and `docs/source/metaworld.mdx` for full examples.
### 7. Table of contents (`docs/source/_toctree.yml`)
### 6. Table of contents (`docs/source/_toctree.yml`)
Add your benchmark to the "Benchmarks" section:
+17 -33
View File
@@ -151,7 +151,7 @@ observation = {
### Factory Function
The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:
The `make_env_pre_post_processors` function delegates to `env_cfg.get_env_processors()`:
```python
from lerobot.envs.factory import make_env_pre_post_processors
@@ -159,47 +159,31 @@ from lerobot.envs.configs import LiberoEnv, PushtEnv
# For LIBERO: Returns LiberoProcessorStep in preprocessor
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg, policy_cfg)
# For other environments: Returns identity processors (no-op)
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg, policy_cfg)
```
### Implementation in `envs/factory.py`
### How It Works
Each `EnvConfig` subclass can override `get_env_processors()` to return benchmark-specific
processor pipelines. The base class returns identity (no-op) processors by default.
```python
def make_env_pre_post_processors(
env_cfg: EnvConfig,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
"""
Create preprocessor and postprocessor pipelines for environment observations.
Args:
env_cfg: The configuration of the environment.
Returns:
A tuple containing:
- preprocessor: Pipeline that processes environment observations
- postprocessor: Pipeline that processes environment outputs
"""
# For LIBERO environments, add the LiberoProcessorStep to preprocessor
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
else:
# For all other environments, return an identity preprocessor
preprocessor = PolicyProcessorPipeline(steps=[])
# Postprocessor is currently identity for all environments
# Future: Could add environment-specific action transformations
postprocessor = PolicyProcessorPipeline(steps=[])
return preprocessor, postprocessor
# In your EnvConfig subclass:
def get_env_processors(self):
from lerobot.processor.pipeline import PolicyProcessorPipeline
return (
PolicyProcessorPipeline(steps=[MyProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
)
```
The factory function `make_env_pre_post_processors` simply delegates to this method,
with a special case for `XVLAConfig` policies which override the env processors entirely.
### Integration in Evaluation
In `lerobot_eval.py`, the environment processors are created once and used throughout:
+2
View File
@@ -220,6 +220,8 @@ lerobot-replay="lerobot.scripts.lerobot_replay:main"
lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main"
lerobot-eval="lerobot.scripts.lerobot_eval:main"
lerobot-eval-parallel="lerobot.scripts.lerobot_eval_parallel:main"
lerobot-eval-autotune="lerobot.scripts.lerobot_eval_autotune:main"
lerobot-train="lerobot.scripts.lerobot_train:main"
lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
+13 -1
View File
@@ -67,7 +67,13 @@ class EvalConfig:
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
batch_size: int = 50
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
use_async_envs: bool = False
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True
# Sharding: split n_episodes across independent processes.
# shard_id=0, num_shards=1 is the default (no sharding, existing behaviour).
# Set via lerobot_eval_parallel or manually: --eval.shard_id=K --eval.num_shards=N
shard_id: int = 0
num_shards: int = 1
def __post_init__(self) -> None:
if self.batch_size > self.n_episodes:
@@ -79,6 +85,12 @@ class EvalConfig:
f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
)
if self.num_shards < 1:
raise ValueError(f"`num_shards` must be >= 1, got {self.num_shards}")
if not (0 <= self.shard_id < self.num_shards):
raise ValueError(
f"`shard_id` must be in [0, num_shards), got shard_id={self.shard_id}, num_shards={self.num_shards}"
)
@dataclass
+117
View File
@@ -12,11 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
import importlib
from dataclasses import dataclass, field, fields
from typing import Any
import draccus
import gymnasium as gym
from gymnasium.envs.registration import registry as gym_registry
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.robots import RobotConfig
@@ -39,6 +44,13 @@ from lerobot.utils.constants import (
)
def _make_vec_env_cls(use_async: bool, n_envs: int):
"""Return the right VectorEnv constructor."""
if use_async and n_envs > 1:
return gym.vector.AsyncVectorEnv
return gym.vector.SyncVectorEnv
@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
task: str | None = None
@@ -67,6 +79,50 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
def gym_kwargs(self) -> dict:
raise NotImplementedError()
def create_envs(
self,
n_envs: int,
use_async_envs: bool = True,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Create {suite: {task_id: VectorEnv}}.
Default: single-task env via gym.make(). Multi-task benchmarks override.
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
"""
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
if self.gym_id not in gym_registry:
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
try:
importlib.import_module(self.package_name)
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
f"Package '{self.package_name}' required for env '{self.type}' not found. "
f"Please install it or check PYTHONPATH."
) from e
if self.gym_id not in gym_registry:
raise gym.error.NameNotFound(
f"Environment '{self.gym_id}' not registered even after importing '{self.package_name}'."
)
def _make_one():
return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs)
try:
from gymnasium.vector import AutoresetMode
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP)
except ImportError:
vec = env_cls([_make_one for _ in range(n_envs)])
return {self.type: {0: vec}}
def get_env_processors(self):
"""Return (preprocessor, postprocessor) for this env. Default: identity."""
from lerobot.processor.pipeline import PolicyProcessorPipeline
return PolicyProcessorPipeline(steps=[]), PolicyProcessorPipeline(steps=[])
@dataclass
class HubEnvConfig(EnvConfig):
@@ -338,6 +394,12 @@ class LiberoEnv(EnvConfig):
else:
raise ValueError(f"Unsupported obs_type: {self.obs_type}")
if self.camera_name_mapping is not None:
mapped_agentview = self.camera_name_mapping.get("agentview_image", "image")
mapped_eye_in_hand = self.camera_name_mapping.get("robot0_eye_in_hand_image", "image2")
self.features_map[LIBERO_KEY_PIXELS_AGENTVIEW] = f"{OBS_IMAGES}.{mapped_agentview}"
self.features_map[LIBERO_KEY_PIXELS_EYE_IN_HAND] = f"{OBS_IMAGES}.{mapped_eye_in_hand}"
@property
def gym_kwargs(self) -> dict:
kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode}
@@ -345,6 +407,33 @@ class LiberoEnv(EnvConfig):
kwargs["task_ids"] = self.task_ids
return kwargs
def create_envs(self, n_envs: int, use_async_envs: bool = True):
from lerobot.envs.libero import create_libero_envs
if self.task is None:
raise ValueError("LiberoEnv requires a task to be specified")
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_libero_envs(
task=self.task,
n_envs=n_envs,
camera_name=self.camera_name,
init_states=self.init_states,
gym_kwargs=self.gym_kwargs,
env_cls=env_cls,
control_mode=self.control_mode,
episode_length=self.episode_length,
camera_name_mapping=self.camera_name_mapping,
)
def get_env_processors(self):
from lerobot.processor.env_processor import LiberoProcessorStep
from lerobot.processor.pipeline import PolicyProcessorPipeline
return (
PolicyProcessorPipeline(steps=[LiberoProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
)
@EnvConfig.register_subclass("metaworld")
@dataclass
@@ -387,6 +476,19 @@ class MetaworldEnv(EnvConfig):
"render_mode": self.render_mode,
}
def create_envs(self, n_envs: int, use_async_envs: bool = True):
from lerobot.envs.metaworld import create_metaworld_envs
if self.task is None:
raise ValueError("MetaWorld requires a task to be specified")
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_metaworld_envs(
task=self.task,
n_envs=n_envs,
gym_kwargs=self.gym_kwargs,
env_cls=env_cls,
)
@EnvConfig.register_subclass("isaaclab_arena")
@dataclass
@@ -454,3 +556,18 @@ class IsaaclabArenaEnv(HubEnvConfig):
@property
def gym_kwargs(self) -> dict:
return {}
def get_env_processors(self):
from lerobot.processor.env_processor import IsaaclabArenaProcessorStep
from lerobot.processor.pipeline import PolicyProcessorPipeline
state_keys = tuple(k.strip() for k in (self.state_keys or "").split(",") if k.strip())
camera_keys = tuple(k.strip() for k in (self.camera_keys or "").split(",") if k.strip())
if not state_keys and not camera_keys:
raise ValueError("At least one of state_keys or camera_keys must be specified.")
return (
PolicyProcessorPipeline(
steps=[IsaaclabArenaProcessorStep(state_keys=state_keys, camera_keys=camera_keys)]
),
PolicyProcessorPipeline(steps=[]),
)
+21 -118
View File
@@ -13,96 +13,52 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
from __future__ import annotations
from typing import Any
import gymnasium as gym
from gymnasium.envs.registration import registry as gym_registry
from lerobot.configs.policies import PreTrainedConfig
from lerobot.envs.configs import AlohaEnv, EnvConfig, HubEnvConfig, IsaaclabArenaEnv, LiberoEnv, PushtEnv
from lerobot.envs.configs import EnvConfig, HubEnvConfig
from lerobot.envs.utils import _call_make_env, _download_hub_file, _import_hub_module, _normalize_hub_result
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
from lerobot.processor import ProcessorStep
from lerobot.processor.env_processor import IsaaclabArenaProcessorStep, LiberoProcessorStep
from lerobot.processor.pipeline import PolicyProcessorPipeline
def make_env_config(env_type: str, **kwargs) -> EnvConfig:
if env_type == "aloha":
return AlohaEnv(**kwargs)
elif env_type == "pusht":
return PushtEnv(**kwargs)
elif env_type == "libero":
return LiberoEnv(**kwargs)
else:
raise ValueError(f"Policy type '{env_type}' is not available.")
try:
cls = EnvConfig.get_choice_class(env_type)
except KeyError as err:
raise ValueError(
f"Environment type '{env_type}' is not registered. "
f"Available: {list(EnvConfig.get_known_choices().keys())}"
) from err
return cls(**kwargs)
def make_env_pre_post_processors(
env_cfg: EnvConfig,
policy_cfg: PreTrainedConfig,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
policy_cfg: Any,
) -> tuple[Any, Any]:
"""
Create preprocessor and postprocessor pipelines for environment observations.
This function creates processor pipelines that transform raw environment
observations and actions. By default, it returns identity processors that do nothing.
For specific environments like LIBERO, it adds environment-specific processing steps.
Args:
env_cfg: The configuration of the environment.
Returns:
A tuple containing:
- preprocessor: Pipeline that processes environment observations
- postprocessor: Pipeline that processes environment outputs (currently identity)
Returns a tuple of (preprocessor, postprocessor). By default, delegates to
``env_cfg.get_env_processors()``. The XVLAConfig policy-specific override
stays here because it depends on the *policy* config, not the env config.
"""
# Preprocessor and Postprocessor steps are Identity for most environments
preprocessor_steps: list[ProcessorStep] = []
postprocessor_steps: list[ProcessorStep] = []
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
if isinstance(policy_cfg, XVLAConfig):
from lerobot.policies.xvla.processor_xvla import make_xvla_libero_pre_post_processors
return make_xvla_libero_pre_post_processors()
# For LIBERO environments, add the LiberoProcessorStep to preprocessor
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
preprocessor_steps.append(LiberoProcessorStep())
# For Isaaclab Arena environments, add the IsaaclabArenaProcessorStep
if isinstance(env_cfg, IsaaclabArenaEnv) or "isaaclab_arena" in env_cfg.type:
# Parse comma-separated keys (handle None for state-based policies)
if env_cfg.state_keys:
state_keys = tuple(k.strip() for k in env_cfg.state_keys.split(",") if k.strip())
else:
state_keys = ()
if env_cfg.camera_keys:
camera_keys = tuple(k.strip() for k in env_cfg.camera_keys.split(",") if k.strip())
else:
camera_keys = ()
if not state_keys and not camera_keys:
raise ValueError("At least one of state_keys or camera_keys must be specified.")
preprocessor_steps.append(
IsaaclabArenaProcessorStep(
state_keys=state_keys,
camera_keys=camera_keys,
)
)
preprocessor = PolicyProcessorPipeline(steps=preprocessor_steps)
postprocessor = PolicyProcessorPipeline(steps=postprocessor_steps)
return preprocessor, postprocessor
return env_cfg.get_env_processors()
def make_env(
cfg: EnvConfig | str,
n_envs: int = 1,
use_async_envs: bool = False,
use_async_envs: bool = True,
hub_cache_dir: str | None = None,
trust_remote_code: bool = False,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
@@ -163,57 +119,4 @@ def make_env(
if n_envs < 1:
raise ValueError("`n_envs` must be at least 1")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
if "libero" in cfg.type:
from lerobot.envs.libero import create_libero_envs
if cfg.task is None:
raise ValueError("LiberoEnv requires a task to be specified")
return create_libero_envs(
task=cfg.task,
n_envs=n_envs,
camera_name=cfg.camera_name,
init_states=cfg.init_states,
gym_kwargs=cfg.gym_kwargs,
env_cls=env_cls,
control_mode=cfg.control_mode,
episode_length=cfg.episode_length,
)
elif "metaworld" in cfg.type:
from lerobot.envs.metaworld import create_metaworld_envs
if cfg.task is None:
raise ValueError("MetaWorld requires a task to be specified")
return create_metaworld_envs(
task=cfg.task,
n_envs=n_envs,
gym_kwargs=cfg.gym_kwargs,
env_cls=env_cls,
)
if cfg.gym_id not in gym_registry:
print(f"gym id '{cfg.gym_id}' not found, attempting to import '{cfg.package_name}'...")
try:
importlib.import_module(cfg.package_name)
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
f"Package '{cfg.package_name}' required for env '{cfg.type}' not found. "
f"Please install it or check PYTHONPATH."
) from e
if cfg.gym_id not in gym_registry:
raise gym.error.NameNotFound(
f"Environment '{cfg.gym_id}' not registered even after importing '{cfg.package_name}'."
)
def _make_one():
return gym.make(cfg.gym_id, disable_env_checker=cfg.disable_env_checker, **(cfg.gym_kwargs or {}))
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=gym.vector.AutoresetMode.SAME_STEP)
# normalize to {suite: {task_id: vec_env}} for consistency
suite_name = cfg.type # e.g., "pusht", "aloha"
return {suite_name: {0: vec}}
return cfg.create_envs(n_envs=n_envs, use_async_envs=use_async_envs)
+98 -26
View File
@@ -150,7 +150,17 @@ class LiberoEnv(gym.Env):
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._env = self._make_envs_task(task_suite, self.task_id)
# Extract task metadata without allocating GPU resources (safe before fork).
task = task_suite.get_task(task_id)
self.task = task.name
self.task_description = task.language
self._task_bddl_file = os.path.join(
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
)
self._env: OffScreenRenderEnv | None = (
None # deferred — created on first reset() inside the worker subprocess
)
default_steps = 500
self._max_episode_steps = (
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
@@ -221,28 +231,33 @@ class LiberoEnv(gym.Env):
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
)
def _ensure_env(self) -> None:
"""Create the underlying OffScreenRenderEnv on first use.
Called inside the worker subprocess after fork(), so each worker gets
its own clean EGL context rather than inheriting a stale one from the
parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv).
"""
if self._env is not None:
return
env = OffScreenRenderEnv(
bddl_file_name=self._task_bddl_file,
camera_heights=self.observation_height,
camera_widths=self.observation_width,
)
env.reset()
self._env = env
def render(self):
self._ensure_env()
raw_obs = self._env.env._get_observations()
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
pixels = self._format_raw_obs(raw_obs)["pixels"]
image = next(iter(pixels.values()))
image = image[::-1, ::-1] # flip both H and W for visualization
return image
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
task = task_suite.get_task(task_id)
self.task = task.name
self.task_description = task.language
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
env_args = {
"bddl_file_name": task_bddl_file,
"camera_heights": self.observation_height,
"camera_widths": self.observation_width,
}
env = OffScreenRenderEnv(**env_args)
env.reset()
return env
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
images = {}
for camera_name in self.camera_name:
image = raw_obs[camera_name]
@@ -294,6 +309,7 @@ class LiberoEnv(gym.Env):
)
def reset(self, seed=None, **kwargs):
self._ensure_env()
super().reset(seed=seed)
self._env.seed(seed)
raw_obs = self._env.reset()
@@ -320,6 +336,8 @@ class LiberoEnv(gym.Env):
return observation, info
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
self._ensure_env()
assert self._env is not None
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
@@ -339,18 +357,13 @@ class LiberoEnv(gym.Env):
)
observation = self._format_raw_obs(raw_obs)
if terminated:
info["final_info"] = {
"task": self.task,
"task_id": self.task_id,
"done": bool(done),
"is_success": bool(is_success),
}
self.reset()
truncated = False
return observation, reward, terminated, truncated, info
def close(self):
self._env.close()
if self._env is not None:
self._env.close()
def _make_env_fns(
@@ -364,6 +377,7 @@ def _make_env_fns(
init_states: bool,
gym_kwargs: Mapping[str, Any],
control_mode: str,
camera_name_mapping: dict[str, str] | None = None,
) -> list[Callable[[], LiberoEnv]]:
"""Build n_envs factory callables for a single (suite, task_id)."""
@@ -379,6 +393,7 @@ def _make_env_fns(
episode_index=episode_index,
n_envs=n_envs,
control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
**local_kwargs,
)
@@ -388,6 +403,57 @@ def _make_env_fns(
return fns
class _LazyAsyncVectorEnv:
"""Wrapper that defers AsyncVectorEnv creation until first use.
Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker
processes, all of which allocate EGL/GPU resources immediately. Since tasks
are evaluated sequentially, only one task's workers need to be alive at a
time. This wrapper stores the factory functions and creates the real
AsyncVectorEnv on first reset(), keeping peak process count = n_envs.
"""
def __init__(self, env_fns: list[Callable]):
self._env_fns = env_fns
self._env: gym.vector.AsyncVectorEnv | None = None
self.num_envs = len(env_fns)
# Instantiate one env to expose spaces (no GPU — _ensure_env is lazy).
tmp = env_fns[0]()
self.observation_space = tmp.observation_space
self.action_space = tmp.action_space
self.single_observation_space = tmp.observation_space
self.single_action_space = tmp.action_space
tmp.close()
def _ensure(self):
if self._env is None:
self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver")
def reset(self, **kwargs):
self._ensure()
return self._env.reset(**kwargs)
def step(self, actions):
self._ensure()
return self._env.step(actions)
def call(self, name, *args, **kwargs):
self._ensure()
return self._env.call(name, *args, **kwargs)
def get_attr(self, name):
self._ensure()
return self._env.get_attr(name)
def close(self):
if self._env is not None:
self._env.close()
self._env = None
def __del__(self):
self.close()
# ---- Main API ----------------------------------------------------------------
@@ -400,6 +466,7 @@ def create_libero_envs(
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
control_mode: str = "relative",
episode_length: int | None = None,
camera_name_mapping: dict[str, str] | None = None,
) -> dict[str, dict[int, Any]]:
"""
Create vectorized LIBERO environments with a consistent return shape.
@@ -430,6 +497,8 @@ def create_libero_envs(
if task_ids_filter is not None:
print(f"Restricting to task_ids={task_ids_filter}")
is_async = env_cls is gym.vector.AsyncVectorEnv
out: dict[str, dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names:
suite = _get_suite(suite_name)
@@ -449,9 +518,12 @@ def create_libero_envs(
init_states=init_states,
gym_kwargs=gym_kwargs,
control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
)
out[suite_name][tid] = env_cls(fns)
if is_async:
out[suite_name][tid] = _LazyAsyncVectorEnv(fns)
else:
out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
+26 -17
View File
@@ -97,8 +97,9 @@ class MetaworldEnv(gym.Env):
self.visualization_height = visualization_height
self.camera_name = camera_name
self._env = self._make_envs_task(self.task)
self._max_episode_steps = self._env.max_path_length
self._env_name = self.task # already stripped of "metaworld-" prefix above
self._env = None # deferred — created on first reset() inside the worker subprocess
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
self.task_description = TASK_DESCRIPTIONS[self.task]
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
@@ -136,6 +137,24 @@ class MetaworldEnv(gym.Env):
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
def _ensure_env(self) -> None:
"""Create the underlying MetaWorld env on first use.
Called inside the worker subprocess after fork(), so each worker gets
its own clean rendering context rather than inheriting a stale one from
the parent process (which causes crashes with AsyncVectorEnv).
"""
if self._env is not None:
return
mt1 = metaworld.MT1(self._env_name, seed=42)
env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name)
env.set_task(mt1.train_tasks[0])
if self.camera_name == "corner2":
env.model.cam_pos[2] = [0.75, 0.075, 0.7]
env.reset()
env._freeze_rand_vec = False # otherwise no randomization
self._env = env
def render(self) -> np.ndarray:
"""
Render the current environment frame.
@@ -143,26 +162,13 @@ class MetaworldEnv(gym.Env):
Returns:
np.ndarray: The rendered RGB image from the environment.
"""
self._ensure_env()
image = self._env.render()
if self.camera_name == "corner2":
# Images from this camera are flipped — correct them
image = np.flip(image, (0, 1))
return image
def _make_envs_task(self, env_name: str):
mt1 = metaworld.MT1(env_name, seed=42)
env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name)
env.set_task(mt1.train_tasks[0])
if self.camera_name == "corner2":
env.model.cam_pos[2] = [
0.75,
0.075,
0.7,
] # corner2 position, similar to https://arxiv.org/pdf/2206.14244
env.reset()
env._freeze_rand_vec = False # otherwise no randomization
return env
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
image = None
if self._env is not None:
@@ -209,6 +215,7 @@ class MetaworldEnv(gym.Env):
observation (RobotObservation): The initial formatted observation.
info (Dict[str, Any]): Additional info about the reset state.
"""
self._ensure_env()
super().reset(seed=seed)
raw_obs, info = self._env.reset(seed=seed)
@@ -232,6 +239,7 @@ class MetaworldEnv(gym.Env):
truncated (bool): Whether the episode was truncated due to a time limit.
info (Dict[str, Any]): Additional environment info.
"""
self._ensure_env()
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
@@ -263,7 +271,8 @@ class MetaworldEnv(gym.Env):
return observation, reward, terminated, truncated, info
def close(self):
self._env.close()
if self._env is not None:
self._env.close()
# ---- Main API ----------------------------------------------------------------
+21 -26
View File
@@ -130,56 +130,51 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
return policy_features
def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool:
first_type = type(env.envs[0]) # Get type of first env
return all(type(e) is first_type for e in env.envs) # Fast type check
def _get_sub_env_attr(env: gym.vector.VectorEnv, attr: str, index: int = 0):
"""Retrieve an attribute from a sub-environment, works for both Sync and Async."""
try:
return env.get_attr(attr)[index]
except (AttributeError, Exception):
return None
def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool:
try:
env.get_attr(attr)
return True
except (AttributeError, Exception):
return False
def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None:
with warnings.catch_warnings():
warnings.simplefilter("once", UserWarning) # Apply filter only in this function
warnings.simplefilter("once", UserWarning)
if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")):
if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")):
warnings.warn(
"The environment does not have 'task_description' and 'task'. Some policies require these features.",
UserWarning,
stacklevel=2,
)
if not are_all_envs_same_type(env):
warnings.warn(
"The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.",
UserWarning,
stacklevel=2,
)
def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation:
"""Adds task feature to the observation dict with respect to the first environment attribute."""
if hasattr(env.envs[0], "task_description"):
task_result = env.call("task_description")
if _sub_env_has_attr(env, "task_description"):
task_result = list(env.call("task_description"))
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task_description to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task_description result must be strings")
observation["task"] = task_result
elif hasattr(env.envs[0], "task"):
task_result = env.call("task")
elif _sub_env_has_attr(env, "task"):
task_result = list(env.call("task"))
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task result must be strings")
observation["task"] = task_result
else: # For envs without language instructions, e.g. aloha transfer cube and etc.
else:
num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)]
return observation
+2 -2
View File
@@ -136,8 +136,8 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Standardize to a list of strings for the tokenizer
if isinstance(task, str):
return [task]
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
return task
elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task):
return list(task)
return None
+86 -41
View File
@@ -47,8 +47,10 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig`
"""
import concurrent.futures as cf
import copy
import json
import logging
import math
import threading
import time
from collections import defaultdict
@@ -56,7 +58,6 @@ from collections.abc import Callable
from contextlib import nullcontext
from copy import deepcopy
from dataclasses import asdict
from functools import partial
from pathlib import Path
from pprint import pformat
from typing import Any, TypedDict
@@ -73,7 +74,6 @@ from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.factory import make_env, make_env_pre_post_processors
from lerobot.envs.utils import (
add_envs_task,
check_env_attributes_and_types,
close_envs,
preprocess_observation,
@@ -93,6 +93,14 @@ from lerobot.utils.utils import (
)
def _shard_episodes(n_episodes: int, shard_id: int, num_shards: int) -> list[int]:
"""Return the episode indices assigned to this shard (round-robin distribution).
Example: _shard_episodes(10, 1, 4) -> [1, 5, 9]
"""
return list(range(shard_id, n_episodes, num_shards))
def rollout(
env: gym.vector.VectorEnv,
policy: PreTrainedPolicy,
@@ -166,9 +174,15 @@ def rollout(
if return_observations:
all_observations.append(deepcopy(observation))
# Infer "task" from attributes of environments.
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
observation = add_envs_task(env, observation)
# Infer "task" from sub-environments (prefer natural language description).
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
try:
observation["task"] = list(env.call("task_description"))
except Exception:
try:
observation["task"] = list(env.call("task"))
except Exception:
observation["task"] = [""] * env.num_envs
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
observation = env_preprocessor(observation)
@@ -193,14 +207,13 @@ def rollout(
# VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't
# available if none of the envs finished.
if "final_info" in info:
final_info = info["final_info"]
if not isinstance(final_info, dict):
raise RuntimeError(
"Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). "
"You're likely using an older version of gymnasium (< 1.0). Please upgrade."
)
successes = final_info["is_success"].tolist()
if "final_info" in info and isinstance(info["final_info"], dict):
successes = info["final_info"]["is_success"].tolist()
elif "is_success" in info:
is_success = info["is_success"]
successes = (
is_success.tolist() if hasattr(is_success, "tolist") else [bool(is_success)] * env.num_envs
)
else:
successes = [False] * env.num_envs
@@ -549,6 +562,14 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
# Sharding: each shard runs a subset of n_episodes with non-overlapping seeds.
shard_id = cfg.eval.shard_id
num_shards = cfg.eval.num_shards
episodes_for_shard = _shard_episodes(cfg.eval.n_episodes, shard_id, num_shards)
n_per_shard = len(episodes_for_shard)
# Shift the seed so each shard gets a different, non-overlapping seed range.
shard_seed = (cfg.seed or 0) + shard_id * math.ceil(cfg.eval.n_episodes / num_shards)
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
@@ -557,10 +578,10 @@ def eval_main(cfg: EvalPipelineConfig):
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
n_episodes=n_per_shard,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
start_seed=shard_seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
)
print("Overall Aggregated Metrics:")
@@ -573,8 +594,13 @@ def eval_main(cfg: EvalPipelineConfig):
# Close all vec envs
close_envs(envs)
# Save info
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
# Save info — use shard-specific filename when running in parallel mode.
if num_shards > 1:
out_path = Path(cfg.output_dir) / f"shard_{shard_id}_of_{num_shards}.json"
else:
out_path = Path(cfg.output_dir) / "eval_info.json"
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w") as f:
json.dump(info, f, indent=2)
logging.info("End of eval")
@@ -734,39 +760,58 @@ def eval_policy_all(
group_acc[group]["video_paths"].extend(paths)
overall["video_paths"].extend(paths)
def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
"""Shallow copy sharing weight tensors, with independent per-thread state.
copy.copy() gives a new Python object whose _parameters dict is a shared
reference (same tensor storage, zero extra VRAM). reset() then rebinds
mutable state (action queues etc.) to fresh per-thread objects.
Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's
reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config.
"""
thread_p = copy.copy(p)
thread_p.reset()
return thread_p
# Choose runner (sequential vs threaded)
task_runner = partial(
run_one,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
_runner_kwargs = {
"env_preprocessor": env_preprocessor,
"env_postprocessor": env_postprocessor,
"preprocessor": preprocessor,
"postprocessor": postprocessor,
"n_episodes": n_episodes,
"max_episodes_rendered": max_episodes_rendered,
"videos_dir": videos_dir,
"return_episode_data": return_episode_data,
"start_seed": start_seed,
}
if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks:
tg, tid, metrics = task_runner(task_group, task_id, env)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
try:
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
else:
# threaded path: submit all tasks, consume completions on main thread and accumulate there
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2meta = {}
for task_group, task_id, env in tasks:
fut = executor.submit(task_runner, task_group, task_id, env)
fut2meta[fut] = (task_group, task_id)
fut = executor.submit(
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
)
fut2meta[fut] = (task_group, task_id, env)
for fut in cf.as_completed(fut2meta):
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
tg, tid, env = fut2meta[fut]
try:
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# compute aggregated metrics helper (robust to lists/scalars)
def _agg_from_list(xs):
@@ -0,0 +1,249 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Probe hardware and recommend optimal lerobot-eval-parallel flags.
Run standalone:
lerobot-eval-autotune --policy.path=lerobot/smolvla_libero --env.type=libero
Or called programmatically from lerobot_eval_parallel when --num-shards auto.
Steps:
1. Probe GPU VRAM and CPU core count.
2. Measure model VRAM footprint (load policy, delta of cuda.memory_allocated).
3. Compute max shards limited by VRAM (85% of total).
4. Probe env step time (optional, skipped when skip_timing=True).
5. Probe inference time (optional, skipped when skip_timing=True).
6. Derive num_shards = min(vram_limit, saturation_shards).
7. Choose MUJOCO_GL (egl vs osmesa) based on remaining VRAM headroom.
8. Compute batch_size = max(4, min(floor(cpu_cores * 0.8 / num_shards), 64)).
9. Print paste-ready command.
"""
import math
import os
import sys
import time
from dataclasses import dataclass
@dataclass
class AutotuneRecommendation:
num_shards: int
batch_size: int
mujoco_gl: str
use_amp: bool
# Probed values
gpu_name: str
vram_gb: float
cpu_cores: int
model_gb: float
env_step_ms: float | None
infer_ms: float | None
_DEFAULT_ENV_STEP_MS = 22.0 # LIBERO on GPU, typical value
_DEFAULT_INFER_MS = 5.0 # SmolVLA fp16 on H100
def _probe_gpu() -> tuple[str, float]:
"""Return (gpu_name, vram_gb). Falls back to CPU sentinel on non-CUDA systems."""
try:
import torch
if not torch.cuda.is_available():
return "CPU (no CUDA)", 0.0
props = torch.cuda.get_device_properties(0)
return props.name, props.total_memory / (1024**3)
except Exception:
return "unknown", 0.0
def _probe_model_gb(passthrough: list[str]) -> float:
"""Load the policy (from --policy.path) and measure VRAM delta. Returns GB."""
# Extract policy path from passthrough args
policy_path = None
for tok in passthrough:
if tok.startswith("policy.path="):
policy_path = tok.split("=", 1)[1]
break
if tok.startswith("--policy.path="):
policy_path = tok.split("=", 1)[1]
break
if policy_path is None:
return 0.0
try:
import torch
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedConfig
if not torch.cuda.is_available():
return 0.0
torch.cuda.synchronize()
before = torch.cuda.memory_allocated(0)
cfg = PreTrainedConfig.from_pretrained(policy_path)
cfg.pretrained_path = policy_path # type: ignore[assignment]
policy = make_policy(cfg=cfg)
policy.eval()
torch.cuda.synchronize()
after = torch.cuda.memory_allocated(0)
del policy
torch.cuda.empty_cache()
return (after - before) / (1024**3)
except Exception as e:
print(f"[autotune] could not measure model VRAM: {e}", file=sys.stderr)
return 0.0
def _probe_env_step_ms(passthrough: list[str], batch_size: int = 8, n_steps: int = 30) -> float | None:
"""Run a short env warmup and return median step latency in ms. Returns None on failure."""
try:
import numpy as np
from lerobot.envs.factory import make_env
# Parse env config from passthrough using lerobot's own parser
env_type = None
for tok in passthrough:
if tok.startswith("env.type=") or tok.startswith("--env.type="):
env_type = tok.split("=", 1)[1]
break
if env_type is None:
return None
# Minimal env config
from lerobot.envs.factory import make_env_config
env_cfg = make_env_config(env_type)
envs = make_env(env_cfg, n_envs=batch_size, use_async_envs=(batch_size > 1))
# Get first vec env
first_suite = next(iter(envs.values()))
env = next(iter(first_suite.values()))
env.reset()
dummy_action = np.zeros((batch_size, env.single_action_space.shape[0]))
timings = []
for _ in range(n_steps):
t0 = time.perf_counter()
env.step(dummy_action)
timings.append((time.perf_counter() - t0) * 1000)
env.close()
return float(np.median(timings))
except Exception as e:
print(f"[autotune] env step probe failed: {e}", file=sys.stderr)
return None
def probe_and_recommend(
passthrough: list[str],
skip_timing: bool = False,
) -> AutotuneRecommendation:
"""Probe hardware + model and return the recommended configuration."""
gpu_name, vram_gb = _probe_gpu()
cpu_cores = os.cpu_count() or 4
# Model footprint
model_gb = _probe_model_gb(passthrough)
if model_gb == 0.0:
# Unknown model: assume a conservative 14 GB (SmolVLA fp16) as placeholder
model_gb = 14.0
print("[autotune] model size unknown, assuming 14 GB (SmolVLA fp16)", file=sys.stderr)
# Max shards from VRAM (leave 15% headroom for activations + env frames)
max_shards_vram = max(1, math.floor(vram_gb * 0.85 / model_gb)) if vram_gb > 0 else 1
# Timing probes
env_step_ms: float | None = None
infer_ms: float | None = None
if not skip_timing:
env_step_ms = _probe_env_step_ms(passthrough)
# Inference time: assume ~infer = env_step / saturation_factor heuristic
# Full probe would require loading policy — skip for now to stay fast.
infer_ms = _DEFAULT_INFER_MS
# Number of shards to saturate GPU: ceil(env_step / infer)
_step = env_step_ms or _DEFAULT_ENV_STEP_MS
_infer = infer_ms or _DEFAULT_INFER_MS
saturation_shards = max(1, math.ceil(_step / _infer))
num_shards = min(max_shards_vram, saturation_shards)
# Rendering mode: EGL if all model copies + env frame buffers fit in VRAM
env_vram_per_shard_gb = 0.01 # ~10 MB overhead per env batch
total_with_egl = num_shards * (model_gb + env_vram_per_shard_gb)
mujoco_gl = "egl" if (vram_gb == 0 or total_with_egl < vram_gb * 0.85) else "osmesa"
# Batch size: fill CPU cores evenly across shards
batch_size = max(4, min(math.floor(cpu_cores * 0.8 / num_shards), 64))
# Recommend AMP when model is large (saves ~50% VRAM)
use_amp = model_gb > 8.0
return AutotuneRecommendation(
num_shards=num_shards,
batch_size=batch_size,
mujoco_gl=mujoco_gl,
use_amp=use_amp,
gpu_name=gpu_name,
vram_gb=vram_gb,
cpu_cores=cpu_cores,
model_gb=model_gb,
env_step_ms=env_step_ms,
infer_ms=infer_ms,
)
def main(argv: list[str] | None = None) -> None:
passthrough = argv if argv is not None else sys.argv[1:]
rec = probe_and_recommend(passthrough)
env_step_str = (
f"{rec.env_step_ms:.0f}ms" if rec.env_step_ms else f"~{_DEFAULT_ENV_STEP_MS:.0f}ms (estimated)"
)
infer_str = f"{rec.infer_ms:.0f}ms" if rec.infer_ms else f"~{_DEFAULT_INFER_MS:.0f}ms (estimated)"
print()
print(
f"GPU: {rec.gpu_name} | VRAM: {rec.vram_gb:.1f} GB | CPU cores: {rec.cpu_cores} | Model: {rec.model_gb:.1f} GB"
)
print()
print(f" env_step_ms: {env_step_str} | infer_ms: {infer_str}")
print()
print(f" num_shards: {rec.num_shards}")
print(f" batch_size: {rec.batch_size}")
print(f" MUJOCO_GL: {rec.mujoco_gl}")
if rec.use_amp:
print(" use_amp: true (recommended — halves VRAM, faster matmuls)")
print()
# Build paste-ready command
flags = [f"--num-shards {rec.num_shards}", f"eval.batch_size={rec.batch_size}"]
if rec.use_amp:
flags.append("policy.use_amp=true")
flags_str = " \\\n ".join(flags)
passthrough_str = " \\\n ".join(passthrough) if passthrough else "[your flags]"
print(" Paste-ready command:")
print(f" MUJOCO_GL={rec.mujoco_gl} lerobot-eval-parallel \\")
print(f" {flags_str} \\")
print(f" {passthrough_str}")
print()
if __name__ == "__main__":
main()
@@ -0,0 +1,185 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run lerobot-eval across N independent subprocesses (shards) for maximum GPU utilization.
Each shard handles a disjoint subset of episodes and writes its own JSON results file.
Results are merged and printed when all shards complete.
Usage:
lerobot-eval-parallel --num-shards 4 [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto --render-device cpu [any lerobot-eval flags]
--num-shards auto:
Calls lerobot-eval-autotune to probe hardware and determine the optimal number of shards.
--render-device gpu|cpu|auto:
Controls MUJOCO_GL env var. 'gpu' -> EGL (faster, ~3ms/frame, ~200KB VRAM/env).
'cpu' -> osmesa (slower, ~12ms/frame, 0 VRAM). 'auto' picks based on VRAM headroom.
Default: auto.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
def _parse_known(argv: list[str]) -> tuple[argparse.Namespace, list[str]]:
p = argparse.ArgumentParser(add_help=False)
p.add_argument("--num-shards", default="1")
p.add_argument("--render-device", choices=["gpu", "cpu", "auto"], default="auto")
p.add_argument("--output-dir", default=None)
return p.parse_known_args(argv)
def _resolve_num_shards(num_shards_str: str, passthrough: list[str]) -> int:
if num_shards_str == "auto":
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
rec = probe_and_recommend(passthrough)
print(
f"[autotune] recommended num_shards={rec.num_shards}, batch_size={rec.batch_size}, MUJOCO_GL={rec.mujoco_gl}"
)
return rec.num_shards
return int(num_shards_str)
def _resolve_mujoco_gl(render_device: str, num_shards: int, passthrough: list[str]) -> str:
if render_device == "gpu":
return "egl"
if render_device == "cpu":
return "osmesa"
# auto: use EGL for single shard; for multiple shards check VRAM headroom
if num_shards == 1:
return "egl"
try:
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
rec = probe_and_recommend(passthrough, skip_timing=True)
return rec.mujoco_gl
except Exception:
# Conservative fallback: osmesa avoids EGL VRAM contention
return "osmesa"
def _extract_output_dir(passthrough: list[str]) -> str | None:
for tok in passthrough:
if tok.startswith("--output-dir="):
return tok.split("=", 1)[1]
if tok == "--output-dir":
idx = passthrough.index(tok)
if idx + 1 < len(passthrough):
return passthrough[idx + 1]
return None
def _merge_shards(output_dir: str, num_shards: int) -> dict:
"""Merge per-shard JSON files into a single result dict and write eval_info.json."""
all_per_task: list[dict] = []
per_group: dict[str, dict] = {}
for k in range(num_shards):
shard_path = Path(output_dir) / f"shard_{k}_of_{num_shards}.json"
if not shard_path.exists():
print(f"[warning] shard file not found: {shard_path}", file=sys.stderr)
continue
with open(shard_path) as f:
shard = json.load(f)
all_per_task.extend(shard.get("per_task", []))
for group, metrics in shard.get("per_group", {}).items():
if group not in per_group:
per_group[group] = {"sum_rewards": [], "max_rewards": [], "successes": []}
for key in ("sum_rewards", "max_rewards", "successes"):
# metrics may store aggregates; reconstruct lists if possible
per_group[group][key].extend(metrics.get(key, []))
# Re-aggregate
import numpy as np
def _nanmean(xs: list) -> float:
return float(np.nanmean(xs)) if xs else float("nan")
groups_out = {}
all_sr, all_mr, all_succ = [], [], []
for group, acc in per_group.items():
groups_out[group] = {
"avg_sum_reward": _nanmean(acc["sum_rewards"]),
"avg_max_reward": _nanmean(acc["max_rewards"]),
"pc_success": _nanmean(acc["successes"]) * 100 if acc["successes"] else float("nan"),
"n_episodes": len(acc["sum_rewards"]),
}
all_sr.extend(acc["sum_rewards"])
all_mr.extend(acc["max_rewards"])
all_succ.extend(acc["successes"])
overall = {
"avg_sum_reward": _nanmean(all_sr),
"avg_max_reward": _nanmean(all_mr),
"pc_success": _nanmean(all_succ) * 100 if all_succ else float("nan"),
"n_episodes": len(all_sr),
}
merged = {"per_task": all_per_task, "per_group": groups_out, "overall": overall}
out_path = Path(output_dir) / "eval_info.json"
with open(out_path, "w") as f:
json.dump(merged, f, indent=2)
return merged
def main(argv: list[str] | None = None) -> None:
args, passthrough = _parse_known(argv if argv is not None else sys.argv[1:])
num_shards = _resolve_num_shards(args.num_shards, passthrough)
mujoco_gl = _resolve_mujoco_gl(args.render_device, num_shards, passthrough)
output_dir = args.output_dir or _extract_output_dir(passthrough)
print(f"[lerobot-eval-parallel] launching {num_shards} shard(s), MUJOCO_GL={mujoco_gl}")
child_env = {**os.environ, "MUJOCO_GL": mujoco_gl, "OMP_NUM_THREADS": "1"}
procs = []
for k in range(num_shards):
cmd = [
sys.executable,
"-m",
"lerobot.scripts.lerobot_eval",
f"eval.shard_id={k}",
f"eval.num_shards={num_shards}",
*passthrough,
]
if output_dir:
# Each shard shares the same output_dir; shard files are named shard_K_of_N.json
cmd.append(f"output_dir={output_dir}")
procs.append(subprocess.Popen(cmd, env=child_env))
return_codes = [p.wait() for p in procs]
if any(rc != 0 for rc in return_codes):
failed = [k for k, rc in enumerate(return_codes) if rc != 0]
print(f"[lerobot-eval-parallel] shards {failed} failed with non-zero exit codes.", file=sys.stderr)
sys.exit(1)
if output_dir and num_shards > 1:
merged = _merge_shards(output_dir, num_shards)
print("\n=== Merged Results ===")
print(json.dumps(merged["overall"], indent=2))
if __name__ == "__main__":
main()
+143
View File
@@ -0,0 +1,143 @@
"""Tests for the benchmark dispatch refactor (create_envs / get_env_processors on EnvConfig)."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
import gymnasium as gym
import pytest
from gymnasium.envs.registration import register, registry as gym_registry
from lerobot.configs.types import PolicyFeature
from lerobot.envs.configs import EnvConfig
from lerobot.envs.factory import make_env, make_env_config, make_env_pre_post_processors
logger = logging.getLogger(__name__)
def test_registry_all_types():
"""make_env_config should resolve every registered EnvConfig subclass via the registry."""
known = list(EnvConfig.get_known_choices().keys())
assert len(known) >= 6
for t in known:
cfg = make_env_config(t)
if not isinstance(cfg, EnvConfig):
continue
assert cfg.type == t
def test_unknown_type():
with pytest.raises(ValueError, match="not registered"):
make_env_config("nonexistent")
def test_identity_processors():
"""Base class get_env_processors() returns identity pipelines."""
cfg = make_env_config("aloha")
pre, post = cfg.get_env_processors()
assert len(pre.steps) == 0 and len(post.steps) == 0
def test_delegation():
"""make_env() should call cfg.create_envs(), not use if/elif dispatch."""
sentinel = {"delegated": {0: "marker"}}
fake = type(
"Fake",
(),
{
"hub_path": None,
"create_envs": lambda self, n_envs, use_async_envs=False: sentinel,
},
)()
result = make_env(fake, n_envs=1)
assert result is sentinel
def test_processors_delegation():
"""make_env_pre_post_processors delegates to cfg.get_env_processors()."""
cfg = make_env_config("aloha")
pre, post = make_env_pre_post_processors(cfg, policy_cfg=None)
assert len(pre.steps) == 0
def test_base_create_envs():
"""Base class create_envs() should build a single-task VectorEnv via gym.make()."""
gym_id = "_dispatch_test/CartPole-v99"
if gym_id not in gym_registry:
register(id=gym_id, entry_point="gymnasium.envs.classic_control:CartPoleEnv")
@EnvConfig.register_subclass("_dispatch_base_test")
@dataclass
class _Env(EnvConfig):
task: str = "CartPole-v99"
fps: int = 10
features: dict[str, PolicyFeature] = field(default_factory=dict)
@property
def package_name(self):
return "_dispatch_test"
@property
def gym_id(self):
return gym_id
@property
def gym_kwargs(self):
return {}
try:
envs = _Env().create_envs(n_envs=2)
assert "_dispatch_base_test" in envs
env = envs["_dispatch_base_test"][0]
assert isinstance(env, gym.vector.VectorEnv)
assert env.num_envs == 2
env.close()
finally:
if gym_id in gym_registry:
del gym_registry[gym_id]
def test_custom_create_envs_override():
"""A custom EnvConfig subclass can override create_envs()."""
mock_vec = gym.vector.SyncVectorEnv([lambda: gym.make("CartPole-v1")])
@EnvConfig.register_subclass("_dispatch_custom_test")
@dataclass
class _Env(EnvConfig):
task: str = "x"
features: dict[str, PolicyFeature] = field(default_factory=dict)
@property
def gym_kwargs(self):
return {}
def create_envs(self, n_envs, use_async_envs=False):
return {"custom_suite": {0: mock_vec}}
try:
result = make_env(_Env(), n_envs=1)
assert "custom_suite" in result
finally:
mock_vec.close()
def test_custom_get_env_processors_override():
"""A custom EnvConfig subclass can override get_env_processors()."""
from lerobot.processor.pipeline import DataProcessorPipeline
@EnvConfig.register_subclass("_dispatch_proc_test")
@dataclass
class _Env(EnvConfig):
task: str = "x"
features: dict[str, PolicyFeature] = field(default_factory=dict)
@property
def gym_kwargs(self):
return {}
def get_env_processors(self):
return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[])
pre, post = _Env().get_env_processors()
assert isinstance(pre, DataProcessorPipeline)
@@ -189,6 +189,30 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer):
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
"""Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
mock_tokenizer = MockTokenizer(vocab_size=100)
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
transition = create_transition(
observation={"state": torch.tensor([1.0, 2.0])},
action=torch.tensor([0.1, 0.2]),
complementary_data={"task": ("pick up cube", "place on table")},
)
result = processor(transition)
observation = result[TransitionKey.OBSERVATION]
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
assert tokens.shape == (2, 8)
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_keys(mock_auto_tokenizer):