mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-12 23:29:52 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 46e9e22b05 | |||
| b43f9ab048 |
@@ -67,7 +67,8 @@ class EvalConfig:
|
||||
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
|
||||
batch_size: int = 50
|
||||
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
|
||||
use_async_envs: bool = False
|
||||
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
|
||||
use_async_envs: bool = True
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.batch_size > self.n_episodes:
|
||||
|
||||
@@ -75,13 +75,14 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
|
||||
def create_envs(
|
||||
self,
|
||||
n_envs: int,
|
||||
use_async_envs: bool = False,
|
||||
use_async_envs: bool = True,
|
||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||
"""Create {suite: {task_id: VectorEnv}}.
|
||||
|
||||
Default: single-task env via gym.make(). Multi-task benchmarks override.
|
||||
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
|
||||
"""
|
||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
||||
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||
|
||||
if self.gym_id not in gym_registry:
|
||||
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
|
||||
@@ -388,12 +389,12 @@ class LiberoEnv(EnvConfig):
|
||||
kwargs["task_ids"] = self.task_ids
|
||||
return kwargs
|
||||
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = False):
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||
from lerobot.envs.libero import create_libero_envs
|
||||
|
||||
if self.task is None:
|
||||
raise ValueError("LiberoEnv requires a task to be specified")
|
||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
||||
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||
return create_libero_envs(
|
||||
task=self.task,
|
||||
n_envs=n_envs,
|
||||
@@ -456,12 +457,12 @@ class MetaworldEnv(EnvConfig):
|
||||
"render_mode": self.render_mode,
|
||||
}
|
||||
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = False):
|
||||
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||
from lerobot.envs.metaworld import create_metaworld_envs
|
||||
|
||||
if self.task is None:
|
||||
raise ValueError("MetaWorld requires a task to be specified")
|
||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
||||
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||
return create_metaworld_envs(
|
||||
task=self.task,
|
||||
n_envs=n_envs,
|
||||
|
||||
@@ -58,7 +58,7 @@ def make_env_pre_post_processors(
|
||||
def make_env(
|
||||
cfg: EnvConfig | str,
|
||||
n_envs: int = 1,
|
||||
use_async_envs: bool = False,
|
||||
use_async_envs: bool = True,
|
||||
hub_cache_dir: str | None = None,
|
||||
trust_remote_code: bool = False,
|
||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||
|
||||
+35
-17
@@ -150,7 +150,17 @@ class LiberoEnv(gym.Env):
|
||||
|
||||
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
|
||||
|
||||
self._env = self._make_envs_task(task_suite, self.task_id)
|
||||
# Extract task metadata without allocating GPU resources (safe before fork).
|
||||
task = task_suite.get_task(task_id)
|
||||
self.task = task.name
|
||||
self.task_description = task.language
|
||||
self._task_bddl_file = os.path.join(
|
||||
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
|
||||
)
|
||||
self._env: OffScreenRenderEnv | None = (
|
||||
None # deferred — created on first reset() inside the worker subprocess
|
||||
)
|
||||
|
||||
default_steps = 500
|
||||
self._max_episode_steps = (
|
||||
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
|
||||
@@ -221,28 +231,32 @@ class LiberoEnv(gym.Env):
|
||||
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
|
||||
)
|
||||
|
||||
def _ensure_env(self) -> None:
|
||||
"""Create the underlying OffScreenRenderEnv on first use.
|
||||
|
||||
Called inside the worker subprocess after fork(), so each worker gets
|
||||
its own clean EGL context rather than inheriting a stale one from the
|
||||
parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv).
|
||||
"""
|
||||
if self._env is not None:
|
||||
return
|
||||
env = OffScreenRenderEnv(
|
||||
bddl_file_name=self._task_bddl_file,
|
||||
camera_heights=self.observation_height,
|
||||
camera_widths=self.observation_width,
|
||||
)
|
||||
env.reset()
|
||||
self._env = env
|
||||
|
||||
def render(self):
|
||||
self._ensure_env()
|
||||
raw_obs = self._env.env._get_observations()
|
||||
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
|
||||
image = image[::-1, ::-1] # flip both H and W for visualization
|
||||
return image
|
||||
|
||||
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
|
||||
task = task_suite.get_task(task_id)
|
||||
self.task = task.name
|
||||
self.task_description = task.language
|
||||
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
|
||||
|
||||
env_args = {
|
||||
"bddl_file_name": task_bddl_file,
|
||||
"camera_heights": self.observation_height,
|
||||
"camera_widths": self.observation_width,
|
||||
}
|
||||
env = OffScreenRenderEnv(**env_args)
|
||||
env.reset()
|
||||
return env
|
||||
|
||||
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
|
||||
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
|
||||
images = {}
|
||||
for camera_name in self.camera_name:
|
||||
image = raw_obs[camera_name]
|
||||
@@ -294,6 +308,7 @@ class LiberoEnv(gym.Env):
|
||||
)
|
||||
|
||||
def reset(self, seed=None, **kwargs):
|
||||
self._ensure_env()
|
||||
super().reset(seed=seed)
|
||||
self._env.seed(seed)
|
||||
raw_obs = self._env.reset()
|
||||
@@ -320,6 +335,8 @@ class LiberoEnv(gym.Env):
|
||||
return observation, info
|
||||
|
||||
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
|
||||
self._ensure_env()
|
||||
assert self._env is not None
|
||||
if action.ndim != 1:
|
||||
raise ValueError(
|
||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||
@@ -350,7 +367,8 @@ class LiberoEnv(gym.Env):
|
||||
return observation, reward, terminated, truncated, info
|
||||
|
||||
def close(self):
|
||||
self._env.close()
|
||||
if self._env is not None:
|
||||
self._env.close()
|
||||
|
||||
|
||||
def _make_env_fns(
|
||||
|
||||
@@ -97,8 +97,9 @@ class MetaworldEnv(gym.Env):
|
||||
self.visualization_height = visualization_height
|
||||
self.camera_name = camera_name
|
||||
|
||||
self._env = self._make_envs_task(self.task)
|
||||
self._max_episode_steps = self._env.max_path_length
|
||||
self._env_name = self.task # already stripped of "metaworld-" prefix above
|
||||
self._env = None # deferred — created on first reset() inside the worker subprocess
|
||||
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
|
||||
self.task_description = TASK_DESCRIPTIONS[self.task]
|
||||
|
||||
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
|
||||
@@ -136,6 +137,24 @@ class MetaworldEnv(gym.Env):
|
||||
|
||||
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
|
||||
|
||||
def _ensure_env(self) -> None:
|
||||
"""Create the underlying MetaWorld env on first use.
|
||||
|
||||
Called inside the worker subprocess after fork(), so each worker gets
|
||||
its own clean rendering context rather than inheriting a stale one from
|
||||
the parent process (which causes crashes with AsyncVectorEnv).
|
||||
"""
|
||||
if self._env is not None:
|
||||
return
|
||||
mt1 = metaworld.MT1(self._env_name, seed=42)
|
||||
env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name)
|
||||
env.set_task(mt1.train_tasks[0])
|
||||
if self.camera_name == "corner2":
|
||||
env.model.cam_pos[2] = [0.75, 0.075, 0.7]
|
||||
env.reset()
|
||||
env._freeze_rand_vec = False # otherwise no randomization
|
||||
self._env = env
|
||||
|
||||
def render(self) -> np.ndarray:
|
||||
"""
|
||||
Render the current environment frame.
|
||||
@@ -143,26 +162,13 @@ class MetaworldEnv(gym.Env):
|
||||
Returns:
|
||||
np.ndarray: The rendered RGB image from the environment.
|
||||
"""
|
||||
self._ensure_env()
|
||||
image = self._env.render()
|
||||
if self.camera_name == "corner2":
|
||||
# Images from this camera are flipped — correct them
|
||||
image = np.flip(image, (0, 1))
|
||||
return image
|
||||
|
||||
def _make_envs_task(self, env_name: str):
|
||||
mt1 = metaworld.MT1(env_name, seed=42)
|
||||
env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name)
|
||||
env.set_task(mt1.train_tasks[0])
|
||||
if self.camera_name == "corner2":
|
||||
env.model.cam_pos[2] = [
|
||||
0.75,
|
||||
0.075,
|
||||
0.7,
|
||||
] # corner2 position, similar to https://arxiv.org/pdf/2206.14244
|
||||
env.reset()
|
||||
env._freeze_rand_vec = False # otherwise no randomization
|
||||
return env
|
||||
|
||||
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
|
||||
image = None
|
||||
if self._env is not None:
|
||||
@@ -209,6 +215,7 @@ class MetaworldEnv(gym.Env):
|
||||
observation (RobotObservation): The initial formatted observation.
|
||||
info (Dict[str, Any]): Additional info about the reset state.
|
||||
"""
|
||||
self._ensure_env()
|
||||
super().reset(seed=seed)
|
||||
|
||||
raw_obs, info = self._env.reset(seed=seed)
|
||||
@@ -232,6 +239,7 @@ class MetaworldEnv(gym.Env):
|
||||
truncated (bool): Whether the episode was truncated due to a time limit.
|
||||
info (Dict[str, Any]): Additional environment info.
|
||||
"""
|
||||
self._ensure_env()
|
||||
if action.ndim != 1:
|
||||
raise ValueError(
|
||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||
@@ -263,7 +271,8 @@ class MetaworldEnv(gym.Env):
|
||||
return observation, reward, terminated, truncated, info
|
||||
|
||||
def close(self):
|
||||
self._env.close()
|
||||
if self._env is not None:
|
||||
self._env.close()
|
||||
|
||||
|
||||
# ---- Main API ----------------------------------------------------------------
|
||||
|
||||
@@ -47,6 +47,7 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig`
|
||||
"""
|
||||
|
||||
import concurrent.futures as cf
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
@@ -56,7 +57,6 @@ from collections.abc import Callable
|
||||
from contextlib import nullcontext
|
||||
from copy import deepcopy
|
||||
from dataclasses import asdict
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Any, TypedDict
|
||||
@@ -73,7 +73,6 @@ from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
||||
from lerobot.envs.utils import (
|
||||
add_envs_task,
|
||||
check_env_attributes_and_types,
|
||||
close_envs,
|
||||
preprocess_observation,
|
||||
@@ -166,9 +165,9 @@ def rollout(
|
||||
if return_observations:
|
||||
all_observations.append(deepcopy(observation))
|
||||
|
||||
# Infer "task" from attributes of environments.
|
||||
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
|
||||
observation = add_envs_task(env, observation)
|
||||
# Infer "task" from sub-environments.
|
||||
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
|
||||
observation["task"] = env.call("task")
|
||||
|
||||
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
|
||||
observation = env_preprocessor(observation)
|
||||
@@ -734,34 +733,48 @@ def eval_policy_all(
|
||||
group_acc[group]["video_paths"].extend(paths)
|
||||
overall["video_paths"].extend(paths)
|
||||
|
||||
def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
|
||||
"""Shallow copy sharing weight tensors, with independent per-thread state.
|
||||
|
||||
copy.copy() gives a new Python object whose _parameters dict is a shared
|
||||
reference (same tensor storage, zero extra VRAM). reset() then rebinds
|
||||
mutable state (action queues etc.) to fresh per-thread objects.
|
||||
|
||||
Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's
|
||||
reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config.
|
||||
"""
|
||||
thread_p = copy.copy(p)
|
||||
thread_p.reset()
|
||||
return thread_p
|
||||
|
||||
# Choose runner (sequential vs threaded)
|
||||
task_runner = partial(
|
||||
run_one,
|
||||
policy=policy,
|
||||
env_preprocessor=env_preprocessor,
|
||||
env_postprocessor=env_postprocessor,
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=n_episodes,
|
||||
max_episodes_rendered=max_episodes_rendered,
|
||||
videos_dir=videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
)
|
||||
_runner_kwargs = {
|
||||
"env_preprocessor": env_preprocessor,
|
||||
"env_postprocessor": env_postprocessor,
|
||||
"preprocessor": preprocessor,
|
||||
"postprocessor": postprocessor,
|
||||
"n_episodes": n_episodes,
|
||||
"max_episodes_rendered": max_episodes_rendered,
|
||||
"videos_dir": videos_dir,
|
||||
"return_episode_data": return_episode_data,
|
||||
"start_seed": start_seed,
|
||||
}
|
||||
|
||||
if max_parallel_tasks <= 1:
|
||||
# sequential path (single accumulator path on the main thread)
|
||||
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
|
||||
for task_group, task_id, env in tasks:
|
||||
tg, tid, metrics = task_runner(task_group, task_id, env)
|
||||
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
|
||||
_accumulate_to(tg, metrics)
|
||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||
else:
|
||||
# threaded path: submit all tasks, consume completions on main thread and accumulate there
|
||||
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
|
||||
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
|
||||
fut2meta = {}
|
||||
for task_group, task_id, env in tasks:
|
||||
fut = executor.submit(task_runner, task_group, task_id, env)
|
||||
fut = executor.submit(
|
||||
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
|
||||
)
|
||||
fut2meta[fut] = (task_group, task_id)
|
||||
for fut in cf.as_completed(fut2meta):
|
||||
tg, tid, metrics = fut.result()
|
||||
|
||||
Reference in New Issue
Block a user