mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-16 17:20:05 +00:00
initial commit
This commit is contained in:
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Test script for RLearN evaluation metrics.
|
||||
|
||||
This script tests the VOC-S and success/failure detection metrics with synthetic data
|
||||
to ensure they work correctly before running on real datasets.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.policies.rlearn.evaluation import (
|
||||
compute_success_failure_detection,
|
||||
compute_voc_s,
|
||||
generate_mismatched_languages,
|
||||
)
|
||||
|
||||
|
||||
def test_voc_s():
    """Check ``compute_voc_s`` against synthetic reward curves.

    Covers rising, falling, seeded-random and mixed episodes. The first three
    scenarios are asserted against thresholds on ``voc_s_mean``; the mixed
    scenario only prints its statistics.
    """
    print("Testing VOC-S computation...")

    # Scenario 1: every episode rises linearly from 0 to 1.
    rising_episodes = [np.linspace(0, 1, 20) for _ in range(10)]
    stats = compute_voc_s(rising_episodes)

    print("Perfect positive correlation:")
    mean_score = stats["voc_s_mean"]
    print(f" Mean: {mean_score:.4f} (should be ~1.0)")
    print(f" IQM: {stats['voc_s_iqm']:.4f} (should be ~1.0)")
    assert mean_score > 0.95, f"Expected >0.95, got {mean_score}"

    # Scenario 2: every episode falls linearly from 1 to 0.
    falling_episodes = [np.linspace(1, 0, 20) for _ in range(10)]
    stats = compute_voc_s(falling_episodes)

    print("Perfect negative correlation:")
    mean_score = stats["voc_s_mean"]
    print(f" Mean: {mean_score:.4f} (should be ~-1.0)")
    print(f" IQM: {stats['voc_s_iqm']:.4f} (should be ~-1.0)")
    assert mean_score < -0.95, f"Expected <-0.95, got {mean_score}"

    # Scenario 3: uniform noise, seeded so the run is reproducible.
    np.random.seed(42)
    noisy_episodes = [np.random.random(20) for _ in range(50)]
    stats = compute_voc_s(noisy_episodes)

    print("Random correlation:")
    mean_score = stats["voc_s_mean"]
    print(f" Mean: {mean_score:.4f} (should be ~0.0)")
    print(f" IQM: {stats['voc_s_iqm']:.4f} (should be ~0.0)")
    assert abs(mean_score) < 0.3, f"Expected ~0, got {mean_score}"

    # Scenario 4: five rising, five falling and five noisy episodes together.
    # The noisy ones continue the random stream seeded in scenario 3.
    mixed_episodes = [
        *(np.linspace(0, 1, 15) for _ in range(5)),
        *(np.linspace(1, 0, 15) for _ in range(5)),
        *(np.random.random(15) for _ in range(5)),
    ]

    stats = compute_voc_s(mixed_episodes)
    print("Mixed correlations:")
    print(f" Mean: {stats['voc_s_mean']:.4f}")
    print(f" IQM: {stats['voc_s_iqm']:.4f}")
    print(f" Std: {stats['voc_s_std']:.4f}")

    print("✓ VOC-S tests passed!\n")
|
||||
|
||||
|
||||
def test_success_failure_detection():
    """Check ``compute_success_failure_detection`` on synthetic reward pairs.

    Three scenarios are run: clearly separated reward curves (accuracy must be
    exactly 1.0), two samples from the same distribution (accuracy must fall in
    [0.2, 0.8]), and partially separated samples (printed only).
    """
    print("Testing Success/Failure Detection...")

    # Scenario 1: matched episodes end at 1.0, mismatched ones end at 0.3.
    matched = [np.linspace(0, 1, 20) for _ in range(20)]
    mismatched = [np.linspace(0, 0.3, 20) for _ in range(20)]

    report = compute_success_failure_detection(matched, mismatched)

    print("Clear separation test:")
    accuracy = report["detection_accuracy"]
    print(f" Detection accuracy: {accuracy:.4f} (should be 1.0)")
    print(f" Mean correct: {report['mean_correct_final']:.4f}")
    print(f" Mean incorrect: {report['mean_incorrect_final']:.4f}")
    print(f" Separation score: {report['separation_score']:.4f}")
    assert accuracy == 1.0, f"Expected 1.0, got {accuracy}"

    # Scenario 2: both groups are drawn from the same normal distribution.
    np.random.seed(42)
    first_sample = [np.random.normal(0.5, 0.05, 15) for _ in range(20)]
    second_sample = [np.random.normal(0.5, 0.05, 15) for _ in range(20)]

    report = compute_success_failure_detection(first_sample, second_sample)

    print("No separation test:")
    accuracy = report["detection_accuracy"]
    print(f" Detection accuracy: {accuracy:.4f} (should be ~0.5)")
    print(f" Separation score: {report['separation_score']:.4f} (should be ~0.0)")
    # The window is deliberately wide because random data can vary.
    assert 0.2 <= accuracy <= 0.8, f"Expected ~0.5 (±0.3), got {accuracy}"

    # Scenario 3: overlapping distributions with different means (0.7 vs 0.4).
    np.random.seed(42)
    higher_mean = [np.random.normal(0.7, 0.1, 10) for _ in range(20)]
    lower_mean = [np.random.normal(0.4, 0.1, 10) for _ in range(20)]

    report = compute_success_failure_detection(higher_mean, lower_mean)

    print("Partial separation test:")
    print(f" Detection accuracy: {report['detection_accuracy']:.4f}")
    print(f" Separation score: {report['separation_score']:.4f}")

    print("✓ Success/Failure Detection tests passed!\n")
