Compare commits

..

11 Commits

Author SHA1 Message Date
Pepijn 66276f1efd feat(eval): thread-safe policy copies for max_parallel_tasks > 1
eval_policy_all already supports running multiple task groups concurrently via
ThreadPoolExecutor, but policy.reset() was not thread-safe: all threads shared
the same policy object and its mutable state (action queues, temporal buffers).

Fix: each thread receives a shallow copy of the policy. copy.copy() creates a
new Python object whose _parameters dict is a shared reference — same tensor
storage, zero extra VRAM — while reset() rebinds per-episode state to fresh
objects per thread.

Caveat: ACT with temporal_ensemble_coeff is not safe with this approach (its
reset() mutates a shared sub-object). Keep max_parallel_tasks=1 for that config.

For MetaWorld (50 tasks, no temporal ensembling), max_parallel_tasks=4 raises
GPU utilization from ~20% to ~60-80% with no additional VRAM cost.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:43:42 +02:00
Pepijn 5972a85ec7 feat(eval): episode sharding, parallel launcher, and autotune
Add lerobot-eval-parallel and lerobot-eval-autotune entry points for
multi-process evaluation. A single H100 running 4 shards of SmolVLA
achieves ~100% GPU utilisation vs ~0.5% with the serial baseline.

- EvalConfig: add shard_id / num_shards fields; validate ranges
- lerobot_eval.py: _shard_episodes() splits n_episodes round-robin;
  eval_main uses per-shard n_episodes + seed offset; writes
  shard_K_of_N.json when num_shards > 1
- lerobot_eval_parallel.py: spawns K subprocesses with disjoint shard
  IDs, sets MUJOCO_GL and OMP_NUM_THREADS, merges results on completion
- lerobot_eval_autotune.py: probes GPU VRAM, CPU cores, optional model
  footprint and env step time; derives optimal num_shards / batch_size /
  MUJOCO_GL; prints a paste-ready command
- pyproject.toml: register lerobot-eval-parallel and lerobot-eval-autotune

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:43:03 +02:00
Pepijn Kooijmans 800b0a5f26 docs: update adding_benchmarks for async env changes
- Replace add_envs_task reference with env.call("task_description")
- Update use_async_envs default to True
- Add note about lazy GPU init for AsyncVectorEnv compatibility

Made-with: Cursor
2026-04-07 13:38:37 +02:00
Pepijn Kooijmans 6aeb7c54f9 fix(eval): use task_description instead of task for language conditioning
env.call("task") returns the LIBERO task name with underscores
(e.g. "pick_up_the_black_bowl_...") instead of the natural language
description ("pick up the black bowl ..."). The VLM tokenizes these
completely differently, causing 0.0 reward across all episodes.

Made-with: Cursor
2026-04-07 13:12:42 +02:00
Pepijn Kooijmans 1f7e7b4a90 fix: close envs between tasks to prevent worker process accumulation
eval_policy_all never closed environments after each task completed,
causing AsyncVectorEnv worker processes to accumulate (N_tasks × n_envs).
This led to OOM, BrokenPipeError and EOFError on multi-task benchmarks.

Also fixes:
- AsyncVectorEnv compat in envs/utils.py (use get_attr/call instead of .envs)
- Tuple task handling in tokenizer_processor and lerobot_eval
- _LazyAsyncVectorEnv for deferred worker spawning in LIBERO

Made-with: Cursor
2026-04-07 12:30:22 +02:00
Pepijn 681cc59ed2 feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1
LiberoEnv and MetaworldEnv previously allocated GPU resources (EGL context,
OpenGL framebuffer) in __init__, before AsyncVectorEnv's fork(). Worker
processes inherited stale GPU handles, causing EGL_BAD_CONTEXT crashes on
first render.

Fix: defer OffScreenRenderEnv / MT1 construction to _ensure_env(), called on
first reset() or step() inside the worker subprocess. Each worker creates its
own clean context after fork().

Also fixes lerobot_eval.py:170 (add_envs_task TODO): replace with
env.call("task") which works with both SyncVectorEnv and AsyncVectorEnv.

AsyncVectorEnv is now the default for n_envs > 1; auto-downgraded to
SyncVectorEnv when n_envs=1 (no benefit, less overhead).

Expected speedup: ~15-20x for LIBERO Spatial with batch_size=50.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 11:31:32 +02:00
Pepijn Kooijmans d9edc12e00 refactor: revert policy changes, keep env-only camera mapping fixes
- Revert GR00T N1.5 default_factory/default changes (transformers compat)
- Revert SmolVLA use_peft legacy field
- Apply ruff formatting fixes
- camera_name_mapping stays entirely in env/eval layer (no policy changes)

