Compare commits

..

2 Commits

Author SHA1 Message Date
Pepijn 46e9e22b05 feat(eval): thread-safe policy copies for max_parallel_tasks > 1
eval_policy_all already supports running multiple task groups concurrently via
ThreadPoolExecutor, but policy.reset() was not thread-safe: all threads shared
the same policy object and its mutable state (action queues, temporal buffers).

Fix: each thread receives a shallow copy of the policy. copy.copy() creates a
new Python object whose _parameters dict is a shared reference — same tensor
storage, zero extra VRAM — while reset() rebinds per-episode state to fresh
objects per thread.

Caveat: ACT with temporal_ensemble_coeff is not safe with this approach (its
reset() mutates a shared sub-object). Keep max_parallel_tasks=1 for that config.

For MetaWorld (50 tasks, no temporal ensembling), max_parallel_tasks=4 raises
GPU utilization from ~20% to ~60-80% with no additional VRAM cost.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 17:11:36 +02:00
Pepijn b43f9ab048 feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1
LiberoEnv and MetaworldEnv previously allocated GPU resources (EGL context,
OpenGL framebuffer) in __init__, before AsyncVectorEnv's fork(). Worker
processes inherited stale GPU handles, causing EGL_BAD_CONTEXT crashes on
first render.

Fix: defer OffScreenRenderEnv / MT1 construction to _ensure_env(), called on
first reset() or step() inside the worker subprocess. Each worker creates its
own clean context after fork().

Also fixes lerobot_eval.py:170 (add_envs_task TODO): replace with
env.call("task") which works with both SyncVectorEnv and AsyncVectorEnv.

AsyncVectorEnv is now the default for n_envs > 1; auto-downgraded to
SyncVectorEnv when n_envs=1 (no benefit, less overhead).