|
||||
|
||||
|
||||
def test_mismatch_generation():
    """Check ``generate_mismatched_languages`` with default and custom templates.

    For both call styles the output must contain one instruction per original.
    With default templates each output must differ from its original; with
    custom templates each output must be one of the supplied templates.
    """
    print("Testing mismatch language generation...")

    original_languages = [
        "pick up the red ball",
        "put the cup on the table",
        "open the drawer",
        "close the door",
    ]

    # Default templates.
    mismatched = generate_mismatched_languages(original_languages)

    print(f"Original languages: {len(original_languages)}")
    print(f"Mismatched languages: {len(mismatched)}")
    assert len(mismatched) == len(original_languages)

    # Each generated instruction must differ from the one it replaces.
    # strict=True makes a length mismatch an error instead of a silent truncation.
    for orig, mismatch in zip(original_languages, mismatched, strict=True):
        print(f" '{orig}' -> '{mismatch}'")
        assert orig != mismatch, "Mismatch should be different from original"

    # Custom templates.
    custom_templates = ["dance", "sing", "jump"]
    mismatched_custom = generate_mismatched_languages(original_languages, custom_templates)

    # Without this check a too-short result would pass the loop below unnoticed.
    assert len(mismatched_custom) == len(original_languages), (
        f"Expected {len(original_languages)} mismatched languages, got {len(mismatched_custom)}"
    )

    print("\nWith custom templates:")
    for orig, mismatch in zip(original_languages, mismatched_custom, strict=True):
        print(f" '{orig}' -> '{mismatch}'")
        assert mismatch in custom_templates

    print("✓ Mismatch generation tests passed!\n")
|
||||
|
||||
|
||||
def test_edge_cases():
    """Check degenerate inputs for the VOC-S and detection metrics.

    Covers an empty episode list, single-frame episodes, constant rewards, and
    a correct/incorrect list length mismatch that is expected to raise
    ``ValueError``.
    """
    print("Testing edge cases...")

    # Empty input
    empty_results = compute_voc_s([])
    assert empty_results["num_episodes"] == 0
    assert empty_results["voc_s_mean"] == 0.0

    # Single frame episodes (should be skipped)
    single_frame = [np.array([0.5]) for _ in range(5)]
    results = compute_voc_s(single_frame)
    assert results["num_episodes"] == 0, "Single-frame episodes should be skipped"

    # Constant rewards (should give correlation = 0)
    constant_rewards = [np.ones(10) * 0.5 for _ in range(5)]
    results = compute_voc_s(constant_rewards)
    print(f"Constant rewards correlation: {results['voc_s_mean']:.4f} (should be 0.0)")
    assert results["voc_s_mean"] == 0.0

    # Mismatched array lengths for detection.
    # An explicit raise in the else branch is used instead of `assert False`,
    # which is removed under `python -O` and would let a missing ValueError pass.
    try:
        compute_success_failure_detection([np.array([1, 2])], [])
    except ValueError:
        pass  # Expected
    else:
        raise AssertionError("Should have raised ValueError")

    print("✓ Edge case tests passed!\n")
