Label Foxglove dataset scalars with feature dimension names

Use the dataset's per-dimension feature names (e.g. joint names) as the
Foxglove series labels for /observation/state and /action/state instead
of bare indices. LeRobot stores `names` inconsistently (flat list,
{category: [...]}, or {name: index}), so _feature_dim_names handles each
and falls back to indices on any unknown format or length mismatch.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Roman Shtylman
2026-06-18 18:44:26 -07:00
committed by CarolinePascal
parent a3c0a8ca1a
commit 802f49438c
+49 -8
View File
@@ -407,20 +407,50 @@ def log_foxglove_data(
_SUCCESS = "next.success"
def _frame_to_scalars(sample: dict, key: str) -> dict[str, float]:
def _feature_dim_names(feature: dict | None) -> list[str] | None:
"""Best-effort per-dimension series labels for a 1D feature, or ``None`` to fall back to indices.
LeRobot records a feature's ``names`` inconsistently: a flat list (``["x", "y"]``), a category
mapping (``{"motors": ["motor_0", "motor_1"]}``), or a name->index mapping
(``{"delta_x": 0, "delta_y": 1}``). Each is handled, but labels are only returned when their count
matches the feature's 1D shape, so a malformed/mismatched ``names`` can't silently mislabel series.
"""
if not feature:
return None
shape = feature.get("shape")
dim = shape[0] if shape and len(shape) == 1 else None
names = feature.get("names")
labels: list[str] | None = None
if isinstance(names, dict):
values = list(names.values())
if values and all(isinstance(v, (list, tuple)) for v in values):
labels = [str(n) for group in values for n in group]
elif values and all(isinstance(v, int) for v in values):
labels = [name for name, _ in sorted(names.items(), key=lambda kv: kv[1])]
elif isinstance(names, (list, tuple)):
labels = [str(n) for n in names]
if labels is not None and dim is not None and len(labels) == dim:
return labels
return None
def _frame_to_scalars(sample: dict, key: str, labels: list[str] | None = None) -> dict[str, float]:
"""Flatten a frame's vector/scalar feature ``key`` into ``{label: value}`` entries.
Vectors are expanded to ``<i>`` labels (one series per dimension); a scalar becomes a single
entry. Missing or ``None`` features yield an empty mapping.
``labels`` provides one name per dimension (from the dataset's feature metadata); when absent or
the wrong length, dimensions fall back to their index. A scalar feature becomes a single entry.
Missing or ``None`` features yield an empty mapping.
"""
v = sample.get(key)
if v is None:
return {}
arr = v.numpy() if hasattr(v, "numpy") else np.asarray(v)
if arr.ndim == 0:
return {"0": float(arr)}
return {str(i): float(x) for i, x in enumerate(arr.flatten())}
flat = [float(arr)] if arr.ndim == 0 else [float(x) for x in arr.flatten()]
if labels is None or len(labels) != len(flat):
labels = [str(i) for i in range(len(flat))]
return dict(zip(labels, flat, strict=True))
def serve_foxglove_dataset_playback(
@@ -467,6 +497,11 @@ def serve_foxglove_dataset_playback(
raise ValueError("Cannot visualize an empty episode.")
first_ns, last_ns = times_ns[0], times_ns[-1]
camera_keys = list(dataset.meta.camera_keys)
# Per-dimension series labels from the dataset metadata (e.g. joint names), computed once.
scalar_labels = {
OBS_STATE: _feature_dim_names(dataset.meta.features.get(OBS_STATE)),
ACTION: _feature_dim_names(dataset.meta.features.get(ACTION)),
}
def topic_for(key: str) -> str:
name = key[len(OBS_PREFIX) :] if str(key).startswith(OBS_PREFIX) else str(key)
@@ -484,8 +519,14 @@ def serve_foxglove_dataset_playback(
if np.issubdtype(arr.dtype, np.floating):
arr = (arr * 255.0).clip(0, 255).astype(np.uint8)
_log_foxglove_image(topic_for(key), key, arr, compress_images=compress_images, time_ns=log_time)
_log_foxglove_scalars(f"/{OBS_STR}/state", _frame_to_scalars(sample, OBS_STATE), log_time=log_time)
_log_foxglove_scalars(f"/{ACTION}/state", _frame_to_scalars(sample, ACTION), log_time=log_time)
_log_foxglove_scalars(
f"/{OBS_STR}/state",
_frame_to_scalars(sample, OBS_STATE, scalar_labels[OBS_STATE]),
log_time=log_time,
)
_log_foxglove_scalars(
f"/{ACTION}/state", _frame_to_scalars(sample, ACTION, scalar_labels[ACTION]), log_time=log_time
)
episode_scalars = {}
for feat, label in ((DONE, "done"), (REWARD, "reward"), (_SUCCESS, "success")):
v = sample.get(feat)