Files
lerobot/examples/evaluate_libero.py
T
Jade Choghari 5c628f1700 new things
2025-09-10 11:32:54 +02:00

255 lines
9.3 KiB
Python

"""
This script demonstrates how to evaluate a pretrained smolVLA policy on the LIBERO benchmark.
"""
import collections
import dataclasses
import logging
import math
import pathlib
import cv2
import draccus
import imageio
import numpy as np
import torch
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
from tqdm import tqdm
from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
from lerobot.policies.pi0.modeling_pi0 import PI0Policy
LIBERO_DUMMY_ACTION = [0.0] * 6 + [-1.0]
LIBERO_ENV_RESOLUTION = 256 # resolution used to render training data
@dataclasses.dataclass
class Args:
"""
Evaluation arguments for smolVLA on LIBERO.
"""
# --- Hugging Face arguments ---
policy_path: str = "lerobot/smolvla_base"
"""Path to the pretrained policy on the Hugging Face Hub or local directory."""
# --- LIBERO environment-specific parameters ---
task_suite_name: str = "libero_spatial"
"""Task suite. Options: libero_spatial, libero_object, libero_goal, libero_10, libero_90"""
num_steps_wait: int = 10
"""Number of steps to wait for objects to stabilize in sim."""
num_trials_per_task: int = 50
"""Number of rollouts per task."""
# --- Evaluation arguments ---
video_out_path: str = "data/libero/videos"
"""Path to save videos."""
device: str = "cuda"
"""Device to use for evaluation."""
seed: int = 7
"""Random Seed (for reproducibility)"""
@draccus.wrap()
def eval_libero(args: Args) -> None:
# Set random seed
torch.manual_seed(args.seed)
np.random.seed(args.seed)
# --- Load Policy ---
policy = SmolVLAPolicy.from_pretrained(args.policy_path)
policy.to(args.device)
policy.eval()
# --- Initialize LIBERO task suite ---
benchmark_dict = benchmark.get_benchmark_dict()
try:
task_suite = benchmark_dict[args.task_suite_name]()
except KeyError:
raise ValueError(
f"Unknown task suite: {args.task_suite_name}. "
f"Available options are: {list(benchmark_dict.keys())}"
)
num_tasks_in_suite = task_suite.n_tasks
logging.info(f"Task suite: {args.task_suite_name}")
pathlib.Path(args.video_out_path).mkdir(parents=True, exist_ok=True)
if args.task_suite_name == "libero_spatial":
max_steps = 220 # longest training demo has 193 steps
elif args.task_suite_name == "libero_object":
max_steps = 280 # longest training demo has 254 steps
elif args.task_suite_name == "libero_goal":
max_steps = 300 # longest training demo has 270 steps
elif args.task_suite_name == "libero_10":
max_steps = 520 # longest training demo has 505 steps
elif args.task_suite_name == "libero_90":
max_steps = 400 # longest training demo has 373 steps
else:
# Fallback for custom task suites
max_steps = 520
# --- Evaluation Loop ---
total_episodes, total_successes = 0, 0
for task_id in tqdm(range(num_tasks_in_suite), desc="Tasks"):
# Get task
task = task_suite.get_task(task_id)
# Get default LIBERO initial states
initial_states = task_suite.get_task_init_states(task_id)
# Initialize LIBERO environment and task description
env, task_description = _get_libero_env(task, LIBERO_ENV_RESOLUTION, args.seed)
# Start episodes
task_episodes, task_successes = 0, 0
for episode_idx in tqdm(
range(min(args.num_trials_per_task, len(initial_states))),
desc=f"Task {task_id}: {task.language}",
leave=False,
):
logging.info(f"\nTask: {task_description}")
# Reset environment and policy
env.reset()
policy.reset()
# Set initial states
obs = env.set_init_state(initial_states[episode_idx])
# IMPORTANT: Do nothing for the first few timesteps because the simulator drops objects
# and we need to wait for them to fall
for _ in range(args.num_steps_wait):
obs, _, _, _ = env.step(LIBERO_DUMMY_ACTION)
# Setup
t = 0
frames = []
done = False
# Add initial frame
agentview_image = np.ascontiguousarray(obs["agentview_image"][::-1, ::-1])
# frames.append(agentview_image)
# import ipdb; ipdb.set_trace()
logging.info(f"Starting episode {task_episodes+1}...")
while t < max_steps:
try:
# Get preprocessed image
# IMPORTANT: rotate 180 degrees to match train preprocessing
wrist_img = np.ascontiguousarray(obs["robot0_eye_in_hand_image"][::-1, ::-1])
agentview_image = np.ascontiguousarray(obs["agentview_image"][::-1, ::-1])
frames.append(agentview_image)
# Prepare observations dict
state = np.concatenate(
(
obs["robot0_eef_pos"],
_quat2axisangle(obs["robot0_eef_quat"]),
obs["robot0_gripper_qpos"],
)
)
observation = {
"observation.images.image": torch.from_numpy(agentview_image / 255.0)
.permute(2, 0, 1)
.to(torch.float32)
.to(args.device).unsqueeze(0),
"observation.images.image2": torch.from_numpy(wrist_img / 255.0)
.permute(2, 0, 1)
.to(torch.float32)
.to(args.device).unsqueeze(0),
"observation.state": torch.from_numpy(state).to(torch.float32).to(args.device).unsqueeze(0),
"task": task_description,
}
# Query model to get action
with torch.inference_mode():
action_tensor = policy.select_action(observation)
action = action_tensor.cpu().numpy()[0]
action[-1] = 1 - action[-1]
# Execute action in environment
obs, _, done, _ = env.step(action)
if done:
task_successes += 1
total_successes += 1
break
t += 1
except Exception as e:
logging.error(f"Caught exception: {e}")
break
task_episodes += 1
total_episodes += 1
# Save a replay video of the episode
suffix = "success" if done else "failure"
task_segment = task_description.replace(" ", "_").replace("/", "_")
video_path = (
pathlib.Path(args.video_out_path) / f"rollout_task_{task_id}_episode_{episode_idx}_{task_segment}_{suffix}.mp4"
)
fps = 30
writer = imageio.get_writer(video_path, fps=fps)
for image in frames:
writer.append_data(image)
writer.close()
logging.info(f"Saved video to {video_path}")
# Log current results
logging.info(f"Success: {done}")
if total_episodes > 0:
logging.info(f"# episodes completed so far: {total_episodes}")
logging.info(f"# successes: {total_successes} ({total_successes / total_episodes * 100:.1f}%)")
# Log final results for the task
if task_episodes > 0:
logging.info(f"Task {task_id} success rate: {float(task_successes) / float(task_episodes):.2f}")
if total_episodes > 0:
logging.info(f"Cumulative success rate: {float(total_successes) / float(total_episodes):.2f}")
logging.info("--- Evaluation finished ---")
if total_episodes > 0:
logging.info(f"Total success rate: {float(total_successes) / float(total_episodes):.2f}")
logging.info(f"Total episodes: {total_episodes}")
logging.info(f"Total successes: {total_successes}")
cv2.destroyAllWindows()
def _get_libero_env(task, resolution, seed):
"""Initializes and returns the LIBERO environment, along with the task description."""
task_description = task.language
task_bddl_file = pathlib.Path(get_libero_path("bddl_files")) / task.problem_folder / task.bddl_file
env_args = {
"bddl_file_name": str(task_bddl_file),
"camera_heights": resolution,
"camera_widths": resolution,
}
env = OffScreenRenderEnv(**env_args)
env.seed(seed) # IMPORTANT: seed seems to affect object positions even when using fixed initial state
return env, task_description
def _quat2axisangle(quat):
"""
Copied from robosuite:
https://github.com/ARISE-Initiative/robosuite/blob/eafb81f54ffc104f905ee48a16bb15f059176ad3/robosuite/utils/transform_utils.py#L490C1-L512C55
"""
# clip quaternion
if quat[3] > 1.0:
quat[3] = 1.0
elif quat[3] < -1.0:
quat[3] = -1.0
den = np.sqrt(1.0 - quat[3] * quat[3])
if math.isclose(den, 0.0):
# This is (close to) a zero degree rotation, immediately return
return np.zeros(3)
return (quat[:3] * 2.0 * math.acos(quat[3])) / den
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
eval_libero()