mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-11 14:49:43 +00:00
Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 66276f1efd | |||
| 5972a85ec7 | |||
| 800b0a5f26 | |||
| 6aeb7c54f9 | |||
| 1f7e7b4a90 | |||
| 681cc59ed2 | |||
| d9edc12e00 | |||
| fd2bad9b42 | |||
| 7e729e33c9 | |||
| e383207a15 | |||
| 8ed658c6aa | |||
| 0045f88355 | |||
| 89ce91f69f | |||
| 90e614f6b9 | |||
| ff4f860e5d | |||
| 6f2823bfc4 | |||
| 77415559b8 | |||
| 24d9b74d81 | |||
| 508358749a |
@@ -26,7 +26,7 @@ During evaluation, data moves through four stages:
|
||||
1. gym.Env ──→ raw observations (numpy dicts)
|
||||
|
||||
2. Preprocessing ──→ standard LeRobot keys + task description
|
||||
(preprocess_observation, add_envs_task in envs/utils.py)
|
||||
(preprocess_observation in envs/utils.py, env.call("task_description"))
|
||||
|
||||
3. Processors ──→ env-specific then policy-specific transforms
|
||||
(env_preprocessor, policy_preprocessor)
|
||||
@@ -115,23 +115,22 @@ Each `EnvConfig` subclass declares two dicts that tell the policy what to expect
|
||||
## Step by step
|
||||
|
||||
<Tip>
|
||||
At minimum, you need three files: a **gym.Env wrapper**, an **EnvConfig
|
||||
subclass**, and a **factory dispatch branch**. Everything else is optional or
|
||||
documentation.
|
||||
At minimum, you need two files: a **gym.Env wrapper** and an **EnvConfig
|
||||
subclass** with a `create_envs()` override. Everything else is optional or
|
||||
documentation. No changes to `factory.py` are needed.
|
||||
</Tip>
|
||||
|
||||
### Checklist
|
||||
|
||||
| File | Required | Why |
|
||||
| ---------------------------------------- | -------- | ----------------------------------------- |
|
||||
| `src/lerobot/envs/<benchmark>.py` | Yes | Wraps the simulator as a standard gym.Env |
|
||||
| `src/lerobot/envs/configs.py` | Yes | Registers your benchmark for the CLI |
|
||||
| `src/lerobot/envs/factory.py` | Yes | Tells `make_env()` how to build your envs |
|
||||
| `src/lerobot/processor/env_processor.py` | Optional | Custom observation/action transforms |
|
||||
| `src/lerobot/envs/utils.py` | Optional | Only if you need new raw observation keys |
|
||||
| `pyproject.toml` | Yes | Declares benchmark-specific dependencies |
|
||||
| `docs/source/<benchmark>.mdx` | Yes | User-facing documentation page |
|
||||
| `docs/source/_toctree.yml` | Yes | Adds your page to the docs sidebar |
|
||||
| File | Required | Why |
|
||||
| ---------------------------------------- | -------- | ------------------------------------------------------------ |
|
||||
| `src/lerobot/envs/<benchmark>.py` | Yes | Wraps the simulator as a standard gym.Env |
|
||||
| `src/lerobot/envs/configs.py` | Yes | Registers your benchmark and its `create_envs()` for the CLI |
|
||||
| `src/lerobot/processor/env_processor.py` | Optional | Custom observation/action transforms |
|
||||
| `src/lerobot/envs/utils.py` | Optional | Only if you need new raw observation keys |
|
||||
| `pyproject.toml` | Yes | Declares benchmark-specific dependencies |
|
||||
| `docs/source/<benchmark>.mdx` | Yes | User-facing documentation page |
|
||||
| `docs/source/_toctree.yml` | Yes | Adds your page to the docs sidebar |
|
||||
|
||||
### 1. The gym.Env wrapper (`src/lerobot/envs/<benchmark>.py`)
|
||||
|
||||
@@ -162,6 +161,8 @@ class MyBenchmarkEnv(gym.Env):
|
||||
...
|
||||
```
|
||||
|
||||
**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern.
|
||||
|
||||
Also provide a factory function that returns the nested dict structure:
|
||||
|
||||
```python
|
||||
@@ -179,7 +180,10 @@ See `create_libero_envs()` (multi-suite, multi-task) and `create_metaworld_envs(
|
||||
|
||||
### 2. The config (`src/lerobot/envs/configs.py`)
|
||||
|
||||
Register a config dataclass so users can select your benchmark with `--env.type=<name>`:
|
||||
Register a config dataclass so users can select your benchmark with `--env.type=<name>`. Each config owns its environment creation and processor logic via two methods:
|
||||
|
||||
- **`create_envs(n_envs, use_async_envs)`** — Returns `{suite: {task_id: VectorEnv}}`. The base class default uses `gym.make()` for single-task envs. Multi-task benchmarks override this.
|
||||
- **`get_env_processors()`** — Returns `(preprocessor, postprocessor)`. The base class default returns identity (no-op) pipelines. Override if your benchmark needs observation/action transforms.
|
||||
|
||||
```python
|
||||
@EnvConfig.register_subclass("<benchmark_name>")
|
||||
@@ -204,6 +208,20 @@ class MyBenchmarkEnvConfig(EnvConfig):
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {"obs_type": self.obs_type, "render_mode": self.render_mode}
|
||||
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||
"""Override for multi-task benchmarks or custom env creation."""
|
||||
from lerobot.envs.<benchmark> import create_<benchmark>_envs
|
||||
return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...)
|
||||
|
||||
def get_env_processors(self):
|
||||
"""Override if your benchmark needs observation/action transforms."""
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
from lerobot.processor.env_processor import MyBenchmarkProcessorStep
|
||||
return (
|
||||
PolicyProcessorPipeline(steps=[MyBenchmarkProcessorStep()]),
|
||||
PolicyProcessorPipeline(steps=[]),
|
||||
)
|
||||
```
|
||||
|
||||
Key points:
|
||||
@@ -211,36 +229,11 @@ Key points:
|
||||
- The `register_subclass` name is what users pass on the CLI (`--env.type=<name>`).
|
||||
- `features` tells the policy what the environment produces.
|
||||
- `features_map` maps raw observation keys to LeRobot convention keys.
|
||||
- **No changes to `factory.py` needed** — the factory delegates to `cfg.create_envs()` and `cfg.get_env_processors()` automatically.
|
||||
|
||||
### 3. The factory dispatch (`src/lerobot/envs/factory.py`)
|
||||
### 3. Env processor (optional — `src/lerobot/processor/env_processor.py`)
|
||||
|
||||
Add a branch in `make_env()` to call your factory function:
|
||||
|
||||
```python
|
||||
elif "<benchmark_name>" in cfg.type:
|
||||
from lerobot.envs.<benchmark> import create_<benchmark>_envs
|
||||
|
||||
if cfg.task is None:
|
||||
raise ValueError("<BenchmarkName> requires a task to be specified")
|
||||
|
||||
return create_<benchmark>_envs(
|
||||
task=cfg.task,
|
||||
n_envs=n_envs,
|
||||
gym_kwargs=cfg.gym_kwargs,
|
||||
env_cls=env_cls,
|
||||
)
|
||||
```
|
||||
|
||||
If your benchmark needs an env processor, add it in `make_env_pre_post_processors()`:
|
||||
|
||||
```python
|
||||
if isinstance(env_cfg, MyBenchmarkEnvConfig) or "<benchmark_name>" in env_cfg.type:
|
||||
preprocessor_steps.append(MyBenchmarkProcessorStep())
|
||||
```
|
||||
|
||||
### 4. Env processor (optional — `src/lerobot/processor/env_processor.py`)
|
||||
|
||||
Only needed if your benchmark requires observation transforms beyond what `preprocess_observation()` handles (e.g. image flipping, coordinate conversion):
|
||||
Only needed if your benchmark requires observation transforms beyond what `preprocess_observation()` handles (e.g. image flipping, coordinate conversion). Define the processor step here and return it from `get_env_processors()` in your config (see step 2):
|
||||
|
||||
```python
|
||||
@dataclass
|
||||
@@ -260,7 +253,7 @@ class MyBenchmarkProcessorStep(ObservationProcessorStep):
|
||||
|
||||
See `LiberoProcessorStep` for a full example (image rotation, quaternion-to-axis-angle conversion).
|
||||
|
||||
### 5. Dependencies (`pyproject.toml`)
|
||||
### 4. Dependencies (`pyproject.toml`)
|
||||
|
||||
Add a new optional-dependency group:
|
||||
|
||||
@@ -281,11 +274,11 @@ Users install with:
|
||||
pip install -e ".[mybenchmark]"
|
||||
```
|
||||
|
||||
### 6. Documentation (`docs/source/<benchmark>.mdx`)
|
||||
### 5. Documentation (`docs/source/<benchmark>.mdx`)
|
||||
|
||||
Write a user-facing page following the template in the next section. See `docs/source/libero.mdx` and `docs/source/metaworld.mdx` for full examples.
|
||||
|
||||
### 7. Table of contents (`docs/source/_toctree.yml`)
|
||||
### 6. Table of contents (`docs/source/_toctree.yml`)
|
||||
|
||||
Add your benchmark to the "Benchmarks" section:
|
||||
|
||||
|
||||
@@ -151,7 +151,7 @@ observation = {
|
||||
|
||||
### Factory Function
|
||||
|
||||
The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:
|
||||
The `make_env_pre_post_processors` function delegates to `env_cfg.get_env_processors()`:
|
||||
|
||||
```python
|
||||
from lerobot.envs.factory import make_env_pre_post_processors
|
||||
@@ -159,47 +159,31 @@ from lerobot.envs.configs import LiberoEnv, PushtEnv
|
||||
|
||||
# For LIBERO: Returns LiberoProcessorStep in preprocessor
|
||||
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg, policy_cfg)
|
||||
|
||||
# For other environments: Returns identity processors (no-op)
|
||||
pusht_cfg = PushtEnv()
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg, policy_cfg)
|
||||
```
|
||||
|
||||
### Implementation in `envs/factory.py`
|
||||
### How It Works
|
||||
|
||||
Each `EnvConfig` subclass can override `get_env_processors()` to return benchmark-specific
|
||||
processor pipelines. The base class returns identity (no-op) processors by default.
|
||||
|
||||
```python
|
||||
def make_env_pre_post_processors(
|
||||
env_cfg: EnvConfig,
|
||||
) -> tuple[
|
||||
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
|
||||
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
|
||||
]:
|
||||
"""
|
||||
Create preprocessor and postprocessor pipelines for environment observations.
|
||||
|
||||
Args:
|
||||
env_cfg: The configuration of the environment.
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- preprocessor: Pipeline that processes environment observations
|
||||
- postprocessor: Pipeline that processes environment outputs
|
||||
"""
|
||||
# For LIBERO environments, add the LiberoProcessorStep to preprocessor
|
||||
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
|
||||
preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
|
||||
else:
|
||||
# For all other environments, return an identity preprocessor
|
||||
preprocessor = PolicyProcessorPipeline(steps=[])
|
||||
|
||||
# Postprocessor is currently identity for all environments
|
||||
# Future: Could add environment-specific action transformations
|
||||
postprocessor = PolicyProcessorPipeline(steps=[])
|
||||
|
||||
return preprocessor, postprocessor
|
||||
# In your EnvConfig subclass:
|
||||
def get_env_processors(self):
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
return (
|
||||
PolicyProcessorPipeline(steps=[MyProcessorStep()]),
|
||||
PolicyProcessorPipeline(steps=[]),
|
||||
)
|
||||
```
|
||||
|
||||
The factory function `make_env_pre_post_processors` simply delegates to this method,
|
||||
with a special case for `XVLAConfig` policies which override the env processors entirely.
|
||||
|
||||
### Integration in Evaluation
|
||||
|
||||
In `lerobot_eval.py`, the environment processors are created once and used throughout:
|
||||
|
||||
@@ -220,6 +220,8 @@ lerobot-replay="lerobot.scripts.lerobot_replay:main"
|
||||
lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
|
||||
lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main"
|
||||
lerobot-eval="lerobot.scripts.lerobot_eval:main"
|
||||
lerobot-eval-parallel="lerobot.scripts.lerobot_eval_parallel:main"
|
||||
lerobot-eval-autotune="lerobot.scripts.lerobot_eval_autotune:main"
|
||||
lerobot-train="lerobot.scripts.lerobot_train:main"
|
||||
lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
|
||||
lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
|
||||
|
||||
@@ -67,7 +67,13 @@ class EvalConfig:
|
||||
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
|
||||
batch_size: int = 50
|
||||
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
|
||||
use_async_envs: bool = False
|
||||
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
|
||||
use_async_envs: bool = True
|
||||
# Sharding: split n_episodes across independent processes.
|
||||
# shard_id=0, num_shards=1 is the default (no sharding, existing behaviour).
|
||||
# Set via lerobot_eval_parallel or manually: --eval.shard_id=K --eval.num_shards=N
|
||||
shard_id: int = 0
|
||||
num_shards: int = 1
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.batch_size > self.n_episodes:
|
||||
@@ -79,6 +85,12 @@ class EvalConfig:
|
||||
f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
|
||||
f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
|
||||
)
|
||||
if self.num_shards < 1:
|
||||
raise ValueError(f"`num_shards` must be >= 1, got {self.num_shards}")
|
||||
if not (0 <= self.shard_id < self.num_shards):
|
||||
raise ValueError(
|
||||
f"`shard_id` must be in [0, num_shards), got shard_id={self.shard_id}, num_shards={self.num_shards}"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -12,11 +12,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import importlib
|
||||
from dataclasses import dataclass, field, fields
|
||||
from typing import Any
|
||||
|
||||
import draccus
|
||||
import gymnasium as gym
|
||||
from gymnasium.envs.registration import registry as gym_registry
|
||||
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.robots import RobotConfig
|
||||
@@ -39,6 +44,13 @@ from lerobot.utils.constants import (
|
||||
)
|
||||
|
||||
|
||||
def _make_vec_env_cls(use_async: bool, n_envs: int):
|
||||
"""Return the right VectorEnv constructor."""
|
||||
if use_async and n_envs > 1:
|
||||
return gym.vector.AsyncVectorEnv
|
||||
return gym.vector.SyncVectorEnv
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
|
||||
task: str | None = None
|
||||
@@ -67,6 +79,50 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
|
||||
def gym_kwargs(self) -> dict:
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_envs(
|
||||
self,
|
||||
n_envs: int,
|
||||
use_async_envs: bool = True,
|
||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||
"""Create {suite: {task_id: VectorEnv}}.
|
||||
|
||||
Default: single-task env via gym.make(). Multi-task benchmarks override.
|
||||
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
|
||||
"""
|
||||
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||
|
||||
if self.gym_id not in gym_registry:
|
||||
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
|
||||
try:
|
||||
importlib.import_module(self.package_name)
|
||||
except ModuleNotFoundError as e:
|
||||
raise ModuleNotFoundError(
|
||||
f"Package '{self.package_name}' required for env '{self.type}' not found. "
|
||||
f"Please install it or check PYTHONPATH."
|
||||
) from e
|
||||
|
||||
if self.gym_id not in gym_registry:
|
||||
raise gym.error.NameNotFound(
|
||||
f"Environment '{self.gym_id}' not registered even after importing '{self.package_name}'."
|
||||
)
|
||||
|
||||
def _make_one():
|
||||
return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs)
|
||||
|
||||
try:
|
||||
from gymnasium.vector import AutoresetMode
|
||||
|
||||
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP)
|
||||
except ImportError:
|
||||
vec = env_cls([_make_one for _ in range(n_envs)])
|
||||
return {self.type: {0: vec}}
|
||||
|
||||
def get_env_processors(self):
|
||||
"""Return (preprocessor, postprocessor) for this env. Default: identity."""
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
|
||||
return PolicyProcessorPipeline(steps=[]), PolicyProcessorPipeline(steps=[])
|
||||
|
||||
|
||||
@dataclass
|
||||
class HubEnvConfig(EnvConfig):
|
||||
@@ -338,6 +394,12 @@ class LiberoEnv(EnvConfig):
|
||||
else:
|
||||
raise ValueError(f"Unsupported obs_type: {self.obs_type}")
|
||||
|
||||
if self.camera_name_mapping is not None:
|
||||
mapped_agentview = self.camera_name_mapping.get("agentview_image", "image")
|
||||
mapped_eye_in_hand = self.camera_name_mapping.get("robot0_eye_in_hand_image", "image2")
|
||||
self.features_map[LIBERO_KEY_PIXELS_AGENTVIEW] = f"{OBS_IMAGES}.{mapped_agentview}"
|
||||
self.features_map[LIBERO_KEY_PIXELS_EYE_IN_HAND] = f"{OBS_IMAGES}.{mapped_eye_in_hand}"
|
||||
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode}
|
||||
@@ -345,6 +407,33 @@ class LiberoEnv(EnvConfig):
|
||||
kwargs["task_ids"] = self.task_ids
|
||||
return kwargs
|
||||
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||
from lerobot.envs.libero import create_libero_envs
|
||||
|
||||
if self.task is None:
|
||||
raise ValueError("LiberoEnv requires a task to be specified")
|
||||
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
|
||||
return create_libero_envs(
|
||||
task=self.task,
|
||||
n_envs=n_envs,
|
||||
camera_name=self.camera_name,
|
||||
init_states=self.init_states,
|
||||
gym_kwargs=self.gym_kwargs,
|
||||
env_cls=env_cls,
|
||||
control_mode=self.control_mode,
|
||||
episode_length=self.episode_length,
|
||||
camera_name_mapping=self.camera_name_mapping,
|
||||
)
|
||||
|
||||
def get_env_processors(self):
|
||||
from lerobot.processor.env_processor import LiberoProcessorStep
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
|
||||
return (
|
||||
PolicyProcessorPipeline(steps=[LiberoProcessorStep()]),
|
||||
PolicyProcessorPipeline(steps=[]),
|
||||
)
|
||||
|
||||
|
||||
@EnvConfig.register_subclass("metaworld")
|
||||
@dataclass
|
||||
@@ -387,6 +476,19 @@ class MetaworldEnv(EnvConfig):
|
||||
"render_mode": self.render_mode,
|
||||
}
|
||||
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||
from lerobot.envs.metaworld import create_metaworld_envs
|
||||
|
||||
if self.task is None:
|
||||
raise ValueError("MetaWorld requires a task to be specified")
|
||||
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
|
||||
return create_metaworld_envs(
|
||||
task=self.task,
|
||||
n_envs=n_envs,
|
||||
gym_kwargs=self.gym_kwargs,
|
||||
env_cls=env_cls,
|
||||
)
|
||||
|
||||
|
||||
@EnvConfig.register_subclass("isaaclab_arena")
|
||||
@dataclass
|
||||
@@ -454,3 +556,18 @@ class IsaaclabArenaEnv(HubEnvConfig):
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {}
|
||||
|
||||
def get_env_processors(self):
|
||||
from lerobot.processor.env_processor import IsaaclabArenaProcessorStep
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
|
||||
state_keys = tuple(k.strip() for k in (self.state_keys or "").split(",") if k.strip())
|
||||
camera_keys = tuple(k.strip() for k in (self.camera_keys or "").split(",") if k.strip())
|
||||
if not state_keys and not camera_keys:
|
||||
raise ValueError("At least one of state_keys or camera_keys must be specified.")
|
||||
return (
|
||||
PolicyProcessorPipeline(
|
||||
steps=[IsaaclabArenaProcessorStep(state_keys=state_keys, camera_keys=camera_keys)]
|
||||
),
|
||||
PolicyProcessorPipeline(steps=[]),
|
||||
)
|
||||
|
||||
+21
-118
@@ -13,96 +13,52 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import importlib
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import gymnasium as gym
|
||||
from gymnasium.envs.registration import registry as gym_registry
|
||||
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.envs.configs import AlohaEnv, EnvConfig, HubEnvConfig, IsaaclabArenaEnv, LiberoEnv, PushtEnv
|
||||
from lerobot.envs.configs import EnvConfig, HubEnvConfig
|
||||
from lerobot.envs.utils import _call_make_env, _download_hub_file, _import_hub_module, _normalize_hub_result
|
||||
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
|
||||
from lerobot.processor import ProcessorStep
|
||||
from lerobot.processor.env_processor import IsaaclabArenaProcessorStep, LiberoProcessorStep
|
||||
from lerobot.processor.pipeline import PolicyProcessorPipeline
|
||||
|
||||
|
||||
def make_env_config(env_type: str, **kwargs) -> EnvConfig:
|
||||
if env_type == "aloha":
|
||||
return AlohaEnv(**kwargs)
|
||||
elif env_type == "pusht":
|
||||
return PushtEnv(**kwargs)
|
||||
elif env_type == "libero":
|
||||
return LiberoEnv(**kwargs)
|
||||
else:
|
||||
raise ValueError(f"Policy type '{env_type}' is not available.")
|
||||
try:
|
||||
cls = EnvConfig.get_choice_class(env_type)
|
||||
except KeyError as err:
|
||||
raise ValueError(
|
||||
f"Environment type '{env_type}' is not registered. "
|
||||
f"Available: {list(EnvConfig.get_known_choices().keys())}"
|
||||
) from err
|
||||
return cls(**kwargs)
|
||||
|
||||
|
||||
def make_env_pre_post_processors(
|
||||
env_cfg: EnvConfig,
|
||||
policy_cfg: PreTrainedConfig,
|
||||
) -> tuple[
|
||||
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
|
||||
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
|
||||
]:
|
||||
policy_cfg: Any,
|
||||
) -> tuple[Any, Any]:
|
||||
"""
|
||||
Create preprocessor and postprocessor pipelines for environment observations.
|
||||
|
||||
This function creates processor pipelines that transform raw environment
|
||||
observations and actions. By default, it returns identity processors that do nothing.
|
||||
For specific environments like LIBERO, it adds environment-specific processing steps.
|
||||
|
||||
Args:
|
||||
env_cfg: The configuration of the environment.
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- preprocessor: Pipeline that processes environment observations
|
||||
- postprocessor: Pipeline that processes environment outputs (currently identity)
|
||||
Returns a tuple of (preprocessor, postprocessor). By default, delegates to
|
||||
``env_cfg.get_env_processors()``. The XVLAConfig policy-specific override
|
||||
stays here because it depends on the *policy* config, not the env config.
|
||||
"""
|
||||
# Preprocessor and Postprocessor steps are Identity for most environments
|
||||
preprocessor_steps: list[ProcessorStep] = []
|
||||
postprocessor_steps: list[ProcessorStep] = []
|
||||
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
|
||||
|
||||
if isinstance(policy_cfg, XVLAConfig):
|
||||
from lerobot.policies.xvla.processor_xvla import make_xvla_libero_pre_post_processors
|
||||
|
||||
return make_xvla_libero_pre_post_processors()
|
||||
|
||||
# For LIBERO environments, add the LiberoProcessorStep to preprocessor
|
||||
if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
|
||||
preprocessor_steps.append(LiberoProcessorStep())
|
||||
|
||||
# For Isaaclab Arena environments, add the IsaaclabArenaProcessorStep
|
||||
if isinstance(env_cfg, IsaaclabArenaEnv) or "isaaclab_arena" in env_cfg.type:
|
||||
# Parse comma-separated keys (handle None for state-based policies)
|
||||
if env_cfg.state_keys:
|
||||
state_keys = tuple(k.strip() for k in env_cfg.state_keys.split(",") if k.strip())
|
||||
else:
|
||||
state_keys = ()
|
||||
if env_cfg.camera_keys:
|
||||
camera_keys = tuple(k.strip() for k in env_cfg.camera_keys.split(",") if k.strip())
|
||||
else:
|
||||
camera_keys = ()
|
||||
if not state_keys and not camera_keys:
|
||||
raise ValueError("At least one of state_keys or camera_keys must be specified.")
|
||||
preprocessor_steps.append(
|
||||
IsaaclabArenaProcessorStep(
|
||||
state_keys=state_keys,
|
||||
camera_keys=camera_keys,
|
||||
)
|
||||
)
|
||||
|
||||
preprocessor = PolicyProcessorPipeline(steps=preprocessor_steps)
|
||||
postprocessor = PolicyProcessorPipeline(steps=postprocessor_steps)
|
||||
|
||||
return preprocessor, postprocessor
|
||||
return env_cfg.get_env_processors()
|
||||
|
||||
|
||||
def make_env(
|
||||
cfg: EnvConfig | str,
|
||||
n_envs: int = 1,
|
||||
use_async_envs: bool = False,
|
||||
use_async_envs: bool = True,
|
||||
hub_cache_dir: str | None = None,
|
||||
trust_remote_code: bool = False,
|
||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||
@@ -163,57 +119,4 @@ def make_env(
|
||||
if n_envs < 1:
|
||||
raise ValueError("`n_envs` must be at least 1")
|
||||
|
||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
||||
|
||||
if "libero" in cfg.type:
|
||||
from lerobot.envs.libero import create_libero_envs
|
||||
|
||||
if cfg.task is None:
|
||||
raise ValueError("LiberoEnv requires a task to be specified")
|
||||
|
||||
return create_libero_envs(
|
||||
task=cfg.task,
|
||||
n_envs=n_envs,
|
||||
camera_name=cfg.camera_name,
|
||||
init_states=cfg.init_states,
|
||||
gym_kwargs=cfg.gym_kwargs,
|
||||
env_cls=env_cls,
|
||||
control_mode=cfg.control_mode,
|
||||
episode_length=cfg.episode_length,
|
||||
)
|
||||
elif "metaworld" in cfg.type:
|
||||
from lerobot.envs.metaworld import create_metaworld_envs
|
||||
|
||||
if cfg.task is None:
|
||||
raise ValueError("MetaWorld requires a task to be specified")
|
||||
|
||||
return create_metaworld_envs(
|
||||
task=cfg.task,
|
||||
n_envs=n_envs,
|
||||
gym_kwargs=cfg.gym_kwargs,
|
||||
env_cls=env_cls,
|
||||
)
|
||||
|
||||
if cfg.gym_id not in gym_registry:
|
||||
print(f"gym id '{cfg.gym_id}' not found, attempting to import '{cfg.package_name}'...")
|
||||
try:
|
||||
importlib.import_module(cfg.package_name)
|
||||
except ModuleNotFoundError as e:
|
||||
raise ModuleNotFoundError(
|
||||
f"Package '{cfg.package_name}' required for env '{cfg.type}' not found. "
|
||||
f"Please install it or check PYTHONPATH."
|
||||
) from e
|
||||
|
||||
if cfg.gym_id not in gym_registry:
|
||||
raise gym.error.NameNotFound(
|
||||
f"Environment '{cfg.gym_id}' not registered even after importing '{cfg.package_name}'."
|
||||
)
|
||||
|
||||
def _make_one():
|
||||
return gym.make(cfg.gym_id, disable_env_checker=cfg.disable_env_checker, **(cfg.gym_kwargs or {}))
|
||||
|
||||
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=gym.vector.AutoresetMode.SAME_STEP)
|
||||
|
||||
# normalize to {suite: {task_id: vec_env}} for consistency
|
||||
suite_name = cfg.type # e.g., "pusht", "aloha"
|
||||
return {suite_name: {0: vec}}
|
||||
return cfg.create_envs(n_envs=n_envs, use_async_envs=use_async_envs)
|
||||
|
||||
+98
-26
@@ -150,7 +150,17 @@ class LiberoEnv(gym.Env):
|
||||
|
||||
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
|
||||
|
||||
self._env = self._make_envs_task(task_suite, self.task_id)
|
||||
# Extract task metadata without allocating GPU resources (safe before fork).
|
||||
task = task_suite.get_task(task_id)
|
||||
self.task = task.name
|
||||
self.task_description = task.language
|
||||
self._task_bddl_file = os.path.join(
|
||||
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
|
||||
)
|
||||
self._env: OffScreenRenderEnv | None = (
|
||||
None # deferred — created on first reset() inside the worker subprocess
|
||||
)
|
||||
|
||||
default_steps = 500
|
||||
self._max_episode_steps = (
|
||||
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
|
||||
@@ -221,28 +231,33 @@ class LiberoEnv(gym.Env):
|
||||
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
|
||||
)
|
||||
|
||||
def _ensure_env(self) -> None:
|
||||
"""Create the underlying OffScreenRenderEnv on first use.
|
||||
|
||||
Called inside the worker subprocess after fork(), so each worker gets
|
||||
its own clean EGL context rather than inheriting a stale one from the
|
||||
parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv).
|
||||
"""
|
||||
if self._env is not None:
|
||||
return
|
||||
env = OffScreenRenderEnv(
|
||||
bddl_file_name=self._task_bddl_file,
|
||||
camera_heights=self.observation_height,
|
||||
camera_widths=self.observation_width,
|
||||
)
|
||||
env.reset()
|
||||
self._env = env
|
||||
|
||||
def render(self):
|
||||
self._ensure_env()
|
||||
raw_obs = self._env.env._get_observations()
|
||||
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
|
||||
pixels = self._format_raw_obs(raw_obs)["pixels"]
|
||||
image = next(iter(pixels.values()))
|
||||
image = image[::-1, ::-1] # flip both H and W for visualization
|
||||
return image
|
||||
|
||||
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
|
||||
task = task_suite.get_task(task_id)
|
||||
self.task = task.name
|
||||
self.task_description = task.language
|
||||
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
|
||||
|
||||
env_args = {
|
||||
"bddl_file_name": task_bddl_file,
|
||||
"camera_heights": self.observation_height,
|
||||
"camera_widths": self.observation_width,
|
||||
}
|
||||
env = OffScreenRenderEnv(**env_args)
|
||||
env.reset()
|
||||
return env
|
||||
|
||||
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
|
||||
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
|
||||
images = {}
|
||||
for camera_name in self.camera_name:
|
||||
image = raw_obs[camera_name]
|
||||
@@ -294,6 +309,7 @@ class LiberoEnv(gym.Env):
|
||||
)
|
||||
|
||||
def reset(self, seed=None, **kwargs):
|
||||
self._ensure_env()
|
||||
super().reset(seed=seed)
|
||||
self._env.seed(seed)
|
||||
raw_obs = self._env.reset()
|
||||
@@ -320,6 +336,8 @@ class LiberoEnv(gym.Env):
|
||||
return observation, info
|
||||
|
||||
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
|
||||
self._ensure_env()
|
||||
assert self._env is not None
|
||||
if action.ndim != 1:
|
||||
raise ValueError(
|
||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||
@@ -339,18 +357,13 @@ class LiberoEnv(gym.Env):
|
||||
)
|
||||
observation = self._format_raw_obs(raw_obs)
|
||||
if terminated:
|
||||
info["final_info"] = {
|
||||
"task": self.task,
|
||||
"task_id": self.task_id,
|
||||
"done": bool(done),
|
||||
"is_success": bool(is_success),
|
||||
}
|
||||
self.reset()
|
||||
truncated = False
|
||||
return observation, reward, terminated, truncated, info
|
||||
|
||||
def close(self):
|
||||
self._env.close()
|
||||
if self._env is not None:
|
||||
self._env.close()
|
||||
|
||||
|
||||
def _make_env_fns(
|
||||
@@ -364,6 +377,7 @@ def _make_env_fns(
|
||||
init_states: bool,
|
||||
gym_kwargs: Mapping[str, Any],
|
||||
control_mode: str,
|
||||
camera_name_mapping: dict[str, str] | None = None,
|
||||
) -> list[Callable[[], LiberoEnv]]:
|
||||
"""Build n_envs factory callables for a single (suite, task_id)."""
|
||||
|
||||
@@ -379,6 +393,7 @@ def _make_env_fns(
|
||||
episode_index=episode_index,
|
||||
n_envs=n_envs,
|
||||
control_mode=control_mode,
|
||||
camera_name_mapping=camera_name_mapping,
|
||||
**local_kwargs,
|
||||
)
|
||||
|
||||
@@ -388,6 +403,57 @@ def _make_env_fns(
|
||||
return fns
|
||||
|
||||
|
||||
class _LazyAsyncVectorEnv:
    """Wrapper that defers AsyncVectorEnv creation until first use.

    Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker
    processes, all of which allocate EGL/GPU resources immediately. Since tasks
    are evaluated sequentially, only one task's workers need to be alive at a
    time. This wrapper stores the factory functions and creates the real
    AsyncVectorEnv on first reset(), keeping peak process count = n_envs.
    """

    def __init__(self, env_fns: list[Callable]):
        """Store the factories and mirror the vector-env space attributes.

        Args:
            env_fns: Zero-argument callables, each building one sub-environment.
        """
        self._env_fns = env_fns
        # Created lazily in _ensure(); None until the first reset()/step()/call().
        self._env: gym.vector.AsyncVectorEnv | None = None
        self.num_envs = len(env_fns)
        # Instantiate one env to expose spaces (no GPU — _ensure_env is lazy).
        tmp = env_fns[0]()
        self.observation_space = tmp.observation_space
        self.action_space = tmp.action_space
        # NOTE(review): a real vector env reports *batched* observation_space /
        # action_space, but here they are set to the single-env spaces — confirm
        # downstream consumers only rely on the single_* variants.
        self.single_observation_space = tmp.observation_space
        self.single_action_space = tmp.action_space
        tmp.close()

    def _ensure(self):
        # Materialize the real AsyncVectorEnv on first use. The "forkserver"
        # context avoids inheriting stale rendering/GPU state from the parent.
        if self._env is None:
            self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver")

    def reset(self, **kwargs):
        # Proxy to the lazily-created vector env.
        self._ensure()
        return self._env.reset(**kwargs)

    def step(self, actions):
        self._ensure()
        return self._env.step(actions)

    def call(self, name, *args, **kwargs):
        self._ensure()
        return self._env.call(name, *args, **kwargs)

    def get_attr(self, name):
        self._ensure()
        return self._env.get_attr(name)

    def close(self):
        # Idempotent: safe to call when the env was never materialized,
        # and resets the handle so a later use would rebuild the workers.
        if self._env is not None:
            self._env.close()
            self._env = None

    def __del__(self):
        # Best-effort cleanup of worker processes at garbage collection.
        self.close()
|
||||
|
||||
|
||||
# ---- Main API ----------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -400,6 +466,7 @@ def create_libero_envs(
|
||||
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
|
||||
control_mode: str = "relative",
|
||||
episode_length: int | None = None,
|
||||
camera_name_mapping: dict[str, str] | None = None,
|
||||
) -> dict[str, dict[int, Any]]:
|
||||
"""
|
||||
Create vectorized LIBERO environments with a consistent return shape.
|
||||
@@ -430,6 +497,8 @@ def create_libero_envs(
|
||||
if task_ids_filter is not None:
|
||||
print(f"Restricting to task_ids={task_ids_filter}")
|
||||
|
||||
is_async = env_cls is gym.vector.AsyncVectorEnv
|
||||
|
||||
out: dict[str, dict[int, Any]] = defaultdict(dict)
|
||||
for suite_name in suite_names:
|
||||
suite = _get_suite(suite_name)
|
||||
@@ -449,9 +518,12 @@ def create_libero_envs(
|
||||
init_states=init_states,
|
||||
gym_kwargs=gym_kwargs,
|
||||
control_mode=control_mode,
|
||||
camera_name_mapping=camera_name_mapping,
|
||||
)
|
||||
out[suite_name][tid] = env_cls(fns)
|
||||
if is_async:
|
||||
out[suite_name][tid] = _LazyAsyncVectorEnv(fns)
|
||||
else:
|
||||
out[suite_name][tid] = env_cls(fns)
|
||||
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
|
||||
|
||||
# return plain dicts for predictability
|
||||
return {suite: dict(task_map) for suite, task_map in out.items()}
|
||||
|
||||
@@ -97,8 +97,9 @@ class MetaworldEnv(gym.Env):
|
||||
self.visualization_height = visualization_height
|
||||
self.camera_name = camera_name
|
||||
|
||||
self._env = self._make_envs_task(self.task)
|
||||
self._max_episode_steps = self._env.max_path_length
|
||||
self._env_name = self.task # already stripped of "metaworld-" prefix above
|
||||
self._env = None # deferred — created on first reset() inside the worker subprocess
|
||||
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
|
||||
self.task_description = TASK_DESCRIPTIONS[self.task]
|
||||
|
||||
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
|
||||
@@ -136,6 +137,24 @@ class MetaworldEnv(gym.Env):
|
||||
|
||||
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
|
||||
|
||||
def _ensure_env(self) -> None:
    """Create the underlying MetaWorld env on first use.

    Called inside the worker subprocess after fork(), so each worker gets
    its own clean rendering context rather than inheriting a stale one from
    the parent process (which causes crashes with AsyncVectorEnv).
    """
    # Idempotent: subsequent calls are no-ops once the env exists.
    if self._env is not None:
        return
    # Fixed seed keeps the MT1 task sampling deterministic across workers.
    mt1 = metaworld.MT1(self._env_name, seed=42)
    env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name)
    env.set_task(mt1.train_tasks[0])
    if self.camera_name == "corner2":
        # Reposition the corner2 camera (same values the removed
        # _make_envs_task used, following https://arxiv.org/pdf/2206.14244).
        env.model.cam_pos[2] = [0.75, 0.075, 0.7]
    env.reset()
    env._freeze_rand_vec = False  # otherwise no randomization
    self._env = env
|
||||
|
||||
def render(self) -> np.ndarray:
|
||||
"""
|
||||
Render the current environment frame.
|
||||
@@ -143,26 +162,13 @@ class MetaworldEnv(gym.Env):
|
||||
Returns:
|
||||
np.ndarray: The rendered RGB image from the environment.
|
||||
"""
|
||||
self._ensure_env()
|
||||
image = self._env.render()
|
||||
if self.camera_name == "corner2":
|
||||
# Images from this camera are flipped — correct them
|
||||
image = np.flip(image, (0, 1))
|
||||
return image
|
||||
|
||||
def _make_envs_task(self, env_name: str):
|
||||
mt1 = metaworld.MT1(env_name, seed=42)
|
||||
env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name)
|
||||
env.set_task(mt1.train_tasks[0])
|
||||
if self.camera_name == "corner2":
|
||||
env.model.cam_pos[2] = [
|
||||
0.75,
|
||||
0.075,
|
||||
0.7,
|
||||
] # corner2 position, similar to https://arxiv.org/pdf/2206.14244
|
||||
env.reset()
|
||||
env._freeze_rand_vec = False # otherwise no randomization
|
||||
return env
|
||||
|
||||
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
|
||||
image = None
|
||||
if self._env is not None:
|
||||
@@ -209,6 +215,7 @@ class MetaworldEnv(gym.Env):
|
||||
observation (RobotObservation): The initial formatted observation.
|
||||
info (Dict[str, Any]): Additional info about the reset state.
|
||||
"""
|
||||
self._ensure_env()
|
||||
super().reset(seed=seed)
|
||||
|
||||
raw_obs, info = self._env.reset(seed=seed)
|
||||
@@ -232,6 +239,7 @@ class MetaworldEnv(gym.Env):
|
||||
truncated (bool): Whether the episode was truncated due to a time limit.
|
||||
info (Dict[str, Any]): Additional environment info.
|
||||
"""
|
||||
self._ensure_env()
|
||||
if action.ndim != 1:
|
||||
raise ValueError(
|
||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||
@@ -263,7 +271,8 @@ class MetaworldEnv(gym.Env):
|
||||
return observation, reward, terminated, truncated, info
|
||||
|
||||
def close(self):
|
||||
self._env.close()
|
||||
if self._env is not None:
|
||||
self._env.close()
|
||||
|
||||
|
||||
# ---- Main API ----------------------------------------------------------------
|
||||
|
||||
+21
-26
@@ -130,56 +130,51 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
|
||||
return policy_features
|
||||
|
||||
|
||||
def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool:
|
||||
first_type = type(env.envs[0]) # Get type of first env
|
||||
return all(type(e) is first_type for e in env.envs) # Fast type check
|
||||
def _get_sub_env_attr(env: gym.vector.VectorEnv, attr: str, index: int = 0):
    """Retrieve an attribute from a sub-environment, works for both Sync and Async.

    Args:
        env: Vectorized environment (Sync or Async); queried via ``get_attr``,
            which both implementations support.
        attr: Name of the sub-environment attribute to read.
        index: Which sub-environment's value to return (default: first).

    Returns:
        The attribute value from the selected sub-environment, or ``None`` when
        the attribute is missing or the query fails for any other reason.
    """
    try:
        return env.get_attr(attr)[index]
    except Exception:
        # Fix: the original `except (AttributeError, Exception)` tuple was
        # redundant — Exception already covers AttributeError. The broad catch
        # itself is intentional: AsyncVectorEnv may surface worker-side
        # failures as types other than AttributeError.
        return None
|
||||
|
||||
|
||||
def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool:
    """Return True when the sub-environments expose ``attr``.

    Probes via ``get_attr`` so it works for both SyncVectorEnv and
    AsyncVectorEnv; ``get_attr`` raises when the attribute is missing.
    """
    try:
        env.get_attr(attr)
    except Exception:
        # Fix: `except (AttributeError, Exception)` was redundant — Exception
        # subsumes AttributeError. Broad on purpose: async workers may wrap
        # the underlying error in other exception types.
        return False
    return True
|
||||
|
||||
|
||||
def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None:
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("once", UserWarning) # Apply filter only in this function
|
||||
warnings.simplefilter("once", UserWarning)
|
||||
|
||||
if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")):
|
||||
if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")):
|
||||
warnings.warn(
|
||||
"The environment does not have 'task_description' and 'task'. Some policies require these features.",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
if not are_all_envs_same_type(env):
|
||||
warnings.warn(
|
||||
"The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation:
|
||||
"""Adds task feature to the observation dict with respect to the first environment attribute."""
|
||||
if hasattr(env.envs[0], "task_description"):
|
||||
task_result = env.call("task_description")
|
||||
if _sub_env_has_attr(env, "task_description"):
|
||||
task_result = list(env.call("task_description"))
|
||||
|
||||
if isinstance(task_result, tuple):
|
||||
task_result = list(task_result)
|
||||
|
||||
if not isinstance(task_result, list):
|
||||
raise TypeError(f"Expected task_description to return a list, got {type(task_result)}")
|
||||
if not all(isinstance(item, str) for item in task_result):
|
||||
raise TypeError("All items in task_description result must be strings")
|
||||
|
||||
observation["task"] = task_result
|
||||
elif hasattr(env.envs[0], "task"):
|
||||
task_result = env.call("task")
|
||||
elif _sub_env_has_attr(env, "task"):
|
||||
task_result = list(env.call("task"))
|
||||
|
||||
if isinstance(task_result, tuple):
|
||||
task_result = list(task_result)
|
||||
|
||||
if not isinstance(task_result, list):
|
||||
raise TypeError(f"Expected task to return a list, got {type(task_result)}")
|
||||
if not all(isinstance(item, str) for item in task_result):
|
||||
raise TypeError("All items in task result must be strings")
|
||||
|
||||
observation["task"] = task_result
|
||||
else: # For envs without language instructions, e.g. aloha transfer cube and etc.
|
||||
else:
|
||||
num_envs = observation[list(observation.keys())[0]].shape[0]
|
||||
observation["task"] = ["" for _ in range(num_envs)]
|
||||
return observation
|
||||
|
||||
@@ -136,8 +136,8 @@ class TokenizerProcessorStep(ObservationProcessorStep):
|
||||
# Standardize to a list of strings for the tokenizer
|
||||
if isinstance(task, str):
|
||||
return [task]
|
||||
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
|
||||
return task
|
||||
elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task):
|
||||
return list(task)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -47,8 +47,10 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig`
|
||||
"""
|
||||
|
||||
import concurrent.futures as cf
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
@@ -56,7 +58,6 @@ from collections.abc import Callable
|
||||
from contextlib import nullcontext
|
||||
from copy import deepcopy
|
||||
from dataclasses import asdict
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Any, TypedDict
|
||||
@@ -73,7 +74,6 @@ from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
||||
from lerobot.envs.utils import (
|
||||
add_envs_task,
|
||||
check_env_attributes_and_types,
|
||||
close_envs,
|
||||
preprocess_observation,
|
||||
@@ -93,6 +93,14 @@ from lerobot.utils.utils import (
|
||||
)
|
||||
|
||||
|
||||
def _shard_episodes(n_episodes: int, shard_id: int, num_shards: int) -> list[int]:
|
||||
"""Return the episode indices assigned to this shard (round-robin distribution).
|
||||
|
||||
Example: _shard_episodes(10, 1, 4) -> [1, 5, 9]
|
||||
"""
|
||||
return list(range(shard_id, n_episodes, num_shards))
|
||||
|
||||
|
||||
def rollout(
|
||||
env: gym.vector.VectorEnv,
|
||||
policy: PreTrainedPolicy,
|
||||
@@ -166,9 +174,15 @@ def rollout(
|
||||
if return_observations:
|
||||
all_observations.append(deepcopy(observation))
|
||||
|
||||
# Infer "task" from attributes of environments.
|
||||
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
|
||||
observation = add_envs_task(env, observation)
|
||||
# Infer "task" from sub-environments (prefer natural language description).
|
||||
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
|
||||
try:
|
||||
observation["task"] = list(env.call("task_description"))
|
||||
except Exception:
|
||||
try:
|
||||
observation["task"] = list(env.call("task"))
|
||||
except Exception:
|
||||
observation["task"] = [""] * env.num_envs
|
||||
|
||||
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
|
||||
observation = env_preprocessor(observation)
|
||||
@@ -193,14 +207,13 @@ def rollout(
|
||||
|
||||
# VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't
|
||||
# available if none of the envs finished.
|
||||
if "final_info" in info:
|
||||
final_info = info["final_info"]
|
||||
if not isinstance(final_info, dict):
|
||||
raise RuntimeError(
|
||||
"Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). "
|
||||
"You're likely using an older version of gymnasium (< 1.0). Please upgrade."
|
||||
)
|
||||
successes = final_info["is_success"].tolist()
|
||||
if "final_info" in info and isinstance(info["final_info"], dict):
|
||||
successes = info["final_info"]["is_success"].tolist()
|
||||
elif "is_success" in info:
|
||||
is_success = info["is_success"]
|
||||
successes = (
|
||||
is_success.tolist() if hasattr(is_success, "tolist") else [bool(is_success)] * env.num_envs
|
||||
)
|
||||
else:
|
||||
successes = [False] * env.num_envs
|
||||
|
||||
@@ -549,6 +562,14 @@ def eval_main(cfg: EvalPipelineConfig):
|
||||
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
|
||||
|
||||
# Sharding: each shard runs a subset of n_episodes with non-overlapping seeds.
|
||||
shard_id = cfg.eval.shard_id
|
||||
num_shards = cfg.eval.num_shards
|
||||
episodes_for_shard = _shard_episodes(cfg.eval.n_episodes, shard_id, num_shards)
|
||||
n_per_shard = len(episodes_for_shard)
|
||||
# Shift the seed so each shard gets a different, non-overlapping seed range.
|
||||
shard_seed = (cfg.seed or 0) + shard_id * math.ceil(cfg.eval.n_episodes / num_shards)
|
||||
|
||||
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
|
||||
info = eval_policy_all(
|
||||
envs=envs,
|
||||
@@ -557,10 +578,10 @@ def eval_main(cfg: EvalPipelineConfig):
|
||||
env_postprocessor=env_postprocessor,
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=cfg.eval.n_episodes,
|
||||
n_episodes=n_per_shard,
|
||||
max_episodes_rendered=10,
|
||||
videos_dir=Path(cfg.output_dir) / "videos",
|
||||
start_seed=cfg.seed,
|
||||
start_seed=shard_seed,
|
||||
max_parallel_tasks=cfg.env.max_parallel_tasks,
|
||||
)
|
||||
print("Overall Aggregated Metrics:")
|
||||
@@ -573,8 +594,13 @@ def eval_main(cfg: EvalPipelineConfig):
|
||||
# Close all vec envs
|
||||
close_envs(envs)
|
||||
|
||||
# Save info
|
||||
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
|
||||
# Save info — use shard-specific filename when running in parallel mode.
|
||||
if num_shards > 1:
|
||||
out_path = Path(cfg.output_dir) / f"shard_{shard_id}_of_{num_shards}.json"
|
||||
else:
|
||||
out_path = Path(cfg.output_dir) / "eval_info.json"
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(out_path, "w") as f:
|
||||
json.dump(info, f, indent=2)
|
||||
|
||||
logging.info("End of eval")
|
||||
@@ -734,39 +760,58 @@ def eval_policy_all(
|
||||
group_acc[group]["video_paths"].extend(paths)
|
||||
overall["video_paths"].extend(paths)
|
||||
|
||||
def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
|
||||
"""Shallow copy sharing weight tensors, with independent per-thread state.
|
||||
|
||||
copy.copy() gives a new Python object whose _parameters dict is a shared
|
||||
reference (same tensor storage, zero extra VRAM). reset() then rebinds
|
||||
mutable state (action queues etc.) to fresh per-thread objects.
|
||||
|
||||
Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's
|
||||
reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config.
|
||||
"""
|
||||
thread_p = copy.copy(p)
|
||||
thread_p.reset()
|
||||
return thread_p
|
||||
|
||||
# Choose runner (sequential vs threaded)
|
||||
task_runner = partial(
|
||||
run_one,
|
||||
policy=policy,
|
||||
env_preprocessor=env_preprocessor,
|
||||
env_postprocessor=env_postprocessor,
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=n_episodes,
|
||||
max_episodes_rendered=max_episodes_rendered,
|
||||
videos_dir=videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
)
|
||||
_runner_kwargs = {
|
||||
"env_preprocessor": env_preprocessor,
|
||||
"env_postprocessor": env_postprocessor,
|
||||
"preprocessor": preprocessor,
|
||||
"postprocessor": postprocessor,
|
||||
"n_episodes": n_episodes,
|
||||
"max_episodes_rendered": max_episodes_rendered,
|
||||
"videos_dir": videos_dir,
|
||||
"return_episode_data": return_episode_data,
|
||||
"start_seed": start_seed,
|
||||
}
|
||||
|
||||
if max_parallel_tasks <= 1:
|
||||
# sequential path (single accumulator path on the main thread)
|
||||
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
|
||||
for task_group, task_id, env in tasks:
|
||||
tg, tid, metrics = task_runner(task_group, task_id, env)
|
||||
_accumulate_to(tg, metrics)
|
||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||
try:
|
||||
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
|
||||
_accumulate_to(tg, metrics)
|
||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||
finally:
|
||||
env.close()
|
||||
else:
|
||||
# threaded path: submit all tasks, consume completions on main thread and accumulate there
|
||||
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
|
||||
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
|
||||
fut2meta = {}
|
||||
for task_group, task_id, env in tasks:
|
||||
fut = executor.submit(task_runner, task_group, task_id, env)
|
||||
fut2meta[fut] = (task_group, task_id)
|
||||
fut = executor.submit(
|
||||
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
|
||||
)
|
||||
fut2meta[fut] = (task_group, task_id, env)
|
||||
for fut in cf.as_completed(fut2meta):
|
||||
tg, tid, metrics = fut.result()
|
||||
_accumulate_to(tg, metrics)
|
||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||
tg, tid, env = fut2meta[fut]
|
||||
try:
|
||||
tg, tid, metrics = fut.result()
|
||||
_accumulate_to(tg, metrics)
|
||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||
finally:
|
||||
env.close()
|
||||
|
||||
# compute aggregated metrics helper (robust to lists/scalars)
|
||||
def _agg_from_list(xs):
|
||||
|
||||
@@ -0,0 +1,249 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Probe hardware and recommend optimal lerobot-eval-parallel flags.
|
||||
|
||||
Run standalone:
|
||||
lerobot-eval-autotune --policy.path=lerobot/smolvla_libero --env.type=libero
|
||||
|
||||
Or called programmatically from lerobot_eval_parallel when --num-shards auto.
|
||||
|
||||
Steps:
|
||||
1. Probe GPU VRAM and CPU core count.
|
||||
2. Measure model VRAM footprint (load policy, delta of cuda.memory_allocated).
|
||||
3. Compute max shards limited by VRAM (85% of total).
|
||||
4. Probe env step time (optional, skipped when skip_timing=True).
|
||||
5. Probe inference time (optional, skipped when skip_timing=True).
|
||||
6. Derive num_shards = min(vram_limit, saturation_shards).
|
||||
7. Choose MUJOCO_GL (egl vs osmesa) based on remaining VRAM headroom.
|
||||
8. Compute batch_size = max(4, min(floor(cpu_cores * 0.8 / num_shards), 64)).
|
||||
9. Print paste-ready command.
|
||||
"""
|
||||
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
class AutotuneRecommendation:
    """Recommended lerobot-eval-parallel settings plus the raw probed values."""

    # Derived recommendation
    num_shards: int  # number of parallel eval subprocesses
    batch_size: int  # vectorized envs per shard (clamped to [4, 64])
    mujoco_gl: str  # "egl" (GPU rendering) or "osmesa" (CPU rendering)
    use_amp: bool  # True when the model footprint exceeds 8 GB
    # Probed values
    gpu_name: str  # device name, or a sentinel ("CPU (no CUDA)" / "unknown")
    vram_gb: float  # total GPU memory in GiB; 0.0 when no CUDA device
    cpu_cores: int  # os.cpu_count() result (fallback: 4)
    model_gb: float  # measured (or assumed 14 GB) policy VRAM footprint
    env_step_ms: float | None  # probed env step latency; None when probe skipped/failed
    infer_ms: float | None  # inference latency; currently a default estimate
