add new changes

Jade Choghari
2025-09-11 12:21:21 +02:00
parent aa40c8c813
commit 96cc634a66
8 changed files with 505 additions and 304 deletions
+5 -5
@@ -18,11 +18,11 @@ export CUDA_VISIBLE_DEVICES=2
# CONFIGURATION
POLICY_PATH="/raid/jade/logs/lerobot/lerobot_2_HuggingFaceVLA_libero_smolvla_lr1e-4bs32steps100000/checkpoints/100000/pretrained_model"
POLICY_PATH="/raid/jade/logs/lerobot/lerobot_new_HuggingfaceVLA_libero_smolvla_lr1e-4bs32steps100000/checkpoints/100000/pretrained_model"
TASK=libero_spatial
POLICY_PATH="/raid/jade/models/smolvlamust"
TASK=libero_spatial,libero_object
ENV_TYPE="libero"
BATCH_SIZE=10
N_EPISODES=10
BATCH_SIZE=1
N_EPISODES=1
# storage / caches
RAID=/raid/jade
N_ACTION_STEPS=1
@@ -46,7 +46,7 @@ python src/lerobot/scripts/eval.py \
--env.type="$ENV_TYPE" \
--eval.batch_size="$BATCH_SIZE" \
--eval.n_episodes="$N_EPISODES" \
--env.multitask_eval=False \
--env.multitask_eval=True \
--env.task=$TASK \
# python examples/evaluate_libero.py \
# --policy_path "$POLICY_PATH" \
+7 -3
@@ -56,14 +56,16 @@ EXPERT_WIDTH_MULTIPLIER=0.5
# number of gpus to use
NUM_PROCESSES=2
NUM_VLM_LAYERS=0
SELF_ATTN_EVERY_N_LAYERS=2
SELF_ATTN_EVERY_N_LAYERS=0
CHUNK_SIZE=50
export CUDA_VISIBLE_DEVICES=0
export CUDA_VISIBLE_DEVICES=1
PORT=29522
PREFIX_LENGTH=0
LOAD_VLM_WEIGHTS=true
MAX_ACTION_DIM=32
MAX_STATE_DIM=32
# naming/output dir
TRAIN_DIR=$RAID/logs/lerobot/lerobot_new_${REPO_ID//\//_}_${POLICY}_lr${LR}bs${BATCH_SIZE}steps${OFFLINE_STEPS}
TRAIN_DIR=$RAID/logs/lerobot/lerobot_new_sep11_v2_${REPO_ID//\//_}_${POLICY}_lr${LR}bs${BATCH_SIZE}steps${OFFLINE_STEPS}
echo "Training dir: $TRAIN_DIR"
rm -rf "$TRAIN_DIR"
@@ -137,5 +139,7 @@ python src/lerobot/scripts/train.py \
--policy.load_vlm_weights=$LOAD_VLM_WEIGHTS \
--policy.expert_width_multiplier=$EXPERT_WIDTH_MULTIPLIER \
--policy.self_attn_every_n_layers=$SELF_ATTN_EVERY_N_LAYERS \
--policy.max_action_dim=$MAX_ACTION_DIM \
--policy.max_state_dim=$MAX_STATE_DIM \
--seed=$SEED \
--wandb.enable=false
+32 -27
@@ -56,36 +56,41 @@ def make_env(
names to indexed vectorized environments (when multitask eval is used).
"""
if n_envs < 1:
raise ValueError("`n_envs` must be at least 1")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
if "libero" in cfg.type:
from lerobot.envs.libero import create_libero_envs
return create_libero_envs(
task=cfg.task,
n_envs=n_envs,
camera_name=cfg.camera_name,
init_states=cfg.init_states,
gym_kwargs=cfg.gym_kwargs,
env_cls=env_cls,
multitask_eval=cfg.multitask_eval,
)
package_name = f"gym_{cfg.type}"
try:
importlib.import_module(package_name)
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
f"{package_name} is not installed. Install with: pip install \"lerobot[{cfg.type}]\""
) from e
gym_handle = f"{package_name}/{cfg.task}"
def _make_one():
return gym.make(gym_handle, disable_env_checker=True, **(cfg.gym_kwargs or {}))
return env_cls([_make_one for _ in range(n_envs)])
vec = env_cls([_make_one for _ in range(n_envs)])
# normalize to {suite: {task_id: vec_env}} for consistency
suite_name = cfg.type  # e.g., "pusht", "aloha"
return {suite_name: {0: vec}}
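With this change, make_env always returns the nested {suite_name: {task_id: vec_env}} mapping, so callers no longer need to special-case LIBERO. A minimal consumer sketch (assuming a cfg.env whose type is a plain gym suite such as "pusht"):

envs = make_env(cfg.env, n_envs=4)
for suite_name, task_map in envs.items():
    for task_id, vec_env in task_map.items():
        obs, info = vec_env.reset(seed=0)  # gymnasium vector API
        print(f"{suite_name}/{task_id}: {vec_env.num_envs} parallel envs")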
+254 -97
@@ -1,3 +1,5 @@
from __future__ import annotations
import math
import os
from collections import defaultdict
@@ -12,100 +14,153 @@ from gymnasium import spaces
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
import logging
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Mapping, Sequence
logger = logging.getLogger(__name__)
# ---- Helpers -----------------------------------------------------------------
def _parse_camera_names(camera_name: str | Sequence[str]) -> List[str]:
"""Normalize camera_name into a non-empty list of strings."""
if isinstance(camera_name, str):
cams = [c.strip() for c in camera_name.split(",") if c.strip()]
elif isinstance(camera_name, (list, tuple)):
cams = [str(c).strip() for c in camera_name if str(c).strip()]
else:
raise TypeError(f"camera_name must be str or sequence[str], got {type(camera_name).__name__}")
if not cams:
raise ValueError("camera_name resolved to an empty list.")
return cams
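For instance, both spellings of the default cameras normalize to the same list (a quick sanity check):

assert _parse_camera_names("agentview_image, robot0_eye_in_hand_image") == [
    "agentview_image",
    "robot0_eye_in_hand_image",
]
assert _parse_camera_names(["agentview_image"]) == ["agentview_image"]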
def _get_suite(name: str):
"""Instantiate a LIBERO suite by name with clear validation."""
bench = benchmark.get_benchmark_dict()
if name not in bench:
raise ValueError(f"Unknown LIBERO suite '{name}'. Available: {', '.join(sorted(bench.keys()))}")
suite = bench[name]()
if not getattr(suite, "tasks", None):
raise ValueError(f"Suite '{name}' has no tasks.")
return suite
def _select_task_ids(total_tasks: int, task_ids: Iterable[int] | None) -> List[int]:
"""Validate/normalize task ids. If None → all tasks."""
if task_ids is None:
return list(range(total_tasks))
ids = sorted(set(int(t) for t in task_ids))
for t in ids:
if t < 0 or t >= total_tasks:
raise ValueError(f"task_id {t} out of range [0, {total_tasks-1}].")
return ids
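Concretely: None selects every task, duplicates collapse, and out-of-range ids fail fast:

assert _select_task_ids(3, None) == [0, 1, 2]
assert _select_task_ids(10, [3, 1, 3]) == [1, 3]
_select_task_ids(2, [5])  # raises ValueError: task_id 5 out of range [0, 1].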
def _make_env_fns(
*,
suite,
suite_name: str,
task_id: int,
n_envs: int,
camera_names: List[str],
init_states: bool,
gym_kwargs: Mapping[str, Any],
LiberoEnv: type, # injected to avoid forward ref issues if needed
) -> List[Callable[[], "LiberoEnv"]]:
"""Build n_envs factory callables for a single (suite, task_id)."""
joined_cams = ",".join(camera_names) # keep backward-compat: downstream expects a string
fns: List[Callable[[], "LiberoEnv"]] = []
for i in range(n_envs):
def _mk(i=i, suite=suite, task_id=task_id, suite_name=suite_name, joined_cams=joined_cams, init_states=init_states, gym_kwargs=dict(gym_kwargs)):
return LiberoEnv(
task_suite=suite,
task_id=task_id,
task_suite_name=suite_name,
camera_name=joined_cams,
init_states=init_states,
episode_index=i,
**gym_kwargs,
)
fns.append(_mk)
return fns
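Note the default-argument binding in _mk (i=i, task_id=task_id, ...): without it, every closure created in the loop would see the loop variables' final values. A minimal demonstration of the pitfall:

fns_bad = [lambda: i for i in range(3)]
print([f() for f in fns_bad])   # [2, 2, 2] -- late binding, all closures share the final i

fns_good = [lambda i=i: i for i in range(3)]
print([f() for f in fns_good])  # [0, 1, 2] -- defaults capture the value at definition time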
# ---- Main API ----------------------------------------------------------------
def create_libero_envs(
task: str,
n_envs: int,
gym_kwargs: dict[str, Any] = None,
camera_name: str = "agentview_image,robot0_eye_in_hand_image",
gym_kwargs: dict[str, Any] | None = None,
camera_name: str | Sequence[str] = "agentview_image,robot0_eye_in_hand_image",
init_states: bool = True,
env_cls: Callable = None,
multitask_eval: bool = True,
) -> dict[str, dict[str, Any]]:
env_cls: Callable[[Sequence[Callable[[], Any]]], Any] | None = None,
multitask_eval: bool = True, # kept for signature compatibility; return type is consistent regardless
) -> dict[str, dict[int, Any]]:
"""
Here n_envs is per task and equal to the number of rollouts.
Create vectorized LIBERO environments with a consistent return shape.
Returns:
dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs.
dict[suite_name][task_id] -> vec_env (env_cls([...]) with exactly n_envs factories)
Notes:
- n_envs is the number of rollouts *per task* (episode_index = 0..n_envs-1).
- `task` can be a single suite or a comma-separated list of suites.
- You may pass `task_ids` (list[int]) inside `gym_kwargs` to restrict tasks per suite.
"""
print("num envs", n_envs)
print("multitask_eval", multitask_eval)
print("gym_kwargs", gym_kwargs)
if gym_kwargs is None:
gym_kwargs = {}
if env_cls is None or not callable(env_cls):
raise ValueError("env_cls must be a callable that wraps a list of environment factory callables.")
if not isinstance(n_envs, int) or n_envs <= 0:
raise ValueError(f"n_envs must be a positive int; got {n_envs}.")
if not multitask_eval:
benchmark_dict = benchmark.get_benchmark_dict()
task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_id = list(range(len(task_suite.tasks)))
episode_indices = [0 for i in range(len(tasks_id))]
if len(tasks_id) == 1:
tasks_id = [tasks_id[0] for _ in range(n_envs)]
episode_indices = list(range(n_envs))
elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0:
n_repeat = n_envs // len(tasks_id)
print("n_repeat", n_repeat)
episode_indices = []
for _ in range(len(tasks_id)):
episode_indices.extend(list(range(n_repeat)))
tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id]))
elif n_envs < len(tasks_id):
tasks_id = tasks_id[:n_envs]
episode_indices = list(range(n_envs))[:n_envs]
print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}")
print(f"Creating Libero envs with task ids {tasks_id} from suite {task}")
assert n_envs == len(tasks_id), (
f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}"
)
return env_cls(
[
lambda i=i: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id[i],
task_suite_name=task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
for i in range(n_envs)
]
)
else:
envs = defaultdict(dict)
benchmark_dict = benchmark.get_benchmark_dict()
task = task.split(",")
for _task in task:
task_suite = benchmark_dict[
_task
]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_ids = list(range(len(task_suite.tasks)))
for tasks_id in tasks_ids:
episode_indices = list(range(n_envs))
print(
f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}"
)
envs_list = [
(
lambda i=i,
task_suite=task_suite,
tasks_id=tasks_id,
_task=_task,
episode_indices=episode_indices: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id,
task_suite_name=_task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
)
for i in range(n_envs)
]
envs[_task][tasks_id] = env_cls(envs_list)
return envs
gym_kwargs = dict(gym_kwargs or {})
task_ids_filter = gym_kwargs.pop("task_ids", None) # optional: limit to specific tasks
# Avoid circular import/type issues: assume LiberoEnv is defined in this module
try:
LiberoEnv  # type: ignore[name-defined]
except NameError as e:
# If LiberoEnv lives in another module, import it here instead of exiting,
# e.g. `from .libero_env import LiberoEnv` (adjust to where the class lives).
raise ImportError(
"LiberoEnv is not defined; import it into this module before calling create_libero_envs."
) from e
camera_names = _parse_camera_names(camera_name)
suite_names = [s.strip() for s in str(task).split(",") if s.strip()]
if not suite_names:
raise ValueError("`task` must contain at least one LIBERO suite name.")
logger.info(
"Creating LIBERO envs | suites=%s | n_envs(per task)=%d | init_states=%s | multitask_eval=%s",
suite_names, n_envs, init_states, bool(multitask_eval)
)
if task_ids_filter is not None:
logger.info("Restricting to task_ids=%s", task_ids_filter)
out: Dict[str, Dict[int, Any]] = defaultdict(dict)
for suite_name in suite_names:
suite = _get_suite(suite_name)
total = len(suite.tasks)
selected = _select_task_ids(total, task_ids_filter)
if not selected:
raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).")
for tid in selected:
fns = _make_env_fns(
suite=suite,
suite_name=suite_name,
task_id=tid,
n_envs=n_envs,
camera_names=camera_names,
init_states=init_states,
gym_kwargs=gym_kwargs,
LiberoEnv=LiberoEnv,
)
out[suite_name][tid] = env_cls(fns)
logger.debug("Built vec env | suite=%s | task_id=%d | n_envs=%d", suite_name, tid, n_envs)
# return plain dicts for predictability
return {suite: dict(task_map) for suite, task_map in out.items()}
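A usage sketch of the rewritten factory (the suite list and the task_ids filter are illustrative; env_cls can be any callable over a list of factories, e.g. gym.vector.SyncVectorEnv):

import gymnasium as gym

envs = create_libero_envs(
    task="libero_spatial,libero_object",  # one or more suites, comma-separated
    n_envs=2,                             # rollouts per task
    env_cls=gym.vector.SyncVectorEnv,
    gym_kwargs={"task_ids": [0, 1]},      # optional: keep only tasks 0 and 1 per suite
)
vec = envs["libero_spatial"][0]           # vec env with exactly n_envs=2 sub-envs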
def quat2axisangle(quat):
"""
Copied from robosuite: https://github.com/ARISE-Initiative/robosuite/blob/eafb81f54ffc104f905ee48a16bb15f059176ad3/robosuite/utils/transform_utils.py#L490C1-L512C55
@@ -199,17 +254,15 @@ class LiberoEnv(gym.Env):
self.episode_index = episode_index
self._env = self._make_envs_task(task_suite, self.task_id)
if task_suite_name == "libero_spatial":
max_steps = 220 # longest training demo has 193 steps
elif task_suite_name == "libero_object":
max_steps = 280 # longest training demo has 254 steps
elif task_suite_name == "libero_goal":
max_steps = 300 # longest training demo has 270 steps
elif task_suite_name == "libero_10":
max_steps = 520 # longest training demo has 505 steps
elif task_suite_name == "libero_90":
max_steps = 400 # longest training demo has 373 steps
self._max_episode_steps = max_steps
TASK_SUITE_MAX_STEPS: dict[str, int] = {
"libero_spatial": 220, # longest training demo has 193 steps
"libero_object": 280, # longest training demo has 254 steps
"libero_goal": 300, # longest training demo has 270 steps
"libero_10": 520, # longest training demo has 505 steps
"libero_90": 400, # longest training demo has 373 steps
}
default_steps = 500
self._max_episode_steps = TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
images = {}
for cam in self.camera_name:
@@ -221,7 +274,11 @@ class LiberoEnv(gym.Env):
)
if self.obs_type == "state":
raise NotImplementedError()
raise NotImplementedError(
"The 'state' observation type is not supported in LiberoEnv. "
"Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')."
)
elif self.obs_type == "pixels":
self.observation_space = spaces.Dict(
{
@@ -285,7 +342,10 @@ class LiberoEnv(gym.Env):
)
agent_pos = state
if self.obs_type == "state":
raise NotImplementedError()
raise NotImplementedError(
"The 'state' observation type is not supported in LiberoEnv. "
"Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')."
)
elif self.obs_type == "pixels":
obs = {"pixels": images.copy()}
elif self.obs_type == "pixels_agent_pos":
@@ -308,7 +368,11 @@ class LiberoEnv(gym.Env):
return observation, info
def step(self, action):
assert action.ndim == 1
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
f"but got shape {action.shape} with ndim={action.ndim}"
)
raw_obs, reward, done, info = self._env.step(action)
is_success = self._env.check_success()
@@ -324,3 +388,96 @@ class LiberoEnv(gym.Env):
def close(self):
self._env.close()
def create_libero_envs1(
task: str,
n_envs: int,
gym_kwargs: dict[str, Any] = None,
camera_name: str = "agentview_image,robot0_eye_in_hand_image",
init_states: bool = True,
env_cls: Callable = None,
multitask_eval: bool = True,
) -> dict[str, dict[str, Any]]:
"""
Here n_envs is per task and equal to the number of rollouts.
Returns:
dict[str, dict[str, list[LiberoEnv]]]: keys are task_suite and values are list of LiberoEnv envs.
"""
print("num envs", n_envs)
print("multitask_eval", multitask_eval)
print("gym_kwargs", gym_kwargs)
if gym_kwargs is None:
gym_kwargs = {}
if not multitask_eval:
benchmark_dict = benchmark.get_benchmark_dict()
task_suite = benchmark_dict[task]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_id = list(range(len(task_suite.tasks)))
episode_indices = [0 for i in range(len(tasks_id))]
if len(tasks_id) == 1:
tasks_id = [tasks_id[0] for _ in range(n_envs)]
episode_indices = list(range(n_envs))
elif len(tasks_id) < n_envs and n_envs % len(tasks_id) == 0:
n_repeat = n_envs // len(tasks_id)
print("n_repeat", n_repeat)
episode_indices = []
for _ in range(len(tasks_id)):
episode_indices.extend(list(range(n_repeat)))
tasks_id = list(chain.from_iterable([[item] * n_repeat for item in tasks_id]))
elif n_envs < len(tasks_id):
tasks_id = tasks_id[:n_envs]
episode_indices = list(range(n_envs))[:n_envs]
print(f"WARNING: n_envs < len(tasks_id), evaluating only on {tasks_id}")
print(f"Creating Libero envs with task ids {tasks_id} from suite {task}")
assert n_envs == len(tasks_id), (
f"len(n_envs) and tasks_id should be the same, got {n_envs} and {len(tasks_id)}"
)
return env_cls(
[
lambda i=i: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id[i],
task_suite_name=task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
for i in range(n_envs)
]
)
else:
envs = defaultdict(dict)
benchmark_dict = benchmark.get_benchmark_dict()
task = task.split(",")
for _task in task:
task_suite = benchmark_dict[
_task
]() # can also choose libero_spatial, libero_object, libero_10 etc.
tasks_ids = list(range(len(task_suite.tasks)))
for tasks_id in tasks_ids:
episode_indices = list(range(n_envs))
print(
f"Creating Libero envs with task ids {tasks_id} from suite {_task}, episode_indices: {episode_indices}"
)
envs_list = [
(
lambda i=i,
task_suite=task_suite,
tasks_id=tasks_id,
_task=_task,
episode_indices=episode_indices: LiberoEnv(
task_suite=task_suite,
task_id=tasks_id,
task_suite_name=_task,
camera_name=camera_name,
init_states=init_states,
episode_index=episode_indices[i],
**gym_kwargs,
)
)
for i in range(n_envs)
]
envs[_task][tasks_id] = env_cls(envs_list)
return envs
+44
@@ -182,3 +182,47 @@ def add_envs_task(env: gym.vector.VectorEnv, observation: dict[str, Any]) -> dic
num_envs = observation[list(observation.keys())[0]].shape[0]
observation["task"] = ["" for _ in range(num_envs)]
return observation
def _close_single_env(env: Any) -> None:
"""Try to close a single env object if it exposes .close()."""
try:
close_fn = getattr(env, "close", None)
if callable(close_fn):
close_fn()
except Exception as exc:
# Best-effort close: log but don't raise (assumes `logging` is imported at module top)
logging.getLogger(__name__).debug("Exception while closing env %s: %s", env, exc)
def close_envs(env_or_collection: Any) -> None:
"""
Close a single env or any nested structure of envs.
Accepts:
- a single env with .close()
- a Mapping of things (e.g. dict)
- a Sequence of things (list/tuple) but NOT str/bytes
- nested combinations of the above
This is intentionally permissive and best-effort: it will swallow exceptions
encountered while closing individual envs and continue.
"""
# Guard: single object with close()
if hasattr(env_or_collection, "close") and not isinstance(env_or_collection, (Mapping, Sequence)):
_close_single_env(env_or_collection)
return
# Mapping (e.g., {suite: {task_id: vec_env}})
if isinstance(env_or_collection, Mapping):
for v in env_or_collection.values():
close_envs(v)
return
# Sequence (list/tuple) but skip str/bytes
if isinstance(env_or_collection, Sequence) and not isinstance(env_or_collection, (str, bytes)):
for v in env_or_collection:
close_envs(v)
return
# Fallback: try to close if possible
if hasattr(env_or_collection, "close"):
_close_single_env(env_or_collection)
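Both shapes are then torn down with one call; a short sketch (vec_a and vec_b stand in for any objects exposing .close()):

close_envs(vec_a)                                     # a bare env
close_envs({"libero_spatial": {0: vec_a, 1: vec_b}})  # the nested mapping from make_env
close_envs([vec_a, vec_b])                            # plain sequences work too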
-1
@@ -177,6 +177,5 @@ def make_policy(
policy = policy_cls(**kwargs)
policy.to(cfg.device)
assert isinstance(policy, nn.Module)
breakpoint()
# policy = torch.compile(policy, mode="reduce-overhead")
return policy
+134 -127
@@ -45,7 +45,6 @@ Note that in both examples, the repo/folder should contain at least `config.json
You can learn about the CLI options for this script in the `EvalPipelineConfig` in lerobot/configs/eval.py
"""
import concurrent
import json
import logging
@@ -80,6 +79,9 @@ from lerobot.utils.utils import (
init_logging,
inside_slurm,
)
from typing import TypedDict, Dict, List, Tuple, Iterator
from collections import defaultdict
import concurrent.futures as cf
def rollout(
@@ -537,8 +539,8 @@ def eval_main(cfg: EvalPipelineConfig):
logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}")
logging.info("Making environment.")
env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
envs = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
logging.info("Making policy.")
policy = make_policy(
cfg=cfg.policy,
@@ -550,41 +552,29 @@ def eval_main(cfg: EvalPipelineConfig):
policy.eval()
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
if cfg.env.multitask_eval:
info = eval_policy_multitask(
env,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
print("Overall Aggregated Metrics:")
print(info["overall"]["aggregated"])
# Print per-suite stats
for task_group, task_group_info in info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
for _task_group, v in env.items():
for _env in v.values():
_env.close()
else:
info = eval_policy(
env,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
)
print(info["aggregated"])
env.close()
info = eval_policy_all(
envs,
policy,
cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
print("Overall Aggregated Metrics:")
print(info["overall"]["aggregated"])
# Print per-suite stats
for task_group, task_group_info in info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
# Close all vec envs
for _suite, task_map in envs.items():
for _vec in task_map.values():
_vec.close()
# Save info
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
json.dump(info, f, indent=2)
@@ -592,9 +582,19 @@ def eval_main(cfg: EvalPipelineConfig):
logging.info("End of eval")
def eval_policy_multitask(
envs: dict[str, dict[str, gym.vector.VectorEnv]],
policy,
# ---- typed payload returned by one task eval ----
class TaskMetrics(TypedDict):
sum_rewards: List[float]
max_rewards: List[float]
successes: List[bool]
video_paths: List[str]
ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths")
def eval_policy_all(
envs: dict[str, dict[int, gym.vector.VectorEnv]],
policy: PreTrainedPolicy,
n_episodes: int,
max_episodes_rendered: int = 0,
videos_dir: Path | None = None,
@@ -603,129 +603,136 @@ def eval_policy_multitask(
max_parallel_tasks: int = 5,
verbose: bool = True,
) -> dict:
"""
Evaluate a policy over a dict-of-dicts of vectorized envs:
envs[suite_name][task_id] -> gym.vector.VectorEnv
Returns a dict with per-suite aggregates and an 'overall' block.
"""
global_start = time.time()
results = {}
overall_rewards, overall_max_rewards, overall_successes = [], [], []
overall_video_paths = []
overall_episode_data = None
# inner: evaluate a single (suite, task)
def eval_one(
task_group: str,
task_id: int,
env: gym.vector.VectorEnv,
*,
policy: PreTrainedPolicy,
n_episodes: int,
max_episodes_rendered: int,
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
) -> TaskMetrics:
"""Evaluates one task_id of one suite using the provided vec env."""
if verbose:
print(f"Evaluating: task_group={task_group}, task_id={task_id} ...")
def eval_task(task_group, task_id, env):
"""Evaluates a single task in parallel."""
print(f"Evaluating: task_group: {task_group}, task_id: {task_id} ...")
task_videos_dir = None
if videos_dir is not None:
task_videos_dir = videos_dir / f"{task_group}_{task_id}"
task_videos_dir.mkdir(parents=True, exist_ok=True)
task_result = eval_policy(
env,
policy,
n_episodes,
max_episodes_rendered,
task_videos_dir,
return_episode_data,
start_seed,
env=env,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
per_episode = task_result["per_episode"]
return {
"task_group": task_group,
"task_id": task_id,
"sum_rewards": [ep["sum_reward"] for ep in per_episode],
"max_rewards": [ep["max_reward"] for ep in per_episode],
"successes": [ep["success"] for ep in per_episode],
"video_paths": task_result.get("video_paths", []),
}
return TaskMetrics(
sum_rewards=[ep["sum_reward"] for ep in per_episode],
max_rewards=[ep["max_reward"] for ep in per_episode],
successes=[ep["success"] for ep in per_episode],
video_paths=task_result.get("video_paths", []),
)
task_group_results = {}
if max_parallel_tasks == 1:
# sequential mode (safe for colab / EGL)
for task_group, tasks in envs.items():
for task_id, env in tasks.items():
task_result = eval_task(task_group, task_id, env)
if task_group not in task_group_results:
task_group_results[task_group] = {
"sum_rewards": [],
"max_rewards": [],
"successes": [],
"video_paths": [],
}
task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"])
task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"])
task_group_results[task_group]["successes"].extend(task_result["successes"])
task_group_results[task_group]["video_paths"].extend(task_result["video_paths"])
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
future_to_task = {
executor.submit(eval_task, task_group, task_id, env): (task_group, task_id)
for task_group, tasks in envs.items()
for task_id, env in tasks.items()
}
# result producer: sequential or threaded, same consumer
def iter_task_results() -> Iterator[Tuple[str, int, TaskMetrics]]:
if max_parallel_tasks == 1:
for task_group, tasks in envs.items():
for task_id, vec in tasks.items():
yield task_group, task_id, eval_one(
task_group, task_id, vec,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
else:
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2key: Dict[cf.Future, Tuple[str, int]] = {}
for task_group, tasks in envs.items():
for task_id, vec in tasks.items():
fut = executor.submit(
eval_one, task_group, task_id, vec,
policy=policy,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
fut2key[fut] = (task_group, task_id)
for fut in cf.as_completed(fut2key):
task_group, task_id = fut2key[fut]
yield task_group, task_id, fut.result()
task_group_results = {}
# single accumulator path on the main thread
group_acc: Dict[str, Dict[str, List]] = defaultdict(lambda: {k: [] for k in ACC_KEYS})
overall: Dict[str, List] = {k: [] for k in ACC_KEYS}
for future in concurrent.futures.as_completed(future_to_task):
task_result = future.result()
task_group = task_result["task_group"]
for task_group, task_id, metrics in iter_task_results():
acc = group_acc[task_group]
for k in ACC_KEYS:
acc[k].extend(metrics[k])
overall[k].extend(metrics[k])
if task_group not in task_group_results:
task_group_results[task_group] = {
"sum_rewards": [],
"max_rewards": [],
"successes": [],
"video_paths": [],
}
task_group_results[task_group]["sum_rewards"].extend(task_result["sum_rewards"])
task_group_results[task_group]["max_rewards"].extend(task_result["max_rewards"])
task_group_results[task_group]["successes"].extend(task_result["successes"])
task_group_results[task_group]["video_paths"].extend(task_result["video_paths"])
# Process results per task group
for task_group, data in task_group_results.items():
# build outputs
results: Dict[str, dict] = {}
for task_group, data in group_acc.items():
suite_rewards = data["sum_rewards"]
suite_max_rewards = data["max_rewards"]
suite_successes = data["successes"]
suite_video_paths = data["video_paths"]
suite_max = data["max_rewards"]
suite_succ = data["successes"]
suite_vids = data["video_paths"]
suite_eval_s = time.time() - global_start
suite_eval_ep_s = suite_eval_s / max(1, len(suite_rewards))
results[task_group] = {
"aggregated": {
"avg_sum_reward": float(np.nanmean(suite_rewards)),
"avg_max_reward": float(np.nanmean(suite_max_rewards)),
"pc_success": float(np.nanmean(suite_successes) * 100),
"avg_sum_reward": float(np.nanmean(suite_rewards)) if suite_rewards else float("nan"),
"avg_max_reward": float(np.nanmean(suite_max)) if suite_max else float("nan"),
"pc_success": float(np.nanmean(suite_succ) * 100) if suite_succ else float("nan"),
"eval_s": suite_eval_s,
"eval_ep_s": suite_eval_ep_s,
},
"video_paths": suite_video_paths,
"episodes": None, # Modify if episode data is needed
"video_paths": suite_vids,
"episodes": None,
}
overall_rewards.extend(suite_rewards)
overall_max_rewards.extend(suite_max_rewards)
overall_successes.extend(suite_successes)
overall_video_paths.extend(suite_video_paths)
# Global metrics
global_eval_s = time.time() - global_start
global_eval_ep_s = global_eval_s / max(1, len(overall_rewards))
global_eval_ep_s = global_eval_s / max(1, len(overall["sum_rewards"]))
results["overall"] = {
"aggregated": {
"avg_sum_reward": float(np.nanmean(overall_rewards)),
"avg_max_reward": float(np.nanmean(overall_max_rewards)),
"pc_success": float(np.nanmean(overall_successes) * 100),
"avg_sum_reward": float(np.nanmean(overall["sum_rewards"])) if overall["sum_rewards"] else float("nan"),
"avg_max_reward": float(np.nanmean(overall["max_rewards"])) if overall["max_rewards"] else float("nan"),
"pc_success": float(np.nanmean(overall["successes"]) * 100) if overall["successes"] else float("nan"),
"eval_s": global_eval_s,
"eval_ep_s": global_eval_ep_s,
},
"video_paths": overall_video_paths,
"episodes": overall_episode_data,
"video_paths": overall["video_paths"],
"episodes": None,
}
return results
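The returned dict is consumed the same way in eval_main and train; a sketch of the expected shape (keys follow the code above):

results = eval_policy_all(envs, policy, n_episodes=10, max_parallel_tasks=1, verbose=False)
print("overall success:", results["overall"]["aggregated"]["pc_success"])
for suite, block in results.items():
    if suite == "overall":
        continue
    agg = block["aggregated"]
    print(suite, agg["avg_max_reward"], len(block["video_paths"]))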
if __name__ == "__main__":
init_logging()
eval_main()
+29 -44
@@ -30,11 +30,12 @@ from lerobot.datasets.factory import make_dataset
from lerobot.datasets.sampler import EpisodeAwareSampler
from lerobot.datasets.utils import cycle
from lerobot.envs.factory import make_env
from lerobot.envs.utils import close_envs
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.utils import get_device_from_parameters
from lerobot.scripts.eval import eval_policy, eval_policy_multitask
from lerobot.scripts.eval import eval_policy_all
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
from lerobot.utils.random_utils import set_seed
from lerobot.utils.train_utils import (
@@ -270,62 +271,46 @@ def train(cfg: TrainPipelineConfig):
if cfg.env and is_eval_step:
step_id = get_step_identifier(step, cfg.steps)
logging.info(f"Eval policy at step {step}")
with (
torch.no_grad(),
torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(),
):
if cfg.env.multitask_eval:
eval_info = eval_policy_multitask(
eval_env,
policy,
cfg.eval.n_episodes,
videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
max_episodes_rendered=4,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
)
aggregated = eval_info["overall"]["aggregated"]
# Print per-suite stats, log?
for task_group, task_group_info in eval_info.items():
if task_group == "overall":
continue # Skip the overall stats since we already printed it
print(f"\nAggregated Metrics for {task_group}:")
print(task_group_info["aggregated"])
else:
eval_info = eval_policy(
eval_env,
policy,
cfg.eval.n_episodes,
videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
max_episodes_rendered=4,
start_seed=cfg.seed,
)
aggregated = eval_info["aggregated"]
with torch.no_grad(), (torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext()):
eval_info = eval_policy_all(
eval_env, # dict[suite][task_id] -> vec_env
policy,
cfg.eval.n_episodes,
videos_dir=cfg.output_dir / "eval" / f"videos_step_{step_id}",
max_episodes_rendered=4,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
verbose=False,
)
# overall metrics (suite-agnostic)
aggregated = eval_info["overall"]["aggregated"]
# optional: per-suite logging
for suite, suite_info in eval_info.items():
if suite == "overall":
continue
logging.info("Suite %s aggregated: %s", suite, suite_info["aggregated"])
# meters/tracker
eval_metrics = {
"avg_sum_reward": AverageMeter("∑rwrd", ":.3f"),
"pc_success": AverageMeter("success", ":.1f"),
"eval_s": AverageMeter("eval_s", ":.3f"),
"pc_success": AverageMeter("success", ":.1f"),
"eval_s": AverageMeter("eval_s", ":.3f"),
}
eval_tracker = MetricsTracker(
cfg.batch_size, dataset.num_frames, dataset.num_episodes, eval_metrics, initial_step=step
)
eval_tracker.eval_s = aggregated.pop("eval_s")
eval_tracker.avg_sum_reward = aggregated.pop("avg_sum_reward")
eval_tracker.pc_success = aggregated.pop("pc_success")
logging.info(eval_tracker)
eval_tracker.eval_s = aggregated.get("eval_s", 0.0)
eval_tracker.avg_sum_reward = aggregated.get("avg_sum_reward", float("nan"))
eval_tracker.pc_success = aggregated.get("pc_success", float("nan"))
if wandb_logger:
wandb_log_dict = {**eval_tracker.to_dict(), **eval_info}
wandb_logger.log_dict(wandb_log_dict, step, mode="eval")
wandb_logger.log_video(eval_info["video_paths"][0], step, mode="eval")
if eval_env:
if cfg.env.multitask_eval:
for _task_group, envs_dict in eval_env.items():
for _idx, env in envs_dict.items():
env.close()
else:
eval_env.close()
close_envs(eval_env)
logging.info("End of training")
if cfg.policy.push_to_hub: