From 96cc634a661283f780f27326c406361c72da7655 Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Thu, 11 Sep 2025 12:21:21 +0200 Subject: [PATCH] add new changes --- examples/6_evaluate_libero.sh | 10 +- examples/8_train_smolvla_must.sh | 10 +- src/lerobot/envs/factory.py | 59 +++--- src/lerobot/envs/libero.py | 351 ++++++++++++++++++++++--------- src/lerobot/envs/utils.py | 44 ++++ src/lerobot/policies/factory.py | 1 - src/lerobot/scripts/eval.py | 261 ++++++++++++----------- src/lerobot/scripts/train.py | 73 +++---- 8 files changed, 505 insertions(+), 304 deletions(-) diff --git a/examples/6_evaluate_libero.sh b/examples/6_evaluate_libero.sh index c15d71c95..46355dfa1 100644 --- a/examples/6_evaluate_libero.sh +++ b/examples/6_evaluate_libero.sh @@ -18,11 +18,11 @@ export CUDA_VISIBLE_DEVICES=2 # CONFIGURATION POLICY_PATH="/raid/jade/logs/lerobot/lerobot_2_HuggingFaceVLA_libero_smolvla_lr1e-4bs32steps100000/checkpoints/100000/pretrained_model" -POLICY_PATH="/raid/jade/logs/lerobot/lerobot_new_HuggingfaceVLA_libero_smolvla_lr1e-4bs32steps100000/checkpoints/100000/pretrained_model" -TASK=libero_spatial +POLICY_PATH="/raid/jade/models/smolvlamust" +TASK=libero_spatial,libero_object ENV_TYPE="libero" -BATCH_SIZE=10 -N_EPISODES=10 +BATCH_SIZE=1 +N_EPISODES=1 # storage / caches RAID=/raid/jade N_ACTION_STEPS=1 @@ -46,7 +46,7 @@ python src/lerobot/scripts/eval.py \ --env.type="$ENV_TYPE" \ --eval.batch_size="$BATCH_SIZE" \ --eval.n_episodes="$N_EPISODES" \ - --env.multitask_eval=False \ + --env.multitask_eval=True \ --env.task=$TASK \ # python examples/evaluate_libero.py \ # --policy_path "$POLICY_PATH" \ diff --git a/examples/8_train_smolvla_must.sh b/examples/8_train_smolvla_must.sh index 98029f3ad..828627d85 100644 --- a/examples/8_train_smolvla_must.sh +++ b/examples/8_train_smolvla_must.sh @@ -56,14 +56,16 @@ EXPERT_WIDTH_MULTIPLIER=0.5 # number of gpus to use NUM_PROCESSES=2 NUM_VLM_LAYERS=0 -SELF_ATTN_EVERY_N_LAYERS=2 +SELF_ATTN_EVERY_N_LAYERS=0 CHUNK_SIZE=50 -export CUDA_VISIBLE_DEVICES=0 +export CUDA_VISIBLE_DEVICES=1 PORT=29522 PREFIX_LENGTH=0 LOAD_VLM_WEIGHTS=true +MAX_ACTION_DIM=32 +MAX_STATE_DIM=32 # naming/output dir -TRAIN_DIR=$RAID/logs/lerobot/lerobot_new_${REPO_ID//\//_}_${POLICY}_lr${LR}bs${BATCH_SIZE}steps${OFFLINE_STEPS} +TRAIN_DIR=$RAID/logs/lerobot/lerobot_new_sep11_v2_${REPO_ID//\//_}_${POLICY}_lr${LR}bs${BATCH_SIZE}steps${OFFLINE_STEPS} echo "Training dir: $TRAIN_DIR" rm -rf "$TRAIN_DIR" @@ -137,5 +139,7 @@ python src/lerobot/scripts/train.py \ --policy.load_vlm_weights=$LOAD_VLM_WEIGHTS \ --policy.expert_width_multiplier=$EXPERT_WIDTH_MULTIPLIER \ --policy.self_attn_every_n_layers=$SELF_ATTN_EVERY_N_LAYERS \ + --policy.max_action_dim=$MAX_ACTION_DIM \ + --policy.max_state_dim=$MAX_STATE_DIM \ --seed=$SEED \ --wandb.enable=false diff --git a/src/lerobot/envs/factory.py b/src/lerobot/envs/factory.py index 211b41714..d38b2eed3 100644 --- a/src/lerobot/envs/factory.py +++ b/src/lerobot/envs/factory.py @@ -56,36 +56,41 @@ def make_env( names to indexed vectorized environments (when multitask eval is used). """ - if n_envs < 1: - raise ValueError("`n_envs` must be at least 1") + if n_envs < 1: + raise ValueError("`n_envs` must be at least 1") - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + + + if "libero" in cfg.type: + from lerobot.envs.libero import create_libero_envs + return create_libero_envs( + task=cfg.task, + n_envs=n_envs, + camera_name=cfg.camera_name, + init_states=cfg.init_states, + gym_kwargs=cfg.gym_kwargs, + env_cls=env_cls, + multitask_eval=cfg.multitask_eval, + ) - if "libero" in cfg.type: - from lerobot.envs.libero import create_libero_envs - return create_libero_envs( - task=cfg.task, - n_envs=n_envs, - camera_name=cfg.camera_name, - init_states=cfg.init_states, - gym_kwargs=cfg.gym_kwargs, - env_cls=env_cls, - multitask_eval=cfg.multitask_eval, - ) + package_name = f"gym_{cfg.type}" + try: + importlib.import_module(package_name) + except ModuleNotFoundError as e: + raise ModuleNotFoundError( + f"{package_name} is not installed. Install with: pip install \"lerobot[{cfg.type}]\"" + ) from e - - package_name = f"gym_{cfg.type}" - try: - importlib.import_module(package_name) - except ModuleNotFoundError as e: - raise ModuleNotFoundError( - f"{package_name} is not installed. Install with: pip install \"lerobot[{cfg.type}]\"" - ) from e + gym_handle = f"{package_name}/{cfg.task}" + + def _make_one(): + return gym.make(gym_handle, disable_env_checker=True, **(cfg.gym_kwargs or {})) - gym_handle = f"{package_name}/{cfg.task}" - - def _make_one(): - return gym.make(gym_handle, disable_env_checker=True, **(cfg.gym_kwargs or {})) + vec = env_cls([_make_one for _ in range(n_envs)]) + + # normalize to {suite: {task_id: vec_env}} for consistency + suite_name = cfg.type # e.g., "pusht", "aloha" + return {suite_name: {0: vec}} - return env_cls([_make_one for _ in range(n_envs)]) diff --git a/src/lerobot/envs/libero.py b/src/lerobot/envs/libero.py index 83ccd2fb9..ff1574416 100644 --- a/src/lerobot/envs/libero.py +++ b/src/lerobot/envs/libero.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import math import os from collections import defaultdict @@ -12,100 +14,153 @@ from gymnasium import spaces from libero.libero import benchmark, get_libero_path from libero.libero.envs import OffScreenRenderEnv +import logging +from collections import defaultdict +from typing import Any, Callable, Dict, Iterable, List, Mapping, Sequence + + +logger = logging.getLogger(__name__) + +# ---- Helpers ----------------------------------------------------------------- + +def _parse_camera_names(camera_name: str | Sequence[str]) -> List[str]: + """Normalize camera_name into a non-empty list of strings.""" + if isinstance(camera_name, str): + cams = [c.strip() for c in camera_name.split(",") if c.strip()] + elif isinstance(camera_name, (list, tuple)): + cams = [str(c).strip() for c in camera_name if str(c).strip()] + else: + raise TypeError(f"camera_name must be str or sequence[str], got {type(camera_name).__name__}") + if not cams: + raise ValueError("camera_name resolved to an empty list.") + return cams + + +def _get_suite(name: str): + """Instantiate a LIBERO suite by name with clear validation.""" + bench = benchmark.get_benchmark_dict() + if name not in bench: + raise ValueError(f"Unknown LIBERO suite '{name}'. Available: {', '.join(sorted(bench.keys()))}") + suite = bench[name]() + if not getattr(suite, "tasks", None): + raise ValueError(f"Suite '{name}' has no tasks.") + return suite + + +def _select_task_ids(total_tasks: int, task_ids: Iterable[int] | None) -> List[int]: + """Validate/normalize task ids. If None → all tasks.""" + if task_ids is None: + return list(range(total_tasks)) + ids = sorted(set(int(t) for t in task_ids)) + for t in ids: + if t < 0 or t >= total_tasks: + raise ValueError(f"task_id {t} out of range [0, {total_tasks-1}].") + return ids + + +def _make_env_fns( + *, + suite, + suite_name: str, + task_id: int, + n_envs: int, + camera_names: List[str], + init_states: bool, + gym_kwargs: Mapping[str, Any], + LiberoEnv: type, # injected to avoid forward ref issues if needed +) -> List[Callable[[], "LiberoEnv"]]: + """Build n_envs factory callables for a single (suite, task_id).""" + joined_cams = ",".join(camera_names) # keep backward-compat: downstream expects a string + fns: List[Callable[[], "LiberoEnv"]] = [] + for i in range(n_envs): + def _mk(i=i, suite=suite, task_id=task_id, suite_name=suite_name, joined_cams=joined_cams, init_states=init_states, gym_kwargs=dict(gym_kwargs)): + return LiberoEnv( + task_suite=suite, + task_id=task_id, + task_suite_name=suite_name, + camera_name=joined_cams, + init_states=init_states, + episode_index=i, + **gym_kwargs, + ) + fns.append(_mk) + return fns + +# ---- Main API ---------------------------------------------------------------- def create_libero_envs( task: str, n_envs: int, - gym_kwargs: dict[str, Any] = None, - camera_name: str = "agentview_image,robot0_eye_in_hand_image", + gym_kwargs: dict[str, Any] | None = None, + camera_name: str | Sequence[str] = "agentview_image,robot0_eye_in_hand_image", init_states: bool = True, - env_cls: Callable = None, - multitask_eval: bool = True, -) -> dict[str, dict[str, Any]]: + env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None, + multitask_eval: bool = True, # kept for signature compatibility; return type is consistent regardless +) -> dict[str, dict[int, Any]]: """ - Here n_envs is per task and equal to the number of rollouts. + Create vectorized LIBERO environments with a consistent return shape. + Returns: - dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs. + dict[suite_name][task_id] -> vec_env (env_cls([...]) with exactly n_envs factories) + Notes: + - n_envs is the number of rollouts *per task* (episode_index = 0..n_envs-1). + - `task` can be a single suite or a comma-separated list of suites. + - You may pass `task_ids` (list[int]) inside `gym_kwargs` to restrict tasks per suite. """ - print("num envs", n_envs) - print("multitask_eval", multitask_eval) - print("gym_kwargs", gym_kwargs) - if gym_kwargs is None: - gym_kwargs = {} + if env_cls is None or not callable(env_cls): + raise ValueError("env_cls must be a callable that wraps a list of environment factory callables.") + if not isinstance(n_envs, int) or n_envs <= 0: + raise ValueError(f"n_envs must be a positive int; got {n_envs}.") - if not multitask_eval: - benchmark_dict = benchmark.get_benchmark_dict() - task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc. - tasks_id = list(range(len(task_suite.tasks))) - episode_indices = [0 for i in range(len(tasks_id))] - if len(tasks_id) == 1: - tasks_id = [tasks_id[0] for _ in range(n_envs)] - episode_indices = list(range(n_envs)) - elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0: - n_repeat = n_envs // len(tasks_id) - print("n_repeat", n_repeat) - episode_indices = [] - for _ in range(len(tasks_id)): - episode_indices.extend(list(range(n_repeat))) - tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id])) - elif n_envs < len(tasks_id): - tasks_id = tasks_id[:n_envs] - episode_indices = list(range(n_envs))[:n_envs] - print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}") - print(f"Creating Libero envs with task ids {tasks_id} from suite {task}") - assert n_envs == len(tasks_id), ( - f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}" - ) - return env_cls( - [ - lambda i=i: LiberoEnv( - task_suite=task_suite, - task_id=tasks_id[i], - task_suite_name=task, - camera_name=camera_name, - init_states=init_states, - episode_index=episode_indices[i], - **gym_kwargs, - ) - for i in range(n_envs) - ] - ) - else: - envs = defaultdict(dict) - benchmark_dict = benchmark.get_benchmark_dict() - task = task.split(",") - for _task in task: - task_suite = benchmark_dict[ - _task - ]() # can also choose libero_spatial, libero_object, libero_10 etc. - tasks_ids = list(range(len(task_suite.tasks))) - for tasks_id in tasks_ids: - episode_indices = list(range(n_envs)) - print( - f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}" - ) - envs_list = [ - ( - lambda i=i, - task_suite=task_suite, - tasks_id=tasks_id, - _task=_task, - episode_indices=episode_indices: LiberoEnv( - task_suite=task_suite, - task_id=tasks_id, - task_suite_name=_task, - camera_name=camera_name, - init_states=init_states, - episode_index=episode_indices[i], - **gym_kwargs, - ) - ) - for i in range(n_envs) - ] - envs[_task][tasks_id] = env_cls(envs_list) - return envs + gym_kwargs = dict(gym_kwargs or {}) + task_ids_filter = gym_kwargs.pop("task_ids", None) # optional: limit to specific tasks + # Avoid circular import/type issues: assume LiberoEnv is defined in this module + try: + LiberoEnv # type: ignore[name-defined] + except NameError: + # If LiberoEnv is in the same file, this won't run. If it's elsewhere, import here. + exit() + # from .libero_env import LiberoEnv # adjust if your class lives in another module + camera_names = _parse_camera_names(camera_name) + suite_names = [s.strip() for s in str(task).split(",") if s.strip()] + if not suite_names: + raise ValueError("`task` must contain at least one LIBERO suite name.") + + logger.info( + "Creating LIBERO envs | suites=%s | n_envs(per task)=%d | init_states=%s | multitask_eval=%s", + suite_names, n_envs, init_states, bool(multitask_eval) + ) + if task_ids_filter is not None: + logger.info("Restricting to task_ids=%s", task_ids_filter) + + out: Dict[str, Dict[int, Any]] = defaultdict(dict) + + for suite_name in suite_names: + suite = _get_suite(suite_name) + total = len(suite.tasks) + selected = _select_task_ids(total, task_ids_filter) + + if not selected: + raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).") + + for tid in selected: + fns = _make_env_fns( + suite=suite, + suite_name=suite_name, + task_id=tid, + n_envs=n_envs, + camera_names=camera_names, + init_states=init_states, + gym_kwargs=gym_kwargs, + LiberoEnv=LiberoEnv, + ) + out[suite_name][tid] = env_cls(fns) + logger.debug("Built vec env | suite=%s | task_id=%d | n_envs=%d", suite_name, tid, n_envs) + + # return plain dicts for predictability + return {suite: dict(task_map) for suite, task_map in out.items()} def quat2axisangle(quat): """ Copied from robosuite: https://github.com/ARISE-Initiative/robosuite/blob/eafb81f54ffc104f905ee48a16bb15f059176ad3/robosuite/utils/transform_utils.py#L490C1-L512C55 @@ -199,17 +254,15 @@ class LiberoEnv(gym.Env): self.episode_index = episode_index self._env = self._make_envs_task(task_suite, self.task_id) - if task_suite_name == "libero_spatial": - max_steps = 220 # longest training demo has 193 steps - elif task_suite_name == "libero_object": - max_steps = 280 # longest training demo has 254 steps - elif task_suite_name == "libero_goal": - max_steps = 300 # longest training demo has 270 steps - elif task_suite_name == "libero_10": - max_steps = 520 # longest training demo has 505 steps - elif task_suite_name == "libero_90": - max_steps = 400 # longest training demo has 373 steps - self._max_episode_steps = max_steps + TASK_SUITE_MAX_STEPS: dict[str, int] = { + "libero_spatial": 220, # longest training demo has 193 steps + "libero_object": 280, # longest training demo has 254 steps + "libero_goal": 300, # longest training demo has 270 steps + "libero_10": 520, # longest training demo has 505 steps + "libero_90": 400, # longest training demo has 373 steps + } + default_steps = 500 + self._max_episode_steps = TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps) images = {} for cam in self.camera_name: @@ -221,7 +274,11 @@ class LiberoEnv(gym.Env): ) if self.obs_type == "state": - raise NotImplementedError() + raise NotImplementedError( + "The 'state' observation type is not supported in LiberoEnv. " + "Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')." + ) + elif self.obs_type == "pixels": self.observation_space = spaces.Dict( { @@ -285,7 +342,10 @@ class LiberoEnv(gym.Env): ) agent_pos = state if self.obs_type == "state": - raise NotImplementedError() + raise NotImplementedError( + "The 'state' observation type is not supported in LiberoEnv. " + "Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')." + ) elif self.obs_type == "pixels": obs = {"pixels": images.copy()} elif self.obs_type == "pixels_agent_pos": @@ -308,7 +368,11 @@ class LiberoEnv(gym.Env): return observation, info def step(self, action): - assert action.ndim == 1 + if action.ndim != 1: + raise ValueError( + f"Expected action to be 1-D (shape (action_dim,)), " + f"but got shape {action.shape} with ndim={action.ndim}" + ) raw_obs, reward, done, info = self._env.step(action) is_success = self._env.check_success() @@ -324,3 +388,96 @@ class LiberoEnv(gym.Env): def close(self): self._env.close() + + +def create_libero_envs1( + task: str, + n_envs: int, + gym_kwargs: dict[str, Any] = None, + camera_name: str = "agentview_image,robot0_eye_in_hand_image", + init_states: bool = True, + env_cls: Callable = None, + multitask_eval: bool = True, +) -> dict[str, dict[str, Any]]: + """ + Here n_envs is per task and equal to the number of rollouts. + Returns: + dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs. + """ + print("num envs", n_envs) + print("multitask_eval", multitask_eval) + print("gym_kwargs", gym_kwargs) + if gym_kwargs is None: + gym_kwargs = {} + + if not multitask_eval: + benchmark_dict = benchmark.get_benchmark_dict() + task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc. + tasks_id = list(range(len(task_suite.tasks))) + episode_indices = [0 for i in range(len(tasks_id))] + if len(tasks_id) == 1: + tasks_id = [tasks_id[0] for _ in range(n_envs)] + episode_indices = list(range(n_envs)) + elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0: + n_repeat = n_envs // len(tasks_id) + print("n_repeat", n_repeat) + episode_indices = [] + for _ in range(len(tasks_id)): + episode_indices.extend(list(range(n_repeat))) + tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id])) + elif n_envs < len(tasks_id): + tasks_id = tasks_id[:n_envs] + episode_indices = list(range(n_envs))[:n_envs] + print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}") + print(f"Creating Libero envs with task ids {tasks_id} from suite {task}") + assert n_envs == len(tasks_id), ( + f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}" + ) + return env_cls( + [ + lambda i=i: LiberoEnv( + task_suite=task_suite, + task_id=tasks_id[i], + task_suite_name=task, + camera_name=camera_name, + init_states=init_states, + episode_index=episode_indices[i], + **gym_kwargs, + ) + for i in range(n_envs) + ] + ) + else: + envs = defaultdict(dict) + benchmark_dict = benchmark.get_benchmark_dict() + task = task.split(",") + for _task in task: + task_suite = benchmark_dict[ + _task + ]() # can also choose libero_spatial, libero_object, libero_10 etc. + tasks_ids = list(range(len(task_suite.tasks))) + for tasks_id in tasks_ids: + episode_indices = list(range(n_envs)) + print( + f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}" + ) + envs_list = [ + ( + lambda i=i, + task_suite=task_suite, + tasks_id=tasks_id, + _task=_task, + episode_indices=episode_indices: LiberoEnv( + task_suite=task_suite, + task_id=tasks_id, + task_suite_name=_task, + camera_name=camera_name, + init_states=init_states, + episode_index=episode_indices[i], + **gym_kwargs, + ) + ) + for i in range(n_envs) + ] + envs[_task][tasks_id] = env_cls(envs_list) + return envs diff --git a/src/lerobot/envs/utils.py b/src/lerobot/envs/utils.py index 5ae252dbe..9490f670e 100644 --- a/src/lerobot/envs/utils.py +++ b/src/lerobot/envs/utils.py @@ -182,3 +182,47 @@ def add_envs_task(env: gym.vector.VectorEnv, observation: dict[str, Any]) -> dic num_envs = observation[list(observation.keys())[0]].shape[0] observation["task"] = ["" for _ in range(num_envs)] return observation + +def _close_single_env(env: Any) -> None: + """Try to close a single env object if it exposes .close().""" + try: + close_fn = getattr(env, "close", None) + if callable(close_fn): + close_fn() + except Exception as exc: + # Best-effort close: log but don't raise + LOG.debug("Exception while closing env %s: %s", env, exc) + +def close_envs(env_or_collection: Any) -> None: + """ + Close a single env or any nested structure of envs. + + Accepts: + - a single env with .close() + - a Mapping of things (e.g. dict) + - a Sequence of things (list/tuple) but NOT str/bytes + - nested combinations of the above + + This is intentionally permissive and best-effort: it will swallow exceptions + encountered while closing individual envs and continue. + """ + # Guard: single object with close() + if hasattr(env_or_collection, "close") and not isinstance(env_or_collection, (Mapping, Sequence)): + _close_single_env(env_or_collection) + return + + # Mapping (e.g., {suite: {task_id: vec_env}}) + if isinstance(env_or_collection, Mapping): + for v in env_or_collection.values(): + close_envs(v) + return + + # Sequence (list/tuple) but skip str/bytes + if isinstance(env_or_collection, Sequence) and not isinstance(env_or_collection, (str, bytes)): + for v in env_or_collection: + close_envs(v) + return + + # Fallback: try to close if possible + if hasattr(env_or_collection, "close"): + _close_single_env(env_or_collection) \ No newline at end of file diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index b7d92d988..03fa44a2a 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -177,6 +177,5 @@ def make_policy( policy = policy_cls(**kwargs) policy.to(cfg.device) assert isinstance(policy, nn.Module) - breakpoint() # policy = torch.compile(policy, mode="reduce-overhead") return policy diff --git a/src/lerobot/scripts/eval.py b/src/lerobot/scripts/eval.py index 6fbae645b..795ed2b3c 100644 --- a/src/lerobot/scripts/eval.py +++ b/src/lerobot/scripts/eval.py @@ -45,7 +45,6 @@ Note that in both examples, the repo/folder should contain at least `config.json You can learn about the CLI options for this script in the `EvalPipelineConfig` in lerobot/configs/eval.py """ - import concurrent import json import logging @@ -80,6 +79,9 @@ from lerobot.utils.utils import ( init_logging, inside_slurm, ) +from typing import TypedDict, Dict, List, Tuple, Iterator +from collections import defaultdict +import concurrent.futures as cf def rollout( @@ -537,8 +539,8 @@ def eval_main(cfg: EvalPipelineConfig): logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}") logging.info("Making environment.") - env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs) - + envs = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs) + breakpoint() logging.info("Making policy.") policy = make_policy( cfg=cfg.policy, @@ -550,41 +552,29 @@ def eval_main(cfg: EvalPipelineConfig): policy.eval() with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): - if cfg.env.multitask_eval: - info = eval_policy_multitask( - env, - policy, - cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(cfg.output_dir) / "videos", - start_seed=cfg.seed, - max_parallel_tasks=cfg.env.max_parallel_tasks, - verbose=False, - ) - print("Overall Aggregated Metrics:") - print(info["overall"]["aggregated"]) - - # Print per-suite stats - for task_group, task_group_info in info.items(): - if task_group == "overall": - continue # Skip the overall stats since we already printed it - print(f"\nAggregated Metrics for {task_group}:") - print(task_group_info["aggregated"]) - for _task_group, v in env.items(): - for _env in v.values(): - _env.close() - else: - info = eval_policy( - env, - policy, - cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(cfg.output_dir) / "videos", - start_seed=cfg.seed, - ) - print(info["aggregated"]) - env.close() + info = eval_policy_all( + envs, + policy, + cfg.eval.n_episodes, + max_episodes_rendered=10, + videos_dir=Path(cfg.output_dir) / "videos", + start_seed=cfg.seed, + max_parallel_tasks=cfg.env.max_parallel_tasks, + verbose=False, + ) + print("Overall Aggregated Metrics:") + print(info["overall"]["aggregated"]) + # Print per-suite stats + for task_group, task_group_info in info.items(): + if task_group == "overall": + continue # Skip the overall stats since we already printed it + print(f"\nAggregated Metrics for {task_group}:") + print(task_group_info["aggregated"]) + # Close all vec envs + for _suite, task_map in envs.items(): + for _vec in task_map.values(): + _vec.close() # Save info with open(Path(cfg.output_dir) / "eval_info.json", "w") as f: json.dump(info, f, indent=2) @@ -592,9 +582,19 @@ def eval_main(cfg: EvalPipelineConfig): logging.info("End of eval") -def eval_policy_multitask( - envs: dict[str, dict[str, gym.vector.VectorEnv]], - policy, +# ---- typed payload returned by one task eval ---- +class TaskMetrics(TypedDict): + sum_rewards: List[float] + max_rewards: List[float] + successes: List[bool] + video_paths: List[str] + +ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths") + + +def eval_policy_all( + envs: dict[str, dict[int, gym.vector.VectorEnv]], + policy: PreTrainedPolicy, n_episodes: int, max_episodes_rendered: int = 0, videos_dir: Path | None = None, @@ -603,129 +603,136 @@ def eval_policy_multitask( max_parallel_tasks: int = 5, verbose: bool = True, ) -> dict: + """ + Evaluate a policy over a dict-of-dicts of vectorized envs: + envs[suite_name][task_id] -> gym.vector.VectorEnv + Returns a dict with per-suite aggregates and an 'overall' block. + """ global_start = time.time() - results = {} - overall_rewards, overall_max_rewards, overall_successes = [], [], [] - overall_video_paths = [] - overall_episode_data = None + # inner: evaluate a single (suite, task) + def eval_one( + task_group: str, + task_id: int, + env: gym.vector.VectorEnv, + *, + policy: PreTrainedPolicy, + n_episodes: int, + max_episodes_rendered: int, + videos_dir: Path | None, + return_episode_data: bool, + start_seed: int | None, + ) -> TaskMetrics: + """Evaluates one task_id of one suite using the provided vec env.""" + if verbose: + print(f"Evaluating: task_group={task_group}, task_id={task_id} ...") - def eval_task(task_group, task_id, env): - """Evaluates a single task in parallel.""" - print(f"Evaluating: task_group: {task_group}, task_id: {task_id} ...") + task_videos_dir = None if videos_dir is not None: task_videos_dir = videos_dir / f"{task_group}_{task_id}" task_videos_dir.mkdir(parents=True, exist_ok=True) + task_result = eval_policy( - env, - policy, - n_episodes, - max_episodes_rendered, - task_videos_dir, - return_episode_data, - start_seed, + env=env, + policy=policy, + n_episodes=n_episodes, + max_episodes_rendered=max_episodes_rendered, + videos_dir=task_videos_dir, + return_episode_data=return_episode_data, + start_seed=start_seed, ) per_episode = task_result["per_episode"] - return { - "task_group": task_group, - "task_id": task_id, - "sum_rewards": [ep["sum_reward"] for ep in per_episode], - "max_rewards": [ep["max_reward"] for ep in per_episode], - "successes": [ep["success"] for ep in per_episode], - "video_paths": task_result.get("video_paths", []), - } + return TaskMetrics( + sum_rewards=[ep["sum_reward"] for ep in per_episode], + max_rewards=[ep["max_reward"] for ep in per_episode], + successes=[ep["success"] for ep in per_episode], + video_paths=task_result.get("video_paths", []), + ) - task_group_results = {} - if max_parallel_tasks == 1: - # sequential mode (safe for colab / EGL) - for task_group, tasks in envs.items(): - for task_id, env in tasks.items(): - task_result = eval_task(task_group, task_id, env) - if task_group not in task_group_results: - task_group_results[task_group] = { - "sum_rewards": [], - "max_rewards": [], - "successes": [], - "video_paths": [], - } - task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"]) - task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"]) - task_group_results[task_group]["successes"].extend(task_result["successes"]) - task_group_results[task_group]["video_paths"].extend(task_result["video_paths"]) - else: - with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: - future_to_task = { - executor.submit(eval_task, task_group, task_id, env): (task_group, task_id) - for task_group, tasks in envs.items() - for task_id, env in tasks.items() - } + # result producer: sequential or threaded, same consumer + def iter_task_results() -> Iterator[Tuple[str, int, TaskMetrics]]: + if max_parallel_tasks == 1: + for task_group, tasks in envs.items(): + for task_id, vec in tasks.items(): + yield task_group, task_id, eval_one( + task_group, task_id, vec, + policy=policy, + n_episodes=n_episodes, + max_episodes_rendered=max_episodes_rendered, + videos_dir=videos_dir, + return_episode_data=return_episode_data, + start_seed=start_seed, + ) + else: + with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: + fut2key: Dict[cf.Future, Tuple[str, int]] = {} + for task_group, tasks in envs.items(): + for task_id, vec in tasks.items(): + fut = executor.submit( + eval_one, task_group, task_id, vec, + policy=policy, + n_episodes=n_episodes, + max_episodes_rendered=max_episodes_rendered, + videos_dir=videos_dir, + return_episode_data=return_episode_data, + start_seed=start_seed, + ) + fut2key[fut] = (task_group, task_id) + for fut in cf.as_completed(fut2key): + task_group, task_id = fut2key[fut] + yield task_group, task_id, fut.result() - task_group_results = {} + # single accumulator path on the main thread + group_acc: Dict[str, Dict[str, List]] = defaultdict(lambda: {k: [] for k in ACC_KEYS}) + overall: Dict[str, List] = {k: [] for k in ACC_KEYS} - for future in concurrent.futures.as_completed(future_to_task): - task_result = future.result() - task_group = task_result["task_group"] + for task_group, task_id, metrics in iter_task_results(): + acc = group_acc[task_group] + for k in ACC_KEYS: + acc[k].extend(metrics[k]) + overall[k].extend(metrics[k]) - if task_group not in task_group_results: - task_group_results[task_group] = { - "sum_rewards": [], - "max_rewards": [], - "successes": [], - "video_paths": [], - } - - task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"]) - task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"]) - task_group_results[task_group]["successes"].extend(task_result["successes"]) - task_group_results[task_group]["video_paths"].extend(task_result["video_paths"]) - - # Process results per task group - for task_group, data in task_group_results.items(): + # build outputs + results: Dict[str, dict] = {} + for task_group, data in group_acc.items(): suite_rewards = data["sum_rewards"] - suite_max_rewards = data["max_rewards"] - suite_successes = data["successes"] - suite_video_paths = data["video_paths"] + suite_max = data["max_rewards"] + suite_succ = data["successes"] + suite_vids = data["video_paths"] suite_eval_s = time.time() - global_start suite_eval_ep_s = suite_eval_s / max(1, len(suite_rewards)) results[task_group] = { "aggregated": { - "avg_sum_reward": float(np.nanmean(suite_rewards)), - "avg_max_reward": float(np.nanmean(suite_max_rewards)), - "pc_success": float(np.nanmean(suite_successes) * 100), + "avg_sum_reward": float(np.nanmean(suite_rewards)) if suite_rewards else float("nan"), + "avg_max_reward": float(np.nanmean(suite_max)) if suite_max else float("nan"), + "pc_success": float(np.nanmean(suite_succ) * 100) if suite_succ else float("nan"), "eval_s": suite_eval_s, "eval_ep_s": suite_eval_ep_s, }, - "video_paths": suite_video_paths, - "episodes": None, # Modify if episode data is needed + "video_paths": suite_vids, + "episodes": None, } - overall_rewards.extend(suite_rewards) - overall_max_rewards.extend(suite_max_rewards) - overall_successes.extend(suite_successes) - overall_video_paths.extend(suite_video_paths) - - # Global metrics global_eval_s = time.time() - global_start - global_eval_ep_s = global_eval_s / max(1, len(overall_rewards)) - + global_eval_ep_s = global_eval_s / max(1, len(overall["sum_rewards"])) results["overall"] = { "aggregated": { - "avg_sum_reward": float(np.nanmean(overall_rewards)), - "avg_max_reward": float(np.nanmean(overall_max_rewards)), - "pc_success": float(np.nanmean(overall_successes) * 100), + "avg_sum_reward": float(np.nanmean(overall["sum_rewards"])) if overall["sum_rewards"] else float("nan"), + "avg_max_reward": float(np.nanmean(overall["max_rewards"])) if overall["max_rewards"] else float("nan"), + "pc_success": float(np.nanmean(overall["successes"]) * 100) if overall["successes"] else float("nan"), "eval_s": global_eval_s, "eval_ep_s": global_eval_ep_s, }, - "video_paths": overall_video_paths, - "episodes": overall_episode_data, + "video_paths": overall["video_paths"], + "episodes": None, } - return results + if __name__ == "__main__": init_logging() eval_main() diff --git a/src/lerobot/scripts/train.py b/src/lerobot/scripts/train.py index 74219fc38..3feeb0512 100644 --- a/src/lerobot/scripts/train.py +++ b/src/lerobot/scripts/train.py @@ -30,11 +30,12 @@ from lerobot.datasets.factory import make_dataset from lerobot.datasets.sampler import EpisodeAwareSampler from lerobot.datasets.utils import cycle from lerobot.envs.factory import make_env +from lerobot.envs.utils import close_envs from lerobot.optim.factory import make_optimizer_and_scheduler from lerobot.policies.factory import make_policy from lerobot.policies.pretrained import PreTrainedPolicy from lerobot.policies.utils import get_device_from_parameters -from lerobot.scripts.eval import eval_policy, eval_policy_multitask +from lerobot.scripts.eval import eval_policy_all from lerobot.utils.logging_utils import AverageMeter, MetricsTracker from lerobot.utils.random_utils import set_seed from lerobot.utils.train_utils import ( @@ -270,62 +271,46 @@ def train(cfg: TrainPipelineConfig): if cfg.env and is_eval_step: step_id = get_step_identifier(step, cfg.steps) logging.info(f"Eval policy at step {step}") - with ( - torch.no_grad(), - torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(), - ): - if cfg.env.multitask_eval: - eval_info = eval_policy_multitask( - eval_env, - policy, - cfg.eval.n_episodes, - videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}", - max_episodes_rendered=4, - start_seed=cfg.seed, - max_parallel_tasks=cfg.env.max_parallel_tasks, - ) - aggregated = eval_info["overall"]["aggregated"] - # Print per-suite stats, log? - for task_group, task_group_info in eval_info.items(): - if task_group == "overall": - continue # Skip the overall stats since we already printed it - print(f"\nAggregated Metrics for {task_group}:") - print(task_group_info["aggregated"]) - else: - eval_info = eval_policy( - eval_env, - policy, - cfg.eval.n_episodes, - videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}", - max_episodes_rendered=4, - start_seed=cfg.seed, - ) - aggregated = eval_info["aggregated"] + with torch.no_grad(), (torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext()): + eval_info = eval_policy_all( + eval_env, # dict[suite][task_id] -> vec_env + policy, + cfg.eval.n_episodes, + videos_dir=videos_dir, + max_episodes_rendered=4, + start_seed=cfg.seed, + max_parallel_tasks=cfg.env.max_parallel_tasks, + verbose=False, + ) + # overall metrics (suite-agnostic) + aggregated = eval_info["overall"]["aggregated"] + + # optional: per-suite logging + for suite, suite_info in eval_info.items(): + if suite == "overall": + continue + logging.info("Suite %s aggregated: %s", suite, suite_info["aggregated"]) + + # meters/tracker eval_metrics = { "avg_sum_reward": AverageMeter("∑rwrd", ":.3f"), - "pc_success": AverageMeter("success", ":.1f"), - "eval_s": AverageMeter("eval_s", ":.3f"), + "pc_success": AverageMeter("success", ":.1f"), + "eval_s": AverageMeter("eval_s", ":.3f"), } eval_tracker = MetricsTracker( cfg.batch_size, dataset.num_frames, dataset.num_episodes, eval_metrics, initial_step=step ) - eval_tracker.eval_s = aggregated.pop("eval_s") - eval_tracker.avg_sum_reward = aggregated.pop("avg_sum_reward") - eval_tracker.pc_success = aggregated.pop("pc_success") - logging.info(eval_tracker) + eval_tracker.eval_s = aggregated.get("eval_s", 0.0) + eval_tracker.avg_sum_reward = aggregated.get("avg_sum_reward", float("nan")) + eval_tracker.pc_success = aggregated.get("pc_success", float("nan")) if wandb_logger: wandb_log_dict = {**eval_tracker.to_dict(), **eval_info} wandb_logger.log_dict(wandb_log_dict, step, mode="eval") wandb_logger.log_video(eval_info["video_paths"][0], step, mode="eval") if eval_env: - if cfg.env.multitask_eval: - for _task_group, envs_dict in eval_env.items(): - for _idx, env in envs_dict.items(): - env.close() - else: - eval_env.close() + close_envs(eval_env) logging.info("End of training") if cfg.policy.push_to_hub: