mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 16:49:55 +00:00
RL stack refactoring (#3075)
* refactor: RL stack refactoring — RLAlgorithm, RLTrainer, DataMixer, and SAC restructuring
* chore: clarify torch.compile disabled note in SACAlgorithm
* fix(teleop): keyboard EE teleop not registering special keys and losing intervention state
  Fixes #2345
  Co-authored-by: jpizarrom <jpizarrom@gmail.com>
* fix: remove leftover normalization calls from reward classifier predict_reward
  Fixes #2355
* fix: add thread synchronization to ReplayBuffer to prevent a race condition between add() and sample() (sketched below)
* refactor: update SACAlgorithm to pass action_dim to _init_critics and fix encoder reference
* perf: remove redundant CPU→GPU→CPU transition move in learner
* Fix: add kwargs in reward classifier __init__()
* fix: include IS_INTERVENTION in complementary_info sent to learner for offline replay buffer
* fix: add try/finally to control_loop to ensure image writer cleanup on exit
* fix: use string key for IS_INTERVENTION in complementary_info to avoid torch.load serialization error
* fix: skip tests that require grpc if not available
* fix(tests): ensure tensor stats comparison accounts for reshaping in normalization tests
* fix(tests): skip tests that require grpc if not available
* refactor(rl): expose public API in rl/__init__ and use relative imports in sub-packages
* fix(config): update vision encoder model name to lerobot/resnet10
* fix(sac): clarify torch.compile status
* refactor(rl): update shutdown_event type hints from 'any' to 'Any' for consistency and clarity
* refactor(sac): simplify optimizer return structure
* perf(rl): use async iterators in OnlineOfflineMixer.get_iterator
* refactor(sac): decouple algorithm hyperparameters from policy config
* update loss names in tests
* fix docstring
* remove unused type alias
* fix test for flat dict structure
* refactor(policies): rename policies/sac → policies/gaussian_actor
* refactor(rl/sac): consolidate hyperparameter ownership and clean up discrete critic
* perf(observation_processor): add CUDA support for image processing
* fix(rl): correctly wire HIL-SERL gripper penalty through processor pipeline (cherry picked from commit 9c2af818ff)
* fix(rl): add time limit processor to environment pipeline (cherry picked from commit cd105f65cb)
* fix(rl): clarify discrete gripper action mapping in GripperVelocityToJoint for SO100 (cherry picked from commit 494f469a2b)
* fix(rl): update neutral gripper action (cherry picked from commit 9c9064e5be)
* fix(rl): merge environment and action-processor info in transition processing (cherry picked from commit 30e1886b64)
* fix(rl): mirror gym_manipulator in actor (cherry picked from commit d2a046dfc5)
* fix(rl): postprocess action in actor (cherry picked from commit c2556439e5)
* fix(rl): improve action processing for discrete and continuous actions (cherry picked from commit f887ab3f6a)
* fix(rl): enhance intervention handling in actor and learner (cherry picked from commit ef8bfffbd7)
* Revert "perf(observation_processor): add CUDA support for image processing"
  This reverts commit 38b88c414c.
* refactor(rl): make algorithm a nested config so all SAC hyperparameters are JSON-addressable (sketched below)
* refactor(rl): add make_algorithm_config function for RLAlgorithmConfig instantiation
* refactor(rl): add type property to RLAlgorithmConfig for better clarity
* refactor(rl): make RLAlgorithmConfig an abstract base class for better extensibility
* refactor(tests): remove grpc import checks from test files for cleaner code
* fix(tests): gate RL tests on the `datasets` extra
* refactor: simplify docstrings for clarity and conciseness across multiple files
* fix(rl): update gripper position key and handle action absence during reset
* fix(rl): record pre-step observation so (obs, action, next.reward) align in gym_manipulator dataset
* refactor: clean up import statements
* chore: address reviewer comments
* chore: improve visual stats reshaping logic and update docstring for clarity
* refactor: enforce mandatory config_class and name attributes in RLAlgorithm
* refactor: raise NotImplementedError in abstract methods of RLAlgorithm and DataMixer
* refactor: replace build_algorithm with make_algorithm for SACAlgorithmConfig and update related tests
* refactor: add require_package calls for grpcio and gym-hil in relevant modules
* refactor(rl): move grpcio guards to runtime entry points
* feat(rl): consolidate HIL-SERL checkpoint into HF-style components (sketched below)
  Make `RLAlgorithmConfig` and `RLAlgorithm` `HubMixin`s, add abstract `state_dict()` / `load_state_dict()` for the critic ensemble, target nets and `log_alpha`, and persist them as a sibling `algorithm/` component next to `pretrained_model/`. Replace the pickled `training_state.pt` with an enriched `training_step.json` carrying `step` and `interaction_step`, so resume restores actor + critics + target nets + temperature + optimizers + RNG + counters from HF-standard files.
* refactor(rl): move actor weight-sync wire format from policy to algorithm
* refactor(rl): update type hints for learner and actor functions
* refactor(rl): hoist grpcio guard to module top in actor/learner
* chore(rl): manage import pattern in actor (#3564)
  * chore(rl): manage import pattern in actor
  * chore(rl): optional grpc imports in learner; quote grpc ServicerContext types
  ---------
  Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>
* update uv.lock
* chore(doc): update doc

---------

Co-authored-by: jpizarrom <jpizarrom@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
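The ReplayBuffer synchronization fix listed above comes down to guarding the buffer's mutable state with one lock, so a learner thread calling sample() never observes a half-written entry from the actor thread calling add(). Below is a minimal sketch of that pattern only; LockedReplayBuffer and its fields are hypothetical stand-ins, not lerobot's ReplayBuffer class.

import random
import threading


class LockedReplayBuffer:
    """Minimal FIFO replay buffer whose add() and sample() are safe to call from different threads."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._storage: list = []
        self._next_idx = 0
        self._lock = threading.Lock()  # guards _storage and _next_idx

    def add(self, transition) -> None:
        with self._lock:
            if len(self._storage) < self._capacity:
                self._storage.append(transition)
            else:
                self._storage[self._next_idx] = transition  # overwrite the oldest slot
            self._next_idx = (self._next_idx + 1) % self._capacity

    def sample(self, batch_size: int) -> list:
        with self._lock:
            # Sample while holding the lock so add() cannot resize or overwrite mid-read.
            return random.sample(self._storage, min(batch_size, len(self._storage)))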
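The "nested config" refactor above means the SAC algorithm hyperparameters live in their own config object nested inside the training/policy config, so every value has a stable JSON path such as algorithm.gamma. A rough sketch of that shape, assuming Python 3.10+ and purely hypothetical class and field names (not lerobot's actual configs):

import json
from dataclasses import asdict, dataclass, field


@dataclass
class SACAlgorithmConfigSketch:
    # Hypothetical hyperparameters owned by the algorithm rather than the policy network.
    gamma: float = 0.99
    critic_lr: float = 3e-4
    actor_lr: float = 3e-4
    temperature_init: float = 1.0


@dataclass
class SACTrainConfigSketch:
    device: str = "cpu"
    # Nesting makes every hyperparameter addressable as algorithm.<name> in JSON or on the CLI.
    algorithm: SACAlgorithmConfigSketch = field(default_factory=SACAlgorithmConfigSketch)


cfg = SACTrainConfigSketch()
print(json.dumps(asdict(cfg), indent=2))  # {"device": "cpu", "algorithm": {"gamma": 0.99, ...}}
print(asdict(cfg)["algorithm"]["gamma"])  # 0.99 (JSON-addressable path)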
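The checkpoint consolidation described above replaces a single pickled training_state.pt with per-component files: the policy under pretrained_model/, the algorithm state (critic ensemble, target nets, log_alpha, optimizers) under a sibling algorithm/, and a small training_step.json carrying step and interaction_step. Only those directory and file names come from the commit message; the two helpers below are a hypothetical sketch of the save/resume flow (using a plain torch.save for brevity), not lerobot's API or on-disk format.

import json
from pathlib import Path

import torch


def save_rl_checkpoint(ckpt_dir: str, algorithm_state: dict, step: int, interaction_step: int) -> None:
    """Persist the algorithm component and training counters next to pretrained_model/."""
    root = Path(ckpt_dir)
    (root / "algorithm").mkdir(parents=True, exist_ok=True)
    # Critic ensemble, target networks, log_alpha, optimizer states, etc.
    torch.save(algorithm_state, root / "algorithm" / "state.pt")
    counters = {"step": step, "interaction_step": interaction_step}
    (root / "training_step.json").write_text(json.dumps(counters, indent=2))


def load_rl_checkpoint(ckpt_dir: str) -> tuple[dict, dict]:
    """Restore the algorithm state and counters written by save_rl_checkpoint()."""
    root = Path(ckpt_dir)
    algorithm_state = torch.load(root / "algorithm" / "state.pt", weights_only=False)
    counters = json.loads((root / "training_step.json").read_text())
    return algorithm_state, counters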
@@ -1,414 +0,0 @@
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for SAC policy processor."""

import tempfile

import pytest
import torch

from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
from lerobot.processor import (
    AddBatchDimensionProcessorStep,
    DataProcessorPipeline,
    DeviceProcessorStep,
    NormalizerProcessorStep,
    RenameObservationsProcessorStep,
    TransitionKey,
    UnnormalizerProcessorStep,
)
from lerobot.processor.converters import create_transition, transition_to_batch
from lerobot.utils.constants import ACTION, OBS_STATE


def create_default_config():
    """Create a default SAC configuration for testing."""
    config = SACConfig()
    config.input_features = {
        OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
    }
    config.output_features = {
        ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(5,)),
    }
    config.normalization_mapping = {
        FeatureType.STATE: NormalizationMode.MEAN_STD,
        FeatureType.ACTION: NormalizationMode.MIN_MAX,
    }
    config.device = "cpu"
    return config


def create_default_stats():
    """Create default dataset statistics for testing."""
    return {
        OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)},
        ACTION: {"min": torch.full((5,), -1.0), "max": torch.ones(5)},
    }


def test_make_sac_processor_basic():
    """Test basic creation of SAC processor."""
    config = create_default_config()
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Check processor names
    assert preprocessor.name == "policy_preprocessor"
    assert postprocessor.name == "policy_postprocessor"

    # Check steps in preprocessor
    assert len(preprocessor.steps) == 4
    assert isinstance(preprocessor.steps[0], RenameObservationsProcessorStep)
    assert isinstance(preprocessor.steps[1], AddBatchDimensionProcessorStep)
    assert isinstance(preprocessor.steps[2], DeviceProcessorStep)
    assert isinstance(preprocessor.steps[3], NormalizerProcessorStep)

    # Check steps in postprocessor
    assert len(postprocessor.steps) == 2
    assert isinstance(postprocessor.steps[0], UnnormalizerProcessorStep)
    assert isinstance(postprocessor.steps[1], DeviceProcessorStep)


def test_sac_processor_normalization_modes():
    """Test that SAC processor correctly handles different normalization modes."""
    config = create_default_config()
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Create test data
    observation = {OBS_STATE: torch.randn(10) * 2}  # Larger values to test normalization
    action = torch.rand(5) * 2 - 1  # Range [-1, 1]
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is normalized and batched
    # State should be mean-std normalized
    # Action should be min-max normalized to [-1, 1]
    assert processed[OBS_STATE].shape == (1, 10)
    assert processed[TransitionKey.ACTION.value].shape == (1, 5)

    # Process action through postprocessor
    postprocessed = postprocessor(processed[TransitionKey.ACTION.value])

    # Check that action is unnormalized (but still batched)
    assert postprocessed.shape == (1, 5)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_cuda():
    """Test SAC processor with CUDA device."""
    config = create_default_config()
    config.device = "cuda"
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Create CPU data
    observation = {OBS_STATE: torch.randn(10)}
    action = torch.randn(5)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is on CUDA
    assert processed[OBS_STATE].device.type == "cuda"
    assert processed[TransitionKey.ACTION.value].device.type == "cuda"

    # Process through postprocessor
    postprocessed = postprocessor(processed[TransitionKey.ACTION.value])

    # Check that action is back on CPU
    assert postprocessed.device.type == "cpu"


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_accelerate_scenario():
    """Test SAC processor in simulated Accelerate scenario."""
    config = create_default_config()
    config.device = "cuda:0"
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Simulate Accelerate: data already on GPU
    device = torch.device("cuda:0")
    observation = {OBS_STATE: torch.randn(10).to(device)}
    action = torch.randn(5).to(device)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data stays on same GPU
    assert processed[OBS_STATE].device == device
    assert processed[TransitionKey.ACTION.value].device == device


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
def test_sac_processor_multi_gpu():
    """Test SAC processor with multi-GPU setup."""
    config = create_default_config()
    config.device = "cuda:0"
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Simulate data on different GPU
    device = torch.device("cuda:1")
    observation = {OBS_STATE: torch.randn(10).to(device)}
    action = torch.randn(5).to(device)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data stays on cuda:1
    assert processed[OBS_STATE].device == device
    assert processed[TransitionKey.ACTION.value].device == device


def test_sac_processor_without_stats():
    """Test SAC processor creation without dataset statistics."""
    config = create_default_config()

    preprocessor, postprocessor = make_sac_pre_post_processors(config, dataset_stats=None)

    # Should still create processors
    assert preprocessor is not None
    assert postprocessor is not None

    # Process should still work
    observation = {OBS_STATE: torch.randn(10)}
    action = torch.randn(5)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    processed = preprocessor(batch)
    assert processed is not None


def test_sac_processor_save_and_load():
    """Test saving and loading SAC processor."""
    config = create_default_config()
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # Save preprocessor
        preprocessor.save_pretrained(tmpdir)

        # Load preprocessor
        loaded_preprocessor = DataProcessorPipeline.from_pretrained(
            tmpdir, config_filename="policy_preprocessor.json"
        )

        # Test that loaded processor works
        observation = {OBS_STATE: torch.randn(10)}
        action = torch.randn(5)
        transition = create_transition(observation, action)
        batch = transition_to_batch(transition)

        processed = loaded_preprocessor(batch)
        assert processed[OBS_STATE].shape == (1, 10)
        assert processed[TransitionKey.ACTION.value].shape == (1, 5)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_mixed_precision():
    """Test SAC processor with mixed precision."""
    config = create_default_config()
    config.device = "cuda"
    stats = create_default_stats()

    # Create processor
    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Replace DeviceProcessorStep with one that uses float16
    modified_steps = []
    for step in preprocessor.steps:
        if isinstance(step, DeviceProcessorStep):
            modified_steps.append(DeviceProcessorStep(device=config.device, float_dtype="float16"))
        elif isinstance(step, NormalizerProcessorStep):
            # Update normalizer to use the same device as the device processor
            norm_step = step  # Now type checker knows this is NormalizerProcessorStep
            modified_steps.append(
                NormalizerProcessorStep(
                    features=norm_step.features,
                    norm_map=norm_step.norm_map,
                    stats=norm_step.stats,
                    device=config.device,
                    dtype=torch.float16,  # Match the float16 dtype
                )
            )
        else:
            modified_steps.append(step)
    preprocessor.steps = modified_steps

    # Create test data
    observation = {OBS_STATE: torch.randn(10, dtype=torch.float32)}
    action = torch.randn(5, dtype=torch.float32)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that data is converted to float16
    assert processed[OBS_STATE].dtype == torch.float16
    assert processed[TransitionKey.ACTION.value].dtype == torch.float16


def test_sac_processor_batch_data():
    """Test SAC processor with batched data."""
    config = create_default_config()
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Test with batched data
    batch_size = 32
    observation = {OBS_STATE: torch.randn(batch_size, 10)}
    action = torch.randn(batch_size, 5)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through preprocessor
    processed = preprocessor(batch)

    # Check that batch dimension is preserved
    assert processed[OBS_STATE].shape == (batch_size, 10)
    assert processed[TransitionKey.ACTION.value].shape == (batch_size, 5)


def test_sac_processor_edge_cases():
    """Test SAC processor with edge cases."""
    config = create_default_config()
    stats = create_default_stats()

    preprocessor, postprocessor = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Test with observation that has no state key but still exists
    observation = {"observation.dummy": torch.randn(1)}  # Some dummy observation to pass validation
    action = torch.randn(5)
    batch = {TransitionKey.ACTION.value: action, **observation}
    processed = preprocessor(batch)
    # observation.state wasn't in original, so it won't be in processed
    assert OBS_STATE not in processed
    assert processed[TransitionKey.ACTION.value].shape == (1, 5)

    # Test with zero action (representing "null" action)
    transition = create_transition(observation={OBS_STATE: torch.randn(10)}, action=torch.zeros(5))
    batch = transition_to_batch(transition)
    processed = preprocessor(batch)
    assert processed[OBS_STATE].shape == (1, 10)
    # Action should be present and batched, even if it's zeros
    assert processed[TransitionKey.ACTION.value].shape == (1, 5)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_bfloat16_device_float32_normalizer():
    """Test: DeviceProcessor(bfloat16) + NormalizerProcessor(float32) → output bfloat16 via automatic adaptation"""
    config = create_default_config()
    config.device = "cuda"
    stats = create_default_stats()

    preprocessor, _ = make_sac_pre_post_processors(
        config,
        stats,
    )

    # Modify the pipeline to use bfloat16 device processor with float32 normalizer
    modified_steps = []
    for step in preprocessor.steps:
        if isinstance(step, DeviceProcessorStep):
            # Device processor converts to bfloat16
            modified_steps.append(DeviceProcessorStep(device=config.device, float_dtype="bfloat16"))
        elif isinstance(step, NormalizerProcessorStep):
            # Normalizer stays configured as float32 (will auto-adapt to bfloat16)
            norm_step = step  # Now type checker knows this is NormalizerProcessorStep
            modified_steps.append(
                NormalizerProcessorStep(
                    features=norm_step.features,
                    norm_map=norm_step.norm_map,
                    stats=norm_step.stats,
                    device=config.device,
                    dtype=torch.float32,  # Deliberately configured as float32
                )
            )
        else:
            modified_steps.append(step)
    preprocessor.steps = modified_steps

    # Verify initial normalizer configuration
    normalizer_step = preprocessor.steps[3]  # NormalizerProcessorStep
    assert normalizer_step.dtype == torch.float32

    # Create test data
    observation = {OBS_STATE: torch.randn(10, dtype=torch.float32)}  # Start with float32
    action = torch.randn(5, dtype=torch.float32)
    transition = create_transition(observation, action)
    batch = transition_to_batch(transition)

    # Process through full pipeline
    processed = preprocessor(batch)

    # Verify: DeviceProcessor → bfloat16, NormalizerProcessor adapts → final output is bfloat16
    assert processed[OBS_STATE].dtype == torch.bfloat16
    assert processed[TransitionKey.ACTION.value].dtype == torch.bfloat16

    # Verify normalizer automatically adapted its internal state
    assert normalizer_step.dtype == torch.bfloat16
    for stat_tensor in normalizer_step._tensor_stats[OBS_STATE].values():
        assert stat_tensor.dtype == torch.bfloat16