refactor(lingbot_va): use built-in UnnormalizerProcessorStep for actions

Replace the bespoke LingBotVAActionUnnormalizeStep with the standard
UnnormalizerProcessorStep in QUANTILES mode, which computes the same
(action + 1) / 2 * (q99 - q01) + q01 mapping (the bespoke step additionally added
a 1e-6 epsilon to the q99 - q01 range). The per-channel q01/q99 are stored as the
step's saved state (a safetensors file) and restored on load; a fresh build has no
action stats, so the step is an identity passthrough.

The 3 Hub checkpoints (lerobot/lingbot_va_{libero_long,robotwin,base}) have been
re-uploaded with the new post-processor (policy_postprocessor.json +
*_unnormalizer_processor.safetensors); reloading from the Hub round-trips q01/q99.

- processor_lingbot_va.py: drop the custom step + registry; build the post-processor
  with UnnormalizerProcessorStep (explicit ACTION->QUANTILES norm_map so the
  preprocessor / training path is unchanged).
- tests: assert the built-in step is used, identity-when-no-stats, correct quantile
  unnormalization, and a save_pretrained/from_pretrained stats round-trip.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-08 11:57:31 +02:00
committed by Maxime Ellerbach
parent 2a7b7ea744
commit 3061ca6661
2 changed files with 49 additions and 89 deletions
@@ -16,28 +16,27 @@
The policy itself handles image resizing, scaling to [-1, 1] and VAE encoding (the VAE
lives inside the policy), so the preprocessor only renames, batches, normalizes (IDENTITY)
and moves to device. The postprocessor reverses the *fixed* action quantile normalization
(``(action + 1) / 2 * (q99 - q01 + 1e-6) + q01``) baked into the released checkpoints — this
is a fixed transform, not a dataset-stats one, so it cannot use the standard
``UnnormalizerProcessorStep`` and is implemented as a dedicated step below.
and moves to device. The policy emits actions in the normalized ``[-1, 1]`` space; the
postprocessor maps them back to physical units with the standard ``UnnormalizerProcessorStep``
in QUANTILES mode (``(action + 1) / 2 * (q99 - q01) + q01``). The per-channel q01/q99 are NOT
hardcoded: they are saved in each checkpoint's post-processor state and restored on load. A
fresh (unconverted) policy has no action stats, so the step is a no-op (identity passthrough).
"""
from dataclasses import dataclass, field
from typing import Any
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.configs.types import FeatureType, NormalizationMode
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyActionProcessorStep,
PolicyProcessorPipeline,
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.utils.constants import (
@@ -47,36 +46,6 @@ from lerobot.utils.constants import (
from .configuration_lingbot_va import LingBotVAConfig
# LingBot-VA uses fixed per-channel action quantile (un)normalization. The benchmark quantiles are
# NOT hardcoded here: they live in each checkpoint's ``policy_postprocessor.json`` and are restored on
# load. A fresh (unconverted) policy defaults to a neutral ``[-1, 1]`` mapping (identity rescale).
@dataclass
@ProcessorStepRegistry.register(name="lingbot_va_action_unnormalize")
class LingBotVAActionUnnormalizeStep(PolicyActionProcessorStep):
    """Undo LingBot-VA's per-channel quantile normalization on predicted actions.

    The incoming action is rescaled with
    ``(action + 1) / 2 * (q99 - q01 + 1e-6) + q01``, where ``q01`` / ``q99`` are the
    per-channel values held in ``action_q01`` / ``action_q99``.
    """

    # Per-channel lower quantiles; empty list by default.
    action_q01: list[float] = field(default_factory=list)
    # Per-channel upper quantiles; empty list by default.
    action_q99: list[float] = field(default_factory=list)

    def action(self, action: PolicyAction) -> PolicyAction:
        """Return ``action`` rescaled with the stored q01/q99 quantiles."""
        # Build the bounds with the action's dtype and device so the arithmetic
        # below needs no further casts or transfers.
        lower = torch.as_tensor(self.action_q01, dtype=action.dtype, device=action.device)
        upper = torch.as_tensor(self.action_q99, dtype=action.dtype, device=action.device)
        unit = (action + 1.0) / 2.0
        span = upper - lower + 1e-6
        return unit * span + lower

    def get_config(self) -> dict[str, Any]:
        """Return the quantile lists (copied) as a plain dict."""
        config: dict[str, Any] = {}
        config["action_q01"] = list(self.action_q01)
        config["action_q99"] = list(self.action_q99)
        return config

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """Return ``features`` unchanged."""
        return features
def make_lingbot_va_pre_post_processors(
config: LingBotVAConfig,
@@ -98,11 +67,15 @@ def make_lingbot_va_pre_post_processors(
DeviceProcessorStep(device=config.device),
]
# Fresh-build default: neutral [-1, 1] mapping (identity rescale). The real per-benchmark
# quantiles are restored from the checkpoint's saved post-processor config by from_pretrained.
n_used = len(config.used_action_channel_ids)
# Unnormalize predicted actions from [-1, 1] back to physical units via per-channel q01/q99
# (QUANTILES mode), overriding the policy's IDENTITY action mapping. The q01/q99 stats are
# restored from the checkpoint on load; a fresh build has no action stats and is a passthrough.
output_steps: list[ProcessorStep] = [
LingBotVAActionUnnormalizeStep(action_q01=[-1.0] * n_used, action_q99=[1.0] * n_used),
UnnormalizerProcessorStep(
features=config.output_features,
norm_map={FeatureType.ACTION: NormalizationMode.QUANTILES},
stats=dataset_stats,
),
DeviceProcessorStep(device="cpu"),
]
+34 -47
View File
@@ -20,12 +20,11 @@ import torch
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.lingbot_va.configuration_lingbot_va import LingBotVAConfig
from lerobot.policies.lingbot_va.processor_lingbot_va import (
LingBotVAActionUnnormalizeStep,
make_lingbot_va_pre_post_processors,
)
from lerobot.processor import PolicyProcessorPipeline
from lerobot.policies.lingbot_va.processor_lingbot_va import make_lingbot_va_pre_post_processors
from lerobot.processor import PolicyProcessorPipeline, UnnormalizerProcessorStep
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.utils.constants import (
ACTION,
OBS_IMAGES,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
@@ -40,62 +39,50 @@ def _make_config() -> LingBotVAConfig:
return cfg
def test_action_unnormalize_inverts_quantile_norm() -> None:
    """Quantile-normalizing a raw action and then unnormalizing it recovers the input."""
    lower = [-1.0, -0.5, 0.0]
    upper = [1.0, 0.5, 2.0]
    unnormalize = LingBotVAActionUnnormalizeStep(action_q01=lower, action_q99=upper)

    lower_t, upper_t = torch.tensor(lower), torch.tensor(upper)
    raw = torch.tensor([[0.3, 0.1, 1.0]])
    # Forward quantile normalization used as the reference:
    # (x - q01) / (q99 - q01 + eps) * 2 - 1.
    span = upper_t - lower_t + 1e-6
    normed = (raw - lower_t) / span * 2.0 - 1.0

    assert torch.allclose(unnormalize.action(normed), raw, atol=1e-4)
def test_action_unnormalize_config_roundtrip() -> None:
    """The dict from ``get_config`` is enough to rebuild a step with the same quantiles."""
    original = LingBotVAActionUnnormalizeStep(action_q01=[0.0, 1.0], action_q99=[2.0, 3.0])
    serialized = original.get_config()
    assert serialized == {"action_q01": [0.0, 1.0], "action_q99": [2.0, 3.0]}

    # Rebuilding from the serialized config must preserve both quantile lists.
    clone = LingBotVAActionUnnormalizeStep(**serialized)
    assert clone.action_q01 == original.action_q01
    assert clone.action_q99 == original.action_q99
def test_make_pre_post_processors_names_and_steps() -> None:
cfg = _make_config()
pre, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats=None)
assert pre.name == POLICY_PREPROCESSOR_DEFAULT_NAME
assert post.name == POLICY_POSTPROCESSOR_DEFAULT_NAME
# The postprocessor must contain the dedicated quantile unnormalize step.
assert any(isinstance(s, LingBotVAActionUnnormalizeStep) for s in post.steps)
# Actions are unnormalized by the standard built-in quantile unnormalizer.
assert any(isinstance(s, UnnormalizerProcessorStep) for s in post.steps)
def test_freshly_built_postprocessor_is_neutral() -> None:
# A fresh (unconverted) policy defaults to a neutral [-1, 1] mapping (identity rescale): the real
# per-benchmark quantiles are NOT hardcoded, they are restored from the saved checkpoint on load.
def test_freshly_built_postprocessor_is_identity() -> None:
# Without action stats the quantile unnormalizer is a no-op (identity passthrough): the real
# per-benchmark q01/q99 are restored from the saved checkpoint on load, not hardcoded here.
cfg = _make_config()
_, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats=None)
normed = torch.tensor([[0.3, -0.5, 1.0, -1.0, 0.0, 0.7, -0.2]])
out = post(normed)
assert torch.allclose(out, normed, atol=1e-4)
assert torch.allclose(post(normed), normed, atol=1e-6)
def test_postprocessor_quantiles_survive_save_load(tmp_path) -> None:
# Regression guard for the Hub mechanism this policy relies on: the benchmark quantiles live in
# the serialized post-processor config and must round-trip through save_pretrained/from_pretrained.
def test_postprocessor_quantile_unnormalization() -> None:
    """The post-processor maps an all -1 action to q01 and an all +1 action to q99."""
    config = _make_config()
    q01 = [-1.0, -0.5, 0.0, -1.0, -1.0, -1.0, -1.0]
    q99 = [1.0, 0.5, 2.0, 1.0, 1.0, 1.0, 1.0]
    _, post = make_lingbot_va_pre_post_processors(
        config, dataset_stats={ACTION: {"q01": q01, "q99": q99}}
    )

    # Run both endpoints first, then compare each against its quantile bound.
    at_lower = post(torch.full((1, 7), -1.0))
    at_upper = post(torch.full((1, 7), 1.0))
    for actual, bound in ((at_lower, q01), (at_upper, q99)):
        assert torch.allclose(actual, torch.tensor(bound).unsqueeze(0), atol=1e-4)
def test_postprocessor_stats_survive_save_load(tmp_path) -> None:
# Regression guard for the Hub mechanism: the q01/q99 stats live in the saved post-processor
# state and must round-trip through save_pretrained / from_pretrained.
cfg = _make_config()
q01 = [-0.6, -0.8, -0.9, -0.1, -0.15, -0.25, -1.0]
q99 = [0.9, 0.85, 0.9, 0.17, 0.18, 0.34, 1.0]
post = PolicyProcessorPipeline[torch.Tensor, torch.Tensor](
steps=[LingBotVAActionUnnormalizeStep(action_q01=q01, action_q99=q99)],
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
)
_, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats={ACTION: {"q01": q01, "q99": q99}})
post.save_pretrained(tmp_path)
loaded = PolicyProcessorPipeline.from_pretrained(
tmp_path, config_filename=f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json"
tmp_path,
config_filename=f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json",
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
)
step = next(s for s in loaded.steps if isinstance(s, LingBotVAActionUnnormalizeStep))
assert step.action_q01 == q01
assert step.action_q99 == q99
out = loaded(torch.full((1, 7), -1.0))
assert torch.allclose(out, torch.tensor(q01).unsqueeze(0), atol=1e-4)