From 6f0ba4be3853b773ad2e8887be2761bc5474fc89 Mon Sep 17 00:00:00 2001
From: Khalil Meftah
Date: Tue, 23 Jun 2026 14:03:57 +0200
Subject: [PATCH] Record eval rollouts as LeRobot datasets (#3825)

* feat(eval): record eval rollouts as raw LeRobot datasets

- Record raw env observations inline during rollout(), before preprocess_observation() transforms them. Uses LeRobotDataset.create() with add_frame()/save_episode().
- Supports vectorized envs: each env in the batch records independently, with save_episode() called per env on termination. Each task gets its own dataset under output_dir/recordings/{task_group}_{task_id}/. Enabled via --eval.recording=true; disabled by default.

* fix(eval): use FeatureType enum comparison instead of string value

* refactor(eval): per-env datasets recording, no double reset

- Extract _infer_shape_from_obs() to reduce nesting in feature conversion
- Move dataset creation into rollout() using its own env.reset() observation, eliminating the extra reset in run_one()
- Replace deepcopy with _shallow_copy_obs() for raw observation stashing
- Support batch_size > 1: each parallel env records to its own dataset (single env skips the env_0/ nesting for simplicity)
- One-time warning for env_features keys missing from observations
- Pass recording_dir + env_features through the call chain instead of a pre-built recording_dataset object

* refactor(eval): remove shape inference and shallow copy helpers

* feat(eval): optionally push recorded eval datasets to the Hub

* fix(eval): address review comments

- Wrap rollout loop in try/finally so finalize() runs on crash/interrupt
- Guard push_to_hub with num_episodes > 0 to avoid pushing empty datasets
- Hoist loop-invariant multi_env and base_repo_id out of creation loop
---
 src/lerobot/configs/default.py      |   9 +
 src/lerobot/scripts/lerobot_eval.py | 308 +++++++++++++++++++++-------
 2 files changed, 248 insertions(+), 69 deletions(-)

diff --git a/src/lerobot/configs/default.py 
b/src/lerobot/configs/default.py index b809e71d9..648e03f33 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -73,8 +73,17 @@ class EvalConfig: # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1. use_async_envs: bool = True + # Whether to record eval rollouts as a LeRobot dataset on disk. + recording: bool = False + # If set, push recorded eval datasets to the Hub under this repo id (one repo per task, + # suffixed by task and env index). Requires recording=true. + recording_repo_id: str | None = None + # Whether the pushed recording repositories should be private. + recording_private: bool = False def __post_init__(self) -> None: + if self.recording_repo_id is not None and not self.recording: + raise ValueError("eval.recording_repo_id requires eval.recording=true.") if self.batch_size == 0: self.batch_size = self._auto_batch_size() if self.batch_size > self.n_episodes: diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index d45483d21..1ec4ea75f 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -72,8 +72,9 @@ from termcolor import colored from torch import Tensor, nn from tqdm import trange -from lerobot.configs import parser +from lerobot.configs import FeatureType, parser from lerobot.configs.eval import EvalPipelineConfig +from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.envs import ( check_env_attributes_and_types, close_envs, @@ -84,7 +85,7 @@ from lerobot.envs import ( from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors from lerobot.processor import PolicyProcessorPipeline from lerobot.types import PolicyAction -from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD +from lerobot.utils.constants import ACTION, DONE, OBS_IMAGE, OBS_IMAGES, OBS_STR, REWARD from 
lerobot.utils.device_utils import get_safe_torch_device from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.io_utils import write_video @@ -95,6 +96,65 @@ from lerobot.utils.utils import ( ) +def _env_features_to_dataset_features(env_features: dict) -> dict: + """Convert EnvConfig.features to the dict format expected by LeRobotDataset.create().""" + features = {} + for key, ft in env_features.items(): + shape = tuple(ft.shape) + if ft.type is FeatureType.VISUAL: + features[key] = {"dtype": "video", "shape": shape, "names": ["height", "width", "channel"]} + else: + features[key] = {"dtype": "float32", "shape": shape, "names": None} + features["next.reward"] = {"dtype": "float32", "shape": (1,), "names": None} + features["next.success"] = {"dtype": "bool", "shape": (1,), "names": None} + features["next.done"] = {"dtype": "bool", "shape": (1,), "names": None} + return features + + +def _build_raw_frame( + raw_obs: dict, + env_idx: int, + action: np.ndarray, + reward: float, + success: bool, + done: bool, + task: str, + env_features: dict, +) -> dict: + """Build a dataset frame from raw env observations for one env index. + + Keys in the frame match the keys in env_features so they align with the + dataset schema created by _env_features_to_dataset_features(). 
+ """ + frame: dict[str, Any] = {} + for key in env_features: + if key == ACTION: + continue + if key.startswith("next."): + continue + if "pixels" in raw_obs and isinstance(raw_obs["pixels"], dict): + for cam_name, img in raw_obs["pixels"].items(): + candidate = f"{OBS_IMAGES}.{cam_name}" + if candidate == key: + frame[key] = img[env_idx] + if key in frame: + continue + if "pixels" in raw_obs and not isinstance(raw_obs["pixels"], dict) and key in ("pixels", OBS_IMAGE): + frame[key] = raw_obs["pixels"][env_idx] + continue + if key in raw_obs and isinstance(raw_obs[key], np.ndarray): + val = raw_obs[key][env_idx] + if val.dtype == np.float64: + val = val.astype(np.float32) + frame[key] = val + frame[ACTION] = action + frame["next.reward"] = np.atleast_1d(np.float32(reward)) + frame["next.success"] = np.atleast_1d(np.bool_(success)) + frame["next.done"] = np.atleast_1d(np.bool_(done)) + frame["task"] = task + return frame + + def rollout( env: gym.vector.VectorEnv, policy: PreTrainedPolicy, @@ -105,6 +165,10 @@ def rollout( seeds: list[int] | None = None, return_observations: bool = False, render_callback: Callable[[gym.vector.VectorEnv], None] | None = None, + recording_dir: Path | None = None, + env_features: dict | None = None, + recording_repo_id: str | None = None, + recording_private: bool = False, ) -> dict: """Run a batched policy rollout once through a batch of environments. 
@@ -145,6 +209,33 @@ def rollout( if render_callback is not None: render_callback(env) + recording_datasets: list[LeRobotDataset] | None = None + raw_observation = None + task_desc = "" + if recording_dir is not None and env_features is not None: + features = _env_features_to_dataset_features(env_features) + fps = env.unwrapped.metadata.get("render_fps", 30) + recording_datasets = [] + multi_env = env.num_envs > 1 + base_repo_id = recording_repo_id or "eval_recording" + for i in range(env.num_envs): + root = str(recording_dir / f"env_{i}") if multi_env else str(recording_dir) + repo_id = f"{base_repo_id}_env_{i}" if multi_env else base_repo_id + recording_datasets.append( + LeRobotDataset.create( + repo_id=repo_id, + fps=fps, + features=features, + root=root, + use_videos=True, + ) + ) + raw_observation = deepcopy(observation) + try: + task_desc = list(env.call("task_description"))[0] + except (AttributeError, NotImplementedError): + task_desc = "" + all_observations = [] all_actions = [] all_rewards = [] @@ -162,80 +253,112 @@ def rollout( leave=False, ) check_env_attributes_and_types(env) - while not np.all(done) and step < max_steps: - # Numpy array to tensor and changing dictionary keys to LeRobot policy format. - observation = preprocess_observation(observation) - if return_observations: - all_observations.append(deepcopy(observation)) + try: + while not np.all(done) and step < max_steps: + # Numpy array to tensor and changing dictionary keys to LeRobot policy format. + observation = preprocess_observation(observation) + if return_observations: + all_observations.append(deepcopy(observation)) - # Infer "task" from sub-environments (prefer natural language description). - # env.call() works with both SyncVectorEnv and AsyncVectorEnv. - try: - observation["task"] = list(env.call("task_description")) - except (AttributeError, NotImplementedError): + # Infer "task" from sub-environments (prefer natural language description). 
+ # env.call() works with both SyncVectorEnv and AsyncVectorEnv. try: - observation["task"] = list(env.call("task")) + observation["task"] = list(env.call("task_description")) except (AttributeError, NotImplementedError): - observation["task"] = [""] * env.num_envs + try: + observation["task"] = list(env.call("task")) + except (AttributeError, NotImplementedError): + observation["task"] = [""] * env.num_envs - # Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO) - observation = env_preprocessor(observation) + # Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO) + observation = env_preprocessor(observation) - observation = preprocessor(observation) - with torch.inference_mode(): - action = policy.select_action(observation) - action = postprocessor(action) + observation = preprocessor(observation) + with torch.inference_mode(): + action = policy.select_action(observation) + action = postprocessor(action) - action_transition = {ACTION: action} - action_transition = env_postprocessor(action_transition) - action = action_transition[ACTION] + action_transition = {ACTION: action} + action_transition = env_postprocessor(action_transition) + action = action_transition[ACTION] - # Convert to CPU / numpy. - action_numpy: np.ndarray = action.to("cpu").numpy() - assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)" + # Convert to CPU / numpy. + action_numpy: np.ndarray = action.to("cpu").numpy() + assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)" - # Apply the next action. - observation, reward, terminated, truncated, info = env.step(action_numpy) - if render_callback is not None: - render_callback(env) + # Apply the next action. + observation, reward, terminated, truncated, info = env.step(action_numpy) + if render_callback is not None: + render_callback(env) - # VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. 
"final_info" isn't - # available if none of the envs finished. - if "final_info" in info: - final_info = info["final_info"] - if not isinstance(final_info, dict): - raise RuntimeError( - "Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). " - "You're likely using an older version of gymnasium (< 1.0). Please upgrade." + # VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't + # available if none of the envs finished. + if "final_info" in info: + final_info = info["final_info"] + if not isinstance(final_info, dict): + raise RuntimeError( + "Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). " + "You're likely using an older version of gymnasium (< 1.0). Please upgrade." + ) + successes = final_info["is_success"].tolist() + elif "is_success" in info: + is_success = info["is_success"] + successes = ( + is_success.tolist() + if hasattr(is_success, "tolist") + else [bool(is_success)] * env.num_envs ) - successes = final_info["is_success"].tolist() - elif "is_success" in info: - is_success = info["is_success"] - successes = ( - is_success.tolist() if hasattr(is_success, "tolist") else [bool(is_success)] * env.num_envs + else: + successes = [False] * env.num_envs + + if recording_datasets is not None and raw_observation is not None: + prev_done = done.copy() + for env_idx in range(env.num_envs): + if prev_done[env_idx]: + continue + frame = _build_raw_frame( + raw_observation, + env_idx, + action_numpy[env_idx], + reward[env_idx], + successes[env_idx], + bool(terminated[env_idx] | truncated[env_idx]), + task_desc, + recording_datasets[env_idx].features, + ) + recording_datasets[env_idx].add_frame(frame) + if terminated[env_idx] or truncated[env_idx]: + recording_datasets[env_idx].save_episode() + raw_observation = deepcopy(observation) + + # Keep track of which environments are done so far. + # Mark the episode as done if we reach the maximum step limit. 
+ # This ensures that the rollout always terminates cleanly at `max_steps`, + # and allows logging/saving (e.g., videos) to be triggered consistently. + done = terminated | truncated | done + if step + 1 == max_steps: + done = np.ones_like(done, dtype=bool) + + all_actions.append(torch.from_numpy(action_numpy)) + all_rewards.append(torch.from_numpy(reward)) + all_dones.append(torch.from_numpy(done)) + all_successes.append(torch.tensor(successes)) + + step += 1 + running_success_rate = ( + einops.reduce(torch.stack(all_successes, dim=1), "b n -> b", "any").numpy().mean() ) - else: - successes = [False] * env.num_envs - - # Keep track of which environments are done so far. - # Mark the episode as done if we reach the maximum step limit. - # This ensures that the rollout always terminates cleanly at `max_steps`, - # and allows logging/saving (e.g., videos) to be triggered consistently. - done = terminated | truncated | done - if step + 1 == max_steps: - done = np.ones_like(done, dtype=bool) - - all_actions.append(torch.from_numpy(action_numpy)) - all_rewards.append(torch.from_numpy(reward)) - all_dones.append(torch.from_numpy(done)) - all_successes.append(torch.tensor(successes)) - - step += 1 - running_success_rate = ( - einops.reduce(torch.stack(all_successes, dim=1), "b n -> b", "any").numpy().mean() - ) - progbar.set_postfix({"running_success_rate": f"{running_success_rate.item() * 100:.1f}%"}) - progbar.update() + progbar.set_postfix({"running_success_rate": f"{running_success_rate.item() * 100:.1f}%"}) + progbar.update() + finally: + if recording_datasets is not None: + for ds in recording_datasets: + ds.finalize() + if recording_repo_id is not None: + if ds.num_episodes > 0: + ds.push_to_hub(private=recording_private) + else: + logging.warning("No episodes recorded for %s — skipping push to hub.", ds.repo_id) # Track the final observation. 
if return_observations: @@ -273,6 +396,10 @@ def eval_policy( videos_dir: Path | None = None, return_episode_data: bool = False, start_seed: int | None = None, + recording_dir: Path | None = None, + env_features: dict | None = None, + recording_repo_id: str | None = None, + recording_private: bool = False, ) -> dict: """ Args: @@ -361,6 +488,10 @@ def eval_policy( seeds=list(seeds) if seeds else None, return_observations=return_episode_data, render_callback=render_frame if max_episodes_rendered > 0 else None, + recording_dir=recording_dir, + env_features=env_features, + recording_repo_id=recording_repo_id, + recording_private=recording_private, ) # Figure out where in each rollout sequence the first done condition was encountered (results after @@ -563,6 +694,10 @@ def eval_main(cfg: EvalPipelineConfig): # Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments) env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy) + recording_dir = Path(cfg.output_dir) / "recordings" if cfg.eval.recording else None + max_episodes_rendered = 0 if cfg.eval.recording else 10 + videos_dir = None if cfg.eval.recording else Path(cfg.output_dir) / "videos" + with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): info = eval_policy_all( envs=envs, @@ -572,10 +707,15 @@ def eval_main(cfg: EvalPipelineConfig): preprocessor=preprocessor, postprocessor=postprocessor, n_episodes=cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(cfg.output_dir) / "videos", + max_episodes_rendered=max_episodes_rendered, + videos_dir=videos_dir, + return_episode_data=False, start_seed=cfg.seed, max_parallel_tasks=cfg.env.max_parallel_tasks, + recording_dir=recording_dir, + env_features=cfg.env.features if cfg.eval.recording else None, + recording_repo_id=cfg.eval.recording_repo_id, + recording_private=cfg.eval.recording_private, ) print("Overall Aggregated 
Metrics:") print(info["overall"]) @@ -618,6 +758,10 @@ def eval_one( videos_dir: Path | None, return_episode_data: bool, start_seed: int | None, + recording_dir: Path | None = None, + env_features: dict | None = None, + recording_repo_id: str | None = None, + recording_private: bool = False, ) -> TaskMetrics: """Evaluates one task_id of one suite using the provided vec env.""" @@ -635,6 +779,10 @@ def eval_one( videos_dir=task_videos_dir, return_episode_data=return_episode_data, start_seed=start_seed, + recording_dir=recording_dir, + env_features=env_features, + recording_repo_id=recording_repo_id, + recording_private=recording_private, ) per_episode = task_result["per_episode"] @@ -661,6 +809,10 @@ def run_one( videos_dir: Path | None, return_episode_data: bool, start_seed: int | None, + recording_dir: Path | None = None, + env_features: dict | None = None, + recording_repo_id: str | None = None, + recording_private: bool = False, ): """ Run eval_one for a single (task_group, task_id, env). 
@@ -672,7 +824,13 @@ def run_one( task_videos_dir = videos_dir / f"{task_group}_{task_id}" task_videos_dir.mkdir(parents=True, exist_ok=True) - # Call the existing eval_one (assumed to return TaskMetrics-like dict) + task_recording_dir = None + task_repo_id = None + if recording_dir is not None and env_features is not None: + task_recording_dir = recording_dir / f"{task_group}_{task_id}" + if recording_repo_id is not None: + task_repo_id = f"{recording_repo_id}_{task_group}_{task_id}" + metrics = eval_one( env, policy=policy, @@ -685,8 +843,12 @@ def run_one( videos_dir=task_videos_dir, return_episode_data=return_episode_data, start_seed=start_seed, + recording_dir=task_recording_dir, + env_features=env_features, + recording_repo_id=task_repo_id, + recording_private=recording_private, ) - # ensure we always provide video_paths key to simplify accumulation + if max_episodes_rendered > 0: metrics.setdefault("video_paths", []) return task_group, task_id, metrics @@ -702,6 +864,10 @@ def eval_policy_all( n_episodes: int, *, max_episodes_rendered: int = 0, + recording_dir: Path | None = None, + env_features: dict | None = None, + recording_repo_id: str | None = None, + recording_private: bool = False, videos_dir: Path | None = None, return_episode_data: bool = False, start_seed: int | None = None, @@ -761,6 +927,10 @@ def eval_policy_all( videos_dir=videos_dir, return_episode_data=return_episode_data, start_seed=start_seed, + recording_dir=recording_dir, + env_features=env_features, + recording_repo_id=recording_repo_id, + recording_private=recording_private, ) if max_parallel_tasks <= 1: