refactor(normalization): remove Normalize and Unnormalize classes

- Deleted the Normalize and Unnormalize classes from the normalization module to streamline the codebase.
- Updated tests to ensure compatibility with the removal of these classes, focusing on the new NormalizerProcessor and UnnormalizerProcessor implementations.
- Enhanced the handling of normalization statistics and improved overall code clarity.
This commit is contained in:
AdilZouitine
2025-08-08 13:23:10 +02:00
parent e4fd30a8d4
commit abcbc16126
2 changed files with 154 additions and 420 deletions
+154
View File
@@ -25,6 +25,7 @@ from lerobot.processor.normalize_processor import (
UnnormalizerProcessor,
_convert_stats_to_tensors,
hotswap_stats,
rename_stats,
)
from lerobot.processor.pipeline import IdentityProcessor, RobotProcessor, TransitionKey
@@ -1604,3 +1605,156 @@ def test_hotswap_stats_functional_test():
new_result["observation"]["observation.image"], observation["observation.image"]
)
assert not torch.allclose(new_result["action"], action)
def test_zero_std_uses_eps():
"""When std == 0, (x-mean)/(std+eps) is well-defined; x==mean should map to 0."""
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
stats = {"observation.state": {"mean": np.array([0.5]), "std": np.array([0.0])}}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats, eps=1e-6)
observation = {"observation.state": torch.tensor([0.5])} # equals mean
out = normalizer(create_transition(observation=observation))
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.state"], torch.tensor([0.0]))
def test_min_equals_max_maps_to_minus_one():
"""When min == max, MIN_MAX path maps to -1 after [-1,1] scaling for x==min."""
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
norm_map = {FeatureType.STATE: NormalizationMode.MIN_MAX}
stats = {"observation.state": {"min": np.array([2.0]), "max": np.array([2.0])}}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats, eps=1e-6)
observation = {"observation.state": torch.tensor([2.0])}
out = normalizer(create_transition(observation=observation))
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.state"], torch.tensor([-1.0]))
def test_action_normalized_despite_normalize_keys():
"""Action normalization is independent of normalize_keys filter for observations."""
features = {
"observation.state": PolicyFeature(FeatureType.STATE, (1,)),
"action": PolicyFeature(FeatureType.ACTION, (2,)),
}
norm_map = {FeatureType.STATE: NormalizationMode.IDENTITY, FeatureType.ACTION: NormalizationMode.MEAN_STD}
stats = {"action": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}}
normalizer = NormalizerProcessor(
features=features, norm_map=norm_map, stats=stats, normalize_keys={"observation.state"}
)
transition = create_transition(
observation={"observation.state": torch.tensor([3.0])}, action=torch.tensor([3.0, 3.0])
)
out = normalizer(transition)
# (3-1)/2 = 1.0 ; (3-(-1))/4 = 1.0
assert torch.allclose(out[TransitionKey.ACTION], torch.tensor([1.0, 1.0]))
def test_unnormalize_observations_mean_std_and_min_max():
features = {
"observation.ms": PolicyFeature(FeatureType.STATE, (2,)),
"observation.mm": PolicyFeature(FeatureType.STATE, (2,)),
}
# Build two processors: one mean/std and one min/max
unnorm_ms = UnnormalizerProcessor(
features={"observation.ms": features["observation.ms"]},
norm_map={FeatureType.STATE: NormalizationMode.MEAN_STD},
stats={"observation.ms": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}},
)
unnorm_mm = UnnormalizerProcessor(
features={"observation.mm": features["observation.mm"]},
norm_map={FeatureType.STATE: NormalizationMode.MIN_MAX},
stats={"observation.mm": {"min": np.array([0.0, -2.0]), "max": np.array([2.0, 2.0])}},
)
tr = create_transition(
observation={
"observation.ms": torch.tensor([0.0, 0.0]), # → mean
"observation.mm": torch.tensor([0.0, 0.0]), # → mid-point
}
)
out_ms = unnorm_ms(tr)[TransitionKey.OBSERVATION]["observation.ms"]
out_mm = unnorm_mm(tr)[TransitionKey.OBSERVATION]["observation.mm"]
assert torch.allclose(out_ms, torch.tensor([1.0, -1.0]))
assert torch.allclose(out_mm, torch.tensor([1.0, 0.0])) # mid of [0,2] and [-2,2]
def test_rename_stats_basic():
orig = {
"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])},
"action": {"mean": np.array([0.0])},
}
mapping = {"observation.state": "observation.robot_state"}
renamed = rename_stats(orig, mapping)
assert "observation.robot_state" in renamed and "observation.state" not in renamed
# Ensure deep copy: mutate original and verify renamed unaffected
orig["observation.state"]["mean"][0] = 42.0
assert renamed["observation.robot_state"]["mean"][0] != 42.0
def test_unknown_observation_keys_ignored():
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
stats = {"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])}}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
obs = {"observation.state": torch.tensor([1.0]), "observation.unknown": torch.tensor([5.0])}
tr = create_transition(observation=obs)
out = normalizer(tr)
# Unknown key should pass through unchanged and not be tracked
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.unknown"], obs["observation.unknown"])
comp = out.get(TransitionKey.COMPLEMENTARY_DATA) or {}
assert "normalized_keys" in comp and "observation.unknown" not in comp["normalized_keys"]
def test_batched_action_normalization():
features = {"action": PolicyFeature(FeatureType.ACTION, (2,))}
norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD}
stats = {"action": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
actions = torch.tensor([[1.0, -1.0], [3.0, 3.0]]) # first equals mean → zeros; second → [1, 1]
out = normalizer(create_transition(action=actions))[TransitionKey.ACTION]
expected = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
assert torch.allclose(out, expected)
def test_complementary_data_preservation():
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
stats = {"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])}}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
comp = {"existing": 123}
tr = create_transition(observation={"observation.state": torch.tensor([1.0])}, complementary_data=comp)
out = normalizer(tr)
new_comp = out[TransitionKey.COMPLEMENTARY_DATA]
assert new_comp["existing"] == 123 and "normalized_keys" in new_comp
def test_roundtrip_normalize_unnormalize_non_identity():
features = {
"observation.state": PolicyFeature(FeatureType.STATE, (2,)),
"action": PolicyFeature(FeatureType.ACTION, (2,)),
}
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD, FeatureType.ACTION: NormalizationMode.MIN_MAX}
stats = {
"observation.state": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])},
"action": {"min": np.array([-2.0, 0.0]), "max": np.array([2.0, 4.0])},
}
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
unnormalizer = UnnormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
# Add a time dimension in action for broadcasting check (B,T,D)
obs = {"observation.state": torch.tensor([[3.0, 3.0], [1.0, -1.0]])}
act = torch.tensor([[[0.0, -1.0], [1.0, 1.0]]]) # shape (1,2,2) already in [-1,1]
tr = create_transition(observation=obs, action=act)
out = unnormalizer(normalizer(tr))
assert torch.allclose(
out[TransitionKey.OBSERVATION]["observation.state"], obs["observation.state"], atol=1e-5
)
assert torch.allclose(out[TransitionKey.ACTION], act, atol=1e-5)