|
||||
|
||||
|
||||
_DEFAULT_ENV_STEP_MS = 22.0 # LIBERO on GPU, typical value
|
||||
_DEFAULT_INFER_MS = 5.0 # SmolVLA fp16 on H100
|
||||
|
||||
|
||||
def _probe_gpu() -> tuple[str, float]:
|
||||
"""Return (gpu_name, vram_gb). Falls back to CPU sentinel on non-CUDA systems."""
|
||||
try:
|
||||
import torch
|
||||
|
||||
if not torch.cuda.is_available():
|
||||
return "CPU (no CUDA)", 0.0
|
||||
props = torch.cuda.get_device_properties(0)
|
||||
return props.name, props.total_memory / (1024**3)
|
||||
except Exception:
|
||||
return "unknown", 0.0
|
||||
|
||||
|
||||
def _probe_model_gb(passthrough: list[str]) -> float:
|
||||
"""Load the policy (from --policy.path) and measure VRAM delta. Returns GB."""
|
||||
# Extract policy path from passthrough args
|
||||
policy_path = None
|
||||
for tok in passthrough:
|
||||
if tok.startswith("policy.path="):
|
||||
policy_path = tok.split("=", 1)[1]
|
||||
break
|
||||
if tok.startswith("--policy.path="):
|
||||
policy_path = tok.split("=", 1)[1]
|
||||
break
|
||||
if policy_path is None:
|
||||
return 0.0
|
||||
|
||||
try:
|
||||
import torch
|
||||
|
||||
from lerobot.policies.factory import make_policy
|
||||
from lerobot.policies.pretrained import PreTrainedConfig
|
||||
|
||||
if not torch.cuda.is_available():
|
||||
return 0.0
|
||||
torch.cuda.synchronize()
|
||||
before = torch.cuda.memory_allocated(0)
|
||||
cfg = PreTrainedConfig.from_pretrained(policy_path)
|
||||
cfg.pretrained_path = policy_path # type: ignore[assignment]
|
||||
policy = make_policy(cfg=cfg)
|
||||
policy.eval()
|
||||
torch.cuda.synchronize()
|
||||
after = torch.cuda.memory_allocated(0)
|
||||
del policy
|
||||
torch.cuda.empty_cache()
|
||||
return (after - before) / (1024**3)
|
||||
except Exception as e:
|
||||
print(f"[autotune] could not measure model VRAM: {e}", file=sys.stderr)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _probe_env_step_ms(passthrough: list[str], batch_size: int = 8, n_steps: int = 30) -> float | None:
    """Run a short env warmup and return median step latency in ms. Returns None on failure.

    Args:
        passthrough: Raw CLI tokens forwarded from the wrapper; only
            ``env.type=...`` / ``--env.type=...`` is consulted here.
        batch_size: Number of vectorized sub-environments to step.
        n_steps: Number of timed steps; the median damps warmup outliers.
    """
    try:
        import numpy as np

        from lerobot.envs.factory import make_env

        # Parse env config from passthrough using lerobot's own parser
        env_type = None
        for tok in passthrough:
            if tok.startswith("env.type=") or tok.startswith("--env.type="):
                env_type = tok.split("=", 1)[1]
                break
        if env_type is None:
            return None

        # Minimal env config
        from lerobot.envs.factory import make_env_config

        env_cfg = make_env_config(env_type)
        envs = make_env(env_cfg, n_envs=batch_size, use_async_envs=(batch_size > 1))
        # Get first vec env — make_env returns a {suite: {task_id: vec_env}} mapping.
        first_suite = next(iter(envs.values()))
        env = next(iter(first_suite.values()))

        env.reset()
        # NOTE(review): assumes a zero action is valid for the probed env —
        # confirm for envs whose action space excludes 0.
        dummy_action = np.zeros((batch_size, env.single_action_space.shape[0]))
        timings = []
        for _ in range(n_steps):
            t0 = time.perf_counter()
            env.step(dummy_action)
            timings.append((time.perf_counter() - t0) * 1000)
        env.close()
        return float(np.median(timings))
    except Exception as e:
        # Broad catch on purpose: probing is best-effort; the caller falls
        # back to default latency estimates when this returns None.
        print(f"[autotune] env step probe failed: {e}", file=sys.stderr)
        return None
