diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml
index 470319c48..412386e2d 100644
--- a/docs/source/_toctree.yml
+++ b/docs/source/_toctree.yml
@@ -39,8 +39,10 @@
title: Porting Large Datasets
- local: using_dataset_tools
title: Using the Dataset Tools
- - local: dataset_subtask
- title: Using Subtasks in the Dataset
+ - local: language_and_recipes
+ title: Language Columns and Recipes
+ - local: tools
+ title: Tools
- local: video_encoding_parameters
title: Video encoding parameters
- local: streaming_video_encoding
diff --git a/docs/source/dataset_subtask.mdx b/docs/source/dataset_subtask.mdx
deleted file mode 100644
index 6264aca22..000000000
--- a/docs/source/dataset_subtask.mdx
+++ /dev/null
@@ -1,277 +0,0 @@
-# Using Subtasks in LeRobot Datasets
-
-Subtask support in robotics datasets has proven effective in improving robot reasoning and understanding. Subtasks are particularly useful for:
-
-- **Hierarchical policies**: Building policies that include subtask predictions to visualize robot reasoning in real time
-- **Reward modeling**: Helping reward models understand task progression (e.g., SARM-style stage-aware reward models)
-- **Task decomposition**: Breaking down complex manipulation tasks into atomic, interpretable steps
-
-LeRobotDataset now supports subtasks as part of its dataset structure, alongside tasks.
-
-## What are Subtasks?
-
-While a **task** describes the overall goal (e.g., "Pick up the apple and place it in the basket"), **subtasks** break down the execution into finer-grained steps:
-
-1. "Approach the apple"
-2. "Grasp the apple"
-3. "Lift the apple"
-4. "Move to basket"
-5. "Release the apple"
-
-Each frame in the dataset can be annotated with its corresponding subtask, enabling models to learn and predict these intermediate stages.
-
-
-
-
- Figure: Overview of subtask annotation.
-
-
-**Reference:** _Subtask-learning based for robot self-assembly in flexible collaborative assembly in manufacturing_, Original Article, Published: 19 April 2022.
-
-## Dataset Structure
-
-Subtask information is stored in the dataset metadata:
-
-```
-my-dataset/
-├── data/
-│ └── ...
-├── meta/
-│ ├── info.json
-│ ├── stats.json
-│ ├── tasks.parquet
-│ ├── subtasks.parquet # Subtask index → subtask string mapping
-│ └── episodes/
-│ └── ...
-└── videos/
- └── ...
-```
-
-### Subtasks Parquet File
-
-The `meta/subtasks.parquet` file maps subtask indices to their natural language descriptions:
-
-| subtask_index | subtask (index column) |
-| ------------- | ---------------------- |
-| 0 | "Approach the apple" |
-| 1 | "Grasp the apple" |
-| 2 | "Lift the apple" |
-| ... | ... |
-
-### Frame-Level Annotations
-
-Each frame in the dataset can include a `subtask_index` field that references the subtasks parquet file:
-
-```python
-# Example frame data in the parquet file
-{
- "index": 42,
- "timestamp": 1.4,
- "episode_index": 0,
- "task_index": 0,
- "subtask_index": 2, # References "Lift the apple"
- "observation.state": [...],
- "action": [...],
-}
-```
-
-## Annotating Datasets with Subtasks
-
-We provide a HuggingFace Space for easily annotating any LeRobotDataset with subtasks:
-
-**[https://huggingface.co/spaces/lerobot/annotate](https://huggingface.co/spaces/lerobot/annotate)**
-
-After completing your annotation:
-
-1. Click "Push to Hub" to upload your annotated dataset
-2. You can also run the annotation space locally by following the instructions at [github.com/huggingface/lerobot-annotate](https://github.com/huggingface/lerobot-annotate)
-
-## Loading Datasets with Subtasks
-
-When you load a dataset with subtask annotations, the subtask information is automatically available:
-
-```python
-from lerobot.datasets import LeRobotDataset
-
-# Load a dataset with subtask annotations
-dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated")
-
-# Access a sample
-sample = dataset[100]
-
-# The sample includes both task and subtask information
-print(sample["task"]) # "Collect the fruit"
-print(sample["subtask"]) # "Grasp the apple"
-print(sample["task_index"]) # tensor(0)
-print(sample["subtask_index"]) # tensor(2)
-```
-
-### Checking for Subtask Support
-
-You can check if a dataset has subtask annotations:
-
-```python
-# Check if subtasks are available
-has_subtasks = (
- "subtask_index" in dataset.features
- and dataset.meta.subtasks is not None
-)
-
-if has_subtasks:
- print(f"Dataset has {len(dataset.meta.subtasks)} unique subtasks")
- print("Subtasks:", list(dataset.meta.subtasks.index))
-```
-
-## Using Subtasks for Training
-
-### With the Tokenizer Processor
-
-The `TokenizerProcessor` automatically handles subtask tokenization for Vision-Language Action (VLA) models:
-
-```python
-from lerobot.processor import TokenizerProcessorStep
-
-# Create a tokenizer processor step
-tokenizer_processor = TokenizerProcessorStep(
- tokenizer_name_or_path="google/paligemma-3b-pt-224",
- padding="max_length",
- max_length=64,
-)
-
-# The processor will automatically tokenize subtasks if present in the batch
-# and add them to the observation under:
-# - "observation.subtask.tokens"
-# - "observation.subtask.attention_mask"
-```
-
-When subtasks are available in the batch, the tokenizer processor adds:
-
-- `observation.subtask.tokens`: Tokenized subtask text
-- `observation.subtask.attention_mask`: Attention mask for the subtask tokens
-
-### DataLoader with Subtasks
-
-```python
-import torch
-from lerobot.datasets import LeRobotDataset
-
-dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated")
-
-dataloader = torch.utils.data.DataLoader(
- dataset,
- batch_size=16,
- shuffle=True,
-)
-
-for batch in dataloader:
- # Access subtask information in the batch
- subtasks = batch["subtask"] # List of subtask strings
- subtask_indices = batch["subtask_index"] # Tensor of subtask indices
-
- # Use for training hierarchical policies or reward models
- print(f"Batch subtasks: {set(subtasks)}")
-```
-
-## Example Datasets with Subtask Annotations
-
-Try loading a dataset with subtask annotations:
-
-```python
-from lerobot.datasets import LeRobotDataset
-
-# Example dataset with subtask annotations
-dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated")
-
-# Explore the subtasks
-print("Available subtasks:")
-for subtask_name in dataset.meta.subtasks.index:
- print(f" - {subtask_name}")
-
-# Get subtask distribution
-subtask_counts = {}
-for i in range(len(dataset)):
- sample = dataset[i]
- subtask = sample["subtask"]
- subtask_counts[subtask] = subtask_counts.get(subtask, 0) + 1
-
-print("\nSubtask distribution:")
-for subtask, count in sorted(subtask_counts.items(), key=lambda x: -x[1]):
- print(f" {subtask}: {count} frames")
-```
-
-## Use Cases
-
-### 1. Hierarchical Policy Training
-
-Train policies that predict both actions and current subtask:
-
-```python
-class HierarchicalPolicy(nn.Module):
- def __init__(self, num_subtasks):
- super().__init__()
- self.action_head = nn.Linear(hidden_dim, action_dim)
- self.subtask_head = nn.Linear(hidden_dim, num_subtasks)
-
- def forward(self, observations):
- features = self.encoder(observations)
- actions = self.action_head(features)
- subtask_logits = self.subtask_head(features)
- return actions, subtask_logits
-```
-
-### 2. Stage-Aware Reward Modeling (SARM)
-
-Build reward models that understand task progression:
-
-```python
-# SARM predicts:
-# - Stage: Which subtask is being executed (discrete)
-# - Progress: How far along the subtask (continuous 0-1)
-
-class SARMRewardModel(nn.Module):
- def forward(self, observations):
- features = self.encoder(observations)
- stage_logits = self.stage_classifier(features)
- progress = self.progress_regressor(features)
- return stage_logits, progress
-```
-
-### 3. Progress Visualization
-
-Monitor robot execution by tracking subtask progression:
-
-```python
-def visualize_execution(model, observations):
- for t, obs in enumerate(observations):
- action, subtask_logits = model(obs)
- predicted_subtask = subtask_names[subtask_logits.argmax()]
- print(f"t={t}: Executing '{predicted_subtask}'")
-```
-
-## API Reference
-
-### LeRobotDataset Properties
-
-| Property | Type | Description |
-| --------------------------- | ---------------------- | ------------------------------------------ |
-| `meta.subtasks` | `pd.DataFrame \| None` | DataFrame mapping subtask names to indices |
-| `features["subtask_index"]` | `dict` | Feature spec for subtask_index if present |
-
-### Sample Keys
-
-When subtasks are available, each sample includes:
-
-| Key | Type | Description |
-| --------------- | -------------- | ------------------------------------ |
-| `subtask_index` | `torch.Tensor` | Integer index of the current subtask |
-| `subtask` | `str` | Natural language subtask description |
-
-## Related Resources
-
-- [SARM Paper](https://arxiv.org/pdf/2509.25358) - Stage-Aware Reward Modeling for Long Horizon Robot Manipulation
-- [LeRobot Annotate Space](https://huggingface.co/spaces/lerobot/annotate) - Interactive annotation tool
-- [LeRobotDataset v3.0](./lerobot-dataset-v3) - Dataset format documentation
diff --git a/docs/source/language_and_recipes.mdx b/docs/source/language_and_recipes.mdx
new file mode 100644
index 000000000..4181dbe34
--- /dev/null
+++ b/docs/source/language_and_recipes.mdx
@@ -0,0 +1,147 @@
+# Language columns and recipes
+
+Most LeRobot datasets ship with a single `task` string per episode — fine for
+short, single-instruction skills, but not enough for the longer-horizon,
+multi-modal robot policies the field is moving toward (high-level planning,
+memory, interjections, VQA, tool use). To support those policies without
+forking the dataset format, LeRobot extends `LeRobotDataset` with two optional
+language columns and a small recipe layer that turns those rows into
+chat-style training samples on the fly.
+
+The design splits cleanly into three layers:
+
+1. **Data in the dataset** — language annotations stored next to frames in
+ `data/chunk-*/file-*.parquet` as two optional columns (`language_persistent`
+ and `language_events`). Datasets without these columns keep their existing
+ behavior.
+2. **Recipe** — a YAML file that declares which annotation rows to bind and
+ how to lay them out as chat turns (`role`, `content`, optional images,
+ optional tool calls). Recipes are pure config; no Python required to add a
+ new one.
+3. **Training format** — at sample time, `RenderMessagesStep` resolves the
+ recipe against the per-frame annotations and emits HF-style `messages` plus
+ LeRobot-specific sidecars (`message_streams`, `target_message_indices`)
+ that policy processors consume.
+
+This page describes each layer in turn.
+
+## Layer 1 — language columns in the dataset
+
+The two optional columns live next to frame data in
+`data/chunk-*/file-*.parquet`:
+
+- `language_persistent`: a list of rows broadcast across every frame in an episode for state that remains active, such as `subtask`, `plan`, and `memory`.
+- `language_events`: a list of rows stored only on the exact frame where an event was emitted, such as `interjection`, `vqa`, and speech tool calls.
+
+Both columns share the same row shape (event rows omit `timestamp` because the
+frame the row sits on already provides it):
+
+```text
+role: string
+content: string | null
+style: string | null
+timestamp: float32 # persistent rows only
+camera: string | null # observation.images.* feature key, view-dependent rows only
+tool_calls: list[Json] | null
+```
+
+The `camera` field tags rows whose `content` is grounded in a specific camera
+view. Rows of view-dependent styles (`vqa` and `trace`) MUST set `camera` to
+the matching `observation.images.*` feature key. Rows of every other style —
+including `motion`, which describes robot-frame primitives in joint / Cartesian
+terms — MUST leave `camera` as `null`. Pipeline writers and the validator
+enforce this via `validate_camera_field(style, camera)`.
+
+`meta/tasks.parquet` remains the canonical source for the task. The special `${task}` recipe binding always reads that task string and does not depend on language annotations.
+
+### Architecture
+
+The language stack is implemented by three components:
+
+1. `lerobot.datasets.language` defines the schema, style registry, and `column_for_style`.
+2. `lerobot.datasets.language_render` resolves rows and renders messages.
+3. `RenderMessagesStep` turns dataset samples into `messages`, `message_streams`, and `target_message_indices`.
+
+`LeRobotDataset` stays recipe-agnostic. It passes `language_persistent` and `language_events` through when present, and unannotated datasets keep their existing behavior.
+
+## Layer 2 — recipe anatomy
+
+Recipes are YAML files backed by `TrainingRecipe` and `MessageTurn`. They
+declare which annotation rows to pull (via `bindings`) and how to compose them
+into chat turns (`messages`).
+
+```yaml
+messages:
+ - { role: user, content: "${task}", stream: high_level }
+ - { role: assistant, content: "${subtask}", stream: low_level, target: true }
+```
+
+A recipe can also branch into a weighted **blend** of sub-recipes. At sample
+time, exactly one branch is selected deterministically from the sample index,
+so different frames train different objectives (e.g. memory updates vs.
+low-level execution vs. VQA) without any Python wiring.
+
+### Temporal semantics
+
+Persistent styles are active after emission until replaced:
+
+- `active_at(t, style=subtask)`
+- `nth_prev(style=memory, offset=1)`
+- `nth_next(style=subtask, offset=1)`
+
+Event styles only exist on their exact timestamp:
+
+- `emitted_at(t, style=interjection)`
+- `emitted_at(t, style=vqa, role=user, camera=observation.images.top)`
+- `emitted_at(t, role=assistant, tool_name=say)`
+
+Exact event matching has no tolerance window, so writers must stamp event rows with frame timestamps from the parquet data.
+
+### View-dependent resolution
+
+For view-dependent styles (`vqa` and `trace`), the resolver gains a
+`camera=` filter parallel to `role=` and `tool_name=`. Datasets with multiple
+cameras typically emit one (`vqa`, `user`) + (`vqa`, `assistant`) pair per
+camera at the same timestamp; without `camera=`, those resolvers see one
+match per camera and raise an ambiguity error. Recipes consume each camera through its
+own binding plus a matching image block, e.g.
+
+```yaml
+ask_vqa_top:
+  bindings:
+    vqa_query: "emitted_at(t, style=vqa, role=user, camera=observation.images.top)"
+    vqa: "emitted_at(t, style=vqa, role=assistant, camera=observation.images.top)"
+  messages:
+    - role: user
+      stream: high_level
+      if_present: vqa_query
+      content:
+        - { type: image, feature: observation.images.top }
+        - { type: text, text: "${vqa_query}" }
+    - {
+        role: assistant,
+        content: "${vqa}",
+        stream: high_level,
+        target: true,
+        if_present: vqa,
+      }
+```
+
+Add one such sub-recipe per camera the dataset records.
+
+## Layer 3 — training format
+
+Rendered samples use HF-style chat messages plus LeRobot sidecars:
+
+```python
+sample["messages"]
+sample["message_streams"]
+sample["target_message_indices"]
+```
+
+The renderer does not apply a tokenizer chat template. Policy processors decide how to serialize the messages for their backbone, which keeps the same dataset usable across SmolVLA, Pi0.5, and any future VLM that expects OpenAI-style chat messages.
+
+## Graceful absence
+
+If both language columns are missing, `None`, or empty, `RenderMessagesStep` is a no-op.
+If an event-scoped branch is selected on a frame without the required event row, rendering returns `None`, allowing a loader to retry another sample.
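The temporal semantics documented in `language_and_recipes.mdx` can be sketched in a few lines of Python. This is an illustration only, not the code in `lerobot.datasets.language_render`: the helper names mirror the resolver names used in the docs, but their signatures, the dict-based row layout, the sample rows, and the choice of `ValueError` for the ambiguity case are all assumptions.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]  # hypothetical row layout: role, content, style, timestamp, camera


def active_at(persistent: list[Row], t: float, style: str) -> Row | None:
    """Persistent styles: the latest row of `style` emitted at or before `t`."""
    matches = [r for r in persistent if r["style"] == style and r["timestamp"] <= t]
    return max(matches, key=lambda r: r["timestamp"], default=None)


def emitted_at(events: list[Row], **filters: Any) -> Row | None:
    """Event styles: rows stored on this exact frame, narrowed by keyword filters."""
    matches = [r for r in events if all(r.get(k) == v for k, v in filters.items())]
    if len(matches) > 1:
        # Assumed error type; the docs only say an ambiguity error is raised.
        raise ValueError(f"ambiguous match for {filters}: add a camera= or role= filter")
    return matches[0] if matches else None


# Made-up annotation rows for one episode and one frame.
persistent = [
    {"role": "assistant", "style": "subtask", "content": "approach the apple", "timestamp": 0.0},
    {"role": "assistant", "style": "subtask", "content": "grasp the apple", "timestamp": 1.5},
]
events = [
    {"role": "user", "style": "vqa", "content": "Is the gripper open?", "camera": "observation.images.top"},
    {"role": "user", "style": "vqa", "content": "Is the apple visible?", "camera": "observation.images.wrist"},
]

print(active_at(persistent, t=1.0, style="subtask")["content"])
print(emitted_at(events, style="vqa", role="user", camera="observation.images.top")["content"])
```

Calling `emitted_at(events, style="vqa", role="user")` on the same frame raises in this sketch, which is the multi-camera case the `camera=` filter is meant to resolve.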
diff --git a/docs/source/tools.mdx b/docs/source/tools.mdx
new file mode 100644
index 000000000..d88881184
--- /dev/null
+++ b/docs/source/tools.mdx
@@ -0,0 +1,210 @@
+# Tools
+
+LeRobot v3.1 supports **tool calls** in policies — assistant messages can
+emit structured invocations like `say(text="OK, starting now")` that the
+runtime dispatches to a real implementation (TTS, controller, logger, …).
+
+This page covers:
+
+1. Where the tool catalog lives.
+2. How the annotation pipeline produces tool-call atoms.
+3. How to add your own tool.
+
+## Where tools are declared
+
+Two layers.
+
+**The catalog** — a list of OpenAI-style function schemas — lives at
+`meta/info.json["tools"]` on each dataset. Example:
+
+```json
+{
+ "features": { "...": "..." },
+ "tools": [
+ {
+ "type": "function",
+ "function": {
+ "name": "say",
+ "description": "Speak a short utterance to the user via the TTS executor.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "text": {
+ "type": "string",
+ "description": "The verbatim text to speak."
+ }
+ },
+ "required": ["text"]
+ }
+ }
+ }
+ ]
+}
+```
+
+Read it via the dataset metadata accessor:
+
+```python
+from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata
+
+meta = LeRobotDatasetMetadata(repo_id="pepijn/super_poulain_final_annotations")
+tools = meta.tools # list[dict] — OpenAI tool schemas
+```
+
+If the dataset's `info.json` doesn't declare any tools, `meta.tools`
+returns `DEFAULT_TOOLS` from `lerobot.datasets.language` — currently a
+single-entry list with the canonical `say` schema. So unannotated
+datasets and chat-template consumers keep working without any
+configuration:
+
+```python
+prompt_str = tokenizer.apply_chat_template(
+ sample["messages"],
+ tools=meta.tools, # works either way
+ add_generation_prompt=False,
+ tokenize=False,
+)
+```
+
+**The implementations** — runnable Python — will live under
+`src/lerobot/tools/`, one file per tool. The runtime dispatcher and
+the canonical `say` implementation (wrapping Kyutai's pocket-tts) are
+not part of the catalog layer described here; today this layer ships
+only the schema storage and the `DEFAULT_TOOLS` fallback constant.
+
+## Per-row tool _invocations_
+
+The catalog above describes _what can be called_. The actual _call_ — the
+function name plus the argument values — is stored per-row, on the
+assistant atoms in `language_events`:
+
+```json
+{
+  "role": "assistant",
+  "content": null,
+  "style": null,
+  "camera": null,
+  "tool_calls": [
+    { "type": "function",
+      "function": { "name": "say", "arguments": { "text": "On it." } }
+    }
+  ]
+}
+```
+
+Recipes splice these into rendered messages via `tool_calls_from`:
+
+```yaml
+user_interjection_response:
+  bindings:
+    speech: "emitted_at(t, role=assistant, tool_name=say)"
+  messages:
+    - { role: user, content: "${task}", stream: high_level }
+    - {
+        role: assistant,
+        content: "${plan}",
+        stream: high_level,
+        target: true,
+        tool_calls_from: speech,
+      }
+```
+
+The model's training target is one assistant turn that carries both the
+plan text _and_ the `say` tool call. At inference, the runtime parses
+the generated text back into structured `tool_calls` and dispatches to
+the matching implementation.
+
+## How to add your own tool
+
+> **Note:** Steps 2 and 3 below describe the runtime layer
+> (`src/lerobot/tools/`, the `Tool` protocol, `TOOL_REGISTRY`,
+> `get_tools(meta)`) which is not part of the catalog layer shipped
+> today — those modules don't yet exist in the tree. Step 1 alone is
+> enough to make the tool visible to the chat template via
+> `meta.tools` so the model can learn to _generate_ the call;
+> executing the call at inference requires the runtime layer.
+
+Three steps. Concrete example: a `record_observation` tool the policy
+can call to capture an extra observation outside the regular control
+loop.
+
+### Step 1 — declare the schema
+
+Add an entry under `meta/info.json["tools"]`. Either edit the file
+directly on disk _before_ running the annotation pipeline (it'll be
+preserved) or hand it to `lerobot-annotate` via a config flag.
+
+```json
+{
+ "tools": [
+ { "type": "function", "function": { "name": "say", "...": "..." } },
+ {
+ "type": "function",
+ "function": {
+ "name": "record_observation",
+ "description": "Capture a high-resolution still image for the user.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "label": {
+ "type": "string",
+ "description": "Short label for the saved image."
+ }
+ },
+ "required": ["label"]
+ }
+ }
+ }
+ ]
+}
+```
+
+The schema follows OpenAI's function-calling convention exactly, so the
+chat template can render it natively.
+
+### Step 2 — implement the call
+
+Create `src/lerobot/tools/record_observation.py`:
+
+```python
+from .base import Tool
+from typing import Any
+
+RECORD_OBSERVATION_SCHEMA: dict[str, Any] = {"...": "..."}  # mirrors the JSON above
+
+
+class RecordObservationTool:
+    name = "record_observation"
+    schema = RECORD_OBSERVATION_SCHEMA
+
+    def __init__(self, schema: dict | None = None, output_dir: str = "."):
+        self.output_dir = output_dir
+
+    def call(self, arguments: dict) -> str:
+        label = arguments["label"]
+        # ... save the latest camera frame to <output_dir>/<label>.png ...
+        return f"saved {label}.png"
+```
+
+One file per tool keeps dependencies isolated — `record_observation`
+might pull `pillow`, while `say` pulls `pocket-tts`. Users installing
+only the tools they need avoid heavy transitive deps.
+
+### Step 3 — register it
+
+Add to `src/lerobot/tools/registry.py`:
+
+```python
+from .record_observation import RecordObservationTool
+
+TOOL_REGISTRY["record_observation"] = RecordObservationTool
+```
+
+That's it. At runtime `get_tools(meta)` looks up each schema in
+`meta.tools`, instantiates the matching registered class, and returns
+a name → instance dict the dispatcher can route into.
+
+If you want to use a tool _without_ writing an implementation (e.g. for
+training-time chat-template formatting only), step 1 alone is enough —
+the model still learns to _generate_ the call. Steps 2 and 3 are only
+needed to actually _execute_ it at inference.
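The lookup that `tools.mdx` attributes to `get_tools(meta)` can be sketched without the runtime layer, which the note in that file says does not exist in the tree yet. Everything below is hypothetical: `EchoTool`, the registry contents, the `get_tools` and `dispatch` signatures, and the decision to pass the schema list directly instead of a metadata object and to skip unregistered names silently.

```python
from __future__ import annotations

from typing import Any


class EchoTool:
    """Hypothetical stand-in for a real tool such as `say`."""

    name = "say"

    def call(self, arguments: dict[str, Any]) -> str:
        return f"said: {arguments['text']}"


# Hypothetical registry: tool name -> implementation class.
TOOL_REGISTRY: dict[str, type] = {"say": EchoTool}


def get_tools(schemas: list[dict[str, Any]]) -> dict[str, Any]:
    """Instantiate one registered tool per declared schema, skipping unregistered names."""
    tools: dict[str, Any] = {}
    for schema in schemas:
        name = schema["function"]["name"]
        if name in TOOL_REGISTRY:
            tools[name] = TOOL_REGISTRY[name]()
    return tools


def dispatch(tool_call: dict[str, Any], tools: dict[str, Any]) -> str:
    """Route one structured tool call to the matching instance."""
    function = tool_call["function"]
    return tools[function["name"]].call(function["arguments"])


schemas = [
    {"type": "function", "function": {"name": "say"}},
    {"type": "function", "function": {"name": "record_observation"}},  # declared, not registered
]
tools = get_tools(schemas)
call = {"type": "function", "function": {"name": "say", "arguments": {"text": "On it."}}}
print(dispatch(call, tools))
```

A schema that is declared but has no registered class stays visible to the chat template while being absent from the dispatch table, which matches the "step 1 alone" case described in the docs.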
diff --git a/pyproject.toml b/pyproject.toml
index 93953cd57..ca6248c95 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -95,7 +95,7 @@ dependencies = [
# ── Feature-scoped extras ──────────────────────────────────
dataset = [
- "datasets>=4.0.0,<5.0.0",
+ "datasets>=4.7.0,<5.0.0",
"pandas>=2.0.0,<3.0.0", # NOTE: Transitive dependency of datasets
"pyarrow>=21.0.0,<30.0.0", # NOTE: Transitive dependency of datasets
"lerobot[av-dep]",
diff --git a/src/lerobot/configs/__init__.py b/src/lerobot/configs/__init__.py
index c3fe246cd..be4491811 100644
--- a/src/lerobot/configs/__init__.py
+++ b/src/lerobot/configs/__init__.py
@@ -24,6 +24,7 @@ Import them directly: ``from lerobot.configs.train import TrainPipelineConfig``
from .dataset import DatasetRecordConfig
from .default import DatasetConfig, EvalConfig, PeftConfig, WandBConfig
from .policies import PreTrainedConfig
+from .recipe import MessageTurn, TrainingRecipe, load_recipe
from .types import (
FeatureType,
NormalizationMode,
@@ -49,9 +50,12 @@ __all__ = [
"DatasetRecordConfig",
"DatasetConfig",
"EvalConfig",
+ "MessageTurn",
"PeftConfig",
"PreTrainedConfig",
+ "TrainingRecipe",
"WandBConfig",
+ "load_recipe",
"VideoEncoderConfig",
# Defaults
"camera_encoder_defaults",
diff --git a/src/lerobot/configs/recipe.py b/src/lerobot/configs/recipe.py
new file mode 100644
index 000000000..28e5a0db3
--- /dev/null
+++ b/src/lerobot/configs/recipe.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Literal, get_args
+
+MessageRole = Literal["user", "assistant", "system", "tool"]
+MessageStream = Literal["high_level", "low_level"]
+
+DEFAULT_BINDINGS = {
+ "subtask": "active_at(t, style=subtask)",
+ "memory": "active_at(t, style=memory)",
+ "plan": "active_at(t, style=plan)",
+ "speech": "emitted_at(t, role=assistant, tool_name=say)",
+ "interjection": "emitted_at(t, style=interjection)",
+ "vqa": "emitted_at(t, style=vqa, role=assistant)",
+ "vqa_query": "emitted_at(t, style=vqa, role=user)",
+}
+
+PLACEHOLDER_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
+"""``${name}`` placeholder pattern used by both recipe binding-reference
+discovery (here) and rendered-message substitution (in ``language_render``)."""
+
+_VALID_ROLES = frozenset(get_args(MessageRole))
+_VALID_STREAMS = frozenset(get_args(MessageStream))
+
+
+@dataclass
+class MessageTurn:
+    """A single chat-style turn in a recipe template.
+
+    ``content`` may be a plain string, a list of HF-style multimodal blocks, or
+    ``None`` when ``tool_calls_from`` supplies tool-call payloads instead.
+    ``stream`` tags the turn for downstream filtering, ``target`` flags it as a
+    training target, and ``if_present`` skips the turn when the named binding
+    resolves to ``None``.
+    """
+
+    role: MessageRole
+    content: str | list[dict[str, Any]] | None = None
+    stream: MessageStream | None = None
+    target: bool = False
+    if_present: str | None = None
+    tool_calls_from: str | None = None
+
+    def __post_init__(self) -> None:
+        """Validate role, stream, and content after dataclass construction."""
+        if self.role not in _VALID_ROLES:
+            raise ValueError(f"Unsupported message role: {self.role!r}")
+        # ``stream`` is typed Optional only so the dataclass can keep its
+        # field ordering, but recipes must always tag every turn with a
+        # stream — the renderer's ``_validate_rendered`` would reject
+        # ``None`` later on. Fail at construction so the bad recipe is
+        # caught at YAML load time rather than at the first sample.
+        if self.stream is None:
+            raise ValueError(
+                f"MessageTurn(role={self.role!r}) is missing a stream — "
+                f"every turn must declare one of {sorted(_VALID_STREAMS)}."
+            )
+        if self.stream not in _VALID_STREAMS:
+            raise ValueError(f"Unsupported message stream: {self.stream!r}")
+        if self.content is None and self.tool_calls_from is None:
+            raise ValueError("MessageTurn.content is required unless tool_calls_from is set.")
+        if self.content is not None and not isinstance(self.content, (str, list)):
+            raise TypeError("MessageTurn.content must be a string, a list of HF-style blocks, or None.")
+        if isinstance(self.content, list):
+            for block in self.content:
+                if not isinstance(block, dict) or "type" not in block:
+                    raise ValueError(
+                        "Multimodal content blocks must be HF-style dictionaries with a type key."
+                    )
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> MessageTurn:
+        """Construct a :class:`MessageTurn` from a plain dictionary."""
+        return cls(**data)
+
+
+@dataclass
+class TrainingRecipe:
+    """A recipe describing how to render training samples from language rows.
+
+    A recipe is either a *message recipe* (``messages`` plus optional
+    ``bindings``) or a *blend recipe* (``blend`` mapping names to weighted
+    sub-recipes). ``weight`` is only meaningful inside a blend.
+    """
+
+    messages: list[MessageTurn] | None = None
+    bindings: dict[str, str] | None = None
+    blend: dict[str, TrainingRecipe] | None = None
+    weight: float | None = None
+
+    def __post_init__(self) -> None:
+        """Validate that exactly one of ``messages`` or ``blend`` is set."""
+        if self.messages is not None and self.blend is not None:
+            raise ValueError("TrainingRecipe must set only one of messages or blend.")
+        if self.messages is None and self.blend is None:
+            raise ValueError("TrainingRecipe must set one of messages or blend.")
+
+        if self.messages is not None:
+            self._validate_message_recipe()
+        if self.blend is not None:
+            self._validate_blend_recipe()
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> TrainingRecipe:
+        """Construct a :class:`TrainingRecipe` from a nested dictionary."""
+        data = dict(data)
+        if data.get("messages") is not None:
+            data["messages"] = [
+                turn if isinstance(turn, MessageTurn) else MessageTurn.from_dict(turn)
+                for turn in data["messages"]
+            ]
+        if data.get("blend") is not None:
+            data["blend"] = {
+                name: recipe if isinstance(recipe, TrainingRecipe) else cls.from_dict(recipe)
+                for name, recipe in data["blend"].items()
+            }
+        return cls(**data)
+
+    @classmethod
+    def from_yaml(cls, path: str | Path) -> TrainingRecipe:
+        """Load a :class:`TrainingRecipe` from a YAML file at ``path``."""
+        import yaml  # type: ignore[import-untyped]
+
+        with open(path) as f:
+            data = yaml.safe_load(f)
+        if not isinstance(data, dict):
+            raise ValueError(f"Recipe YAML must contain a mapping at the top level: {path}")
+        return cls.from_dict(data)
+
+    def _validate_message_recipe(self) -> None:
+        """Ensure every templated binding is known and at least one turn is a target."""
+        assert self.messages is not None
+        known_bindings = set(DEFAULT_BINDINGS) | set(self.bindings or {}) | {"task"}
+
+        for turn in self.messages:
+            missing = self._referenced_bindings(turn) - known_bindings
+            if missing:
+                raise ValueError(f"MessageTurn references unknown binding(s): {sorted(missing)}")
+
+        if not any(turn.target for turn in self.messages):
+            raise ValueError("Message recipes must contain at least one target turn.")
+
+    def _validate_blend_recipe(self) -> None:
+        """Ensure each blend component is a non-empty, weighted message recipe."""
+        assert self.blend is not None
+        if not self.blend:
+            raise ValueError("Blend recipes must contain at least one component.")
+
+        for name, recipe in self.blend.items():
+            if recipe.blend is not None:
+                raise ValueError(f"Blend component {name!r} cannot itself define a blend.")
+            if recipe.messages is None:
+                raise ValueError(f"Blend component {name!r} must define messages.")
+            if recipe.weight is None:
+                raise ValueError(f"Blend component {name!r} must define weight.")
+            if recipe.weight <= 0:
+                raise ValueError(f"Blend component {name!r} must have a positive weight.")
+
+    def _referenced_bindings(self, turn: MessageTurn) -> set[str]:
+        """Return the binding names that ``turn`` references via placeholders or attributes."""
+        names: set[str] = set()
+        if turn.if_present is not None:
+            names.add(turn.if_present)
+        if turn.tool_calls_from is not None:
+            names.add(turn.tool_calls_from)
+        names.update(_placeholders_in_content(turn.content))
+        return names
+
+
+def _placeholders_in_content(content: str | list[dict[str, Any]] | None) -> set[str]:
+    """Return the set of ``${name}`` placeholders found anywhere in ``content``."""
+    if content is None:
+        return set()
+    if isinstance(content, str):
+        return set(PLACEHOLDER_RE.findall(content))
+
+    names: set[str] = set()
+    for block in content:
+        for value in block.values():
+            if isinstance(value, str):
+                names.update(PLACEHOLDER_RE.findall(value))
+    return names
+
+
+def load_recipe(path: str | Path) -> TrainingRecipe:
+    """Load a :class:`TrainingRecipe` from a YAML file at ``path``."""
+    return TrainingRecipe.from_yaml(path)
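The docs in this PR say that a blend selects exactly one branch deterministically from the sample index, while `recipe.py` only validates that each component carries a positive `weight`. One way such a selection could work is sketched below. It is not taken from `language_render`: the function name `select_branch`, the per-index seeding scheme, and the branch names other than `ask_vqa_top` are assumptions.

```python
import random


def select_branch(weights: dict[str, float], sample_index: int) -> str:
    """Pick one blend component; the same index always yields the same branch."""
    names = sorted(weights)  # fixed order so the result does not depend on dict ordering
    rng = random.Random(sample_index)  # seeded per sample, so repeated lookups agree
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


# Hypothetical blend weights; they need not sum to 1.
weights = {"low_level": 0.6, "memory_update": 0.2, "ask_vqa_top": 0.2}
picks = [select_branch(weights, i) for i in range(1000)]
print({name: picks.count(name) for name in sorted(weights)})
```

Seeding from the index rather than from global random state means the frame-to-branch mapping stays the same across epochs and dataloader workers in this sketch; whether the actual renderer makes the same trade-off is not shown in this diff.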
diff --git a/src/lerobot/datasets/__init__.py b/src/lerobot/datasets/__init__.py
index b51ef0222..e4e3ccdf6 100644
--- a/src/lerobot/datasets/__init__.py
+++ b/src/lerobot/datasets/__init__.py
@@ -37,6 +37,14 @@ from .dataset_tools import (
from .factory import make_dataset, resolve_delta_timestamps
from .image_writer import safe_stop_image_writer
from .io_utils import load_episodes, write_stats
+from .language import (
+ EVENT_ONLY_STYLES,
+ LANGUAGE_EVENTS,
+ LANGUAGE_PERSISTENT,
+ PERSISTENT_STYLES,
+ STYLE_REGISTRY,
+ column_for_style,
+)
from .lerobot_dataset import LeRobotDataset
from .multi_dataset import MultiLeRobotDataset
from .pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
@@ -54,10 +62,15 @@ __all__ = [
"CODEBASE_VERSION",
"DEFAULT_EPISODES_PATH",
"DEFAULT_QUANTILES",
+ "EVENT_ONLY_STYLES",
"EpisodeAwareSampler",
+ "LANGUAGE_EVENTS",
+ "LANGUAGE_PERSISTENT",
"LeRobotDataset",
"LeRobotDatasetMetadata",
"MultiLeRobotDataset",
+ "PERSISTENT_STYLES",
+ "STYLE_REGISTRY",
"StreamingLeRobotDataset",
"VideoEncodingManager",
"check_video_encoder_parameters_pyav",
@@ -69,6 +82,7 @@ __all__ = [
"convert_image_to_video_dataset",
"create_initial_features",
"create_lerobot_dataset_card",
+ "column_for_style",
"delete_episodes",
"get_feature_stats",
"load_episodes",
diff --git a/src/lerobot/datasets/compute_stats.py b/src/lerobot/datasets/compute_stats.py
index f489c84a7..438ac7fba 100644
--- a/src/lerobot/datasets/compute_stats.py
+++ b/src/lerobot/datasets/compute_stats.py
@@ -512,7 +512,7 @@ def compute_episode_stats(
ep_stats = {}
for key, data in episode_data.items():
- if features[key]["dtype"] == "string":
+ if features[key]["dtype"] in {"string", "language"}:
continue
if features[key]["dtype"] in ["image", "video"]:
diff --git a/src/lerobot/datasets/dataset_metadata.py b/src/lerobot/datasets/dataset_metadata.py
index 3c58774c3..39a1b6d2b 100644
--- a/src/lerobot/datasets/dataset_metadata.py
+++ b/src/lerobot/datasets/dataset_metadata.py
@@ -36,12 +36,12 @@ from .io_utils import (
load_episodes,
load_info,
load_stats,
- load_subtasks,
load_tasks,
write_info,
write_stats,
write_tasks,
)
+from .language import DEFAULT_TOOLS, LANGUAGE_COLUMNS
from .utils import (
DEFAULT_EPISODES_PATH,
check_version_compatibility,
@@ -177,7 +177,6 @@ class LeRobotDatasetMetadata:
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
self.tasks = load_tasks(self.root)
- self.subtasks = load_subtasks(self.root)
self.episodes = load_episodes(self.root)
self.stats = load_stats(self.root)
@@ -343,6 +342,49 @@ class LeRobotDatasetMetadata:
"""Keys to access visual modalities (regardless of their storage method)."""
return [key for key, ft in self.features.items() if ft["dtype"] in ["video", "image"]]
+ @property
+ def has_language_columns(self) -> bool:
+ """Return ``True`` if the dataset declares any language column.
+
+ Used to gate language-aware code paths (collate, render step) so
+ unannotated datasets keep PyTorch's default collate behavior.
+ """
+ return any(col in self.features for col in LANGUAGE_COLUMNS)
+
+ @property
+ def tools(self) -> list[dict]:
+ """OpenAI-style tool schemas declared by this dataset.
+
+ Read from ``meta/info.json["tools"]``. Returns shallow per-tool copies;
+ nested dicts such as ``function`` stay shared. Falls back to
+ :data:`lerobot.datasets.language.DEFAULT_TOOLS` (the canonical
+ ``say`` schema) when the dataset doesn't declare any — that way
+ unannotated datasets and chat-template consumers
+ (``apply_chat_template(messages, tools=meta.tools)``) keep
+ working out of the box.
+
+ Implementations live under :mod:`lerobot.tools` (one file per
+ tool); see ``docs/source/tools.mdx`` for the authoring guide.
+ """
+ declared = self.info.tools
+ if declared:
+ return [dict(t) for t in declared]
+ return [dict(t) for t in DEFAULT_TOOLS]
+
+ @tools.setter
+ def tools(self, value: list[dict] | None) -> None:
+ """Persist a tool catalog to ``meta/info.json`` and reload metadata.
+
+ Writes ``value`` into the on-disk ``info.json`` (or clears the
+ ``tools`` key when ``value`` is ``None`` or empty), then reloads
+ ``self.info`` so the in-memory metadata matches what's on disk.
+ Saves callers from hand-editing ``info.json`` and re-instantiating
+ the metadata object.
+ """
+ self.info.tools = [dict(t) for t in value] if value else None
+ write_info(self.info, self.root)
+ self.info = load_info(self.root)
+
@property
def names(self) -> dict[str, list | dict]:
"""Names of the various dimensions of vector modalities."""
@@ -671,7 +713,6 @@ class LeRobotDatasetMetadata:
_validate_feature_names(features)
obj.tasks = None
- obj.subtasks = None
obj.episodes = None
obj.stats = None
obj.info = create_empty_dataset_info(
diff --git a/src/lerobot/datasets/dataset_reader.py b/src/lerobot/datasets/dataset_reader.py
index bd1298590..59aaa40e5 100644
--- a/src/lerobot/datasets/dataset_reader.py
+++ b/src/lerobot/datasets/dataset_reader.py
@@ -295,9 +295,4 @@ class DatasetReader:
task_idx = item["task_index"].item()
item["task"] = self._meta.tasks.iloc[task_idx].name
- # add subtask information if available
- if "subtask_index" in self._meta.features and self._meta.subtasks is not None:
- subtask_idx = item["subtask_index"].item()
- item["subtask"] = self._meta.subtasks.iloc[subtask_idx].name
-
return item
diff --git a/src/lerobot/datasets/feature_utils.py b/src/lerobot/datasets/feature_utils.py
index d5a550a4c..56264408f 100644
--- a/src/lerobot/datasets/feature_utils.py
+++ b/src/lerobot/datasets/feature_utils.py
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
from pprint import pformat
import datasets
@@ -23,6 +24,12 @@ from lerobot.configs import VIDEO_ENCODER_INFO_KEYS
from lerobot.utils.constants import DEFAULT_FEATURES
from lerobot.utils.utils import is_valid_numpy_dtype_string
+from .language import (
+ LANGUAGE_PERSISTENT,
+ is_language_column,
+ language_events_column_feature,
+ language_persistent_column_feature,
+)
from .utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
@@ -47,7 +54,13 @@ def get_hf_features_from_features(features: dict) -> datasets.Features:
"""
hf_features = {}
for key, ft in features.items():
- if ft["dtype"] == "video":
+ if is_language_column(key):
+ hf_features[key] = (
+ language_persistent_column_feature()
+ if key == LANGUAGE_PERSISTENT
+ else language_events_column_feature()
+ )
+ elif ft["dtype"] == "video":
continue
elif ft["dtype"] == "image":
hf_features[key] = datasets.Image()
@@ -278,6 +291,8 @@ def validate_feature_dtype_and_shape(
return validate_feature_image_or_video(name, expected_shape, value)
elif expected_dtype == "string":
return validate_feature_string(name, value)
+ elif expected_dtype == "language":
+ return validate_feature_language(name, value)
else:
raise NotImplementedError(f"The feature dtype '{expected_dtype}' is not implemented yet.")
@@ -357,6 +372,30 @@ def validate_feature_string(name: str, value: str) -> str:
return ""
+def validate_feature_language(name: str, value) -> str:
+ """Validate a feature that is expected to hold language annotations.
+
+ Language columns (``language_persistent`` / ``language_events``) are
+ populated after recording by the annotation pipeline, not at record time.
+ Any value supplied here is dropped before the frame is written, so a
+ non-empty value almost certainly signals a mistake. We warn rather than
+ fail to keep recording resilient.
+
+ Args:
+ name (str): The name of the feature.
+ value: The value to validate.
+
+ Returns:
+ str: Always an empty string — language values are non-fatal.
+ """
+ if value is not None:
+ logging.warning(
+ f"The feature '{name}' is a 'language' column populated by the annotation pipeline, "
+ f"not at record time. The provided value will be dropped."
+ )
+ return ""
+
+
def validate_episode_buffer(episode_buffer: dict, total_episodes: int, features: dict) -> None:
"""Validate the episode buffer before it's written to disk.
diff --git a/src/lerobot/datasets/io_utils.py b/src/lerobot/datasets/io_utils.py
index f5681c7c0..a41f34704 100644
--- a/src/lerobot/datasets/io_utils.py
+++ b/src/lerobot/datasets/io_utils.py
@@ -31,10 +31,10 @@ from torchvision import transforms
from lerobot.utils.io_utils import load_json, write_json
from lerobot.utils.utils import SuppressProgressBars, flatten_dict, unflatten_dict
+from .language import LANGUAGE_COLUMNS
from .utils import (
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_EPISODES_PATH,
- DEFAULT_SUBTASKS_PATH,
DEFAULT_TASKS_PATH,
EPISODES_DIR,
INFO_PATH,
@@ -186,14 +186,6 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame:
return tasks
-def load_subtasks(local_dir: Path) -> pandas.DataFrame | None:
- """Load subtasks from subtasks.parquet if it exists."""
- subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH
- if subtasks_path.exists():
- return pd.read_parquet(subtasks_path)
- return None
-
-
def write_episodes(episodes: Dataset, local_dir: Path) -> None:
"""Write episode metadata to a parquet file in the LeRobot v3.0 format.
This function writes episode-level metadata to a single parquet file.
@@ -265,11 +257,13 @@ def hf_transform_to_torch(items_dict: dict[str, list[Any]]) -> dict[str, list[to
dict: The batch with items converted to torch tensors.
"""
for key in items_dict:
+ if key in LANGUAGE_COLUMNS:
+ continue
first_item = items_dict[key][0]
if isinstance(first_item, PILImage.Image):
to_tensor = transforms.ToTensor()
items_dict[key] = [to_tensor(img) for img in items_dict[key]]
- elif first_item is None:
+ elif first_item is None or isinstance(first_item, dict):
pass
else:
items_dict[key] = [x if isinstance(x, str) else torch.tensor(x) for x in items_dict[key]]
@@ -304,8 +298,9 @@ def item_to_torch(item: dict) -> dict:
Returns:
dict: Dictionary with all tensor-like items converted to torch.Tensor.
"""
+ skip_keys = {"task", *LANGUAGE_COLUMNS}
for key, val in item.items():
- if isinstance(val, (np.ndarray | list)) and key not in ["task"]:
+ if isinstance(val, (np.ndarray | list)) and key not in skip_keys:
# Convert numpy arrays and lists to torch tensors
item[key] = torch.tensor(val)
return item
diff --git a/src/lerobot/datasets/language.py b/src/lerobot/datasets/language.py
new file mode 100644
index 000000000..124c25221
--- /dev/null
+++ b/src/lerobot/datasets/language.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from typing import Literal
+
+import datasets
+import pyarrow as pa
+
+LANGUAGE_PERSISTENT = "language_persistent"
+LANGUAGE_EVENTS = "language_events"
+LANGUAGE_COLUMNS = (LANGUAGE_PERSISTENT, LANGUAGE_EVENTS)
+PERSISTENT_ROW_FIELDS = ("role", "content", "style", "timestamp", "camera", "tool_calls")
+EVENT_ROW_FIELDS = ("role", "content", "style", "camera", "tool_calls")
+
+CORE_STYLES = {
+ "subtask",
+ "plan",
+ "memory",
+ "motion",
+ "interjection",
+ "vqa",
+ "trace",
+ "task_aug",
+}
+# Project-local styles can be registered at import time by appending to
+# ``EXTENDED_STYLES`` before ``column_for_style`` is called. Anything added
+# here is treated as a known style alongside ``CORE_STYLES`` for resolver
+# validation. Empty by default — populate from a downstream module that
+# also extends ``PERSISTENT_STYLES`` or ``EVENT_ONLY_STYLES`` to declare
+# the new style's column.
+EXTENDED_STYLES: set[str] = set()
+STYLE_REGISTRY = CORE_STYLES | EXTENDED_STYLES
+
+PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug"}
+EVENT_ONLY_STYLES = {"interjection", "vqa", "trace"}
+
+# Styles whose ``content`` is grounded in a specific camera view. Rows of these
+# styles MUST carry a non-null ``camera`` referencing an ``observation.images.*``
+# feature key. Rows of every other style MUST have ``camera=None``. ``motion``
+# is intentionally NOT in this set: motion primitives are described in
+# robot-frame (joint / Cartesian) terms, not pixel space, so they are
+# camera-agnostic. ``trace`` is the pixel-trajectory event style and IS
+# view-dependent. The ``camera`` field nevertheless lives on
+# ``PERSISTENT_ROW_FIELDS`` too so the schema, validator, and resolver
+# behave symmetrically across the two columns; persistent rows simply
+# always have ``camera=None`` in practice today.
+VIEW_DEPENDENT_STYLES = {"vqa", "trace"}
+
+LanguageColumn = Literal["language_persistent", "language_events"]
+
+
+def _json_arrow_type() -> pa.DataType:
+ """Return the Arrow JSON type, falling back to ``string`` on older pyarrow."""
+ return pa.json_() if hasattr(pa, "json_") else pa.string()
+
+
+def _json_feature() -> object:
+ """Return the HF ``datasets`` JSON feature, falling back to a string value."""
+ return datasets.Json() if hasattr(datasets, "Json") else datasets.Value("string")
+
+
+def language_persistent_row_arrow_type() -> pa.StructType:
+ """Return the Arrow struct type for a single persistent language row.
+
+ Persistent rows carry their own ``timestamp`` because they represent a state
+ that became active at a specific moment and remains active until superseded.
+ ``timestamp`` is ``float32`` to match the timestamp dtype LeRobotDataset
+ uses for frame data.
+ """
+ return pa.struct(
+ [
+ pa.field("role", pa.string(), nullable=False),
+ pa.field("content", pa.string(), nullable=True),
+ pa.field("style", pa.string(), nullable=True),
+ pa.field("timestamp", pa.float32(), nullable=False),
+ pa.field("camera", pa.string(), nullable=True),
+ pa.field("tool_calls", pa.list_(_json_arrow_type()), nullable=True),
+ ]
+ )
+
+
+def language_event_row_arrow_type() -> pa.StructType:
+ """Return the Arrow struct type for a single event language row.
+
+ Event rows have no ``timestamp`` field: each event is stored on the dataset
+ row whose frame timestamp is the event's firing time.
+ """
+ return pa.struct(
+ [
+ pa.field("role", pa.string(), nullable=False),
+ pa.field("content", pa.string(), nullable=True),
+ pa.field("style", pa.string(), nullable=True),
+ pa.field("camera", pa.string(), nullable=True),
+ pa.field("tool_calls", pa.list_(_json_arrow_type()), nullable=True),
+ ]
+ )
+
+
+def language_persistent_arrow_type() -> pa.ListType:
+ """Return the Arrow list type for the ``language_persistent`` column."""
+ return pa.list_(language_persistent_row_arrow_type())
+
+
+def language_events_arrow_type() -> pa.ListType:
+ """Return the Arrow list type for the ``language_events`` column."""
+ return pa.list_(language_event_row_arrow_type())
+
+
+def language_persistent_row_feature() -> dict[str, object]:
+ """Return the HF ``datasets`` feature mapping for a persistent language row."""
+ return {
+ "role": datasets.Value("string"),
+ "content": datasets.Value("string"),
+ "style": datasets.Value("string"),
+ "timestamp": datasets.Value("float32"),
+ "camera": datasets.Value("string"),
+ "tool_calls": datasets.List(_json_feature()),
+ }
+
+
+def language_event_row_feature() -> dict[str, object]:
+ """Return the HF ``datasets`` feature mapping for an event language row."""
+ return {
+ "role": datasets.Value("string"),
+ "content": datasets.Value("string"),
+ "style": datasets.Value("string"),
+ "camera": datasets.Value("string"),
+ "tool_calls": datasets.List(_json_feature()),
+ }
+
+
+def language_persistent_column_feature() -> datasets.List:
+ """Return the HF ``datasets`` feature for the ``language_persistent`` column."""
+ return datasets.List(language_persistent_row_feature())
+
+
+def language_events_column_feature() -> datasets.List:
+ """Return the HF ``datasets`` feature for the ``language_events`` column."""
+ return datasets.List(language_event_row_feature())
+
+
+def language_feature_info() -> dict[str, dict]:
+ """Return the ``info["features"]`` entries for both language columns."""
+ return {
+ LANGUAGE_PERSISTENT: {"dtype": "language", "shape": (1,), "names": None},
+ LANGUAGE_EVENTS: {"dtype": "language", "shape": (1,), "names": None},
+ }
+
+
+def is_language_column(key: str) -> bool:
+ """Return ``True`` if ``key`` is one of the dataset's language column names."""
+ return key in LANGUAGE_COLUMNS
+
+
+def is_view_dependent_style(style: str | None) -> bool:
+ """Return ``True`` if rows of ``style`` must be tagged with a ``camera`` key."""
+ return style in VIEW_DEPENDENT_STYLES
+
+
+def validate_camera_field(style: str | None, camera: str | None) -> None:
+ """Enforce the ``camera`` invariant: required iff ``style`` is view-dependent.
+
+ Raises ``ValueError`` if a view-dependent style is missing ``camera`` or if
+ a non-view-dependent style carries one. Pipeline writers and the validator
+ should call this on every emitted row.
+ """
+ if is_view_dependent_style(style):
+ if not camera:
+ raise ValueError(
+ f"Rows of view-dependent style {style!r} require a non-empty 'camera' "
+ f"field referencing an 'observation.images.*' feature key."
+ )
+ elif camera is not None:
+ raise ValueError(f"Rows of style {style!r} must have camera=None; got camera={camera!r}.")
+
+
+# --- Tool registry --------------------------------------------------------
+# Tools declared on a dataset live in ``meta/info.json["tools"]`` as a list
+# of OpenAI-style function schemas. The runtime / training stack reads them
+# through :class:`LeRobotDatasetMetadata.tools` (with these constants as
+# fallback when the dataset doesn't declare any). Implementations live
+# under :mod:`lerobot.tools` (one file per tool); see
+# ``docs/source/tools.mdx`` for the authoring guide.
+
+SAY_TOOL_SCHEMA: dict = {
+ "type": "function",
+ "function": {
+ "name": "say",
+ "description": "Speak a short utterance to the user via the TTS executor.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "text": {
+ "type": "string",
+ "description": "The verbatim text to speak.",
+ }
+ },
+ "required": ["text"],
+ },
+ },
+}
+"""Canonical schema for the ``say`` tool emitted by the steerable
+annotation pipeline (PR 2 Module 2). Single source of truth — PR 2's
+writer, PR 3's runtime tool registry, and the dataset visualizer all
+import this constant rather than duplicating the dict."""
+
+DEFAULT_TOOLS: list[dict] = [SAY_TOOL_SCHEMA]
+"""Fallback tools list. Returned by ``LeRobotDatasetMetadata.tools``
+when ``meta/info.json["tools"]`` is unset, so unannotated datasets and
+chat-template consumers (``apply_chat_template(messages, tools=...)``)
+keep working out of the box."""
+
+
+def column_for_style(style: str | None) -> LanguageColumn:
+ """Map a language style to the column where rows of that style are stored.
+
+ Styles in :data:`PERSISTENT_STYLES` route to :data:`LANGUAGE_PERSISTENT`.
+ Styles in :data:`EVENT_ONLY_STYLES` and the implicit ``None`` style route
+ to :data:`LANGUAGE_EVENTS`.
+ """
+ if style is None:
+ return LANGUAGE_EVENTS
+ if style in PERSISTENT_STYLES:
+ return LANGUAGE_PERSISTENT
+ if style in EVENT_ONLY_STYLES:
+ return LANGUAGE_EVENTS
+ raise ValueError(f"Unknown language style: {style!r}")
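
Note on `language.py` above: a writer is expected to check the `camera` invariant on every row and then route the row to a column by style. The sketch below copies the style sets and the two functions from the diff so it runs standalone; `route_rows` and the sample rows are hypothetical and exist only for illustration.

```python
from __future__ import annotations

# Copied from the diff above so the sketch runs without lerobot installed.
PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug"}
EVENT_ONLY_STYLES = {"interjection", "vqa", "trace"}
VIEW_DEPENDENT_STYLES = {"vqa", "trace"}


def column_for_style(style: str | None) -> str:
    """Map a style to its column; the implicit ``None`` style is an event."""
    if style is None or style in EVENT_ONLY_STYLES:
        return "language_events"
    if style in PERSISTENT_STYLES:
        return "language_persistent"
    raise ValueError(f"Unknown language style: {style!r}")


def validate_camera_field(style: str | None, camera: str | None) -> None:
    """``camera`` is required iff the style is view-dependent."""
    if style in VIEW_DEPENDENT_STYLES:
        if not camera:
            raise ValueError(f"Style {style!r} requires a non-empty 'camera'.")
    elif camera is not None:
        raise ValueError(f"Style {style!r} must have camera=None; got {camera!r}.")


def route_rows(rows: list[dict]) -> dict[str, list[dict]]:
    """Hypothetical helper: validate each row, then bucket it by column."""
    routed: dict[str, list[dict]] = {"language_persistent": [], "language_events": []}
    for row in rows:
        validate_camera_field(row.get("style"), row.get("camera"))
        routed[column_for_style(row.get("style"))].append(row)
    return routed


rows = [
    {"role": "assistant", "content": "grasp the cup", "style": "subtask",
     "timestamp": 0.0, "camera": None},
    {"role": "user", "content": "is the cup upright?", "style": "vqa",
     "camera": "observation.images.top"},
    {"role": "assistant", "content": None, "style": None, "camera": None},
]
routed = route_rows(rows)
print({column: len(items) for column, items in routed.items()})
```

Validating before routing means a malformed row fails early, before it is appended to either column.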
diff --git a/src/lerobot/datasets/language_render.py b/src/lerobot/datasets/language_render.py
new file mode 100644
index 000000000..999fa19ad
--- /dev/null
+++ b/src/lerobot/datasets/language_render.py
@@ -0,0 +1,545 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import copy
+import hashlib
+import re
+from collections.abc import Sequence
+from typing import Any
+
+from lerobot.configs.recipe import DEFAULT_BINDINGS, PLACEHOLDER_RE, TrainingRecipe
+from lerobot.utils.utils import unwrap_scalar
+
+from .language import LANGUAGE_PERSISTENT, column_for_style
+
+LanguageRow = dict[str, Any]
+RenderedMessages = dict[str, list[Any]]
+
+_RESOLVER_RE = re.compile(r"^(?P<name>[A-Za-z_][A-Za-z0-9_]*)\((?P<args>.*)\)$")
+
+
+def active_at(
+ t: float,
+ *,
+ persistent: Sequence[LanguageRow],
+ style: str | None = None,
+ role: str | None = None,
+ tool_name: str | None = None,
+ camera: str | None = None,
+) -> LanguageRow | None:
+ """Return the persistent row of ``style`` that is active at time ``t``.
+
+ A persistent row is "active" at ``t`` when its own ``timestamp`` is the
+ most recent one ``<= t`` for the given ``style``/``role``/``tool_name``/
+ ``camera`` selector. Only valid for persistent styles.
+ """
+ _validate_persistent_resolver("active_at", style)
+ matches = [
+ row
+ for row in _matching_rows(persistent, style=style, role=role, tool_name=tool_name, camera=camera)
+ if _timestamp(row) <= t
+ ]
+ if not matches:
+ return None
+ latest_ts = max(_timestamp(row) for row in matches)
+ return _select_one(
+ [row for row in matches if _timestamp(row) == latest_ts],
+ style=style,
+ role=role,
+ tool_name=tool_name,
+ camera=camera,
+ )
+
+
+EMITTED_AT_TOLERANCE_S = 0.1
+"""Half-window for matching persistent rows to a frame timestamp in
+``emitted_at``. Persistent timestamps come from parquet (float32) and ``t``
+is also a float32 from parquet, so in the ideal hot path an exact match
+would suffice — but any caller that derives ``t`` arithmetically (e.g.
+``frame_idx / fps``) breaks bit-equality. A 0.1 s tolerance covers
+common arithmetic drift without admitting frames that are visibly far
+apart at typical control rates (30–100 Hz). This does mean two persistent
+rows of the same selector emitted within 0.1 s of each other cannot be
+told apart by ``emitted_at`` — acceptable because persistent annotations
+(subtask / plan / memory transitions) change on a human-action timescale,
+not at the camera frame rate."""
+
+
+def emitted_at(
+ t: float,
+ *,
+ persistent: Sequence[LanguageRow],
+ events: Sequence[LanguageRow],
+ style: str | None = None,
+ role: str | None = None,
+ tool_name: str | None = None,
+ camera: str | None = None,
+) -> LanguageRow | None:
+ """Return the row of ``style`` emitted at exactly time ``t``.
+
+ For persistent styles, this matches persistent rows whose own ``timestamp``
+ is within ``EMITTED_AT_TOLERANCE_S`` of ``t`` (see that constant for why
+ we use a tolerance instead of bit-equality). For event styles, the
+ ``events`` list is assumed to come from the dataset row at frame ``t``
+ (event rows carry no timestamp of their own), so all matching event rows
+ are considered emitted at ``t``. ``camera`` filters by the row's
+ ``camera`` field — required to disambiguate when multiple view-dependent
+ rows share ``(t, role)`` across cameras.
+ """
+ if column_for_style(style) == LANGUAGE_PERSISTENT:
+ matches = [
+ row
+ for row in _matching_rows(persistent, style=style, role=role, tool_name=tool_name, camera=camera)
+ if abs(_timestamp(row) - t) <= EMITTED_AT_TOLERANCE_S
+ ]
+ else:
+ matches = _matching_rows(events, style=style, role=role, tool_name=tool_name, camera=camera)
+ return _select_one(matches, style=style, role=role, tool_name=tool_name, camera=camera)
+
+
+def nth_prev(
+ t: float,
+ *,
+ persistent: Sequence[LanguageRow],
+ style: str | None = None,
+ offset: int = 1,
+ role: str | None = None,
+ tool_name: str | None = None,
+ camera: str | None = None,
+) -> LanguageRow | None:
+ """Return the persistent row that was active ``offset`` steps before ``t``.
+
+ Walks back through chronologically sorted persistent rows of ``style``
+ (filtered by optional ``role``/``tool_name``/``camera``) and returns the
+ one ``offset`` positions before the row active at ``t``. Only valid for
+ persistent styles.
+ """
+ return _nth_relative("nth_prev", t, persistent, style, -offset, role, tool_name, camera)
+
+
+def nth_next(
+ t: float,
+ *,
+ persistent: Sequence[LanguageRow],
+ style: str | None = None,
+ offset: int = 1,
+ role: str | None = None,
+ tool_name: str | None = None,
+ camera: str | None = None,
+) -> LanguageRow | None:
+ """Return the persistent row that becomes active ``offset`` steps after ``t``.
+
+ Walks forward through chronologically sorted persistent rows of ``style``
+ (filtered by optional ``role``/``tool_name``/``camera``) and returns the
+ one ``offset`` positions after the row active at ``t``. Only valid for
+ persistent styles.
+ """
+ return _nth_relative("nth_next", t, persistent, style, offset, role, tool_name, camera)
+
+
+def render_sample(
+ *,
+ recipe: TrainingRecipe,
+ persistent: Sequence[LanguageRow] | None,
+ events: Sequence[LanguageRow] | None,
+ t: float,
+ sample_idx: int,
+ task: str | None = None,
+ dataset_ctx: Any | None = None,
+) -> RenderedMessages | None:
+ """Render the chat-style messages for a single dataset sample.
+
+ Resolves the recipe's bindings against ``persistent`` and ``events`` rows
+ at frame timestamp ``t``, then expands the recipe's message templates.
+ Returns ``None`` if the resolved sample contains no target message.
+ """
+ persistent_rows = _normalize_rows(persistent or [])
+ event_rows = _normalize_rows(events or [])
+ selected_recipe = _select_recipe(recipe, sample_idx)
+ bindings = _resolve_bindings(
+ selected_recipe,
+ persistent=persistent_rows,
+ events=event_rows,
+ t=t,
+ sample_idx=sample_idx,
+ task=task,
+ dataset_ctx=dataset_ctx,
+ )
+ return _render_message_recipe(selected_recipe, bindings)
+
+
+def _select_recipe(recipe: TrainingRecipe, sample_idx: int) -> TrainingRecipe:
+ """Pick a deterministic blend component for ``sample_idx`` (or return ``recipe``)."""
+ if recipe.blend is None:
+ return recipe
+
+ total_weight = sum(component.weight or 0.0 for component in recipe.blend.values())
+ if total_weight <= 0:
+ raise ValueError("Blend weights must sum to a positive value.")
+
+ digest = hashlib.blake2b(str(sample_idx).encode(), digest_size=8).digest()
+ draw = int.from_bytes(digest, "big") / 2**64 * total_weight
+ cumulative = 0.0
+ last_component: TrainingRecipe | None = None
+ for component in recipe.blend.values():
+ last_component = component
+ cumulative += component.weight or 0.0
+ if draw < cumulative:
+ return component
+ assert last_component is not None
+ return last_component
+
+
+def _resolve_bindings(
+ recipe: TrainingRecipe,
+ *,
+ persistent: Sequence[LanguageRow],
+ events: Sequence[LanguageRow],
+ t: float,
+ sample_idx: int,
+ task: str | None,
+ dataset_ctx: Any | None,
+) -> dict[str, LanguageRow | str | None]:
+ """Resolve every binding in ``recipe`` (plus ``task``) at time ``t``."""
+ bindings: dict[str, LanguageRow | str | None] = {
+ "task": _resolve_task(task, dataset_ctx, persistent=persistent, sample_idx=sample_idx),
+ }
+ specs = {**DEFAULT_BINDINGS, **(recipe.bindings or {})}
+ for name, spec in specs.items():
+ bindings[name] = _resolve_spec(spec, persistent=persistent, events=events, t=t)
+ return bindings
+
+
+def _resolve_task(
+ task: str | None,
+ dataset_ctx: Any | None,
+ *,
+ persistent: Sequence[LanguageRow] = (),
+ sample_idx: int = 0,
+) -> str | None:
+ """Return the task string for ``sample_idx``.
+
+ Resolution order:
+
+ 1. Explicit ``task`` override (caller-supplied) wins.
+ 2. If ``persistent`` contains rows of style ``task_aug`` (role=user),
+ deterministically pick one by ``sample_idx`` so each frame of an
+ episode rotates through the available rephrasings across an epoch.
+ This realizes Xiao 2022 / CAST-style task-prompt diversity without
+ changing ``meta/tasks.parquet`` and without forcing recipes to opt
+ in: ``${task}`` automatically picks a rephrasing when one exists,
+ and falls back to the canonical task otherwise. Recipes that want
+ the literal canonical task can override the binding.
+ 3. Otherwise read the canonical task from ``dataset_ctx`` (which is
+ backed by ``meta/tasks.parquet``).
+ """
+ if task is not None:
+ return task
+
+ aug_rows = [r for r in persistent if r.get("style") == "task_aug" and r.get("role") == "user"]
+ if aug_rows:
+ # Deterministic, blake2b-based pick keyed on sample_idx so the
+ # rotation is reproducible across runs (Python's built-in ``hash``
+ # is process-randomized).
+ digest = hashlib.blake2b(f"task_aug:{sample_idx}".encode(), digest_size=8).digest()
+ idx = int.from_bytes(digest, "big") % len(aug_rows)
+ chosen = aug_rows[idx].get("content")
+ if chosen:
+ return str(chosen)
+
+ if dataset_ctx is None:
+ return None
+ if isinstance(dataset_ctx, dict):
+ return dataset_ctx.get("task")
+ return getattr(dataset_ctx, "task", None)
+
+
+def _resolve_spec(
+ spec: str,
+ *,
+ persistent: Sequence[LanguageRow],
+ events: Sequence[LanguageRow],
+ t: float,
+) -> LanguageRow | None:
+ """Parse a single binding's resolver expression and dispatch to its function."""
+ match = _RESOLVER_RE.match(spec.strip())
+ if match is None:
+ raise ValueError(f"Invalid resolver expression: {spec!r}")
+ name = match.group("name")
+ kwargs = _parse_resolver_args(match.group("args"))
+ kwargs.pop("t_arg", None)
+
+ if name == "emitted_at":
+ return emitted_at(t, persistent=persistent, events=events, **kwargs)
+ if name == "active_at":
+ return active_at(t, persistent=persistent, **kwargs)
+ if name == "nth_prev":
+ return nth_prev(t, persistent=persistent, **kwargs)
+ if name == "nth_next":
+ return nth_next(t, persistent=persistent, **kwargs)
+ raise ValueError(f"Unknown language resolver: {name!r}")
+
+
+def _parse_resolver_args(args: str) -> dict[str, Any]:
+ """Parse a comma-separated resolver argument list into a kwargs dict."""
+ kwargs: dict[str, Any] = {}
+ if not args.strip():
+ return kwargs
+
+ parts = [part.strip() for part in args.split(",") if part.strip()]
+ for part in parts:
+ if part == "t":
+ kwargs["t_arg"] = True
+ continue
+ if "=" not in part:
+ raise ValueError(f"Invalid resolver argument: {part!r}")
+ key, value = (item.strip() for item in part.split("=", 1))
+ if key == "offset":
+ kwargs[key] = int(value)
+ else:
+ kwargs[key] = value.strip("\"'")
+ return kwargs
+
+
+def _render_message_recipe(
+ recipe: TrainingRecipe,
+ bindings: dict[str, LanguageRow | str | None],
+) -> RenderedMessages | None:
+ """Expand ``recipe.messages`` into rendered chat messages using ``bindings``."""
+ assert recipe.messages is not None
+ messages: list[dict[str, Any]] = []
+ streams: list[str | None] = []
+ target_indices: list[int] = []
+
+ for turn in recipe.messages:
+ if turn.if_present is not None and bindings.get(turn.if_present) is None:
+ continue
+
+ message = {"role": turn.role}
+ if turn.content is not None:
+ message["content"] = _render_content(turn.content, bindings)
+
+ if turn.tool_calls_from is not None:
+ row = bindings.get(turn.tool_calls_from)
+ tool_calls = row.get("tool_calls") if isinstance(row, dict) else None
+ if tool_calls:
+ message["tool_calls"] = copy.deepcopy(tool_calls)
+
+ message_idx = len(messages)
+ messages.append(message)
+ streams.append(turn.stream)
+ if turn.target:
+ target_indices.append(message_idx)
+
+ if not target_indices:
+ return None
+
+ rendered = {
+ "messages": messages,
+ "message_streams": streams,
+ "target_message_indices": target_indices,
+ }
+ _validate_rendered(rendered)
+ return rendered
+
+
+def _render_content(
+ content: str | list[dict[str, Any]],
+ bindings: dict[str, LanguageRow | str | None],
+) -> str | list[dict[str, Any]]:
+ """Substitute bindings into a string or each string field of multimodal blocks."""
+ if isinstance(content, str):
+ return _substitute(content, bindings)
+
+ rendered_blocks = []
+ for block in content:
+ rendered_block = copy.deepcopy(block)
+ for key, value in rendered_block.items():
+ if isinstance(value, str):
+ rendered_block[key] = _substitute(value, bindings)
+ rendered_blocks.append(rendered_block)
+ return rendered_blocks
+
+
+def _substitute(template: str, bindings: dict[str, LanguageRow | str | None]) -> str:
+ """Replace ``${name}`` placeholders in ``template`` with their bound values."""
+
+ def replace(match: re.Match[str]) -> str:
+ """Resolve a single ``${name}`` match to its bound string value."""
+ name = match.group(1)
+ if name not in bindings:
+ raise ValueError(f"Unknown template binding: {name!r}")
+ value = bindings[name]
+ if value is None:
+ return ""
+ if isinstance(value, dict):
+ content = value.get("content")
+ return "" if content is None else str(content)
+ return str(value)
+
+ return PLACEHOLDER_RE.sub(replace, template)
+
+
+def _validate_rendered(rendered: RenderedMessages) -> None:
+ """Sanity-check the rendered output for stream/target alignment."""
+ messages = rendered["messages"]
+ streams = rendered["message_streams"]
+ target_indices = rendered["target_message_indices"]
+
+ if len(streams) != len(messages):
+ raise ValueError("message_streams must be aligned with messages.")
+ if not target_indices:
+ raise ValueError("Rendered samples must contain at least one target message.")
+ for idx in target_indices:
+ if idx < 0 or idx >= len(messages):
+ raise ValueError(f"Target message index {idx} is out of bounds.")
+ # ``stream`` is enforced non-None at MessageTurn construction time
+ # (see ``MessageTurn.__post_init__``), so a missing stream here would
+ # mean the dataclass invariant was bypassed; no need to re-check.
+
+
+def _nth_relative(
+ name: str,
+ t: float,
+ persistent: Sequence[LanguageRow],
+ style: str | None,
+ offset: int,
+ role: str | None,
+ tool_name: str | None,
+ camera: str | None,
+) -> LanguageRow | None:
+ """Shared body for ``nth_prev`` / ``nth_next`` with signed ``offset``."""
+ _validate_persistent_resolver(name, style)
+ if abs(offset) < 1:
+ raise ValueError(f"{name} offset must be non-zero.")
+
+ rows = sorted(
+ _matching_rows(persistent, style=style, role=role, tool_name=tool_name, camera=camera),
+ key=_row_sort_key,
+ )
+ if not rows:
+ return None
+
+ anchor_idx = None
+ for idx, row in enumerate(rows):
+ if _timestamp(row) <= t:
+ anchor_idx = idx
+ else:
+ break
+
+ target_idx = (offset - 1 if offset > 0 else None) if anchor_idx is None else anchor_idx + offset
+
+ if target_idx is None or target_idx < 0 or target_idx >= len(rows):
+ return None
+ return rows[target_idx]
+
+
+def _validate_persistent_resolver(name: str, style: str | None) -> None:
+ """Reject calls with missing or event-only ``style`` for persistent resolvers."""
+ if style is None:
+ raise ValueError(f"{name} requires a persistent style.")
+ if column_for_style(style) != LANGUAGE_PERSISTENT:
+ raise ValueError(f"{name} cannot be used with event-only style {style!r}.")
+
+
+def _matching_rows(
+ rows: Sequence[LanguageRow],
+ *,
+ style: str | None,
+ role: str | None,
+ tool_name: str | None,
+ camera: str | None,
+) -> list[LanguageRow]:
+ """Return ``rows`` filtered by optional ``style``/``role``/``tool_name``/``camera`` selectors."""
+ return [
+ row
+ for row in rows
+ if (style is None or row.get("style") == style)
+ and (role is None or row.get("role") == role)
+ and (tool_name is None or _row_has_tool_name(row, tool_name))
+ and (camera is None or row.get("camera") == camera)
+ ]
+
+
+def _select_one(
+ rows: Sequence[LanguageRow],
+ *,
+ style: str | None,
+ role: str | None,
+ tool_name: str | None,
+ camera: str | None,
+) -> LanguageRow | None:
+ """Return the single matching row, or raise if the resolver is ambiguous.
+
+ Multiple matches always raise — even when the caller already passed
+ some selectors — because remaining ambiguity means the data has
+ several rows that look identical to the resolver and the caller
+ needs to pin down a specific one (e.g. add ``camera=...`` for VQA
+ rows shared across cameras).
+ """
+ if not rows:
+ return None
+ if len(rows) > 1:
+ raise ValueError(
+ f"Ambiguous resolver for style={style!r} role={role!r} "
+ f"tool_name={tool_name!r} camera={camera!r}: {len(rows)} matching rows. "
+ f"Add a selector that distinguishes them."
+ )
+ return rows[0]
+
+
+def _row_sort_key(row: LanguageRow) -> tuple[float, str, str]:
+ """Stable sort key for both persistent and event rows.
+
+ Event rows lack ``timestamp`` (it is implicit in the frame), so default
+ to ``0.0`` — within a single frame all event rows share the same sort
+ bucket and are tiebroken by ``(style, role)``.
+ """
+ timestamp = row.get("timestamp")
+ ts = float(unwrap_scalar(timestamp)) if timestamp is not None else 0.0
+ return (ts, row.get("style") or "", row.get("role") or "")
+
+
+def _timestamp(row: LanguageRow) -> float:
+ """Extract a row's ``timestamp`` as a Python float (unwrapping numpy scalars)."""
+ return float(unwrap_scalar(row["timestamp"]))
+
+
+def _row_has_tool_name(row: LanguageRow, tool_name: str) -> bool:
+ """Return ``True`` if any of the row's tool calls invokes ``tool_name``."""
+ for tool_call in row.get("tool_calls") or []:
+ if isinstance(tool_call, str):
+ continue
+ function = tool_call.get("function") if isinstance(tool_call, dict) else None
+ if isinstance(function, dict) and function.get("name") == tool_name:
+ return True
+ return False
+
+
+def _normalize_rows(rows: Sequence[Any]) -> list[LanguageRow]:
+ """Convert pyarrow scalars / mappings into a fresh list of plain dict rows."""
+ normalized = []
+ for row in rows:
+ if row is None:
+ continue
+ if hasattr(row, "as_py"):
+ row = row.as_py()
+ if not isinstance(row, dict):
+ raise TypeError(f"Language rows must be dictionaries, got {type(row).__name__}.")
+ normalized.append(dict(row))
+ return normalized
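Reviewer note on `_nth_relative`: the anchor-and-offset lookup is easiest to follow on a toy timeline. The sketch below is a simplified stand-in, not the PR's implementation. It assumes plain dict rows with a numeric `timestamp`, and it leaves out the style/role/tool/camera filtering and the persistent-style validation. The name `nth_relative_sketch` and the sample rows are hypothetical.

```python
from typing import Any, Optional


def nth_relative_sketch(
    rows: list[dict[str, Any]], t: float, offset: int
) -> Optional[dict[str, Any]]:
    """Return the row ``offset`` steps away from the last row active at time ``t``."""
    if offset == 0:
        raise ValueError("offset must be non-zero.")

    ordered = sorted(rows, key=lambda row: row["timestamp"])

    # The anchor is the last row whose timestamp is <= t (None if t precedes all rows).
    anchor_idx = None
    for idx, row in enumerate(ordered):
        if row["timestamp"] <= t:
            anchor_idx = idx
        else:
            break

    if anchor_idx is None:
        # Before the first row: "next" counts from the start, "prev" has nothing to return.
        target_idx = offset - 1 if offset > 0 else None
    else:
        target_idx = anchor_idx + offset

    if target_idx is None or not 0 <= target_idx < len(ordered):
        return None
    return ordered[target_idx]


# Hypothetical subtask timeline used only for illustration.
subtasks = [
    {"timestamp": 0.0, "content": "approach"},
    {"timestamp": 2.0, "content": "grasp"},
    {"timestamp": 5.0, "content": "lift"},
]

prev_row = nth_relative_sketch(subtasks, t=3.0, offset=-1)
next_row = nth_relative_sketch(subtasks, t=3.0, offset=1)
before_start = nth_relative_sketch(subtasks, t=-1.0, offset=1)
```

At `t=3.0` the anchor is the `grasp` row, so an offset of `-1` resolves to `approach` and `+1` to `lift`. Out-of-range targets return `None` rather than raising.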
diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py
index 715bd2f9b..de91978ea 100644
--- a/src/lerobot/datasets/utils.py
+++ b/src/lerobot/datasets/utils.py
@@ -88,7 +88,6 @@ VIDEO_DIR = "videos"
CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
-DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
@@ -130,6 +129,9 @@ class DatasetInfo:
# Optional metadata
robot_type: str | None = None
splits: dict[str, str] = field(default_factory=dict)
+ # OpenAI-style tool schemas declared by the dataset. ``None`` means the
+ # dataset doesn't declare any — readers fall back to ``DEFAULT_TOOLS``.
+ tools: list[dict] | None = None
def __post_init__(self) -> None:
# Coerce feature shapes from list to tuple — JSON deserialisation
@@ -151,11 +153,15 @@ class DatasetInfo:
"""Return a JSON-serialisable dict.
Converts tuple shapes back to lists so ``json.dump`` can handle them.
+ Drops ``tools`` when unset so existing datasets keep a clean
+ ``info.json``.
"""
d = dataclasses.asdict(self)
for ft in d["features"].values():
if isinstance(ft.get("shape"), tuple):
ft["shape"] = list(ft["shape"])
+ if d.get("tools") is None:
+ d.pop("tools", None)
return d
@classmethod
diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py
index 3688a4b8c..fe35af4b4 100644
--- a/src/lerobot/processor/__init__.py
+++ b/src/lerobot/processor/__init__.py
@@ -95,6 +95,13 @@ from .relative_action_processor import (
from .rename_processor import RenameObservationsProcessorStep, rename_stats
from .tokenizer_processor import ActionTokenizerProcessorStep, TokenizerProcessorStep
+# RenderMessagesStep is intentionally NOT re-exported here: it pulls in
+# `lerobot.datasets.language`, which requires the `[dataset]` extra
+# (`datasets`, `pyarrow`). Importing it from the processor package would
+# break every base-install consumer of `lerobot.processor`. Users that
+# need it import directly:
+# from lerobot.processor.render_messages_processor import RenderMessagesStep
+
__all__ = [
"ActionProcessorStep",
"AddTeleopActionAsComplimentaryDataStep",
diff --git a/src/lerobot/processor/batch_processor.py b/src/lerobot/processor/batch_processor.py
index eb7db255a..669c68a0a 100644
--- a/src/lerobot/processor/batch_processor.py
+++ b/src/lerobot/processor/batch_processor.py
@@ -174,6 +174,24 @@ class AddBatchDimensionComplementaryDataStep(ComplementaryDataProcessorStep):
task_index_value = complementary_data["task_index"]
if isinstance(task_index_value, Tensor) and task_index_value.dim() == 0:
complementary_data["task_index"] = task_index_value.unsqueeze(0)
+
+ complementary_data.pop("language_persistent", None)
+ complementary_data.pop("language_events", None)
+
+ if "messages" in complementary_data:
+ messages = complementary_data["messages"]
+ if isinstance(messages, list) and (not messages or isinstance(messages[0], dict)):
+ complementary_data["messages"] = [messages]
+
+ if "message_streams" in complementary_data:
+ streams = complementary_data["message_streams"]
+ if isinstance(streams, list) and (not streams or isinstance(streams[0], str)):
+ complementary_data["message_streams"] = [streams]
+
+ if "target_message_indices" in complementary_data:
+ indices = complementary_data["target_message_indices"]
+ if isinstance(indices, list) and (not indices or isinstance(indices[0], int)):
+ complementary_data["target_message_indices"] = [indices]
return complementary_data
def transform_features(
diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py
index ffdf0098c..faa4d5cd9 100644
--- a/src/lerobot/processor/converters.py
+++ b/src/lerobot/processor/converters.py
@@ -153,26 +153,30 @@ def from_tensor_to_numpy(x: torch.Tensor | Any) -> np.ndarray | float | int | An
return x
+_COMPLEMENTARY_KEYS = (
+ "task",
+ "index",
+ "task_index",
+ "episode_index",
+ "timestamp",
+ "language_persistent",
+ "language_events",
+ "messages",
+ "message_streams",
+ "target_message_indices",
+)
+
+
def _extract_complementary_data(batch: dict[str, Any]) -> dict[str, Any]:
- """
- Extract complementary data from a batch dictionary.
+ """Extract complementary data from a batch dictionary.
- This includes padding flags, task description, and indices.
-
- Args:
- batch: The batch dictionary.
-
- Returns:
- A dictionary with the extracted complementary data.
+ Includes padding flags (any key containing ``_is_pad``) plus the fixed
+ set of metadata / language keys defined in ``_COMPLEMENTARY_KEYS`` —
+ each only when present in ``batch``.
"""
pad_keys = {k: v for k, v in batch.items() if "_is_pad" in k}
- task_key = {"task": batch["task"]} if "task" in batch else {}
- subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
- index_key = {"index": batch["index"]} if "index" in batch else {}
- task_index_key = {"task_index": batch["task_index"]} if "task_index" in batch else {}
- episode_index_key = {"episode_index": batch["episode_index"]} if "episode_index" in batch else {}
-
- return {**pad_keys, **task_key, **subtask_key, **index_key, **task_index_key, **episode_index_key}
+ extras = {k: batch[k] for k in _COMPLEMENTARY_KEYS if k in batch}
+ return {**pad_keys, **extras}
def create_transition(
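Reviewer note on `_extract_complementary_data`: the refactor replaces the per-key dict unpacking with a single comprehension over a fixed key tuple. The standalone sketch below mirrors that logic so the behavior can be checked without importing `lerobot`. The batch contents are made up for illustration, and the function name carries a `_sketch` suffix to mark it as a copy rather than the library function.

```python
from typing import Any

# Same keys as the diff's ``_COMPLEMENTARY_KEYS``; copied here so the sketch is self-contained.
COMPLEMENTARY_KEYS = (
    "task",
    "index",
    "task_index",
    "episode_index",
    "timestamp",
    "language_persistent",
    "language_events",
    "messages",
    "message_streams",
    "target_message_indices",
)


def extract_complementary_data_sketch(batch: dict[str, Any]) -> dict[str, Any]:
    """Collect padding flags plus any known metadata / language keys present in ``batch``."""
    pad_keys = {k: v for k, v in batch.items() if "_is_pad" in k}
    extras = {k: batch[k] for k in COMPLEMENTARY_KEYS if k in batch}
    return {**pad_keys, **extras}


# Hypothetical batch: one padding flag, two metadata keys, one observation key.
batch = {
    "action_is_pad": [False, True],
    "task": "stack the cups",
    "timestamp": 1.25,
    "observation.state": [0.1, 0.2],
}
extracted = extract_complementary_data_sketch(batch)
```

Keys outside the tuple, such as `observation.state` here or the removed `subtask` key, are not carried over.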
diff --git a/src/lerobot/processor/render_messages_processor.py b/src/lerobot/processor/render_messages_processor.py
new file mode 100644
index 000000000..140592f0e
--- /dev/null
+++ b/src/lerobot/processor/render_messages_processor.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+from lerobot.configs import PipelineFeatureType, PolicyFeature
+from lerobot.configs.recipe import TrainingRecipe
+from lerobot.datasets.language import LANGUAGE_EVENTS, LANGUAGE_PERSISTENT
+from lerobot.datasets.language_render import render_sample
+from lerobot.types import EnvTransition, TransitionKey
+from lerobot.utils.utils import unwrap_scalar
+
+from .pipeline import ProcessorStep, ProcessorStepRegistry
+
+
+@dataclass
+@ProcessorStepRegistry.register(name="render_messages_processor")
+class RenderMessagesStep(ProcessorStep):
+ """Processor step that turns raw language columns into rendered chat messages.
+
+ Reads ``language_persistent`` and ``language_events`` from the transition's
+ complementary data, renders them through ``recipe`` at the sample timestamp,
+ and replaces the raw columns with the resulting ``messages`` /
+ ``message_streams`` / ``target_message_indices`` keys.
+ """
+
+ recipe: TrainingRecipe
+ dataset_ctx: Any | None = None
+
+ def __call__(self, transition: EnvTransition) -> EnvTransition | None:
+ """Render messages for a single transition; return ``None`` to drop it."""
+ complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
+ persistent = complementary_data.get(LANGUAGE_PERSISTENT) or []
+ events = complementary_data.get(LANGUAGE_EVENTS) or []
+
+ if not persistent and not events:
+ return transition
+
+ timestamp = complementary_data.get("timestamp")
+ if timestamp is None:
+ raise KeyError("RenderMessagesStep requires sample timestamp in complementary data.")
+
+ sample_idx = complementary_data.get("index", 0)
+ rendered = render_sample(
+ recipe=self.recipe,
+ persistent=persistent,
+ events=events,
+ t=unwrap_scalar(timestamp),
+ sample_idx=int(unwrap_scalar(sample_idx)),
+ task=complementary_data.get("task"),
+ dataset_ctx=self.dataset_ctx,
+ )
+ if rendered is None:
+ return None
+
+ new_transition = transition.copy()
+ new_complementary_data = dict(complementary_data)
+ new_complementary_data.pop(LANGUAGE_PERSISTENT, None)
+ new_complementary_data.pop(LANGUAGE_EVENTS, None)
+ new_complementary_data.update(rendered)
+ new_transition[TransitionKey.COMPLEMENTARY_DATA] = new_complementary_data
+ return new_transition
+
+ def transform_features(
+ self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
+ ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
+ """Pass features through unchanged; rendering only touches complementary data."""
+ return features
diff --git a/src/lerobot/scripts/lerobot_train.py b/src/lerobot/scripts/lerobot_train.py
index 55a8cc935..463668eb2 100644
--- a/src/lerobot/scripts/lerobot_train.py
+++ b/src/lerobot/scripts/lerobot_train.py
@@ -48,6 +48,7 @@ from lerobot.envs import close_envs, make_env, make_env_pre_post_processors
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.rewards import make_reward_pre_post_processors
+from lerobot.utils.collate import lerobot_collate_fn
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
from lerobot.utils.random_utils import set_seed
@@ -401,6 +402,10 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
shuffle = True
sampler = None
+ # Only swap in the language-aware collate when the dataset actually
+ # declares language columns; otherwise stay on PyTorch's default
+ # collate so non-language training runs are unaffected.
+ collate_fn = lerobot_collate_fn if dataset.meta.has_language_columns else None
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=cfg.num_workers,
@@ -409,6 +414,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
sampler=sampler,
pin_memory=device.type == "cuda",
drop_last=False,
+ collate_fn=collate_fn,
prefetch_factor=cfg.prefetch_factor if cfg.num_workers > 0 else None,
persistent_workers=cfg.persistent_workers and cfg.num_workers > 0,
)
diff --git a/src/lerobot/utils/collate.py b/src/lerobot/utils/collate.py
new file mode 100644
index 000000000..fce7e6b42
--- /dev/null
+++ b/src/lerobot/utils/collate.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from torch.utils.data._utils.collate import default_collate
+
+from lerobot.datasets.language import LANGUAGE_COLUMNS
+
+_PYTHON_LIST_KEYS = {"messages", "message_streams", "target_message_indices"}
+
+
+def lerobot_collate_fn(batch: list[dict[str, Any] | None]) -> dict[str, Any] | None:
+ """Collate function that keeps rendered-message fields as plain Python lists.
+
+ Drops ``None`` samples (e.g. recipes that yielded no target message), keeps
+ ``messages`` / ``message_streams`` / ``target_message_indices`` as Python
+ lists, drops raw language columns, and delegates every other key to ``default_collate``.
+ """
+ batch = [sample for sample in batch if sample is not None]
+ if not batch:
+ return None
+
+ # All-or-nothing per key: a partial-presence batch (e.g. half the samples
+ # carry `messages` and half don't) is a real bug in the upstream
+ # rendering step — silently filtering would hand downstream consumers a
+ # preserved list shorter than the tensor batch. Raise instead so the
+ # mismatch surfaces at the boundary.
+ preserved: dict[str, list[Any]] = {}
+ for key in _PYTHON_LIST_KEYS:
+ presence = [key in sample for sample in batch]
+ if not any(presence):
+ continue
+ if not all(presence):
+ raise ValueError(
+ f"Inconsistent batch: {sum(presence)}/{len(batch)} samples carry {key!r}; "
+ f"every sample in a batch must agree."
+ )
+ preserved[key] = [sample[key] for sample in batch]
+ tensorizable = [
+ {
+ key: value
+ for key, value in sample.items()
+ if key not in _PYTHON_LIST_KEYS and key not in LANGUAGE_COLUMNS
+ }
+ for sample in batch
+ ]
+ collated = default_collate(tensorizable)
+ collated.update(preserved)
+ return collated
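Reviewer note on `lerobot_collate_fn`: the all-or-nothing presence check can be exercised without PyTorch. The sketch below reproduces only the filtering and presence logic. It assumes the remaining keys would then go to `default_collate`, which is deliberately left out, and it omits the `LANGUAGE_COLUMNS` filter. `split_preserved` and the sample dicts are hypothetical names for illustration.

```python
from typing import Any, Optional

PYTHON_LIST_KEYS = ("messages", "message_streams", "target_message_indices")


def split_preserved(
    batch: list[Optional[dict[str, Any]]],
) -> tuple[dict[str, list[Any]], list[dict[str, Any]]]:
    """Split a batch into list-preserved keys and the per-sample remainder."""
    samples = [sample for sample in batch if sample is not None]

    preserved: dict[str, list[Any]] = {}
    for key in PYTHON_LIST_KEYS:
        presence = [key in sample for sample in samples]
        if not any(presence):
            continue
        if not all(presence):
            # Partial presence would misalign the preserved list with the tensor batch.
            raise ValueError(
                f"Inconsistent batch: {sum(presence)}/{len(samples)} samples carry {key!r}."
            )
        preserved[key] = [sample[key] for sample in samples]

    remainder = [
        {k: v for k, v in sample.items() if k not in PYTHON_LIST_KEYS}
        for sample in samples
    ]
    return preserved, remainder


# Hypothetical samples: the ``None`` entry stands for a sample the recipe dropped.
good_batch = [
    {"action": 0.1, "messages": [{"role": "user", "content": "hi"}]},
    None,
    {"action": 0.2, "messages": [{"role": "user", "content": "go"}]},
]
preserved, remainder = split_preserved(good_batch)
```

Raising on partial presence keeps the length of each preserved list equal to the number of surviving samples, which is what downstream consumers index against.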
diff --git a/src/lerobot/utils/utils.py b/src/lerobot/utils/utils.py
index 2574f1fa3..6aad0c503 100644
--- a/src/lerobot/utils/utils.py
+++ b/src/lerobot/utils/utils.py
@@ -160,6 +160,25 @@ def has_method(cls: object, method_name: str) -> bool:
return hasattr(cls, method_name) and callable(getattr(cls, method_name))
+def unwrap_scalar(value: Any) -> Any:
+ """Unwrap a tensor / numpy scalar / single-element list into a Python scalar.
+
+ Tensors and numpy scalars expose ``.item()``; single-element lists are
+ unwrapped recursively. Anything else is returned unchanged. Centralized
+ here so the language renderer and processor steps share one definition.
+
+ Raises:
+ ValueError: If ``value`` is a list with zero or multiple elements.
+ """
+ if hasattr(value, "item"):
+ return value.item()
+ if isinstance(value, list):
+ if len(value) != 1:
+ raise ValueError(f"Expected a scalar, got list of length {len(value)}: {value!r}")
+ return unwrap_scalar(value[0])
+ return value
+
+
def is_valid_numpy_dtype_string(dtype_str: str) -> bool:
"""
Return True if a given string can be converted to a numpy dtype.
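Reviewer note on `unwrap_scalar`: the sketch below copies the helper's logic and runs it on a few inputs. `FakeScalar` is a hypothetical stand-in for a 0-d tensor or numpy scalar, used so the example runs without `torch` or `numpy`.

```python
from typing import Any


class FakeScalar:
    """Hypothetical stand-in for an object exposing ``.item()`` (0-d tensor, numpy scalar)."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def item(self) -> Any:
        return self._value


def unwrap_scalar(value: Any) -> Any:
    """Unwrap ``.item()``-style scalars and single-element lists into a Python scalar."""
    if hasattr(value, "item"):
        return value.item()
    if isinstance(value, list):
        if len(value) != 1:
            raise ValueError(f"Expected a scalar, got list of length {len(value)}: {value!r}")
        return unwrap_scalar(value[0])
    return value


timestamp = unwrap_scalar([FakeScalar(1.5)])  # list wrapping a scalar-like object
sample_idx = unwrap_scalar([[7]])  # nested single-element lists unwrap recursively
task = unwrap_scalar("pick up the cube")  # other values pass through unchanged
```

Lists with zero or several elements raise `ValueError`, so a batched value passed where a single sample is expected fails immediately instead of being truncated.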
diff --git a/tests/configs/test_recipe.py b/tests/configs/test_recipe.py
new file mode 100644
index 000000000..b4954efbf
--- /dev/null
+++ b/tests/configs/test_recipe.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+
+from pathlib import Path
+from textwrap import dedent
+
+import pytest
+
+from lerobot.configs.recipe import MessageTurn, TrainingRecipe, load_recipe
+
+
+def _minimal_message_turn(content: str = "${task}") -> MessageTurn:
+ return MessageTurn(role="user", content=content, stream="high_level")
+
+
+def _minimal_target_turn() -> MessageTurn:
+ return MessageTurn(role="assistant", content="ok", stream="high_level", target=True)
+
+
+# ── Message-recipe validation ────────────────────────────────────────
+
+
+def test_message_recipe_validates_unknown_binding():
+ with pytest.raises(ValueError, match="unknown binding"):
+ TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${missing}", stream="high_level"),
+ _minimal_target_turn(),
+ ]
+ )
+
+
+def test_message_turn_requires_a_stream():
+ """Every turn must declare a stream — None is rejected at construction.
+
+ Previously this only failed at render time (``_validate_rendered``);
+ catching it here means a malformed recipe YAML errors at load instead
+ of at the first training sample.
+ """
+ with pytest.raises(ValueError, match="missing a stream"):
+ MessageTurn(role="user", content="${task}")
+
+
+def test_message_recipe_requires_at_least_one_target():
+ with pytest.raises(ValueError, match="target"):
+ TrainingRecipe(
+ messages=[
+ _minimal_message_turn(),
+ MessageTurn(role="assistant", content="no target", stream="high_level"),
+ ]
+ )
+
+
+def test_recipe_rejects_both_messages_and_blend():
+ with pytest.raises(ValueError, match="only one"):
+ TrainingRecipe(
+ messages=[_minimal_message_turn(), _minimal_target_turn()],
+ blend={"a": TrainingRecipe(weight=1.0, messages=[_minimal_target_turn()])},
+ )
+
+
+def test_recipe_rejects_neither_messages_nor_blend():
+ with pytest.raises(ValueError, match="must set one"):
+ TrainingRecipe()
+
+
+# ── Blend validation ─────────────────────────────────────────────────
+
+
+def test_blend_must_be_non_empty():
+ with pytest.raises(ValueError, match="at least one component"):
+ TrainingRecipe(blend={})
+
+
+def test_blend_component_must_define_weight():
+ with pytest.raises(ValueError, match="weight"):
+ TrainingRecipe(blend={"a": TrainingRecipe(messages=[_minimal_target_turn()])})
+
+
+def test_blend_component_weight_must_be_positive():
+ with pytest.raises(ValueError, match="positive weight"):
+ TrainingRecipe(blend={"a": TrainingRecipe(weight=0.0, messages=[_minimal_target_turn()])})
+
+
+def test_blend_component_must_define_messages():
+ # A bare TrainingRecipe(weight=1.0) would itself raise; build it without
+ # going through __post_init__ to exercise the blend-level validator.
+ bad = TrainingRecipe.__new__(TrainingRecipe)
+ bad.messages = None
+ bad.bindings = None
+ bad.blend = None
+ bad.weight = 1.0
+ with pytest.raises(ValueError, match="must define messages"):
+ TrainingRecipe(blend={"a": bad})
+
+
+def test_blend_components_cannot_themselves_define_a_blend():
+ inner = TrainingRecipe(blend={"x": TrainingRecipe(weight=1.0, messages=[_minimal_target_turn()])})
+ # Force-bypass the inner component's normal validation so the test
+ # exercises the outer blend's "no nested blends" rule directly.
+ nested = TrainingRecipe.__new__(TrainingRecipe)
+ nested.messages = None
+ nested.bindings = None
+ nested.blend = inner.blend
+ nested.weight = 1.0
+ with pytest.raises(ValueError, match="cannot itself define a blend"):
+ TrainingRecipe(blend={"outer": nested})
+
+
+# ── from_dict / from_yaml round-trips ────────────────────────────────
+
+
+def test_from_dict_with_nested_blend():
+ recipe = TrainingRecipe.from_dict(
+ {
+ "blend": {
+ "a": {
+ "weight": 1.0,
+ "messages": [
+ {"role": "user", "content": "${task}", "stream": "high_level"},
+ {"role": "assistant", "content": "a", "stream": "high_level", "target": True},
+ ],
+ },
+ "b": {
+ "weight": 2.0,
+ "messages": [
+ {"role": "user", "content": "${task}", "stream": "high_level"},
+ {"role": "assistant", "content": "b", "stream": "high_level", "target": True},
+ ],
+ },
+ }
+ }
+ )
+ assert recipe.blend is not None
+ assert set(recipe.blend) == {"a", "b"}
+ assert recipe.blend["b"].weight == 2.0
+ # Inner messages were promoted to MessageTurn instances.
+ assert isinstance(recipe.blend["a"].messages[0], MessageTurn)
+
+
+def test_from_yaml_round_trips_through_load_recipe(tmp_path: Path):
+ yaml_text = dedent(
+ """
+ bindings:
+ custom: "active_at(t, style=subtask)"
+ messages:
+ - {role: user, content: "${task}: ${custom}", stream: high_level}
+ - {role: assistant, content: "ok", stream: high_level, target: true}
+ """
+ ).strip()
+ path = tmp_path / "recipe.yaml"
+ path.write_text(yaml_text)
+
+ via_classmethod = TrainingRecipe.from_yaml(path)
+ via_helper = load_recipe(path)
+
+ assert via_classmethod.bindings == {"custom": "active_at(t, style=subtask)"}
+ assert via_classmethod.messages[1].target is True
+ # ``load_recipe`` is just a wrapper, but assert the two paths agree
+ # on the structural result so a future divergence is caught here.
+ assert via_helper.bindings == via_classmethod.bindings
+ assert len(via_helper.messages) == len(via_classmethod.messages)
+
+
+def test_from_yaml_rejects_non_mapping(tmp_path: Path):
+ path = tmp_path / "bad.yaml"
+ path.write_text("- just\n- a\n- list\n")
+ with pytest.raises(ValueError, match="mapping at the top level"):
+ TrainingRecipe.from_yaml(path)
diff --git a/tests/datasets/test_dataset_metadata.py b/tests/datasets/test_dataset_metadata.py
index 6c784c90b..171d8af8b 100644
--- a/tests/datasets/test_dataset_metadata.py
+++ b/tests/datasets/test_dataset_metadata.py
@@ -385,3 +385,140 @@ def test_finalize_flushes_buffered_metadata(tmp_path):
assert episodes_dir.exists()
parquet_files = list(episodes_dir.rglob("*.parquet"))
assert len(parquet_files) > 0
+
+
+# ── Tools accessor ───────────────────────────────────────────────────
+
+
+def test_tools_falls_back_to_default_when_info_has_no_tools_field(tmp_path):
+ """meta.tools returns DEFAULT_TOOLS when info.json doesn't declare any."""
+ from lerobot.datasets.language import DEFAULT_TOOLS
+
+ root = tmp_path / "no_tools"
+ meta = LeRobotDatasetMetadata.create(
+ repo_id="test/no_tools",
+ fps=DEFAULT_FPS,
+ features=SIMPLE_FEATURES,
+ root=root,
+ use_videos=False,
+ )
+
+ assert meta.tools == DEFAULT_TOOLS
+ # info.json on disk should NOT include a `tools` key for clean datasets
+ with open(root / INFO_PATH) as f:
+ info_on_disk = json.load(f)
+ assert "tools" not in info_on_disk
+
+
+def test_tools_reads_declared_tools_from_info_json(tmp_path):
+ """A `tools` list written into info.json survives load → meta.tools.
+
+ Regression test for the bug where ``DatasetInfo.from_dict`` silently
+ dropped the ``tools`` key (no matching dataclass field), so
+ ``meta.tools`` always returned ``DEFAULT_TOOLS`` regardless of
+ what was on disk.
+ """
+ from lerobot.datasets.io_utils import load_info
+
+ root = tmp_path / "with_tools"
+ meta = LeRobotDatasetMetadata.create(
+ repo_id="test/with_tools",
+ fps=DEFAULT_FPS,
+ features=SIMPLE_FEATURES,
+ root=root,
+ use_videos=False,
+ )
+
+ custom_tool = {
+ "type": "function",
+ "function": {
+ "name": "record_observation",
+ "description": "Capture a still image.",
+ "parameters": {
+ "type": "object",
+ "properties": {"label": {"type": "string"}},
+ "required": ["label"],
+ },
+ },
+ }
+ info_path = root / INFO_PATH
+ with open(info_path) as f:
+ raw = json.load(f)
+ raw["tools"] = [custom_tool]
+ with open(info_path, "w") as f:
+ json.dump(raw, f)
+
+ # Reload info from disk and rebind it on the metadata object
+ meta.info = load_info(root)
+ assert meta.tools == [custom_tool]
+
+
+def test_tools_round_trip_through_dataset_info(tmp_path):
+ """A `tools` list survives DatasetInfo.from_dict / to_dict."""
+ from lerobot.datasets.utils import DatasetInfo
+
+ raw = {
+ "codebase_version": "v3.1",
+ "fps": 30,
+ "features": SIMPLE_FEATURES,
+ "tools": [{"type": "function", "function": {"name": "say"}}],
+ }
+ info = DatasetInfo.from_dict(raw)
+ assert info.tools == raw["tools"]
+ assert info.to_dict()["tools"] == raw["tools"]
+
+
+def test_tools_setter_persists_to_info_json_and_reloads(tmp_path):
+ """Assigning meta.tools writes info.json and reloads meta.info."""
+ from lerobot.datasets.io_utils import load_info
+
+ root = tmp_path / "set_tools"
+ meta = LeRobotDatasetMetadata.create(
+ repo_id="test/set_tools",
+ fps=DEFAULT_FPS,
+ features=SIMPLE_FEATURES,
+ root=root,
+ use_videos=False,
+ )
+
+ custom_tool = {
+ "type": "function",
+ "function": {
+ "name": "record_observation",
+ "description": "Capture a still image.",
+ "parameters": {
+ "type": "object",
+ "properties": {"label": {"type": "string"}},
+ "required": ["label"],
+ },
+ },
+ }
+ meta.tools = [custom_tool]
+
+ # In-memory metadata reflects the new catalog ...
+ assert meta.tools == [custom_tool]
+ assert meta.info.tools == [custom_tool]
+ # ... and a fresh read from disk agrees.
+ assert load_info(root).tools == [custom_tool]
+
+
+def test_tools_setter_clears_key_when_set_to_none(tmp_path):
+ """Setting meta.tools back to None drops the key and restores the default."""
+ from lerobot.datasets.language import DEFAULT_TOOLS
+
+ root = tmp_path / "clear_tools"
+ meta = LeRobotDatasetMetadata.create(
+ repo_id="test/clear_tools",
+ fps=DEFAULT_FPS,
+ features=SIMPLE_FEATURES,
+ root=root,
+ use_videos=False,
+ )
+
+ meta.tools = [{"type": "function", "function": {"name": "say"}}]
+ meta.tools = None
+
+ assert meta.tools == DEFAULT_TOOLS
+ with open(root / INFO_PATH) as f:
+ info_on_disk = json.load(f)
+ assert "tools" not in info_on_disk
diff --git a/tests/datasets/test_language.py b/tests/datasets/test_language.py
new file mode 100644
index 000000000..52c7b3708
--- /dev/null
+++ b/tests/datasets/test_language.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+
+import pytest
+
+pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
+pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])")
+
+import numpy as np # noqa: E402
+import pandas as pd # noqa: E402
+import pyarrow as pa # noqa: E402
+
+from lerobot.datasets import LeRobotDataset # noqa: E402
+from lerobot.datasets.io_utils import write_info # noqa: E402
+from lerobot.datasets.language import ( # noqa: E402
+ EVENT_ONLY_STYLES,
+ LANGUAGE_EVENTS,
+ LANGUAGE_PERSISTENT,
+ PERSISTENT_STYLES,
+ STYLE_REGISTRY,
+ VIEW_DEPENDENT_STYLES,
+ column_for_style,
+ is_view_dependent_style,
+ language_events_arrow_type,
+ language_feature_info,
+ language_persistent_arrow_type,
+ validate_camera_field,
+)
+from lerobot.datasets.utils import DEFAULT_DATA_PATH # noqa: E402
+
+
+def test_language_arrow_schema_has_expected_fields():
+ persistent_row_type = language_persistent_arrow_type().value_type
+ event_row_type = language_events_arrow_type().value_type
+
+ assert isinstance(persistent_row_type, pa.StructType)
+ assert persistent_row_type.names == [
+ "role",
+ "content",
+ "style",
+ "timestamp",
+ "camera",
+ "tool_calls",
+ ]
+
+ assert isinstance(event_row_type, pa.StructType)
+ assert event_row_type.names == ["role", "content", "style", "camera", "tool_calls"]
+
+ # Persistent-row timestamps use float32, matching LeRobotDataset frame timestamps.
+ assert persistent_row_type.field("timestamp").type == pa.float32()
+
+
+def test_validate_feature_language_warns_only_on_non_empty_value(caplog):
+ from lerobot.datasets.feature_utils import validate_feature_language
+
+ # None (the expected record-time value) is silent and non-fatal.
+ with caplog.at_level("WARNING"):
+ assert validate_feature_language("language_persistent", None) == ""
+ assert caplog.records == []
+
+ # A stray non-empty value is dropped later, so we warn rather than fail.
+ with caplog.at_level("WARNING"):
+ assert validate_feature_language("language_persistent", [{"role": "user"}]) == ""
+ assert any("language_persistent" in r.message for r in caplog.records)
+
+
+def test_style_registry_routes_columns():
+ assert {"subtask", "plan", "memory", "motion", "task_aug"} == PERSISTENT_STYLES
+ assert {"interjection", "vqa", "trace"} == EVENT_ONLY_STYLES
+ assert PERSISTENT_STYLES | EVENT_ONLY_STYLES <= STYLE_REGISTRY
+
+ assert column_for_style("subtask") == LANGUAGE_PERSISTENT
+ assert column_for_style("plan") == LANGUAGE_PERSISTENT
+ assert column_for_style("memory") == LANGUAGE_PERSISTENT
+ assert column_for_style("motion") == LANGUAGE_PERSISTENT
+ assert column_for_style("task_aug") == LANGUAGE_PERSISTENT
+ assert column_for_style("interjection") == LANGUAGE_EVENTS
+ assert column_for_style("vqa") == LANGUAGE_EVENTS
+ assert column_for_style("trace") == LANGUAGE_EVENTS
+ assert column_for_style(None) == LANGUAGE_EVENTS
+
+
+def test_view_dependent_styles():
+ # motion lives in PERSISTENT_STYLES and is described in robot-frame
+ # (joint / Cartesian) terms, so it is NOT view-dependent. Only vqa
+ # (event) and trace (event, pixel-trajectory) carry a camera tag.
+ assert {"vqa", "trace"} == VIEW_DEPENDENT_STYLES
+ assert is_view_dependent_style("vqa")
+ assert is_view_dependent_style("trace")
+ assert not is_view_dependent_style("motion")
+ assert not is_view_dependent_style("subtask")
+ assert not is_view_dependent_style("plan")
+ assert not is_view_dependent_style("interjection")
+ assert not is_view_dependent_style(None)
+
+
+def test_validate_camera_field_requires_camera_for_view_dependent_styles():
+ validate_camera_field("vqa", "observation.images.top")
+ validate_camera_field("trace", "observation.images.front")
+ with pytest.raises(ValueError, match="view-dependent"):
+ validate_camera_field("vqa", None)
+ with pytest.raises(ValueError, match="view-dependent"):
+ validate_camera_field("trace", "")
+
+
+def test_validate_camera_field_rejects_camera_on_non_view_dependent_styles():
+ validate_camera_field("subtask", None)
+ validate_camera_field("plan", None)
+ validate_camera_field("memory", None)
+ validate_camera_field("motion", None)
+ validate_camera_field("interjection", None)
+ validate_camera_field(None, None)
+ with pytest.raises(ValueError, match="must have camera=None"):
+ validate_camera_field("subtask", "observation.images.top")
+ with pytest.raises(ValueError, match="must have camera=None"):
+ validate_camera_field("motion", "observation.images.top")
+ with pytest.raises(ValueError, match="must have camera=None"):
+ validate_camera_field("interjection", "observation.images.top")
+ with pytest.raises(ValueError, match="must have camera=None"):
+ validate_camera_field(None, "observation.images.top")
+
+
+def test_unknown_style_rejected():
+ with pytest.raises(ValueError, match="Unknown language style"):
+ column_for_style("surprise")
+
+
+def test_lerobot_dataset_passes_language_columns_through(tmp_path, empty_lerobot_dataset_factory):
+ root = tmp_path / "language_dataset"
+ dataset = empty_lerobot_dataset_factory(
+ root=root,
+ features={"state": {"dtype": "float32", "shape": (2,), "names": None}},
+ use_videos=False,
+ )
+ dataset.add_frame({"state": np.array([0.0, 1.0], dtype=np.float32), "task": "tidy"})
+ dataset.add_frame({"state": np.array([1.0, 2.0], dtype=np.float32), "task": "tidy"})
+ dataset.save_episode()
+ dataset.finalize()
+
+ persistent = [
+ {
+ "role": "assistant",
+ "content": "reach for the cup",
+ "style": "subtask",
+ "timestamp": 0.0,
+ "camera": None,
+ "tool_calls": None,
+ }
+ ]
+ event = {
+ "role": "user",
+ "content": "what is visible?",
+ "style": "vqa",
+ "camera": "observation.images.top",
+ "tool_calls": None,
+ }
+ data_path = root / DEFAULT_DATA_PATH.format(chunk_index=0, file_index=0)
+ df = pd.read_parquet(data_path)
+ df[LANGUAGE_PERSISTENT] = [persistent, persistent]
+ df[LANGUAGE_EVENTS] = [[event], []]
+ df.to_parquet(data_path)
+
+ info = dataset.meta.info
+ info["features"].update(language_feature_info())
+ write_info(info, root)
+
+ reloaded = LeRobotDataset(repo_id=dataset.repo_id, root=root)
+
+ first = reloaded[0]
+ second = reloaded[1]
+ assert first[LANGUAGE_PERSISTENT] == persistent
+ assert first[LANGUAGE_EVENTS] == [event]
+ assert second[LANGUAGE_PERSISTENT] == persistent
+ assert second[LANGUAGE_EVENTS] == []
diff --git a/tests/datasets/test_language_render.py b/tests/datasets/test_language_render.py
new file mode 100644
index 000000000..fcef41fd8
--- /dev/null
+++ b/tests/datasets/test_language_render.py
@@ -0,0 +1,417 @@
+#!/usr/bin/env python
+
+import pytest
+
+pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
+
+from lerobot.configs.recipe import MessageTurn, TrainingRecipe # noqa: E402
+from lerobot.datasets.language_render import ( # noqa: E402
+ EMITTED_AT_TOLERANCE_S,
+ active_at,
+ emitted_at,
+ nth_next,
+ nth_prev,
+ render_sample,
+)
+
+
+def persistent_row(role, content, style, timestamp, tool_calls=None, camera=None):
+ return {
+ "role": role,
+ "content": content,
+ "style": style,
+ "timestamp": timestamp,
+ "camera": camera,
+ "tool_calls": tool_calls,
+ }
+
+
+def event_row(role, content, style, tool_calls=None, camera=None):
+ return {
+ "role": role,
+ "content": content,
+ "style": style,
+ "camera": camera,
+ "tool_calls": tool_calls,
+ }
+
+
+PERSISTENT = [
+ persistent_row("assistant", "plan 0", "plan", 0.0),
+ persistent_row("assistant", "memory 0", "memory", 0.0),
+ persistent_row("assistant", "subtask 0", "subtask", 0.0),
+ persistent_row("assistant", "memory 1", "memory", 1.0),
+ persistent_row("assistant", "subtask 1", "subtask", 1.0),
+]
+EVENTS_AT_1 = [
+ event_row("user", "what is visible?", "vqa", camera="observation.images.top"),
+ event_row("assistant", '{"count": 2}', "vqa", camera="observation.images.top"),
+]
+EVENTS_AT_2 = [
+ event_row("user", "skip wiping", "interjection"),
+ event_row(
+ "assistant",
+ None,
+ None,
+ [{"type": "function", "function": {"name": "say", "arguments": {"text": "Skipping wiping."}}}],
+ ),
+]
+# Same emission tick, two cameras: triggers per-camera disambiguation in
+# resolvers, mirroring how Module 3 of the annotation pipeline writes one
+# (vqa, user) + (vqa, assistant) pair per camera.
+EVENTS_AT_3_TWO_CAMERAS = [
+ event_row("user", "how many cups (top)?", "vqa", camera="observation.images.top"),
+ event_row("assistant", '{"count": 3}', "vqa", camera="observation.images.top"),
+ event_row("user", "how many cups (wrist)?", "vqa", camera="observation.images.wrist"),
+ event_row("assistant", '{"count": 1}', "vqa", camera="observation.images.wrist"),
+]
+
+
+def test_resolver_temporal_semantics():
+ assert active_at(0.5, persistent=PERSISTENT, style="subtask")["content"] == "subtask 0"
+ assert active_at(1.0, persistent=PERSISTENT, style="subtask")["content"] == "subtask 1"
+ assert emitted_at(0.5, persistent=PERSISTENT, events=[], style="vqa", role="assistant") is None
+ assert (
+ emitted_at(1.0, persistent=PERSISTENT, events=EVENTS_AT_1, style="vqa", role="assistant")["content"]
+ == '{"count": 2}'
+ )
+
+
+def test_persistent_relative_resolvers_reject_event_styles():
+ with pytest.raises(ValueError, match="event-only"):
+ active_at(1.0, persistent=PERSISTENT, style="vqa")
+ with pytest.raises(ValueError, match="event-only"):
+ nth_prev(1.0, persistent=PERSISTENT, style="interjection")
+
+
+def test_nth_prev_and_next():
+ assert nth_prev(1.0, persistent=PERSISTENT, style="subtask", offset=1)["content"] == "subtask 0"
+ assert nth_next(0.0, persistent=PERSISTENT, style="subtask", offset=1)["content"] == "subtask 1"
+
+
+def test_substitution_if_present_multimodal_and_tool_calls():
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(
+ role="user",
+ content=[
+ {"type": "image", "feature": "observation.images.top"},
+ {"type": "text", "text": "${task}: ${interjection}"},
+ ],
+ stream="high_level",
+ if_present="interjection",
+ ),
+ MessageTurn(
+ role="assistant",
+ content="${plan}",
+ stream="high_level",
+ target=True,
+ tool_calls_from="speech",
+ ),
+ ],
+ bindings={"plan": "active_at(t, style=plan)"},
+ )
+
+ rendered = render_sample(
+ recipe=recipe,
+ persistent=PERSISTENT,
+ events=EVENTS_AT_2,
+ t=2.0,
+ sample_idx=0,
+ task="clean kitchen",
+ )
+
+ assert rendered["messages"][0]["content"][1]["text"] == "clean kitchen: skip wiping"
+ assert rendered["messages"][1]["content"] == "plan 0"
+ assert rendered["messages"][1]["tool_calls"][0]["function"]["name"] == "say"
+ assert rendered["message_streams"] == ["high_level", "high_level"]
+ assert rendered["target_message_indices"] == [1]
+
+
+def test_exact_event_miss_returns_none_when_target_skips():
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${vqa_query}", stream="high_level", if_present="vqa_query"),
+ MessageTurn(
+ role="assistant",
+ content="${vqa}",
+ stream="high_level",
+ target=True,
+ if_present="vqa",
+ ),
+ ]
+ )
+
+ assert (
+ render_sample(recipe=recipe, persistent=PERSISTENT, events=EVENTS_AT_2, t=0.0, sample_idx=0) is None
+ )
+
+
+def test_deterministic_blend_sampling():
+ recipe = TrainingRecipe(
+ blend={
+ "a": TrainingRecipe(
+ weight=1.0,
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="a", stream="high_level", target=True),
+ ],
+ ),
+ "b": TrainingRecipe(
+ weight=1.0,
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="b", stream="high_level", target=True),
+ ],
+ ),
+ }
+ )
+
+ first = render_sample(
+ recipe=recipe, persistent=PERSISTENT, events=EVENTS_AT_2, t=0.0, sample_idx=123, task="x"
+ )
+ second = render_sample(
+ recipe=recipe, persistent=PERSISTENT, events=EVENTS_AT_2, t=0.0, sample_idx=123, task="x"
+ )
+ assert first == second
+
+
+def test_emitted_at_filters_vqa_by_camera():
+ top = emitted_at(
+ 3.0,
+ persistent=PERSISTENT,
+ events=EVENTS_AT_3_TWO_CAMERAS,
+ style="vqa",
+ role="assistant",
+ camera="observation.images.top",
+ )
+ wrist = emitted_at(
+ 3.0,
+ persistent=PERSISTENT,
+ events=EVENTS_AT_3_TWO_CAMERAS,
+ style="vqa",
+ role="assistant",
+ camera="observation.images.wrist",
+ )
+ assert top["content"] == '{"count": 3}'
+ assert wrist["content"] == '{"count": 1}'
+
+
+def test_emitted_at_raises_on_ambiguous_per_camera_vqa():
+ with pytest.raises(ValueError, match="Ambiguous resolver"):
+ emitted_at(
+ 3.0,
+ persistent=PERSISTENT,
+ events=EVENTS_AT_3_TWO_CAMERAS,
+ style="vqa",
+ role="assistant",
+ )
+
+
+def _vqa_subrecipe(camera: str) -> TrainingRecipe:
+ return TrainingRecipe(
+ weight=1.0,
+ bindings={
+ "vqa_query": f"emitted_at(t, style=vqa, role=user, camera={camera})",
+ "vqa": f"emitted_at(t, style=vqa, role=assistant, camera={camera})",
+ },
+ messages=[
+ MessageTurn(
+ role="user",
+ content=[{"type": "image", "feature": camera}, {"type": "text", "text": "${vqa_query}"}],
+ stream="high_level",
+ if_present="vqa_query",
+ ),
+ MessageTurn(
+ role="assistant",
+ content="${vqa}",
+ stream="high_level",
+ target=True,
+ if_present="vqa",
+ ),
+ ],
+ )
+
+
+@pytest.mark.parametrize(
+ ("camera", "expected_query", "expected_answer"),
+ [
+ ("observation.images.top", "how many cups (top)?", '{"count": 3}'),
+ ("observation.images.wrist", "how many cups (wrist)?", '{"count": 1}'),
+ ],
+)
+def test_per_camera_blend_renders_both_views(camera, expected_query, expected_answer):
+ rendered = render_sample(
+ recipe=_vqa_subrecipe(camera),
+ persistent=PERSISTENT,
+ events=EVENTS_AT_3_TWO_CAMERAS,
+ t=3.0,
+ sample_idx=0,
+ )
+
+ assert rendered["messages"][0]["content"][0]["feature"] == camera
+ assert rendered["messages"][0]["content"][1]["text"] == expected_query
+ assert rendered["messages"][1]["content"] == expected_answer
+
+
+def test_resolve_task_picks_rephrasing_deterministically_per_sample():
+ rephrasings = [
+ persistent_row("user", "tidy the kitchen", "task_aug", 0.0),
+ persistent_row("user", "please clean up the kitchen", "task_aug", 0.0),
+ persistent_row("user", "kitchen needs tidying", "task_aug", 0.0),
+ persistent_row("user", "make the kitchen clean", "task_aug", 0.0),
+ ]
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="ok", stream="high_level", target=True),
+ ]
+ )
+
+ # No explicit task override → resolver consults persistent rows.
+ seen: set[str] = set()
+ for sample_idx in range(64):
+ rendered = render_sample(
+ recipe=recipe,
+ persistent=rephrasings,
+ events=[],
+ t=0.0,
+ sample_idx=sample_idx,
+ dataset_ctx={"task": "canonical kitchen task"},
+ )
+ seen.add(rendered["messages"][0]["content"])
+ # Every rephrasing should be reachable across enough samples.
+ assert seen == {r["content"] for r in rephrasings}
+ # Same sample_idx → same pick (determinism).
+ a = render_sample(
+ recipe=recipe,
+ persistent=rephrasings,
+ events=[],
+ t=0.0,
+ sample_idx=42,
+ dataset_ctx={"task": "canonical"},
+ )
+ b = render_sample(
+ recipe=recipe,
+ persistent=rephrasings,
+ events=[],
+ t=0.0,
+ sample_idx=42,
+ dataset_ctx={"task": "canonical"},
+ )
+ assert a["messages"][0]["content"] == b["messages"][0]["content"]
+
+
+def test_resolve_task_falls_back_to_canonical_without_rephrasings():
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="ok", stream="high_level", target=True),
+ ]
+ )
+ rendered = render_sample(
+ recipe=recipe,
+ persistent=PERSISTENT, # no task_aug rows
+ events=[],
+ t=0.0,
+ sample_idx=0,
+ dataset_ctx={"task": "clean the kitchen"},
+ )
+ assert rendered["messages"][0]["content"] == "clean the kitchen"
+
+
+def test_resolve_task_explicit_override_beats_rephrasings():
+ rephrasings = [
+ persistent_row("user", "rephrased one", "task_aug", 0.0),
+ persistent_row("user", "rephrased two", "task_aug", 0.0),
+ ]
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="ok", stream="high_level", target=True),
+ ]
+ )
+ rendered = render_sample(
+ recipe=recipe,
+ persistent=rephrasings,
+ events=[],
+ t=0.0,
+ sample_idx=0,
+ task="explicit override wins",
+ dataset_ctx={"task": "canonical"},
+ )
+ assert rendered["messages"][0]["content"] == "explicit override wins"
+
+
+def test_emitted_at_persistent_tolerates_small_timestamp_drift():
+ """Persistent ``emitted_at`` should match within EMITTED_AT_TOLERANCE_S
+ so callers that derive ``t`` arithmetically (``frame_idx / fps``) still
+ line up with the parquet-stored timestamp.
+ """
+ rows = [persistent_row("assistant", "memo", "memory", 1.0)]
+ # Half the tolerance away: a slightly different float, comfortably inside the window
+ inside = emitted_at(1.0 + EMITTED_AT_TOLERANCE_S / 2, persistent=rows, events=[], style="memory")
+ assert inside is not None and inside["content"] == "memo"
+
+ # Twice the tolerance, outside the window: no match
+ outside = emitted_at(1.0 + EMITTED_AT_TOLERANCE_S * 2, persistent=rows, events=[], style="memory")
+ assert outside is None
+
+
+def test_render_sample_rejects_non_dict_language_rows():
+ """``_normalize_rows`` must surface malformed inputs as TypeError.
+
+ A pipeline that hands the renderer a non-dict (e.g. a stray string)
+ is a real upstream bug — silent skipping would let it propagate.
+ """
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="ok", stream="high_level", target=True),
+ ]
+ )
+ with pytest.raises(TypeError, match="must be dictionaries"):
+ render_sample(
+ recipe=recipe,
+ persistent=["not a dict"],
+ events=[],
+ t=0.0,
+ sample_idx=0,
+ task="x",
+ )
+
+
+def test_low_level_branch_renders_active_subtask():
+ low_level = TrainingRecipe(
+ blend={
+ "low": TrainingRecipe(
+ weight=1.0,
+ messages=[
+ MessageTurn(
+ role="user",
+ content="${task}\nPlan: ${plan}\nMemory: ${memory}",
+ stream="high_level",
+ ),
+ MessageTurn(
+ role="assistant",
+ content="${subtask}",
+ stream="low_level",
+ target=True,
+ ),
+ ],
+ )
+ }
+ )
+
+ rendered = render_sample(
+ recipe=low_level,
+ persistent=PERSISTENT,
+ events=[],
+ t=0.5,
+ sample_idx=0,
+ task="clean kitchen",
+ )
+
+ assert rendered["messages"][-1] == {"role": "assistant", "content": "subtask 0"}
+ assert rendered["message_streams"][-1] == "low_level"
+ assert rendered["target_message_indices"] == [1]
diff --git a/tests/datasets/test_subtask_dataset.py b/tests/datasets/test_subtask_dataset.py
deleted file mode 100644
index bb77b77d1..000000000
--- a/tests/datasets/test_subtask_dataset.py
+++ /dev/null
@@ -1,193 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Tests for subtask functionality in LeRobotDataset.
-
-These tests verify that:
-- Subtask information is correctly loaded from datasets that have subtask data
-- The __getitem__ method correctly adds subtask strings to returned items
-- Subtask handling gracefully handles missing data
-"""
-
-import pytest
-
-pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])")
-
-import pandas as pd # noqa: E402
-import torch
-
-from lerobot.datasets.lerobot_dataset import LeRobotDataset
-
-
-class TestSubtaskDataset:
- """Tests for subtask handling in LeRobotDataset."""
-
- @pytest.fixture
- def subtask_dataset(self):
- """Load the test subtask dataset from the hub."""
- # Use lerobot/pusht-subtask dataset with episode 1
- return LeRobotDataset(
- repo_id="lerobot/pusht-subtask",
- episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
- )
-
- def test_subtask_dataset_loads(self, subtask_dataset):
- """Test that the subtask dataset loads successfully."""
- assert subtask_dataset is not None
- assert len(subtask_dataset) > 0
-
- def test_subtask_metadata_loaded(self, subtask_dataset):
- """Test that subtask metadata is loaded when present in dataset."""
- # The dataset should have subtasks metadata loaded
- assert subtask_dataset.meta.subtasks is not None
- assert isinstance(subtask_dataset.meta.subtasks, pd.DataFrame)
-
- def test_subtask_index_in_features(self, subtask_dataset):
- """Test that subtask_index is a feature when dataset has subtasks."""
- assert "subtask_index" in subtask_dataset.features
-
- def test_getitem_returns_subtask_string(self, subtask_dataset):
- """Test that __getitem__ correctly adds subtask string to returned item."""
- item = subtask_dataset[0]
-
- # Subtask should be present in the returned item
- assert "subtask" in item
- assert isinstance(item["subtask"], str)
- assert len(item["subtask"]) > 0 # Should not be empty
-
- def test_getitem_has_subtask_index(self, subtask_dataset):
- """Test that __getitem__ includes subtask_index."""
- item = subtask_dataset[0]
-
- assert "subtask_index" in item
- assert isinstance(item["subtask_index"], torch.Tensor)
-
- def test_subtask_index_maps_to_valid_subtask(self, subtask_dataset):
- """Test that subtask_index correctly maps to a subtask in metadata."""
- item = subtask_dataset[0]
-
- subtask_idx = item["subtask_index"].item()
- subtask_from_metadata = subtask_dataset.meta.subtasks.iloc[subtask_idx].name
-
- assert item["subtask"] == subtask_from_metadata
-
- def test_all_items_have_subtask(self, subtask_dataset):
- """Test that all items in the dataset have subtask information."""
- for i in range(min(len(subtask_dataset), 5)): # Check first 5 items
- item = subtask_dataset[i]
- assert "subtask" in item
- assert isinstance(item["subtask"], str)
-
- def test_task_and_subtask_coexist(self, subtask_dataset):
- """Test that both task and subtask are present in returned items."""
- item = subtask_dataset[0]
-
- # Both task and subtask should be present
- assert "task" in item
- assert "subtask" in item
- assert isinstance(item["task"], str)
- assert isinstance(item["subtask"], str)
-
-
-class TestSubtaskDatasetMissing:
- """Tests for graceful handling when subtask data is missing."""
-
- @pytest.fixture
- def dataset_without_subtasks(self, tmp_path, empty_lerobot_dataset_factory):
- """Create a dataset without subtask information."""
- features = {"state": {"dtype": "float32", "shape": (2,), "names": None}}
- dataset = empty_lerobot_dataset_factory(root=tmp_path / "no_subtask", features=features)
-
- # Add some frames and save
- for _ in range(5):
- dataset.add_frame({"state": torch.randn(2), "task": "Test task"})
- dataset.save_episode()
- dataset.finalize()
-
- # Reload the dataset
- return LeRobotDataset(dataset.repo_id, root=dataset.root)
-
- def test_no_subtask_in_features(self, dataset_without_subtasks):
- """Test that subtask_index is not in features when not provided."""
- assert "subtask_index" not in dataset_without_subtasks.features
-
- def test_getitem_without_subtask(self, dataset_without_subtasks):
- """Test that __getitem__ works when subtask is not present."""
- item = dataset_without_subtasks[0]
-
- # Item should still be retrievable
- assert item is not None
- assert "state" in item
- assert "task" in item
-
- # Subtask should NOT be present
- assert "subtask" not in item
-
- def test_subtasks_metadata_is_none(self, dataset_without_subtasks):
- """Test that subtasks metadata is None when not present."""
- assert dataset_without_subtasks.meta.subtasks is None
-
-
-class TestSubtaskEdgeCases:
- """Edge case tests for subtask handling."""
-
- def test_subtask_with_multiple_episodes(self):
- """Test subtask handling with multiple episodes if available."""
- try:
- dataset = LeRobotDataset(
- repo_id="lerobot/pusht-subtask",
- episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
- )
- except Exception:
- pytest.skip("Could not load test-subtask dataset")
-
- # Check first and last items have valid subtasks
- first_item = dataset[0]
- last_item = dataset[len(dataset) - 1]
-
- assert "subtask" in first_item
- assert "subtask" in last_item
- assert isinstance(first_item["subtask"], str)
- assert isinstance(last_item["subtask"], str)
-
- def test_subtask_index_consistency(self):
- """Test that same subtask_index returns same subtask string."""
- try:
- dataset = LeRobotDataset(
- repo_id="lerobot/pusht-subtask",
- episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
- )
- except Exception:
- pytest.skip("Could not load test-subtask dataset")
-
- if len(dataset) < 2:
- pytest.skip("Dataset too small for this test")
-
- # Collect subtask_index to subtask mappings
- subtask_map = {}
- for i in range(min(len(dataset), 10)):
- item = dataset[i]
- idx = item["subtask_index"].item()
- subtask = item["subtask"]
-
- if idx in subtask_map:
- # Same index should always return same subtask
- assert subtask_map[idx] == subtask, (
- f"Inconsistent subtask for index {idx}: '{subtask_map[idx]}' vs '{subtask}'"
- )
- else:
- subtask_map[idx] = subtask
diff --git a/tests/processor/test_render_messages_processor.py b/tests/processor/test_render_messages_processor.py
new file mode 100644
index 000000000..f96e3c0ab
--- /dev/null
+++ b/tests/processor/test_render_messages_processor.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+import pytest
+
+pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
+
+import torch # noqa: E402
+
+from lerobot.configs.recipe import MessageTurn, TrainingRecipe # noqa: E402
+from lerobot.processor.converters import create_transition # noqa: E402
+from lerobot.processor.render_messages_processor import RenderMessagesStep # noqa: E402
+from lerobot.types import TransitionKey # noqa: E402
+
+
+def test_render_messages_step_noops_without_language_columns():
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="${subtask}", stream="low_level", target=True),
+ ]
+ )
+ transition = create_transition(complementary_data={"task": "do it"})
+
+ assert RenderMessagesStep(recipe)(transition) == transition
+
+
+def test_render_messages_step_renders_and_drops_raw_language():
+ recipe = TrainingRecipe(
+ messages=[
+ MessageTurn(role="user", content="${task}", stream="high_level"),
+ MessageTurn(role="assistant", content="${subtask}", stream="low_level", target=True),
+ ]
+ )
+ transition = create_transition(
+ complementary_data={
+ "task": "do it",
+ "timestamp": torch.tensor(0.0),
+ "index": torch.tensor(7),
+ "language_persistent": [
+ {
+ "role": "assistant",
+ "content": "reach carefully",
+ "style": "subtask",
+ "timestamp": 0.0,
+ "camera": None,
+ "tool_calls": None,
+ }
+ ],
+ "language_events": [],
+ }
+ )
+
+ out = RenderMessagesStep(recipe)(transition)
+ data = out[TransitionKey.COMPLEMENTARY_DATA]
+
+ assert "language_persistent" not in data
+ assert "language_events" not in data
+ assert data["messages"][-1]["content"] == "reach carefully"
+ assert data["message_streams"] == ["high_level", "low_level"]
+ assert data["target_message_indices"] == [1]
diff --git a/tests/utils/test_collate.py b/tests/utils/test_collate.py
new file mode 100644
index 000000000..2b23b3180
--- /dev/null
+++ b/tests/utils/test_collate.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+
+import pytest
+
+pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
+
+import torch # noqa: E402
+
+from lerobot.utils.collate import lerobot_collate_fn # noqa: E402
+
+
+def test_lerobot_collate_preserves_messages_and_drops_raw_language():
+ batch = [
+ {
+ "index": torch.tensor(0),
+ "messages": [{"role": "assistant", "content": "a"}],
+ "message_streams": ["low_level"],
+ "target_message_indices": [0],
+ "language_persistent": [{"content": "raw"}],
+ "language_events": [],
+ },
+ {
+ "index": torch.tensor(1),
+ "messages": [{"role": "assistant", "content": "b"}],
+ "message_streams": ["low_level"],
+ "target_message_indices": [0],
+ "language_persistent": [{"content": "raw"}],
+ "language_events": [],
+ },
+ ]
+
+ out = lerobot_collate_fn(batch)
+
+ assert out["index"].tolist() == [0, 1]
+ assert out["messages"][0][0]["content"] == "a"
+ assert out["messages"][1][0]["content"] == "b"
+ assert out["message_streams"] == [["low_level"], ["low_level"]]
+ assert out["target_message_indices"] == [[0], [0]]
+ assert "language_persistent" not in out
+ assert "language_events" not in out
+
+
+def test_lerobot_collate_passes_through_standard_batch():
+ """On a non-language batch, the collate must match ``default_collate``.
+
+ Guards against silent regressions: ``lerobot_train.py`` only opts into
+ ``lerobot_collate_fn`` when the dataset declares language columns, but
+ if a future change ever wires it in unconditionally we want the
+ behavior to remain a transparent pass-through for ordinary tensor
+ batches.
+ """
+ from torch.utils.data._utils.collate import default_collate
+
+ batch = [
+ {
+ "observation.image": torch.zeros(3, 4, 4),
+ "action": torch.tensor([0.0, 1.0]),
+ "index": torch.tensor(0),
+ },
+ {
+ "observation.image": torch.ones(3, 4, 4),
+ "action": torch.tensor([2.0, 3.0]),
+ "index": torch.tensor(1),
+ },
+ ]
+
+ custom = lerobot_collate_fn(batch)
+ expected = default_collate(batch)
+
+ assert custom.keys() == expected.keys()
+ for key in expected:
+ assert torch.equal(custom[key], expected[key]), f"key={key} diverged"
+
+
+def test_lerobot_collate_drops_none_samples():
+ """Recipes that yielded no target message return ``None`` — those samples
+ must be filtered out, and an entirely-``None`` batch must collapse to ``None``.
+ """
+ batch = [None, {"index": torch.tensor(0)}, None]
+ out = lerobot_collate_fn(batch)
+ assert out is not None
+ assert out["index"].tolist() == [0]
+
+ assert lerobot_collate_fn([None, None]) is None
diff --git a/uv.lock b/uv.lock
index 692029986..7092f780a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -3057,7 +3057,7 @@ requires-dist = [
{ name = "av", marker = "extra == 'av-dep'", specifier = ">=15.0.0,<16.0.0" },
{ name = "cmake", specifier = ">=3.29.0.1,<4.2.0" },
{ name = "contourpy", marker = "extra == 'matplotlib-dep'", specifier = ">=1.3.0,<2.0.0" },
- { name = "datasets", marker = "extra == 'dataset'", specifier = ">=4.0.0,<5.0.0" },
+ { name = "datasets", marker = "extra == 'dataset'", specifier = ">=4.7.0,<5.0.0" },
{ name = "debugpy", marker = "extra == 'dev'", specifier = ">=1.8.1,<1.9.0" },
{ name = "decord", marker = "(platform_machine == 'AMD64' and extra == 'groot') or (platform_machine == 'x86_64' and extra == 'groot')", specifier = ">=0.6.0,<1.0.0" },
{ name = "deepdiff", marker = "extra == 'deepdiff-dep'", specifier = ">=7.0.1,<9.0.0" },