Speed up generic converter aggregation (#108)

* Speed up generic converter aggregation

Co-authored-by: Codex <codex@openai.com>

* Vectorize generic metadata video updates

Co-authored-by: Codex <codex@openai.com>

---------

Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
Qizhi Chen
2026-06-21 22:03:10 -07:00
committed by GitHub
parent 67091bc4a7
commit 2baee72741
+314 -5
View File
@@ -39,6 +39,7 @@ class SaveLeRobotDataset(PipelineStep):
f"start processing for {task.input_path}, saving to {task.output_path}"
)
raw_dataset = self.adapter.load_subset(task)
saved_episodes = 0
for episode_index, episode_data in enumerate(raw_dataset):
with self.track_time("saving episode"):
saved = self.adapter.save_episode(
@@ -51,7 +52,14 @@ class SaveLeRobotDataset(PipelineStep):
f"{status} for {dataset.repo_id}, episode {episode_index}, "
f"len {self.adapter.get_episode_length(episode_data)}"
)
if saved is not False:
saved_episodes += 1
dataset.finalize()
if saved_episodes == 0:
logger.info(
f"no episodes saved for {dataset.repo_id}; deleting temp output"
)
shutil.rmtree(task.output_path, ignore_errors=True)
def run_converter(
@@ -131,7 +139,11 @@ def run_converter(
**executor_config,
logging_dir=logging_dir,
).run()
aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id)
aggregate_tasks(
tasks,
output_path,
aggr_repo_id=local_repo_id,
)
if cleanup_temp:
logger = setup_logger()
@@ -215,16 +227,313 @@ def aggregate_tasks(
if output_dir.exists():
shutil.rmtree(output_dir)
roots = [task.output_path for task in tasks]
roots = [task.output_path for task in tasks if task.output_path.exists()]
if not roots:
raise ValueError("No temporary datasets were produced; nothing to aggregate.")
resolved_aggr_repo_id = aggr_repo_id or output_dir.name
logger.info(
f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
f"aggregate {len(roots)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}"
)
aggregate_datasets(
repo_ids=[None] * len(tasks),
_aggregate_datasets_with_normalized_arrays(
repo_ids=[None] * len(roots),
roots=roots,
aggr_repo_id=resolved_aggr_repo_id,
aggr_root=output_dir,
)
logger.info(f"aggregation complete: {output_dir}")
def _aggregate_datasets_with_normalized_arrays(**kwargs) -> None:
    """Call ``aggregate_datasets`` with four hooks of lerobot's aggregate module swapped.

    For the duration of the call these attributes of
    ``lerobot.datasets.aggregate`` are replaced:

    * ``aggregate_videos`` -> ``_aggregate_videos_by_key_parallel``
    * ``pd.read_parquet`` -> a wrapper that passes the loaded frame through
      ``_normalize_array_values``
    * ``to_parquet_one_row_group_per_episode`` -> a wrapper that normalizes
      the frame before handing it to the original writer
    * ``update_meta_data`` -> ``_update_meta_data_without_fragmenting``

    The previous attribute values are put back in a ``finally`` clause, so
    they are restored even when aggregation raises.

    Args:
        **kwargs: Forwarded unchanged to ``aggregate_datasets``.
    """
    from lerobot.datasets import aggregate as aggregate_module

    # Remember the stock implementations so they can be reinstated afterwards.
    stock_aggregate_videos = aggregate_module.aggregate_videos
    stock_read_parquet = aggregate_module.pd.read_parquet
    stock_parquet_writer = aggregate_module.to_parquet_one_row_group_per_episode
    stock_update_meta_data = aggregate_module.update_meta_data

    def read_normalized_arrays(*read_args, **read_kwargs):
        loaded = stock_read_parquet(*read_args, **read_kwargs)
        return _normalize_array_values(loaded)

    def write_normalized_arrays(df, path):
        return stock_parquet_writer(_normalize_array_values(df), path)

    # NOTE(review): `aggregate_module.pd` is presumably the shared pandas
    # module, in which case the read_parquet swap is visible to every pandas
    # user in the process while aggregation runs -- confirm.
    aggregate_module.aggregate_videos = _aggregate_videos_by_key_parallel
    aggregate_module.pd.read_parquet = read_normalized_arrays
    aggregate_module.to_parquet_one_row_group_per_episode = write_normalized_arrays
    aggregate_module.update_meta_data = _update_meta_data_without_fragmenting
    try:
        aggregate_datasets(**kwargs)
    finally:
        aggregate_module.aggregate_videos = stock_aggregate_videos
        aggregate_module.pd.read_parquet = stock_read_parquet
        aggregate_module.to_parquet_one_row_group_per_episode = stock_parquet_writer
        aggregate_module.update_meta_data = stock_update_meta_data
def _aggregate_videos_by_key_parallel(
    src_meta,
    dst_meta,
    videos_idx,
    video_files_size_in_mb,
    chunk_size,
    concatenate_videos=True,
):
    """Aggregate the videos of every video key, one thread-pool task per key.

    Each per-key state dict in ``videos_idx`` first has ``episode_duration``,
    ``src_to_offset`` and ``src_to_dst`` reset, and gets an empty
    ``dst_file_durations`` dict if it has none yet. Every key is then passed
    to ``_aggregate_video_key`` on a thread pool and the returned state is
    stored back under the same key.

    Args:
        src_meta: Source dataset metadata, forwarded to ``_aggregate_video_key``.
        dst_meta: Destination dataset metadata, forwarded likewise.
        videos_idx: Mapping of video key to its mutable state dict; updated
            in place.
        video_files_size_in_mb: Size limit forwarded to ``_aggregate_video_key``.
        chunk_size: Forwarded to ``_aggregate_video_key``.
        concatenate_videos: Forwarded to ``_aggregate_video_key``.

    Returns:
        The same ``videos_idx`` mapping that was passed in.
    """
    from concurrent.futures import ThreadPoolExecutor

    for state in videos_idx.values():
        state["episode_duration"] = 0
        state["src_to_offset"] = {}
        state["src_to_dst"] = {}
        # Durations of destination files are kept if already present.
        state.setdefault("dst_file_durations", {})

    video_keys = list(videos_idx)
    if not video_keys:
        return videos_idx

    def run_for_key(video_key):
        updated_state = _aggregate_video_key(
            video_key,
            src_meta,
            dst_meta,
            videos_idx[video_key],
            video_files_size_in_mb,
            chunk_size,
            concatenate_videos,
        )
        return video_key, updated_state

    # Never start more workers than there are keys; fall back to one worker
    # per key when the CPU count is unavailable.
    worker_count = min(len(video_keys), os.cpu_count() or len(video_keys))
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        for video_key, updated_state in pool.map(run_for_key, video_keys):
            videos_idx[video_key] = updated_state
    return videos_idx
def _aggregate_video_key(
    key,
    src_meta,
    dst_meta,
    video_idx,
    video_files_size_in_mb,
    chunk_size,
    concatenate_videos,
):
    """Copy or append one video key's source files into the destination dataset.

    The distinct ``(chunk_index, file_index)`` pairs listed for ``key`` in
    ``src_meta.episodes`` are visited in sorted order. Each source file is
    either copied to a destination file that does not exist yet, copied to
    the next destination file (when concatenation is disabled or the
    combined size would reach ``video_files_size_in_mb``), or concatenated
    onto the current destination file.

    For every source pair the function records, in ``video_idx``:

    * ``src_to_dst``: the destination ``(chunk, file)`` it ended up in
    * ``src_to_offset``: the duration already present in that destination
      file before the source was added (0 for a freshly copied file)

    ``dst_file_durations`` and ``episode_duration`` are accumulated along the
    way, and ``chunk`` / ``file`` are left pointing at the last destination
    file used.

    Args:
        key: Video key used in the episode column names and the path template.
        src_meta: Source metadata exposing ``episodes`` and ``root``.
        dst_meta: Destination metadata exposing ``root``.
        video_idx: Mutable state dict for this key; updated in place.
        video_files_size_in_mb: Size threshold that triggers a new destination file.
        chunk_size: Passed to ``update_chunk_file_indices`` when rolling over.
        concatenate_videos: When falsy, an existing destination file is never
            appended to.

    Returns:
        The same ``video_idx`` dict that was passed in.
    """
    from lerobot.datasets import aggregate as aggregate_module

    def video_path(root, chunk_index, file_index):
        return root / aggregate_module.DEFAULT_VIDEO_PATH.format(
            video_key=key,
            chunk_index=chunk_index,
            file_index=file_index,
        )

    source_files = sorted(
        set(
            zip(
                src_meta.episodes[f"videos/{key}/chunk_index"],
                src_meta.episodes[f"videos/{key}/file_index"],
                strict=False,
            )
        )
    )

    chunk_idx = video_idx["chunk"]
    file_idx = video_idx["file"]
    dst_file_durations = video_idx["dst_file_durations"]

    for src_pair in source_files:
        src_chunk_idx, src_file_idx = src_pair
        src_path = video_path(src_meta.root, src_chunk_idx, src_file_idx)
        dst_path = video_path(dst_meta.root, chunk_idx, file_idx)
        src_duration = aggregate_module.get_video_duration_in_s(src_path)
        dst_key = (chunk_idx, file_idx)

        if dst_path.exists():
            src_size = aggregate_module.get_file_size_in_mb(src_path)
            dst_size = aggregate_module.get_file_size_in_mb(dst_path)
            must_roll_over = (
                not concatenate_videos
                or dst_size + src_size >= video_files_size_in_mb
            )
            if not must_roll_over:
                # Append: the source starts where the destination currently ends.
                duration_so_far = dst_file_durations.get(dst_key, 0)
                video_idx["src_to_offset"][src_pair] = duration_so_far
                video_idx["src_to_dst"][src_pair] = dst_key
                aggregate_module.concatenate_video_files(
                    [dst_path, src_path],
                    dst_path,
                    compatibility_check=True,
                )
                dst_file_durations[dst_key] = duration_so_far + src_duration
                video_idx["episode_duration"] += src_duration
                continue

            # Current destination is full (or appending is disabled): move on
            # to the next destination file and fall through to the copy below.
            chunk_idx, file_idx = aggregate_module.update_chunk_file_indices(
                chunk_idx, file_idx, chunk_size
            )
            dst_key = (chunk_idx, file_idx)
            dst_path = video_path(dst_meta.root, chunk_idx, file_idx)

        # Fresh destination file: plain copy, so the source starts at offset 0.
        video_idx["src_to_offset"][src_pair] = 0
        video_idx["src_to_dst"][src_pair] = dst_key
        dst_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(str(src_path), str(dst_path))
        dst_file_durations[dst_key] = src_duration
        video_idx["episode_duration"] += src_duration

    video_idx["chunk"] = chunk_idx
    video_idx["file"] = file_idx
    return video_idx
def _update_meta_data_without_fragmenting(df, dst_meta, meta_idx, data_idx, videos_idx):
    """Rewrite one source dataset's episode metadata for the aggregated dataset.

    All updates are done column-wise (no per-row loop):

    * ``meta/episodes/*_index`` are shifted by ``meta_idx``.
    * ``data/*_index`` are remapped through ``data_idx["src_to_dst"]`` when
      that mapping is non-empty, otherwise shifted by ``data_idx``.
    * For every video key, ``videos/<key>/*_index`` are remapped through the
      key's ``src_to_dst`` and the from/to timestamps are shifted by the
      matching ``src_to_offset``. Without ``src_to_dst`` the indices are set
      to the key's current ``chunk`` / ``file``; timestamps then use
      ``src_to_offset`` if present, else ``latest_duration``.
    * ``dataset_from_index`` / ``dataset_to_index`` / ``episode_index`` are
      shifted by the destination's current frame and episode totals.

    Args:
        df: Episode metadata frame; modified in place and returned.
        dst_meta: Object exposing ``info.total_frames`` and
            ``info.total_episodes``.
        meta_idx: Mapping with ``"chunk"`` and ``"file"``.
        data_idx: Mapping with ``"chunk"``, ``"file"`` and optionally
            ``"src_to_dst"`` ({(src_chunk, src_file): (dst_chunk, dst_file)}).
        videos_idx: Mapping of video key to a state dict with ``"chunk"``,
            ``"file"`` and optionally ``"src_to_dst"``, ``"src_to_offset"``
            and ``"latest_duration"``.

    Returns:
        The updated ``df``.
    """
    import pandas as pd

    index_names = ["chunk_index", "file_index"]

    df["meta/episodes/chunk_index"] = (
        df["meta/episodes/chunk_index"] + meta_idx["chunk"]
    )
    df["meta/episodes/file_index"] = df["meta/episodes/file_index"] + meta_idx["file"]

    data_src_to_dst = data_idx.get("src_to_dst", {})
    if data_src_to_dst:
        orig_data_chunk = df["data/chunk_index"].copy()
        orig_data_file = df["data/file_index"].copy()
        mapping_index = pd.MultiIndex.from_tuples(
            list(data_src_to_dst.keys()),
            names=index_names,
        )
        mapping_df = pd.DataFrame(
            list(data_src_to_dst.values()),
            index=mapping_index,
            columns=["dst_chunk", "dst_file"],
        )
        row_index = pd.MultiIndex.from_arrays(
            [orig_data_chunk, orig_data_file],
            names=index_names,
        )
        reindexed = mapping_df.reindex(row_index)
        # reindex() yields NaN for pairs missing from the mapping, which
        # upcasts the columns to float; after filling the fallback values,
        # cast back so the index columns keep their original dtype (the
        # video branch below does the same).
        df["data/chunk_index"] = (
            reindexed["dst_chunk"]
            .fillna(data_idx["chunk"])
            .astype(orig_data_chunk.dtype)
            .to_numpy()
        )
        df["data/file_index"] = (
            reindexed["dst_file"]
            .fillna(data_idx["file"])
            .astype(orig_data_file.dtype)
            .to_numpy()
        )
    else:
        df["data/chunk_index"] = df["data/chunk_index"] + data_idx["chunk"]
        df["data/file_index"] = df["data/file_index"] + data_idx["file"]

    for key, video_idx in videos_idx.items():
        orig_chunk_col = f"videos/{key}/chunk_index"
        orig_file_col = f"videos/{key}/file_index"
        from_col = f"videos/{key}/from_timestamp"
        to_col = f"videos/{key}/to_timestamp"

        orig_chunks = df[orig_chunk_col].copy()
        orig_files = df[orig_file_col].copy()
        src_to_offset = video_idx.get("src_to_offset", {})
        src_to_dst = video_idx.get("src_to_dst", {})
        row_index = pd.MultiIndex.from_arrays(
            [orig_chunks, orig_files],
            names=index_names,
        )

        if src_to_dst:
            src_keys = list(src_to_dst)
            mapping_index = pd.MultiIndex.from_tuples(src_keys, names=index_names)
            mapping_df = pd.DataFrame(
                [
                    (
                        *src_to_dst[src_key],
                        src_to_offset.get(src_key, 0.0),
                    )
                    for src_key in src_keys
                ],
                index=mapping_index,
                columns=["dst_chunk", "dst_file", "offset"],
            )
            reindexed = mapping_df.reindex(row_index)
            # Unmapped rows fall back to the key's current destination file.
            df[orig_chunk_col] = (
                reindexed["dst_chunk"]
                .fillna(video_idx["chunk"])
                .astype(orig_chunks.dtype)
                .to_numpy()
            )
            df[orig_file_col] = (
                reindexed["dst_file"]
                .fillna(video_idx["file"])
                .astype(orig_files.dtype)
                .to_numpy()
            )
            offsets = reindexed["offset"].fillna(0.0).to_numpy(dtype=float)
            df[from_col] += offsets
            df[to_col] += offsets
        elif src_to_offset:
            df[orig_chunk_col] = video_idx["chunk"]
            df[orig_file_col] = video_idx["file"]
            mapping_series = pd.Series(src_to_offset, dtype=float)
            offsets = mapping_series.reindex(row_index).fillna(0.0).to_numpy()
            df[from_col] += offsets
            df[to_col] += offsets
        else:
            df[orig_chunk_col] = video_idx["chunk"]
            df[orig_file_col] = video_idx["file"]
            df[from_col] = df[from_col] + video_idx["latest_duration"]
            df[to_col] = df[to_col] + video_idx["latest_duration"]

    df["dataset_from_index"] = df["dataset_from_index"] + dst_meta.info.total_frames
    df["dataset_to_index"] = df["dataset_to_index"] + dst_meta.info.total_frames
    df["episode_index"] = df["episode_index"] + dst_meta.info.total_episodes
    return df
def _normalize_array_values(df):
    """Return a copy of ``df`` whose array-valued columns hold normalized cells.

    A column is rewritten only when ``_has_array_values`` reports that it
    contains NumPy arrays; each of its cells is then passed through
    ``_normalize_array_value`` and the column is stored with object dtype on
    the frame's existing index. The input frame itself is not modified.

    Args:
        df: Frame to normalize.

    Returns:
        The normalized copy.
    """
    import pandas as pd

    normalized = df.copy()
    for name in normalized.columns:
        if not _has_array_values(normalized[name]):
            continue
        cells = [_normalize_array_value(cell) for cell in normalized[name]]
        normalized[name] = pd.Series(cells, dtype=object, index=normalized.index)
    return normalized
def _normalize_array_value(value):
    """Turn a multi-dimensional NumPy array into nested lists of 1-D arrays.

    An array with more than one dimension becomes a list built by applying
    this function to each item along its first axis. Anything else,
    including 0-D and 1-D arrays and non-array values, is returned unchanged.

    Args:
        value: Cell value to normalize.

    Returns:
        A list for arrays with ``ndim > 1``; otherwise ``value`` itself.
    """
    import numpy as np

    is_multi_dim_array = isinstance(value, np.ndarray) and value.ndim > 1
    if not is_multi_dim_array:
        return value
    return list(map(_normalize_array_value, value))
def _has_array_values(series) -> bool:
    """Report whether any of the first 32 entries of ``series`` is a NumPy array.

    Only ``series.head(32)`` is inspected, so arrays that first appear later
    in the series are not detected.

    Args:
        series: Column to sample.

    Returns:
        ``True`` if a sampled entry is an ``np.ndarray``, else ``False``.
    """
    import numpy as np

    return any(isinstance(entry, np.ndarray) for entry in series.head(32))