|
||||
|
||||
|
||||
def probe_and_recommend(
    passthrough: list[str],
    skip_timing: bool = False,
) -> AutotuneRecommendation:
    """Probe hardware + model and return the recommended configuration.

    Args:
        passthrough: CLI tokens forwarded to the probes (used to locate
            ``--policy.path`` and ``env.type``).
        skip_timing: When True, skip the (slow) env step probe and rely on
            the built-in default latency estimates.

    Returns:
        AutotuneRecommendation carrying both the derived settings and the
        raw probed values.
    """
    gpu_name, vram_gb = _probe_gpu()
    cpu_cores = os.cpu_count() or 4  # fall back to 4 when undetectable

    # Model footprint
    model_gb = _probe_model_gb(passthrough)
    if model_gb == 0.0:
        # Unknown model: assume a conservative 14 GB (SmolVLA fp16) as placeholder
        model_gb = 14.0
        print("[autotune] model size unknown, assuming 14 GB (SmolVLA fp16)", file=sys.stderr)

    # Max shards from VRAM (leave 15% headroom for activations + env frames)
    max_shards_vram = max(1, math.floor(vram_gb * 0.85 / model_gb)) if vram_gb > 0 else 1

    # Timing probes
    env_step_ms: float | None = None
    infer_ms: float | None = None
    if not skip_timing:
        env_step_ms = _probe_env_step_ms(passthrough)
        # Inference time: assume ~infer = env_step / saturation_factor heuristic
        # Full probe would require loading policy — skip for now to stay fast.
        infer_ms = _DEFAULT_INFER_MS

    # Number of shards to saturate GPU: ceil(env_step / infer)
    _step = env_step_ms or _DEFAULT_ENV_STEP_MS
    _infer = infer_ms or _DEFAULT_INFER_MS
    saturation_shards = max(1, math.ceil(_step / _infer))

    num_shards = min(max_shards_vram, saturation_shards)

    # Rendering mode: EGL if all model copies + env frame buffers fit in VRAM.
    # vram_gb == 0 means no CUDA device was detected; EGL remains the default there.
    env_vram_per_shard_gb = 0.01  # ~10 MB overhead per env batch
    total_with_egl = num_shards * (model_gb + env_vram_per_shard_gb)
    mujoco_gl = "egl" if (vram_gb == 0 or total_with_egl < vram_gb * 0.85) else "osmesa"

    # Batch size: fill CPU cores evenly across shards
    batch_size = max(4, min(math.floor(cpu_cores * 0.8 / num_shards), 64))

    # Recommend AMP when model is large (saves ~50% VRAM)
    use_amp = model_gb > 8.0

    return AutotuneRecommendation(
        num_shards=num_shards,
        batch_size=batch_size,
        mujoco_gl=mujoco_gl,
        use_amp=use_amp,
        gpu_name=gpu_name,
        vram_gb=vram_gb,
        cpu_cores=cpu_cores,
        model_gb=model_gb,
        env_step_ms=env_step_ms,
        infer_ms=infer_ms,
    )
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> None:
    """CLI entry point: probe hardware and print a paste-ready eval command.

    Args:
        argv: CLI tokens to forward to the probes; defaults to ``sys.argv[1:]``.
    """
    passthrough = argv if argv is not None else sys.argv[1:]

    rec = probe_and_recommend(passthrough)

    # Fix: compare against None, not truthiness — a measured latency of 0.0 ms
    # is a real probe result and must not be reported as "(estimated)".
    env_step_str = (
        f"{rec.env_step_ms:.0f}ms" if rec.env_step_ms is not None else f"~{_DEFAULT_ENV_STEP_MS:.0f}ms (estimated)"
    )
    infer_str = f"{rec.infer_ms:.0f}ms" if rec.infer_ms is not None else f"~{_DEFAULT_INFER_MS:.0f}ms (estimated)"

    print()
    print(
        f"GPU: {rec.gpu_name} | VRAM: {rec.vram_gb:.1f} GB | CPU cores: {rec.cpu_cores} | Model: {rec.model_gb:.1f} GB"
    )
    print()
    print(f" env_step_ms: {env_step_str} | infer_ms: {infer_str}")
    print()
    print(f" num_shards: {rec.num_shards}")
    print(f" batch_size: {rec.batch_size}")
    print(f" MUJOCO_GL: {rec.mujoco_gl}")
    if rec.use_amp:
        print(" use_amp: true (recommended — halves VRAM, faster matmuls)")
    print()

    # Build paste-ready command
    flags = [f"--num-shards {rec.num_shards}", f"eval.batch_size={rec.batch_size}"]
    if rec.use_amp:
        flags.append("policy.use_amp=true")
    flags_str = " \\\n ".join(flags)
    passthrough_str = " \\\n ".join(passthrough) if passthrough else "[your flags]"

    print(" Paste-ready command:")
    print(f" MUJOCO_GL={rec.mujoco_gl} lerobot-eval-parallel \\")
    print(f" {flags_str} \\")
    print(f" {passthrough_str}")
    print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,185 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Run lerobot-eval across N independent subprocesses (shards) for maximum GPU utilization.