|
||||
@@ -0,0 +1,237 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.constants import OBS_IMAGES, OBS_LANGUAGE, REWARD
|
||||
from lerobot.policies.factory import make_processor
|
||||
from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig
|
||||
from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy
|
||||
from tests.utils import require_package
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_instantiation_and_forward_tensor_batch():
    """Instantiate RLearN and run a forward pass with a (B, T, C, H, W) tensor input using a real model and real text.

    Asserts that ``forward`` returns a tensor loss and a log dict containing ``"loss"``.
    """
    cfg = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        push_to_hub=False,
        freeze_backbones=True,
    )
    cfg.input_features = {
        "observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
    }
    cfg.output_features = {
        REWARD: PolicyFeature(type=FeatureType.REWARD, shape=(1,)),
    }

    policy = RLearNPolicy(cfg)

    B, T, C, H, W = 2, 3, 3, 256, 256
    batch = {
        OBS_IMAGES: torch.rand(B, T, C, H, W),
        # `high` is exclusive: high=2 yields binary {0, 1} rewards, matching the
        # list-batch test. The previous high=1 could only ever produce zeros.
        REWARD: torch.randint(low=0, high=2, size=(B, T)).float(),
        OBS_LANGUAGE: ["move the green cube into the box" for _ in range(B)],
    }

    loss, logs = policy.forward(batch)
    assert isinstance(loss, torch.Tensor)
    assert "loss" in logs
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_instantiation_and_forward_list_batch_with_language():
    """Build an RLearNPolicy and run ``forward`` on frames given as a list of (B, C, H, W) tensors plus language strings.

    Asserts that the returned loss is a tensor and that the logs contain ``"loss"``.
    """
    config = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        push_to_hub=False,
        freeze_backbones=True,
    )
    image_feature = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224))
    config.input_features = {"observation.image": image_feature}
    reward_feature = PolicyFeature(type=FeatureType.REWARD, shape=(1,))
    config.output_features = {REWARD: reward_feature}

    model = RLearNPolicy(config)

    batch_size, num_frames, channels, height, width = 2, 4, 3, 256, 256
    # One (B, C, H, W) tensor per time step.
    frame_list = [torch.rand(batch_size, channels, height, width) for _ in range(num_frames)]
    binary_rewards = torch.randint(low=0, high=2, size=(batch_size, num_frames)).float()
    instructions = ["move the red cube into the box"] * batch_size

    inputs = {
        OBS_IMAGES: frame_list,
        REWARD: binary_rewards,
        OBS_LANGUAGE: instructions,
    }

    loss, logs = model.forward(inputs)
    assert isinstance(loss, torch.Tensor)
    assert "loss" in logs
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_composite_loss_shapes_and_terms():
    """Smoke test for the composite loss.

    Runs one forward pass with ``loss_type="composite"`` and asserts that the
    loss is a finite tensor and that the progress, spatial-NCE and both rewind
    terms are reported in the logs.
    """
    config = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        push_to_hub=False,
        freeze_backbones=True,
        loss_type="composite",
        lambda_prog=1.0,
        lambda_spatial_nce=0.5,
        lambda_rewind=0.4,
        num_ranking_pairs=32,  # kept small for the test
        last_k_for_nce=2,
    )
    image_feature = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224))
    config.input_features = {"observation.image": image_feature}
    reward_feature = PolicyFeature(type=FeatureType.REWARD, shape=(1,))
    config.output_features = {REWARD: reward_feature}

    model = RLearNPolicy(config)

    batch_size, num_frames, channels, height, width = 2, 3, 3, 256, 256
    # Progress labels: a 0 -> 1 ramp over time, repeated for every sample.
    ramp = torch.linspace(0, 1, num_frames)
    progress = ramp.unsqueeze(0).repeat(batch_size, 1)
    inputs = {
        OBS_IMAGES: torch.rand(batch_size, num_frames, channels, height, width),
        REWARD: progress.clone(),
        OBS_LANGUAGE: ["stack the blocks"] * batch_size,
    }

    loss, logs = model.forward(inputs)
    assert isinstance(loss, torch.Tensor)
    assert torch.isfinite(loss)

    # Every composite term must be reported.
    expected_terms = (
        "loss_prog",
        "loss_spatial_nce",
        "loss_rewind_forward",
        "loss_rewind_reverse",
    )
    for term in expected_terms:
        assert term in logs
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_preprocessor_tokenizes_and_copies_task():
    """Run the preprocessor on a batched ``task`` list and check the language outputs.

    The processed dict must contain the language entry plus 2-D ``tokens`` and
    ``attention_mask`` tensors whose first dimension equals the batch size.
    """
    config = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        device="cpu",
        push_to_hub=False,
    )
    image_feature = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 64, 64))
    config.input_features = {"observation.image": image_feature}
    reward_feature = PolicyFeature(type=FeatureType.REWARD, shape=(1,))
    config.output_features = {REWARD: reward_feature}

    preprocessor, _postprocessor = make_processor(config, dataset_stats=None)

    batch_size, channels, height, width = 2, 3, 64, 64
    inputs = {
        "observation.image": torch.rand(batch_size, channels, height, width),
        REWARD: torch.zeros(batch_size),
        "task": ["pick the cube", "place it in the box"],
    }

    processed = preprocessor(inputs)

    tokens_key = f"{OBS_LANGUAGE}.tokens"
    mask_key = f"{OBS_LANGUAGE}.attention_mask"

    assert isinstance(processed, dict)
    assert tokens_key in processed
    assert mask_key in processed
    assert OBS_LANGUAGE in processed

    tokens = processed[tokens_key]
    mask = processed[mask_key]
    # Both tensors are expected to be (batch, sequence).
    assert tokens.dim() == 2
    assert mask.dim() == 2
    assert tokens.shape[0] == batch_size
    assert mask.shape[0] == batch_size
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_preprocessor_string_task_and_to_batch():
    """Run the preprocessor on an unbatched sample with a single string task.

    The image must come back with a leading batch dimension of size 1, the
    language entry must be a list, and the token and attention-mask entries
    must be present.
    """
    config = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        device="cpu",
        push_to_hub=False,
    )
    image_feature = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 64, 64))
    config.input_features = {"observation.image": image_feature}
    reward_feature = PolicyFeature(type=FeatureType.REWARD, shape=(1,))
    config.output_features = {REWARD: reward_feature}

    preprocessor, _postprocessor = make_processor(config, dataset_stats=None)

    # A single (C, H, W) image, scalar reward and plain-string task.
    sample = {
        "observation.image": torch.rand(3, 64, 64),
        REWARD: torch.tensor(0.0),
        "task": "move the green cube into the box",
    }

    processed = preprocessor(sample)

    # The image should now be (1, C, H, W).
    image = processed["observation.image"]
    assert image.dim() == 4
    assert image.shape[0] == 1

    # The language copy and its tokenization should both exist.
    assert OBS_LANGUAGE in processed
    assert isinstance(processed[OBS_LANGUAGE], list)
    assert f"{OBS_LANGUAGE}.tokens" in processed
    assert f"{OBS_LANGUAGE}.attention_mask" in processed