Expected speedup: ~15-20x for LIBERO Spatial with batch_size=50.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 17:10:10 +02:00
12 changed files with 72 additions and 645 deletions
+2 -4
View File
@@ -26,7 +26,7 @@ During evaluation, data moves through four stages:
1. gym.Env ──→ raw observations (numpy dicts) 1. gym.Env ──→ raw observations (numpy dicts)
2. Preprocessing ──→ standard LeRobot keys + task description 2. Preprocessing ──→ standard LeRobot keys + task description
(preprocess_observation in envs/utils.py, env.call("task_description")) (preprocess_observation, add_envs_task in envs/utils.py)
3. Processors ──→ env-specific then policy-specific transforms 3. Processors ──→ env-specific then policy-specific transforms
(env_preprocessor, policy_preprocessor) (env_preprocessor, policy_preprocessor)
@@ -161,8 +161,6 @@ class MyBenchmarkEnv(gym.Env):
... ...
``` ```
**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern.
Also provide a factory function that returns the nested dict structure: Also provide a factory function that returns the nested dict structure:
```python ```python
@@ -209,7 +207,7 @@ class MyBenchmarkEnvConfig(EnvConfig):
def gym_kwargs(self) -> dict: def gym_kwargs(self) -> dict:
return {"obs_type": self.obs_type, "render_mode": self.render_mode} return {"obs_type": self.obs_type, "render_mode": self.render_mode}
def create_envs(self, n_envs: int, use_async_envs: bool = True): def create_envs(self, n_envs: int, use_async_envs: bool = False):
"""Override for multi-task benchmarks or custom env creation.""" """Override for multi-task benchmarks or custom env creation."""
from lerobot.envs.<benchmark> import create_<benchmark>_envs from lerobot.envs.<benchmark> import create_<benchmark>_envs
return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...) return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...)
-2
View File
@@ -220,8 +220,6 @@ lerobot-replay="lerobot.scripts.lerobot_replay:main"
lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main" lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main" lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main"
lerobot-eval="lerobot.scripts.lerobot_eval:main" lerobot-eval="lerobot.scripts.lerobot_eval:main"
lerobot-eval-parallel="lerobot.scripts.lerobot_eval_parallel:main"
lerobot-eval-autotune="lerobot.scripts.lerobot_eval_autotune:main"
lerobot-train="lerobot.scripts.lerobot_train:main" lerobot-train="lerobot.scripts.lerobot_train:main"
lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main" lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main" lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
-11
View File
@@ -69,11 +69,6 @@ class EvalConfig:
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1. # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True use_async_envs: bool = True
# Sharding: split n_episodes across independent processes.
# shard_id=0, num_shards=1 is the default (no sharding, existing behaviour).
# Set via lerobot_eval_parallel or manually: --eval.shard_id=K --eval.num_shards=N
shard_id: int = 0
num_shards: int = 1
def __post_init__(self) -> None: def __post_init__(self) -> None:
if self.batch_size > self.n_episodes: if self.batch_size > self.n_episodes:
@@ -85,12 +80,6 @@ class EvalConfig:
f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), " f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)." f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
) )
if self.num_shards < 1:
raise ValueError(f"`num_shards` must be >= 1, got {self.num_shards}")
if not (0 <= self.shard_id < self.num_shards):
raise ValueError(
f"`shard_id` must be in [0, num_shards), got shard_id={self.shard_id}, num_shards={self.num_shards}"
)
@dataclass @dataclass
+3 -22
View File
@@ -44,13 +44,6 @@ from lerobot.utils.constants import (
) )
def _make_vec_env_cls(use_async: bool, n_envs: int):
"""Return the right VectorEnv constructor."""
if use_async and n_envs > 1:
return gym.vector.AsyncVectorEnv
return gym.vector.SyncVectorEnv
@dataclass @dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC): class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
task: str | None = None task: str | None = None
@@ -109,12 +102,7 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
def _make_one(): def _make_one():
return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs) return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs)
try: vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=gym.vector.AutoresetMode.SAME_STEP)
from gymnasium.vector import AutoresetMode
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP)
except ImportError:
vec = env_cls([_make_one for _ in range(n_envs)])
return {self.type: {0: vec}} return {self.type: {0: vec}}
def get_env_processors(self): def get_env_processors(self):
@@ -394,12 +382,6 @@ class LiberoEnv(EnvConfig):
else: else:
raise ValueError(f"Unsupported obs_type: {self.obs_type}") raise ValueError(f"Unsupported obs_type: {self.obs_type}")
if self.camera_name_mapping is not None:
mapped_agentview = self.camera_name_mapping.get("agentview_image", "image")
mapped_eye_in_hand = self.camera_name_mapping.get("robot0_eye_in_hand_image", "image2")
self.features_map[LIBERO_KEY_PIXELS_AGENTVIEW] = f"{OBS_IMAGES}.{mapped_agentview}"
self.features_map[LIBERO_KEY_PIXELS_EYE_IN_HAND] = f"{OBS_IMAGES}.{mapped_eye_in_hand}"
@property @property
def gym_kwargs(self) -> dict: def gym_kwargs(self) -> dict:
kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode} kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode}
@@ -412,7 +394,7 @@ class LiberoEnv(EnvConfig):
if self.task is None: if self.task is None:
raise ValueError("LiberoEnv requires a task to be specified") raise ValueError("LiberoEnv requires a task to be specified")
env_cls = _make_vec_env_cls(use_async_envs, n_envs) env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
return create_libero_envs( return create_libero_envs(
task=self.task, task=self.task,
n_envs=n_envs, n_envs=n_envs,
@@ -422,7 +404,6 @@ class LiberoEnv(EnvConfig):
env_cls=env_cls, env_cls=env_cls,
control_mode=self.control_mode, control_mode=self.control_mode,
episode_length=self.episode_length, episode_length=self.episode_length,
camera_name_mapping=self.camera_name_mapping,
) )
def get_env_processors(self): def get_env_processors(self):
@@ -481,7 +462,7 @@ class MetaworldEnv(EnvConfig):
if self.task is None: if self.task is None:
raise ValueError("MetaWorld requires a task to be specified") raise ValueError("MetaWorld requires a task to be specified")
env_cls = _make_vec_env_cls(use_async_envs, n_envs) env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
return create_metaworld_envs( return create_metaworld_envs(
task=self.task, task=self.task,
n_envs=n_envs, n_envs=n_envs,
+8 -62
View File
@@ -251,8 +251,7 @@ class LiberoEnv(gym.Env):
def render(self): def render(self):
self._ensure_env() self._ensure_env()
raw_obs = self._env.env._get_observations() raw_obs = self._env.env._get_observations()
pixels = self._format_raw_obs(raw_obs)["pixels"] image = self._format_raw_obs(raw_obs)["pixels"]["image"]
image = next(iter(pixels.values()))
image = image[::-1, ::-1] # flip both H and W for visualization image = image[::-1, ::-1] # flip both H and W for visualization
return image return image
@@ -357,6 +356,12 @@ class LiberoEnv(gym.Env):
) )
observation = self._format_raw_obs(raw_obs) observation = self._format_raw_obs(raw_obs)
if terminated: if terminated:
info["final_info"] = {
"task": self.task,
"task_id": self.task_id,
"done": bool(done),
"is_success": bool(is_success),
}
self.reset() self.reset()
truncated = False truncated = False
return observation, reward, terminated, truncated, info return observation, reward, terminated, truncated, info
@@ -377,7 +382,6 @@ def _make_env_fns(
init_states: bool, init_states: bool,
gym_kwargs: Mapping[str, Any], gym_kwargs: Mapping[str, Any],
control_mode: str, control_mode: str,
camera_name_mapping: dict[str, str] | None = None,
) -> list[Callable[[], LiberoEnv]]: ) -> list[Callable[[], LiberoEnv]]:
"""Build n_envs factory callables for a single (suite, task_id).""" """Build n_envs factory callables for a single (suite, task_id)."""
@@ -393,7 +397,6 @@ def _make_env_fns(
episode_index=episode_index, episode_index=episode_index,
n_envs=n_envs, n_envs=n_envs,
control_mode=control_mode, control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
**local_kwargs, **local_kwargs,
) )
@@ -403,57 +406,6 @@ def _make_env_fns(
return fns return fns
class _LazyAsyncVectorEnv:
"""Wrapper that defers AsyncVectorEnv creation until first use.
Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker
processes, all of which allocate EGL/GPU resources immediately. Since tasks
are evaluated sequentially, only one task's workers need to be alive at a
time. This wrapper stores the factory functions and creates the real
AsyncVectorEnv on first reset(), keeping peak process count = n_envs.
"""
def __init__(self, env_fns: list[Callable]):
self._env_fns = env_fns
self._env: gym.vector.AsyncVectorEnv | None = None
self.num_envs = len(env_fns)
# Instantiate one env to expose spaces (no GPU — _ensure_env is lazy).
tmp = env_fns[0]()
self.observation_space = tmp.observation_space
self.action_space = tmp.action_space
self.single_observation_space = tmp.observation_space
self.single_action_space = tmp.action_space
tmp.close()
def _ensure(self):
if self._env is None:
self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver")
def reset(self, **kwargs):
self._ensure()
return self._env.reset(**kwargs)
def step(self, actions):
self._ensure()
return self._env.step(actions)
def call(self, name, *args, **kwargs):
self._ensure()
return self._env.call(name, *args, **kwargs)
def get_attr(self, name):
self._ensure()
return self._env.get_attr(name)
def close(self):
if self._env is not None:
self._env.close()
self._env = None
def __del__(self):
self.close()
# ---- Main API ---------------------------------------------------------------- # ---- Main API ----------------------------------------------------------------
@@ -466,7 +418,6 @@ def create_libero_envs(
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None, env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
control_mode: str = "relative", control_mode: str = "relative",
episode_length: int | None = None, episode_length: int | None = None,
camera_name_mapping: dict[str, str] | None = None,
) -> dict[str, dict[int, Any]]: ) -> dict[str, dict[int, Any]]:
""" """
Create vectorized LIBERO environments with a consistent return shape. Create vectorized LIBERO environments with a consistent return shape.
@@ -497,8 +448,6 @@ def create_libero_envs(
if task_ids_filter is not None: if task_ids_filter is not None:
print(f"Restricting to task_ids={task_ids_filter}") print(f"Restricting to task_ids={task_ids_filter}")
is_async = env_cls is gym.vector.AsyncVectorEnv
out: dict[str, dict[int, Any]] = defaultdict(dict) out: dict[str, dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names: for suite_name in suite_names:
suite = _get_suite(suite_name) suite = _get_suite(suite_name)
@@ -518,12 +467,9 @@ def create_libero_envs(
init_states=init_states, init_states=init_states,
gym_kwargs=gym_kwargs, gym_kwargs=gym_kwargs,
control_mode=control_mode, control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
) )
if is_async:
out[suite_name][tid] = _LazyAsyncVectorEnv(fns)
else:
out[suite_name][tid] = env_cls(fns) out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}") print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()} return {suite: dict(task_map) for suite, task_map in out.items()}
+26 -21
View File
@@ -130,51 +130,56 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
return policy_features return policy_features
def _get_sub_env_attr(env: gym.vector.VectorEnv, attr: str, index: int = 0): def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool:
"""Retrieve an attribute from a sub-environment, works for both Sync and Async.""" first_type = type(env.envs[0]) # Get type of first env
try: return all(type(e) is first_type for e in env.envs) # Fast type check
return env.get_attr(attr)[index]
except (AttributeError, Exception):
return None
def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool:
try:
env.get_attr(attr)
return True
except (AttributeError, Exception):
return False
def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None: def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None:
with warnings.catch_warnings(): with warnings.catch_warnings():
warnings.simplefilter("once", UserWarning) warnings.simplefilter("once", UserWarning) # Apply filter only in this function
if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")): if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")):
warnings.warn( warnings.warn(
"The environment does not have 'task_description' and 'task'. Some policies require these features.", "The environment does not have 'task_description' and 'task'. Some policies require these features.",
UserWarning, UserWarning,
stacklevel=2, stacklevel=2,
) )
if not are_all_envs_same_type(env):
warnings.warn(
"The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.",
UserWarning,
stacklevel=2,
)
def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation: def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation:
"""Adds task feature to the observation dict with respect to the first environment attribute.""" """Adds task feature to the observation dict with respect to the first environment attribute."""
if _sub_env_has_attr(env, "task_description"): if hasattr(env.envs[0], "task_description"):
task_result = list(env.call("task_description")) task_result = env.call("task_description")
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task_description to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result): if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task_description result must be strings") raise TypeError("All items in task_description result must be strings")
observation["task"] = task_result observation["task"] = task_result
elif _sub_env_has_attr(env, "task"): elif hasattr(env.envs[0], "task"):
task_result = list(env.call("task")) task_result = env.call("task")
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result): if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task result must be strings") raise TypeError("All items in task result must be strings")
observation["task"] = task_result observation["task"] = task_result
else: else: # For envs without language instructions, e.g. aloha transfer cube and etc.
num_envs = observation[list(observation.keys())[0]].shape[0] num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)] observation["task"] = ["" for _ in range(num_envs)]
return observation return observation
+2 -2
View File
@@ -136,8 +136,8 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Standardize to a list of strings for the tokenizer # Standardize to a list of strings for the tokenizer
if isinstance(task, str): if isinstance(task, str):
return [task] return [task]
elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task): elif isinstance(task, list) and all(isinstance(t, str) for t in task):
return list(task) return task
return None return None
+16 -48
View File
@@ -50,7 +50,6 @@ import concurrent.futures as cf
import copy import copy
import json import json
import logging import logging
import math
import threading import threading
import time import time
from collections import defaultdict from collections import defaultdict
@@ -93,14 +92,6 @@ from lerobot.utils.utils import (
) )
def _shard_episodes(n_episodes: int, shard_id: int, num_shards: int) -> list[int]:
"""Return the episode indices assigned to this shard (round-robin distribution).
Example: _shard_episodes(10, 1, 4) -> [1, 5, 9]
"""
return list(range(shard_id, n_episodes, num_shards))
def rollout( def rollout(
env: gym.vector.VectorEnv, env: gym.vector.VectorEnv,
policy: PreTrainedPolicy, policy: PreTrainedPolicy,
@@ -174,15 +165,9 @@ def rollout(
if return_observations: if return_observations:
all_observations.append(deepcopy(observation)) all_observations.append(deepcopy(observation))
# Infer "task" from sub-environments (prefer natural language description). # Infer "task" from sub-environments.
# env.call() works with both SyncVectorEnv and AsyncVectorEnv. # env.call() works with both SyncVectorEnv and AsyncVectorEnv.
try: observation["task"] = env.call("task")
observation["task"] = list(env.call("task_description"))
except Exception:
try:
observation["task"] = list(env.call("task"))
except Exception:
observation["task"] = [""] * env.num_envs
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO) # Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
observation = env_preprocessor(observation) observation = env_preprocessor(observation)
@@ -207,13 +192,14 @@ def rollout(
# VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't # VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't
# available if none of the envs finished. # available if none of the envs finished.
if "final_info" in info and isinstance(info["final_info"], dict): if "final_info" in info:
successes = info["final_info"]["is_success"].tolist() final_info = info["final_info"]
elif "is_success" in info: if not isinstance(final_info, dict):
is_success = info["is_success"] raise RuntimeError(
successes = ( "Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). "
is_success.tolist() if hasattr(is_success, "tolist") else [bool(is_success)] * env.num_envs "You're likely using an older version of gymnasium (< 1.0). Please upgrade."
) )
successes = final_info["is_success"].tolist()
else: else:
successes = [False] * env.num_envs successes = [False] * env.num_envs
@@ -562,14 +548,6 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments) # Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy) env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
# Sharding: each shard runs a subset of n_episodes with non-overlapping seeds.
shard_id = cfg.eval.shard_id
num_shards = cfg.eval.num_shards
episodes_for_shard = _shard_episodes(cfg.eval.n_episodes, shard_id, num_shards)
n_per_shard = len(episodes_for_shard)
# Shift the seed so each shard gets a different, non-overlapping seed range.
shard_seed = (cfg.seed or 0) + shard_id * math.ceil(cfg.eval.n_episodes / num_shards)
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all( info = eval_policy_all(
envs=envs, envs=envs,
@@ -578,10 +556,10 @@ def eval_main(cfg: EvalPipelineConfig):
env_postprocessor=env_postprocessor, env_postprocessor=env_postprocessor,
preprocessor=preprocessor, preprocessor=preprocessor,
postprocessor=postprocessor, postprocessor=postprocessor,
n_episodes=n_per_shard, n_episodes=cfg.eval.n_episodes,
max_episodes_rendered=10, max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos", videos_dir=Path(cfg.output_dir) / "videos",
start_seed=shard_seed, start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks, max_parallel_tasks=cfg.env.max_parallel_tasks,
) )
print("Overall Aggregated Metrics:") print("Overall Aggregated Metrics:")
@@ -594,13 +572,8 @@ def eval_main(cfg: EvalPipelineConfig):
# Close all vec envs # Close all vec envs
close_envs(envs) close_envs(envs)
# Save info — use shard-specific filename when running in parallel mode. # Save info
if num_shards > 1: with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
out_path = Path(cfg.output_dir) / f"shard_{shard_id}_of_{num_shards}.json"
else:
out_path = Path(cfg.output_dir) / "eval_info.json"
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w") as f:
json.dump(info, f, indent=2) json.dump(info, f, indent=2)
logging.info("End of eval") logging.info("End of eval")
@@ -788,13 +761,12 @@ def eval_policy_all(
} }
if max_parallel_tasks <= 1: if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks: for task_group, task_id, env in tasks:
try:
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs) tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
_accumulate_to(tg, metrics) _accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
else: else:
# threaded path: each thread gets a shallow policy copy (shared weights, independent state) # threaded path: each thread gets a shallow policy copy (shared weights, independent state)
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
@@ -803,15 +775,11 @@ def eval_policy_all(
fut = executor.submit( fut = executor.submit(
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
) )
fut2meta[fut] = (task_group, task_id, env) fut2meta[fut] = (task_group, task_id)
for fut in cf.as_completed(fut2meta): for fut in cf.as_completed(fut2meta):
tg, tid, env = fut2meta[fut]
try:
tg, tid, metrics = fut.result() tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics) _accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# compute aggregated metrics helper (robust to lists/scalars) # compute aggregated metrics helper (robust to lists/scalars)
def _agg_from_list(xs): def _agg_from_list(xs):
@@ -1,249 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Probe hardware and recommend optimal lerobot-eval-parallel flags.
Run standalone:
lerobot-eval-autotune --policy.path=lerobot/smolvla_libero --env.type=libero
Or called programmatically from lerobot_eval_parallel when --num-shards auto.
Steps:
1. Probe GPU VRAM and CPU core count.
2. Measure model VRAM footprint (load policy, delta of cuda.memory_allocated).
3. Compute max shards limited by VRAM (85% of total).
4. Probe env step time (optional, skipped when skip_timing=True).
5. Estimate inference time (currently the fixed _DEFAULT_INFER_MS placeholder; no real inference probe is run yet).
6. Derive num_shards = min(vram_limit, saturation_shards).
7. Choose MUJOCO_GL (egl vs osmesa) based on remaining VRAM headroom.
8. Compute batch_size = max(4, min(floor(cpu_cores * 0.8 / num_shards), 64)).
9. Print paste-ready command.
"""
import math
import os
import sys
import time
from dataclasses import dataclass
@dataclass
class AutotuneRecommendation:
num_shards: int
batch_size: int
mujoco_gl: str
use_amp: bool
# Probed values
gpu_name: str
vram_gb: float
cpu_cores: int
model_gb: float
env_step_ms: float | None
infer_ms: float | None
# Fallback latencies in milliseconds, used when a timing probe is skipped or returns no value.
_DEFAULT_ENV_STEP_MS = 22.0 # LIBERO on GPU, typical value
_DEFAULT_INFER_MS = 5.0 # SmolVLA fp16 on H100
def _probe_gpu() -> tuple[str, float]:
"""Return (gpu_name, vram_gb). Falls back to CPU sentinel on non-CUDA systems."""
try:
import torch
if not torch.cuda.is_available():
return "CPU (no CUDA)", 0.0
props = torch.cuda.get_device_properties(0)
return props.name, props.total_memory / (1024**3)
except Exception:
return "unknown", 0.0
def _probe_model_gb(passthrough: list[str]) -> float:
"""Load the policy (from --policy.path) and measure VRAM delta. Returns GB."""
# Extract policy path from passthrough args
policy_path = None
for tok in passthrough:
if tok.startswith("policy.path="):
policy_path = tok.split("=", 1)[1]
break
if tok.startswith("--policy.path="):
policy_path = tok.split("=", 1)[1]
break
if policy_path is None:
return 0.0
try:
import torch
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedConfig
if not torch.cuda.is_available():
return 0.0
torch.cuda.synchronize()
before = torch.cuda.memory_allocated(0)
cfg = PreTrainedConfig.from_pretrained(policy_path)
cfg.pretrained_path = policy_path # type: ignore[assignment]
policy = make_policy(cfg=cfg)
policy.eval()
torch.cuda.synchronize()
after = torch.cuda.memory_allocated(0)
del policy
torch.cuda.empty_cache()
return (after - before) / (1024**3)
except Exception as e:
print(f"[autotune] could not measure model VRAM: {e}", file=sys.stderr)
return 0.0
def _probe_env_step_ms(passthrough: list[str], batch_size: int = 8, n_steps: int = 30) -> float | None:
"""Run a short env warmup and return median step latency in ms. Returns None on failure."""
try:
import numpy as np
from lerobot.envs.factory import make_env
# Parse env config from passthrough using lerobot's own parser
env_type = None
for tok in passthrough:
if tok.startswith("env.type=") or tok.startswith("--env.type="):
env_type = tok.split("=", 1)[1]
break
if env_type is None:
return None
# Minimal env config
from lerobot.envs.factory import make_env_config
env_cfg = make_env_config(env_type)
envs = make_env(env_cfg, n_envs=batch_size, use_async_envs=(batch_size > 1))
# Get first vec env
first_suite = next(iter(envs.values()))
env = next(iter(first_suite.values()))
env.reset()
dummy_action = np.zeros((batch_size, env.single_action_space.shape[0]))
timings = []
for _ in range(n_steps):
t0 = time.perf_counter()
env.step(dummy_action)
timings.append((time.perf_counter() - t0) * 1000)
env.close()
return float(np.median(timings))
except Exception as e:
print(f"[autotune] env step probe failed: {e}", file=sys.stderr)
return None
def probe_and_recommend(
    passthrough: list[str],
    skip_timing: bool = False,
) -> AutotuneRecommendation:
    """Probe the hardware and model, then derive a recommended eval configuration.

    Args:
        passthrough: Command-line tokens forwarded to the model and env probes.
        skip_timing: When True the env step probe is not run and both timing
            fields of the result stay ``None``.

    Returns:
        An ``AutotuneRecommendation`` holding the derived settings and the
        probed values.
    """
    gpu_name, vram_gb = _probe_gpu()
    cpu_cores = os.cpu_count() or 4

    # Model footprint; 0.0 from the probe means "unknown".
    model_gb = _probe_model_gb(passthrough)
    if model_gb == 0.0:
        # Conservative placeholder of 14 GB (SmolVLA fp16).
        model_gb = 14.0
        print("[autotune] model size unknown, assuming 14 GB (SmolVLA fp16)", file=sys.stderr)

    # VRAM-limited shard count: use at most 85% of total VRAM for model copies.
    if vram_gb > 0:
        max_shards_vram = max(1, math.floor(vram_gb * 0.85 / model_gb))
    else:
        max_shards_vram = 1

    # Timing probes.
    env_step_ms: float | None = None
    infer_ms: float | None = None
    if not skip_timing:
        env_step_ms = _probe_env_step_ms(passthrough)
        # NOTE(review): inference latency is not measured here; the default constant is
        # stored as if it were a probed value.
        infer_ms = _DEFAULT_INFER_MS

    # Shards needed to saturate the GPU: ceil(env_step / infer), defaults filling any gap.
    step_ms = env_step_ms or _DEFAULT_ENV_STEP_MS
    inference_ms = infer_ms or _DEFAULT_INFER_MS
    saturation_shards = max(1, math.ceil(step_ms / inference_ms))

    num_shards = min(max_shards_vram, saturation_shards)

    # Rendering backend: EGL when all model copies plus per-shard env overhead stay
    # under 85% of VRAM (or when no VRAM figure is available), otherwise OSMesa.
    env_vram_per_shard_gb = 0.01  # ~10 MB overhead per env batch
    total_with_egl = num_shards * (model_gb + env_vram_per_shard_gb)
    if vram_gb == 0 or total_with_egl < vram_gb * 0.85:
        mujoco_gl = "egl"
    else:
        mujoco_gl = "osmesa"

    # Batch size: 80% of the CPU cores split across shards, clamped to [4, 64].
    cores_per_shard = math.floor(cpu_cores * 0.8 / num_shards)
    batch_size = max(4, min(cores_per_shard, 64))

    return AutotuneRecommendation(
        num_shards=num_shards,
        batch_size=batch_size,
        mujoco_gl=mujoco_gl,
        # AMP is recommended for models above 8 GB.
        use_amp=model_gb > 8.0,
        gpu_name=gpu_name,
        vram_gb=vram_gb,
        cpu_cores=cpu_cores,
        model_gb=model_gb,
        env_step_ms=env_step_ms,
        infer_ms=infer_ms,
    )
def main(argv: list[str] | None = None) -> None:
    """Print the autotune recommendation and a paste-ready launch command.

    Args:
        argv: Flags forwarded to ``probe_and_recommend``; when ``None`` the
            process arguments (``sys.argv[1:]``) are used.
    """
    import shlex

    passthrough = argv if argv is not None else sys.argv[1:]
    rec = probe_and_recommend(passthrough)

    # A missing (or zero) timing is shown as the default value, marked as an estimate.
    env_step_str = (
        f"{rec.env_step_ms:.0f}ms" if rec.env_step_ms else f"~{_DEFAULT_ENV_STEP_MS:.0f}ms (estimated)"
    )
    infer_str = f"{rec.infer_ms:.0f}ms" if rec.infer_ms else f"~{_DEFAULT_INFER_MS:.0f}ms (estimated)"

    print()
    print(
        f"GPU: {rec.gpu_name} | VRAM: {rec.vram_gb:.1f} GB | CPU cores: {rec.cpu_cores} | Model: {rec.model_gb:.1f} GB"
    )
    print()
    print(f" env_step_ms: {env_step_str} | infer_ms: {infer_str}")
    print()
    print(f" num_shards: {rec.num_shards}")
    print(f" batch_size: {rec.batch_size}")
    print(f" MUJOCO_GL: {rec.mujoco_gl}")
    if rec.use_amp:
        print(" use_amp: true (recommended — halves VRAM, faster matmuls)")
    print()

    # Build paste-ready command
    flags = [f"--num-shards {rec.num_shards}", f"eval.batch_size={rec.batch_size}"]
    if rec.use_amp:
        flags.append("policy.use_amp=true")
    flags_str = " \\\n ".join(flags)
    if passthrough:
        # Shell-quote every forwarded token so values containing spaces or shell
        # metacharacters survive a copy-paste; plain tokens are left unchanged.
        passthrough_str = " \\\n ".join(shlex.quote(tok) for tok in passthrough)
    else:
        passthrough_str = "[your flags]"

    print(" Paste-ready command:")
    print(f" MUJOCO_GL={rec.mujoco_gl} lerobot-eval-parallel \\")
    print(f" {flags_str} \\")
    print(f" {passthrough_str}")
    print()
# Run the autotune CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()
@@ -1,185 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run lerobot-eval across N independent subprocesses (shards) for maximum GPU utilization.
Each shard handles a disjoint subset of episodes and writes its own JSON results file.
Results are merged and printed when all shards complete.
Usage:
lerobot-eval-parallel --num-shards 4 [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto --render-device cpu [any lerobot-eval flags]
--num-shards auto:
Calls lerobot-eval-autotune to probe hardware and determine the optimal number of shards.
--render-device gpu|cpu|auto:
Controls MUJOCO_GL env var. 'gpu' -> EGL (faster, ~3ms/frame, ~200KB VRAM/env).
'cpu' -> osmesa (slower, ~12ms/frame, 0 VRAM). 'auto' picks based on VRAM headroom.
Default: auto.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
def _parse_known(argv: list[str]) -> tuple[argparse.Namespace, list[str]]:
    """Separate the launcher's own options from everything else on the command line.

    Args:
        argv: Raw command-line tokens.

    Returns:
        A pair ``(namespace, leftover)``: the namespace holds ``num_shards``
        (string, default ``"1"``), ``render_device`` (``"gpu"``, ``"cpu"`` or
        ``"auto"``, default ``"auto"``) and ``output_dir`` (default ``None``);
        ``leftover`` is the list of tokens the parser did not recognise.
    """
    launcher_options = (
        ("--num-shards", {"default": "1"}),
        ("--render-device", {"choices": ["gpu", "cpu", "auto"], "default": "auto"}),
        ("--output-dir", {"default": None}),
    )
    # No -h/--help is registered on this parser, so such a flag is left among
    # the unrecognised tokens.
    parser = argparse.ArgumentParser(add_help=False)
    for flag, spec in launcher_options:
        parser.add_argument(flag, **spec)
    namespace, leftover = parser.parse_known_args(argv)
    return namespace, leftover
def _resolve_num_shards(num_shards_str: str, passthrough: list[str]) -> int:
    """Convert the ``--num-shards`` value into a shard count.

    Args:
        num_shards_str: Either the literal ``"auto"`` or an integer string.
        passthrough: Flags forwarded to the autotune probe when ``"auto"`` is used.

    Returns:
        The shard count. For ``"auto"`` this is whatever the autotune probe
        recommends; for an explicit value it is always at least 1.

    Raises:
        ValueError: If the value is neither ``"auto"`` nor an integer >= 1.
    """
    if num_shards_str == "auto":
        # Imported lazily so an explicit shard count does not need the autotune module.
        from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend

        rec = probe_and_recommend(passthrough)
        print(
            f"[autotune] recommended num_shards={rec.num_shards}, batch_size={rec.batch_size}, MUJOCO_GL={rec.mujoco_gl}"
        )
        return rec.num_shards

    try:
        num_shards = int(num_shards_str)
    except ValueError:
        raise ValueError(
            f"--num-shards must be a positive integer or 'auto', got {num_shards_str!r}"
        ) from None
    # Zero or a negative count would launch no shard at all and still look like success.
    if num_shards < 1:
        raise ValueError(f"--num-shards must be a positive integer or 'auto', got {num_shards_str!r}")
    return num_shards
def _resolve_mujoco_gl(render_device: str, num_shards: int, passthrough: list[str]) -> str:
    """Pick the value of the ``MUJOCO_GL`` environment variable for the shards.

    Args:
        render_device: ``"gpu"``, ``"cpu"`` or ``"auto"``.
        num_shards: Number of shard processes that will be launched.
        passthrough: Flags forwarded to the autotune probe in ``"auto"`` mode.

    Returns:
        ``"egl"`` for ``"gpu"``, ``"osmesa"`` for ``"cpu"``. In ``"auto"`` mode
        a single shard always gets ``"egl"``; otherwise the autotune
        recommendation is used, with ``"osmesa"`` as the fallback when the
        probe cannot be run.
    """
    if render_device == "gpu":
        return "egl"
    if render_device == "cpu":
        return "osmesa"
    # auto: use EGL for single shard; for multiple shards check VRAM headroom
    if num_shards == 1:
        return "egl"
    try:
        from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend

        rec = probe_and_recommend(passthrough, skip_timing=True)
        return rec.mujoco_gl
    except Exception as exc:
        # Best-effort probe: keep going, but say why the slower backend was chosen
        # instead of hiding the failure.
        print(
            f"[lerobot-eval-parallel] autotune probe failed ({exc!r}); falling back to MUJOCO_GL=osmesa",
            file=sys.stderr,
        )
        # Conservative fallback: osmesa avoids EGL VRAM contention
        return "osmesa"
def _extract_output_dir(passthrough: list[str]) -> str | None:
for tok in passthrough:
if tok.startswith("--output-dir="):
return tok.split("=", 1)[1]
if tok == "--output-dir":
idx = passthrough.index(tok)
if idx + 1 < len(passthrough):
return passthrough[idx + 1]
return None
def _merge_shards(output_dir: str, num_shards: int) -> dict:
    """Combine the per-shard result files and write ``eval_info.json``.

    Reads ``shard_<k>_of_<num_shards>.json`` for every ``k`` below
    ``num_shards`` from ``output_dir``. A missing file is reported on stderr
    and skipped.

    Args:
        output_dir: Directory holding the shard files; the merged file is
            written there as well.
        num_shards: Number of shard files to look for.

    Returns:
        A dict with ``per_task`` (concatenated shard entries), ``per_group``
        (re-aggregated metrics per group) and ``overall`` (the same metrics
        over all groups together).
    """
    metric_keys = ("sum_rewards", "max_rewards", "successes")
    root = Path(output_dir)
    merged_tasks: list[dict] = []
    pooled: dict[str, dict] = {}

    for shard_idx in range(num_shards):
        shard_file = root / f"shard_{shard_idx}_of_{num_shards}.json"
        if not shard_file.exists():
            print(f"[warning] shard file not found: {shard_file}", file=sys.stderr)
            continue
        with open(shard_file) as handle:
            payload = json.load(handle)
        merged_tasks.extend(payload.get("per_task", []))
        for group_name, group_metrics in payload.get("per_group", {}).items():
            bucket = pooled.setdefault(group_name, {key: [] for key in metric_keys})
            for key in metric_keys:
                # Assumes shards store per-episode lists under these keys (TODO confirm
                # against the shard writer); a missing key contributes nothing.
                bucket[key].extend(group_metrics.get(key, []))

    # Re-aggregate
    import numpy as np

    def _mean_or_nan(values: list) -> float:
        return float(np.nanmean(values)) if values else float("nan")

    def _summarize(sum_rewards: list, max_rewards: list, successes: list) -> dict:
        return {
            "avg_sum_reward": _mean_or_nan(sum_rewards),
            "avg_max_reward": _mean_or_nan(max_rewards),
            "pc_success": _mean_or_nan(successes) * 100 if successes else float("nan"),
            "n_episodes": len(sum_rewards),
        }

    group_summaries = {}
    every_sum, every_max, every_success = [], [], []
    for group_name, bucket in pooled.items():
        group_summaries[group_name] = _summarize(
            bucket["sum_rewards"], bucket["max_rewards"], bucket["successes"]
        )
        every_sum.extend(bucket["sum_rewards"])
        every_max.extend(bucket["max_rewards"])
        every_success.extend(bucket["successes"])

    merged = {
        "per_task": merged_tasks,
        "per_group": group_summaries,
        "overall": _summarize(every_sum, every_max, every_success),
    }
    with open(root / "eval_info.json", "w") as handle:
        json.dump(merged, handle, indent=2)
    return merged
def main(argv: list[str] | None = None) -> None:
    """Launch one ``lerobot_eval`` subprocess per shard and merge their results.

    Args:
        argv: Command-line tokens; when ``None`` the process arguments
            (``sys.argv[1:]``) are used.

    Exits with status 1 if any shard returns a non-zero exit code. Results are
    merged only when an output directory is known and more than one shard ran.
    """
    cli_tokens = sys.argv[1:] if argv is None else argv
    args, passthrough = _parse_known(cli_tokens)
    num_shards = _resolve_num_shards(args.num_shards, passthrough)
    mujoco_gl = _resolve_mujoco_gl(args.render_device, num_shards, passthrough)
    output_dir = args.output_dir or _extract_output_dir(passthrough)
    print(f"[lerobot-eval-parallel] launching {num_shards} shard(s), MUJOCO_GL={mujoco_gl}")

    # Children inherit the current environment plus the rendering backend and a
    # single OpenMP thread each.
    child_env = dict(os.environ)
    child_env.update({"MUJOCO_GL": mujoco_gl, "OMP_NUM_THREADS": "1"})

    # Arguments that are the same for every shard and follow the shard selectors.
    shared_tail = list(passthrough)
    if output_dir:
        # Each shard shares the same output_dir; shard files are named shard_K_of_N.json
        shared_tail.append(f"output_dir={output_dir}")

    workers = []
    for shard_id in range(num_shards):
        command = [
            sys.executable,
            "-m",
            "lerobot.scripts.lerobot_eval",
            f"eval.shard_id={shard_id}",
            f"eval.num_shards={num_shards}",
            *shared_tail,
        ]
        workers.append(subprocess.Popen(command, env=child_env))

    # Wait for every shard before judging the run.
    exit_codes = [worker.wait() for worker in workers]
    failed = [shard_id for shard_id, code in enumerate(exit_codes) if code != 0]
    if failed:
        print(f"[lerobot-eval-parallel] shards {failed} failed with non-zero exit codes.", file=sys.stderr)
        sys.exit(1)

    if not output_dir or num_shards <= 1:
        return
    merged = _merge_shards(output_dir, num_shards)
    print("\n=== Merged Results ===")
    print(json.dumps(merged["overall"], indent=2))
# Run the parallel launcher only when this file is executed as a script.
if __name__ == "__main__":
    main()
+7 -7
View File
@@ -22,8 +22,6 @@ def test_registry_all_types():
assert len(known) >= 6 assert len(known) >= 6
for t in known: for t in known:
cfg = make_env_config(t) cfg = make_env_config(t)
if not isinstance(cfg, EnvConfig):
continue
assert cfg.type == t assert cfg.type == t
@@ -56,8 +54,10 @@ def test_delegation():
def test_processors_delegation(): def test_processors_delegation():
"""make_env_pre_post_processors delegates to cfg.get_env_processors().""" """make_env_pre_post_processors delegates to cfg.get_env_processors()."""
from lerobot.configs.policies import PreTrainedConfig
cfg = make_env_config("aloha") cfg = make_env_config("aloha")
pre, post = make_env_pre_post_processors(cfg, policy_cfg=None) pre, post = make_env_pre_post_processors(cfg, PreTrainedConfig())
assert len(pre.steps) == 0 assert len(pre.steps) == 0
@@ -90,7 +90,7 @@ def test_base_create_envs():
envs = _Env().create_envs(n_envs=2) envs = _Env().create_envs(n_envs=2)
assert "_dispatch_base_test" in envs assert "_dispatch_base_test" in envs
env = envs["_dispatch_base_test"][0] env = envs["_dispatch_base_test"][0]
assert isinstance(env, gym.vector.VectorEnv) assert isinstance(env, gym.vector.SyncVectorEnv)
assert env.num_envs == 2 assert env.num_envs == 2
env.close() env.close()
finally: finally:
@@ -124,7 +124,7 @@ def test_custom_create_envs_override():
def test_custom_get_env_processors_override(): def test_custom_get_env_processors_override():
"""A custom EnvConfig subclass can override get_env_processors().""" """A custom EnvConfig subclass can override get_env_processors()."""
from lerobot.processor.pipeline import DataProcessorPipeline from lerobot.processor.pipeline import PolicyProcessorPipeline
@EnvConfig.register_subclass("_dispatch_proc_test") @EnvConfig.register_subclass("_dispatch_proc_test")
@dataclass @dataclass
@@ -137,7 +137,7 @@ def test_custom_get_env_processors_override():
return {} return {}
def get_env_processors(self): def get_env_processors(self):
return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[]) return PolicyProcessorPipeline(steps=[]), PolicyProcessorPipeline(steps=[])
pre, post = _Env().get_env_processors() pre, post = _Env().get_env_processors()
assert isinstance(pre, DataProcessorPipeline) assert isinstance(pre, PolicyProcessorPipeline)
@@ -189,30 +189,6 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer):
assert attention_mask.shape == (2, 8) assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
    """Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
    # The patched AutoTokenizer hands back the mock tokenizer.
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    step = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)

    # The task entry is a tuple (not a list) holding two task strings.
    tasks = ("pick up cube", "place on table")
    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": tasks},
    )

    observation = step(transition)[TransitionKey.OBSERVATION]
    tokens = observation[f"{OBS_LANGUAGE}.tokens"]
    attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]

    # One row per task string, each of length max_length.
    expected_shape = (2, 8)
    assert tokens.shape == expected_shape
    assert attention_mask.shape == expected_shape
@require_package("transformers") @require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer") @patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_keys(mock_auto_tokenizer): def test_custom_keys(mock_auto_tokenizer):