|
||||
|
||||
Each shard handles a disjoint subset of episodes and writes its own JSON results file.
|
||||
Results are merged and printed when all shards complete.
|
||||
|
||||
Usage:
|
||||
lerobot-eval-parallel --num-shards 4 [any lerobot-eval flags]
|
||||
lerobot-eval-parallel --num-shards auto [any lerobot-eval flags]
|
||||
lerobot-eval-parallel --num-shards auto --render-device cpu [any lerobot-eval flags]
|
||||
|
||||
--num-shards auto:
|
||||
Calls lerobot-eval-autotune to probe hardware and determine the optimal number of shards.
|
||||
|
||||
--render-device gpu|cpu|auto:
|
||||
Controls MUJOCO_GL env var. 'gpu' -> EGL (faster, ~3ms/frame, ~200KB VRAM/env).
|
||||
'cpu' -> osmesa (slower, ~12ms/frame, 0 VRAM). 'auto' picks based on VRAM headroom.
|
||||
Default: auto.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _parse_known(argv: list[str]) -> tuple[argparse.Namespace, list[str]]:
|
||||
p = argparse.ArgumentParser(add_help=False)
|
||||
p.add_argument("--num-shards", default="1")
|
||||
p.add_argument("--render-device", choices=["gpu", "cpu", "auto"], default="auto")
|
||||
p.add_argument("--output-dir", default=None)
|
||||
return p.parse_known_args(argv)
|
||||
|
||||
|
||||
def _resolve_num_shards(num_shards_str: str, passthrough: list[str]) -> int:
|
||||
if num_shards_str == "auto":
|
||||
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
|
||||
|
||||
rec = probe_and_recommend(passthrough)
|
||||
print(
|
||||
f"[autotune] recommended num_shards={rec.num_shards}, batch_size={rec.batch_size}, MUJOCO_GL={rec.mujoco_gl}"
|
||||
)
|
||||
return rec.num_shards
|
||||
return int(num_shards_str)
|
||||
|
||||
|
||||
def _resolve_mujoco_gl(render_device: str, num_shards: int, passthrough: list[str]) -> str:
|
||||
if render_device == "gpu":
|
||||
return "egl"
|
||||
if render_device == "cpu":
|
||||
return "osmesa"
|
||||
# auto: use EGL for single shard; for multiple shards check VRAM headroom
|
||||
if num_shards == 1:
|
||||
return "egl"
|
||||
try:
|
||||
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
|
||||
|
||||
rec = probe_and_recommend(passthrough, skip_timing=True)
|
||||
return rec.mujoco_gl
|
||||
except Exception:
|
||||
# Conservative fallback: osmesa avoids EGL VRAM contention
|
||||
return "osmesa"
|
||||
|
||||
|
||||
def _extract_output_dir(passthrough: list[str]) -> str | None:
|
||||
for tok in passthrough:
|
||||
if tok.startswith("--output-dir="):
|
||||
return tok.split("=", 1)[1]
|
||||
if tok == "--output-dir":
|
||||
idx = passthrough.index(tok)
|
||||
if idx + 1 < len(passthrough):
|
||||
return passthrough[idx + 1]
|
||||
return None
|
||||
|
||||
|
||||
def _merge_shards(output_dir: str, num_shards: int) -> dict:
|
||||
"""Merge per-shard JSON files into a single result dict and write eval_info.json."""
|
||||
all_per_task: list[dict] = []
|
||||
per_group: dict[str, dict] = {}
|
||||
|
||||
for k in range(num_shards):
|
||||
shard_path = Path(output_dir) / f"shard_{k}_of_{num_shards}.json"
|
||||
if not shard_path.exists():
|
||||
print(f"[warning] shard file not found: {shard_path}", file=sys.stderr)
|
||||
continue
|
||||
with open(shard_path) as f:
|
||||
shard = json.load(f)
|
||||
all_per_task.extend(shard.get("per_task", []))
|
||||
for group, metrics in shard.get("per_group", {}).items():
|
||||
if group not in per_group:
|
||||
per_group[group] = {"sum_rewards": [], "max_rewards": [], "successes": []}
|
||||
for key in ("sum_rewards", "max_rewards", "successes"):
|
||||
# metrics may store aggregates; reconstruct lists if possible
|
||||
per_group[group][key].extend(metrics.get(key, []))
|
||||
|
||||
# Re-aggregate
|
||||
import numpy as np
|
||||
|
||||
def _nanmean(xs: list) -> float:
|
||||
return float(np.nanmean(xs)) if xs else float("nan")
|
||||
|
||||
groups_out = {}
|
||||
all_sr, all_mr, all_succ = [], [], []
|
||||
for group, acc in per_group.items():
|
||||
groups_out[group] = {
|
||||
"avg_sum_reward": _nanmean(acc["sum_rewards"]),
|
||||
"avg_max_reward": _nanmean(acc["max_rewards"]),
|
||||
"pc_success": _nanmean(acc["successes"]) * 100 if acc["successes"] else float("nan"),
|
||||
"n_episodes": len(acc["sum_rewards"]),
|
||||
}
|
||||
all_sr.extend(acc["sum_rewards"])
|
||||
all_mr.extend(acc["max_rewards"])
|
||||
all_succ.extend(acc["successes"])
|
||||
|
||||
overall = {
|
||||
"avg_sum_reward": _nanmean(all_sr),
|
||||
"avg_max_reward": _nanmean(all_mr),
|
||||
"pc_success": _nanmean(all_succ) * 100 if all_succ else float("nan"),
|
||||
"n_episodes": len(all_sr),
|
||||
}
|
||||
|
||||
merged = {"per_task": all_per_task, "per_group": groups_out, "overall": overall}
|
||||
out_path = Path(output_dir) / "eval_info.json"
|
||||
with open(out_path, "w") as f:
|
||||
json.dump(merged, f, indent=2)
|
||||
return merged
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> None:
    """Launch N lerobot-eval shard subprocesses, wait for them, merge results.

    Exits with status 1 if any shard returns a non-zero exit code.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    args, passthrough = _parse_known(cli_args)

    num_shards = _resolve_num_shards(args.num_shards, passthrough)
    mujoco_gl = _resolve_mujoco_gl(args.render_device, num_shards, passthrough)
    output_dir = args.output_dir or _extract_output_dir(passthrough)

    print(f"[lerobot-eval-parallel] launching {num_shards} shard(s), MUJOCO_GL={mujoco_gl}")

    # Children inherit our environment plus the chosen renderer; OMP is pinned
    # to one thread per process so shards do not oversubscribe the CPU.
    child_env = dict(os.environ, MUJOCO_GL=mujoco_gl, OMP_NUM_THREADS="1")

    procs = []
    for shard_id in range(num_shards):
        cmd = [
            sys.executable,
            "-m",
            "lerobot.scripts.lerobot_eval",
            f"eval.shard_id={shard_id}",
            f"eval.num_shards={num_shards}",
        ]
        cmd.extend(passthrough)
        if output_dir:
            # Each shard shares the same output_dir; shard files are named shard_K_of_N.json
            cmd.append(f"output_dir={output_dir}")
        procs.append(subprocess.Popen(cmd, env=child_env))

    return_codes = [proc.wait() for proc in procs]
    failed = [k for k, rc in enumerate(return_codes) if rc != 0]
    if failed:
        print(f"[lerobot-eval-parallel] shards {failed} failed with non-zero exit codes.", file=sys.stderr)
        sys.exit(1)

    if output_dir and num_shards > 1:
        merged = _merge_shards(output_dir, num_shards)
        print("\n=== Merged Results ===")
        print(json.dumps(merged["overall"], indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,143 @@
|
||||
"""Tests for the benchmark dispatch refactor (create_envs / get_env_processors on EnvConfig)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import gymnasium as gym
|
||||
import pytest
|
||||
from gymnasium.envs.registration import register, registry as gym_registry
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.envs.configs import EnvConfig
|
||||
from lerobot.envs.factory import make_env, make_env_config, make_env_pre_post_processors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_registry_all_types():
    """make_env_config should resolve every registered EnvConfig subclass via the registry."""
    known = list(EnvConfig.get_known_choices().keys())
    assert len(known) >= 6
    for t in known:
        cfg = make_env_config(t)
        # Originally a non-EnvConfig result was silently skipped with
        # `continue`, which would mask a corrupted registry entry; assert
        # instead so every registered choice is actually checked.
        assert isinstance(cfg, EnvConfig), f"make_env_config({t!r}) returned {type(cfg)}"
        assert cfg.type == t
