mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-30 07:59:43 +00:00
Fix aggregate (num_frames, dataset_from_index, index)
This commit is contained in:
committed by
Michel Aractingi
parent
6f0fc7f386
commit
a231930044
@@ -47,9 +47,10 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
|
|||||||
return fps, robot_type, features
|
return fps, robot_type, features
|
||||||
|
|
||||||
|
|
||||||
def update_episode_and_task(df, episode_index_to_add, old_tasks, new_tasks):
|
def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
|
||||||
def _update(row):
|
def _update(row):
|
||||||
row["episode_index"] = row["episode_index"] + episode_index_to_add
|
row["episode_index"] = row["episode_index"] + episode_index_to_add
|
||||||
|
row["index"] = row["index"] + frame_index_to_add
|
||||||
task = old_tasks.iloc[row["task_index"]].name
|
task = old_tasks.iloc[row["task_index"]].name
|
||||||
row["task_index"] = new_tasks.loc[task].task_index.item()
|
row["task_index"] = new_tasks.loc[task].task_index.item()
|
||||||
return row
|
return row
|
||||||
@@ -150,8 +151,10 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
|
aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
|
||||||
chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
|
chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
|
||||||
)
|
)
|
||||||
|
if not aggr_path.exists():
|
||||||
if aggr_path.exists():
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
df.to_parquet(aggr_path)
|
||||||
|
else:
|
||||||
size_in_mb = get_parquet_file_size_in_mb(path)
|
size_in_mb = get_parquet_file_size_in_mb(path)
|
||||||
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
|
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
|
||||||
|
|
||||||
@@ -160,12 +163,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
|
aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
|
||||||
aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
|
aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
|
||||||
)
|
)
|
||||||
|
aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
|
||||||
|
chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
|
||||||
|
)
|
||||||
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
df.to_parquet(aggr_path)
|
||||||
else:
|
else:
|
||||||
# Update the existing parquet file with new rows
|
# Update the existing parquet file with new rows
|
||||||
aggr_df = pd.read_parquet(aggr_path)
|
aggr_df = pd.read_parquet(aggr_path)
|
||||||
df = pd.concat([aggr_df, df], ignore_index=True)
|
df = pd.concat([aggr_df, df], ignore_index=True)
|
||||||
|
|
||||||
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
df.to_parquet(aggr_path)
|
df.to_parquet(aggr_path)
|
||||||
|
|
||||||
# Aggregate videos if any
|
# Aggregate videos if any
|
||||||
@@ -187,7 +193,11 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
chunk_index=aggr_videos_chunk_idx[key],
|
chunk_index=aggr_videos_chunk_idx[key],
|
||||||
file_index=aggr_videos_file_idx[key],
|
file_index=aggr_videos_file_idx[key],
|
||||||
)
|
)
|
||||||
if aggr_path.exists():
|
if not aggr_path.exists():
|
||||||
|
# First video
|
||||||
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
shutil.copy(str(path), str(aggr_path))
|
||||||
|
else:
|
||||||
size_in_mb = get_video_size_in_mb(path)
|
size_in_mb = get_video_size_in_mb(path)
|
||||||
aggr_size_in_mb = get_video_size_in_mb(aggr_path)
|
aggr_size_in_mb = get_video_size_in_mb(aggr_path)
|
||||||
|
|
||||||
@@ -196,6 +206,13 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
|
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
|
||||||
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
|
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
|
||||||
)
|
)
|
||||||
|
aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
|
||||||
|
video_key=key,
|
||||||
|
chunk_index=aggr_videos_chunk_idx[key],
|
||||||
|
file_index=aggr_videos_file_idx[key],
|
||||||
|
)
|
||||||
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
shutil.copy(str(path), str(aggr_path))
|
||||||
else:
|
else:
|
||||||
# Update the existing parquet file with new rows
|
# Update the existing parquet file with new rows
|
||||||
concat_video_files(
|
concat_video_files(
|
||||||
@@ -205,10 +222,6 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
aggr_videos_chunk_idx[key],
|
aggr_videos_chunk_idx[key],
|
||||||
aggr_videos_file_idx[key],
|
aggr_videos_file_idx[key],
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
shutil.copy(str(path), str(aggr_path))
|
|
||||||
|
|
||||||
# copy_command = f"cp {video_path} {aggr_video_path} &"
|
# copy_command = f"cp {video_path} {aggr_video_path} &"
|
||||||
# subprocess.Popen(copy_command, shell=True)
|
# subprocess.Popen(copy_command, shell=True)
|
||||||
|
|
||||||
@@ -220,13 +233,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
for chunk_idx, file_idx in data_chunk_file_ids:
|
for chunk_idx, file_idx in data_chunk_file_ids:
|
||||||
path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||||
df = pd.read_parquet(path)
|
df = pd.read_parquet(path)
|
||||||
# TODO(rcadene): update frame index
|
df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames)
|
||||||
df = update_episode_and_task(df, num_episodes, meta.tasks, aggr_meta.tasks)
|
|
||||||
|
|
||||||
aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
|
aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
|
||||||
chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
|
chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
|
||||||
)
|
)
|
||||||
if aggr_path.exists():
|
if not aggr_path.exists():
|
||||||
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
df.to_parquet(aggr_path)
|
||||||
|
else:
|
||||||
size_in_mb = get_parquet_file_size_in_mb(path)
|
size_in_mb = get_parquet_file_size_in_mb(path)
|
||||||
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
|
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
|
||||||
|
|
||||||
@@ -235,12 +250,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
|
|||||||
aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
|
aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
|
||||||
aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
|
aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
|
||||||
)
|
)
|
||||||
|
aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
|
||||||
|
chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
|
||||||
|
)
|
||||||
|
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
df.to_parquet(aggr_path)
|
||||||
else:
|
else:
|
||||||
# Update the existing parquet file with new rows
|
# Update the existing parquet file with new rows
|
||||||
aggr_df = pd.read_parquet(aggr_path)
|
aggr_df = pd.read_parquet(aggr_path)
|
||||||
df = pd.concat([aggr_df, df], ignore_index=True)
|
df = pd.concat([aggr_df, df], ignore_index=True)
|
||||||
|
|
||||||
aggr_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
df.to_parquet(aggr_path)
|
df.to_parquet(aggr_path)
|
||||||
|
|
||||||
num_episodes += meta.total_episodes
|
num_episodes += meta.total_episodes
|
||||||
|
|||||||
Reference in New Issue
Block a user