Compare commits

...

8 Commits

Author SHA1 Message Date
Khalil Meftah a130a9db39 feat(eval): optionally push recorded eval datasets to the Hub 2026-06-17 11:41:50 +02:00
Khalil Meftah 4f5e6596be refactor(eval): remove shape inference and shallow copy helpers 2026-06-16 22:13:23 +02:00
Khalil Meftah afeeeb8982 Merge branch 'main' into feat/eval-dataset-recording 2026-06-16 21:45:06 +02:00
Khalil Meftah 040c6b3d66 refactor(eval): per-env datasets recording, no double reset
- Extract _infer_shape_from_obs() to reduce nesting in feature conversion
- Move dataset creation into rollout() using its own env.reset() observation,
  eliminating the extra reset in run_one()
- Replace deepcopy with _shallow_copy_obs() for raw observation stashing
- Support batch_size > 1: each parallel env records to its own dataset
  (single env skips the env_0/ nesting for simplicity)
- One-time warning for env_features keys missing from observations
- Pass recording_dir + env_features through the call chain instead of
  a pre-built recording_dataset object
2026-06-16 21:35:05 +02:00
Caroline Pascal 287c823f13 fix(features copy): adding deepcopy on LeRobot dataset features to avoid shallow copy leaks (#3826)
* fix(features copy): adding deepcopy on LeRobot dataset features to avoid shallow copy leaks

* tests(test): adding new test
2026-06-16 17:58:59 +02:00
Khalil Meftah acd31c7de2 fix(eval): use FeatureType enum comparison instead of string value 2026-06-16 15:22:50 +02:00
Pepijn 58ccc01508 fix(datasets): enforce one parquet row group per episode in v3 data writes (#3807)
* fix(datasets): enforce one parquet row group per episode in v3 data writes

LeRobot v3 data shards must hold exactly one row group per episode so a
reader can fetch episode i with pq.ParquetFile(path).read_row_group(i)
(a byte-range read) instead of loading the whole shard. The recording
writer already does this (one write_table per episode); the aggregate
and lerobot-annotate re-write paths instead concatenated many episodes
and wrote them in one shot, collapsing the file to a single row group.

- io_utils: add write_table_one_row_group_per_episode (one ParquetWriter,
  one write_table per episode — same pattern as the recording writer);
  to_parquet_with_hf_images embeds images then writes per-episode row
  groups; to_parquet_one_row_group_per_episode wraps it for plain frames
- aggregate: route non-image data writes through the per-episode writer;
  leave the episodes-metadata parquet untouched (already one row/episode)
- annotate: rewrite shards via the per-episode writer instead of a single
  bulk pq.write_table
- tests: invariant coverage through the aggregate (image + video) and
  annotate paths

No change to on-disk schema, paths, naming, rollover thresholds, or
compression. Readers stay backward-compatible (old collapsed files load).

* Update src/lerobot/datasets/io_utils.py

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* Update src/lerobot/datasets/io_utils.py

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(datasets): correct indentation and add strict= in row-group helper

The web-edited numpy version of write_table_one_row_group_per_episode had an
over-indented line (IndentationError, breaking pre-commit + test collection)
and a zip() without strict=. Fix both; behaviour unchanged.

---------

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
2026-06-16 12:15:48 +02:00
Khalil Meftah 240393d238 feat(eval): record eval rollouts as raw LeRobot datasets
- Record raw env observations inline during rollout(), before
preprocess_observation() transforms them. Uses LeRobotDataset.create()
with add_frame()/save_episode().

- Supports vectorized envs: each env in the batch records independently,
with save_episode() called per env on termination. Each task gets its
own dataset under output_dir/recordings/{task_group}_{task_id}/.

Enabled via --eval.recording=true; disabled by default.
2026-06-15 16:12:25 +02:00
10 changed files with 381 additions and 23 deletions
@@ -54,6 +54,7 @@ from typing import Any
import pyarrow as pa
import pyarrow.parquet as pq
from lerobot.datasets.io_utils import write_table_one_row_group_per_episode
from lerobot.datasets.language import (
EVENT_ONLY_STYLES,
LANGUAGE_EVENTS,
@@ -274,12 +275,11 @@ class LanguageColumnsWriter:
new_table = self._materialize_table(
table, per_row_persistent, per_row_events, drop_old=self.drop_existing_subtask_index
)
# Atomic replace: write to a sibling tmp path and rename so a crash
# mid-write can't leave a half-written shard that ``pq.read_table``
# would then fail to open. ``Path.replace`` is atomic on POSIX +
# Windows when source and target sit on the same filesystem.
# Re-emit one row group per episode (a bulk pq.write_table would collapse
# them into one). Write to a sibling tmp path and atomically rename so a
# crash mid-write can't leave a half-written shard.
tmp_path = path.with_suffix(path.suffix + ".tmp")
pq.write_table(new_table, tmp_path)
write_table_one_row_group_per_episode(new_table, tmp_path)
tmp_path.replace(path)
def _materialize_table(
+9
View File
@@ -73,8 +73,17 @@ class EvalConfig:
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True
# Whether to record eval rollouts as a LeRobot dataset on disk.
recording: bool = False
# If set, push recorded eval datasets to the Hub under this repo id (one repo per task,
# suffixed by task and env index). Requires recording=true.
recording_repo_id: str | None = None
# Whether the pushed recording repositories should be private.
recording_private: bool = False
def __post_init__(self) -> None:
if self.recording_repo_id is not None and not self.recording:
raise ValueError("eval.recording_repo_id requires eval.recording=true.")
if self.batch_size == 0:
self.batch_size = self._auto_batch_size()
if self.batch_size > self.n_episodes:
+9
View File
@@ -32,6 +32,7 @@ from .feature_utils import features_equal_for_merge, get_hf_features_from_featur
from .io_utils import (
get_file_size_in_mb,
get_parquet_file_size_in_mb,
to_parquet_one_row_group_per_episode,
to_parquet_with_hf_images,
write_info,
write_stats,
@@ -551,6 +552,7 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
aggr_root=dst_meta.root,
hf_features=hf_features,
concatenate=concatenate_data,
one_row_group_per_episode=True,
)
# Record the mapping from source to actual destination
@@ -628,6 +630,7 @@ def append_or_create_parquet_file(
aggr_root: Path = None,
hf_features: datasets.Features | None = None,
concatenate: bool = True,
one_row_group_per_episode: bool = False,
) -> tuple[dict[str, int], tuple[int, int]]:
"""Appends data to an existing parquet file or creates a new one based on size constraints.
@@ -645,6 +648,8 @@ def append_or_create_parquet_file(
aggr_root: Root path for the aggregated dataset.
hf_features: Optional HuggingFace Features schema for proper image typing.
concatenate: When False, always rotate to a new file instead of appending to the current one.
one_row_group_per_episode: True for DATA parquet (emit one row group per episode); False for
the episodes-metadata parquet (already one row per episode).
Returns:
tuple: (updated_idx, (dst_chunk, dst_file)) where updated_idx is the index dict
@@ -657,6 +662,8 @@ def append_or_create_parquet_file(
dst_path.parent.mkdir(parents=True, exist_ok=True)
if contains_images:
to_parquet_with_hf_images(df, dst_path, features=hf_features)
elif one_row_group_per_episode:
to_parquet_one_row_group_per_episode(df, dst_path)
else:
df.to_parquet(dst_path)
return idx, (dst_chunk, dst_file)
@@ -683,6 +690,8 @@ def append_or_create_parquet_file(
if contains_images:
to_parquet_with_hf_images(final_df, target_path, features=hf_features)
elif one_row_group_per_episode:
to_parquet_one_row_group_per_episode(final_df, target_path)
else:
final_df.to_parquet(target_path)
+2 -1
View File
@@ -15,6 +15,7 @@
# limitations under the License.
import contextlib
from collections.abc import Callable
from copy import deepcopy
from pathlib import Path
import numpy as np
@@ -709,7 +710,7 @@ class LeRobotDatasetMetadata:
obj.root.mkdir(parents=True, exist_ok=False)
features = {**features, **DEFAULT_FEATURES}
features = {**deepcopy(features), **DEFAULT_FEATURES}
_validate_feature_names(features)
obj.tasks = None
+4 -1
View File
@@ -27,6 +27,7 @@ import logging
import shutil
from collections.abc import Callable
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from copy import deepcopy
from pathlib import Path
import datasets
@@ -1101,7 +1102,9 @@ def _copy_episodes_metadata_and_stats(
if dst_meta.video_keys and src_dataset.meta.video_keys:
for key in dst_meta.video_keys:
if key in src_dataset.meta.features:
dst_meta.info.features[key]["info"] = src_dataset.meta.info.features[key].get("info", {})
dst_meta.info.features[key]["info"] = deepcopy(
src_dataset.meta.info.features[key].get("info", {})
)
write_info(dst_meta.info, dst_meta.root)
+38 -9
View File
@@ -20,6 +20,7 @@ import datasets
import numpy as np
import pandas
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pa_ds
import pyarrow.parquet as pq
import torch
@@ -270,21 +271,49 @@ def hf_transform_to_torch(items_dict: dict[str, list[Any]]) -> dict[str, list[to
return items_dict
def write_table_one_row_group_per_episode(table: pa.Table, path: Path) -> None:
"""Write ``table`` with one parquet row group per episode (in episode order).
Keeps shards random-access friendly (``read_row_group(i)`` fetches episode i),
mirroring the recording writer. ``table`` must carry a contiguous
``episode_index`` column.
"""
episode_index = table.column("episode_index").to_numpy(zero_copy_only=False)
starts = np.concatenate(([0], np.nonzero(np.diff(episode_index))[0] + 1))
writer = pq.ParquetWriter(str(path), table.schema, compression="snappy", use_dictionary=True)
try:
for start, stop in zip(starts, np.append(starts[1:], len(episode_index)), strict=True):
writer.write_table(table.slice(start, stop - start)) # one episode -> one row group
finally:
writer.close()
def to_parquet_with_hf_images(
df: pandas.DataFrame, path: Path, features: datasets.Features | None = None
) -> None:
"""This function correctly writes to parquet a panda DataFrame that contains images encoded by HF dataset.
This way, it can be loaded by HF dataset and correctly formatted images are returned.
"""Write a DataFrame with HF-encoded images to parquet, one row group per episode.
Args:
df: DataFrame to write to parquet.
path: Path to write the parquet file.
features: Optional HuggingFace Features schema. If provided, ensures image columns
are properly typed as Image() in the parquet schema.
Images are embedded into the arrow table first (``ParquetWriter.write_table``
does not embed external image files like ``Dataset.to_parquet`` does).
``features`` types image columns as ``Image()`` in the parquet schema.
"""
# TODO(qlhoest): replace this weird synthax by `df.to_parquet(path)` only
ds = datasets.Dataset.from_dict(df.to_dict(orient="list"), features=features)
ds.to_parquet(path)
ds = embed_images(ds)
table = ds.with_format("arrow")[:]
if "episode_index" in table.column_names:
write_table_one_row_group_per_episode(table, path)
else:
# No episode boundaries to align row groups to — keep a single write.
pq.write_table(table, str(path))
def to_parquet_one_row_group_per_episode(df: pandas.DataFrame, path: Path) -> None:
"""Write a (non-image) DataFrame to parquet with one row group per episode."""
table = pa.Table.from_pandas(df, preserve_index=False)
if "episode_index" in table.column_names:
write_table_one_row_group_per_episode(table, path)
else:
pq.write_table(table, str(path))
def item_to_torch(item: dict) -> dict:
+170 -6
View File
@@ -72,8 +72,9 @@ from termcolor import colored
from torch import Tensor, nn
from tqdm import trange
from lerobot.configs import parser
from lerobot.configs import FeatureType, parser
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.envs import (
check_env_attributes_and_types,
close_envs,
@@ -84,7 +85,7 @@ from lerobot.envs import (
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.processor import PolicyProcessorPipeline
from lerobot.types import PolicyAction
from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD
from lerobot.utils.constants import ACTION, DONE, OBS_IMAGE, OBS_IMAGES, OBS_STR, REWARD
from lerobot.utils.device_utils import get_safe_torch_device
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.io_utils import write_video
@@ -95,6 +96,65 @@ from lerobot.utils.utils import (
)
def _env_features_to_dataset_features(env_features: dict) -> dict:
"""Convert EnvConfig.features to the dict format expected by LeRobotDataset.create()."""
features = {}
for key, ft in env_features.items():
shape = tuple(ft.shape)
if ft.type is FeatureType.VISUAL:
features[key] = {"dtype": "video", "shape": shape, "names": ["height", "width", "channel"]}
else:
features[key] = {"dtype": "float32", "shape": shape, "names": None}
features["next.reward"] = {"dtype": "float32", "shape": (1,), "names": None}
features["next.success"] = {"dtype": "bool", "shape": (1,), "names": None}
features["next.done"] = {"dtype": "bool", "shape": (1,), "names": None}
return features
def _build_raw_frame(
raw_obs: dict,
env_idx: int,
action: np.ndarray,
reward: float,
success: bool,
done: bool,
task: str,
env_features: dict,
) -> dict:
"""Build a dataset frame from raw env observations for one env index.
Keys in the frame match the keys in env_features so they align with the
dataset schema created by _env_features_to_dataset_features().
"""
frame: dict[str, Any] = {}
for key in env_features:
if key == ACTION:
continue
if key.startswith("next."):
continue
if "pixels" in raw_obs and isinstance(raw_obs["pixels"], dict):
for cam_name, img in raw_obs["pixels"].items():
candidate = f"{OBS_IMAGES}.{cam_name}"
if candidate == key:
frame[key] = img[env_idx]
if key in frame:
continue
if "pixels" in raw_obs and not isinstance(raw_obs["pixels"], dict) and key in ("pixels", OBS_IMAGE):
frame[key] = raw_obs["pixels"][env_idx]
continue
if key in raw_obs and isinstance(raw_obs[key], np.ndarray):
val = raw_obs[key][env_idx]
if val.dtype == np.float64:
val = val.astype(np.float32)
frame[key] = val
frame[ACTION] = action
frame["next.reward"] = np.atleast_1d(np.float32(reward))
frame["next.success"] = np.atleast_1d(np.bool_(success))
frame["next.done"] = np.atleast_1d(np.bool_(done))
frame["task"] = task
return frame
def rollout(
env: gym.vector.VectorEnv,
policy: PreTrainedPolicy,
@@ -105,6 +165,10 @@ def rollout(
seeds: list[int] | None = None,
return_observations: bool = False,
render_callback: Callable[[gym.vector.VectorEnv], None] | None = None,
recording_dir: Path | None = None,
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
) -> dict:
"""Run a batched policy rollout once through a batch of environments.
@@ -145,6 +209,33 @@ def rollout(
if render_callback is not None:
render_callback(env)
recording_datasets: list[LeRobotDataset] | None = None
raw_observation = None
task_desc = ""
if recording_dir is not None and env_features is not None:
features = _env_features_to_dataset_features(env_features)
fps = env.unwrapped.metadata.get("render_fps", 30)
recording_datasets = []
for i in range(env.num_envs):
multi_env = env.num_envs > 1
root = str(recording_dir / f"env_{i}") if multi_env else str(recording_dir)
base_repo_id = recording_repo_id or "eval_recording"
repo_id = f"{base_repo_id}_env_{i}" if multi_env else base_repo_id
recording_datasets.append(
LeRobotDataset.create(
repo_id=repo_id,
fps=fps,
features=features,
root=root,
use_videos=True,
)
)
raw_observation = deepcopy(observation)
try:
task_desc = list(env.call("task_description"))[0]
except (AttributeError, NotImplementedError):
task_desc = ""
all_observations = []
all_actions = []
all_rewards = []
@@ -217,6 +308,26 @@ def rollout(
else:
successes = [False] * env.num_envs
if recording_datasets is not None and raw_observation is not None:
prev_done = done.copy()
for env_idx in range(env.num_envs):
if prev_done[env_idx]:
continue
frame = _build_raw_frame(
raw_observation,
env_idx,
action_numpy[env_idx],
reward[env_idx],
successes[env_idx],
bool(terminated[env_idx] | truncated[env_idx]),
task_desc,
recording_datasets[env_idx].features,
)
recording_datasets[env_idx].add_frame(frame)
if terminated[env_idx] or truncated[env_idx]:
recording_datasets[env_idx].save_episode()
raw_observation = deepcopy(observation)
# Keep track of which environments are done so far.
# Mark the episode as done if we reach the maximum step limit.
# This ensures that the rollout always terminates cleanly at `max_steps`,
@@ -255,6 +366,12 @@ def rollout(
stacked_observations[key] = torch.stack([obs[key] for obs in all_observations], dim=1)
ret[OBS_STR] = stacked_observations
if recording_datasets is not None:
for ds in recording_datasets:
ds.finalize()
if recording_repo_id is not None:
ds.push_to_hub(private=recording_private)
if hasattr(policy, "use_original_modules"):
policy.use_original_modules()
@@ -273,6 +390,10 @@ def eval_policy(
videos_dir: Path | None = None,
return_episode_data: bool = False,
start_seed: int | None = None,
recording_dir: Path | None = None,
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
) -> dict:
"""
Args:
@@ -361,6 +482,10 @@ def eval_policy(
seeds=list(seeds) if seeds else None,
return_observations=return_episode_data,
render_callback=render_frame if max_episodes_rendered > 0 else None,
recording_dir=recording_dir,
env_features=env_features,
recording_repo_id=recording_repo_id,
recording_private=recording_private,
)
# Figure out where in each rollout sequence the first done condition was encountered (results after
@@ -563,6 +688,10 @@ def eval_main(cfg: EvalPipelineConfig):
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
recording_dir = Path(cfg.output_dir) / "recordings" if cfg.eval.recording else None
max_episodes_rendered = 0 if cfg.eval.recording else 10
videos_dir = None if cfg.eval.recording else Path(cfg.output_dir) / "videos"
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
info = eval_policy_all(
envs=envs,
@@ -572,10 +701,15 @@ def eval_main(cfg: EvalPipelineConfig):
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=cfg.eval.n_episodes,
max_episodes_rendered=10,
videos_dir=Path(cfg.output_dir) / "videos",
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=False,
start_seed=cfg.seed,
max_parallel_tasks=cfg.env.max_parallel_tasks,
recording_dir=recording_dir,
env_features=cfg.env.features if cfg.eval.recording else None,
recording_repo_id=cfg.eval.recording_repo_id,
recording_private=cfg.eval.recording_private,
)
print("Overall Aggregated Metrics:")
print(info["overall"])
@@ -618,6 +752,10 @@ def eval_one(
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
recording_dir: Path | None = None,
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
) -> TaskMetrics:
"""Evaluates one task_id of one suite using the provided vec env."""
@@ -635,6 +773,10 @@ def eval_one(
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dir=recording_dir,
env_features=env_features,
recording_repo_id=recording_repo_id,
recording_private=recording_private,
)
per_episode = task_result["per_episode"]
@@ -661,6 +803,10 @@ def run_one(
videos_dir: Path | None,
return_episode_data: bool,
start_seed: int | None,
recording_dir: Path | None = None,
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
):
"""
Run eval_one for a single (task_group, task_id, env).
@@ -672,7 +818,13 @@ def run_one(
task_videos_dir = videos_dir / f"{task_group}_{task_id}"
task_videos_dir.mkdir(parents=True, exist_ok=True)
# Call the existing eval_one (assumed to return TaskMetrics-like dict)
task_recording_dir = None
task_repo_id = None
if recording_dir is not None and env_features is not None:
task_recording_dir = recording_dir / f"{task_group}_{task_id}"
if recording_repo_id is not None:
task_repo_id = f"{recording_repo_id}_{task_group}_{task_id}"
metrics = eval_one(
env,
policy=policy,
@@ -685,8 +837,12 @@ def run_one(
videos_dir=task_videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dir=task_recording_dir,
env_features=env_features,
recording_repo_id=task_repo_id,
recording_private=recording_private,
)
# ensure we always provide video_paths key to simplify accumulation
if max_episodes_rendered > 0:
metrics.setdefault("video_paths", [])
return task_group, task_id, metrics
@@ -702,6 +858,10 @@ def eval_policy_all(
n_episodes: int,
*,
max_episodes_rendered: int = 0,
recording_dir: Path | None = None,
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
videos_dir: Path | None = None,
return_episode_data: bool = False,
start_seed: int | None = None,
@@ -761,6 +921,10 @@ def eval_policy_all(
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
recording_dir=recording_dir,
env_features=env_features,
recording_repo_id=recording_repo_id,
recording_private=recording_private,
)
if max_parallel_tasks <= 1:
+73
View File
@@ -28,6 +28,7 @@ import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])")
import pandas as pd # noqa: E402
import pyarrow.parquet as pq # noqa: E402
from lerobot.annotations.steerable_pipeline.reader import iter_episodes # noqa: E402
@@ -344,6 +345,78 @@ def test_annotation_metadata_sync_allows_non_streaming_load(
assert len(dataset) == 24
def _build_packed_dataset(root: Path, episode_lengths: list[int], *, fps: int = 10) -> Path:
"""Pack several episodes into a single shard (vs build_annotation_dataset's one-per-file),
so the writer's rewrite must re-emit one row group per episode instead of collapsing them."""
from lerobot.datasets.io_utils import write_tasks
from lerobot.utils.io_utils import write_json
data_dir = root / "data" / "chunk-000"
data_dir.mkdir(parents=True, exist_ok=True)
episode_index, frame_index, timestamp, task_index, subtask_index = [], [], [], [], []
for ep, length in enumerate(episode_lengths):
episode_index += [ep] * length
frame_index += list(range(length))
timestamp += [round(i / fps, 6) for i in range(length)]
task_index += [0] * length
subtask_index += [0] * length # legacy column the writer must drop
pd.DataFrame(
{
"episode_index": episode_index,
"frame_index": frame_index,
"timestamp": timestamp,
"task_index": task_index,
"subtask_index": subtask_index,
}
).to_parquet(data_dir / "file-000.parquet", index=False)
tasks_df = pd.DataFrame({"task_index": [0]}, index=pd.Index(["do the thing"], name="task"))
write_tasks(tasks_df, root)
write_json(
{"codebase_version": "v3.1", "fps": fps, "features": {}, "total_episodes": len(episode_lengths)},
root / "meta" / "info.json",
)
return root
def test_writer_one_row_group_per_episode(tmp_path: Path) -> None:
"""Rewriting a packed shard must keep one row group per episode, not collapse
every episode into a single giant row group."""
episode_lengths = [4, 6, 5] # unequal lengths, all in one shard
root = _build_packed_dataset(tmp_path / "ds", episode_lengths)
shard = root / "data" / "chunk-000" / "file-000.parquet"
assert pq.ParquetFile(shard).metadata.num_row_groups == 1, "fixture should start collapsed"
staging_dir = tmp_path / "stage"
for ep in range(len(episode_lengths)):
_stage_episode(
staging_dir,
ep,
plan=[
{
"role": "assistant",
"content": f"subtask for ep {ep}",
"style": "subtask",
"timestamp": 0.0,
"tool_calls": None,
}
],
)
records = list(iter_episodes(root))
LanguageColumnsWriter().write_all(records, staging_dir, root)
# One row group per episode, with row counts matching the episode lengths.
md = pq.ParquetFile(shard).metadata
assert md.num_row_groups == len(episode_lengths)
assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == episode_lengths
# Language columns are still present after the per-episode rewrite.
table = pq.read_table(shard)
assert "language_persistent" in table.column_names
assert "language_events" in table.column_names
def test_speech_atom_shape_matches_plan_spec() -> None:
atom = speech_atom(2.5, "I'm cleaning up!")
assert atom["role"] == "assistant"
+55
View File
@@ -32,6 +32,26 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from tests.fixtures.constants import DUMMY_REPO_ID
def assert_data_shards_one_row_group_per_episode(root):
"""Every aggregated DATA shard must have exactly one parquet row group per episode."""
import pyarrow.parquet as pq
shards = sorted((root / "data").rglob("*.parquet"))
assert shards, f"no data shards found under {root}/data"
n_episodes = 0
for shard in shards:
pf = pq.ParquetFile(shard)
episodes = pf.read(columns=["episode_index"]).column("episode_index").to_pylist()
assert pf.metadata.num_row_groups == len(set(episodes)), shard
for i in range(pf.metadata.num_row_groups):
rg_episodes = set(
pf.read_row_group(i, columns=["episode_index"]).column("episode_index").to_pylist()
)
assert len(rg_episodes) == 1, f"{shard} row group {i} spans episodes {rg_episodes}"
n_episodes += len(set(episodes))
return n_episodes
def assert_episode_and_frame_counts(aggr_ds, expected_episodes, expected_frames):
"""Test that total number of episodes and frames are correctly aggregated."""
assert aggr_ds.num_episodes == expected_episodes, (
@@ -566,6 +586,41 @@ def assert_image_frames_integrity(aggr_ds, ds_0, ds_1):
)
@pytest.mark.parametrize("use_videos", [True, False], ids=["video", "image"])
def test_aggregate_one_row_group_per_episode(tmp_path, lerobot_dataset_factory, use_videos):
"""Aggregated DATA shards keep one row group per episode (not one collapsed group).
Covers both the non-image (``df.to_parquet``) and image
(``to_parquet_with_hf_images``) write branches, including the merge-into-
existing-file branch via a low file-size threshold that forces packing.
"""
ds_0 = lerobot_dataset_factory(
root=tmp_path / "rg_0",
repo_id=f"{DUMMY_REPO_ID}_rg_0",
total_episodes=3,
total_frames=60,
use_videos=use_videos,
)
ds_1 = lerobot_dataset_factory(
root=tmp_path / "rg_1",
repo_id=f"{DUMMY_REPO_ID}_rg_1",
total_episodes=4,
total_frames=80,
use_videos=use_videos,
)
aggr_root = tmp_path / "rg_aggr"
aggregate_datasets(
repo_ids=[ds_0.repo_id, ds_1.repo_id],
roots=[ds_0.root, ds_1.root],
aggr_repo_id=f"{DUMMY_REPO_ID}_rg_aggr",
aggr_root=aggr_root,
)
n_episodes = assert_data_shards_one_row_group_per_episode(aggr_root)
assert n_episodes == ds_0.num_episodes + ds_1.num_episodes
def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
"""Test aggregation of image-based datasets preserves HuggingFace Image schema.
+16 -1
View File
@@ -51,7 +51,7 @@ from lerobot.robots import make_robot_from_config
from lerobot.transforms import ImageTransforms, ImageTransformsConfig
from lerobot.utils.constants import ACTION, DONE, OBS_IMAGES, OBS_STATE, OBS_STR, REWARD
from lerobot.utils.feature_utils import hw_to_dataset_features
from tests.fixtures.constants import DUMMY_CHW, DUMMY_HWC, DUMMY_REPO_ID
from tests.fixtures.constants import DUMMY_CHW, DUMMY_HWC, DUMMY_MOTOR_FEATURES, DUMMY_REPO_ID
from tests.mocks.mock_robot import MockRobotConfig
from tests.utils import require_x86_64_kernel
@@ -133,6 +133,21 @@ def test_dataset_feature_with_forward_slash_raises_error():
)
def test_create_does_not_mutate_input_features(tmp_path, empty_lerobot_dataset_factory):
# ``create`` must deep-copy features so a dataset built from another's features stays independent.
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds1", features=DUMMY_MOTOR_FEATURES, use_videos=False
)
dataset_copy = empty_lerobot_dataset_factory(
root=tmp_path / "ds2", features=dataset.meta.features, use_videos=False
)
original_shape = dataset.meta.info.features["state"]["shape"]
dataset_copy.meta.info.features["state"]["shape"] = (999,)
assert dataset.meta.info.features["state"]["shape"] == original_shape
def test_add_frame_missing_task(tmp_path, empty_lerobot_dataset_factory):
features = {"state": {"dtype": "float32", "shape": (1,), "names": None}}
dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features)