|
||||
|
||||
|
||||
def test_unknown_type():
    """An unregistered benchmark name must raise a descriptive ValueError."""
    bad_name = "nonexistent"
    with pytest.raises(ValueError, match="not registered"):
        make_env_config(bad_name)
|
||||
|
||||
|
||||
def test_identity_processors():
    """Base class get_env_processors() returns identity pipelines."""
    pre, post = make_env_config("aloha").get_env_processors()
    # Identity pipelines carry no processing steps at all.
    assert len(pre.steps) == 0
    assert len(post.steps) == 0
|
||||
|
||||
|
||||
def test_delegation():
    """make_env() should call cfg.create_envs(), not use if/elif dispatch."""
    sentinel = {"delegated": {0: "marker"}}

    # A minimal stand-in config: only the attributes make_env() touches.
    class _FakeCfg:
        hub_path = None

        def create_envs(self, n_envs, use_async_envs=False):
            return sentinel

    result = make_env(_FakeCfg(), n_envs=1)
    # Identity check: make_env must return create_envs()'s result untouched.
    assert result is sentinel
|
||||
|
||||
|
||||
def test_processors_delegation():
    """make_env_pre_post_processors delegates to cfg.get_env_processors()."""
    env_cfg = make_env_config("aloha")
    pre, _post = make_env_pre_post_processors(env_cfg, policy_cfg=None)
    # aloha uses the base-class identity pipeline, so no steps are expected.
    assert len(pre.steps) == 0
