mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 16:49:55 +00:00
test(processor): all processors use now the same create_transition (#1906)
* test(processor): all processors use now the same create_transition * test(processor): use identity instead of lambda for transition in pipelines
This commit is contained in:
@@ -28,21 +28,7 @@ from lerobot.processor import (
|
||||
ProcessorStepRegistry,
|
||||
TransitionKey,
|
||||
)
|
||||
|
||||
|
||||
def create_transition(
|
||||
observation=None, action=None, reward=None, done=None, truncated=None, info=None, complementary_data=None
|
||||
):
|
||||
"""Helper to create an EnvTransition dictionary."""
|
||||
return {
|
||||
TransitionKey.OBSERVATION: observation,
|
||||
TransitionKey.ACTION: action,
|
||||
TransitionKey.REWARD: reward,
|
||||
TransitionKey.DONE: done,
|
||||
TransitionKey.TRUNCATED: truncated,
|
||||
TransitionKey.INFO: info,
|
||||
TransitionKey.COMPLEMENTARY_DATA: complementary_data,
|
||||
}
|
||||
from lerobot.processor.converters import create_transition, identity_transition
|
||||
|
||||
|
||||
def test_state_1d_to_2d():
|
||||
@@ -248,7 +234,9 @@ def test_mixed_observation():
|
||||
def test_integration_with_robot_processor():
|
||||
"""Test AddBatchDimensionProcessorStep integration with RobotProcessor."""
|
||||
to_batch_processor = AddBatchDimensionProcessorStep()
|
||||
pipeline = DataProcessorPipeline([to_batch_processor], to_transition=lambda x: x, to_output=lambda x: x)
|
||||
pipeline = DataProcessorPipeline(
|
||||
[to_batch_processor], to_transition=identity_transition, to_output=identity_transition
|
||||
)
|
||||
|
||||
# Create unbatched observation
|
||||
observation = {
|
||||
@@ -289,7 +277,7 @@ def test_save_and_load_pretrained():
|
||||
"""Test saving and loading AddBatchDimensionProcessorStep with RobotProcessor."""
|
||||
processor = AddBatchDimensionProcessorStep()
|
||||
pipeline = DataProcessorPipeline(
|
||||
[processor], name="BatchPipeline", to_transition=lambda x: x, to_output=lambda x: x
|
||||
[processor], name="BatchPipeline", to_transition=identity_transition, to_output=identity_transition
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
@@ -302,7 +290,7 @@ def test_save_and_load_pretrained():
|
||||
|
||||
# Load pipeline
|
||||
loaded_pipeline = DataProcessorPipeline.from_pretrained(
|
||||
tmp_dir, to_transition=lambda x: x, to_output=lambda x: x
|
||||
tmp_dir, to_transition=identity_transition, to_output=identity_transition
|
||||
)
|
||||
|
||||
assert loaded_pipeline.name == "BatchPipeline"
|
||||
@@ -330,12 +318,14 @@ def test_registry_functionality():
|
||||
def test_registry_based_save_load():
|
||||
"""Test saving and loading using registry name."""
|
||||
processor = AddBatchDimensionProcessorStep()
|
||||
pipeline = DataProcessorPipeline([processor], to_transition=lambda x: x, to_output=lambda x: x)
|
||||
pipeline = DataProcessorPipeline(
|
||||
[processor], to_transition=identity_transition, to_output=identity_transition
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
pipeline.save_pretrained(tmp_dir)
|
||||
loaded_pipeline = DataProcessorPipeline.from_pretrained(
|
||||
tmp_dir, to_transition=lambda x: x, to_output=lambda x: x
|
||||
tmp_dir, to_transition=identity_transition, to_output=identity_transition
|
||||
)
|
||||
|
||||
# Verify the loaded processor works
|
||||
@@ -703,7 +693,7 @@ def test_complementary_data_none():
|
||||
transition = create_transition(complementary_data=None)
|
||||
result = processor(transition)
|
||||
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA] is None
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA] == {}
|
||||
|
||||
|
||||
def test_complementary_data_empty():
|
||||
|
||||
Reference in New Issue
Block a user