Made-with: Cursor
2026-04-07 11:25:49 +02:00
Pepijn Kooijmans fd2bad9b42 fix: handle gymnasium < 1.0 without AutoresetMode
Made-with: Cursor
2026-04-07 11:20:38 +02:00
Pepijn Kooijmans 7e729e33c9 fix: use direct AutoresetMode import for gymnasium compat
Made-with: Cursor
2026-04-07 11:19:17 +02:00
Pepijn Kooijmans e383207a15 fix: enable SmolVLA eval on LIBERO with custom camera mappings
- Thread camera_name_mapping from LiberoEnv config through to gym envs
- Sync features_map with camera_name_mapping in LiberoEnv.__post_init__
- Fix render() to use first available camera instead of hardcoded "image"
- Handle non-dict final_info in rollout by falling back to info["is_success"]
- Add use_peft legacy field to SmolVLAConfig for checkpoint compat
- Add defaults to GR00TN15Config init=False fields for transformers 5.3

Made-with: Cursor
2026-04-07 11:18:29 +02:00
Pepijn 8ed658c6aa fix(tests): fix 3 failing dispatch tests
- test_registry_all_types: skip non-EnvConfig stubs (e.g. TestPluginConfig)
- test_processors_delegation: use None instead of abstract PreTrainedConfig
- test_custom_get_env_processors_override: use DataProcessorPipeline for isinstance check (PolicyProcessorPipeline is a subscripted generic)

