From 2baee7274162b63db3696c490903b56b053bb580 Mon Sep 17 00:00:00 2001 From: Qizhi Chen Date: Sun, 21 Jun 2026 22:03:10 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Speed=20up=20generic=20converter=20?= =?UTF-8?q?aggregation=20(#108)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Speed up generic converter aggregation Co-authored-by: Codex * Vectorize generic metadata video updates Co-authored-by: Codex --------- Co-authored-by: Codex --- generic_converter/pipeline.py | 319 +++++++++++++++++++++++++++++++++- 1 file changed, 314 insertions(+), 5 deletions(-) diff --git a/generic_converter/pipeline.py b/generic_converter/pipeline.py index 9154b27..edb4d13 100644 --- a/generic_converter/pipeline.py +++ b/generic_converter/pipeline.py @@ -39,6 +39,7 @@ class SaveLeRobotDataset(PipelineStep): f"start processing for {task.input_path}, saving to {task.output_path}" ) raw_dataset = self.adapter.load_subset(task) + saved_episodes = 0 for episode_index, episode_data in enumerate(raw_dataset): with self.track_time("saving episode"): saved = self.adapter.save_episode( @@ -51,7 +52,14 @@ class SaveLeRobotDataset(PipelineStep): f"{status} for {dataset.repo_id}, episode {episode_index}, " f"len {self.adapter.get_episode_length(episode_data)}" ) + if saved is not False: + saved_episodes += 1 dataset.finalize() + if saved_episodes == 0: + logger.info( + f"no episodes saved for {dataset.repo_id}; deleting temp output" + ) + shutil.rmtree(task.output_path, ignore_errors=True) def run_converter( @@ -131,7 +139,11 @@ def run_converter( **executor_config, logging_dir=logging_dir, ).run() - aggregate_tasks(tasks, output_path, aggr_repo_id=local_repo_id) + aggregate_tasks( + tasks, + output_path, + aggr_repo_id=local_repo_id, + ) if cleanup_temp: logger = setup_logger() @@ -215,16 +227,313 @@ def aggregate_tasks( if output_dir.exists(): shutil.rmtree(output_dir) - roots = [task.output_path for task in tasks] + roots = [task.output_path for task in tasks if task.output_path.exists()] + if not roots: + raise ValueError("No temporary datasets were produced; nothing to aggregate.") + resolved_aggr_repo_id = aggr_repo_id or output_dir.name logger.info( - f"aggregate {len(tasks)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}" + f"aggregate {len(roots)} temporary datasets into {output_dir} as {resolved_aggr_repo_id}" ) - aggregate_datasets( - repo_ids=[None] * len(tasks), + _aggregate_datasets_with_normalized_arrays( + repo_ids=[None] * len(roots), roots=roots, aggr_repo_id=resolved_aggr_repo_id, aggr_root=output_dir, ) logger.info(f"aggregation complete: {output_dir}") + + +def _aggregate_datasets_with_normalized_arrays(**kwargs) -> None: + from lerobot.datasets import aggregate as aggregate_module + + original_aggregate_videos = aggregate_module.aggregate_videos + original_read_parquet = aggregate_module.pd.read_parquet + original_writer = aggregate_module.to_parquet_one_row_group_per_episode + original_update_meta_data = aggregate_module.update_meta_data + + def read_normalized_arrays(*args, **kwargs): + return _normalize_array_values(original_read_parquet(*args, **kwargs)) + + def write_normalized_arrays(df, path): + return original_writer(_normalize_array_values(df), path) + + aggregate_module.aggregate_videos = _aggregate_videos_by_key_parallel + aggregate_module.pd.read_parquet = read_normalized_arrays + aggregate_module.to_parquet_one_row_group_per_episode = write_normalized_arrays + aggregate_module.update_meta_data = _update_meta_data_without_fragmenting + try: + aggregate_datasets(**kwargs) + finally: + aggregate_module.aggregate_videos = original_aggregate_videos + aggregate_module.pd.read_parquet = original_read_parquet + aggregate_module.to_parquet_one_row_group_per_episode = original_writer + aggregate_module.update_meta_data = original_update_meta_data + + +def _aggregate_videos_by_key_parallel( + src_meta, + dst_meta, + videos_idx, + video_files_size_in_mb, + chunk_size, + concatenate_videos=True, +): + from concurrent.futures import ThreadPoolExecutor + + for video_idx in videos_idx.values(): + video_idx["episode_duration"] = 0 + video_idx["src_to_offset"] = {} + video_idx["src_to_dst"] = {} + if "dst_file_durations" not in video_idx: + video_idx["dst_file_durations"] = {} + + def aggregate_key(key): + return ( + key, + _aggregate_video_key( + key, + src_meta, + dst_meta, + videos_idx[key], + video_files_size_in_mb, + chunk_size, + concatenate_videos, + ), + ) + + keys = list(videos_idx) + if not keys: + return videos_idx + + max_workers = min(len(keys), os.cpu_count() or len(keys)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for key, video_idx in executor.map(aggregate_key, keys): + videos_idx[key] = video_idx + + return videos_idx + + +def _aggregate_video_key( + key, + src_meta, + dst_meta, + video_idx, + video_files_size_in_mb, + chunk_size, + concatenate_videos, +): + from lerobot.datasets import aggregate as aggregate_module + + unique_chunk_file_pairs = { + (chunk, file) + for chunk, file in zip( + src_meta.episodes[f"videos/{key}/chunk_index"], + src_meta.episodes[f"videos/{key}/file_index"], + strict=False, + ) + } + unique_chunk_file_pairs = sorted(unique_chunk_file_pairs) + + chunk_idx = video_idx["chunk"] + file_idx = video_idx["file"] + dst_file_durations = video_idx["dst_file_durations"] + + for src_chunk_idx, src_file_idx in unique_chunk_file_pairs: + src_path = src_meta.root / aggregate_module.DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=src_chunk_idx, + file_index=src_file_idx, + ) + dst_path = dst_meta.root / aggregate_module.DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=chunk_idx, + file_index=file_idx, + ) + + src_duration = aggregate_module.get_video_duration_in_s(src_path) + dst_key = (chunk_idx, file_idx) + + if not dst_path.exists(): + video_idx["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0 + video_idx["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key + dst_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(src_path), str(dst_path)) + dst_file_durations[dst_key] = src_duration + video_idx["episode_duration"] += src_duration + continue + + src_size = aggregate_module.get_file_size_in_mb(src_path) + dst_size = aggregate_module.get_file_size_in_mb(dst_path) + + if not concatenate_videos or dst_size + src_size >= video_files_size_in_mb: + chunk_idx, file_idx = aggregate_module.update_chunk_file_indices( + chunk_idx, file_idx, chunk_size + ) + dst_key = (chunk_idx, file_idx) + video_idx["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0 + video_idx["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key + dst_path = dst_meta.root / aggregate_module.DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=chunk_idx, + file_index=file_idx, + ) + dst_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(src_path), str(dst_path)) + dst_file_durations[dst_key] = src_duration + else: + current_dst_duration = dst_file_durations.get(dst_key, 0) + video_idx["src_to_offset"][(src_chunk_idx, src_file_idx)] = ( + current_dst_duration + ) + video_idx["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key + aggregate_module.concatenate_video_files( + [dst_path, src_path], + dst_path, + compatibility_check=True, + ) + dst_file_durations[dst_key] = current_dst_duration + src_duration + + video_idx["episode_duration"] += src_duration + + video_idx["chunk"] = chunk_idx + video_idx["file"] = file_idx + + return video_idx + + +def _update_meta_data_without_fragmenting(df, dst_meta, meta_idx, data_idx, videos_idx): + import pandas as pd + + df["meta/episodes/chunk_index"] = ( + df["meta/episodes/chunk_index"] + meta_idx["chunk"] + ) + df["meta/episodes/file_index"] = df["meta/episodes/file_index"] + meta_idx["file"] + + data_src_to_dst = data_idx.get("src_to_dst", {}) + if data_src_to_dst: + orig_data_chunk = df["data/chunk_index"].copy() + orig_data_file = df["data/file_index"].copy() + mapping_index = pd.MultiIndex.from_tuples( + list(data_src_to_dst.keys()), + names=["chunk_index", "file_index"], + ) + mapping_df = pd.DataFrame( + list(data_src_to_dst.values()), + index=mapping_index, + columns=["dst_chunk", "dst_file"], + ) + row_index = pd.MultiIndex.from_arrays( + [orig_data_chunk, orig_data_file], + names=["chunk_index", "file_index"], + ) + reindexed = mapping_df.reindex(row_index) + reindexed[["dst_chunk", "dst_file"]] = reindexed[ + ["dst_chunk", "dst_file"] + ].fillna({"dst_chunk": data_idx["chunk"], "dst_file": data_idx["file"]}) + df["data/chunk_index"] = reindexed["dst_chunk"].to_numpy() + df["data/file_index"] = reindexed["dst_file"].to_numpy() + else: + df["data/chunk_index"] = df["data/chunk_index"] + data_idx["chunk"] + df["data/file_index"] = df["data/file_index"] + data_idx["file"] + + for key, video_idx in videos_idx.items(): + orig_chunk_col = f"videos/{key}/chunk_index" + orig_file_col = f"videos/{key}/file_index" + orig_chunks = df[orig_chunk_col].copy() + orig_files = df[orig_file_col].copy() + + src_to_offset = video_idx.get("src_to_offset", {}) + src_to_dst = video_idx.get("src_to_dst", {}) + row_index = pd.MultiIndex.from_arrays( + [orig_chunks, orig_files], + names=["chunk_index", "file_index"], + ) + + if src_to_dst: + src_keys = list(src_to_dst) + mapping_index = pd.MultiIndex.from_tuples( + src_keys, + names=["chunk_index", "file_index"], + ) + mapping_df = pd.DataFrame( + [ + ( + *src_to_dst[src_key], + src_to_offset.get(src_key, 0.0), + ) + for src_key in src_keys + ], + index=mapping_index, + columns=["dst_chunk", "dst_file", "offset"], + ) + reindexed = mapping_df.reindex(row_index) + df[orig_chunk_col] = ( + reindexed["dst_chunk"] + .fillna(video_idx["chunk"]) + .astype(orig_chunks.dtype, copy=False) + .to_numpy() + ) + df[orig_file_col] = ( + reindexed["dst_file"] + .fillna(video_idx["file"]) + .astype(orig_files.dtype, copy=False) + .to_numpy() + ) + offsets = reindexed["offset"].fillna(0.0).to_numpy(dtype=float) + df[f"videos/{key}/from_timestamp"] += offsets + df[f"videos/{key}/to_timestamp"] += offsets + elif src_to_offset: + df[orig_chunk_col] = video_idx["chunk"] + df[orig_file_col] = video_idx["file"] + mapping_series = pd.Series(src_to_offset, dtype=float) + offsets = mapping_series.reindex(row_index).fillna(0.0).to_numpy() + df[f"videos/{key}/from_timestamp"] += offsets + df[f"videos/{key}/to_timestamp"] += offsets + else: + df[orig_chunk_col] = video_idx["chunk"] + df[orig_file_col] = video_idx["file"] + df[f"videos/{key}/from_timestamp"] = ( + df[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"] + ) + df[f"videos/{key}/to_timestamp"] = ( + df[f"videos/{key}/to_timestamp"] + video_idx["latest_duration"] + ) + + df["dataset_from_index"] = df["dataset_from_index"] + dst_meta.info.total_frames + df["dataset_to_index"] = df["dataset_to_index"] + dst_meta.info.total_frames + df["episode_index"] = df["episode_index"] + dst_meta.info.total_episodes + + return df + + +def _normalize_array_values(df): + import pandas as pd + + df = df.copy() + for column in df.columns: + if _has_array_values(df[column]): + df[column] = pd.Series( + [_normalize_array_value(value) for value in df[column]], + dtype=object, + index=df.index, + ) + return df + + +def _normalize_array_value(value): + import numpy as np + + if isinstance(value, np.ndarray) and value.ndim > 1: + return [_normalize_array_value(item) for item in value] + return value + + +def _has_array_values(series) -> bool: + import numpy as np + + for value in series.head(32): + if isinstance(value, np.ndarray): + return True + return False