feat(runtime): action chunk diagnostic — log normalized + unnormalized values

Adds a per-chunk log line in LowLevelForward that surfaces what the
action expert actually emits and what the robot receives after the
postprocessor unnormalizes it, so "barely moving" can be diagnosed
at a glance:

  [act] T=50 |a|_mean=0.234 spread=0.512
  [act] norm  first=[0.12, -0.31, ...]  last=[0.45, -0.22, ...]
  [act] joint first=[3.2, -47.8, ...]  last=[12.4, -41.0, ...]  state=[0.5, -55.3, ...]

|a|_mean ~ 0.3–0.6 with spread ~ 0.3+ and visible delta from first to
last → healthy trajectory. |a|_mean near 0 across the chunk → model
defaulting to median pose. joint values that don't differ much from
state → safety cap or model output near current state.

Postprocessor is stashed on runtime.state["_postprocessor"] at startup
so the diagnostic can replay the same unnormalize the dispatcher uses.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-05-25 12:10:52 +02:00
parent 77a16db529
commit db927ab40b
2 changed files with 46 additions and 0 deletions
@@ -167,6 +167,47 @@ class LowLevelForward(InferenceStep):
chunk_iter = chunk
else:
chunk_iter = chunk.unsqueeze(0)
# Diagnostic: show what the action expert actually emitted for
# this chunk. The values here are *normalized* (pre-postprocessor),
# so we expect them roughly in [-1, 1] under QUANTILES; a chunk
# that stays near zero across all 50 steps is the canonical
# "barely moving" signature (model defaults to median pose).
# Also surface a few unnormalized samples by running the
# postprocessor on a copy so we can see the actual joint targets
# the robot will receive.
try:
import torch as _t # noqa: PLC0415
sample = chunk_iter.detach().float().cpu() # (T, D) normalized
mag = sample.abs().mean().item()
spread = (sample.amax(0) - sample.amin(0)).abs().mean().item()
first_norm = [round(float(x), 3) for x in sample[0].tolist()]
last_norm = [round(float(x), 3) for x in sample[-1].tolist()]
postprocessor = state.get("_postprocessor")
first_unnorm = last_unnorm = None
if postprocessor is not None:
try:
first_unnorm_t = postprocessor(sample[:1].clone())
last_unnorm_t = postprocessor(sample[-1:].clone())
if isinstance(first_unnorm_t, _t.Tensor):
first_unnorm = [round(float(x), 2) for x in first_unnorm_t.flatten().tolist()]
if isinstance(last_unnorm_t, _t.Tensor):
last_unnorm = [round(float(x), 2) for x in last_unnorm_t.flatten().tolist()]
except Exception: # noqa: BLE001
pass
state_now = observation.get("observation.state")
state_first = None
if isinstance(state_now, _t.Tensor):
s = state_now.detach().float().cpu().flatten().tolist()
state_first = [round(float(x), 2) for x in s[:6]]
push_log(state, f" [act] T={sample.shape[0]} |a|_mean={mag:.3f} spread={spread:.3f}")
push_log(state, f" [act] norm first={first_norm} last={last_norm}")
if first_unnorm is not None:
push_log(state, f" [act] joint first={first_unnorm} last={last_unnorm} state={state_first}")
except Exception as exc: # noqa: BLE001
logger.debug("act-diag failed: %s", exc)
for step in chunk_iter:
queue.append(step.unsqueeze(0))
state["last_chunk_size"] = int(chunk_iter.shape[0])
@@ -1508,6 +1508,11 @@ def main(argv: list[str] | None = None) -> int:
# under-trained checkpoint without recompiling.
runtime.state["text_gen_min_new_tokens"] = int(getattr(args, "text_min_new_tokens", 0) or 0)
runtime.state["text_gen_temperature"] = float(getattr(args, "text_temperature", 0.0) or 0.0)
# Stash the postprocessor so LowLevelForward's action diagnostic
# can show both normalized chunk values AND unnormalized joint
# targets — answers "what is the model emitting + what does the
# robot actually receive" in one log line.
runtime.state["_postprocessor"] = postprocessor
runtime.state["text_gen_top_p"] = float(getattr(args, "text_top_p", 1.0) or 1.0)
# Apply the startup mode chosen above the task picker.
runtime.state["mode"] = startup_mode