refactor(rewards): clean up TOPReward processor/model

This commit is contained in:
Khalil Meftah
2026-05-20 17:39:21 +02:00
parent 70ad322676
commit f6ecb7b955
7 changed files with 568 additions and 928 deletions
+41 -55
View File
@@ -23,9 +23,20 @@ it builds a chat prompt of the form
or not. The answer is: True"
```
forwards it through the VLM, label-masks everything except the very last token, and reads back the log-probability of that token — by default the literal `"True"` that closes the suffix template. The resulting `log P("True" | video + prompt + instruction)` is the reward, and answers the question "given this video, how strongly does the VLM agree that the instruction is satisfied?".
forwards it through the VLM, label-masks everything except the very last token, and reads back the log-probability of that token — by default the literal `"True"` that closes the suffix template. The resulting `log P("True" | video + prompt + instruction)` is the reward.
Because the method only depends on a frozen VLM, TOPReward is **zero-shot**: there are no fine-tuned weights to host. The "model" in LeRobot is a small wrapper around `transformers`' `Qwen3VLForConditionalGeneration` plus the prompt assembly + label-masking logic.
Because the method only depends on a frozen VLM, TOPReward is **zero-shot**: there are no fine-tuned weights to host. The "model" in LeRobot is a small wrapper around `transformers`' `Qwen3VLForConditionalGeneration` plus the label-masking logic. The processor owns the tokeniser and builds the full chat prompt (EO-1/Robometer pattern).
## What the LeRobot integration covers
- Standard `reward_model.type=topreward` configuration through LeRobot.
- VLM loading via the `transformers` `Qwen3VLForConditionalGeneration` API.
- Prompt assembly + tokenisation in the processor (matching upstream `QwenClient.compute_instruction_reward`).
- `compute_reward()` returns one scalar log-prob per sample.
- LeRobot reward-model save/load — `save_pretrained` writes only `config.json` (the VLM is identified by `vlm_name`).
- An offline labeling script that writes a `topreward_progress.parquet` (SARM-compatible schema) for RA-BC and overlay.
The current LeRobot port supports the **Qwen3-VL client only**. Other upstream clients (Gemini, OpenAI, Gemma, Molmo) can be added as follow-up extras.
## Installation Requirements
@@ -54,17 +65,16 @@ TOPReward expects:
In LeRobot datasets the preprocessor reads:
| Config field | Default | Meaning |
| ------------------------- | --------------------------- | ----------------------------------------------------------------------- |
| ------------------------- | --------------------------- | --------------------------------------------- |
| `reward_model.image_key` | `observation.images.top` | Camera observation used by TOPReward |
| `reward_model.task_key` | `task` | Key in complementary data that stores the task string |
| `reward_model.max_frames` | `16` | Cap on frames per sample (compute_reward only; predict_curves bypasses) |
| `reward_model.task_key` | `task` | Key in complementary data for the task string |
| `reward_model.max_frames` | `16` | Cap on frames per sample |
| `reward_model.fps` | `2.0` | Metadata passed to the Qwen video processor |
| `reward_model.vlm_name` | `Qwen/Qwen3-VL-8B-Instruct` | Hugging Face Hub id of the underlying VLM |
The model returns:
- `compute_reward(batch)`: one log-probability per sample. Higher = better taskvideo alignment. When `success_threshold` is finite, returns the binary thresholded value instead.
- `predict_curves(batch, num_prefixes=None)`: per-frame progress curve in `[0, 1]` (min-max normalised log-probs over prefix lengths). `num_prefixes=None` is fully dense; `num_prefixes=15` matches the upstream sparse-dense default with linear interpolation between anchors.
- `compute_reward(batch)`: one log-probability per sample. Higher = better task-video alignment. When `success_threshold` is finite, returns the binary thresholded value instead.
## Usage
@@ -80,30 +90,6 @@ cfg = TOPRewardConfig(
reward_model = TOPRewardModel(cfg)
```
There is no `from_pretrained` weight download for TOPReward itself — the VLM is fetched from the Hub on construction.
### Score a clip + task
```python
import numpy as np
from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX
# frames: np.ndarray, shape (T, H, W, C), dtype uint8
# task: str
batch = {
f"{TOPREWARD_FEATURE_PREFIX}frames": [frames],
f"{TOPREWARD_FEATURE_PREFIX}task": [task],
}
reward = reward_model.compute_reward(batch) # tensor of shape (1,)
```
For a dense per-frame curve over the same clip:
```python
out = reward_model.predict_curves(batch, num_prefixes=15)
progress = out["progress"][0].numpy() # shape (T,), values in [0, 1]
```
### Use the reward factory
```python
@@ -119,26 +105,21 @@ reward_model = make_reward_model(cfg)
preprocessor, postprocessor = make_reward_pre_post_processors(cfg)
```
The preprocessor writes normalised frames + task strings under the `observation.topreward.*` namespace; the model reads them in `compute_reward`.
The preprocessor tokenises the full prompt (video + prefix + instruction suffix), writes Qwen-VL tensors + `prompt_length` under `observation.topreward.*`. The model reads those tensors, label-masks based on `prompt_length`, and extracts the log-prob reward.
### Offline dataset labeling
Mirror the SARM / Robometer RA-BC flow — write a `topreward_progress.parquet` once, then reuse it for training (RA-BC) and visualisation (overlay videos):
Write a `topreward_progress.parquet` for RA-BC training and overlay videos:
```bash
# Fully dense per-frame labeling
uv run python -m lerobot.rewards.topreward.compute_rabc_weights \
--dataset-repo-id lerobot/libero_10_image \
--device cuda
# Sparse-dense (15 anchors per episode, matches upstream)
uv run python -m lerobot.rewards.topreward.compute_rabc_weights \
--dataset-repo-id lerobot/libero_10_image \
--num-prefixes 15 \
--num-samples 15 \
--device cuda
```
Then render the SARM-style progress overlay for any episode:
Then render the progress overlay for any episode:
```bash
uv run examples/dataset/create_progress_videos.py \
@@ -148,27 +129,28 @@ uv run examples/dataset/create_progress_videos.py \
--gif
```
## Publishing a named TOPReward configuration
## Configuration Notes
Because TOPReward stores no weights of its own, "publishing a TOPReward model" amounts to writing the LeRobot `config.json` (≈ 1 KB) that pins the VLM id, prompt and reduction:
### Prompt knobs
```python
from lerobot.rewards.topreward import TOPRewardConfig, TOPRewardModel
The default prompt mirrors the upstream paper:
cfg = TOPRewardConfig(
vlm_name="Qwen/Qwen3-VL-8B-Instruct",
reduction="mean",
fps=2.0,
)
TOPRewardModel(cfg).save_pretrained("./topreward-qwen3vl-8b")
# Push the directory to the Hub via `huggingface-cli` or `HfApi.upload_folder`.
```text
prompt_prefix = "The above video shows a robot manipulation trajectory that completes the following task: "
prompt_suffix_template = "{instruction} Decide whether the above statement is True or not. The answer is: True"
```
Reloading restores the same configuration (no weight download for TOPReward itself; the VLM is re-fetched via `vlm_name`):
Both are exposed on `TOPRewardConfig` for ablation. The suffix template **must** contain `{instruction}`.
```python
reloaded = TOPRewardModel.from_pretrained("./topreward-qwen3vl-8b")
```
### Chat template
`add_chat_template=True` wraps the full prompt (including instruction) with the tokenizer's chat template before tokenisation. Default is `False`, matching the upstream paper's main experiments.
## Limitations
- The current LeRobot port is **inference-only and zero-shot**; `forward()` is not overridden and `is_trainable` returns `False`.
- Only the **Qwen3-VL family** is supported; other upstream clients are out of scope.
- TOPReward inherits the underlying VLM's biases.
## References
@@ -189,3 +171,7 @@ reloaded = TOPRewardModel.from_pretrained("./topreward-qwen3vl-8b")
year={2026}
}
```
## License
The original TOPReward codebase is MIT-licensed. The LeRobot port follows the LeRobot Apache 2.0 license; the wrapped Qwen3-VL weights are subject to the original Qwen license.
@@ -16,51 +16,22 @@
"""Compute per-frame TOPReward progress curves for a LeRobot dataset.
This mirrors :mod:`lerobot.rewards.sarm.compute_rabc_weights` (and the
ROBOMETER equivalent): it walks every episode in a dataset, runs the
TOPReward zero-shot reward model, and writes a parquet file with one row
per frame. The output uses the same schema SARM produces so existing
consumers — :class:`lerobot.rewards.sarm.rabc.RABCWeights` (which reads
``progress_sparse``) and the SARM-style overlay script in
``examples/dataset/create_progress_videos.py`` — work without modification.
For each episode, scores trajectory prefixes of increasing length using
the TOPReward reward model, min-max normalises the raw log-prob rewards per episode,
and writes a parquet file with one row per frame.
TOPReward is zero-shot: there is no fine-tuned checkpoint to load. The
``--reward-model-path`` argument is therefore optional and only used when
you want to load a TOPReward LeRobot config (e.g. one published on the Hub
that pins ``vlm_name`` and prompt knobs). Otherwise the default
:class:`TOPRewardConfig` is used, which points at
``Qwen/Qwen3-VL-8B-Instruct`` — the VLM is re-downloaded from the HF Hub
on every run unless cached.
Parquet schema:
+--------------------+---------+----------------------------------------+
| column | dtype | meaning |
+====================+=========+========================================+
| ``index`` | int64 | global frame index |
| ``episode_index`` | int64 | episode id |
| ``frame_index`` | int64 | local within-episode index |
| ``progress_sparse``| float32 | per-frame TOPReward progress in [0, 1] |
| | | (RA-BC + overlay read this column) |
+--------------------+---------+----------------------------------------+
The parquet uses the same schema as SARM's :mod:`lerobot.rewards.sarm.compute_rabc_weights`.
Usage:
# Full computation (one VLM forward per frame, slowest but most accurate)
python -m lerobot.rewards.topreward.compute_rabc_weights \\
--dataset-repo-id lerobot/libero_10_image
# Sparse-dense mode: 15 anchor prefixes per episode, interpolated to
# per-frame resolution. Matches upstream TOPReward ``num_samples=15``.
# Sparse-dense mode (15 anchors per episode, matches upstream)
python -m lerobot.rewards.topreward.compute_rabc_weights \\
--dataset-repo-id lerobot/libero_10_image \\
--num-prefixes 15
--num-samples 15
# Use a different VLM backbone
python -m lerobot.rewards.topreward.compute_rabc_weights \\
--dataset-repo-id lerobot/libero_10_image \\
--vlm-name Qwen/Qwen3-VL-4B-Instruct
The output is written to the dataset's local cache directory as
``topreward_progress.parquet`` (or to ``--output-path`` if provided).
"""
from __future__ import annotations
@@ -79,7 +50,8 @@ from tqdm import tqdm
from lerobot.datasets import LeRobotDataset
from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX
from lerobot.rewards.topreward.processor_topreward import TOPRewardEncoderProcessorStep
from lerobot.types import TransitionKey
DEFAULT_OUTPUT_FILENAME = "topreward_progress.parquet"
@@ -105,22 +77,65 @@ def _resolve_task(sample: dict[str, Any], default: str) -> str:
return default
def _frames_to_uint8_hwc(video: torch.Tensor) -> np.ndarray:
"""Convert a ``(T, C, H, W)`` or ``(T, H, W, C)`` tensor to ``(T, H, W, C) uint8``.
def normalize_rewards(rewards: list[float] | np.ndarray) -> np.ndarray:
"""Min-max normalise raw log-prob rewards into ``[0, 1]``."""
rewards_arr = np.asarray(rewards, dtype=np.float64)
if rewards_arr.size == 0:
return rewards_arr.astype(np.float32)
if rewards_arr.size == 1:
return np.array([1.0], dtype=np.float32)
r_min, r_max = rewards_arr.min(), rewards_arr.max()
if r_max == r_min:
return np.ones_like(rewards_arr, dtype=np.float32)
return ((rewards_arr - r_min) / (r_max - r_min)).astype(np.float32)
Inlined here (rather than reusing the processor) so the labeling script
can side-step the ``max_frames`` tail-crop and feed full trajectories
to :meth:`TOPRewardModel.predict_curves`.
"""
if video.shape[1] in (1, 3):
video = video.permute(0, 2, 3, 1)
elif video.shape[-1] not in (1, 3):
raise ValueError(f"Expected channel dim of size 1 or 3, got shape {tuple(video.shape)}")
array = video.detach().cpu().numpy()
if np.issubdtype(array.dtype, np.floating) and array.size > 0 and array.max() <= 1.0:
array = array * 255.0
return np.clip(array, 0, 255).astype(np.uint8)
def compute_instruction_rewards_for_prefixes(
model: TOPRewardModel,
encoder: TOPRewardEncoderProcessorStep,
dataset: LeRobotDataset,
ep_start: int,
num_frames: int,
task: str,
image_key: str,
num_samples: int | None,
device: str,
) -> np.ndarray:
"""Score an episode via prefix sweep and return a per-frame normalised curve."""
if num_samples is None or num_samples >= num_frames:
prefix_lengths = np.arange(1, num_frames + 1, dtype=np.int64)
else:
prefix_lengths = np.unique(np.linspace(1, num_frames, num_samples).round().astype(np.int64))
rewards: list[float] = []
for length in prefix_lengths:
frames = torch.stack([dataset[ep_start + i][image_key] for i in range(int(length))])
frames = frames.unsqueeze(0) # (1, T, C, H, W)
transition = {
TransitionKey.OBSERVATION: {image_key: frames},
TransitionKey.COMPLEMENTARY_DATA: {"task": task},
}
encoded = encoder(transition)
obs = encoded[TransitionKey.OBSERVATION]
batch = {
key: value.to(device) if isinstance(value, torch.Tensor) else value for key, value in obs.items()
}
with torch.no_grad():
reward = model.compute_reward(batch)
rewards.append(float(reward.item()))
normalized_rewards = normalize_rewards(rewards)
if prefix_lengths.shape[0] == num_frames:
return normalized_rewards
return np.interp(
np.arange(1, num_frames + 1, dtype=np.float64),
prefix_lengths.astype(np.float64),
normalized_rewards.astype(np.float64),
).astype(np.float32)
def compute_topreward_progress(
@@ -129,41 +144,18 @@ def compute_topreward_progress(
vlm_name: str | None = None,
output_path: str | None = None,
device: str = "cuda",
num_prefixes: int | None = None,
num_samples: int | None = None,
fps: float | None = None,
reduction: str | None = None,
use_video_description: bool = False,
episodes: list[int] | None = None,
) -> Path:
"""Run TOPReward over a dataset and write per-frame progress.
Args:
dataset_repo_id: Hugging Face dataset repo id or local path.
reward_model_path: Optional TOPReward LeRobot config repo / dir to
load (a tiny ``config.json``). When ``None`` (default), a
fresh :class:`TOPRewardConfig` is constructed from the CLI
overrides.
vlm_name: Override the VLM backbone (HF Hub id).
output_path: Where to write the parquet. Defaults to
``<dataset_root>/topreward_progress.parquet``.
device: Device for the VLM.
num_prefixes: Number of evenly-spaced anchor prefixes per episode.
``None`` (default) = fully dense (one VLM forward per frame).
Set to ``15`` to match upstream TOPReward ``num_samples=15``.
fps: Override the config's ``fps``.
reduction: Override the config's ``reduction`` (``"mean"`` / ``"sum"``).
use_video_description: Override the config's ``use_video_description``.
Returns:
Path to the written parquet file.
"""
"""Run TOPReward over a dataset and write per-frame progress."""
if reward_model_path is not None:
logging.info(f"Loading TOPReward config from: {reward_model_path}")
model = TOPRewardModel.from_pretrained(reward_model_path)
config = model.config
# Apply CLI overrides on top of the loaded config.
if vlm_name is not None and vlm_name != config.vlm_name:
logging.info(f"Overriding vlm_name from config: {config.vlm_name} -> {vlm_name}")
# vlm_name affects the loaded weights; reload from scratch.
config.vlm_name = vlm_name
config.device = device
model = TOPRewardModel(config)
@@ -175,28 +167,40 @@ def compute_topreward_progress(
config_kwargs["fps"] = fps
if reduction is not None:
config_kwargs["reduction"] = reduction
if use_video_description:
config_kwargs["use_video_description"] = True
config = TOPRewardConfig(**config_kwargs)
logging.info(f"Constructing TOPReward with VLM: {config.vlm_name}")
model = TOPRewardModel(config)
model.to(device).eval()
encoder = TOPRewardEncoderProcessorStep(
vlm_name=config.vlm_name,
image_key=config.image_key,
task_key=config.task_key,
default_task=config.default_task,
max_frames=None, # no tail-crop: we control prefix length explicitly
fps=config.fps,
prompt_prefix=config.prompt_prefix,
prompt_suffix_template=config.prompt_suffix_template,
add_chat_template=config.add_chat_template,
max_length=config.max_input_length,
)
image_key = config.image_key
frames_key = f"{TOPREWARD_FEATURE_PREFIX}frames"
task_batch_key = f"{TOPREWARD_FEATURE_PREFIX}task"
logging.info(f"Loading dataset: {dataset_repo_id}")
dataset = LeRobotDataset(dataset_repo_id, download_videos=True)
logging.info(f"Dataset: {dataset.num_episodes} episodes, {dataset.num_frames} frames")
episode_indices = list(range(dataset.num_episodes)) if episodes is None else episodes
logging.info(f"Processing {len(episode_indices)} episode(s)")
all_index: list[int] = []
all_episode: list[int] = []
all_frame: list[int] = []
all_progress: list[float] = []
for episode_idx in tqdm(range(dataset.num_episodes), desc="Episodes"):
for episode_idx in tqdm(episode_indices, desc="Episodes"):
ep = dataset.meta.episodes[episode_idx]
ep_start = int(ep["dataset_from_index"])
ep_end = int(ep["dataset_to_index"])
@@ -207,16 +211,17 @@ def compute_topreward_progress(
first_sample = dataset[ep_start]
task = _resolve_task(first_sample, default=config.default_task or "perform the task")
# Read the whole episode into one (N, C, H, W) tensor and convert
# to (N, H, W, C) uint8 — same format ``TOPREWARD_FEATURE_PREFIX.frames``
# expects. We deliberately bypass the encoder step here so its
# ``max_frames`` tail-crop doesn't clip the prefix sweep.
ep_video = torch.stack([dataset[ep_start + i][image_key] for i in range(num_frames)])
ep_frames_uint8 = _frames_to_uint8_hwc(ep_video)
batch = {frames_key: [ep_frames_uint8], task_batch_key: [task]}
out = model.predict_curves(batch, num_prefixes=num_prefixes)
per_frame = out["progress"][0, :num_frames].cpu().numpy()
per_frame = compute_instruction_rewards_for_prefixes(
model=model,
encoder=encoder,
dataset=dataset,
ep_start=ep_start,
num_frames=num_frames,
task=task,
image_key=image_key,
num_samples=num_samples,
device=device,
)
for local in range(num_frames):
all_index.append(ep_start + local)
@@ -232,13 +237,10 @@ def compute_topreward_progress(
"index": np.asarray(all_index, dtype=np.int64),
"episode_index": np.asarray(all_episode, dtype=np.int64),
"frame_index": np.asarray(all_frame, dtype=np.int64),
# Same column name SARM uses so RABCWeights + the overlay
# script read TOPReward's output without per-model branching.
"progress_sparse": np.asarray(all_progress, dtype=np.float32),
}
)
# Persist provenance metadata: the LeRobot path (if any) and the VLM id.
schema_metadata: dict[bytes, bytes] = {b"vlm_name": config.vlm_name.encode()}
if reward_model_path is not None:
schema_metadata[b"reward_model_path"] = reward_model_path.encode()
@@ -266,14 +268,10 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Full RA-BC computation with the default Qwen3-VL-8B-Instruct backbone
python -m lerobot.rewards.topreward.compute_rabc_weights \\
--dataset-repo-id lerobot/libero_10_image
# Sparse-dense mode (matches upstream TOPReward num_samples=15)
python -m lerobot.rewards.topreward.compute_rabc_weights \\
--dataset-repo-id lerobot/libero_10_image \\
--num-prefixes 15
--num-samples 15
# Use a smaller VLM
python -m lerobot.rewards.topreward.compute_rabc_weights \\
@@ -282,66 +280,33 @@ Examples:
""",
)
parser.add_argument(
"--dataset-repo-id",
type=str,
required=True,
help="HuggingFace dataset repo id or local path.",
"--dataset-repo-id", type=str, required=True, help="HuggingFace dataset repo id or local path."
)
parser.add_argument(
"--reward-model-path",
type=str,
default=None,
help="Optional TOPReward LeRobot config (repo id or local dir). "
"Falls back to a fresh TOPRewardConfig if unset.",
"--reward-model-path", type=str, default=None, help="Optional TOPReward LeRobot config."
)
parser.add_argument("--vlm-name", type=str, default=None, help="Override the VLM backbone (HF Hub id).")
parser.add_argument("--output-path", type=str, default=None, help="Output parquet path.")
parser.add_argument("--device", type=str, default="cuda", help="Device to use (default: cuda).")
parser.add_argument(
"--vlm-name",
type=str,
default=None,
help="Override the VLM backbone (HF Hub id).",
)
parser.add_argument(
"--output-path",
type=str,
default=None,
help="Output parquet path. Defaults to <dataset_root>/topreward_progress.parquet.",
)
parser.add_argument(
"--device",
type=str,
default="cuda",
help="Device to use (default: cuda).",
)
parser.add_argument(
"--num-prefixes",
"--num-samples",
type=int,
default=None,
help="Evenly-spaced anchor prefixes per episode. None = fully dense "
"(one VLM forward per frame). 15 matches upstream TOPReward.",
help="Anchor prefix samples per episode. None = dense. 15 matches upstream.",
)
parser.add_argument(
"--fps",
type=float,
"--episodes",
type=int,
nargs="+",
default=None,
help="Override TOPRewardConfig.fps (frames per second for the Qwen video processor).",
help="Process only these episode indices (e.g. --episodes 0 or --episodes 0 5 10).",
)
parser.add_argument("--fps", type=float, default=None, help="Override TOPRewardConfig.fps.")
parser.add_argument(
"--reduction", type=str, default=None, choices=["mean", "sum"], help="Override reduction."
)
parser.add_argument(
"--reduction",
type=str,
default=None,
choices=["mean", "sum"],
help="Override TOPRewardConfig.reduction.",
)
parser.add_argument(
"--use-video-description",
action="store_true",
help="Generate an instruction-agnostic video description and prepend "
"it as context before scoring (doubles VLM calls per prefix).",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Upload the progress file to the dataset repo on HuggingFace Hub.",
"--push-to-hub", action="store_true", help="Upload to the dataset repo on HuggingFace Hub."
)
args = parser.parse_args()
@@ -354,10 +319,10 @@ Examples:
vlm_name=args.vlm_name,
output_path=args.output_path,
device=args.device,
num_prefixes=args.num_prefixes,
num_samples=args.num_samples,
fps=args.fps,
reduction=args.reduction,
use_video_description=args.use_video_description,
episodes=args.episodes,
)
print(f"\nTOPReward progress saved to: {output_path}")
@@ -67,11 +67,6 @@ class TOPRewardConfig(RewardModelConfig):
add_chat_template: If ``True``, wrap the full prompt with the
tokenizer's chat template before tokenisation (matches
upstream ``add_chat_template=True``).
use_video_description: If ``True``, make an extra VLM call to
produce an instruction-agnostic video description and prepend
it as additional context. Doubles inference cost but avoids
circular grounding when the instruction names objects shown
in frames.
reduction: Reduction over per-token log-probs of the suffix
tokens (``"mean"`` or ``"sum"``).
success_threshold: Optional log-prob threshold. If finite,
@@ -100,7 +95,6 @@ class TOPRewardConfig(RewardModelConfig):
prompt_prefix: str = DEFAULT_PROMPT_PREFIX
prompt_suffix_template: str = DEFAULT_PROMPT_SUFFIX_TEMPLATE
add_chat_template: bool = False
use_video_description: bool = False
reduction: str = "mean"
success_threshold: float = float("-inf")
@@ -28,17 +28,16 @@ and returns that log-likelihood as the reward signal.
Inference recipe:
1. Build a chat-style prompt:
``[video(frames, fps), text=prompt_prefix, text="{instruction} ... True"]``
2. Forward the full token sequence through the VLM.
3. Mask all but the final token with ``-100`` (``prompt_length = input_len - 1``,
mirrored from upstream). After the standard causal-LM next-token shift, this
isolates the single position where the model predicts the literal ``"True"``
that ends the prompt — the binary "is the instruction true given the video?"
answer.
4. Read that token's log-probability from the logits and reduce it (mean or sum
— equivalent for a single token, kept for API parity with upstream) into a
scalar reward.
1. The processor builds a chat-style prompt, tokenises it, and emits
``input_ids``, ``attention_mask``, vision tensors, and ``prompt_length``.
2. The model label-masks everything before ``prompt_length`` with ``-100``.
3. Forward the full token sequence through the VLM.
4. Read per-token log-probabilities of the unmasked suffix tokens from the
logits and reduce them (mean or sum) into a scalar reward.
With the default ``prompt_suffix_template`` and ``prompt_length = input_len - 1``
(mirrored from upstream), the only unmasked token is the literal ``"True"``
at the end — the reward is ``log P("True" | video + prompt + instruction)``.
This LeRobot port is **inference-only and not trainable** — :meth:`forward`
is intentionally inherited from :class:`PreTrainedRewardModel` and raises
@@ -59,37 +58,33 @@ import logging
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, TypeVar, cast
from typing import TYPE_CHECKING, Any, TypeVar
import numpy as np
import torch
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.constants import CONFIG_NAME
from huggingface_hub.errors import HfHubHTTPError
from PIL import Image
from torch import Tensor
from lerobot.configs.rewards import RewardModelConfig
from lerobot.rewards.pretrained import PreTrainedRewardModel
from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig
from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX
from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX, TOPREWARD_INPUT_KEYS
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING:
from lerobot.configs.train import TrainPipelineConfig
if TYPE_CHECKING or _transformers_available:
from transformers import AutoProcessor, Qwen3VLForConditionalGeneration
from transformers import Qwen3VLForConditionalGeneration
else:
AutoProcessor = None # type: ignore[assignment]
Qwen3VLForConditionalGeneration = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
T = TypeVar("T", bound="TOPRewardModel")
_TRUE_ANSWER = "True"
def _torch_dtype(name: str) -> torch.dtype | str:
"""Resolve a torch dtype name; ``"auto"`` is passed through verbatim."""
@@ -101,33 +96,6 @@ def _torch_dtype(name: str) -> torch.dtype | str:
raise ValueError(f"Unknown torch dtype: {name!r}")
def _frames_to_pil(frames: np.ndarray) -> list[Image.Image]:
"""Convert ``(T, H, W, C)`` uint8 frames to a list of PIL images."""
if frames.ndim != 4:
raise ValueError(f"Expected (T,H,W,C) frames; got shape {frames.shape}")
if frames.dtype != np.uint8:
frames = np.clip(frames, 0, 255).astype(np.uint8)
return [Image.fromarray(frames[i]) for i in range(frames.shape[0])]
def minmax_normalize_rewards(rewards: list[float] | np.ndarray) -> np.ndarray:
"""Min-max normalise raw log-prob rewards into ``[0, 1]``.
Matches upstream ``QwenClient.normalize_rewards(rewards, method="minmax")``:
a single-element input maps to ``[1.0]`` (no information to scale), and a
flat input (``max == min``) maps to all-ones.
"""
rewards_arr = np.asarray(rewards, dtype=np.float64)
if rewards_arr.size == 0:
return rewards_arr.astype(np.float32)
if rewards_arr.size == 1:
return np.array([1.0], dtype=np.float32)
r_min, r_max = rewards_arr.min(), rewards_arr.max()
if r_max == r_min:
return np.ones_like(rewards_arr, dtype=np.float32)
return ((rewards_arr - r_min) / (r_max - r_min)).astype(np.float32)
class TOPRewardModel(PreTrainedRewardModel):
"""TOPReward zero-shot reward model."""
@@ -136,7 +104,6 @@ class TOPRewardModel(PreTrainedRewardModel):
def __init__(self, config: TOPRewardConfig) -> None:
require_package("transformers", extra="topreward")
require_package("qwen-vl-utils", extra="topreward", import_name="qwen_vl_utils")
super().__init__(config)
self.config = config
@@ -145,117 +112,64 @@ class TOPRewardModel(PreTrainedRewardModel):
if config.attn_implementation is not None:
model_kwargs["attn_implementation"] = config.attn_implementation
# TOPReward is zero-shot: load the VLM as-is from the Hub. No
# weights of our own, no embedding resize, no head wiring.
self.model = Qwen3VLForConditionalGeneration.from_pretrained(config.vlm_name, **model_kwargs)
self.processor = AutoProcessor.from_pretrained(config.vlm_name, trust_remote_code=True)
def compute_reward(self, batch: dict[str, Any]) -> Tensor:
"""Return one log-prob reward per sample in the batch.
"""Return one log-prob reward per sample in the batch."""
inputs = {
key: batch[f"{TOPREWARD_FEATURE_PREFIX}{key}"]
for key in TOPREWARD_INPUT_KEYS
if f"{TOPREWARD_FEATURE_PREFIX}{key}" in batch
}
if "input_ids" not in inputs:
raise KeyError(
f"TOPReward batch missing pre-encoded inputs (expected "
f"`{TOPREWARD_FEATURE_PREFIX}input_ids`). Make sure the "
"TOPRewardEncoderProcessorStep ran before `compute_reward`."
)
prompt_lengths = inputs.pop("prompt_length")
device = next(self.model.parameters()).device
inputs = {key: value.to(device) if hasattr(value, "to") else value for key, value in inputs.items()}
labels = inputs["input_ids"].clone()
for i, plen in enumerate(prompt_lengths.tolist()):
labels[i, : int(plen)] = -100
if "attention_mask" in inputs:
labels = labels.masked_fill(inputs["attention_mask"] == 0, -100)
self.eval()
with torch.no_grad():
outputs = self.model(**inputs, labels=labels)
logits = outputs.logits[:, :-1, :]
target_labels = labels[:, 1:]
log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
mask = target_labels != -100
safe_targets = target_labels.masked_fill(~mask, 0)
token_log_probs = log_probs.gather(-1, safe_targets.unsqueeze(-1)).squeeze(-1)
batch_size = inputs["input_ids"].shape[0]
rewards = []
for i in range(batch_size):
sample_log_probs = token_log_probs[i][mask[i]]
if sample_log_probs.numel() == 0:
raise RuntimeError(
"TOPReward could not isolate any suffix tokens to score. Check that "
"`prompt_suffix_template` produces at least one tokenised character."
)
if self.config.reduction == "sum":
rewards.append(sample_log_probs.sum().item())
else:
rewards.append(sample_log_probs.mean().item())
Expects a batch produced by :class:`TOPRewardEncoderProcessorStep`:
``observation[f"{TOPREWARD_FEATURE_PREFIX}frames"]`` is a list of
``(T, H, W, C) uint8`` numpy arrays (one per sample) and
``observation[f"{TOPREWARD_FEATURE_PREFIX}task"]`` is a list of
task strings of the same length.
"""
frames_per_sample, tasks = self._unpack_batch(batch)
rewards = [
self._compute_log_prob_reward(frames, task)
for frames, task in zip(frames_per_sample, tasks, strict=True)
]
out = torch.as_tensor(rewards, dtype=torch.float32)
if np.isfinite(self.config.success_threshold):
out = (out > self.config.success_threshold).float()
return out.to(self.config.device or "cpu")
@torch.no_grad()
def predict_curves(
self,
batch: dict[str, Any],
*,
num_prefixes: int | None = None,
) -> dict[str, Tensor]:
"""Per-sample dense progress curves over prefixes ``[0, t]``.
Mirrors upstream ``compute_instruction_rewards_for_prefixes``: for
each sample we run one VLM forward per prefix length and read the
log-prob reward at that prefix. Raw log-probs are then min-max
normalised per-trajectory to ``[0, 1]``. Because trajectories
within a batch can have different lengths, the returned
``progress`` tensor is right-padded with ``NaN`` to the longest
trajectory in the batch.
Args:
batch: Same input as :meth:`compute_reward`.
num_prefixes: How many evenly-spaced prefix lengths to score
per trajectory. ``None`` (default) uses every prefix
length ``[1, N]`` → fully dense, ``N`` VLM forwards per
trajectory. Pass a smaller integer (e.g. ``15``, the
upstream default) for sparse-dense scoring with linear
interpolation between anchors.
Returns:
Dict with one float32 CPU tensor:
- ``progress``: ``(B, T_max)`` — per-frame progress in
``[0, 1]`` (min-max normalised log-prob curve), padded with
``NaN``.
"""
if num_prefixes is not None and num_prefixes < 1:
raise ValueError(f"num_prefixes must be >= 1 or None, got {num_prefixes}")
frames_per_sample, tasks = self._unpack_batch(batch)
curves: list[np.ndarray] = []
max_len = 0
for frames, task in zip(frames_per_sample, tasks, strict=True):
num_frames = int(frames.shape[0])
if num_frames == 0:
curves.append(np.zeros(0, dtype=np.float32))
continue
if num_prefixes is None or num_prefixes >= num_frames:
anchor_lengths = np.arange(1, num_frames + 1, dtype=np.int64)
else:
# Match upstream: linspace from 1 to N, dedupe (rounding
# collisions for short trajectories), sort ascending.
anchor_lengths = np.unique(np.linspace(1, num_frames, num_prefixes).round().astype(np.int64))
raw_rewards = [self._compute_log_prob_reward(frames[:length], task) for length in anchor_lengths]
normalized_at_anchors = minmax_normalize_rewards(raw_rewards)
# Linear interpolation back to per-frame resolution when
# `num_prefixes < num_frames`.
if anchor_lengths.shape[0] == num_frames:
per_frame = normalized_at_anchors
else:
per_frame = np.interp(
np.arange(1, num_frames + 1, dtype=np.float64),
anchor_lengths.astype(np.float64),
normalized_at_anchors.astype(np.float64),
).astype(np.float32)
curves.append(per_frame)
max_len = max(max_len, num_frames)
padded = np.full((len(curves), max_len), np.nan, dtype=np.float32)
for i, curve in enumerate(curves):
padded[i, : curve.shape[0]] = curve
return {"progress": torch.from_numpy(padded)}
# ------------------------------------------------------------------
# Save / load — VLM weights are not stored in our checkpoint
# ------------------------------------------------------------------
def _save_pretrained(self, save_directory: Path) -> None:
"""Save ``config.json`` only.
TOPReward has no fine-tuned weights of its own — the VLM is
identified by :attr:`TOPRewardConfig.vlm_name` and lives on the
Hugging Face Hub under that id. Writing the VLM into a
``model.safetensors`` here would just duplicate ~16 GB of Qwen
weights under our org for no benefit.
"""
"""Save ``config.json`` only."""
self.config._save_pretrained(save_directory)
@classmethod
@@ -271,19 +185,10 @@ class TOPRewardModel(PreTrainedRewardModel):
cache_dir: str | Path | None = None,
local_files_only: bool = False,
revision: str | None = None,
strict: bool = False, # accepted for API parity; unused
strict: bool = False, # noqa: ARG003 — accepted for API parity; unused (no safetensors to load)
**kwargs: Any,
) -> T:
"""Load a TOPReward configuration and instantiate the wrapped VLM.
Two modes:
- Local directory containing ``config.json``: read the config and
rebuild the model. The VLM is re-fetched from the Hub via
:attr:`TOPRewardConfig.vlm_name`.
- HF Hub repo id: download just ``config.json``, same as above.
"""
del strict # TOPReward has no weights of its own to (strictly) load.
"""Load a TOPReward configuration and instantiate the wrapped VLM."""
if config is None:
config = RewardModelConfig.from_pretrained(
pretrained_name_or_path=pretrained_name_or_path,
@@ -305,7 +210,6 @@ class TOPRewardModel(PreTrainedRewardModel):
model_id = str(pretrained_name_or_path)
if not os.path.isdir(model_id):
# Validate that the remote repo at least contains a TOPReward config.json
try:
hf_hub_download(
repo_id=model_id,
@@ -329,11 +233,7 @@ class TOPRewardModel(PreTrainedRewardModel):
return instance
def push_model_to_hub(self, cfg: TrainPipelineConfig):
"""Push the TOPReward ``config.json`` + model card to the Hub.
Skips the safetensors upload — the wrapped VLM is identified by
``vlm_name`` and we never modify it.
"""
"""Push the TOPReward ``config.json`` + model card to the Hub."""
api = HfApi()
repo_id = api.create_repo(
repo_id=self.config.repo_id, private=self.config.private, exist_ok=True
@@ -362,202 +262,3 @@ class TOPRewardModel(PreTrainedRewardModel):
)
logger.info(f"Model pushed to {commit_info.repo_url.url}")
def _unpack_batch(self, batch: dict[str, Any]) -> tuple[list[np.ndarray], list[str]]:
frames_key = f"{TOPREWARD_FEATURE_PREFIX}frames"
task_key = f"{TOPREWARD_FEATURE_PREFIX}task"
if frames_key not in batch or task_key not in batch:
raise KeyError(
"TOPReward batch missing pre-encoded inputs (expected "
f"`{frames_key}` and `{task_key}`). Make sure the "
"TOPRewardEncoderProcessorStep ran before `compute_reward`."
)
frames_per_sample = list(batch[frames_key])
tasks = list(batch[task_key])
if len(frames_per_sample) != len(tasks):
raise ValueError(
f"frames batch size ({len(frames_per_sample)}) does not match task batch size ({len(tasks)})"
)
return frames_per_sample, tasks
@torch.no_grad()
def _compute_log_prob_reward(self, frames: np.ndarray, instruction: str) -> float:
"""Compute the log-likelihood of the final answer token given the prompt.
Port of ``QwenClient.compute_instruction_reward`` (the upstream
TOPReward implementation), stripped of the
:class:`InstructionRewardResult` metadata wrapper we don't need.
Returns ``log P(final_token | video + prompt + instruction)`` — by
default the final token is the literal ``"True"`` that closes the
suffix template, which is the binary "is the instruction satisfied"
signal the paper describes.
"""
device = next(self.model.parameters()).device
pil_frames = _frames_to_pil(frames)
if self.config.use_video_description:
description = self._generate_object_state_reasoning(pil_frames)
prompt_text = (
f"{description} Therefore given the above description and the "
"video, the video shows a robot manipulation trajectory that "
"**completes** the following instruction: "
)
else:
prompt_text = self.config.prompt_prefix
eos_token = self.processor.tokenizer.eos_token
instruction_suffix = self.config.prompt_suffix_template.format(instruction=instruction)
# Two prompt assembly modes match the upstream:
#
# - ``add_chat_template=True``: wrap the FULL prompt (including
# instruction) with the chat template, then append the literal
# ``"True"`` token outside the template.
# - ``add_chat_template=False``: apply the chat template to the
# video+prefix only (no generation prompt), strip the trailing
# EOS, then concatenate the literal instruction suffix.
if self.config.add_chat_template:
# Suffix excluding the trailing "True" — we want "True" to be
# the scored token, not part of the template's user turn.
suffix_for_template = instruction_suffix.removesuffix(_TRUE_ANSWER).rstrip()
templated_messages = [
{
"role": "user",
"content": [
{"type": "video", "video": pil_frames, "fps": self.config.fps},
{"type": "text", "text": f"{prompt_text}{suffix_for_template}"},
],
}
]
prompt_chat = self.processor.apply_chat_template(
templated_messages, tokenize=False, add_generation_prompt=True
)
full_text = f"{prompt_chat}{_TRUE_ANSWER}"
image_inputs, video_inputs = self._process_vision_info(templated_messages)
else:
user_messages = [
{
"role": "user",
"content": [
{"type": "video", "video": pil_frames, "fps": self.config.fps},
{"type": "text", "text": prompt_text},
],
}
]
prompt_chat = self.processor.apply_chat_template(
user_messages, tokenize=False, add_generation_prompt=False
)
if eos_token is not None:
prompt_chat = prompt_chat.split(eos_token)[0]
full_text = f"{prompt_chat}{instruction_suffix}"
image_inputs, video_inputs = self._process_vision_info(user_messages)
inputs = self.processor(
text=[full_text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(device)
input_len = int(inputs["input_ids"].shape[-1])
if input_len > self.config.max_input_length:
raise ValueError(
f"TOPReward input length {input_len} exceeds max_input_length "
f"{self.config.max_input_length}; lower `max_frames` or raise `max_input_length`."
)
labels = inputs["input_ids"].clone()
# Mask everything except the very last token. ``prompt_length = input_len - 1``
# mirrors upstream ``QwenClient.compute_instruction_reward``; after the
# causal-LM next-token shift below this isolates exactly one position —
# the prediction of the literal ``"True"`` that closes ``prompt_suffix_template``.
# The resulting reward is therefore ``log P("True" | video + prompt + instruction)``.
prompt_length = input_len - 1
labels[:, :prompt_length] = -100
if "attention_mask" in inputs:
labels = labels.masked_fill(inputs["attention_mask"] == 0, -100)
self.model.eval()
outputs = self.model(**inputs, labels=labels)
logits = outputs.logits[:, :-1, :]
target_labels = labels[:, 1:]
log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
mask = target_labels != -100
safe_targets = target_labels.masked_fill(~mask, 0)
token_log_probs = log_probs.gather(-1, safe_targets.unsqueeze(-1)).squeeze(-1)
masked_log_probs = token_log_probs[mask]
if masked_log_probs.numel() == 0:
raise RuntimeError(
"TOPReward could not isolate any suffix tokens to score. Check that "
"`prompt_suffix_template` produces at least one tokenised character."
)
# ``mean`` vs ``sum`` are equivalent for a single scored token but the
# knob is kept for API parity with upstream (and for forward-compat with
# any future variant that scores more than the final answer token).
if self.config.reduction == "sum":
reward = masked_log_probs.sum().item()
else: # mean
reward = masked_log_probs.mean().item()
return float(reward)
@torch.no_grad()
def _generate_object_state_reasoning(self, pil_frames: list[Image.Image]) -> str:
"""Instruction-agnostic trajectory description (upstream
``QwenClient.generate_object_state_reasoning``). Used when
:attr:`TOPRewardConfig.use_video_description` is ``True``.
"""
device = next(self.model.parameters()).device
user_messages = [
{
"role": "user",
"content": [
{"type": "video", "video": pil_frames, "fps": self.config.fps},
{
"type": "text",
"text": "Describe the robot manipulation trajectory in this video:",
},
],
}
]
prompt_chat = self.processor.apply_chat_template(
user_messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = self._process_vision_info(user_messages)
inputs = self.processor(
text=[prompt_chat],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(device)
self.model.eval()
output_ids = self.model.generate(
**inputs,
max_new_tokens=256,
do_sample=False,
)
response = self.processor.batch_decode(
output_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
prompt_decoded = self.processor.batch_decode(
inputs["input_ids"], skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
if response.startswith(prompt_decoded):
return response[len(prompt_decoded) :].strip()
return response.strip()
@staticmethod
def _process_vision_info(messages: list[dict[str, Any]]) -> tuple[Any, Any]:
"""Thin wrapper around ``qwen_vl_utils.process_vision_info``.
Kept as a method so tests can monkey-patch it without depending on
the import-time presence of ``qwen_vl_utils``.
"""
from qwen_vl_utils import process_vision_info
return cast(tuple[Any, Any], process_vision_info(messages))
@@ -16,11 +16,12 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import numpy as np
import torch
from PIL import Image
from torch import Tensor
from lerobot.configs import PipelineFeatureType, PolicyFeature
@@ -33,7 +34,11 @@ from lerobot.processor import (
ProcessorStepRegistry,
policy_action_to_transition,
)
from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig
from lerobot.rewards.topreward.configuration_topreward import (
DEFAULT_PROMPT_PREFIX,
DEFAULT_PROMPT_SUFFIX_TEMPLATE,
TOPRewardConfig,
)
from lerobot.types import EnvTransition, TransitionKey
from lerobot.utils.constants import (
OBS_IMAGES,
@@ -41,20 +46,32 @@ from lerobot.utils.constants import (
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING or _transformers_available:
from transformers import AutoProcessor
else:
AutoProcessor = None
# Namespace for TOPReward's pre-encoded observation tensors written by the
# processor and consumed by the model. Keys: ``frames`` (one ``(T,H,W,C)``
# uint8 numpy array per sample) and ``task`` (one string per sample).
TOPREWARD_FEATURE_PREFIX = f"{OBS_PREFIX}topreward."
_TRUE_ANSWER = "True"
TOPREWARD_VLM_INPUT_KEYS = (
"input_ids",
"attention_mask",
"pixel_values",
"pixel_values_videos",
"image_grid_thw",
"video_grid_thw",
"second_per_grid_ts",
)
TOPREWARD_METADATA_KEYS = ("prompt_length",)
TOPREWARD_INPUT_KEYS = TOPREWARD_VLM_INPUT_KEYS + TOPREWARD_METADATA_KEYS
def _video_to_numpy(video: Tensor, *, max_frames: int | None) -> np.ndarray:
"""Convert one trajectory tensor to a ``(T, H, W, C) uint8`` numpy array.
Mirrors the Robometer helper: accepts ``(T, C, H, W)`` or ``(T, H, W, C)``
layouts, rescales floats in ``[0, 1]`` to ``[0, 255]``, clips values
outside the uint8 range and tail-crops to ``max_frames``.
"""
"""Convert one trajectory tensor to a ``(T, H, W, C) uint8`` numpy array."""
if max_frames is not None:
video = video[-max_frames:]
if video.shape[1] in (1, 3):
@@ -68,6 +85,15 @@ def _video_to_numpy(video: Tensor, *, max_frames: int | None) -> np.ndarray:
return np.clip(array, 0, 255).astype(np.uint8)
def _frames_to_pil(frames: np.ndarray) -> list[Image.Image]:
"""Convert ``(T, H, W, C)`` uint8 frames to a list of PIL images."""
if frames.ndim != 4:
raise ValueError(f"Expected (T,H,W,C) frames; got shape {frames.shape}")
if frames.dtype != np.uint8:
frames = np.clip(frames, 0, 255).astype(np.uint8)
return [Image.fromarray(frames[i]) for i in range(frames.shape[0])]
def _expand_tasks(task: Any, *, batch_size: int, default: str | None) -> list[str]:
if task is None:
task = default
@@ -89,29 +115,41 @@ def _expand_tasks(task: Any, *, batch_size: int, default: str | None) -> list[st
@dataclass
@ProcessorStepRegistry.register(name="topreward_encoder")
class TOPRewardEncoderProcessorStep(ProcessorStep):
"""Normalise raw frames + task into TOPReward-namespaced observation entries.
"""Encode raw frames + task into Qwen-VL tensors for the TOPReward model.
Loads a :class:`~transformers.AutoProcessor` matching ``vlm_name`` and
builds the full chat prompt including the instruction suffix. The
resulting ``input_ids``, ``attention_mask``, vision tensors, and a
per-sample ``prompt_length`` integer are written under the
``observation.topreward.*`` namespace so the model can label-mask and
forward without re-tokenising.
At call time the step reads:
- ``observation[image_key]``: ``(B, T, C, H, W)`` or ``(B, C, H, W)`` frames.
- ``complementary_data[task_key]``: a string or list of strings.
and writes:
- ``observation[f"{TOPREWARD_FEATURE_PREFIX}frames"]``: list of
``(T, H, W, C) uint8`` numpy arrays, one per sample.
- ``observation[f"{TOPREWARD_FEATURE_PREFIX}task"]``: list of strings,
one per sample.
The actual chat-template / tokenisation happens model-side because
TOPReward's reward extraction needs the tokenizer to know the
prompt/suffix split (label masking on suffix tokens only).
and writes ``observation[f"{TOPREWARD_FEATURE_PREFIX}<name>"]`` for the
Qwen-VL tensors plus ``prompt_length``.
"""
vlm_name: str = "Qwen/Qwen3-VL-8B-Instruct"
image_key: str = OBS_IMAGES + ".top"
task_key: str = "task"
default_task: str | None = None
max_frames: int | None = 16
fps: float = 2.0
prompt_prefix: str = DEFAULT_PROMPT_PREFIX
prompt_suffix_template: str = DEFAULT_PROMPT_SUFFIX_TEMPLATE
add_chat_template: bool = False
max_length: int = 32768
_processor: Any = field(default=None, init=False, repr=False)
def __post_init__(self) -> None:
require_package("transformers", extra="topreward")
require_package("qwen-vl-utils", extra="topreward", import_name="qwen_vl_utils")
self._processor = AutoProcessor.from_pretrained(self.vlm_name, trust_remote_code=True)
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
@@ -138,18 +176,125 @@ class TOPRewardEncoderProcessorStep(ProcessorStep):
default=self.default_task,
)
frames_per_sample = [
_video_to_numpy(tensor[i], max_frames=self.max_frames) for i in range(batch_size)
]
encoded = self._encode_batch(tensor, tasks)
new_observation = dict(observation)
new_observation[f"{TOPREWARD_FEATURE_PREFIX}frames"] = frames_per_sample
new_observation[f"{TOPREWARD_FEATURE_PREFIX}task"] = list(tasks)
for key, value in encoded.items():
new_observation[f"{TOPREWARD_FEATURE_PREFIX}{key}"] = value
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def _encode_batch(self, tensor: Tensor, tasks: list[str]) -> dict[str, Any]:
"""Tokenise a batch of (frames, task) pairs into Qwen-VL tensors.
Processes samples one at a time (each may have a different token
length due to different numbers of vision patches), then pads /
stacks the results.
"""
from qwen_vl_utils import process_vision_info
batch_size = tensor.shape[0]
all_encoded: list[dict[str, Any]] = []
all_prompt_lengths: list[int] = []
for i in range(batch_size):
frames_np = _video_to_numpy(tensor[i], max_frames=self.max_frames)
pil_frames = _frames_to_pil(frames_np)
task = tasks[i]
instruction_suffix = self.prompt_suffix_template.format(instruction=task)
eos_token = self._processor.tokenizer.eos_token
if self.add_chat_template:
suffix_for_template = instruction_suffix.removesuffix(_TRUE_ANSWER).rstrip()
templated_messages = [
{
"role": "user",
"content": [
{"type": "video", "video": pil_frames, "fps": self.fps},
{"type": "text", "text": f"{self.prompt_prefix}{suffix_for_template}"},
],
}
]
prompt_chat = self._processor.apply_chat_template(
templated_messages, tokenize=False, add_generation_prompt=True
)
full_text = f"{prompt_chat}{_TRUE_ANSWER}"
image_inputs, video_inputs = process_vision_info(templated_messages)
else:
user_messages = [
{
"role": "user",
"content": [
{"type": "video", "video": pil_frames, "fps": self.fps},
{"type": "text", "text": self.prompt_prefix},
],
}
]
prompt_chat = self._processor.apply_chat_template(
user_messages, tokenize=False, add_generation_prompt=False
)
if eos_token is not None:
prompt_chat = prompt_chat.split(eos_token)[0]
full_text = f"{prompt_chat}{instruction_suffix}"
image_inputs, video_inputs = process_vision_info(user_messages)
inputs = self._processor(
text=[full_text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
input_len = int(inputs["input_ids"].shape[-1])
if input_len > self.max_length:
raise ValueError(
f"TOPReward input length {input_len} exceeds max_length "
f"{self.max_length}; lower `max_frames` or raise `max_length`."
)
prompt_length = input_len - 1
all_encoded.append(inputs)
all_prompt_lengths.append(prompt_length)
result = dict(all_encoded[0]) if batch_size == 1 else self._pad_and_stack(all_encoded)
result["prompt_length"] = torch.tensor(all_prompt_lengths, dtype=torch.long)
return result
@staticmethod
def _pad_and_stack(encoded_list: list[dict[str, Any]]) -> dict[str, Any]:
"""Right-pad and stack per-sample encoded dicts into a batch."""
keys = [k for k in encoded_list[0] if isinstance(encoded_list[0][k], Tensor)]
max_len = max(enc["input_ids"].shape[-1] for enc in encoded_list)
result: dict[str, Any] = {}
for key in keys:
tensors = [enc[key] for enc in encoded_list]
if key in ("input_ids", "attention_mask"):
padded = []
pad_value = 0
for t in tensors:
pad_size = max_len - t.shape[-1]
if pad_size > 0:
padded.append(torch.nn.functional.pad(t, (0, pad_size), value=pad_value))
else:
padded.append(t)
result[key] = torch.cat(padded, dim=0)
else:
if all(t.shape == tensors[0].shape for t in tensors):
result[key] = torch.cat(tensors, dim=0)
else:
result[key] = torch.cat(tensors, dim=0)
for key in encoded_list[0]:
if key not in result:
result[key] = encoded_list[0][key]
return result
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
@@ -157,10 +302,16 @@ class TOPRewardEncoderProcessorStep(ProcessorStep):
def get_config(self) -> dict[str, Any]:
return {
"vlm_name": self.vlm_name,
"image_key": self.image_key,
"task_key": self.task_key,
"default_task": self.default_task,
"max_frames": self.max_frames,
"fps": self.fps,
"prompt_prefix": self.prompt_prefix,
"prompt_suffix_template": self.prompt_suffix_template,
"add_chat_template": self.add_chat_template,
"max_length": self.max_length,
}
@@ -171,23 +322,27 @@ def make_topreward_pre_post_processors(
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""Pipeline that normalises frames + task for the TOPReward model.
"""Pipeline that pre-encodes frames + task into Qwen-VL tensors.
The preprocessor adds a batch dimension if needed, runs TOPReward's
encoder, and moves any tensor entries to the configured device. The
postprocessor is the identity since TOPReward outputs a single reward
tensor.
encoder (which tokenises the full prompt and emits ``prompt_length``),
and moves everything to the configured device. The postprocessor is
the identity since TOPReward outputs a single reward tensor.
"""
del dataset_stats # TOPReward's VLM handles its own normalisation.
preprocessor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=[
AddBatchDimensionProcessorStep(),
TOPRewardEncoderProcessorStep(
vlm_name=config.vlm_name,
image_key=config.image_key,
task_key=config.task_key,
default_task=config.default_task,
max_frames=config.max_frames,
fps=config.fps,
prompt_prefix=config.prompt_prefix,
prompt_suffix_template=config.prompt_suffix_template,
add_chat_template=config.add_chat_template,
max_length=config.max_input_length,
),
DeviceProcessorStep(device=config.device or "cpu"),
],
+49 -221
View File
@@ -16,67 +16,71 @@
from __future__ import annotations
import numpy as np
from types import SimpleNamespace
import pytest
import torch
from lerobot.configs.rewards import RewardModelConfig
from lerobot.rewards.factory import get_reward_model_class, make_reward_model_config
from lerobot.rewards.topreward import TOPRewardConfig
from lerobot.rewards.topreward.modeling_topreward import minmax_normalize_rewards
from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX
from tests.utils import skip_if_package_missing
class _FakeTokenizer:
"""Minimal tokenizer surface used by ``TOPRewardModel._compute_log_prob_reward``."""
eos_token = "<|endoftext|>"
class _FakeProcessor:
"""Stand-in for the Qwen ``AutoProcessor`` returned by ``from_pretrained``."""
def __init__(self) -> None:
self.tokenizer = _FakeTokenizer()
@classmethod
def from_pretrained(cls, *args, **kwargs): # noqa: ARG003
return cls()
class _FakeQwenModel(torch.nn.Module):
"""Stand-in for ``Qwen3VLForConditionalGeneration``.
Provides the minimum surface ``TOPRewardModel`` touches at construction
time (a ``parameters()`` iterator for device inference). Actual
``_compute_log_prob_reward`` calls are bypassed by monkey-patching the
method directly in the tests, so we never invoke ``self.model(...)``.
Returns a ``SimpleNamespace`` with ``logits`` of a controlled shape so
the log-prob extraction path in ``compute_reward`` can be exercised
without downloading real VLM weights.
"""
def __init__(self) -> None:
super().__init__()
self._param = torch.nn.Parameter(torch.zeros(1))
self._reward_value: float = -1.5
@classmethod
def from_pretrained(cls, *args, **kwargs): # noqa: ARG003
return cls()
def forward(self, input_ids, attention_mask=None, labels=None, **kwargs): # noqa: ARG002
batch_size, seq_len = input_ids.shape
vocab_size = 1000
logits = torch.zeros(batch_size, seq_len, vocab_size)
# Place a controlled log-prob at the target token position so the
# model returns a predictable reward value.
# The label-masked suffix is the last token (prompt_length = seq_len - 1).
# After the causal-LM shift (logits[:, :-1], labels[:, 1:]) the scored
# position is logits[:, -2, :] predicting labels[:, -1].
# We set logits so that log_softmax at the target token ≈ _reward_value.
if labels is not None:
for i in range(batch_size):
target_idx = int(input_ids[i, -1].item())
logits[i, -2, target_idx] = self._reward_value * -10 # high logit -> high log-prob
return SimpleNamespace(logits=logits)
def _patch_build(monkeypatch) -> None:
"""Stub out HF AutoX so TOPReward construction is cheap and offline."""
from lerobot.rewards.topreward import modeling_topreward
monkeypatch.setattr(modeling_topreward, "Qwen3VLForConditionalGeneration", _FakeQwenModel)
monkeypatch.setattr(modeling_topreward, "AutoProcessor", _FakeProcessor)
def _make_batch(frames: list[np.ndarray], tasks: list[str]) -> dict[str, list]:
def _make_batch(
input_ids: torch.Tensor,
attention_mask: torch.Tensor | None = None,
prompt_length: torch.Tensor | None = None,
) -> dict[str, torch.Tensor]:
"""Build a ``compute_reward``-ready batch using TOPReward's namespaced keys."""
return {
f"{TOPREWARD_FEATURE_PREFIX}frames": frames,
f"{TOPREWARD_FEATURE_PREFIX}task": tasks,
}
batch: dict[str, torch.Tensor] = {f"{TOPREWARD_FEATURE_PREFIX}input_ids": input_ids}
if attention_mask is not None:
batch[f"{TOPREWARD_FEATURE_PREFIX}attention_mask"] = attention_mask
if prompt_length is not None:
batch[f"{TOPREWARD_FEATURE_PREFIX}prompt_length"] = prompt_length
return batch
# ---------------------------------------------------------------------------
@@ -121,32 +125,6 @@ def test_topreward_config_rejects_suffix_without_instruction_placeholder():
TOPRewardConfig(device="cpu", prompt_suffix_template="no placeholder here")
# ---------------------------------------------------------------------------
# minmax_normalize_rewards — pure math helper
# ---------------------------------------------------------------------------
def test_minmax_normalize_rewards_maps_min_and_max_to_zero_and_one():
values = minmax_normalize_rewards([-3.0, -1.0, 0.0, -2.0])
assert values.shape == (4,)
assert values[0] == pytest.approx(0.0)
assert values[2] == pytest.approx(1.0)
# Monotonicity preserved within the input range.
assert values[3] == pytest.approx(1.0 / 3.0, abs=1e-6)
def test_minmax_normalize_rewards_handles_singleton_and_flat_inputs():
# Single element -> mapped to 1.0 (no information to scale).
assert minmax_normalize_rewards([42.0]).tolist() == [1.0]
# All-equal values -> all ones (avoid divide-by-zero).
assert minmax_normalize_rewards([0.5, 0.5, 0.5]).tolist() == [1.0, 1.0, 1.0]
def test_minmax_normalize_rewards_empty_input_returns_empty_array():
out = minmax_normalize_rewards([])
assert out.shape == (0,)
# ---------------------------------------------------------------------------
# compute_reward
# ---------------------------------------------------------------------------
@@ -154,55 +132,43 @@ def test_minmax_normalize_rewards_empty_input_returns_empty_array():
@skip_if_package_missing("transformers")
def test_topreward_compute_reward_returns_one_scalar_per_sample(monkeypatch):
"""``compute_reward`` must return a ``(B,)`` float32 tensor with one
log-prob reward per sample, consuming pre-encoded Qwen-VL tensors."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
captured = []
def fake_log_prob(self, frames, instruction): # noqa: ARG002
captured.append((frames.shape, instruction))
return -1.5
monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob)
frames_a = np.zeros((4, 8, 8, 3), dtype=np.uint8)
frames_b = np.zeros((6, 8, 8, 3), dtype=np.uint8)
batch = _make_batch([frames_a, frames_b], ["pick the cube", "open the drawer"])
input_ids = torch.randint(0, 100, (2, 10))
attention_mask = torch.ones(2, 10, dtype=torch.long)
prompt_length = torch.tensor([9, 9]) # unmask only the last token
batch = _make_batch(input_ids, attention_mask, prompt_length)
rewards = model.compute_reward(batch)
assert rewards.shape == (2,)
assert rewards.dtype == torch.float32
assert torch.allclose(rewards, torch.tensor([-1.5, -1.5]))
# `_compute_log_prob_reward` was called once per sample with the right tasks.
assert [task for _, task in captured] == ["pick the cube", "open the drawer"]
assert [shape[0] for shape, _ in captured] == [4, 6]
@skip_if_package_missing("transformers")
def test_topreward_compute_reward_applies_success_threshold(monkeypatch):
"""When ``success_threshold`` is finite, the model returns binary success
instead of the raw log-prob — useful as a drop-in success detector."""
"""When ``success_threshold`` is finite, the model returns binary success."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu", success_threshold=-2.0)
cfg = TOPRewardConfig(device="cpu", success_threshold=0.0)
model = TOPRewardModel(cfg)
rewards_in = iter([-1.5, -3.0]) # first above threshold, second below
monkeypatch.setattr(
TOPRewardModel,
"_compute_log_prob_reward",
lambda _self, _frames, _instr: next(rewards_in),
)
input_ids = torch.randint(0, 100, (2, 10))
attention_mask = torch.ones(2, 10, dtype=torch.long)
prompt_length = torch.tensor([9, 9])
frames = [np.zeros((2, 8, 8, 3), dtype=np.uint8), np.zeros((2, 8, 8, 3), dtype=np.uint8)]
rewards = model.compute_reward(_make_batch(frames, ["task", "task"]))
batch = _make_batch(input_ids, attention_mask, prompt_length)
rewards = model.compute_reward(batch)
assert torch.equal(rewards, torch.tensor([1.0, 0.0]))
assert rewards.shape == (2,)
assert set(rewards.tolist()).issubset({0.0, 1.0})
@skip_if_package_missing("transformers")
@@ -213,137 +179,10 @@ def test_topreward_compute_reward_errors_when_inputs_missing(monkeypatch):
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
with pytest.raises(KeyError, match=r"observation\.topreward\."):
with pytest.raises(KeyError, match=r"observation\.topreward\.input_ids"):
model.compute_reward({})
@skip_if_package_missing("transformers")
def test_topreward_compute_reward_errors_when_batch_sizes_mismatch(monkeypatch):
"""frames and task lists must have matching lengths — a stale processor
that produces only one task for a multi-sample batch should surface as
an explicit error, not a silent zip truncation."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
monkeypatch.setattr(
TOPRewardModel,
"_compute_log_prob_reward",
lambda _self, _frames, _instr: 0.0,
)
frames = [np.zeros((2, 8, 8, 3), dtype=np.uint8), np.zeros((2, 8, 8, 3), dtype=np.uint8)]
with pytest.raises(ValueError, match="task batch size"):
model.compute_reward(_make_batch(frames, ["only one task"]))
# ---------------------------------------------------------------------------
# predict_curves
# ---------------------------------------------------------------------------
@skip_if_package_missing("transformers")
def test_topreward_predict_curves_runs_one_forward_per_prefix(monkeypatch):
"""``predict_curves`` must call the VLM once per prefix length per
trajectory and write min-max-normalised values back into the curve."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
# Simulate a strictly increasing log-prob curve as the prefix grows.
call_log: list[int] = []
def fake_log_prob(self, frames, instruction): # noqa: ARG002
call_log.append(int(frames.shape[0]))
return float(frames.shape[0]) # log-prob = prefix length
monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob)
frames = np.zeros((5, 8, 8, 3), dtype=np.uint8)
batch = _make_batch([frames], ["lift the cup"])
out = model.predict_curves(batch)
# One forward per prefix length, in order.
assert call_log == [1, 2, 3, 4, 5]
# (B, T_max) shape, padded with NaN beyond each trajectory's length.
assert out["progress"].shape == (1, 5)
# Strictly increasing raw rewards -> min-max-normalised to [0, 1] linearly.
expected = torch.tensor([[0.0, 0.25, 0.5, 0.75, 1.0]])
assert torch.allclose(out["progress"], expected, atol=1e-6)
@skip_if_package_missing("transformers")
def test_topreward_predict_curves_sparse_dense_interpolates_to_full_resolution(monkeypatch):
"""With ``num_prefixes < N`` the model should score only the requested
number of anchor prefixes and linearly interpolate between them — the
upstream sparse-dense pattern (``num_samples=15``)."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
call_log: list[int] = []
def fake_log_prob(self, frames, instruction): # noqa: ARG002
call_log.append(int(frames.shape[0]))
return float(frames.shape[0])
monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob)
frames = np.zeros((9, 8, 8, 3), dtype=np.uint8)
out = model.predict_curves(_make_batch([frames], ["lift the cup"]), num_prefixes=3)
# 3 anchors at linspace(1, 9, 3) -> [1, 5, 9] -> 3 VLM forwards instead of 9.
assert call_log == [1, 5, 9]
# Returned curve is full resolution (9 frames) and monotone in [0, 1].
assert out["progress"].shape == (1, 9)
curve = out["progress"][0].numpy()
assert curve[0] == pytest.approx(0.0)
assert curve[-1] == pytest.approx(1.0)
assert np.all(np.diff(curve) >= 0)
@skip_if_package_missing("transformers")
def test_topreward_predict_curves_rejects_invalid_num_prefixes(monkeypatch):
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
model = TOPRewardModel(TOPRewardConfig(device="cpu"))
batch = _make_batch([np.zeros((3, 8, 8, 3), dtype=np.uint8)], ["task"])
with pytest.raises(ValueError, match="num_prefixes must be"):
model.predict_curves(batch, num_prefixes=0)
@skip_if_package_missing("transformers")
def test_topreward_predict_curves_right_pads_with_nan_for_variable_lengths(monkeypatch):
"""Trajectories of different lengths in the same batch are right-padded
with ``NaN`` so the output is a regular ``(B, T_max)`` tensor."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
cfg = TOPRewardConfig(device="cpu")
model = TOPRewardModel(cfg)
monkeypatch.setattr(
TOPRewardModel,
"_compute_log_prob_reward",
lambda _self, frames, _instr: float(frames.shape[0]),
)
frames_short = np.zeros((2, 8, 8, 3), dtype=np.uint8)
frames_long = np.zeros((4, 8, 8, 3), dtype=np.uint8)
out = model.predict_curves(_make_batch([frames_short, frames_long], ["a", "b"]))
assert out["progress"].shape == (2, 4)
# Trailing entries for the shorter trajectory are NaN.
assert torch.isnan(out["progress"][0, 2:]).all()
# The longer trajectory has no NaNs.
assert not torch.isnan(out["progress"][1]).any()
# ---------------------------------------------------------------------------
# Save / load — config-only checkpoint
# ---------------------------------------------------------------------------
@@ -351,10 +190,6 @@ def test_topreward_predict_curves_right_pads_with_nan_for_variable_lengths(monke
@skip_if_package_missing("transformers")
def test_topreward_save_pretrained_writes_only_config_json(monkeypatch, tmp_path):
"""A TOPReward "checkpoint" is just ``config.json``. Writing
``model.safetensors`` would only duplicate ~16 GB of Qwen weights for
no benefit, so :meth:`_save_pretrained` must skip it entirely.
"""
from huggingface_hub.constants import CONFIG_NAME, SAFETENSORS_SINGLE_FILE
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
@@ -371,13 +206,11 @@ def test_topreward_save_pretrained_writes_only_config_json(monkeypatch, tmp_path
model.save_pretrained(str(tmp_path))
assert (tmp_path / CONFIG_NAME).exists()
# Zero-shot model: no safetensors written by `_save_pretrained`.
assert not (tmp_path / SAFETENSORS_SINGLE_FILE).exists()
@skip_if_package_missing("transformers")
def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_path):
"""Save a TOPRewardConfig locally and reload it — user knobs must survive."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
@@ -387,7 +220,6 @@ def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_
reduction="sum",
fps=4.0,
image_key="observation.images.front",
use_video_description=True,
add_chat_template=True,
success_threshold=-1.5,
)
@@ -400,16 +232,12 @@ def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_
assert reloaded.config.reduction == "sum"
assert reloaded.config.fps == 4.0
assert reloaded.config.image_key == "observation.images.front"
assert reloaded.config.use_video_description is True
assert reloaded.config.add_chat_template is True
assert reloaded.config.success_threshold == -1.5
@skip_if_package_missing("transformers")
def test_topreward_is_not_trainable(monkeypatch):
"""The whole point of TOPReward is that it is zero-shot.
``is_trainable`` must therefore be ``False`` and ``forward(...)`` must
raise the base-class ``NotImplementedError``."""
from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel
_patch_build(monkeypatch)
+105 -94
View File
@@ -23,11 +23,11 @@ import torch
from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.rewards.topreward.processor_topreward import (
TOPREWARD_FEATURE_PREFIX,
TOPRewardEncoderProcessorStep,
_expand_tasks,
_video_to_numpy,
)
from lerobot.types import TransitionKey
from tests.utils import skip_if_package_missing
# ---------------------------------------------------------------------------
# _video_to_numpy — pure (T, C, H, W) -> (T, H, W, C) uint8 conversion
@@ -35,7 +35,7 @@ from lerobot.types import TransitionKey
def test_video_to_numpy_chw_float_is_converted_to_thwc_uint8():
video = torch.rand(4, 3, 8, 8) # (T, C, H, W) floats in [0, 1]
video = torch.rand(4, 3, 8, 8)
array = _video_to_numpy(video, max_frames=None)
assert array.shape == (4, 8, 8, 3)
@@ -52,7 +52,6 @@ def test_video_to_numpy_already_thwc_uint8_passes_through():
def test_video_to_numpy_max_frames_tail_crops_recent_frames():
"""``max_frames`` should keep the **last** K frames (most recent)."""
video = torch.zeros(10, 3, 4, 4)
for t in range(10):
video[t] = t / 9.0
@@ -70,8 +69,6 @@ def test_video_to_numpy_rejects_3d_input():
def test_video_to_numpy_floats_above_one_pass_through_without_rescaling():
"""If ``array.max() > 1`` the helper assumes the tensor is already in the
uint8 range; values pass through unchanged (but are still clipped to 255)."""
video = torch.full((1, 3, 2, 2), 5.0)
array = _video_to_numpy(video, max_frames=None)
@@ -127,127 +124,141 @@ def test_expand_tasks_wrong_type_raises():
# ---------------------------------------------------------------------------
# Encoder step — input/output shapes + dataclass surface
# Encoder step — stubbed AutoProcessor + process_vision_info
# ---------------------------------------------------------------------------
def _skip_if_topreward_extras_missing(func):
func = skip_if_package_missing("qwen-vl-utils", import_name="qwen_vl_utils")(func)
func = skip_if_package_missing("transformers")(func)
return func
class _FakeTokenizer:
eos_token = "<|endoftext|>"
pad_token = "<|endoftext|>"
def __call__(self, *args, **kwargs):
return {"input_ids": torch.zeros(1, 10, dtype=torch.long)}
class _FakeAutoProcessor:
def __init__(self) -> None:
self.tokenizer = _FakeTokenizer()
@classmethod
def from_pretrained(cls, *args, **kwargs): # noqa: ARG003
return cls()
def apply_chat_template(self, messages, **kwargs): # noqa: ARG002
return "fake_prompt_text"
def __call__(self, text=None, images=None, videos=None, **kwargs): # noqa: ARG002
seq_len = 10
return {
"input_ids": torch.randint(0, 100, (1, seq_len)),
"attention_mask": torch.ones(1, seq_len, dtype=torch.long),
}
def _build_step(monkeypatch, **overrides):
import importlib
import sys
import types
from lerobot.rewards.topreward import processor_topreward
from lerobot.utils import import_utils
monkeypatch.setattr(processor_topreward, "AutoProcessor", _FakeAutoProcessor)
# Stub qwen_vl_utils as a real module object (not MagicMock) so
# ``require_package`` / ``find_spec`` don't choke on a missing ``__spec__``.
fake_qwen_vl = types.ModuleType("qwen_vl_utils")
fake_qwen_vl.process_vision_info = lambda messages: (None, None) # type: ignore[attr-defined]
fake_qwen_vl.__spec__ = importlib.machinery.ModuleSpec("qwen_vl_utils", None)
monkeypatch.setitem(sys.modules, "qwen_vl_utils", fake_qwen_vl)
# Clear the require_package cache so the stub is picked up.
import_utils._require_package_cache.pop("qwen_vl_utils", None)
return processor_topreward.TOPRewardEncoderProcessorStep(**overrides)
def _make_transition(observation: dict, complementary: dict | None = None) -> dict:
"""Build a tiny ``EnvTransition`` dict for the encoder step."""
transition: dict = {TransitionKey.OBSERVATION: observation}
if complementary is not None:
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary
return transition
def test_encoder_step_writes_namespaced_frames_and_task():
"""The encoder step's output is the contract the model reads from. It
must populate exactly two namespaced keys: ``frames`` and ``task``."""
step = TOPRewardEncoderProcessorStep(
image_key="observation.images.top",
task_key="task",
max_frames=None,
)
@_skip_if_topreward_extras_missing
def test_encoder_step_emits_input_ids_and_prompt_length(monkeypatch):
"""The processor must emit Qwen-VL tensors including ``input_ids`` and
``prompt_length`` under the ``observation.topreward.*`` namespace."""
step = _build_step(monkeypatch)
frames_batch = torch.zeros(2, 4, 3, 8, 8) # (B=2, T=4, C, H, W)
frames_batch = torch.zeros(1, 4, 3, 8, 8)
out = step(
_make_transition(
observation={"observation.images.top": frames_batch},
complementary={"task": ["pick", "place"]},
complementary={"task": "pick"},
)
)
obs_out = out[TransitionKey.OBSERVATION]
frames_out = obs_out[f"{TOPREWARD_FEATURE_PREFIX}frames"]
tasks_out = obs_out[f"{TOPREWARD_FEATURE_PREFIX}task"]
assert f"{TOPREWARD_FEATURE_PREFIX}input_ids" in obs_out
assert f"{TOPREWARD_FEATURE_PREFIX}attention_mask" in obs_out
assert f"{TOPREWARD_FEATURE_PREFIX}prompt_length" in obs_out
assert len(frames_out) == 2
assert all(arr.shape == (4, 8, 8, 3) and arr.dtype == np.uint8 for arr in frames_out)
assert tasks_out == ["pick", "place"]
prompt_length = obs_out[f"{TOPREWARD_FEATURE_PREFIX}prompt_length"]
assert prompt_length.dtype == torch.long
assert prompt_length.shape == (1,)
def test_encoder_step_adds_singleton_time_dim_for_4d_input():
"""A ``(B, C, H, W)`` observation is the single-frame case; the encoder
must unsqueeze the time dim so the model still sees a video."""
step = TOPRewardEncoderProcessorStep(image_key="observation.images.top", max_frames=None)
frames_batch = torch.zeros(1, 3, 8, 8) # (B=1, C, H, W) — no time dim
out = step(
_make_transition(
observation={"observation.images.top": frames_batch},
complementary={"task": "pick"},
)
)
frames_out = out[TransitionKey.OBSERVATION][f"{TOPREWARD_FEATURE_PREFIX}frames"]
assert len(frames_out) == 1
assert frames_out[0].shape == (1, 8, 8, 3) # (T=1, H, W, C)
def test_encoder_step_uses_default_task_when_complementary_is_missing():
step = TOPRewardEncoderProcessorStep(
image_key="observation.images.top",
default_task="perform the task",
)
frames_batch = torch.zeros(1, 2, 3, 4, 4)
out = step(_make_transition(observation={"observation.images.top": frames_batch}))
tasks_out = out[TransitionKey.OBSERVATION][f"{TOPREWARD_FEATURE_PREFIX}task"]
assert tasks_out == ["perform the task"]
def test_encoder_step_rejects_missing_image_key():
step = TOPRewardEncoderProcessorStep(image_key="observation.images.top")
with pytest.raises(KeyError, match="image key"):
step(_make_transition(observation={}, complementary={"task": "pick"}))
def test_encoder_step_rejects_non_dict_observation():
step = TOPRewardEncoderProcessorStep()
with pytest.raises(ValueError, match="observation dict"):
step({TransitionKey.OBSERVATION: torch.zeros(1, 3, 8, 8)})
def test_encoder_step_rejects_3d_or_6d_input():
"""The encoder accepts ``(B,C,H,W)`` or ``(B,T,C,H,W)`` only."""
step = TOPRewardEncoderProcessorStep(image_key="observation.images.top")
with pytest.raises(ValueError, match=r"\(B,C,H,W\)"):
step(
_make_transition(
observation={"observation.images.top": torch.zeros(8, 8, 3)},
complementary={"task": "pick"},
)
)
def test_encoder_step_get_config_roundtrips_user_fields():
"""``get_config`` must serialise every user-tunable field — these are
what the processor pipeline saves under ``preprocessor_config.json``."""
step = TOPRewardEncoderProcessorStep(
@_skip_if_topreward_extras_missing
def test_encoder_step_get_config_roundtrips_user_fields(monkeypatch):
step = _build_step(
monkeypatch,
vlm_name="Qwen/Qwen3-VL-8B-Instruct",
image_key="observation.images.cam_top",
task_key="task",
default_task="do the thing",
max_frames=8,
fps=4.0,
add_chat_template=True,
max_length=2048,
)
assert step.get_config() == {
"image_key": "observation.images.cam_top",
"task_key": "task",
"default_task": "do the thing",
"max_frames": 8,
}
cfg = step.get_config()
assert cfg["vlm_name"] == "Qwen/Qwen3-VL-8B-Instruct"
assert cfg["image_key"] == "observation.images.cam_top"
assert cfg["default_task"] == "do the thing"
assert cfg["max_frames"] == 8
assert cfg["fps"] == 4.0
assert cfg["add_chat_template"] is True
assert cfg["max_length"] == 2048
def test_encoder_step_transform_features_is_identity():
"""The encoder writes plain Python objects (numpy arrays / strings)
into ``observation`` at call time but does NOT advertise new typed
features at pipeline-build time the model reads them via the
``TOPREWARD_FEATURE_PREFIX`` namespace, not via the typed feature map.
"""
step = TOPRewardEncoderProcessorStep()
@_skip_if_topreward_extras_missing
def test_encoder_step_transform_features_is_identity(monkeypatch):
step = _build_step(monkeypatch)
features = {
PipelineFeatureType.OBSERVATION: {
"observation.images.top": PolicyFeature(shape=(3, 224, 224), type=FeatureType.VISUAL),
}
}
assert step.transform_features(features) == features
@_skip_if_topreward_extras_missing
def test_encoder_step_rejects_missing_image_key(monkeypatch):
step = _build_step(monkeypatch, image_key="observation.images.top")
with pytest.raises(KeyError, match="image key"):
step(_make_transition(observation={}, complementary={"task": "pick"}))
@_skip_if_topreward_extras_missing
def test_encoder_step_rejects_non_dict_observation(monkeypatch):
step = _build_step(monkeypatch)
with pytest.raises(ValueError, match="observation dict"):
step({TransitionKey.OBSERVATION: torch.zeros(1, 3, 8, 8)})