From 6906178b393749cb87429f632284192d6b363b8e Mon Sep 17 00:00:00 2001
From: CarolinePascal
Date: Thu, 5 Mar 2026 13:19:11 +0100
Subject: [PATCH] fix(pandas->parquet): switching from a pandas to a native parquet approach for aggregating and editing datasets

---
 src/lerobot/datasets/aggregate.py     | 78 +++++++++++++---------
 src/lerobot/datasets/dataset_tools.py | 95 ++++++++++++++++-----------
 2 files changed, 103 insertions(+), 70 deletions(-)

diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py
index b32116233..c0605aed7 100644
--- a/src/lerobot/datasets/aggregate.py
+++ b/src/lerobot/datasets/aggregate.py
@@ -21,6 +21,8 @@ from pathlib import Path
 
 import datasets
 import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 import tqdm
 
 from lerobot.datasets.compute_stats import aggregate_stats
@@ -35,7 +37,6 @@ from lerobot.datasets.utils import (
     get_file_size_in_mb,
     get_hf_features_from_features,
     get_parquet_file_size_in_mb,
-    to_parquet_with_hf_images,
     update_chunk_file_indices,
     write_info,
     write_stats,
@@ -80,28 +81,41 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
     return fps, robot_type, features
 
 
-def update_data_df(df, src_meta, dst_meta):
-    """Updates a data DataFrame with new indices and task mappings for aggregation.
+def update_data_table(table: pa.Table, src_meta, dst_meta) -> pa.Table:
+    """Updates a pyarrow Table with new indices and task mappings for aggregation.
 
     Adjusts episode indices, frame indices, and task indices to account for previously
     aggregated data in the destination dataset.
 
     Args:
-        df: DataFrame containing the data to be updated.
+        table: pyarrow Table containing the data to be updated.
         src_meta: Source dataset metadata.
         dst_meta: Destination dataset metadata.
 
     Returns:
-        pd.DataFrame: Updated DataFrame with adjusted indices.
+        pa.Table: Updated Table with adjusted indices.
""" + ep_offset = dst_meta.info["total_episodes"] + idx_offset = dst_meta.info["total_frames"] - df["episode_index"] = df["episode_index"] + dst_meta.info["total_episodes"] - df["index"] = df["index"] + dst_meta.info["total_frames"] + ep_col = table.column("episode_index") + new_ep = pa.array([v + ep_offset for v in ep_col.to_pylist()], type=ep_col.type) + table = table.set_column(table.column_names.index("episode_index"), "episode_index", new_ep) - src_task_names = src_meta.tasks.index.take(df["task_index"].to_numpy()) - df["task_index"] = dst_meta.tasks.loc[src_task_names, "task_index"].to_numpy() + idx_col = table.column("index") + new_idx = pa.array([v + idx_offset for v in idx_col.to_pylist()], type=idx_col.type) + table = table.set_column(table.column_names.index("index"), "index", new_idx) - return df + old_task_indices = table.column("task_index").to_pylist() + src_task_names = src_meta.tasks.index.take(old_task_indices) + new_task_indices = dst_meta.tasks.loc[src_task_names, "task_index"].to_numpy().tolist() + table = table.set_column( + table.column_names.index("task_index"), + "task_index", + pa.array(new_task_indices, type=table.schema.field("task_index").type), + ) + + return table def update_meta_data( @@ -468,18 +482,13 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si src_path = src_meta.root / DEFAULT_DATA_PATH.format( chunk_index=src_chunk_idx, file_index=src_file_idx ) - if contains_images: - # Use HuggingFace datasets to read source data to preserve image format - src_ds = datasets.Dataset.from_parquet(str(src_path)) - df = src_ds.to_pandas() - else: - df = pd.read_parquet(src_path) - df = update_data_df(df, src_meta, dst_meta) + table = pq.read_table(src_path) + table = update_data_table(table, src_meta, dst_meta) # Write data and get the actual destination file it was written to # This avoids duplicating the rotation logic here data_idx, (dst_chunk, dst_file) = append_or_create_parquet_file( - df, + table, src_path, data_idx, data_files_size_in_mb, @@ -554,8 +563,16 @@ def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx): return meta_idx +def _write_table_with_hf_images( + table: pa.Table, path: Path, features: datasets.Features | None = None +) -> None: + """Write a pyarrow Table to parquet with proper HF image encoding.""" + ds = datasets.Dataset.from_dict(table.to_pydict(), features=features) + ds.to_parquet(path) + + def append_or_create_parquet_file( - df: pd.DataFrame, + data: pd.DataFrame | pa.Table, src_path: Path, idx: dict[str, int], max_mb: float, @@ -571,7 +588,7 @@ def append_or_create_parquet_file( from becoming too large. Handles both regular parquet files and those containing images. Args: - df: DataFrame to write to the parquet file. + data: Data to write, as a pandas DataFrame or pyarrow Table. src_path: Path to the source file (used for size estimation). idx: Dictionary containing current 'chunk' and 'file' indices. max_mb: Maximum allowed file size in MB before rotation. @@ -585,15 +602,17 @@ def append_or_create_parquet_file( tuple: (updated_idx, (dst_chunk, dst_file)) where updated_idx is the index dict and (dst_chunk, dst_file) is the actual destination file the data was written to. 
""" + table = data if isinstance(data, pa.Table) else pa.Table.from_pandas(data) + dst_chunk, dst_file = idx["chunk"], idx["file"] dst_path = aggr_root / default_path.format(chunk_index=dst_chunk, file_index=dst_file) if not dst_path.exists(): dst_path.parent.mkdir(parents=True, exist_ok=True) if contains_images: - to_parquet_with_hf_images(df, dst_path, features=hf_features) + _write_table_with_hf_images(table, dst_path, features=hf_features) else: - df.to_parquet(dst_path) + pq.write_table(table, dst_path) return idx, (dst_chunk, dst_file) src_size = get_parquet_file_size_in_mb(src_path) @@ -604,22 +623,17 @@ def append_or_create_parquet_file( dst_chunk, dst_file = idx["chunk"], idx["file"] new_path = aggr_root / default_path.format(chunk_index=dst_chunk, file_index=dst_file) new_path.parent.mkdir(parents=True, exist_ok=True) - final_df = df + final_table = table target_path = new_path else: - if contains_images: - # Use HuggingFace datasets to read existing data to preserve image format - existing_ds = datasets.Dataset.from_parquet(str(dst_path)) - existing_df = existing_ds.to_pandas() - else: - existing_df = pd.read_parquet(dst_path) - final_df = pd.concat([existing_df, df], ignore_index=True) + existing_table = pq.read_table(dst_path) + final_table = pa.concat_tables([existing_table, table], promote_options="permissive") target_path = dst_path if contains_images: - to_parquet_with_hf_images(final_df, target_path, features=hf_features) + _write_table_with_hf_images(final_table, target_path, features=hf_features) else: - final_df.to_parquet(target_path) + pq.write_table(final_table, target_path) return idx, (dst_chunk, dst_file) diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 546b3d67f..52efe6a6f 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -32,6 +32,9 @@ from pathlib import Path import datasets import numpy as np import pandas as pd +import pyarrow as pa +import pyarrow.compute as pc +import pyarrow.dataset as pa_ds import pyarrow.parquet as pq import torch from tqdm import tqdm @@ -496,13 +499,16 @@ def _copy_and_reindex_data( global_index = 0 episode_data_metadata: dict[int, dict] = {} + episode_keys = list(episode_mapping.keys()) + ep_filter = pa_ds.field("episode_index").isin(episode_keys) + if dst_meta.tasks is None: - all_task_indices = set() + all_task_indices: set[int] = set() for src_path in file_to_episodes: - df = pd.read_parquet(src_dataset.root / src_path) - mask = df["episode_index"].isin(list(episode_mapping.keys())) - task_series: pd.Series = df[mask]["task_index"] - all_task_indices.update(task_series.unique().tolist()) + table = pq.read_table( + src_dataset.root / src_path, columns=["episode_index", "task_index"], filters=ep_filter + ) + all_task_indices.update(pc.unique(table.column("task_index")).to_pylist()) tasks = [src_dataset.meta.tasks.iloc[idx].name for idx in all_task_indices] dst_meta.save_episode_tasks(list(set(tasks))) @@ -514,52 +520,41 @@ def _copy_and_reindex_data( task_mapping[old_task_idx] = new_task_idx for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"): - df = pd.read_parquet(src_dataset.root / src_path) - - all_episodes_in_file = set(df["episode_index"].unique()) + table = pq.read_table(src_dataset.root / src_path, filters=ep_filter) episodes_to_keep = file_to_episodes[src_path] - if all_episodes_in_file == episodes_to_keep: - df["episode_index"] = df["episode_index"].replace(episode_mapping) - df["index"] = 
-            df["task_index"] = df["task_index"].replace(task_mapping)
+        if table.num_rows == 0:
+            continue
 
-            first_ep_old_idx = min(episodes_to_keep)
-            src_ep = src_dataset.meta.episodes[first_ep_old_idx]
-            chunk_idx = src_ep["data/chunk_index"]
-            file_idx = src_ep["data/file_index"]
-        else:
-            mask = df["episode_index"].isin(list(episode_mapping.keys()))
-            df = df[mask].copy().reset_index(drop=True)
+        table = _replace_column_values(table, "episode_index", episode_mapping)
+        col_pos = table.column_names.index("index")
+        new_indices = pa.array(range(global_index, global_index + table.num_rows), type=pa.int64())
+        table = table.set_column(col_pos, "index", new_indices)
+        table = _replace_column_values(table, "task_index", task_mapping)
 
-            if len(df) == 0:
-                continue
-
-            df["episode_index"] = df["episode_index"].replace(episode_mapping)
-            df["index"] = range(global_index, global_index + len(df))
-            df["task_index"] = df["task_index"].replace(task_mapping)
-
-            first_ep_old_idx = min(episodes_to_keep)
-            src_ep = src_dataset.meta.episodes[first_ep_old_idx]
-            chunk_idx = src_ep["data/chunk_index"]
-            file_idx = src_ep["data/file_index"]
+        first_ep_old_idx = min(episodes_to_keep)
+        src_ep = src_dataset.meta.episodes[first_ep_old_idx]
+        chunk_idx = src_ep["data/chunk_index"]
+        file_idx = src_ep["data/file_index"]
 
         dst_path = dst_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
         dst_path.parent.mkdir(parents=True, exist_ok=True)
-        _write_parquet(df, dst_path, dst_meta)
+        _write_parquet(table, dst_path, dst_meta)
 
+        ep_col = table.column("episode_index").to_pylist()
+        idx_col = table.column("index").to_pylist()
         for ep_old_idx in episodes_to_keep:
             ep_new_idx = episode_mapping[ep_old_idx]
-            ep_df = df[df["episode_index"] == ep_new_idx]
+            ep_indices = [idx_col[i] for i, e in enumerate(ep_col) if e == ep_new_idx]
 
             episode_data_metadata[ep_new_idx] = {
                 "data/chunk_index": chunk_idx,
                 "data/file_index": file_idx,
-                "dataset_from_index": int(ep_df["index"].min()),
-                "dataset_to_index": int(ep_df["index"].max() + 1),
+                "dataset_from_index": min(ep_indices),
+                "dataset_to_index": max(ep_indices) + 1,
             }
 
-        global_index += len(df)
+        global_index += table.num_rows
 
     return episode_data_metadata
 
@@ -910,15 +905,39 @@ def _copy_and_reindex_episodes_metadata(
     write_stats(filtered_stats, dst_meta.root)
 
 
-def _write_parquet(df: pd.DataFrame, path: Path, meta: LeRobotDatasetMetadata) -> None:
-    """Write DataFrame to parquet
+def _replace_column_values(table: pa.Table, column: str, mapping: dict) -> pa.Table:
+    """Replace values in a pyarrow Table column using a mapping dict."""
+    old_values = table.column(column).to_pylist()
+    new_values = [mapping.get(v, v) for v in old_values]
+    col_pos = table.column_names.index(column)
+    return table.set_column(col_pos, column, pa.array(new_values, type=table.schema.field(column).type))
+
+
+def _write_parquet(
+    data: pd.DataFrame | pa.Table | dict, path: Path, meta: LeRobotDatasetMetadata
+) -> None:
+    """Write data to parquet.
 
     This ensures images are properly embedded and the file can be loaded correctly by HF datasets.
+
+    Args:
+        data: Input data as a pandas DataFrame, pyarrow Table, or dict of lists.
+        path: Destination parquet file path.
+        meta: Dataset metadata for feature schema.
""" from lerobot.datasets.utils import embed_images, get_hf_features_from_features + if isinstance(data, pd.DataFrame): + data_dict = data.to_dict(orient="list") + elif isinstance(data, pa.Table): + data_dict = data.to_pydict() + elif isinstance(data, dict): + data_dict = data + else: + raise TypeError(f"Unsupported data type: {type(data)}") + hf_features = get_hf_features_from_features(meta.features) - ep_dataset = datasets.Dataset.from_dict(df.to_dict(orient="list"), features=hf_features, split="train") + ep_dataset = datasets.Dataset.from_dict(data_dict, features=hf_features, split="train") if len(meta.image_keys) > 0: ep_dataset = embed_images(ep_dataset)