|
||||
|
||||
|
||||
def test_base_create_envs():
    """Base class create_envs() should build a single-task VectorEnv via gym.make()."""
    # Register a throwaway gym id so the test does not depend on any benchmark
    # package being installed; it is removed again in the finally block below.
    gym_id = "_dispatch_test/CartPole-v99"
    if gym_id not in gym_registry:
        register(id=gym_id, entry_point="gymnasium.envs.classic_control:CartPoleEnv")

    # Minimal concrete EnvConfig: just the dataclass fields plus the
    # properties (package_name / gym_id / gym_kwargs) read during env creation.
    @EnvConfig.register_subclass("_dispatch_base_test")
    @dataclass
    class _Env(EnvConfig):
        task: str = "CartPole-v99"
        fps: int = 10
        features: dict[str, PolicyFeature] = field(default_factory=dict)

        @property
        def package_name(self):
            return "_dispatch_test"

        @property
        def gym_id(self):
            return gym_id

        @property
        def gym_kwargs(self):
            return {}

    try:
        envs = _Env().create_envs(n_envs=2)
        # Expected layout: {suite_name: {task_index: VectorEnv}} keyed by the
        # registered config name, with n_envs copies inside the vector env.
        assert "_dispatch_base_test" in envs
        env = envs["_dispatch_base_test"][0]
        assert isinstance(env, gym.vector.VectorEnv)
        assert env.num_envs == 2
        env.close()
    finally:
        # Unregister the throwaway gym id so repeated runs start clean.
        if gym_id in gym_registry:
            del gym_registry[gym_id]
