Files
lerobot/tests/rl/test_queue.py
T
Khalil Meftah e963e5a0c4 RL stack refactoring (#3075)
* refactor: RL stack refactoring — RLAlgorithm, RLTrainer, DataMixer, and SAC restructuring

* chore: clarify torch.compile disabled note in SACAlgorithm

* fix(teleop): keyboard EE teleop not registering special keys and losing intervention state

Fixes #2345

Co-authored-by: jpizarrom <jpizarrom@gmail.com>

* fix: remove leftover normalization calls from reward classifier predict_reward

Fixes #2355

* fix: add thread synchronization to ReplayBuffer to prevent race condition between add() and sample()

* refactor: update SACAlgorithm to pass action_dim to _init_critics and fix encoder reference

* perf: remove redundant CPU→GPU→CPU transition move in learner

* Fix: add kwargs in reward classifier __init__()

* fix: include IS_INTERVENTION in complementary_info sent to learner for offline replay buffer

* fix: add try/finally to control_loop to ensure image writer cleanup on exit

* fix: use string key for IS_INTERVENTION in complementary_info to avoid torch.load serialization error

* fix: skip tests that require grpc if not available

* fix(tests): ensure tensor stats comparison accounts for reshaping in normalization tests

* fix(tests): skip tests that require grpc if not available

* refactor(rl): expose public API in rl/__init__ and use relative imports in sub-packages

* fix(config): update vision encoder model name to lerobot/resnet10

* fix(sac): clarify torch.compile status

* refactor(rl): update shutdown_event type hints from 'any' to 'Any' for consistency and clarity

* refactor(sac): simplify optimizer return structure

* perf(rl): use async iterators in OnlineOfflineMixer.get_iterator

* refactor(sac): decouple algorithm hyperparameters from policy config

* update losses names in tests

* fix docstring

* remove unused type alias

* fix test for flat dict structure

* refactor(policies): rename policies/sac → policies/gaussian_actor

* refactor(rl/sac): consolidate hyperparameter ownership and clean up discrete critic

* perf(observation_processor): add CUDA support for image processing

* fix(rl): correctly wire HIL-SERL gripper penalty through processor pipeline

(cherry picked from commit 9c2af818ff)

* fix(rl): add time limit processor to environment pipeline

(cherry picked from commit cd105f65cb)

* fix(rl): clarify discrete gripper action mapping in GripperVelocityToJoint for SO100

(cherry picked from commit 494f469a2b)

* fix(rl): update neutral gripper action

(cherry picked from commit 9c9064e5be)

* fix(rl): merge environment and action-processor info in transition processing

(cherry picked from commit 30e1886b64)

* fix(rl): mirror gym_manipulator in actor

(cherry picked from commit d2a046dfc5)

* fix(rl): postprocess action in actor

(cherry picked from commit c2556439e5)

* fix(rl): improve action processing for discrete and continuous actions

(cherry picked from commit f887ab3f6a)

* fix(rl): enhance intervention handling in actor and learner

(cherry picked from commit ef8bfffbd7)

* Revert "perf(observation_processor): add CUDA support for image processing"

This reverts commit 38b88c414c.

* refactor(rl): make algorithm a nested config so all SAC hyperparameters are JSON-addressable

* refactor(rl): add make_algorithm_config function for RLAlgorithmConfig instantiation

* refactor(rl): add type property to RLAlgorithmConfig for better clarity

* refactor(rl): make RLAlgorithmConfig an abstract base class for better extensibility

* refactor(tests): remove grpc import checks from test files for cleaner code

* fix(tests): gate RL tests on the `datasets` extra

* refactor: simplify docstrings for clarity and conciseness across multiple files

* fix(rl): update gripper position key and handle action absence during reset

* fix(rl): record pre-step observation so (obs, action, next.reward) align in gym_manipulator dataset

* refactor: clean up import statements

* chore: address reviewer comments

* chore: improve visual stats reshaping logic and update docstring for clarity

* refactor: enforce mandatory config_class and name attributes in RLAlgorithm

* refactor: implement NotImplementedError for abstract methods in RLAlgorithm and DataMixer

* refactor: replace build_algorithm with make_algorithm for SACAlgorithmConfig and update related tests

* refactor: add require_package calls for grpcio and gym-hil in relevant modules

* refactor(rl): move grpcio guards to runtime entry points

* feat(rl): consolidate HIL-SERL checkpoint into HF-style components

Make `RLAlgorithmConfig` and `RLAlgorithm` `HubMixin`s, add abstract
`state_dict()` / `load_state_dict()` for critic ensemble, target nets
and `log_alpha`, and persist them as a sibling `algorithm/` component
next to `pretrained_model/`. Replace the pickled `training_state.pt`
with an enriched `training_step.json` carrying `step` and
`interaction_step`, so resume restores actor + critics + target nets +
temperature + optimizers + RNG + counters from HF-standard files.

* refactor(rl): move actor weight-sync wire format from policy to algorithm

* refactor(rl): update type hints for learner and actor functions

* refactor(rl): hoist grpcio guard to module top in actor/learner

* chore(rl): manage import pattern in actor (#3564)

* chore(rl): manage import pattern in actor

* chore(rl): optional grpc imports in learner; quote grpc ServicerContext types

---------

Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>

* update uv.lock

* chore(doc): update doc

---------

Co-authored-by: jpizarrom <jpizarrom@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-05-12 15:49:54 +02:00

171 lines
4.4 KiB
Python

#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
from queue import Queue
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from torch.multiprocessing import Queue as TorchMPQueue # noqa: E402
from lerobot.rl.queue import get_last_item_from_queue # noqa: E402
def test_get_last_item_single_item():
"""Test getting the last item when queue has only one item."""
queue = Queue()
queue.put("single_item")
result = get_last_item_from_queue(queue)
assert result == "single_item"
assert queue.empty()
def test_get_last_item_multiple_items():
"""Test getting the last item when queue has multiple items."""
queue = Queue()
items = ["first", "second", "third", "fourth", "last"]
for item in items:
queue.put(item)
result = get_last_item_from_queue(queue)
assert result == "last"
assert queue.empty()
def test_get_last_item_multiple_items_with_torch_queue():
"""Test getting the last item when queue has multiple items."""
queue = TorchMPQueue()
items = ["first", "second", "third", "fourth", "last"]
for item in items:
queue.put(item)
result = get_last_item_from_queue(queue)
assert result == "last"
assert queue.empty()
def test_get_last_item_different_types():
"""Test with different data types in the queue."""
queue = Queue()
items = [1, 2.5, "string", {"key": "value"}, [1, 2, 3], ("tuple", "data")]
for item in items:
queue.put(item)
result = get_last_item_from_queue(queue)
assert result == ("tuple", "data")
assert queue.empty()
def test_get_last_item_maxsize_queue():
"""Test with a queue that has a maximum size."""
queue = Queue(maxsize=5)
# Fill the queue
for i in range(5):
queue.put(i)
# Give the queue time to fill
time.sleep(0.1)
result = get_last_item_from_queue(queue)
assert result == 4
assert queue.empty()
def test_get_last_item_with_none_values():
"""Test with None values in the queue."""
queue = Queue()
items = [1, None, 2, None, 3]
for item in items:
queue.put(item)
# Give the queue time to fill
time.sleep(0.1)
result = get_last_item_from_queue(queue)
assert result == 3
assert queue.empty()
def test_get_last_item_blocking_timeout():
"""Test get_last_item_from_queue returns None on timeout."""
queue = Queue()
result = get_last_item_from_queue(queue, block=True, timeout=0.1)
assert result is None
def test_get_last_item_non_blocking_empty():
"""Test get_last_item_from_queue with block=False on an empty queue returns None."""
queue = Queue()
result = get_last_item_from_queue(queue, block=False)
assert result is None
def test_get_last_item_non_blocking_success():
"""Test get_last_item_from_queue with block=False on a non-empty queue."""
queue = Queue()
items = ["first", "second", "last"]
for item in items:
queue.put(item)
# Give the queue time to fill
time.sleep(0.1)
result = get_last_item_from_queue(queue, block=False)
assert result == "last"
assert queue.empty()
def test_get_last_item_blocking_waits_for_item():
"""Test that get_last_item_from_queue waits for an item if block=True."""
queue = Queue()
result = []
def producer():
queue.put("item1")
queue.put("item2")
def consumer():
# This will block until the producer puts the first item
item = get_last_item_from_queue(queue, block=True, timeout=0.2)
result.append(item)
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
assert result == ["item2"]
assert queue.empty()