Made-with: Cursor
2026-04-03 17:19:27 +02:00
12 changed files with 645 additions and 72 deletions
+4 -2
View File
@@ -26,7 +26,7 @@ During evaluation, data moves through four stages:
1. gym.Env ──→ raw observations (numpy dicts)
2. Preprocessing ──→ standard LeRobot keys + task description
(preprocess_observation, add_envs_task in envs/utils.py)
(preprocess_observation in envs/utils.py, env.call("task_description"))
3. Processors ──→ env-specific then policy-specific transforms
(env_preprocessor, policy_preprocessor)
@@ -161,6 +161,8 @@ class MyBenchmarkEnv(gym.Env):
...
```
**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern.
Also provide a factory function that returns the nested dict structure:
```python
@@ -207,7 +209,7 @@ class MyBenchmarkEnvConfig(EnvConfig):
def gym_kwargs(self) -> dict:
return {"obs_type": self.obs_type, "render_mode": self.render_mode}
def create_envs(self, n_envs: int, use_async_envs: bool = False):
def create_envs(self, n_envs: int, use_async_envs: bool = True):
"""Override for multi-task benchmarks or custom env creation."""
from lerobot.envs.<benchmark> import create_<benchmark>_envs
return create_<benchmark>_envs(task=self.task, n_envs=n_envs, ...)
+2
View File
@@ -220,6 +220,8 @@ lerobot-replay="lerobot.scripts.lerobot_replay:main"
lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main"
lerobot-eval="lerobot.scripts.lerobot_eval:main"
lerobot-eval-parallel="lerobot.scripts.lerobot_eval_parallel:main"
lerobot-eval-autotune="lerobot.scripts.lerobot_eval_autotune:main"
lerobot-train="lerobot.scripts.lerobot_train:main"
lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
+11
View File
@@ -69,6 +69,11 @@ class EvalConfig:
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True
# Sharding: split n_episodes across independent processes.
# shard_id=0, num_shards=1 is the default (no sharding, existing behaviour).
# Set via lerobot_eval_parallel or manually: --eval.shard_id=K --eval.num_shards=N
shard_id: int = 0
num_shards: int = 1
def __post_init__(self) -> None:
if self.batch_size > self.n_episodes:
@@ -80,6 +85,12 @@ class EvalConfig:
f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
)
if self.num_shards < 1:
raise ValueError(f"`num_shards` must be >= 1, got {self.num_shards}")
if not (0 <= self.shard_id < self.num_shards):
raise ValueError(
f"`shard_id` must be in [0, num_shards), got shard_id={self.shard_id}, num_shards={self.num_shards}"
)
@dataclass
+22 -3
View File
@@ -44,6 +44,13 @@ from lerobot.utils.constants import (
)
def _make_vec_env_cls(use_async: bool, n_envs: int):
"""Return the right VectorEnv constructor."""
if use_async and n_envs > 1:
return gym.vector.AsyncVectorEnv
return gym.vector.SyncVectorEnv
@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
task: str | None = None
@@ -102,7 +109,12 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
def _make_one():
return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs)
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=gym.vector.AutoresetMode.SAME_STEP)
try:
from gymnasium.vector import AutoresetMode
vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP)
except ImportError:
vec = env_cls([_make_one for _ in range(n_envs)])
return {self.type: {0: vec}}
def get_env_processors(self):
@@ -382,6 +394,12 @@ class LiberoEnv(EnvConfig):
else:
raise ValueError(f"Unsupported obs_type: {self.obs_type}")
if self.camera_name_mapping is not None:
mapped_agentview = self.camera_name_mapping.get("agentview_image", "image")
mapped_eye_in_hand = self.camera_name_mapping.get("robot0_eye_in_hand_image", "image2")
self.features_map[LIBERO_KEY_PIXELS_AGENTVIEW] = f"{OBS_IMAGES}.{mapped_agentview}"
self.features_map[LIBERO_KEY_PIXELS_EYE_IN_HAND] = f"{OBS_IMAGES}.{mapped_eye_in_hand}"
@property
def gym_kwargs(self) -> dict:
kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode}
@@ -394,7 +412,7 @@ class LiberoEnv(EnvConfig):
if self.task is None:
raise ValueError("LiberoEnv requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_libero_envs(
task=self.task,
n_envs=n_envs,
@@ -404,6 +422,7 @@ class LiberoEnv(EnvConfig):
env_cls=env_cls,
control_mode=self.control_mode,
episode_length=self.episode_length,
camera_name_mapping=self.camera_name_mapping,
)
def get_env_processors(self):
@@ -462,7 +481,7 @@ class MetaworldEnv(EnvConfig):
if self.task is None:
raise ValueError("MetaWorld requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
env_cls = _make_vec_env_cls(use_async_envs, n_envs)
return create_metaworld_envs(
task=self.task,
n_envs=n_envs,
+63 -9
View File
@@ -251,7 +251,8 @@ class LiberoEnv(gym.Env):
def render(self):
self._ensure_env()
raw_obs = self._env.env._get_observations()
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
pixels = self._format_raw_obs(raw_obs)["pixels"]
image = next(iter(pixels.values()))
image = image[::-1, ::-1] # flip both H and W for visualization
return image
@@ -356,12 +357,6 @@ class LiberoEnv(gym.Env):
)
observation = self._format_raw_obs(raw_obs)
if terminated:
info["final_info"] = {
"task": self.task,
"task_id": self.task_id,
"done": bool(done),
"is_success": bool(is_success),
}
self.reset()
truncated = False
return observation, reward, terminated, truncated, info
@@ -382,6 +377,7 @@ def _make_env_fns(
init_states: bool,
gym_kwargs: Mapping[str, Any],
control_mode: str,
camera_name_mapping: dict[str, str] | None = None,
) -> list[Callable[[], LiberoEnv]]:
"""Build n_envs factory callables for a single (suite, task_id)."""
@@ -397,6 +393,7 @@ def _make_env_fns(
episode_index=episode_index,
n_envs=n_envs,
control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
**local_kwargs,
)
@@ -406,6 +403,57 @@ def _make_env_fns(
return fns
class _LazyAsyncVectorEnv:
"""Wrapper that defers AsyncVectorEnv creation until first use.
Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker
processes, all of which allocate EGL/GPU resources immediately. Since tasks
are evaluated sequentially, only one task's workers need to be alive at a
time. This wrapper stores the factory functions and creates the real
AsyncVectorEnv on first reset(), keeping peak process count = n_envs.
"""
def __init__(self, env_fns: list[Callable]):
self._env_fns = env_fns
self._env: gym.vector.AsyncVectorEnv | None = None
self.num_envs = len(env_fns)
# Instantiate one env to expose spaces (no GPU — _ensure_env is lazy).
tmp = env_fns[0]()
self.observation_space = tmp.observation_space
self.action_space = tmp.action_space
self.single_observation_space = tmp.observation_space
self.single_action_space = tmp.action_space
tmp.close()
def _ensure(self):
if self._env is None:
self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver")
def reset(self, **kwargs):
self._ensure()
return self._env.reset(**kwargs)
def step(self, actions):
self._ensure()
return self._env.step(actions)
def call(self, name, *args, **kwargs):
self._ensure()
return self._env.call(name, *args, **kwargs)
def get_attr(self, name):
self._ensure()
return self._env.get_attr(name)
def close(self):
if self._env is not None:
self._env.close()
self._env = None
def __del__(self):
self.close()
# ---- Main API ----------------------------------------------------------------
@@ -418,6 +466,7 @@ def create_libero_envs(
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
control_mode: str = "relative",
episode_length: int | None = None,
camera_name_mapping: dict[str, str] | None = None,
) -> dict[str, dict[int, Any]]:
"""
Create vectorized LIBERO environments with a consistent return shape.
@@ -448,6 +497,8 @@ def create_libero_envs(
if task_ids_filter is not None:
print(f"Restricting to task_ids={task_ids_filter}")
is_async = env_cls is gym.vector.AsyncVectorEnv
out: dict[str, dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names:
suite = _get_suite(suite_name)
@@ -467,9 +518,12 @@ def create_libero_envs(
init_states=init_states,
gym_kwargs=gym_kwargs,
control_mode=control_mode,
camera_name_mapping=camera_name_mapping,
)
out[suite_name][tid] = env_cls(fns)
if is_async:
out[suite_name][tid] = _LazyAsyncVectorEnv(fns)
else:
out[suite_name][tid] = env_cls(fns)
print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}")
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
+21 -26
View File
@@ -130,56 +130,51 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]:
return policy_features
def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool:
first_type = type(env.envs[0]) # Get type of first env
return all(type(e) is first_type for e in env.envs) # Fast type check
def _get_sub_env_attr(env: gym.vector.VectorEnv, attr: str, index: int = 0):
"""Retrieve an attribute from a sub-environment, works for both Sync and Async."""
try:
return env.get_attr(attr)[index]
except (AttributeError, Exception):
return None
def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool:
try:
env.get_attr(attr)
return True
except (AttributeError, Exception):
return False
def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None:
with warnings.catch_warnings():
warnings.simplefilter("once", UserWarning) # Apply filter only in this function
warnings.simplefilter("once", UserWarning)
if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")):
if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")):
warnings.warn(
"The environment does not have 'task_description' and 'task'. Some policies require these features.",
UserWarning,
stacklevel=2,
)
if not are_all_envs_same_type(env):
warnings.warn(
"The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.",
UserWarning,
stacklevel=2,
)
def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation:
"""Adds task feature to the observation dict with respect to the first environment attribute."""
if hasattr(env.envs[0], "task_description"):
task_result = env.call("task_description")
if _sub_env_has_attr(env, "task_description"):
task_result = list(env.call("task_description"))
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task_description to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task_description result must be strings")
observation["task"] = task_result
elif hasattr(env.envs[0], "task"):
task_result = env.call("task")
elif _sub_env_has_attr(env, "task"):
task_result = list(env.call("task"))
if isinstance(task_result, tuple):
task_result = list(task_result)
if not isinstance(task_result, list):
raise TypeError(f"Expected task to return a list, got {type(task_result)}")
if not all(isinstance(item, str) for item in task_result):
raise TypeError("All items in task result must be strings")
observation["task"] = task_result
else: # For envs without language instructions, e.g. aloha transfer cube and etc.
else:
num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)]
return observation
+2 -2
View File
@@ -136,8 +136,8 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Standardize to a list of strings for the tokenizer
if isinstance(task, str):
return [task]
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
return task
elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task):
return list(task)
return None
+55 -23
View File
@@ -50,6 +50,7 @@ import concurrent.futures as cf
import copy
import json
import logging
import math
import threading
import time
from collections import defaultdict
@@ -92,6 +93,14 @@ from lerobot.utils.utils import (
)
def _shard_episodes(n_episodes: int, shard_id: int, num_shards: int) -> list[int]:
"""Return the episode indices assigned to this shard (round-robin distribution).
Example: _shard_episodes(10, 1, 4) -> [1, 5, 9]
"""
return list(range(shard_id, n_episodes, num_shards))
def rollout(
env: gym.vector.VectorEnv,
policy: PreTrainedPolicy,
@@ -165,9 +174,15 @@ def rollout(
if return_observations:
all_observations.append(deepcopy(observation))
# Infer "task" from sub-environments.
# Infer "task" from sub-environments (prefer natural language description).
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
observation["task"] = env.call("task")
try:
observation["task"] = list(env.call("task_description"))
except Exception:
try:
observation["task"] = list(env.call("task"))
except Exception:
observation["task"] = [""] * env.num_envs
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
observation = env_preprocessor(observation)
@@ -192,14 +207,13 @@ def rollout(
# VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't
# available if none of the envs finished.
if "final_info" in info:
final_info = info["final_info"]
if not isinstance(final_info, dict):
raise RuntimeError(
"Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). "
"You're likely using an older version of gymnasium (< 1.0). Please upgrade."
)
successes = final_info["is_success"].tolist()
if "final_info" in info and isinstance(info["final_info"], dict):
successes = info["final_info"]["is_success"].tolist()
elif "is_success" in info:
is_success = info["is_success"]
successes = (
is_success.tolist() if hasattr(is_success, "tolist") else [bool(is_success)] * env.num_envs
)
else:
successes = [False] * env.num_envs
@@ -548,6 +562,14 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
# Sharding: each shard runs a subset of n_episodes with non-overlapping seeds.
shard_id = cfg.eval.shard_id
num_shards = cfg.eval.num_shards
episodes_for_shard = _shard_episodes(cfg.eval.n_episodes, shard_id, num_shards)
n_per_shard = len(episodes_for_shard)
# Shift the seed so each shard gets a different, non-overlapping seed range.
shard_seed = (cfg.seed or 0) + shard_id * math.ceil(cfg.eval.n_episodes / num_shards)
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
@@ -556,10 +578,10 @@ def eval_main(cfg: EvalPipelineConfig):
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
n_episodes=n_per_shard,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
start_seed=shard_seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
)
print("Overall Aggregated Metrics:")
@@ -572,8 +594,13 @@ def eval_main(cfg: EvalPipelineConfig):
# Close all vec envs
close_envs(envs)
# Save info
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
# Save info — use shard-specific filename when running in parallel mode.
if num_shards > 1:
out_path = Path(cfg.output_dir) / f"shard_{shard_id}_of_{num_shards}.json"
else:
out_path = Path(cfg.output_dir) / "eval_info.json"
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w") as f:
json.dump(info, f, indent=2)
logging.info("End of eval")
@@ -761,12 +788,13 @@ def eval_policy_all(
}
if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks:
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
try:
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
else:
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
@@ -775,11 +803,15 @@ def eval_policy_all(
fut = executor.submit(
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
)
fut2meta[fut] = (task_group, task_id)
fut2meta[fut] = (task_group, task_id, env)
for fut in cf.as_completed(fut2meta):
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
tg, tid, env = fut2meta[fut]
try:
tg, tid, metrics = fut.result()
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
finally:
env.close()
# compute aggregated metrics helper (robust to lists/scalars)
def _agg_from_list(xs):
@@ -0,0 +1,249 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Probe hardware and recommend optimal lerobot-eval-parallel flags.
Run standalone:
lerobot-eval-autotune --policy.path=lerobot/smolvla_libero --env.type=libero
Or called programmatically from lerobot_eval_parallel when --num-shards auto.
Steps:
1. Probe GPU VRAM and CPU core count.
2. Measure model VRAM footprint (load policy, delta of cuda.memory_allocated).
3. Compute max shards limited by VRAM (85% of total).
4. Probe env step time (optional, skipped when skip_timing=True).
5. Probe inference time (optional, skipped when skip_timing=True).
6. Derive num_shards = min(vram_limit, saturation_shards).
7. Choose MUJOCO_GL (egl vs osmesa) based on remaining VRAM headroom.
8. Compute batch_size = max(4, min(floor(cpu_cores * 0.8 / num_shards), 64)).
9. Print paste-ready command.
"""
import math
import os
import sys
import time
from dataclasses import dataclass
@dataclass
class AutotuneRecommendation:
num_shards: int
batch_size: int
mujoco_gl: str
use_amp: bool
# Probed values
gpu_name: str
vram_gb: float
cpu_cores: int
model_gb: float
env_step_ms: float | None
infer_ms: float | None
_DEFAULT_ENV_STEP_MS = 22.0 # LIBERO on GPU, typical value
_DEFAULT_INFER_MS = 5.0 # SmolVLA fp16 on H100
def _probe_gpu() -> tuple[str, float]:
"""Return (gpu_name, vram_gb). Falls back to CPU sentinel on non-CUDA systems."""
try:
import torch
if not torch.cuda.is_available():
return "CPU (no CUDA)", 0.0
props = torch.cuda.get_device_properties(0)
return props.name, props.total_memory / (1024**3)
except Exception:
return "unknown", 0.0
def _probe_model_gb(passthrough: list[str]) -> float:
"""Load the policy (from --policy.path) and measure VRAM delta. Returns GB."""
# Extract policy path from passthrough args
policy_path = None
for tok in passthrough:
if tok.startswith("policy.path="):
policy_path = tok.split("=", 1)[1]
break
if tok.startswith("--policy.path="):
policy_path = tok.split("=", 1)[1]
break
if policy_path is None:
return 0.0
try:
import torch
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedConfig
if not torch.cuda.is_available():
return 0.0
torch.cuda.synchronize()
before = torch.cuda.memory_allocated(0)
cfg = PreTrainedConfig.from_pretrained(policy_path)
cfg.pretrained_path = policy_path # type: ignore[assignment]
policy = make_policy(cfg=cfg)
policy.eval()
torch.cuda.synchronize()
after = torch.cuda.memory_allocated(0)
del policy
torch.cuda.empty_cache()
return (after - before) / (1024**3)
except Exception as e:
print(f"[autotune] could not measure model VRAM: {e}", file=sys.stderr)
return 0.0
def _probe_env_step_ms(passthrough: list[str], batch_size: int = 8, n_steps: int = 30) -> float | None:
"""Run a short env warmup and return median step latency in ms. Returns None on failure."""
try:
import numpy as np
from lerobot.envs.factory import make_env
# Parse env config from passthrough using lerobot's own parser
env_type = None
for tok in passthrough:
if tok.startswith("env.type=") or tok.startswith("--env.type="):
env_type = tok.split("=", 1)[1]
break
if env_type is None:
return None
# Minimal env config
from lerobot.envs.factory import make_env_config
env_cfg = make_env_config(env_type)
envs = make_env(env_cfg, n_envs=batch_size, use_async_envs=(batch_size > 1))
# Get first vec env
first_suite = next(iter(envs.values()))
env = next(iter(first_suite.values()))
env.reset()
dummy_action = np.zeros((batch_size, env.single_action_space.shape[0]))
timings = []
for _ in range(n_steps):
t0 = time.perf_counter()
env.step(dummy_action)
timings.append((time.perf_counter() - t0) * 1000)
env.close()
return float(np.median(timings))
except Exception as e:
print(f"[autotune] env step probe failed: {e}", file=sys.stderr)
return None
def probe_and_recommend(
passthrough: list[str],
skip_timing: bool = False,
) -> AutotuneRecommendation:
"""Probe hardware + model and return the recommended configuration."""
gpu_name, vram_gb = _probe_gpu()
cpu_cores = os.cpu_count() or 4
# Model footprint
model_gb = _probe_model_gb(passthrough)
if model_gb == 0.0:
# Unknown model: assume a conservative 14 GB (SmolVLA fp16) as placeholder
model_gb = 14.0
print("[autotune] model size unknown, assuming 14 GB (SmolVLA fp16)", file=sys.stderr)
# Max shards from VRAM (leave 15% headroom for activations + env frames)
max_shards_vram = max(1, math.floor(vram_gb * 0.85 / model_gb)) if vram_gb > 0 else 1
# Timing probes
env_step_ms: float | None = None
infer_ms: float | None = None
if not skip_timing:
env_step_ms = _probe_env_step_ms(passthrough)
# Inference time: assume ~infer = env_step / saturation_factor heuristic
# Full probe would require loading policy — skip for now to stay fast.
infer_ms = _DEFAULT_INFER_MS
# Number of shards to saturate GPU: ceil(env_step / infer)
_step = env_step_ms or _DEFAULT_ENV_STEP_MS
_infer = infer_ms or _DEFAULT_INFER_MS
saturation_shards = max(1, math.ceil(_step / _infer))
num_shards = min(max_shards_vram, saturation_shards)
# Rendering mode: EGL if all model copies + env frame buffers fit in VRAM
env_vram_per_shard_gb = 0.01 # ~10 MB overhead per env batch
total_with_egl = num_shards * (model_gb + env_vram_per_shard_gb)
mujoco_gl = "egl" if (vram_gb == 0 or total_with_egl < vram_gb * 0.85) else "osmesa"
# Batch size: fill CPU cores evenly across shards
batch_size = max(4, min(math.floor(cpu_cores * 0.8 / num_shards), 64))
# Recommend AMP when model is large (saves ~50% VRAM)
use_amp = model_gb > 8.0
return AutotuneRecommendation(
num_shards=num_shards,
batch_size=batch_size,
mujoco_gl=mujoco_gl,
use_amp=use_amp,
gpu_name=gpu_name,
vram_gb=vram_gb,
cpu_cores=cpu_cores,
model_gb=model_gb,
env_step_ms=env_step_ms,
infer_ms=infer_ms,
)
def main(argv: list[str] | None = None) -> None:
passthrough = argv if argv is not None else sys.argv[1:]
rec = probe_and_recommend(passthrough)
env_step_str = (
f"{rec.env_step_ms:.0f}ms" if rec.env_step_ms else f"~{_DEFAULT_ENV_STEP_MS:.0f}ms (estimated)"
)
infer_str = f"{rec.infer_ms:.0f}ms" if rec.infer_ms else f"~{_DEFAULT_INFER_MS:.0f}ms (estimated)"
print()
print(
f"GPU: {rec.gpu_name} | VRAM: {rec.vram_gb:.1f} GB | CPU cores: {rec.cpu_cores} | Model: {rec.model_gb:.1f} GB"
)
print()
print(f" env_step_ms: {env_step_str} | infer_ms: {infer_str}")
print()
print(f" num_shards: {rec.num_shards}")
print(f" batch_size: {rec.batch_size}")
print(f" MUJOCO_GL: {rec.mujoco_gl}")
if rec.use_amp:
print(" use_amp: true (recommended — halves VRAM, faster matmuls)")
print()
# Build paste-ready command
flags = [f"--num-shards {rec.num_shards}", f"eval.batch_size={rec.batch_size}"]
if rec.use_amp:
flags.append("policy.use_amp=true")
flags_str = " \\\n ".join(flags)
passthrough_str = " \\\n ".join(passthrough) if passthrough else "[your flags]"
print(" Paste-ready command:")
print(f" MUJOCO_GL={rec.mujoco_gl} lerobot-eval-parallel \\")
print(f" {flags_str} \\")
print(f" {passthrough_str}")
print()
if __name__ == "__main__":
main()
@@ -0,0 +1,185 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run lerobot-eval across N independent subprocesses (shards) for maximum GPU utilization.
Each shard handles a disjoint subset of episodes and writes its own JSON results file.
Results are merged and printed when all shards complete.
Usage:
lerobot-eval-parallel --num-shards 4 [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto [any lerobot-eval flags]
lerobot-eval-parallel --num-shards auto --render-device cpu [any lerobot-eval flags]
--num-shards auto:
Calls lerobot-eval-autotune to probe hardware and determine the optimal number of shards.
--render-device gpu|cpu|auto:
Controls MUJOCO_GL env var. 'gpu' -> EGL (faster, ~3ms/frame, ~200KB VRAM/env).
'cpu' -> osmesa (slower, ~12ms/frame, 0 VRAM). 'auto' picks based on VRAM headroom.
Default: auto.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
def _parse_known(argv: list[str]) -> tuple[argparse.Namespace, list[str]]:
p = argparse.ArgumentParser(add_help=False)
p.add_argument("--num-shards", default="1")
p.add_argument("--render-device", choices=["gpu", "cpu", "auto"], default="auto")
p.add_argument("--output-dir", default=None)
return p.parse_known_args(argv)
def _resolve_num_shards(num_shards_str: str, passthrough: list[str]) -> int:
if num_shards_str == "auto":
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
rec = probe_and_recommend(passthrough)
print(
f"[autotune] recommended num_shards={rec.num_shards}, batch_size={rec.batch_size}, MUJOCO_GL={rec.mujoco_gl}"
)
return rec.num_shards
return int(num_shards_str)
def _resolve_mujoco_gl(render_device: str, num_shards: int, passthrough: list[str]) -> str:
if render_device == "gpu":
return "egl"
if render_device == "cpu":
return "osmesa"
# auto: use EGL for single shard; for multiple shards check VRAM headroom
if num_shards == 1:
return "egl"
try:
from lerobot.scripts.lerobot_eval_autotune import probe_and_recommend
rec = probe_and_recommend(passthrough, skip_timing=True)
return rec.mujoco_gl
except Exception:
# Conservative fallback: osmesa avoids EGL VRAM contention
return "osmesa"
def _extract_output_dir(passthrough: list[str]) -> str | None:
for tok in passthrough:
if tok.startswith("--output-dir="):
return tok.split("=", 1)[1]
if tok == "--output-dir":
idx = passthrough.index(tok)
if idx + 1 < len(passthrough):
return passthrough[idx + 1]
return None
def _merge_shards(output_dir: str, num_shards: int) -> dict:
"""Merge per-shard JSON files into a single result dict and write eval_info.json."""
all_per_task: list[dict] = []
per_group: dict[str, dict] = {}
for k in range(num_shards):
shard_path = Path(output_dir) / f"shard_{k}_of_{num_shards}.json"
if not shard_path.exists():
print(f"[warning] shard file not found: {shard_path}", file=sys.stderr)
continue
with open(shard_path) as f:
shard = json.load(f)
all_per_task.extend(shard.get("per_task", []))
for group, metrics in shard.get("per_group", {}).items():
if group not in per_group:
per_group[group] = {"sum_rewards": [], "max_rewards": [], "successes": []}
for key in ("sum_rewards", "max_rewards", "successes"):
# metrics may store aggregates; reconstruct lists if possible
per_group[group][key].extend(metrics.get(key, []))
# Re-aggregate
import numpy as np
def _nanmean(xs: list) -> float:
return float(np.nanmean(xs)) if xs else float("nan")
groups_out = {}
all_sr, all_mr, all_succ = [], [], []
for group, acc in per_group.items():
groups_out[group] = {
"avg_sum_reward": _nanmean(acc["sum_rewards"]),
"avg_max_reward": _nanmean(acc["max_rewards"]),
"pc_success": _nanmean(acc["successes"]) * 100 if acc["successes"] else float("nan"),
"n_episodes": len(acc["sum_rewards"]),
}
all_sr.extend(acc["sum_rewards"])
all_mr.extend(acc["max_rewards"])
all_succ.extend(acc["successes"])
overall = {
"avg_sum_reward": _nanmean(all_sr),
"avg_max_reward": _nanmean(all_mr),
"pc_success": _nanmean(all_succ) * 100 if all_succ else float("nan"),
"n_episodes": len(all_sr),
}
merged = {"per_task": all_per_task, "per_group": groups_out, "overall": overall}
out_path = Path(output_dir) / "eval_info.json"
with open(out_path, "w") as f:
json.dump(merged, f, indent=2)
return merged
def main(argv: list[str] | None = None) -> None:
args, passthrough = _parse_known(argv if argv is not None else sys.argv[1:])
num_shards = _resolve_num_shards(args.num_shards, passthrough)
mujoco_gl = _resolve_mujoco_gl(args.render_device, num_shards, passthrough)
output_dir = args.output_dir or _extract_output_dir(passthrough)
print(f"[lerobot-eval-parallel] launching {num_shards} shard(s), MUJOCO_GL={mujoco_gl}")
child_env = {**os.environ, "MUJOCO_GL": mujoco_gl, "OMP_NUM_THREADS": "1"}
procs = []
for k in range(num_shards):
cmd = [
sys.executable,
"-m",
"lerobot.scripts.lerobot_eval",
f"eval.shard_id={k}",
f"eval.num_shards={num_shards}",
*passthrough,
]
if output_dir:
# Each shard shares the same output_dir; shard files are named shard_K_of_N.json
cmd.append(f"output_dir={output_dir}")
procs.append(subprocess.Popen(cmd, env=child_env))
return_codes = [p.wait() for p in procs]
if any(rc != 0 for rc in return_codes):
failed = [k for k, rc in enumerate(return_codes) if rc != 0]
print(f"[lerobot-eval-parallel] shards {failed} failed with non-zero exit codes.", file=sys.stderr)
sys.exit(1)
if output_dir and num_shards > 1:
merged = _merge_shards(output_dir, num_shards)
print("\n=== Merged Results ===")
print(json.dumps(merged["overall"], indent=2))
if __name__ == "__main__":
main()
+7 -7
View File
@@ -22,6 +22,8 @@ def test_registry_all_types():
assert len(known) >= 6
for t in known:
cfg = make_env_config(t)
if not isinstance(cfg, EnvConfig):
continue
assert cfg.type == t
@@ -54,10 +56,8 @@ def test_delegation():
def test_processors_delegation():
"""make_env_pre_post_processors delegates to cfg.get_env_processors()."""
from lerobot.configs.policies import PreTrainedConfig
cfg = make_env_config("aloha")
pre, post = make_env_pre_post_processors(cfg, PreTrainedConfig())
pre, post = make_env_pre_post_processors(cfg, policy_cfg=None)
assert len(pre.steps) == 0
@@ -90,7 +90,7 @@ def test_base_create_envs():
envs = _Env().create_envs(n_envs=2)
assert "_dispatch_base_test" in envs
env = envs["_dispatch_base_test"][0]
assert isinstance(env, gym.vector.SyncVectorEnv)
assert isinstance(env, gym.vector.VectorEnv)
assert env.num_envs == 2
env.close()
finally:
@@ -124,7 +124,7 @@ def test_custom_create_envs_override():
def test_custom_get_env_processors_override():
"""A custom EnvConfig subclass can override get_env_processors()."""
from lerobot.processor.pipeline import PolicyProcessorPipeline
from lerobot.processor.pipeline import DataProcessorPipeline
@EnvConfig.register_subclass("_dispatch_proc_test")
@dataclass
@@ -137,7 +137,7 @@ def test_custom_get_env_processors_override():
return {}
def get_env_processors(self):
return PolicyProcessorPipeline(steps=[]), PolicyProcessorPipeline(steps=[])
return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[])
pre, post = _Env().get_env_processors()
assert isinstance(pre, PolicyProcessorPipeline)
assert isinstance(pre, DataProcessorPipeline)
@@ -189,6 +189,30 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer):
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
"""Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
mock_tokenizer = MockTokenizer(vocab_size=100)
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
transition = create_transition(
observation={"state": torch.tensor([1.0, 2.0])},
action=torch.tensor([0.1, 0.2]),
complementary_data={"task": ("pick up cube", "place on table")},
)
result = processor(transition)
observation = result[TransitionKey.OBSERVATION]
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
assert tokens.shape == (2, 8)
assert attention_mask.shape == (2, 8)
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_keys(mock_auto_tokenizer):