|
||||
|
||||
|
||||
def test_custom_create_envs_override():
    """A custom EnvConfig subclass can override create_envs()."""
    vec_env = gym.vector.SyncVectorEnv([lambda: gym.make("CartPole-v1")])

    @EnvConfig.register_subclass("_dispatch_custom_test")
    @dataclass
    class _CustomEnv(EnvConfig):
        task: str = "x"
        features: dict[str, PolicyFeature] = field(default_factory=dict)

        @property
        def gym_kwargs(self):
            return {}

        def create_envs(self, n_envs, use_async_envs=False):
            # The override returns a pre-built env, bypassing gym.make().
            return {"custom_suite": {0: vec_env}}

    try:
        envs = make_env(_CustomEnv(), n_envs=1)
        assert "custom_suite" in envs
    finally:
        vec_env.close()
|
||||
|
||||
|
||||
def test_custom_get_env_processors_override():
    """A custom EnvConfig subclass can override get_env_processors()."""
    from lerobot.processor.pipeline import DataProcessorPipeline

    @EnvConfig.register_subclass("_dispatch_proc_test")
    @dataclass
    class _ProcEnv(EnvConfig):
        task: str = "x"
        features: dict[str, PolicyFeature] = field(default_factory=dict)

        @property
        def gym_kwargs(self):
            return {}

        def get_env_processors(self):
            # Return freshly built (pre, post) pipelines instead of the defaults.
            return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[])

    pre, _post = _ProcEnv().get_env_processors()
    assert isinstance(pre, DataProcessorPipeline)
|
||||
@@ -189,6 +189,30 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer):
|
||||
assert attention_mask.shape == (2, 8)
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
||||
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
|
||||
"""Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
||||
|
||||
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
|
||||
|
||||
transition = create_transition(
|
||||
observation={"state": torch.tensor([1.0, 2.0])},
|
||||
action=torch.tensor([0.1, 0.2]),
|
||||
complementary_data={"task": ("pick up cube", "place on table")},
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
observation = result[TransitionKey.OBSERVATION]
|
||||
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
||||
assert tokens.shape == (2, 8)
|
||||
assert attention_mask.shape == (2, 8)
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
||||
def test_custom_keys(mock_auto_tokenizer):
|
||||
|
||||
Reference in New Issue
Block a user