diff --git a/docs/source/topreward.mdx b/docs/source/topreward.mdx index dc653f096..cb497428a 100644 --- a/docs/source/topreward.mdx +++ b/docs/source/topreward.mdx @@ -23,9 +23,20 @@ it builds a chat prompt of the form or not. The answer is: True" ``` -forwards it through the VLM, label-masks everything except the very last token, and reads back the log-probability of that token — by default the literal `"True"` that closes the suffix template. The resulting `log P("True" | video + prompt + instruction)` is the reward, and answers the question "given this video, how strongly does the VLM agree that the instruction is satisfied?". +forwards it through the VLM, label-masks everything except the very last token, and reads back the log-probability of that token — by default the literal `"True"` that closes the suffix template. The resulting `log P("True" | video + prompt + instruction)` is the reward. -Because the method only depends on a frozen VLM, TOPReward is **zero-shot**: there are no fine-tuned weights to host. The "model" in LeRobot is a small wrapper around `transformers`' `Qwen3VLForConditionalGeneration` plus the prompt assembly + label-masking logic. +Because the method only depends on a frozen VLM, TOPReward is **zero-shot**: there are no fine-tuned weights to host. The "model" in LeRobot is a small wrapper around `transformers`' `Qwen3VLForConditionalGeneration` plus the label-masking logic. The processor owns the tokeniser and builds the full chat prompt (EO-1/Robometer pattern). + +## What the LeRobot integration covers + +- Standard `reward_model.type=topreward` configuration through LeRobot. +- VLM loading via the `transformers` `Qwen3VLForConditionalGeneration` API. +- Prompt assembly + tokenisation in the processor (matching upstream `QwenClient.compute_instruction_reward`). +- `compute_reward()` returns one scalar log-prob per sample. +- LeRobot reward-model save/load — `save_pretrained` writes only `config.json` (the VLM is identified by `vlm_name`). +- An offline labeling script that writes a `topreward_progress.parquet` (SARM-compatible schema) for RA-BC and overlay. + +The current LeRobot port supports the **Qwen3-VL client only**. Other upstream clients (Gemini, OpenAI, Gemma, Molmo) can be added as follow-up extras. ## Installation Requirements @@ -53,18 +64,17 @@ TOPReward expects: In LeRobot datasets the preprocessor reads: -| Config field | Default | Meaning | -| ------------------------- | --------------------------- | ----------------------------------------------------------------------- | -| `reward_model.image_key` | `observation.images.top` | Camera observation used by TOPReward | -| `reward_model.task_key` | `task` | Key in complementary data that stores the task string | -| `reward_model.max_frames` | `16` | Cap on frames per sample (compute_reward only; predict_curves bypasses) | -| `reward_model.fps` | `2.0` | Metadata passed to the Qwen video processor | -| `reward_model.vlm_name` | `Qwen/Qwen3-VL-8B-Instruct` | Hugging Face Hub id of the underlying VLM | +| Config field | Default | Meaning | +| ------------------------- | --------------------------- | --------------------------------------------- | +| `reward_model.image_key` | `observation.images.top` | Camera observation used by TOPReward | +| `reward_model.task_key` | `task` | Key in complementary data for the task string | +| `reward_model.max_frames` | `16` | Cap on frames per sample | +| `reward_model.fps` | `2.0` | Metadata passed to the Qwen video processor | +| `reward_model.vlm_name` | `Qwen/Qwen3-VL-8B-Instruct` | Hugging Face Hub id of the underlying VLM | The model returns: -- `compute_reward(batch)`: one log-probability per sample. Higher = better task–video alignment. When `success_threshold` is finite, returns the binary thresholded value instead. -- `predict_curves(batch, num_prefixes=None)`: per-frame progress curve in `[0, 1]` (min-max normalised log-probs over prefix lengths). `num_prefixes=None` is fully dense; `num_prefixes=15` matches the upstream sparse-dense default with linear interpolation between anchors. +- `compute_reward(batch)`: one log-probability per sample. Higher = better task-video alignment. When `success_threshold` is finite, returns the binary thresholded value instead. ## Usage @@ -80,30 +90,6 @@ cfg = TOPRewardConfig( reward_model = TOPRewardModel(cfg) ``` -There is no `from_pretrained` weight download for TOPReward itself — the VLM is fetched from the Hub on construction. - -### Score a clip + task - -```python -import numpy as np -from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX - -# frames: np.ndarray, shape (T, H, W, C), dtype uint8 -# task: str -batch = { - f"{TOPREWARD_FEATURE_PREFIX}frames": [frames], - f"{TOPREWARD_FEATURE_PREFIX}task": [task], -} -reward = reward_model.compute_reward(batch) # tensor of shape (1,) -``` - -For a dense per-frame curve over the same clip: - -```python -out = reward_model.predict_curves(batch, num_prefixes=15) -progress = out["progress"][0].numpy() # shape (T,), values in [0, 1] -``` - ### Use the reward factory ```python @@ -119,26 +105,21 @@ reward_model = make_reward_model(cfg) preprocessor, postprocessor = make_reward_pre_post_processors(cfg) ``` -The preprocessor writes normalised frames + task strings under the `observation.topreward.*` namespace; the model reads them in `compute_reward`. +The preprocessor tokenises the full prompt (video + prefix + instruction suffix), writes Qwen-VL tensors + `prompt_length` under `observation.topreward.*`. The model reads those tensors, label-masks based on `prompt_length`, and extracts the log-prob reward. ### Offline dataset labeling -Mirror the SARM / Robometer RA-BC flow — write a `topreward_progress.parquet` once, then reuse it for training (RA-BC) and visualisation (overlay videos): +Write a `topreward_progress.parquet` for RA-BC training and overlay videos: ```bash -# Fully dense per-frame labeling -uv run python -m lerobot.rewards.topreward.compute_rabc_weights \ - --dataset-repo-id lerobot/libero_10_image \ - --device cuda - # Sparse-dense (15 anchors per episode, matches upstream) uv run python -m lerobot.rewards.topreward.compute_rabc_weights \ --dataset-repo-id lerobot/libero_10_image \ - --num-prefixes 15 \ + --num-samples 15 \ --device cuda ``` -Then render the SARM-style progress overlay for any episode: +Then render the progress overlay for any episode: ```bash uv run examples/dataset/create_progress_videos.py \ @@ -148,27 +129,28 @@ uv run examples/dataset/create_progress_videos.py \ --gif ``` -## Publishing a named TOPReward configuration +## Configuration Notes -Because TOPReward stores no weights of its own, "publishing a TOPReward model" amounts to writing the LeRobot `config.json` (≈ 1 KB) that pins the VLM id, prompt and reduction: +### Prompt knobs -```python -from lerobot.rewards.topreward import TOPRewardConfig, TOPRewardModel +The default prompt mirrors the upstream paper: -cfg = TOPRewardConfig( - vlm_name="Qwen/Qwen3-VL-8B-Instruct", - reduction="mean", - fps=2.0, -) -TOPRewardModel(cfg).save_pretrained("./topreward-qwen3vl-8b") -# Push the directory to the Hub via `huggingface-cli` or `HfApi.upload_folder`. +```text +prompt_prefix = "The above video shows a robot manipulation trajectory that completes the following task: " +prompt_suffix_template = "{instruction} Decide whether the above statement is True or not. The answer is: True" ``` -Reloading restores the same configuration (no weight download for TOPReward itself; the VLM is re-fetched via `vlm_name`): +Both are exposed on `TOPRewardConfig` for ablation. The suffix template **must** contain `{instruction}`. -```python -reloaded = TOPRewardModel.from_pretrained("./topreward-qwen3vl-8b") -``` +### Chat template + +`add_chat_template=True` wraps the full prompt (including instruction) with the tokenizer's chat template before tokenisation. Default is `False`, matching the upstream paper's main experiments. + +## Limitations + +- The current LeRobot port is **inference-only and zero-shot**; `forward()` is not overridden and `is_trainable` returns `False`. +- Only the **Qwen3-VL family** is supported; other upstream clients are out of scope. +- TOPReward inherits the underlying VLM's biases. ## References @@ -189,3 +171,7 @@ reloaded = TOPRewardModel.from_pretrained("./topreward-qwen3vl-8b") year={2026} } ``` + +## License + +The original TOPReward codebase is MIT-licensed. The LeRobot port follows the LeRobot Apache 2.0 license; the wrapped Qwen3-VL weights are subject to the original Qwen license. diff --git a/src/lerobot/rewards/topreward/compute_rabc_weights.py b/src/lerobot/rewards/topreward/compute_rabc_weights.py index 1c67caa21..7e2b07d7c 100644 --- a/src/lerobot/rewards/topreward/compute_rabc_weights.py +++ b/src/lerobot/rewards/topreward/compute_rabc_weights.py @@ -16,51 +16,22 @@ """Compute per-frame TOPReward progress curves for a LeRobot dataset. -This mirrors :mod:`lerobot.rewards.sarm.compute_rabc_weights` (and the -ROBOMETER equivalent): it walks every episode in a dataset, runs the -TOPReward zero-shot reward model, and writes a parquet file with one row -per frame. The output uses the same schema SARM produces so existing -consumers — :class:`lerobot.rewards.sarm.rabc.RABCWeights` (which reads -``progress_sparse``) and the SARM-style overlay script in -``examples/dataset/create_progress_videos.py`` — work without modification. +For each episode, scores trajectory prefixes of increasing length using +the TOPReward reward model, min-max normalises the raw log-prob rewards per episode, +and writes a parquet file with one row per frame. -TOPReward is zero-shot: there is no fine-tuned checkpoint to load. The -``--reward-model-path`` argument is therefore optional and only used when -you want to load a TOPReward LeRobot config (e.g. one published on the Hub -that pins ``vlm_name`` and prompt knobs). Otherwise the default -:class:`TOPRewardConfig` is used, which points at -``Qwen/Qwen3-VL-8B-Instruct`` — the VLM is re-downloaded from the HF Hub -on every run unless cached. - -Parquet schema: - +--------------------+---------+----------------------------------------+ - | column | dtype | meaning | - +====================+=========+========================================+ - | ``index`` | int64 | global frame index | - | ``episode_index`` | int64 | episode id | - | ``frame_index`` | int64 | local within-episode index | - | ``progress_sparse``| float32 | per-frame TOPReward progress in [0, 1] | - | | | (RA-BC + overlay read this column) | - +--------------------+---------+----------------------------------------+ +The parquet uses the same schema as SARM's :mod:`lerobot.rewards.sarm.compute_rabc_weights`. Usage: - # Full computation (one VLM forward per frame, slowest but most accurate) - python -m lerobot.rewards.topreward.compute_rabc_weights \\ - --dataset-repo-id lerobot/libero_10_image - - # Sparse-dense mode: 15 anchor prefixes per episode, interpolated to - # per-frame resolution. Matches upstream TOPReward ``num_samples=15``. + # Sparse-dense mode (15 anchors per episode, matches upstream) python -m lerobot.rewards.topreward.compute_rabc_weights \\ --dataset-repo-id lerobot/libero_10_image \\ - --num-prefixes 15 + --num-samples 15 # Use a different VLM backbone python -m lerobot.rewards.topreward.compute_rabc_weights \\ --dataset-repo-id lerobot/libero_10_image \\ --vlm-name Qwen/Qwen3-VL-4B-Instruct - -The output is written to the dataset's local cache directory as -``topreward_progress.parquet`` (or to ``--output-path`` if provided). """ from __future__ import annotations @@ -79,7 +50,8 @@ from tqdm import tqdm from lerobot.datasets import LeRobotDataset from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel -from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX +from lerobot.rewards.topreward.processor_topreward import TOPRewardEncoderProcessorStep +from lerobot.types import TransitionKey DEFAULT_OUTPUT_FILENAME = "topreward_progress.parquet" @@ -105,22 +77,65 @@ def _resolve_task(sample: dict[str, Any], default: str) -> str: return default -def _frames_to_uint8_hwc(video: torch.Tensor) -> np.ndarray: - """Convert a ``(T, C, H, W)`` or ``(T, H, W, C)`` tensor to ``(T, H, W, C) uint8``. +def normalize_rewards(rewards: list[float] | np.ndarray) -> np.ndarray: + """Min-max normalise raw log-prob rewards into ``[0, 1]``.""" + rewards_arr = np.asarray(rewards, dtype=np.float64) + if rewards_arr.size == 0: + return rewards_arr.astype(np.float32) + if rewards_arr.size == 1: + return np.array([1.0], dtype=np.float32) + r_min, r_max = rewards_arr.min(), rewards_arr.max() + if r_max == r_min: + return np.ones_like(rewards_arr, dtype=np.float32) + return ((rewards_arr - r_min) / (r_max - r_min)).astype(np.float32) - Inlined here (rather than reusing the processor) so the labeling script - can side-step the ``max_frames`` tail-crop and feed full trajectories - to :meth:`TOPRewardModel.predict_curves`. - """ - if video.shape[1] in (1, 3): - video = video.permute(0, 2, 3, 1) - elif video.shape[-1] not in (1, 3): - raise ValueError(f"Expected channel dim of size 1 or 3, got shape {tuple(video.shape)}") - array = video.detach().cpu().numpy() - if np.issubdtype(array.dtype, np.floating) and array.size > 0 and array.max() <= 1.0: - array = array * 255.0 - return np.clip(array, 0, 255).astype(np.uint8) +def compute_instruction_rewards_for_prefixes( + model: TOPRewardModel, + encoder: TOPRewardEncoderProcessorStep, + dataset: LeRobotDataset, + ep_start: int, + num_frames: int, + task: str, + image_key: str, + num_samples: int | None, + device: str, +) -> np.ndarray: + """Score an episode via prefix sweep and return a per-frame normalised curve.""" + if num_samples is None or num_samples >= num_frames: + prefix_lengths = np.arange(1, num_frames + 1, dtype=np.int64) + else: + prefix_lengths = np.unique(np.linspace(1, num_frames, num_samples).round().astype(np.int64)) + + rewards: list[float] = [] + for length in prefix_lengths: + frames = torch.stack([dataset[ep_start + i][image_key] for i in range(int(length))]) + frames = frames.unsqueeze(0) # (1, T, C, H, W) + + transition = { + TransitionKey.OBSERVATION: {image_key: frames}, + TransitionKey.COMPLEMENTARY_DATA: {"task": task}, + } + encoded = encoder(transition) + obs = encoded[TransitionKey.OBSERVATION] + batch = { + key: value.to(device) if isinstance(value, torch.Tensor) else value for key, value in obs.items() + } + + with torch.no_grad(): + reward = model.compute_reward(batch) + rewards.append(float(reward.item())) + + normalized_rewards = normalize_rewards(rewards) + + if prefix_lengths.shape[0] == num_frames: + return normalized_rewards + + return np.interp( + np.arange(1, num_frames + 1, dtype=np.float64), + prefix_lengths.astype(np.float64), + normalized_rewards.astype(np.float64), + ).astype(np.float32) def compute_topreward_progress( @@ -129,41 +144,18 @@ def compute_topreward_progress( vlm_name: str | None = None, output_path: str | None = None, device: str = "cuda", - num_prefixes: int | None = None, + num_samples: int | None = None, fps: float | None = None, reduction: str | None = None, - use_video_description: bool = False, + episodes: list[int] | None = None, ) -> Path: - """Run TOPReward over a dataset and write per-frame progress. - - Args: - dataset_repo_id: Hugging Face dataset repo id or local path. - reward_model_path: Optional TOPReward LeRobot config repo / dir to - load (a tiny ``config.json``). When ``None`` (default), a - fresh :class:`TOPRewardConfig` is constructed from the CLI - overrides. - vlm_name: Override the VLM backbone (HF Hub id). - output_path: Where to write the parquet. Defaults to - ``/topreward_progress.parquet``. - device: Device for the VLM. - num_prefixes: Number of evenly-spaced anchor prefixes per episode. - ``None`` (default) = fully dense (one VLM forward per frame). - Set to ``15`` to match upstream TOPReward ``num_samples=15``. - fps: Override the config's ``fps``. - reduction: Override the config's ``reduction`` (``"mean"`` / ``"sum"``). - use_video_description: Override the config's ``use_video_description``. - - Returns: - Path to the written parquet file. - """ + """Run TOPReward over a dataset and write per-frame progress.""" if reward_model_path is not None: logging.info(f"Loading TOPReward config from: {reward_model_path}") model = TOPRewardModel.from_pretrained(reward_model_path) config = model.config - # Apply CLI overrides on top of the loaded config. if vlm_name is not None and vlm_name != config.vlm_name: logging.info(f"Overriding vlm_name from config: {config.vlm_name} -> {vlm_name}") - # vlm_name affects the loaded weights; reload from scratch. config.vlm_name = vlm_name config.device = device model = TOPRewardModel(config) @@ -175,28 +167,40 @@ def compute_topreward_progress( config_kwargs["fps"] = fps if reduction is not None: config_kwargs["reduction"] = reduction - if use_video_description: - config_kwargs["use_video_description"] = True config = TOPRewardConfig(**config_kwargs) logging.info(f"Constructing TOPReward with VLM: {config.vlm_name}") model = TOPRewardModel(config) model.to(device).eval() + encoder = TOPRewardEncoderProcessorStep( + vlm_name=config.vlm_name, + image_key=config.image_key, + task_key=config.task_key, + default_task=config.default_task, + max_frames=None, # no tail-crop: we control prefix length explicitly + fps=config.fps, + prompt_prefix=config.prompt_prefix, + prompt_suffix_template=config.prompt_suffix_template, + add_chat_template=config.add_chat_template, + max_length=config.max_input_length, + ) + image_key = config.image_key - frames_key = f"{TOPREWARD_FEATURE_PREFIX}frames" - task_batch_key = f"{TOPREWARD_FEATURE_PREFIX}task" logging.info(f"Loading dataset: {dataset_repo_id}") dataset = LeRobotDataset(dataset_repo_id, download_videos=True) logging.info(f"Dataset: {dataset.num_episodes} episodes, {dataset.num_frames} frames") + episode_indices = list(range(dataset.num_episodes)) if episodes is None else episodes + logging.info(f"Processing {len(episode_indices)} episode(s)") + all_index: list[int] = [] all_episode: list[int] = [] all_frame: list[int] = [] all_progress: list[float] = [] - for episode_idx in tqdm(range(dataset.num_episodes), desc="Episodes"): + for episode_idx in tqdm(episode_indices, desc="Episodes"): ep = dataset.meta.episodes[episode_idx] ep_start = int(ep["dataset_from_index"]) ep_end = int(ep["dataset_to_index"]) @@ -207,16 +211,17 @@ def compute_topreward_progress( first_sample = dataset[ep_start] task = _resolve_task(first_sample, default=config.default_task or "perform the task") - # Read the whole episode into one (N, C, H, W) tensor and convert - # to (N, H, W, C) uint8 — same format ``TOPREWARD_FEATURE_PREFIX.frames`` - # expects. We deliberately bypass the encoder step here so its - # ``max_frames`` tail-crop doesn't clip the prefix sweep. - ep_video = torch.stack([dataset[ep_start + i][image_key] for i in range(num_frames)]) - ep_frames_uint8 = _frames_to_uint8_hwc(ep_video) - - batch = {frames_key: [ep_frames_uint8], task_batch_key: [task]} - out = model.predict_curves(batch, num_prefixes=num_prefixes) - per_frame = out["progress"][0, :num_frames].cpu().numpy() + per_frame = compute_instruction_rewards_for_prefixes( + model=model, + encoder=encoder, + dataset=dataset, + ep_start=ep_start, + num_frames=num_frames, + task=task, + image_key=image_key, + num_samples=num_samples, + device=device, + ) for local in range(num_frames): all_index.append(ep_start + local) @@ -232,13 +237,10 @@ def compute_topreward_progress( "index": np.asarray(all_index, dtype=np.int64), "episode_index": np.asarray(all_episode, dtype=np.int64), "frame_index": np.asarray(all_frame, dtype=np.int64), - # Same column name SARM uses so RABCWeights + the overlay - # script read TOPReward's output without per-model branching. "progress_sparse": np.asarray(all_progress, dtype=np.float32), } ) - # Persist provenance metadata: the LeRobot path (if any) and the VLM id. schema_metadata: dict[bytes, bytes] = {b"vlm_name": config.vlm_name.encode()} if reward_model_path is not None: schema_metadata[b"reward_model_path"] = reward_model_path.encode() @@ -266,14 +268,10 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Full RA-BC computation with the default Qwen3-VL-8B-Instruct backbone - python -m lerobot.rewards.topreward.compute_rabc_weights \\ - --dataset-repo-id lerobot/libero_10_image - # Sparse-dense mode (matches upstream TOPReward num_samples=15) python -m lerobot.rewards.topreward.compute_rabc_weights \\ --dataset-repo-id lerobot/libero_10_image \\ - --num-prefixes 15 + --num-samples 15 # Use a smaller VLM python -m lerobot.rewards.topreward.compute_rabc_weights \\ @@ -282,66 +280,33 @@ Examples: """, ) parser.add_argument( - "--dataset-repo-id", - type=str, - required=True, - help="HuggingFace dataset repo id or local path.", + "--dataset-repo-id", type=str, required=True, help="HuggingFace dataset repo id or local path." ) parser.add_argument( - "--reward-model-path", - type=str, - default=None, - help="Optional TOPReward LeRobot config (repo id or local dir). " - "Falls back to a fresh TOPRewardConfig if unset.", + "--reward-model-path", type=str, default=None, help="Optional TOPReward LeRobot config." ) + parser.add_argument("--vlm-name", type=str, default=None, help="Override the VLM backbone (HF Hub id).") + parser.add_argument("--output-path", type=str, default=None, help="Output parquet path.") + parser.add_argument("--device", type=str, default="cuda", help="Device to use (default: cuda).") parser.add_argument( - "--vlm-name", - type=str, - default=None, - help="Override the VLM backbone (HF Hub id).", - ) - parser.add_argument( - "--output-path", - type=str, - default=None, - help="Output parquet path. Defaults to /topreward_progress.parquet.", - ) - parser.add_argument( - "--device", - type=str, - default="cuda", - help="Device to use (default: cuda).", - ) - parser.add_argument( - "--num-prefixes", + "--num-samples", type=int, default=None, - help="Evenly-spaced anchor prefixes per episode. None = fully dense " - "(one VLM forward per frame). 15 matches upstream TOPReward.", + help="Anchor prefix samples per episode. None = dense. 15 matches upstream.", ) parser.add_argument( - "--fps", - type=float, + "--episodes", + type=int, + nargs="+", default=None, - help="Override TOPRewardConfig.fps (frames per second for the Qwen video processor).", + help="Process only these episode indices (e.g. --episodes 0 or --episodes 0 5 10).", + ) + parser.add_argument("--fps", type=float, default=None, help="Override TOPRewardConfig.fps.") + parser.add_argument( + "--reduction", type=str, default=None, choices=["mean", "sum"], help="Override reduction." ) parser.add_argument( - "--reduction", - type=str, - default=None, - choices=["mean", "sum"], - help="Override TOPRewardConfig.reduction.", - ) - parser.add_argument( - "--use-video-description", - action="store_true", - help="Generate an instruction-agnostic video description and prepend " - "it as context before scoring (doubles VLM calls per prefix).", - ) - parser.add_argument( - "--push-to-hub", - action="store_true", - help="Upload the progress file to the dataset repo on HuggingFace Hub.", + "--push-to-hub", action="store_true", help="Upload to the dataset repo on HuggingFace Hub." ) args = parser.parse_args() @@ -354,10 +319,10 @@ Examples: vlm_name=args.vlm_name, output_path=args.output_path, device=args.device, - num_prefixes=args.num_prefixes, + num_samples=args.num_samples, fps=args.fps, reduction=args.reduction, - use_video_description=args.use_video_description, + episodes=args.episodes, ) print(f"\nTOPReward progress saved to: {output_path}") diff --git a/src/lerobot/rewards/topreward/configuration_topreward.py b/src/lerobot/rewards/topreward/configuration_topreward.py index bb4d1cf35..cf5ea38a6 100644 --- a/src/lerobot/rewards/topreward/configuration_topreward.py +++ b/src/lerobot/rewards/topreward/configuration_topreward.py @@ -67,11 +67,6 @@ class TOPRewardConfig(RewardModelConfig): add_chat_template: If ``True``, wrap the full prompt with the tokenizer's chat template before tokenisation (matches upstream ``add_chat_template=True``). - use_video_description: If ``True``, make an extra VLM call to - produce an instruction-agnostic video description and prepend - it as additional context. Doubles inference cost but avoids - circular grounding when the instruction names objects shown - in frames. reduction: Reduction over per-token log-probs of the suffix tokens (``"mean"`` or ``"sum"``). success_threshold: Optional log-prob threshold. If finite, @@ -100,7 +95,6 @@ class TOPRewardConfig(RewardModelConfig): prompt_prefix: str = DEFAULT_PROMPT_PREFIX prompt_suffix_template: str = DEFAULT_PROMPT_SUFFIX_TEMPLATE add_chat_template: bool = False - use_video_description: bool = False reduction: str = "mean" success_threshold: float = float("-inf") diff --git a/src/lerobot/rewards/topreward/modeling_topreward.py b/src/lerobot/rewards/topreward/modeling_topreward.py index c4e707426..3c2c0efa8 100644 --- a/src/lerobot/rewards/topreward/modeling_topreward.py +++ b/src/lerobot/rewards/topreward/modeling_topreward.py @@ -28,17 +28,16 @@ and returns that log-likelihood as the reward signal. Inference recipe: -1. Build a chat-style prompt: - ``[video(frames, fps), text=prompt_prefix, text="{instruction} ... True"]`` -2. Forward the full token sequence through the VLM. -3. Mask all but the final token with ``-100`` (``prompt_length = input_len - 1``, - mirrored from upstream). After the standard causal-LM next-token shift, this - isolates the single position where the model predicts the literal ``"True"`` - that ends the prompt — the binary "is the instruction true given the video?" - answer. -4. Read that token's log-probability from the logits and reduce it (mean or sum - — equivalent for a single token, kept for API parity with upstream) into a - scalar reward. +1. The processor builds a chat-style prompt, tokenises it, and emits + ``input_ids``, ``attention_mask``, vision tensors, and ``prompt_length``. +2. The model label-masks everything before ``prompt_length`` with ``-100``. +3. Forward the full token sequence through the VLM. +4. Read per-token log-probabilities of the unmasked suffix tokens from the + logits and reduce them (mean or sum) into a scalar reward. + +With the default ``prompt_suffix_template`` and ``prompt_length = input_len - 1`` +(mirrored from upstream), the only unmasked token is the literal ``"True"`` +at the end — the reward is ``log P("True" | video + prompt + instruction)``. This LeRobot port is **inference-only and not trainable** — :meth:`forward` is intentionally inherited from :class:`PreTrainedRewardModel` and raises @@ -59,37 +58,33 @@ import logging import os from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, Any, TypeVar, cast +from typing import TYPE_CHECKING, Any, TypeVar import numpy as np import torch from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.constants import CONFIG_NAME from huggingface_hub.errors import HfHubHTTPError -from PIL import Image from torch import Tensor from lerobot.configs.rewards import RewardModelConfig from lerobot.rewards.pretrained import PreTrainedRewardModel from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig -from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX +from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX, TOPREWARD_INPUT_KEYS from lerobot.utils.import_utils import _transformers_available, require_package if TYPE_CHECKING: from lerobot.configs.train import TrainPipelineConfig if TYPE_CHECKING or _transformers_available: - from transformers import AutoProcessor, Qwen3VLForConditionalGeneration + from transformers import Qwen3VLForConditionalGeneration else: - AutoProcessor = None # type: ignore[assignment] Qwen3VLForConditionalGeneration = None # type: ignore[assignment] logger = logging.getLogger(__name__) T = TypeVar("T", bound="TOPRewardModel") -_TRUE_ANSWER = "True" - def _torch_dtype(name: str) -> torch.dtype | str: """Resolve a torch dtype name; ``"auto"`` is passed through verbatim.""" @@ -101,33 +96,6 @@ def _torch_dtype(name: str) -> torch.dtype | str: raise ValueError(f"Unknown torch dtype: {name!r}") -def _frames_to_pil(frames: np.ndarray) -> list[Image.Image]: - """Convert ``(T, H, W, C)`` uint8 frames to a list of PIL images.""" - if frames.ndim != 4: - raise ValueError(f"Expected (T,H,W,C) frames; got shape {frames.shape}") - if frames.dtype != np.uint8: - frames = np.clip(frames, 0, 255).astype(np.uint8) - return [Image.fromarray(frames[i]) for i in range(frames.shape[0])] - - -def minmax_normalize_rewards(rewards: list[float] | np.ndarray) -> np.ndarray: - """Min-max normalise raw log-prob rewards into ``[0, 1]``. - - Matches upstream ``QwenClient.normalize_rewards(rewards, method="minmax")``: - a single-element input maps to ``[1.0]`` (no information to scale), and a - flat input (``max == min``) maps to all-ones. - """ - rewards_arr = np.asarray(rewards, dtype=np.float64) - if rewards_arr.size == 0: - return rewards_arr.astype(np.float32) - if rewards_arr.size == 1: - return np.array([1.0], dtype=np.float32) - r_min, r_max = rewards_arr.min(), rewards_arr.max() - if r_max == r_min: - return np.ones_like(rewards_arr, dtype=np.float32) - return ((rewards_arr - r_min) / (r_max - r_min)).astype(np.float32) - - class TOPRewardModel(PreTrainedRewardModel): """TOPReward zero-shot reward model.""" @@ -136,7 +104,6 @@ class TOPRewardModel(PreTrainedRewardModel): def __init__(self, config: TOPRewardConfig) -> None: require_package("transformers", extra="topreward") - require_package("qwen-vl-utils", extra="topreward", import_name="qwen_vl_utils") super().__init__(config) self.config = config @@ -145,117 +112,64 @@ class TOPRewardModel(PreTrainedRewardModel): if config.attn_implementation is not None: model_kwargs["attn_implementation"] = config.attn_implementation - # TOPReward is zero-shot: load the VLM as-is from the Hub. No - # weights of our own, no embedding resize, no head wiring. self.model = Qwen3VLForConditionalGeneration.from_pretrained(config.vlm_name, **model_kwargs) - self.processor = AutoProcessor.from_pretrained(config.vlm_name, trust_remote_code=True) def compute_reward(self, batch: dict[str, Any]) -> Tensor: - """Return one log-prob reward per sample in the batch. + """Return one log-prob reward per sample in the batch.""" + inputs = { + key: batch[f"{TOPREWARD_FEATURE_PREFIX}{key}"] + for key in TOPREWARD_INPUT_KEYS + if f"{TOPREWARD_FEATURE_PREFIX}{key}" in batch + } + if "input_ids" not in inputs: + raise KeyError( + f"TOPReward batch missing pre-encoded inputs (expected " + f"`{TOPREWARD_FEATURE_PREFIX}input_ids`). Make sure the " + "TOPRewardEncoderProcessorStep ran before `compute_reward`." + ) + + prompt_lengths = inputs.pop("prompt_length") + device = next(self.model.parameters()).device + inputs = {key: value.to(device) if hasattr(value, "to") else value for key, value in inputs.items()} + + labels = inputs["input_ids"].clone() + for i, plen in enumerate(prompt_lengths.tolist()): + labels[i, : int(plen)] = -100 + if "attention_mask" in inputs: + labels = labels.masked_fill(inputs["attention_mask"] == 0, -100) + + self.eval() + with torch.no_grad(): + outputs = self.model(**inputs, labels=labels) + + logits = outputs.logits[:, :-1, :] + target_labels = labels[:, 1:] + log_probs = torch.nn.functional.log_softmax(logits, dim=-1) + mask = target_labels != -100 + safe_targets = target_labels.masked_fill(~mask, 0) + token_log_probs = log_probs.gather(-1, safe_targets.unsqueeze(-1)).squeeze(-1) + + batch_size = inputs["input_ids"].shape[0] + rewards = [] + for i in range(batch_size): + sample_log_probs = token_log_probs[i][mask[i]] + if sample_log_probs.numel() == 0: + raise RuntimeError( + "TOPReward could not isolate any suffix tokens to score. Check that " + "`prompt_suffix_template` produces at least one tokenised character." + ) + if self.config.reduction == "sum": + rewards.append(sample_log_probs.sum().item()) + else: + rewards.append(sample_log_probs.mean().item()) - Expects a batch produced by :class:`TOPRewardEncoderProcessorStep`: - ``observation[f"{TOPREWARD_FEATURE_PREFIX}frames"]`` is a list of - ``(T, H, W, C) uint8`` numpy arrays (one per sample) and - ``observation[f"{TOPREWARD_FEATURE_PREFIX}task"]`` is a list of - task strings of the same length. - """ - frames_per_sample, tasks = self._unpack_batch(batch) - rewards = [ - self._compute_log_prob_reward(frames, task) - for frames, task in zip(frames_per_sample, tasks, strict=True) - ] out = torch.as_tensor(rewards, dtype=torch.float32) if np.isfinite(self.config.success_threshold): out = (out > self.config.success_threshold).float() return out.to(self.config.device or "cpu") - @torch.no_grad() - def predict_curves( - self, - batch: dict[str, Any], - *, - num_prefixes: int | None = None, - ) -> dict[str, Tensor]: - """Per-sample dense progress curves over prefixes ``[0, t]``. - - Mirrors upstream ``compute_instruction_rewards_for_prefixes``: for - each sample we run one VLM forward per prefix length and read the - log-prob reward at that prefix. Raw log-probs are then min-max - normalised per-trajectory to ``[0, 1]``. Because trajectories - within a batch can have different lengths, the returned - ``progress`` tensor is right-padded with ``NaN`` to the longest - trajectory in the batch. - - Args: - batch: Same input as :meth:`compute_reward`. - num_prefixes: How many evenly-spaced prefix lengths to score - per trajectory. ``None`` (default) uses every prefix - length ``[1, N]`` → fully dense, ``N`` VLM forwards per - trajectory. Pass a smaller integer (e.g. ``15``, the - upstream default) for sparse-dense scoring with linear - interpolation between anchors. - - Returns: - Dict with one float32 CPU tensor: - - - ``progress``: ``(B, T_max)`` — per-frame progress in - ``[0, 1]`` (min-max normalised log-prob curve), padded with - ``NaN``. - """ - if num_prefixes is not None and num_prefixes < 1: - raise ValueError(f"num_prefixes must be >= 1 or None, got {num_prefixes}") - - frames_per_sample, tasks = self._unpack_batch(batch) - curves: list[np.ndarray] = [] - max_len = 0 - for frames, task in zip(frames_per_sample, tasks, strict=True): - num_frames = int(frames.shape[0]) - if num_frames == 0: - curves.append(np.zeros(0, dtype=np.float32)) - continue - - if num_prefixes is None or num_prefixes >= num_frames: - anchor_lengths = np.arange(1, num_frames + 1, dtype=np.int64) - else: - # Match upstream: linspace from 1 to N, dedupe (rounding - # collisions for short trajectories), sort ascending. - anchor_lengths = np.unique(np.linspace(1, num_frames, num_prefixes).round().astype(np.int64)) - - raw_rewards = [self._compute_log_prob_reward(frames[:length], task) for length in anchor_lengths] - normalized_at_anchors = minmax_normalize_rewards(raw_rewards) - - # Linear interpolation back to per-frame resolution when - # `num_prefixes < num_frames`. - if anchor_lengths.shape[0] == num_frames: - per_frame = normalized_at_anchors - else: - per_frame = np.interp( - np.arange(1, num_frames + 1, dtype=np.float64), - anchor_lengths.astype(np.float64), - normalized_at_anchors.astype(np.float64), - ).astype(np.float32) - - curves.append(per_frame) - max_len = max(max_len, num_frames) - - padded = np.full((len(curves), max_len), np.nan, dtype=np.float32) - for i, curve in enumerate(curves): - padded[i, : curve.shape[0]] = curve - return {"progress": torch.from_numpy(padded)} - - # ------------------------------------------------------------------ - # Save / load — VLM weights are not stored in our checkpoint - # ------------------------------------------------------------------ - def _save_pretrained(self, save_directory: Path) -> None: - """Save ``config.json`` only. - - TOPReward has no fine-tuned weights of its own — the VLM is - identified by :attr:`TOPRewardConfig.vlm_name` and lives on the - Hugging Face Hub under that id. Writing the VLM into a - ``model.safetensors`` here would just duplicate ~16 GB of Qwen - weights under our org for no benefit. - """ + """Save ``config.json`` only.""" self.config._save_pretrained(save_directory) @classmethod @@ -271,19 +185,10 @@ class TOPRewardModel(PreTrainedRewardModel): cache_dir: str | Path | None = None, local_files_only: bool = False, revision: str | None = None, - strict: bool = False, # accepted for API parity; unused + strict: bool = False, # noqa: ARG003 — accepted for API parity; unused (no safetensors to load) **kwargs: Any, ) -> T: - """Load a TOPReward configuration and instantiate the wrapped VLM. - - Two modes: - - - Local directory containing ``config.json``: read the config and - rebuild the model. The VLM is re-fetched from the Hub via - :attr:`TOPRewardConfig.vlm_name`. - - HF Hub repo id: download just ``config.json``, same as above. - """ - del strict # TOPReward has no weights of its own to (strictly) load. + """Load a TOPReward configuration and instantiate the wrapped VLM.""" if config is None: config = RewardModelConfig.from_pretrained( pretrained_name_or_path=pretrained_name_or_path, @@ -305,7 +210,6 @@ class TOPRewardModel(PreTrainedRewardModel): model_id = str(pretrained_name_or_path) if not os.path.isdir(model_id): - # Validate that the remote repo at least contains a TOPReward config.json try: hf_hub_download( repo_id=model_id, @@ -329,11 +233,7 @@ class TOPRewardModel(PreTrainedRewardModel): return instance def push_model_to_hub(self, cfg: TrainPipelineConfig): - """Push the TOPReward ``config.json`` + model card to the Hub. - - Skips the safetensors upload — the wrapped VLM is identified by - ``vlm_name`` and we never modify it. - """ + """Push the TOPReward ``config.json`` + model card to the Hub.""" api = HfApi() repo_id = api.create_repo( repo_id=self.config.repo_id, private=self.config.private, exist_ok=True @@ -362,202 +262,3 @@ class TOPRewardModel(PreTrainedRewardModel): ) logger.info(f"Model pushed to {commit_info.repo_url.url}") - - def _unpack_batch(self, batch: dict[str, Any]) -> tuple[list[np.ndarray], list[str]]: - frames_key = f"{TOPREWARD_FEATURE_PREFIX}frames" - task_key = f"{TOPREWARD_FEATURE_PREFIX}task" - if frames_key not in batch or task_key not in batch: - raise KeyError( - "TOPReward batch missing pre-encoded inputs (expected " - f"`{frames_key}` and `{task_key}`). Make sure the " - "TOPRewardEncoderProcessorStep ran before `compute_reward`." - ) - frames_per_sample = list(batch[frames_key]) - tasks = list(batch[task_key]) - if len(frames_per_sample) != len(tasks): - raise ValueError( - f"frames batch size ({len(frames_per_sample)}) does not match task batch size ({len(tasks)})" - ) - return frames_per_sample, tasks - - @torch.no_grad() - def _compute_log_prob_reward(self, frames: np.ndarray, instruction: str) -> float: - """Compute the log-likelihood of the final answer token given the prompt. - - Port of ``QwenClient.compute_instruction_reward`` (the upstream - TOPReward implementation), stripped of the - :class:`InstructionRewardResult` metadata wrapper we don't need. - Returns ``log P(final_token | video + prompt + instruction)`` — by - default the final token is the literal ``"True"`` that closes the - suffix template, which is the binary "is the instruction satisfied" - signal the paper describes. - """ - device = next(self.model.parameters()).device - pil_frames = _frames_to_pil(frames) - - if self.config.use_video_description: - description = self._generate_object_state_reasoning(pil_frames) - prompt_text = ( - f"{description} Therefore given the above description and the " - "video, the video shows a robot manipulation trajectory that " - "**completes** the following instruction: " - ) - else: - prompt_text = self.config.prompt_prefix - - eos_token = self.processor.tokenizer.eos_token - instruction_suffix = self.config.prompt_suffix_template.format(instruction=instruction) - - # Two prompt assembly modes match the upstream: - # - # - ``add_chat_template=True``: wrap the FULL prompt (including - # instruction) with the chat template, then append the literal - # ``"True"`` token outside the template. - # - ``add_chat_template=False``: apply the chat template to the - # video+prefix only (no generation prompt), strip the trailing - # EOS, then concatenate the literal instruction suffix. - if self.config.add_chat_template: - # Suffix excluding the trailing "True" — we want "True" to be - # the scored token, not part of the template's user turn. - suffix_for_template = instruction_suffix.removesuffix(_TRUE_ANSWER).rstrip() - templated_messages = [ - { - "role": "user", - "content": [ - {"type": "video", "video": pil_frames, "fps": self.config.fps}, - {"type": "text", "text": f"{prompt_text}{suffix_for_template}"}, - ], - } - ] - prompt_chat = self.processor.apply_chat_template( - templated_messages, tokenize=False, add_generation_prompt=True - ) - full_text = f"{prompt_chat}{_TRUE_ANSWER}" - image_inputs, video_inputs = self._process_vision_info(templated_messages) - else: - user_messages = [ - { - "role": "user", - "content": [ - {"type": "video", "video": pil_frames, "fps": self.config.fps}, - {"type": "text", "text": prompt_text}, - ], - } - ] - prompt_chat = self.processor.apply_chat_template( - user_messages, tokenize=False, add_generation_prompt=False - ) - if eos_token is not None: - prompt_chat = prompt_chat.split(eos_token)[0] - full_text = f"{prompt_chat}{instruction_suffix}" - image_inputs, video_inputs = self._process_vision_info(user_messages) - - inputs = self.processor( - text=[full_text], - images=image_inputs, - videos=video_inputs, - padding=True, - return_tensors="pt", - ) - inputs = inputs.to(device) - - input_len = int(inputs["input_ids"].shape[-1]) - if input_len > self.config.max_input_length: - raise ValueError( - f"TOPReward input length {input_len} exceeds max_input_length " - f"{self.config.max_input_length}; lower `max_frames` or raise `max_input_length`." - ) - - labels = inputs["input_ids"].clone() - # Mask everything except the very last token. ``prompt_length = input_len - 1`` - # mirrors upstream ``QwenClient.compute_instruction_reward``; after the - # causal-LM next-token shift below this isolates exactly one position — - # the prediction of the literal ``"True"`` that closes ``prompt_suffix_template``. - # The resulting reward is therefore ``log P("True" | video + prompt + instruction)``. - prompt_length = input_len - 1 - labels[:, :prompt_length] = -100 - if "attention_mask" in inputs: - labels = labels.masked_fill(inputs["attention_mask"] == 0, -100) - - self.model.eval() - outputs = self.model(**inputs, labels=labels) - - logits = outputs.logits[:, :-1, :] - target_labels = labels[:, 1:] - log_probs = torch.nn.functional.log_softmax(logits, dim=-1) - mask = target_labels != -100 - safe_targets = target_labels.masked_fill(~mask, 0) - token_log_probs = log_probs.gather(-1, safe_targets.unsqueeze(-1)).squeeze(-1) - masked_log_probs = token_log_probs[mask] - if masked_log_probs.numel() == 0: - raise RuntimeError( - "TOPReward could not isolate any suffix tokens to score. Check that " - "`prompt_suffix_template` produces at least one tokenised character." - ) - - # ``mean`` vs ``sum`` are equivalent for a single scored token but the - # knob is kept for API parity with upstream (and for forward-compat with - # any future variant that scores more than the final answer token). - if self.config.reduction == "sum": - reward = masked_log_probs.sum().item() - else: # mean - reward = masked_log_probs.mean().item() - return float(reward) - - @torch.no_grad() - def _generate_object_state_reasoning(self, pil_frames: list[Image.Image]) -> str: - """Instruction-agnostic trajectory description (upstream - ``QwenClient.generate_object_state_reasoning``). Used when - :attr:`TOPRewardConfig.use_video_description` is ``True``. - """ - device = next(self.model.parameters()).device - user_messages = [ - { - "role": "user", - "content": [ - {"type": "video", "video": pil_frames, "fps": self.config.fps}, - { - "type": "text", - "text": "Describe the robot manipulation trajectory in this video:", - }, - ], - } - ] - prompt_chat = self.processor.apply_chat_template( - user_messages, tokenize=False, add_generation_prompt=True - ) - image_inputs, video_inputs = self._process_vision_info(user_messages) - inputs = self.processor( - text=[prompt_chat], - images=image_inputs, - videos=video_inputs, - padding=True, - return_tensors="pt", - ).to(device) - - self.model.eval() - output_ids = self.model.generate( - **inputs, - max_new_tokens=256, - do_sample=False, - ) - response = self.processor.batch_decode( - output_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False - )[0] - prompt_decoded = self.processor.batch_decode( - inputs["input_ids"], skip_special_tokens=True, clean_up_tokenization_spaces=False - )[0] - if response.startswith(prompt_decoded): - return response[len(prompt_decoded) :].strip() - return response.strip() - - @staticmethod - def _process_vision_info(messages: list[dict[str, Any]]) -> tuple[Any, Any]: - """Thin wrapper around ``qwen_vl_utils.process_vision_info``. - - Kept as a method so tests can monkey-patch it without depending on - the import-time presence of ``qwen_vl_utils``. - """ - from qwen_vl_utils import process_vision_info - - return cast(tuple[Any, Any], process_vision_info(messages)) diff --git a/src/lerobot/rewards/topreward/processor_topreward.py b/src/lerobot/rewards/topreward/processor_topreward.py index 5c0b77dbf..bd85e541b 100644 --- a/src/lerobot/rewards/topreward/processor_topreward.py +++ b/src/lerobot/rewards/topreward/processor_topreward.py @@ -16,11 +16,12 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import Any +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any import numpy as np import torch +from PIL import Image from torch import Tensor from lerobot.configs import PipelineFeatureType, PolicyFeature @@ -33,7 +34,11 @@ from lerobot.processor import ( ProcessorStepRegistry, policy_action_to_transition, ) -from lerobot.rewards.topreward.configuration_topreward import TOPRewardConfig +from lerobot.rewards.topreward.configuration_topreward import ( + DEFAULT_PROMPT_PREFIX, + DEFAULT_PROMPT_SUFFIX_TEMPLATE, + TOPRewardConfig, +) from lerobot.types import EnvTransition, TransitionKey from lerobot.utils.constants import ( OBS_IMAGES, @@ -41,20 +46,32 @@ from lerobot.utils.constants import ( POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME, ) +from lerobot.utils.import_utils import _transformers_available, require_package + +if TYPE_CHECKING or _transformers_available: + from transformers import AutoProcessor +else: + AutoProcessor = None -# Namespace for TOPReward's pre-encoded observation tensors written by the -# processor and consumed by the model. Keys: ``frames`` (one ``(T,H,W,C)`` -# uint8 numpy array per sample) and ``task`` (one string per sample). TOPREWARD_FEATURE_PREFIX = f"{OBS_PREFIX}topreward." +_TRUE_ANSWER = "True" + +TOPREWARD_VLM_INPUT_KEYS = ( + "input_ids", + "attention_mask", + "pixel_values", + "pixel_values_videos", + "image_grid_thw", + "video_grid_thw", + "second_per_grid_ts", +) +TOPREWARD_METADATA_KEYS = ("prompt_length",) +TOPREWARD_INPUT_KEYS = TOPREWARD_VLM_INPUT_KEYS + TOPREWARD_METADATA_KEYS + def _video_to_numpy(video: Tensor, *, max_frames: int | None) -> np.ndarray: - """Convert one trajectory tensor to a ``(T, H, W, C) uint8`` numpy array. - - Mirrors the Robometer helper: accepts ``(T, C, H, W)`` or ``(T, H, W, C)`` - layouts, rescales floats in ``[0, 1]`` to ``[0, 255]``, clips values - outside the uint8 range and tail-crops to ``max_frames``. - """ + """Convert one trajectory tensor to a ``(T, H, W, C) uint8`` numpy array.""" if max_frames is not None: video = video[-max_frames:] if video.shape[1] in (1, 3): @@ -68,6 +85,15 @@ def _video_to_numpy(video: Tensor, *, max_frames: int | None) -> np.ndarray: return np.clip(array, 0, 255).astype(np.uint8) +def _frames_to_pil(frames: np.ndarray) -> list[Image.Image]: + """Convert ``(T, H, W, C)`` uint8 frames to a list of PIL images.""" + if frames.ndim != 4: + raise ValueError(f"Expected (T,H,W,C) frames; got shape {frames.shape}") + if frames.dtype != np.uint8: + frames = np.clip(frames, 0, 255).astype(np.uint8) + return [Image.fromarray(frames[i]) for i in range(frames.shape[0])] + + def _expand_tasks(task: Any, *, batch_size: int, default: str | None) -> list[str]: if task is None: task = default @@ -89,29 +115,41 @@ def _expand_tasks(task: Any, *, batch_size: int, default: str | None) -> list[st @dataclass @ProcessorStepRegistry.register(name="topreward_encoder") class TOPRewardEncoderProcessorStep(ProcessorStep): - """Normalise raw frames + task into TOPReward-namespaced observation entries. + """Encode raw frames + task into Qwen-VL tensors for the TOPReward model. + + Loads a :class:`~transformers.AutoProcessor` matching ``vlm_name`` and + builds the full chat prompt including the instruction suffix. The + resulting ``input_ids``, ``attention_mask``, vision tensors, and a + per-sample ``prompt_length`` integer are written under the + ``observation.topreward.*`` namespace so the model can label-mask and + forward without re-tokenising. At call time the step reads: - ``observation[image_key]``: ``(B, T, C, H, W)`` or ``(B, C, H, W)`` frames. - ``complementary_data[task_key]``: a string or list of strings. - and writes: - - - ``observation[f"{TOPREWARD_FEATURE_PREFIX}frames"]``: list of - ``(T, H, W, C) uint8`` numpy arrays, one per sample. - - ``observation[f"{TOPREWARD_FEATURE_PREFIX}task"]``: list of strings, - one per sample. - - The actual chat-template / tokenisation happens model-side because - TOPReward's reward extraction needs the tokenizer to know the - prompt/suffix split (label masking on suffix tokens only). + and writes ``observation[f"{TOPREWARD_FEATURE_PREFIX}"]`` for the + Qwen-VL tensors plus ``prompt_length``. """ + vlm_name: str = "Qwen/Qwen3-VL-8B-Instruct" image_key: str = OBS_IMAGES + ".top" task_key: str = "task" default_task: str | None = None max_frames: int | None = 16 + fps: float = 2.0 + prompt_prefix: str = DEFAULT_PROMPT_PREFIX + prompt_suffix_template: str = DEFAULT_PROMPT_SUFFIX_TEMPLATE + add_chat_template: bool = False + max_length: int = 32768 + + _processor: Any = field(default=None, init=False, repr=False) + + def __post_init__(self) -> None: + require_package("transformers", extra="topreward") + require_package("qwen-vl-utils", extra="topreward", import_name="qwen_vl_utils") + self._processor = AutoProcessor.from_pretrained(self.vlm_name, trust_remote_code=True) def __call__(self, transition: EnvTransition) -> EnvTransition: observation = transition.get(TransitionKey.OBSERVATION) @@ -138,18 +176,125 @@ class TOPRewardEncoderProcessorStep(ProcessorStep): default=self.default_task, ) - frames_per_sample = [ - _video_to_numpy(tensor[i], max_frames=self.max_frames) for i in range(batch_size) - ] + encoded = self._encode_batch(tensor, tasks) new_observation = dict(observation) - new_observation[f"{TOPREWARD_FEATURE_PREFIX}frames"] = frames_per_sample - new_observation[f"{TOPREWARD_FEATURE_PREFIX}task"] = list(tasks) + for key, value in encoded.items(): + new_observation[f"{TOPREWARD_FEATURE_PREFIX}{key}"] = value new_transition = transition.copy() new_transition[TransitionKey.OBSERVATION] = new_observation return new_transition + def _encode_batch(self, tensor: Tensor, tasks: list[str]) -> dict[str, Any]: + """Tokenise a batch of (frames, task) pairs into Qwen-VL tensors. + + Processes samples one at a time (each may have a different token + length due to different numbers of vision patches), then pads / + stacks the results. + """ + from qwen_vl_utils import process_vision_info + + batch_size = tensor.shape[0] + all_encoded: list[dict[str, Any]] = [] + all_prompt_lengths: list[int] = [] + + for i in range(batch_size): + frames_np = _video_to_numpy(tensor[i], max_frames=self.max_frames) + pil_frames = _frames_to_pil(frames_np) + task = tasks[i] + + instruction_suffix = self.prompt_suffix_template.format(instruction=task) + eos_token = self._processor.tokenizer.eos_token + + if self.add_chat_template: + suffix_for_template = instruction_suffix.removesuffix(_TRUE_ANSWER).rstrip() + templated_messages = [ + { + "role": "user", + "content": [ + {"type": "video", "video": pil_frames, "fps": self.fps}, + {"type": "text", "text": f"{self.prompt_prefix}{suffix_for_template}"}, + ], + } + ] + prompt_chat = self._processor.apply_chat_template( + templated_messages, tokenize=False, add_generation_prompt=True + ) + full_text = f"{prompt_chat}{_TRUE_ANSWER}" + image_inputs, video_inputs = process_vision_info(templated_messages) + else: + user_messages = [ + { + "role": "user", + "content": [ + {"type": "video", "video": pil_frames, "fps": self.fps}, + {"type": "text", "text": self.prompt_prefix}, + ], + } + ] + prompt_chat = self._processor.apply_chat_template( + user_messages, tokenize=False, add_generation_prompt=False + ) + if eos_token is not None: + prompt_chat = prompt_chat.split(eos_token)[0] + full_text = f"{prompt_chat}{instruction_suffix}" + image_inputs, video_inputs = process_vision_info(user_messages) + + inputs = self._processor( + text=[full_text], + images=image_inputs, + videos=video_inputs, + padding=True, + return_tensors="pt", + ) + + input_len = int(inputs["input_ids"].shape[-1]) + if input_len > self.max_length: + raise ValueError( + f"TOPReward input length {input_len} exceeds max_length " + f"{self.max_length}; lower `max_frames` or raise `max_length`." + ) + + prompt_length = input_len - 1 + all_encoded.append(inputs) + all_prompt_lengths.append(prompt_length) + + result = dict(all_encoded[0]) if batch_size == 1 else self._pad_and_stack(all_encoded) + + result["prompt_length"] = torch.tensor(all_prompt_lengths, dtype=torch.long) + return result + + @staticmethod + def _pad_and_stack(encoded_list: list[dict[str, Any]]) -> dict[str, Any]: + """Right-pad and stack per-sample encoded dicts into a batch.""" + keys = [k for k in encoded_list[0] if isinstance(encoded_list[0][k], Tensor)] + max_len = max(enc["input_ids"].shape[-1] for enc in encoded_list) + result: dict[str, Any] = {} + + for key in keys: + tensors = [enc[key] for enc in encoded_list] + if key in ("input_ids", "attention_mask"): + padded = [] + pad_value = 0 + for t in tensors: + pad_size = max_len - t.shape[-1] + if pad_size > 0: + padded.append(torch.nn.functional.pad(t, (0, pad_size), value=pad_value)) + else: + padded.append(t) + result[key] = torch.cat(padded, dim=0) + else: + if all(t.shape == tensors[0].shape for t in tensors): + result[key] = torch.cat(tensors, dim=0) + else: + result[key] = torch.cat(tensors, dim=0) + + for key in encoded_list[0]: + if key not in result: + result[key] = encoded_list[0][key] + return result + def transform_features( self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: @@ -157,10 +302,16 @@ class TOPRewardEncoderProcessorStep(ProcessorStep): def get_config(self) -> dict[str, Any]: return { + "vlm_name": self.vlm_name, "image_key": self.image_key, "task_key": self.task_key, "default_task": self.default_task, "max_frames": self.max_frames, + "fps": self.fps, + "prompt_prefix": self.prompt_prefix, + "prompt_suffix_template": self.prompt_suffix_template, + "add_chat_template": self.add_chat_template, + "max_length": self.max_length, } @@ -171,23 +322,27 @@ def make_topreward_pre_post_processors( PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], PolicyProcessorPipeline[PolicyAction, PolicyAction], ]: - """Pipeline that normalises frames + task for the TOPReward model. + """Pipeline that pre-encodes frames + task into Qwen-VL tensors. The preprocessor adds a batch dimension if needed, runs TOPReward's - encoder, and moves any tensor entries to the configured device. The - postprocessor is the identity since TOPReward outputs a single reward - tensor. + encoder (which tokenises the full prompt and emits ``prompt_length``), + and moves everything to the configured device. The postprocessor is + the identity since TOPReward outputs a single reward tensor. """ - del dataset_stats # TOPReward's VLM handles its own normalisation. - preprocessor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]]( steps=[ AddBatchDimensionProcessorStep(), TOPRewardEncoderProcessorStep( + vlm_name=config.vlm_name, image_key=config.image_key, task_key=config.task_key, default_task=config.default_task, max_frames=config.max_frames, + fps=config.fps, + prompt_prefix=config.prompt_prefix, + prompt_suffix_template=config.prompt_suffix_template, + add_chat_template=config.add_chat_template, + max_length=config.max_input_length, ), DeviceProcessorStep(device=config.device or "cpu"), ], diff --git a/tests/rewards/test_modeling_topreward.py b/tests/rewards/test_modeling_topreward.py index eec44ce1f..b2bfa7664 100644 --- a/tests/rewards/test_modeling_topreward.py +++ b/tests/rewards/test_modeling_topreward.py @@ -16,67 +16,71 @@ from __future__ import annotations -import numpy as np +from types import SimpleNamespace + import pytest import torch from lerobot.configs.rewards import RewardModelConfig from lerobot.rewards.factory import get_reward_model_class, make_reward_model_config from lerobot.rewards.topreward import TOPRewardConfig -from lerobot.rewards.topreward.modeling_topreward import minmax_normalize_rewards from lerobot.rewards.topreward.processor_topreward import TOPREWARD_FEATURE_PREFIX from tests.utils import skip_if_package_missing -class _FakeTokenizer: - """Minimal tokenizer surface used by ``TOPRewardModel._compute_log_prob_reward``.""" - - eos_token = "<|endoftext|>" - - -class _FakeProcessor: - """Stand-in for the Qwen ``AutoProcessor`` returned by ``from_pretrained``.""" - - def __init__(self) -> None: - self.tokenizer = _FakeTokenizer() - - @classmethod - def from_pretrained(cls, *args, **kwargs): # noqa: ARG003 - return cls() - - class _FakeQwenModel(torch.nn.Module): """Stand-in for ``Qwen3VLForConditionalGeneration``. - Provides the minimum surface ``TOPRewardModel`` touches at construction - time (a ``parameters()`` iterator for device inference). Actual - ``_compute_log_prob_reward`` calls are bypassed by monkey-patching the - method directly in the tests, so we never invoke ``self.model(...)``. + Returns a ``SimpleNamespace`` with ``logits`` of a controlled shape so + the log-prob extraction path in ``compute_reward`` can be exercised + without downloading real VLM weights. """ def __init__(self) -> None: super().__init__() self._param = torch.nn.Parameter(torch.zeros(1)) + self._reward_value: float = -1.5 @classmethod def from_pretrained(cls, *args, **kwargs): # noqa: ARG003 return cls() + def forward(self, input_ids, attention_mask=None, labels=None, **kwargs): # noqa: ARG002 + batch_size, seq_len = input_ids.shape + vocab_size = 1000 + logits = torch.zeros(batch_size, seq_len, vocab_size) + # Place a controlled log-prob at the target token position so the + # model returns a predictable reward value. + # The label-masked suffix is the last token (prompt_length = seq_len - 1). + # After the causal-LM shift (logits[:, :-1], labels[:, 1:]) the scored + # position is logits[:, -2, :] predicting labels[:, -1]. + # We set logits so that log_softmax at the target token ≈ _reward_value. + if labels is not None: + for i in range(batch_size): + target_idx = int(input_ids[i, -1].item()) + logits[i, -2, target_idx] = self._reward_value * -10 # high logit -> high log-prob + return SimpleNamespace(logits=logits) + def _patch_build(monkeypatch) -> None: """Stub out HF AutoX so TOPReward construction is cheap and offline.""" from lerobot.rewards.topreward import modeling_topreward monkeypatch.setattr(modeling_topreward, "Qwen3VLForConditionalGeneration", _FakeQwenModel) - monkeypatch.setattr(modeling_topreward, "AutoProcessor", _FakeProcessor) -def _make_batch(frames: list[np.ndarray], tasks: list[str]) -> dict[str, list]: +def _make_batch( + input_ids: torch.Tensor, + attention_mask: torch.Tensor | None = None, + prompt_length: torch.Tensor | None = None, +) -> dict[str, torch.Tensor]: """Build a ``compute_reward``-ready batch using TOPReward's namespaced keys.""" - return { - f"{TOPREWARD_FEATURE_PREFIX}frames": frames, - f"{TOPREWARD_FEATURE_PREFIX}task": tasks, - } + batch: dict[str, torch.Tensor] = {f"{TOPREWARD_FEATURE_PREFIX}input_ids": input_ids} + if attention_mask is not None: + batch[f"{TOPREWARD_FEATURE_PREFIX}attention_mask"] = attention_mask + if prompt_length is not None: + batch[f"{TOPREWARD_FEATURE_PREFIX}prompt_length"] = prompt_length + return batch # --------------------------------------------------------------------------- @@ -121,32 +125,6 @@ def test_topreward_config_rejects_suffix_without_instruction_placeholder(): TOPRewardConfig(device="cpu", prompt_suffix_template="no placeholder here") -# --------------------------------------------------------------------------- -# minmax_normalize_rewards — pure math helper -# --------------------------------------------------------------------------- - - -def test_minmax_normalize_rewards_maps_min_and_max_to_zero_and_one(): - values = minmax_normalize_rewards([-3.0, -1.0, 0.0, -2.0]) - assert values.shape == (4,) - assert values[0] == pytest.approx(0.0) - assert values[2] == pytest.approx(1.0) - # Monotonicity preserved within the input range. - assert values[3] == pytest.approx(1.0 / 3.0, abs=1e-6) - - -def test_minmax_normalize_rewards_handles_singleton_and_flat_inputs(): - # Single element -> mapped to 1.0 (no information to scale). - assert minmax_normalize_rewards([42.0]).tolist() == [1.0] - # All-equal values -> all ones (avoid divide-by-zero). - assert minmax_normalize_rewards([0.5, 0.5, 0.5]).tolist() == [1.0, 1.0, 1.0] - - -def test_minmax_normalize_rewards_empty_input_returns_empty_array(): - out = minmax_normalize_rewards([]) - assert out.shape == (0,) - - # --------------------------------------------------------------------------- # compute_reward # --------------------------------------------------------------------------- @@ -154,55 +132,43 @@ def test_minmax_normalize_rewards_empty_input_returns_empty_array(): @skip_if_package_missing("transformers") def test_topreward_compute_reward_returns_one_scalar_per_sample(monkeypatch): + """``compute_reward`` must return a ``(B,)`` float32 tensor with one + log-prob reward per sample, consuming pre-encoded Qwen-VL tensors.""" from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel _patch_build(monkeypatch) cfg = TOPRewardConfig(device="cpu") model = TOPRewardModel(cfg) - captured = [] - - def fake_log_prob(self, frames, instruction): # noqa: ARG002 - captured.append((frames.shape, instruction)) - return -1.5 - - monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob) - - frames_a = np.zeros((4, 8, 8, 3), dtype=np.uint8) - frames_b = np.zeros((6, 8, 8, 3), dtype=np.uint8) - batch = _make_batch([frames_a, frames_b], ["pick the cube", "open the drawer"]) + input_ids = torch.randint(0, 100, (2, 10)) + attention_mask = torch.ones(2, 10, dtype=torch.long) + prompt_length = torch.tensor([9, 9]) # unmask only the last token + batch = _make_batch(input_ids, attention_mask, prompt_length) rewards = model.compute_reward(batch) assert rewards.shape == (2,) assert rewards.dtype == torch.float32 - assert torch.allclose(rewards, torch.tensor([-1.5, -1.5])) - # `_compute_log_prob_reward` was called once per sample with the right tasks. - assert [task for _, task in captured] == ["pick the cube", "open the drawer"] - assert [shape[0] for shape, _ in captured] == [4, 6] @skip_if_package_missing("transformers") def test_topreward_compute_reward_applies_success_threshold(monkeypatch): - """When ``success_threshold`` is finite, the model returns binary success - instead of the raw log-prob — useful as a drop-in success detector.""" + """When ``success_threshold`` is finite, the model returns binary success.""" from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel _patch_build(monkeypatch) - cfg = TOPRewardConfig(device="cpu", success_threshold=-2.0) + cfg = TOPRewardConfig(device="cpu", success_threshold=0.0) model = TOPRewardModel(cfg) - rewards_in = iter([-1.5, -3.0]) # first above threshold, second below - monkeypatch.setattr( - TOPRewardModel, - "_compute_log_prob_reward", - lambda _self, _frames, _instr: next(rewards_in), - ) + input_ids = torch.randint(0, 100, (2, 10)) + attention_mask = torch.ones(2, 10, dtype=torch.long) + prompt_length = torch.tensor([9, 9]) - frames = [np.zeros((2, 8, 8, 3), dtype=np.uint8), np.zeros((2, 8, 8, 3), dtype=np.uint8)] - rewards = model.compute_reward(_make_batch(frames, ["task", "task"])) + batch = _make_batch(input_ids, attention_mask, prompt_length) + rewards = model.compute_reward(batch) - assert torch.equal(rewards, torch.tensor([1.0, 0.0])) + assert rewards.shape == (2,) + assert set(rewards.tolist()).issubset({0.0, 1.0}) @skip_if_package_missing("transformers") @@ -213,137 +179,10 @@ def test_topreward_compute_reward_errors_when_inputs_missing(monkeypatch): cfg = TOPRewardConfig(device="cpu") model = TOPRewardModel(cfg) - with pytest.raises(KeyError, match=r"observation\.topreward\."): + with pytest.raises(KeyError, match=r"observation\.topreward\.input_ids"): model.compute_reward({}) -@skip_if_package_missing("transformers") -def test_topreward_compute_reward_errors_when_batch_sizes_mismatch(monkeypatch): - """frames and task lists must have matching lengths — a stale processor - that produces only one task for a multi-sample batch should surface as - an explicit error, not a silent zip truncation.""" - from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel - - _patch_build(monkeypatch) - cfg = TOPRewardConfig(device="cpu") - model = TOPRewardModel(cfg) - monkeypatch.setattr( - TOPRewardModel, - "_compute_log_prob_reward", - lambda _self, _frames, _instr: 0.0, - ) - - frames = [np.zeros((2, 8, 8, 3), dtype=np.uint8), np.zeros((2, 8, 8, 3), dtype=np.uint8)] - with pytest.raises(ValueError, match="task batch size"): - model.compute_reward(_make_batch(frames, ["only one task"])) - - -# --------------------------------------------------------------------------- -# predict_curves -# --------------------------------------------------------------------------- - - -@skip_if_package_missing("transformers") -def test_topreward_predict_curves_runs_one_forward_per_prefix(monkeypatch): - """``predict_curves`` must call the VLM once per prefix length per - trajectory and write min-max-normalised values back into the curve.""" - from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel - - _patch_build(monkeypatch) - cfg = TOPRewardConfig(device="cpu") - model = TOPRewardModel(cfg) - - # Simulate a strictly increasing log-prob curve as the prefix grows. - call_log: list[int] = [] - - def fake_log_prob(self, frames, instruction): # noqa: ARG002 - call_log.append(int(frames.shape[0])) - return float(frames.shape[0]) # log-prob = prefix length - - monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob) - - frames = np.zeros((5, 8, 8, 3), dtype=np.uint8) - batch = _make_batch([frames], ["lift the cup"]) - out = model.predict_curves(batch) - - # One forward per prefix length, in order. - assert call_log == [1, 2, 3, 4, 5] - # (B, T_max) shape, padded with NaN beyond each trajectory's length. - assert out["progress"].shape == (1, 5) - # Strictly increasing raw rewards -> min-max-normalised to [0, 1] linearly. - expected = torch.tensor([[0.0, 0.25, 0.5, 0.75, 1.0]]) - assert torch.allclose(out["progress"], expected, atol=1e-6) - - -@skip_if_package_missing("transformers") -def test_topreward_predict_curves_sparse_dense_interpolates_to_full_resolution(monkeypatch): - """With ``num_prefixes < N`` the model should score only the requested - number of anchor prefixes and linearly interpolate between them — the - upstream sparse-dense pattern (``num_samples=15``).""" - from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel - - _patch_build(monkeypatch) - cfg = TOPRewardConfig(device="cpu") - model = TOPRewardModel(cfg) - - call_log: list[int] = [] - - def fake_log_prob(self, frames, instruction): # noqa: ARG002 - call_log.append(int(frames.shape[0])) - return float(frames.shape[0]) - - monkeypatch.setattr(TOPRewardModel, "_compute_log_prob_reward", fake_log_prob) - - frames = np.zeros((9, 8, 8, 3), dtype=np.uint8) - out = model.predict_curves(_make_batch([frames], ["lift the cup"]), num_prefixes=3) - - # 3 anchors at linspace(1, 9, 3) -> [1, 5, 9] -> 3 VLM forwards instead of 9. - assert call_log == [1, 5, 9] - # Returned curve is full resolution (9 frames) and monotone in [0, 1]. - assert out["progress"].shape == (1, 9) - curve = out["progress"][0].numpy() - assert curve[0] == pytest.approx(0.0) - assert curve[-1] == pytest.approx(1.0) - assert np.all(np.diff(curve) >= 0) - - -@skip_if_package_missing("transformers") -def test_topreward_predict_curves_rejects_invalid_num_prefixes(monkeypatch): - from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel - - _patch_build(monkeypatch) - model = TOPRewardModel(TOPRewardConfig(device="cpu")) - batch = _make_batch([np.zeros((3, 8, 8, 3), dtype=np.uint8)], ["task"]) - with pytest.raises(ValueError, match="num_prefixes must be"): - model.predict_curves(batch, num_prefixes=0) - - -@skip_if_package_missing("transformers") -def test_topreward_predict_curves_right_pads_with_nan_for_variable_lengths(monkeypatch): - """Trajectories of different lengths in the same batch are right-padded - with ``NaN`` so the output is a regular ``(B, T_max)`` tensor.""" - from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel - - _patch_build(monkeypatch) - cfg = TOPRewardConfig(device="cpu") - model = TOPRewardModel(cfg) - monkeypatch.setattr( - TOPRewardModel, - "_compute_log_prob_reward", - lambda _self, frames, _instr: float(frames.shape[0]), - ) - - frames_short = np.zeros((2, 8, 8, 3), dtype=np.uint8) - frames_long = np.zeros((4, 8, 8, 3), dtype=np.uint8) - out = model.predict_curves(_make_batch([frames_short, frames_long], ["a", "b"])) - - assert out["progress"].shape == (2, 4) - # Trailing entries for the shorter trajectory are NaN. - assert torch.isnan(out["progress"][0, 2:]).all() - # The longer trajectory has no NaNs. - assert not torch.isnan(out["progress"][1]).any() - - # --------------------------------------------------------------------------- # Save / load — config-only checkpoint # --------------------------------------------------------------------------- @@ -351,10 +190,6 @@ def test_topreward_predict_curves_right_pads_with_nan_for_variable_lengths(monke @skip_if_package_missing("transformers") def test_topreward_save_pretrained_writes_only_config_json(monkeypatch, tmp_path): - """A TOPReward "checkpoint" is just ``config.json``. Writing - ``model.safetensors`` would only duplicate ~16 GB of Qwen weights for - no benefit, so :meth:`_save_pretrained` must skip it entirely. - """ from huggingface_hub.constants import CONFIG_NAME, SAFETENSORS_SINGLE_FILE from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel @@ -371,13 +206,11 @@ def test_topreward_save_pretrained_writes_only_config_json(monkeypatch, tmp_path model.save_pretrained(str(tmp_path)) assert (tmp_path / CONFIG_NAME).exists() - # Zero-shot model: no safetensors written by `_save_pretrained`. assert not (tmp_path / SAFETENSORS_SINGLE_FILE).exists() @skip_if_package_missing("transformers") def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_path): - """Save a TOPRewardConfig locally and reload it — user knobs must survive.""" from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel _patch_build(monkeypatch) @@ -387,7 +220,6 @@ def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_ reduction="sum", fps=4.0, image_key="observation.images.front", - use_video_description=True, add_chat_template=True, success_threshold=-1.5, ) @@ -400,16 +232,12 @@ def test_topreward_from_pretrained_local_dir_roundtrips_config(monkeypatch, tmp_ assert reloaded.config.reduction == "sum" assert reloaded.config.fps == 4.0 assert reloaded.config.image_key == "observation.images.front" - assert reloaded.config.use_video_description is True assert reloaded.config.add_chat_template is True assert reloaded.config.success_threshold == -1.5 @skip_if_package_missing("transformers") def test_topreward_is_not_trainable(monkeypatch): - """The whole point of TOPReward is that it is zero-shot. - ``is_trainable`` must therefore be ``False`` and ``forward(...)`` must - raise the base-class ``NotImplementedError``.""" from lerobot.rewards.topreward.modeling_topreward import TOPRewardModel _patch_build(monkeypatch) diff --git a/tests/rewards/test_topreward_processor.py b/tests/rewards/test_topreward_processor.py index 6b610aedc..75d6e7aeb 100644 --- a/tests/rewards/test_topreward_processor.py +++ b/tests/rewards/test_topreward_processor.py @@ -23,11 +23,11 @@ import torch from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.rewards.topreward.processor_topreward import ( TOPREWARD_FEATURE_PREFIX, - TOPRewardEncoderProcessorStep, _expand_tasks, _video_to_numpy, ) from lerobot.types import TransitionKey +from tests.utils import skip_if_package_missing # --------------------------------------------------------------------------- # _video_to_numpy — pure (T, C, H, W) -> (T, H, W, C) uint8 conversion @@ -35,7 +35,7 @@ from lerobot.types import TransitionKey def test_video_to_numpy_chw_float_is_converted_to_thwc_uint8(): - video = torch.rand(4, 3, 8, 8) # (T, C, H, W) floats in [0, 1] + video = torch.rand(4, 3, 8, 8) array = _video_to_numpy(video, max_frames=None) assert array.shape == (4, 8, 8, 3) @@ -52,7 +52,6 @@ def test_video_to_numpy_already_thwc_uint8_passes_through(): def test_video_to_numpy_max_frames_tail_crops_recent_frames(): - """``max_frames`` should keep the **last** K frames (most recent).""" video = torch.zeros(10, 3, 4, 4) for t in range(10): video[t] = t / 9.0 @@ -70,8 +69,6 @@ def test_video_to_numpy_rejects_3d_input(): def test_video_to_numpy_floats_above_one_pass_through_without_rescaling(): - """If ``array.max() > 1`` the helper assumes the tensor is already in the - uint8 range; values pass through unchanged (but are still clipped to 255).""" video = torch.full((1, 3, 2, 2), 5.0) array = _video_to_numpy(video, max_frames=None) @@ -127,50 +124,80 @@ def test_expand_tasks_wrong_type_raises(): # --------------------------------------------------------------------------- -# Encoder step — input/output shapes + dataclass surface +# Encoder step — stubbed AutoProcessor + process_vision_info # --------------------------------------------------------------------------- +def _skip_if_topreward_extras_missing(func): + func = skip_if_package_missing("qwen-vl-utils", import_name="qwen_vl_utils")(func) + func = skip_if_package_missing("transformers")(func) + return func + + +class _FakeTokenizer: + eos_token = "<|endoftext|>" + pad_token = "<|endoftext|>" + + def __call__(self, *args, **kwargs): + return {"input_ids": torch.zeros(1, 10, dtype=torch.long)} + + +class _FakeAutoProcessor: + def __init__(self) -> None: + self.tokenizer = _FakeTokenizer() + + @classmethod + def from_pretrained(cls, *args, **kwargs): # noqa: ARG003 + return cls() + + def apply_chat_template(self, messages, **kwargs): # noqa: ARG002 + return "fake_prompt_text" + + def __call__(self, text=None, images=None, videos=None, **kwargs): # noqa: ARG002 + seq_len = 10 + return { + "input_ids": torch.randint(0, 100, (1, seq_len)), + "attention_mask": torch.ones(1, seq_len, dtype=torch.long), + } + + +def _build_step(monkeypatch, **overrides): + import importlib + import sys + import types + + from lerobot.rewards.topreward import processor_topreward + from lerobot.utils import import_utils + + monkeypatch.setattr(processor_topreward, "AutoProcessor", _FakeAutoProcessor) + + # Stub qwen_vl_utils as a real module object (not MagicMock) so + # ``require_package`` / ``find_spec`` don't choke on a missing ``__spec__``. + fake_qwen_vl = types.ModuleType("qwen_vl_utils") + fake_qwen_vl.process_vision_info = lambda messages: (None, None) # type: ignore[attr-defined] + fake_qwen_vl.__spec__ = importlib.machinery.ModuleSpec("qwen_vl_utils", None) + monkeypatch.setitem(sys.modules, "qwen_vl_utils", fake_qwen_vl) + + # Clear the require_package cache so the stub is picked up. + import_utils._require_package_cache.pop("qwen_vl_utils", None) + + return processor_topreward.TOPRewardEncoderProcessorStep(**overrides) + + def _make_transition(observation: dict, complementary: dict | None = None) -> dict: - """Build a tiny ``EnvTransition`` dict for the encoder step.""" transition: dict = {TransitionKey.OBSERVATION: observation} if complementary is not None: transition[TransitionKey.COMPLEMENTARY_DATA] = complementary return transition -def test_encoder_step_writes_namespaced_frames_and_task(): - """The encoder step's output is the contract the model reads from. It - must populate exactly two namespaced keys: ``frames`` and ``task``.""" - step = TOPRewardEncoderProcessorStep( - image_key="observation.images.top", - task_key="task", - max_frames=None, - ) +@_skip_if_topreward_extras_missing +def test_encoder_step_emits_input_ids_and_prompt_length(monkeypatch): + """The processor must emit Qwen-VL tensors including ``input_ids`` and + ``prompt_length`` under the ``observation.topreward.*`` namespace.""" + step = _build_step(monkeypatch) - frames_batch = torch.zeros(2, 4, 3, 8, 8) # (B=2, T=4, C, H, W) - out = step( - _make_transition( - observation={"observation.images.top": frames_batch}, - complementary={"task": ["pick", "place"]}, - ) - ) - - obs_out = out[TransitionKey.OBSERVATION] - frames_out = obs_out[f"{TOPREWARD_FEATURE_PREFIX}frames"] - tasks_out = obs_out[f"{TOPREWARD_FEATURE_PREFIX}task"] - - assert len(frames_out) == 2 - assert all(arr.shape == (4, 8, 8, 3) and arr.dtype == np.uint8 for arr in frames_out) - assert tasks_out == ["pick", "place"] - - -def test_encoder_step_adds_singleton_time_dim_for_4d_input(): - """A ``(B, C, H, W)`` observation is the single-frame case; the encoder - must unsqueeze the time dim so the model still sees a video.""" - step = TOPRewardEncoderProcessorStep(image_key="observation.images.top", max_frames=None) - - frames_batch = torch.zeros(1, 3, 8, 8) # (B=1, C, H, W) — no time dim + frames_batch = torch.zeros(1, 4, 3, 8, 8) out = step( _make_transition( observation={"observation.images.top": frames_batch}, @@ -178,76 +205,60 @@ def test_encoder_step_adds_singleton_time_dim_for_4d_input(): ) ) - frames_out = out[TransitionKey.OBSERVATION][f"{TOPREWARD_FEATURE_PREFIX}frames"] - assert len(frames_out) == 1 - assert frames_out[0].shape == (1, 8, 8, 3) # (T=1, H, W, C) + obs_out = out[TransitionKey.OBSERVATION] + assert f"{TOPREWARD_FEATURE_PREFIX}input_ids" in obs_out + assert f"{TOPREWARD_FEATURE_PREFIX}attention_mask" in obs_out + assert f"{TOPREWARD_FEATURE_PREFIX}prompt_length" in obs_out + + prompt_length = obs_out[f"{TOPREWARD_FEATURE_PREFIX}prompt_length"] + assert prompt_length.dtype == torch.long + assert prompt_length.shape == (1,) -def test_encoder_step_uses_default_task_when_complementary_is_missing(): - step = TOPRewardEncoderProcessorStep( - image_key="observation.images.top", - default_task="perform the task", - ) - - frames_batch = torch.zeros(1, 2, 3, 4, 4) - out = step(_make_transition(observation={"observation.images.top": frames_batch})) - - tasks_out = out[TransitionKey.OBSERVATION][f"{TOPREWARD_FEATURE_PREFIX}task"] - assert tasks_out == ["perform the task"] - - -def test_encoder_step_rejects_missing_image_key(): - step = TOPRewardEncoderProcessorStep(image_key="observation.images.top") - with pytest.raises(KeyError, match="image key"): - step(_make_transition(observation={}, complementary={"task": "pick"})) - - -def test_encoder_step_rejects_non_dict_observation(): - step = TOPRewardEncoderProcessorStep() - with pytest.raises(ValueError, match="observation dict"): - step({TransitionKey.OBSERVATION: torch.zeros(1, 3, 8, 8)}) - - -def test_encoder_step_rejects_3d_or_6d_input(): - """The encoder accepts ``(B,C,H,W)`` or ``(B,T,C,H,W)`` only.""" - step = TOPRewardEncoderProcessorStep(image_key="observation.images.top") - with pytest.raises(ValueError, match=r"\(B,C,H,W\)"): - step( - _make_transition( - observation={"observation.images.top": torch.zeros(8, 8, 3)}, - complementary={"task": "pick"}, - ) - ) - - -def test_encoder_step_get_config_roundtrips_user_fields(): - """``get_config`` must serialise every user-tunable field — these are - what the processor pipeline saves under ``preprocessor_config.json``.""" - step = TOPRewardEncoderProcessorStep( +@_skip_if_topreward_extras_missing +def test_encoder_step_get_config_roundtrips_user_fields(monkeypatch): + step = _build_step( + monkeypatch, + vlm_name="Qwen/Qwen3-VL-8B-Instruct", image_key="observation.images.cam_top", task_key="task", default_task="do the thing", max_frames=8, + fps=4.0, + add_chat_template=True, + max_length=2048, ) - assert step.get_config() == { - "image_key": "observation.images.cam_top", - "task_key": "task", - "default_task": "do the thing", - "max_frames": 8, - } + cfg = step.get_config() + assert cfg["vlm_name"] == "Qwen/Qwen3-VL-8B-Instruct" + assert cfg["image_key"] == "observation.images.cam_top" + assert cfg["default_task"] == "do the thing" + assert cfg["max_frames"] == 8 + assert cfg["fps"] == 4.0 + assert cfg["add_chat_template"] is True + assert cfg["max_length"] == 2048 -def test_encoder_step_transform_features_is_identity(): - """The encoder writes plain Python objects (numpy arrays / strings) - into ``observation`` at call time but does NOT advertise new typed - features at pipeline-build time — the model reads them via the - ``TOPREWARD_FEATURE_PREFIX`` namespace, not via the typed feature map. - """ - step = TOPRewardEncoderProcessorStep() +@_skip_if_topreward_extras_missing +def test_encoder_step_transform_features_is_identity(monkeypatch): + step = _build_step(monkeypatch) features = { PipelineFeatureType.OBSERVATION: { "observation.images.top": PolicyFeature(shape=(3, 224, 224), type=FeatureType.VISUAL), } } assert step.transform_features(features) == features + + +@_skip_if_topreward_extras_missing +def test_encoder_step_rejects_missing_image_key(monkeypatch): + step = _build_step(monkeypatch, image_key="observation.images.top") + with pytest.raises(KeyError, match="image key"): + step(_make_transition(observation={}, complementary={"task": "pick"})) + + +@_skip_if_topreward_extras_missing +def test_encoder_step_rejects_non_dict_observation(monkeypatch): + step = _build_step(monkeypatch) + with pytest.raises(ValueError, match="observation dict"): + step({TransitionKey.OBSERVATION: torch.zeros(1, 3, 8, 8)})