|
||||
|
||||
|
||||
@require_package("transformers")
def test_rlearn_pipeline_end_to_end_forward():
    """End-to-end check: preprocess a synthetic sample, then feed it to the model's ``forward``.

    The preprocessed image is tiled along a new time axis and paired with a
    0 -> 1 progress ramp; the resulting composite loss must be a finite tensor.
    """
    config = RLearNConfig(
        model_name="google/siglip2-large-patch16-256",
        device="cpu",
        push_to_hub=False,
        freeze_backbones=True,
        loss_type="composite",
    )
    image_feature = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224))
    config.input_features = {"observation.image": image_feature}
    reward_feature = PolicyFeature(type=FeatureType.REWARD, shape=(1,))
    config.output_features = {REWARD: reward_feature}

    # Processors first, then the model.
    preprocessor, _postprocessor = make_processor(config, dataset_stats=None)
    model = RLearNPolicy(config)

    batch_size, num_frames, channels, height, width = 2, 3, 3, 256, 256
    ramp = torch.linspace(0, 1, num_frames)
    progress = ramp.unsqueeze(0).repeat(batch_size, 1)

    raw_sample = {
        # Passed as a (B, C, H, W) image with no time axis, under the
        # "observation.image" key, so the preprocessor handles it.
        "observation.image": torch.rand(batch_size, channels, height, width),
        # Only the first time step's label goes through the preprocessor.
        REWARD: progress[:, :1].clone(),
        "task": ["insert the peg", "insert the peg"],
    }

    processed = preprocessor(raw_sample)

    # Take the images under OBS_IMAGES if present, otherwise under the raw key,
    # then tile them along a new time axis to get (B, T, C, H, W).
    fallback_images = processed.get("observation.image")
    images = processed.get(OBS_IMAGES, fallback_images)
    video = images.unsqueeze(1).repeat(1, num_frames, 1, 1, 1)

    model_inputs = {
        OBS_IMAGES: video,
        REWARD: progress.clone(),
        OBS_LANGUAGE: processed[OBS_LANGUAGE],
    }
    loss, logs = model.forward(model_inputs)

    assert isinstance(loss, torch.Tensor)
    assert torch.isfinite(loss)
|
||||
Reference in New Issue
Block a user