diff --git a/src/lerobot/annotations/steerable_pipeline/writer.py b/src/lerobot/annotations/steerable_pipeline/writer.py index e1a544c80..70be0c84c 100644 --- a/src/lerobot/annotations/steerable_pipeline/writer.py +++ b/src/lerobot/annotations/steerable_pipeline/writer.py @@ -54,6 +54,7 @@ from typing import Any import pyarrow as pa import pyarrow.parquet as pq +from lerobot.datasets.io_utils import write_table_one_row_group_per_episode from lerobot.datasets.language import ( EVENT_ONLY_STYLES, LANGUAGE_EVENTS, @@ -274,12 +275,11 @@ class LanguageColumnsWriter: new_table = self._materialize_table( table, per_row_persistent, per_row_events, drop_old=self.drop_existing_subtask_index ) - # Atomic replace: write to a sibling tmp path and rename so a crash - # mid-write can't leave a half-written shard that ``pq.read_table`` - # would then fail to open. ``Path.replace`` is atomic on POSIX + - # Windows when source and target sit on the same filesystem. + # Re-emit one row group per episode (a bulk pq.write_table would collapse + # them into one). Write to a sibling tmp path and atomically rename so a + # crash mid-write can't leave a half-written shard. tmp_path = path.with_suffix(path.suffix + ".tmp") - pq.write_table(new_table, tmp_path) + write_table_one_row_group_per_episode(new_table, tmp_path) tmp_path.replace(path) def _materialize_table( diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 07da5b039..f5bf70eba 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -32,6 +32,7 @@ from .feature_utils import features_equal_for_merge, get_hf_features_from_featur from .io_utils import ( get_file_size_in_mb, get_parquet_file_size_in_mb, + to_parquet_one_row_group_per_episode, to_parquet_with_hf_images, write_info, write_stats, @@ -551,6 +552,7 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si aggr_root=dst_meta.root, hf_features=hf_features, concatenate=concatenate_data, + one_row_group_per_episode=True, ) # Record the mapping from source to actual destination @@ -628,6 +630,7 @@ def append_or_create_parquet_file( aggr_root: Path = None, hf_features: datasets.Features | None = None, concatenate: bool = True, + one_row_group_per_episode: bool = False, ) -> tuple[dict[str, int], tuple[int, int]]: """Appends data to an existing parquet file or creates a new one based on size constraints. @@ -645,6 +648,8 @@ def append_or_create_parquet_file( aggr_root: Root path for the aggregated dataset. hf_features: Optional HuggingFace Features schema for proper image typing. concatenate: When False, always rotate to a new file instead of appending to the current one. + one_row_group_per_episode: True for DATA parquet (emit one row group per episode); False for + the episodes-metadata parquet (already one row per episode). Returns: tuple: (updated_idx, (dst_chunk, dst_file)) where updated_idx is the index dict @@ -657,6 +662,8 @@ def append_or_create_parquet_file( dst_path.parent.mkdir(parents=True, exist_ok=True) if contains_images: to_parquet_with_hf_images(df, dst_path, features=hf_features) + elif one_row_group_per_episode: + to_parquet_one_row_group_per_episode(df, dst_path) else: df.to_parquet(dst_path) return idx, (dst_chunk, dst_file) @@ -683,6 +690,8 @@ def append_or_create_parquet_file( if contains_images: to_parquet_with_hf_images(final_df, target_path, features=hf_features) + elif one_row_group_per_episode: + to_parquet_one_row_group_per_episode(final_df, target_path) else: final_df.to_parquet(target_path) diff --git a/src/lerobot/datasets/io_utils.py b/src/lerobot/datasets/io_utils.py index a41f34704..b6344942c 100644 --- a/src/lerobot/datasets/io_utils.py +++ b/src/lerobot/datasets/io_utils.py @@ -20,6 +20,7 @@ import datasets import numpy as np import pandas import pandas as pd +import pyarrow as pa import pyarrow.dataset as pa_ds import pyarrow.parquet as pq import torch @@ -270,21 +271,49 @@ def hf_transform_to_torch(items_dict: dict[str, list[Any]]) -> dict[str, list[to return items_dict +def write_table_one_row_group_per_episode(table: pa.Table, path: Path) -> None: + """Write ``table`` with one parquet row group per episode (in episode order). + + Keeps shards random-access friendly (``read_row_group(i)`` fetches episode i), + mirroring the recording writer. ``table`` must carry a contiguous + ``episode_index`` column. + """ + episode_index = table.column("episode_index").to_numpy(zero_copy_only=False) + starts = np.concatenate(([0], np.nonzero(np.diff(episode_index))[0] + 1)) + writer = pq.ParquetWriter(str(path), table.schema, compression="snappy", use_dictionary=True) + try: + for start, stop in zip(starts, np.append(starts[1:], len(episode_index)), strict=True): + writer.write_table(table.slice(start, stop - start)) # one episode -> one row group + finally: + writer.close() + + def to_parquet_with_hf_images( df: pandas.DataFrame, path: Path, features: datasets.Features | None = None ) -> None: - """This function correctly writes to parquet a panda DataFrame that contains images encoded by HF dataset. - This way, it can be loaded by HF dataset and correctly formatted images are returned. + """Write a DataFrame with HF-encoded images to parquet, one row group per episode. - Args: - df: DataFrame to write to parquet. - path: Path to write the parquet file. - features: Optional HuggingFace Features schema. If provided, ensures image columns - are properly typed as Image() in the parquet schema. + Images are embedded into the arrow table first (``ParquetWriter.write_table`` + does not embed external image files like ``Dataset.to_parquet`` does). + ``features`` types image columns as ``Image()`` in the parquet schema. """ - # TODO(qlhoest): replace this weird synthax by `df.to_parquet(path)` only ds = datasets.Dataset.from_dict(df.to_dict(orient="list"), features=features) - ds.to_parquet(path) + ds = embed_images(ds) + table = ds.with_format("arrow")[:] + if "episode_index" in table.column_names: + write_table_one_row_group_per_episode(table, path) + else: + # No episode boundaries to align row groups to — keep a single write. + pq.write_table(table, str(path)) + + +def to_parquet_one_row_group_per_episode(df: pandas.DataFrame, path: Path) -> None: + """Write a (non-image) DataFrame to parquet with one row group per episode.""" + table = pa.Table.from_pandas(df, preserve_index=False) + if "episode_index" in table.column_names: + write_table_one_row_group_per_episode(table, path) + else: + pq.write_table(table, str(path)) def item_to_torch(item: dict) -> dict: diff --git a/tests/annotations/test_writer.py b/tests/annotations/test_writer.py index 0ea550327..184374f5b 100644 --- a/tests/annotations/test_writer.py +++ b/tests/annotations/test_writer.py @@ -28,6 +28,7 @@ import pytest pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])") pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])") +import pandas as pd # noqa: E402 import pyarrow.parquet as pq # noqa: E402 from lerobot.annotations.steerable_pipeline.reader import iter_episodes # noqa: E402 @@ -344,6 +345,78 @@ def test_annotation_metadata_sync_allows_non_streaming_load( assert len(dataset) == 24 +def _build_packed_dataset(root: Path, episode_lengths: list[int], *, fps: int = 10) -> Path: + """Pack several episodes into a single shard (vs build_annotation_dataset's one-per-file), + so the writer's rewrite must re-emit one row group per episode instead of collapsing them.""" + from lerobot.datasets.io_utils import write_tasks + from lerobot.utils.io_utils import write_json + + data_dir = root / "data" / "chunk-000" + data_dir.mkdir(parents=True, exist_ok=True) + + episode_index, frame_index, timestamp, task_index, subtask_index = [], [], [], [], [] + for ep, length in enumerate(episode_lengths): + episode_index += [ep] * length + frame_index += list(range(length)) + timestamp += [round(i / fps, 6) for i in range(length)] + task_index += [0] * length + subtask_index += [0] * length # legacy column the writer must drop + pd.DataFrame( + { + "episode_index": episode_index, + "frame_index": frame_index, + "timestamp": timestamp, + "task_index": task_index, + "subtask_index": subtask_index, + } + ).to_parquet(data_dir / "file-000.parquet", index=False) + + tasks_df = pd.DataFrame({"task_index": [0]}, index=pd.Index(["do the thing"], name="task")) + write_tasks(tasks_df, root) + write_json( + {"codebase_version": "v3.1", "fps": fps, "features": {}, "total_episodes": len(episode_lengths)}, + root / "meta" / "info.json", + ) + return root + + +def test_writer_one_row_group_per_episode(tmp_path: Path) -> None: + """Rewriting a packed shard must keep one row group per episode, not collapse + every episode into a single giant row group.""" + episode_lengths = [4, 6, 5] # unequal lengths, all in one shard + root = _build_packed_dataset(tmp_path / "ds", episode_lengths) + shard = root / "data" / "chunk-000" / "file-000.parquet" + assert pq.ParquetFile(shard).metadata.num_row_groups == 1, "fixture should start collapsed" + + staging_dir = tmp_path / "stage" + for ep in range(len(episode_lengths)): + _stage_episode( + staging_dir, + ep, + plan=[ + { + "role": "assistant", + "content": f"subtask for ep {ep}", + "style": "subtask", + "timestamp": 0.0, + "tool_calls": None, + } + ], + ) + + records = list(iter_episodes(root)) + LanguageColumnsWriter().write_all(records, staging_dir, root) + + # One row group per episode, with row counts matching the episode lengths. + md = pq.ParquetFile(shard).metadata + assert md.num_row_groups == len(episode_lengths) + assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == episode_lengths + # Language columns are still present after the per-episode rewrite. + table = pq.read_table(shard) + assert "language_persistent" in table.column_names + assert "language_events" in table.column_names + + def test_speech_atom_shape_matches_plan_spec() -> None: atom = speech_atom(2.5, "I'm cleaning up!") assert atom["role"] == "assistant" diff --git a/tests/datasets/test_aggregate.py b/tests/datasets/test_aggregate.py index f3edc3af8..e9930575f 100644 --- a/tests/datasets/test_aggregate.py +++ b/tests/datasets/test_aggregate.py @@ -32,6 +32,26 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from tests.fixtures.constants import DUMMY_REPO_ID +def assert_data_shards_one_row_group_per_episode(root): + """Every aggregated DATA shard must have exactly one parquet row group per episode.""" + import pyarrow.parquet as pq + + shards = sorted((root / "data").rglob("*.parquet")) + assert shards, f"no data shards found under {root}/data" + n_episodes = 0 + for shard in shards: + pf = pq.ParquetFile(shard) + episodes = pf.read(columns=["episode_index"]).column("episode_index").to_pylist() + assert pf.metadata.num_row_groups == len(set(episodes)), shard + for i in range(pf.metadata.num_row_groups): + rg_episodes = set( + pf.read_row_group(i, columns=["episode_index"]).column("episode_index").to_pylist() + ) + assert len(rg_episodes) == 1, f"{shard} row group {i} spans episodes {rg_episodes}" + n_episodes += len(set(episodes)) + return n_episodes + + def assert_episode_and_frame_counts(aggr_ds, expected_episodes, expected_frames): """Test that total number of episodes and frames are correctly aggregated.""" assert aggr_ds.num_episodes == expected_episodes, ( @@ -566,6 +586,41 @@ def assert_image_frames_integrity(aggr_ds, ds_0, ds_1): ) +@pytest.mark.parametrize("use_videos", [True, False], ids=["video", "image"]) +def test_aggregate_one_row_group_per_episode(tmp_path, lerobot_dataset_factory, use_videos): + """Aggregated DATA shards keep one row group per episode (not one collapsed group). + + Covers both the non-image (``df.to_parquet``) and image + (``to_parquet_with_hf_images``) write branches, including the merge-into- + existing-file branch via a low file-size threshold that forces packing. + """ + ds_0 = lerobot_dataset_factory( + root=tmp_path / "rg_0", + repo_id=f"{DUMMY_REPO_ID}_rg_0", + total_episodes=3, + total_frames=60, + use_videos=use_videos, + ) + ds_1 = lerobot_dataset_factory( + root=tmp_path / "rg_1", + repo_id=f"{DUMMY_REPO_ID}_rg_1", + total_episodes=4, + total_frames=80, + use_videos=use_videos, + ) + + aggr_root = tmp_path / "rg_aggr" + aggregate_datasets( + repo_ids=[ds_0.repo_id, ds_1.repo_id], + roots=[ds_0.root, ds_1.root], + aggr_repo_id=f"{DUMMY_REPO_ID}_rg_aggr", + aggr_root=aggr_root, + ) + + n_episodes = assert_data_shards_one_row_group_per_episode(aggr_root) + assert n_episodes == ds_0.num_episodes + ds_1.num_episodes + + def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory): """Test aggregation of image-based datasets preserves HuggingFace Image schema.