This commit is contained in:
Pepijn
2025-09-11 17:49:36 +02:00
8 changed files with 585 additions and 297 deletions
+58
View File
@@ -0,0 +1,58 @@
#!/bin/bash
# storage / caches
RAID=/raid/jade
export TRANSFORMERS_CACHE=$RAID/.cache/huggingface/transformers
export HF_HOME=$RAID/.cache/huggingface
export HF_DATASETS_CACHE=$RAID/.cache/huggingface/datasets
export HF_LEROBOT_HOME=$RAID/.cache/huggingface/lerobot
export WANDB_CACHE_DIR=$RAID/.cache/wandb
export TMPDIR=$RAID/.cache/tmp
mkdir -p $TMPDIR
export WANDB_MODE=offline
export HF_DATASETS_OFFLINE=1
export HF_HUB_OFFLINE=1
export TOKENIZERS_PARALLELISM=false
export MUJOCO_GL=egl
export CUDA_VISIBLE_DEVICES=2
# CONFIGURATION
POLICY_PATH="/raid/jade/logs/lerobot/lerobot_2_HuggingFaceVLA_libero_smolvla_lr1e-4bs32steps100000/checkpoints/100000/pretrained_model"
POLICY_PATH="/raid/jade/models/smolvlamust"
TASK=libero_spatial,libero_object
ENV_TYPE="libero"
BATCH_SIZE=1
N_EPISODES=1
# storage / caches
RAID=/raid/jade
N_ACTION_STEPS=1
export TRANSFORMERS_CACHE=$RAID/.cache/huggingface/transformers
export HF_HOME=$RAID/.cache/huggingface
export HF_DATASETS_CACHE=$RAID/.cache/huggingface/datasets
export HF_LEROBOT_HOME=$RAID/.cache/huggingface/lerobot
export WANDB_CACHE_DIR=$RAID/.cache/wandb
export TMPDIR=$RAID/.cache/tmp
mkdir -p $TMPDIR
export WANDB_MODE=offline
# export HF_DATASETS_OFFLINE=1
# export HF_HUB_OFFLINE=1
export TOKENIZERS_PARALLELISM=false
export MUJOCO_GL=egl
export MUJOCO_GL=egl
unset HF_HUB_OFFLINE
# RUN EVALUATION
python src/lerobot/scripts/eval.py \
--policy.path="$POLICY_PATH" \
--env.type="$ENV_TYPE" \
--eval.batch_size="$BATCH_SIZE" \
--eval.n_episodes="$N_EPISODES" \
--env.multitask_eval=True \
--env.task=$TASK \
# python examples/evaluate_libero.py \
# --policy_path "$POLICY_PATH" \
# --task_suite_name "$TASK" \
# --num_steps_wait 10 \
# --num_trials_per_task 10 \
# --video_out_path "data/libero/videos" \
# --device "cuda" \
# --seed 7
-2
View File
@@ -320,8 +320,6 @@ class LiberoEnv(EnvConfig):
@property
def gym_kwargs(self) -> dict:
return {
# "task": self.task,
"obs_type": self.obs_type,
"render_mode": self.render_mode,
# "max_episode_steps": self.episode_length,
}
+19 -17
View File
@@ -57,15 +57,14 @@ def make_env(
"""
if n_envs < 1:
raise ValueError("`n_envs must be at least 1")
raise ValueError("`n_envs` must be at least 1")
# batched version of the env that returns an observation of shape (b, c)
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
if "libero" in cfg.type:
from lerobot.envs.libero import create_libero_envs
env = create_libero_envs(
return create_libero_envs(
task=cfg.task,
n_envs=n_envs,
camera_name=cfg.camera_name,
@@ -74,19 +73,22 @@ def make_env(
env_cls=env_cls,
multitask_eval=cfg.multitask_eval,
)
else:
package_name = f"gym_{cfg.type}"
try:
importlib.import_module(package_name)
except ModuleNotFoundError as e:
print(
f"{package_name} is not installed. Please install it with `pip install 'lerobot[{cfg.type}]'`"
)
raise e
gym_handle = f"{package_name}/{cfg.task}"
env = env_cls(
[lambda: gym.make(gym_handle, disable_env_checker=True, **cfg.gym_kwargs) for _ in range(n_envs)]
)
package_name = f"gym_{cfg.type}"
try:
importlib.import_module(package_name)
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
f'{package_name} is not installed. Install with: pip install "lerobot[{cfg.type}]"'
) from e
return env
gym_handle = f"{package_name}/{cfg.task}"
def _make_one():
return gym.make(gym_handle, disable_env_checker=True, **(cfg.gym_kwargs or {}))
vec = env_cls([_make_one for _ in range(n_envs)])
# normalize to {suite: {task_id: vec_env}} for consistency
suite_name = cfg.type # e.g., "pusht", "aloha"
return {suite_name: {0: vec}}
+279 -107
View File
@@ -1,7 +1,10 @@
from __future__ import annotations
import logging
import math
import os
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Callable, Iterable, Mapping, Sequence
from itertools import chain
from typing import Any
@@ -12,98 +15,164 @@ from gymnasium import spaces
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
logger = logging.getLogger(__name__)
# ---- Helpers -----------------------------------------------------------------
def _parse_camera_names(camera_name: str | Sequence[str]) -> list[str]:
"""Normalize camera_name into a non-empty list of strings."""
if isinstance(camera_name, str):
cams = [c.strip() for c in camera_name.split(",") if c.strip()]
elif isinstance(camera_name, (list, tuple)):
cams = [str(c).strip() for c in camera_name if str(c).strip()]
else:
raise TypeError(f"camera_name must be str or sequence[str], got {type(camera_name).__name__}")
if not cams:
raise ValueError("camera_name resolved to an empty list.")
return cams
def _get_suite(name: str):
"""Instantiate a LIBERO suite by name with clear validation."""
bench = benchmark.get_benchmark_dict()
if name not in bench:
raise ValueError(f"Unknown LIBERO suite '{name}'. Available: {', '.join(sorted(bench.keys()))}")
suite = bench[name]()
if not getattr(suite, "tasks", None):
raise ValueError(f"Suite '{name}' has no tasks.")
return suite
def _select_task_ids(total_tasks: int, task_ids: Iterable[int] | None) -> list[int]:
"""Validate/normalize task ids. If None → all tasks."""
if task_ids is None:
return list(range(total_tasks))
ids = sorted({int(t) for t in task_ids})
for t in ids:
if t < 0 or t >= total_tasks:
raise ValueError(f"task_id {t} out of range [0, {total_tasks - 1}].")
return ids
def _make_env_fns(
*,
suite,
suite_name: str,
task_id: int,
n_envs: int,
camera_names: list[str],
init_states: bool,
gym_kwargs: Mapping[str, Any],
LiberoEnv: type, # injected to avoid forward ref issues if needed
) -> list[Callable[[], LiberoEnv]]:
"""Build n_envs factory callables for a single (suite, task_id)."""
joined_cams = ",".join(camera_names) # keep backward-compat: downstream expects a string
fns: list[Callable[[], LiberoEnv]] = []
for i in range(n_envs):
def _mk(
i=i,
suite=suite,
task_id=task_id,
suite_name=suite_name,
joined_cams=joined_cams,
init_states=init_states,
gym_kwargs=dict(gym_kwargs),
):
return LiberoEnv(
task_suite=suite,
task_id=task_id,
task_suite_name=suite_name,
camera_name=joined_cams,
init_states=init_states,
episode_index=i,
**gym_kwargs,
)
fns.append(_mk)
return fns
# ---- Main API ----------------------------------------------------------------
def create_libero_envs(
task: str,
n_envs: int,
gym_kwargs: dict[str, Any] = None,
camera_name: str = "agentview_image,robot0_eye_in_hand_image",
gym_kwargs: dict[str, Any] | None = None,
camera_name: str | Sequence[str] = "agentview_image,robot0_eye_in_hand_image",
init_states: bool = True,
env_cls: Callable = None,
multitask_eval: bool = True,
) -> dict[str, dict[str, Any]]:
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
multitask_eval: bool = True, # kept for signature compatibility; return type is consistent regardless
) -> dict[str, dict[int, Any]]:
"""
Here n_envs is per task and equal to the number of rollouts.
Returns:
dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs.
"""
print("num envs", n_envs)
print("multitask_eval", multitask_eval)
print("gym_kwargs", gym_kwargs)
if gym_kwargs is None:
gym_kwargs = {}
Create vectorized LIBERO environments with a consistent return shape.
if not multitask_eval:
benchmark_dict = benchmark.get_benchmark_dict()
task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_id = list(range(len(task_suite.tasks)))
episode_indices = [0 for i in range(len(tasks_id))]
if len(tasks_id) == 1:
tasks_id = [tasks_id[0] for _ in range(n_envs)]
episode_indices = list(range(n_envs))
elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0:
n_repeat = n_envs // len(tasks_id)
print("n_repeat", n_repeat)
episode_indices = []
for _ in range(len(tasks_id)):
episode_indices.extend(list(range(n_repeat)))
tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id]))
elif n_envs < len(tasks_id):
tasks_id = tasks_id[:n_envs]
episode_indices = list(range(n_envs))[:n_envs]
print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}")
print(f"Creating Libero envs with task ids {tasks_id} from suite {task}")
assert n_envs == len(tasks_id), (
f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}"
)
return env_cls(
[
lambda i=i: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id[i],
task_suite_name=task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
for i in range(n_envs)
]
)
else:
envs = defaultdict(dict)
benchmark_dict = benchmark.get_benchmark_dict()
task = task.split(",")
for _task in task:
task_suite = benchmark_dict[
_task
]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_ids = list(range(len(task_suite.tasks)))
for tasks_id in tasks_ids:
episode_indices = list(range(n_envs))
print(
f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}"
)
envs_list = [
(
lambda i=i,
task_suite=task_suite,
tasks_id=tasks_id,
_task=_task,
episode_indices=episode_indices: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id,
task_suite_name=_task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
)
for i in range(n_envs)
]
envs[_task][tasks_id] = env_cls(envs_list)
return envs
Returns:
dict[suite_name][task_id] -> vec_env (env_cls([...]) with exactly n_envs factories)
Notes:
- n_envs is the number of rollouts *per task* (episode_index = 0..n_envs-1).
- `task` can be a single suite or a comma-separated list of suites.
- You may pass `task_ids` (list[int]) inside `gym_kwargs` to restrict tasks per suite.
"""
if env_cls is None or not callable(env_cls):
raise ValueError("env_cls must be a callable that wraps a list of environment factory callables.")
if not isinstance(n_envs, int) or n_envs <= 0:
raise ValueError(f"n_envs must be a positive int; got {n_envs}.")
gym_kwargs = dict(gym_kwargs or {})
task_ids_filter = gym_kwargs.pop("task_ids", None) # optional: limit to specific tasks
# Avoid circular import/type issues: assume LiberoEnv is defined in this module
try:
LiberoEnv # type: ignore[name-defined]
except NameError:
# If LiberoEnv is in the same file, this won't run. If it's elsewhere, import here.
exit()
# from .libero_env import LiberoEnv # adjust if your class lives in another module
camera_names = _parse_camera_names(camera_name)
suite_names = [s.strip() for s in str(task).split(",") if s.strip()]
if not suite_names:
raise ValueError("`task` must contain at least one LIBERO suite name.")
logger.info(
"Creating LIBERO envs | suites=%s | n_envs(per task)=%d | init_states=%s | multitask_eval=%s",
suite_names,
n_envs,
init_states,
bool(multitask_eval),
)
if task_ids_filter is not None:
logger.info("Restricting to task_ids=%s", task_ids_filter)
out: dict[str, dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names:
suite = _get_suite(suite_name)
total = len(suite.tasks)
selected = _select_task_ids(total, task_ids_filter)
if not selected:
raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).")
for tid in selected:
fns = _make_env_fns(
suite=suite,
suite_name=suite_name,
task_id=tid,
n_envs=n_envs,
camera_names=camera_names,
init_states=init_states,
gym_kwargs=gym_kwargs,
LiberoEnv=LiberoEnv,
)
out[suite_name][tid] = env_cls(fns)
logger.debug("Built vec env | suite=%s | task_id=%d | n_envs=%d", suite_name, tid, n_envs)
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
def quat2axisangle(quat):
@@ -199,17 +268,15 @@ class LiberoEnv(gym.Env):
self.episode_index = episode_index
self._env = self._make_envs_task(task_suite, self.task_id)
if task_suite_name == "libero_spatial":
max_steps = 220 # longest training demo has 193 steps
elif task_suite_name == "libero_object":
max_steps = 280 # longest training demo has 254 steps
elif task_suite_name == "libero_goal":
max_steps = 300 # longest training demo has 270 steps
elif task_suite_name == "libero_10":
max_steps = 520 # longest training demo has 505 steps
elif task_suite_name == "libero_90":
max_steps = 400 # longest training demo has 373 steps
self._max_episode_steps = max_steps
TASK_SUITE_MAX_STEPS: dict[str, int] = {
"libero_spatial": 220, # longest training demo has 193 steps
"libero_object": 280, # longest training demo has 254 steps
"libero_goal": 300, # longest training demo has 270 steps
"libero_10": 520, # longest training demo has 505 steps
"libero_90": 400, # longest training demo has 373 steps
}
default_steps = 500
self._max_episode_steps = TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
images = {}
for cam in self.camera_name:
@@ -221,7 +288,11 @@ class LiberoEnv(gym.Env):
)
if self.obs_type == "state":
raise NotImplementedError()
raise NotImplementedError(
"The 'state' observation type is not supported in LiberoEnv. "
"Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')."
)
elif self.obs_type == "pixels":
self.observation_space = spaces.Dict(
{
@@ -245,9 +316,8 @@ class LiberoEnv(gym.Env):
def render(self):
raw_obs = self._env.env._get_observations()
formatted = self._format_raw_obs(raw_obs)
# grab the "main" camera
return formatted["pixels"]["image"]
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
return image
def _make_envs_task(self, task_suite, task_id: int = 0):
task = task_suite.get_task(task_id)
@@ -277,7 +347,6 @@ class LiberoEnv(gym.Env):
image = raw_obs[camera_name]
image = image[::-1, ::-1] # rotate 180 degrees
images[self.camera_name_mapping[camera_name]] = image
# images = image if len(images) == 1 else images
state = np.concatenate(
(
raw_obs["robot0_eef_pos"],
@@ -287,7 +356,10 @@ class LiberoEnv(gym.Env):
)
agent_pos = state
if self.obs_type == "state":
raise NotImplementedError()
raise NotImplementedError(
"The 'state' observation type is not supported in LiberoEnv. "
"Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')."
)
elif self.obs_type == "pixels":
obs = {"pixels": images.copy()}
elif self.obs_type == "pixels_agent_pos":
@@ -310,16 +382,116 @@ class LiberoEnv(gym.Env):
return observation, info
def step(self, action):
assert action.ndim == 1
action[-1] = 1.0 - action[-1]
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
f"but got shape {action.shape} with ndim={action.ndim}"
)
raw_obs, reward, done, info = self._env.step(action)
is_success = self._env.check_success()
terminated = done or is_success
info["is_success"] = is_success
info["is_success"] = done # is_success
observation = self._format_raw_obs(raw_obs)
if done:
self.reset()
print(self.task, self.task_id, done, is_success)
truncated = False
# note if it is unable to complete get libero error after many steps
return observation, reward, terminated, truncated, info
def close(self):
self._env.close()
def create_libero_envs1(
task: str,
n_envs: int,
gym_kwargs: dict[str, Any] = None,
camera_name: str = "agentview_image,robot0_eye_in_hand_image",
init_states: bool = True,
env_cls: Callable = None,
multitask_eval: bool = True,
) -> dict[str, dict[str, Any]]:
"""
Here n_envs is per task and equal to the number of rollouts.
Returns:
dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs.
"""
print("num envs", n_envs)
print("multitask_eval", multitask_eval)
print("gym_kwargs", gym_kwargs)
if gym_kwargs is None:
gym_kwargs = {}
if not multitask_eval:
benchmark_dict = benchmark.get_benchmark_dict()
task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_id = list(range(len(task_suite.tasks)))
episode_indices = [0 for i in range(len(tasks_id))]
if len(tasks_id) == 1:
tasks_id = [tasks_id[0] for _ in range(n_envs)]
episode_indices = list(range(n_envs))
elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0:
n_repeat = n_envs // len(tasks_id)
print("n_repeat", n_repeat)
episode_indices = []
for _ in range(len(tasks_id)):
episode_indices.extend(list(range(n_repeat)))
tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id]))
elif n_envs < len(tasks_id):
tasks_id = tasks_id[:n_envs]
episode_indices = list(range(n_envs))[:n_envs]
print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}")
print(f"Creating Libero envs with task ids {tasks_id} from suite {task}")
assert n_envs == len(tasks_id), (
f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}"
)
return env_cls(
[
lambda i=i: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id[i],
task_suite_name=task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
for i in range(n_envs)
]
)
else:
envs = defaultdict(dict)
benchmark_dict = benchmark.get_benchmark_dict()
task = task.split(",")
for _task in task:
task_suite = benchmark_dict[
_task
]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_ids = list(range(len(task_suite.tasks)))
for tasks_id in tasks_ids:
episode_indices = list(range(n_envs))
print(
f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}"
)
envs_list = [
(
lambda i=i,
task_suite=task_suite,
tasks_id=tasks_id,
_task=_task,
episode_indices=episode_indices: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id,
task_suite_name=_task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
)
for i in range(n_envs)
]
envs[_task][tasks_id] = env_cls(envs_list)
return envs
+46
View File
@@ -134,3 +134,49 @@ def add_envs_task(env: gym.vector.VectorEnv, observation: dict[str, Any]) -> dic
num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)]
return observation
def _close_single_env(env: Any) -> None:
"""Try to close a single env object if it exposes .close()."""
try:
close_fn = getattr(env, "close", None)
if callable(close_fn):
close_fn()
except Exception as exc:
# Best-effort close: log but don't raise
LOG.debug("Exception while closing env %s: %s", env, exc)
def close_envs(env_or_collection: Any) -> None:
"""
Close a single env or any nested structure of envs.
Accepts:
- a single env with .close()
- a Mapping of things (e.g. dict)
- a Sequence of things (list/tuple) but NOT str/bytes
- nested combinations of the above
This is intentionally permissive and best-effort: it will swallow exceptions
encountered while closing individual envs and continue.
"""
# Guard: single object with close()
if hasattr(env_or_collection, "close") and not isinstance(env_or_collection, (Mapping, Sequence)):
_close_single_env(env_or_collection)
return
# Mapping (e.g., {suite: {task_id: vec_env}})
if isinstance(env_or_collection, Mapping):
for v in env_or_collection.values():
close_envs(v)
return
# Sequence (list/tuple) but skip str/bytes
if isinstance(env_or_collection, Sequence) and not isinstance(env_or_collection, (str, bytes)):
for v in env_or_collection:
close_envs(v)
return
# Fallback: try to close if possible
if hasattr(env_or_collection, "close"):
_close_single_env(env_or_collection)
-2
View File
@@ -186,7 +186,5 @@ def make_policy(
policy.to(cfg.device)
assert isinstance(policy, nn.Module)
# policy = torch.compile(policy, mode="reduce-overhead")
return policy
+157 -128
View File
@@ -46,17 +46,19 @@ Note that in both examples, the repo/folder should contain at least `config.json
You can learn about the CLI options for this script in the `EvalPipelineConfig` in lerobot/configs/eval.py
"""
import concurrent
import concurrent.futures as cf
import json
import logging
import threading
import time
from collections.abc import Callable
from collections import defaultdict
from collections.abc import Callable, Iterator
from contextlib import nullcontext
from copy import deepcopy
from dataclasses import asdict
from pathlib import Path
from pprint import pformat
from typing import TypedDict
import einops
import gymnasium as gym
@@ -69,7 +71,11 @@ from tqdm import trange
from lerobot.configs import parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.factory import make_env
from lerobot.envs.utils import add_envs_task, check_env_attributes_and_types, preprocess_observation
from lerobot.envs.utils import (
add_envs_task,
check_env_attributes_and_types,
preprocess_observation,
)
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.utils import get_device_from_parameters
@@ -466,7 +472,9 @@ def eval_main(cfg: EvalPipelineConfig):
# Check device is available
device = get_safe_torch_device(cfg.policy.device, log=True)
# login to hf
# login()
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
set_seed(cfg.seed)
@@ -474,50 +482,39 @@ def eval_main(cfg: EvalPipelineConfig):
logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}")
logging.info("Making environment.")
env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
envs = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
logging.info("Making policy.")
policy = make_policy(
cfg=cfg.policy,
env_cfg=cfg.env,
)
policy.eval()
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
if cfg.env.multitask_eval:
info = eval_policy_multitask(
env,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
print("Overall Aggregated Metrics:")
print(info["overall"]["aggregated"])
# Print per-suite stats
for task_group, task_group_info in info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
for _task_group, v in env.items():
for _env in v.values():
_env.close()
else:
info = eval_policy(
env,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
)
print(info["aggregated"])
env.close()
info = eval_policy_all(
envs,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
print("Overall Aggregated Metrics:")
print(info["overall"]["aggregated"])
# Print per-suite stats
for task_group, task_group_info in info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
# Close all vec envs
for _suite, task_map in envs.items():
for _vec in task_map.values():
_vec.close()
# Save info
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
json.dump(info, f, indent=2)
@@ -525,9 +522,20 @@ def eval_main(cfg: EvalPipelineConfig):
logging.info("End of eval")
def eval_policy_multitask(
envs: dict[str, dict[str, gym.vector.VectorEnv]],
policy,
# ---- typed payload returned by one task eval ----
class TaskMetrics(TypedDict):
sum_rewards: list[float]
max_rewards: list[float]
successes: list[bool]
video_paths: list[str]
ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths")
def eval_policy_all(
envs: dict[str, dict[int, gym.vector.VectorEnv]],
policy: PreTrainedPolicy,
n_episodes: int,
max_episodes_rendered: int = 0,
videos_dir: Path | None = None,
@@ -536,126 +544,147 @@ def eval_policy_multitask(
max_parallel_tasks: int = 5,
verbose: bool = True,
) -> dict:
"""
Evaluate a policy over a dict-of-dicts of vectorized envs:
envs[suite_name][task_id] -> gym.vector.VectorEnv
Returns a dict with per-suite aggregates and an 'overall' block.
"""
global_start = time.time()
results = {}
overall_rewards, overall_max_rewards, overall_successes = [], [], []
overall_video_paths = []
overall_episode_data = None
# inner: evaluate a single (suite, task)
def eval_one(
task_group: str,
task_id: int,
env: gym.vector.VectorEnv,
*,
policy: PreTrainedPolicy,
n_episodes: int,
max_episodes_rendered: int,
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
) -> TaskMetrics:
"""Evaluates one task_id of one suite using the provided vec env."""
if verbose:
print(f"Evaluating: task_group={task_group}, task_id={task_id} ...")
def eval_task(task_group, task_id, env):
"""Evaluates a single task in parallel."""
print(f"Evaluating: task_group: {task_group}, task_id: {task_id} ...")
task_videos_dir = None
if videos_dir is not None:
task_videos_dir = videos_dir / f"{task_group}_{task_id}"
task_videos_dir.mkdir(parents=True, exist_ok=True)
task_result = eval_policy(
env,
policy,
n_episodes,
max_episodes_rendered,
task_videos_dir,
return_episode_data,
start_seed,
env=env,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
per_episode = task_result["per_episode"]
return {
"task_group": task_group,
"task_id": task_id,
"sum_rewards": [ep["sum_reward"] for ep in per_episode],
"max_rewards": [ep["max_reward"] for ep in per_episode],
"successes": [ep["success"] for ep in per_episode],
"video_paths": task_result.get("video_paths", []),
}
return TaskMetrics(
sum_rewards=[ep["sum_reward"] for ep in per_episode],
max_rewards=[ep["max_reward"] for ep in per_episode],
successes=[ep["success"] for ep in per_episode],
video_paths=task_result.get("video_paths", []),
)
task_group_results = {}
if max_parallel_tasks == 1:
# sequential mode (safe for colab / EGL)
for task_group, tasks in envs.items():
for task_id, env in tasks.items():
task_result = eval_task(task_group, task_id, env)
if task_group not in task_group_results:
task_group_results[task_group] = {
"sum_rewards": [],
"max_rewards": [],
"successes": [],
"video_paths": [],
}
task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"])
task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"])
task_group_results[task_group]["successes"].extend(task_result["successes"])
task_group_results[task_group]["video_paths"].extend(task_result["video_paths"])
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
future_to_task = {
executor.submit(eval_task, task_group, task_id, env): (task_group, task_id)
for task_group, tasks in envs.items()
for task_id, env in tasks.items()
}
# result producer: sequential or threaded, same consumer
def iter_task_results() -> Iterator[tuple[str, int, TaskMetrics]]:
if max_parallel_tasks == 1:
for task_group, tasks in envs.items():
for task_id, vec in tasks.items():
yield (
task_group,
task_id,
eval_one(
task_group,
task_id,
vec,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
),
)
else:
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2key: dict[cf.Future, tuple[str, int]] = {}
for task_group, tasks in envs.items():
for task_id, vec in tasks.items():
fut = executor.submit(
eval_one,
task_group,
task_id,
vec,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
fut2key[fut] = (task_group, task_id)
for fut in cf.as_completed(fut2key):
task_group, task_id = fut2key[fut]
yield task_group, task_id, fut.result()
task_group_results = {}
# single accumulator path on the main thread
group_acc: dict[str, dict[str, list]] = defaultdict(lambda: {k: [] for k in ACC_KEYS})
overall: dict[str, list] = {k: [] for k in ACC_KEYS}
for future in concurrent.futures.as_completed(future_to_task):
task_result = future.result()
task_group = task_result["task_group"]
for task_group, task_id, metrics in iter_task_results():
acc = group_acc[task_group]
for k in ACC_KEYS:
acc[k].extend(metrics[k])
overall[k].extend(metrics[k])
if task_group not in task_group_results:
task_group_results[task_group] = {
"sum_rewards": [],
"max_rewards": [],
"successes": [],
"video_paths": [],
}
task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"])
task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"])
task_group_results[task_group]["successes"].extend(task_result["successes"])
task_group_results[task_group]["video_paths"].extend(task_result["video_paths"])
# Process results per task group
for task_group, data in task_group_results.items():
# build outputs
results: dict[str, dict] = {}
for task_group, data in group_acc.items():
suite_rewards = data["sum_rewards"]
suite_max_rewards = data["max_rewards"]
suite_successes = data["successes"]
suite_video_paths = data["video_paths"]
suite_max = data["max_rewards"]
suite_succ = data["successes"]
suite_vids = data["video_paths"]
suite_eval_s = time.time() - global_start
suite_eval_ep_s = suite_eval_s / max(1, len(suite_rewards))
results[task_group] = {
"aggregated": {
"avg_sum_reward": float(np.nanmean(suite_rewards)),
"avg_max_reward": float(np.nanmean(suite_max_rewards)),
"pc_success": float(np.nanmean(suite_successes) * 100),
"avg_sum_reward": float(np.nanmean(suite_rewards)) if suite_rewards else float("nan"),
"avg_max_reward": float(np.nanmean(suite_max)) if suite_max else float("nan"),
"pc_success": float(np.nanmean(suite_succ) * 100) if suite_succ else float("nan"),
"eval_s": suite_eval_s,
"eval_ep_s": suite_eval_ep_s,
},
"video_paths": suite_video_paths,
"episodes": None, # Modify if episode data is needed
"video_paths": suite_vids,
"episodes": None,
}
overall_rewards.extend(suite_rewards)
overall_max_rewards.extend(suite_max_rewards)
overall_successes.extend(suite_successes)
overall_video_paths.extend(suite_video_paths)
# Global metrics
global_eval_s = time.time() - global_start
global_eval_ep_s = global_eval_s / max(1, len(overall_rewards))
global_eval_ep_s = global_eval_s / max(1, len(overall["sum_rewards"]))
results["overall"] = {
"aggregated": {
"avg_sum_reward": float(np.nanmean(overall_rewards)),
"avg_max_reward": float(np.nanmean(overall_max_rewards)),
"pc_success": float(np.nanmean(overall_successes) * 100),
"avg_sum_reward": float(np.nanmean(overall["sum_rewards"]))
if overall["sum_rewards"]
else float("nan"),
"avg_max_reward": float(np.nanmean(overall["max_rewards"]))
if overall["max_rewards"]
else float("nan"),
"pc_success": float(np.nanmean(overall["successes"]) * 100)
if overall["successes"]
else float("nan"),
"eval_s": global_eval_s,
"eval_ep_s": global_eval_ep_s,
},
"video_paths": overall_video_paths,
"episodes": overall_episode_data,
"video_paths": overall["video_paths"],
"episodes": None,
}
return results
+26 -41
View File
@@ -30,11 +30,12 @@ from lerobot.datasets.factory import make_dataset
from lerobot.datasets.sampler import EpisodeAwareSampler
from lerobot.datasets.utils import cycle
from lerobot.envs.factory import make_env
from lerobot.envs.utils import close_envs
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.utils import get_device_from_parameters
from lerobot.scripts.eval import eval_policy, eval_policy_multitask
from lerobot.scripts.eval import eval_policy_all
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
from lerobot.utils.random_utils import set_seed
from lerobot.utils.train_utils import (
@@ -126,7 +127,6 @@ def train(cfg: TrainPipelineConfig):
logging.info("Creating dataset")
dataset = make_dataset(cfg)
# Create environment used for evaluating checkpoints during training on simulation data.
# On real-world data, no need to create an environment as evaluations are done outside train.py,
# using the eval.py instead, with gym_dora environment and dora-rs.
@@ -140,7 +140,6 @@ def train(cfg: TrainPipelineConfig):
cfg=cfg.policy,
ds_meta=dataset.meta,
)
logging.info("Creating optimizer and scheduler")
optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy)
grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp)
@@ -203,7 +202,6 @@ def train(cfg: TrainPipelineConfig):
start_time = time.perf_counter()
batch = next(dl_iter)
train_tracker.dataloading_s = time.perf_counter() - start_time
for key in batch:
if isinstance(batch[key], torch.Tensor):
batch[key] = batch[key].to(device, non_blocking=device.type == "cuda")
@@ -251,34 +249,27 @@ def train(cfg: TrainPipelineConfig):
torch.no_grad(),
torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(),
):
if cfg.env.multitask_eval:
eval_info = eval_policy_multitask(
eval_env,
policy,
cfg.eval.n_episodes,
videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
max_episodes_rendered=4,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
)
aggregated = eval_info["overall"]["aggregated"]
# Print per-suite stats, log?
for task_group, task_group_info in eval_info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
else:
eval_info = eval_policy(
eval_env,
policy,
cfg.eval.n_episodes,
videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
max_episodes_rendered=4,
start_seed=cfg.seed,
)
aggregated = eval_info["aggregated"]
eval_info = eval_policy_all(
eval_env, # dict[suite][task_id] -> vec_env
policy,
cfg.eval.n_episodes,
videos_dir=videos_dir,
max_episodes_rendered=4,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
# overall metrics (suite-agnostic)
aggregated = eval_info["overall"]["aggregated"]
# optional: per-suite logging
for suite, suite_info in eval_info.items():
if suite == "overall":
continue
logging.info("Suite %s aggregated: %s", suite, suite_info["aggregated"])
# meters/tracker
eval_metrics = {
"avg_sum_reward": AverageMeter("∑rwrd", ":.3f"),
"pc_success": AverageMeter("success", ":.1f"),
@@ -287,22 +278,16 @@ def train(cfg: TrainPipelineConfig):
eval_tracker = MetricsTracker(
cfg.batch_size, dataset.num_frames, dataset.num_episodes, eval_metrics, initial_step=step
)
eval_tracker.eval_s = aggregated.pop("eval_s")
eval_tracker.avg_sum_reward = aggregated.pop("avg_sum_reward")
eval_tracker.pc_success = aggregated.pop("pc_success")
logging.info(eval_tracker)
eval_tracker.eval_s = aggregated.get("eval_s", 0.0)
eval_tracker.avg_sum_reward = aggregated.get("avg_sum_reward", float("nan"))
eval_tracker.pc_success = aggregated.get("pc_success", float("nan"))
if wandb_logger:
wandb_log_dict = {**eval_tracker.to_dict(), **eval_info}
wandb_logger.log_dict(wandb_log_dict, step, mode="eval")
wandb_logger.log_video(eval_info["video_paths"][0], step, mode="eval")
if eval_env:
if cfg.env.multitask_eval:
for _task_group, envs_dict in eval_env.items():
for _idx, env in envs_dict.items():
env.close()
else:
eval_env.close()
close_envs(eval_env)
logging.info("End of training")
if cfg.policy.push_to_hub: