lerobot/tests/rewards/test_classifier_processor.py
Commit 8a3d64033f by Khalil Meftah: Reward models refactor (#3142)
* feat(rewards): add RewardModelConfig and PreTrainedRewardModel base classes

* refactor(rewards): migrate Classifier from policies/sac/reward_model/ to rewards/classifier/ (a before/after import sketch follows the commit log)

* refactor(rewards): migrate SARM from policies/sarm/ to rewards/sarm/

* refactor(rewards): add rewards/factory.py and remove reward model code from policies/factory.py

* refactor(rewards): update imports and delete old reward model locations

* test(rewards): add reward model tests and update existing test imports

* fix(rewards): restore full Classifier and SARM implementations

* test(rewards): restore missing CUDA and mixed precision classifier processor tests

* refactor(lerobot_train.py): remove rabc-specific configuration and replace it with a generic sampler-weight class in lerobot_train

* refactor(lerobot_train.py): add missing sampling weight script

* linter + missing files

* add testing for the sample weighter

* revert some useless changes, improve typing

* update docs

* add automatic detection of the progress path

* remove type exp

* improve comment

* fix: move rabc.py to rewards/sarm/ and update import paths

* refactor(imports): update reward model imports to new module structure

* refactor(imports): update reward model imports to reflect new module structure

* refactor(imports): conditionally import pandas based on availability

* feat(configs): add reward_model field to TrainPipelineConfig and Hub fields to RewardModelConfig

* refactor(policies): remove reward model branches from policy factory and __init__

* refactor(rewards): expand __init__ facade and fix SARMConfig __post_init__ crash

* feat(train): route reward model training through rewards/factory instead of policies/factory

* refactor(train): streamline reward model training logic

* fix(rewards): ensure FileNotFoundError is raised for missing config_file

* refactor(train): update __get_path_fields__ to include reward_model for config loading

* refactor(classifier): remove redundant input normalization in predict_reward method

* fix(train): raise ValueError for non-trainable reward models in train function

* refactor(pretrained_rm): add model card template

* refactor(tests): reward models

* refactor(sarm): update reset method and remove unused action prediction methods

* refactor(wandb): differentiate tags for reward model and policy training in cfg_to_group function

* fix(train): raise ValueError for PEFT usage in reward model training

* refactor(rewards): enhance RewardModelConfig with device handling and delta indices properties

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
2026-04-28 17:56:24 +02:00
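For downstream code, the practical effect of the migration bullets above is an import-path change. Below is a minimal before/after sketch: the old location is taken from the commit message, and the new imports are exactly the ones this test file uses.

    # Old location, removed by this refactor (per the commit message):
    #   lerobot/policies/sac/reward_model/  ->  lerobot/rewards/classifier/
    # New imports, as used in the test module below:
    from lerobot.rewards.classifier.configuration_classifier import RewardClassifierConfig
    from lerobot.rewards.classifier.processor_classifier import make_classifier_processor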


# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Reward Classifier processor."""
import tempfile
import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.processor import (
DataProcessorPipeline,
DeviceProcessorStep,
IdentityProcessorStep,
NormalizerProcessorStep,
TransitionKey,
)
from lerobot.processor.converters import create_transition, transition_to_batch
from lerobot.rewards.classifier.configuration_classifier import RewardClassifierConfig
from lerobot.rewards.classifier.processor_classifier import make_classifier_processor
from lerobot.utils.constants import OBS_IMAGE, OBS_STATE


def create_default_config():
    """Create a default Reward Classifier configuration for testing."""
    config = RewardClassifierConfig()
    config.input_features = {
        OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
        OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
    }
    config.output_features = {
        "reward": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
    }
    config.normalization_mapping = {
        FeatureType.STATE: NormalizationMode.MEAN_STD,
        FeatureType.VISUAL: NormalizationMode.IDENTITY,
        FeatureType.ACTION: NormalizationMode.IDENTITY,  # No normalization for classifier output
    }
    config.device = "cpu"
    return config


def create_default_stats():
    """Create default dataset statistics for testing."""
    return {
        OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)},
        OBS_IMAGE: {},  # No normalization for images
        "reward": {},  # No normalization for classifier output
    }


def test_make_classifier_processor_basic():
    """Test basic creation of Classifier processor."""
    config = create_default_config()
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Check processor names
    assert preprocessor.name == "classifier_preprocessor"
    assert postprocessor.name == "classifier_postprocessor"

    # Check steps in preprocessor
    assert len(preprocessor.steps) == 3
    assert isinstance(preprocessor.steps[0], NormalizerProcessorStep)  # For input features
    assert isinstance(preprocessor.steps[1], NormalizerProcessorStep)  # For output features
    assert isinstance(preprocessor.steps[2], DeviceProcessorStep)

    # Check steps in postprocessor
    assert len(postprocessor.steps) == 2
    assert isinstance(postprocessor.steps[0], DeviceProcessorStep)
    assert isinstance(postprocessor.steps[1], IdentityProcessorStep)


def test_classifier_processor_normalization():
    """Test that Classifier processor correctly normalizes data."""
    config = create_default_config()
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Create test data
    observation = {
        OBS_STATE: torch.randn(10),
        OBS_IMAGE: torch.randn(3, 224, 224),
    }
    action = torch.randn(1)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is processed
    assert processed[OBS_STATE].shape == (10,)
    assert processed[OBS_IMAGE].shape == (3, 224, 224)
    assert processed[TransitionKey.ACTION.value].shape == (1,)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_classifier_processor_cuda():
    """Test Classifier processor with CUDA device."""
    config = create_default_config()
    config.device = "cuda"
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Create CPU data
    observation = {
        OBS_STATE: torch.randn(10),
        OBS_IMAGE: torch.randn(3, 224, 224),
    }
    action = torch.randn(1)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is on CUDA
    assert processed[OBS_STATE].device.type == "cuda"
    assert processed[OBS_IMAGE].device.type == "cuda"
    assert processed[TransitionKey.ACTION.value].device.type == "cuda"

    # Process through postprocessor
    postprocessed = postprocessor(processed[TransitionKey.ACTION.value])

    # Check that output is back on CPU
    assert postprocessed.device.type == "cpu"


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_classifier_processor_accelerate_scenario():
    """Test Classifier processor in a simulated Accelerate scenario."""
    config = create_default_config()
    config.device = "cuda:0"
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Simulate Accelerate: data already on GPU
    device = torch.device("cuda:0")
    observation = {
        OBS_STATE: torch.randn(10).to(device),
        OBS_IMAGE: torch.randn(3, 224, 224).to(device),
    }
    action = torch.randn(1).to(device)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data stays on the same GPU
    assert processed[OBS_STATE].device == device
    assert processed[OBS_IMAGE].device == device
    assert processed[TransitionKey.ACTION.value].device == device


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
def test_classifier_processor_multi_gpu():
    """Test Classifier processor with a multi-GPU setup."""
    config = create_default_config()
    config.device = "cuda:0"
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Simulate data on a different GPU
    device = torch.device("cuda:1")
    observation = {
        OBS_STATE: torch.randn(10).to(device),
        OBS_IMAGE: torch.randn(3, 224, 224).to(device),
    }
    action = torch.randn(1).to(device)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data stays on cuda:1
    assert processed[OBS_STATE].device == device
    assert processed[OBS_IMAGE].device == device
    assert processed[TransitionKey.ACTION.value].device == device


def test_classifier_processor_without_stats():
    """Test Classifier processor creation without dataset statistics."""
    config = create_default_config()
    preprocessor, postprocessor = make_classifier_processor(config, dataset_stats=None)

    # Should still create processors
    assert preprocessor is not None
    assert postprocessor is not None

    # Processing should still work
    observation = {
        OBS_STATE: torch.randn(10),
        OBS_IMAGE: torch.randn(3, 224, 224),
    }
    action = torch.randn(1)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)
    processed = preprocessor(batch)
    assert processed is not None


def test_classifier_processor_save_and_load():
    """Test saving and loading Classifier processor."""
    config = create_default_config()
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    with tempfile.TemporaryDirectory() as tmpdir:
        # Save preprocessor
        preprocessor.save_pretrained(tmpdir)

        # Load preprocessor
        loaded_preprocessor = DataProcessorPipeline.from_pretrained(
            tmpdir, config_filename="classifier_preprocessor.json"
        )

        # Test that the loaded processor works
        observation = {
            OBS_STATE: torch.randn(10),
            OBS_IMAGE: torch.randn(3, 224, 224),
        }
        action = torch.randn(1)
        transition = create_transition(observation, action)
        batch = transition_to_batch(transition)
        processed = loaded_preprocessor(batch)

        assert processed[OBS_STATE].shape == (10,)
        assert processed[OBS_IMAGE].shape == (3, 224, 224)
        assert processed[TransitionKey.ACTION.value].shape == (1,)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_classifier_processor_mixed_precision():
    """Test Classifier processor with mixed precision."""
    config = create_default_config()
    config.device = "cuda"
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Replace DeviceProcessorStep with one that uses float16
    modified_steps = []
    for step in preprocessor.steps:
        if isinstance(step, DeviceProcessorStep):
            modified_steps.append(DeviceProcessorStep(device=config.device, float_dtype="float16"))
        else:
            modified_steps.append(step)
    preprocessor.steps = modified_steps

    # Create test data
    observation = {
        OBS_STATE: torch.randn(10, dtype=torch.float32),
        OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32),
    }
    action = torch.randn(1, dtype=torch.float32)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is converted to float16
    assert processed[OBS_STATE].dtype == torch.float16
    assert processed[OBS_IMAGE].dtype == torch.float16
    assert processed[TransitionKey.ACTION.value].dtype == torch.float16


def test_classifier_processor_batch_data():
    """Test Classifier processor with batched data."""
    config = create_default_config()
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Test with batched data
    batch_size = 16
    observation = {
        OBS_STATE: torch.randn(batch_size, 10),
        OBS_IMAGE: torch.randn(batch_size, 3, 224, 224),
    }
    action = torch.randn(batch_size, 1)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that the batch dimension is preserved
    assert processed[OBS_STATE].shape == (batch_size, 10)
    assert processed[OBS_IMAGE].shape == (batch_size, 3, 224, 224)
    assert processed[TransitionKey.ACTION.value].shape == (batch_size, 1)


def test_classifier_processor_postprocessor_identity():
    """Test that Classifier postprocessor uses IdentityProcessor correctly."""
    config = create_default_config()
    stats = create_default_stats()
    preprocessor, postprocessor = make_classifier_processor(config, stats)

    # Create test data for postprocessor
    reward = torch.tensor([[0.8], [0.3], [0.9]])
    transition = create_transition(action=reward)
    _ = transition_to_batch(transition)

    # Process through postprocessor
    processed = postprocessor(reward)

    # IdentityProcessor should leave values unchanged (except device)
    assert torch.allclose(processed.cpu(), reward.cpu())
    assert processed.device.type == "cpu"
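
A usage note: this module can be run on its own with "pytest lerobot/tests/rewards/test_classifier_processor.py" (assuming pytest is invoked from the repository root with the layout shown at the top of the page). The CUDA, multi-GPU, and mixed-precision tests skip themselves via pytest.mark.skipif on machines without the required hardware.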