diff --git a/src/lerobot/configs/default.py b/src/lerobot/configs/default.py index b809e71d9..30f021ba3 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -73,6 +73,8 @@ class EvalConfig: # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1. use_async_envs: bool = True + # Whether to record eval rollouts as a LeRobot v3.0 dataset on disk. + recording: bool = False def __post_init__(self) -> None: if self.batch_size == 0: diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index d45483d21..f877fd39f 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -74,6 +74,7 @@ from tqdm import trange from lerobot.configs import parser from lerobot.configs.eval import EvalPipelineConfig +from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.envs import ( check_env_attributes_and_types, close_envs, @@ -84,7 +85,7 @@ from lerobot.envs import ( from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors from lerobot.processor import PolicyProcessorPipeline from lerobot.types import PolicyAction -from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD +from lerobot.utils.constants import ACTION, DONE, OBS_IMAGE, OBS_IMAGES, OBS_STATE, OBS_STR, REWARD from lerobot.utils.device_utils import get_safe_torch_device from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.io_utils import write_video @@ -95,6 +96,56 @@ from lerobot.utils.utils import ( ) +def _env_features_to_dataset_features(env_features: dict) -> dict: + """Convert EnvConfig.features (PolicyFeature objects) to the plain dict format for LeRobotDataset.create().""" + features = {} + for key, ft in env_features.items(): + if ft.type.value == "visual": + features[key] = { + "dtype": "video", + "shape": tuple(ft.shape), + "names": ["channel", "height", "width"], + } + else: + features[key] = {"dtype": "float32", "shape": tuple(ft.shape), "names": None} + features["next.reward"] = {"dtype": "float32", "shape": (1,), "names": None} + features["next.success"] = {"dtype": "bool", "shape": (1,), "names": None} + features["next.done"] = {"dtype": "bool", "shape": (1,), "names": None} + return features + + +def _build_raw_frame( + raw_obs: dict, + env_idx: int, + action: np.ndarray, + reward: float, + success: bool, + done: bool, + task: str, +) -> dict: + """Build a dataset frame from raw env observations for one env index.""" + frame: dict[str, Any] = {} + if "pixels" in raw_obs: + if isinstance(raw_obs["pixels"], dict): + for cam_name, img in raw_obs["pixels"].items(): + frame[f"{OBS_IMAGES}.{cam_name}"] = img[env_idx] + else: + frame[OBS_IMAGE] = raw_obs["pixels"][env_idx] + if "agent_pos" in raw_obs: + frame[OBS_STATE] = raw_obs["agent_pos"][env_idx] + for key, val in raw_obs.items(): + if key in ("pixels", "agent_pos"): + continue + if isinstance(val, np.ndarray): + frame[f"{OBS_STR}.{key}"] = val[env_idx] + frame[ACTION] = action + frame["next.reward"] = np.float32(reward) + frame["next.success"] = success + frame["next.done"] = done + frame["task"] = task + return frame + + def rollout( env: gym.vector.VectorEnv, policy: PreTrainedPolicy, @@ -105,6 +156,7 @@ def rollout( seeds: list[int] | None = None, return_observations: bool = False, render_callback: Callable[[gym.vector.VectorEnv], None] | None = None, + recording_dataset: Any | None = None, ) -> dict: """Run a batched policy rollout once through a batch of environments. @@ -145,6 +197,14 @@ def rollout( if render_callback is not None: render_callback(env) + raw_observation = deepcopy(observation) if recording_dataset is not None else None + task_desc = "" + if recording_dataset is not None: + try: + task_desc = list(env.call("task_description"))[0] + except (AttributeError, NotImplementedError): + task_desc = "" + all_observations = [] all_actions = [] all_rewards = [] @@ -217,6 +277,25 @@ def rollout( else: successes = [False] * env.num_envs + if recording_dataset is not None and raw_observation is not None: + prev_done = done.copy() + for env_idx in range(env.num_envs): + if prev_done[env_idx]: + continue + frame = _build_raw_frame( + raw_observation, + env_idx, + action_numpy[env_idx], + reward[env_idx], + successes[env_idx], + bool(terminated[env_idx] | truncated[env_idx]), + task_desc, + ) + recording_dataset.add_frame(frame) + if terminated[env_idx] or truncated[env_idx]: + recording_dataset.save_episode() + raw_observation = deepcopy(observation) + # Keep track of which environments are done so far. # Mark the episode as done if we reach the maximum step limit. # This ensures that the rollout always terminates cleanly at `max_steps`, @@ -273,6 +352,7 @@ def eval_policy( videos_dir: Path | None = None, return_episode_data: bool = False, start_seed: int | None = None, + recording_dataset: Any | None = None, ) -> dict: """ Args: @@ -361,6 +441,7 @@ def eval_policy( seeds=list(seeds) if seeds else None, return_observations=return_episode_data, render_callback=render_frame if max_episodes_rendered > 0 else None, + recording_dataset=recording_dataset, ) # Figure out where in each rollout sequence the first done condition was encountered (results after @@ -563,6 +644,10 @@ def eval_main(cfg: EvalPipelineConfig): # Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments) env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy) + recording_dir = Path(cfg.output_dir) / "recordings" if cfg.eval.recording else None + max_episodes_rendered = 0 if cfg.eval.recording else 10 + videos_dir = None if cfg.eval.recording else Path(cfg.output_dir) / "videos" + with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(): info = eval_policy_all( envs=envs, @@ -572,10 +657,13 @@ def eval_main(cfg: EvalPipelineConfig): preprocessor=preprocessor, postprocessor=postprocessor, n_episodes=cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(cfg.output_dir) / "videos", + max_episodes_rendered=max_episodes_rendered, + videos_dir=videos_dir, + return_episode_data=False, start_seed=cfg.seed, max_parallel_tasks=cfg.env.max_parallel_tasks, + recording_dir=recording_dir, + env_features=cfg.env.features if cfg.eval.recording else None, ) print("Overall Aggregated Metrics:") print(info["overall"]) @@ -618,6 +706,7 @@ def eval_one( videos_dir: Path | None, return_episode_data: bool, start_seed: int | None, + recording_dataset: Any | None = None, ) -> TaskMetrics: """Evaluates one task_id of one suite using the provided vec env.""" @@ -635,6 +724,7 @@ def eval_one( videos_dir=task_videos_dir, return_episode_data=return_episode_data, start_seed=start_seed, + recording_dataset=recording_dataset, ) per_episode = task_result["per_episode"] @@ -661,6 +751,8 @@ def run_one( videos_dir: Path | None, return_episode_data: bool, start_seed: int | None, + recording_dir: Path | None = None, + env_features: dict | None = None, ): """ Run eval_one for a single (task_group, task_id, env). @@ -672,21 +764,38 @@ def run_one( task_videos_dir = videos_dir / f"{task_group}_{task_id}" task_videos_dir.mkdir(parents=True, exist_ok=True) - # Call the existing eval_one (assumed to return TaskMetrics-like dict) - metrics = eval_one( - env, - policy=policy, - env_preprocessor=env_preprocessor, - env_postprocessor=env_postprocessor, - preprocessor=preprocessor, - postprocessor=postprocessor, - n_episodes=n_episodes, - max_episodes_rendered=max_episodes_rendered, - videos_dir=task_videos_dir, - return_episode_data=return_episode_data, - start_seed=start_seed, - ) - # ensure we always provide video_paths key to simplify accumulation + recording_dataset = None + if recording_dir is not None and env_features is not None: + task_recording_dir = recording_dir / f"{task_group}_{task_id}" + fps = env.unwrapped.metadata.get("render_fps", 30) + features = _env_features_to_dataset_features(env_features) + recording_dataset = LeRobotDataset.create( + repo_id=f"eval_{task_group}_{task_id}", + fps=fps, + features=features, + root=str(task_recording_dir), + use_videos=True, + ) + + try: + metrics = eval_one( + env, + policy=policy, + env_preprocessor=env_preprocessor, + env_postprocessor=env_postprocessor, + preprocessor=preprocessor, + postprocessor=postprocessor, + n_episodes=n_episodes, + max_episodes_rendered=max_episodes_rendered, + videos_dir=task_videos_dir, + return_episode_data=return_episode_data, + start_seed=start_seed, + recording_dataset=recording_dataset, + ) + finally: + if recording_dataset is not None: + recording_dataset.finalize() + if max_episodes_rendered > 0: metrics.setdefault("video_paths", []) return task_group, task_id, metrics @@ -702,6 +811,8 @@ def eval_policy_all( n_episodes: int, *, max_episodes_rendered: int = 0, + recording_dir: Path | None = None, + env_features: dict | None = None, videos_dir: Path | None = None, return_episode_data: bool = False, start_seed: int | None = None, @@ -761,6 +872,8 @@ def eval_policy_all( videos_dir=videos_dir, return_episode_data=return_episode_data, start_seed=start_seed, + recording_dir=recording_dir, + env_features=env_features, ) if max_parallel_tasks <= 1: