mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-26 12:47:18 +00:00
fix(convert_v2_v3) reverted concat data files from previous commit
fixed bug in meta data related chunk_index and file_index when concatenating video files, added clearer condition to respect conditions so that episode doesnt span multiple videos
This commit is contained in:
@@ -40,8 +40,9 @@ from typing import Any
|
||||
|
||||
import jsonlines
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import tqdm
|
||||
from datasets import Dataset, Image, concatenate_datasets
|
||||
from datasets import Dataset, Features, Image
|
||||
from huggingface_hub import HfApi, snapshot_download
|
||||
from requests import HTTPError
|
||||
|
||||
@@ -152,21 +153,24 @@ def convert_tasks(root, new_root):
|
||||
|
||||
|
||||
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
|
||||
# Save RAM by using Dataset.from_parquet and concatenate_datasets
|
||||
datasets = [Dataset.from_parquet(file) for file in paths_to_cat]
|
||||
concatenated_dataset = concatenate_datasets(datasets)
|
||||
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
|
||||
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
|
||||
# Concatenate all DataFrames along rows
|
||||
concatenated_df = pd.concat(dataframes, ignore_index=True)
|
||||
|
||||
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if len(image_keys) > 0:
|
||||
# Handle image features by casting to the appropriate feature types
|
||||
features = concatenated_dataset.features.copy()
|
||||
schema = pa.Schema.from_pandas(concatenated_df)
|
||||
features = Features.from_arrow_schema(schema)
|
||||
for key in image_keys:
|
||||
features[key] = Image()
|
||||
concatenated_dataset = concatenated_dataset.cast(features)
|
||||
schema = features.arrow_schema
|
||||
else:
|
||||
schema = None
|
||||
|
||||
concatenated_dataset.to_parquet(path)
|
||||
concatenated_df.to_parquet(path, index=False, schema=schema)
|
||||
|
||||
|
||||
def convert_data(root, new_root):
|
||||
@@ -280,35 +284,50 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key):
|
||||
for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"):
|
||||
ep_size_in_mb = get_video_size_in_mb(ep_path)
|
||||
ep_duration_in_s = get_video_duration_in_s(ep_path)
|
||||
|
||||
# Check if adding this episode would exceed the limit
|
||||
if size_in_mb + ep_size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB and len(paths_to_cat) > 0:
|
||||
# Size limit would be exceeded, save current accumulation WITHOUT this episode
|
||||
concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx)
|
||||
|
||||
# Update episodes metadata for the file we just saved
|
||||
for i, _ in enumerate(paths_to_cat):
|
||||
past_ep_idx = ep_idx - len(paths_to_cat) + i
|
||||
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
|
||||
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
|
||||
|
||||
# Move to next file and start fresh with current episode
|
||||
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
|
||||
size_in_mb = 0
|
||||
duration_in_s = 0.0
|
||||
paths_to_cat = []
|
||||
|
||||
# Add current episode metadata
|
||||
ep_metadata = {
|
||||
"episode_index": ep_idx,
|
||||
f"videos/{video_key}/chunk_index": chunk_idx,
|
||||
f"videos/{video_key}/file_index": file_idx,
|
||||
f"videos/{video_key}/chunk_index": chunk_idx, # Will be updated when file is saved
|
||||
f"videos/{video_key}/file_index": file_idx, # Will be updated when file is saved
|
||||
f"videos/{video_key}/from_timestamp": duration_in_s,
|
||||
f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s,
|
||||
}
|
||||
episodes_metadata.append(ep_metadata)
|
||||
|
||||
# Add current episode to accumulation
|
||||
paths_to_cat.append(ep_path)
|
||||
size_in_mb += ep_size_in_mb
|
||||
duration_in_s += ep_duration_in_s
|
||||
episodes_metadata.append(ep_metadata)
|
||||
ep_idx += 1
|
||||
|
||||
if size_in_mb < DEFAULT_VIDEO_FILE_SIZE_IN_MB:
|
||||
paths_to_cat.append(ep_path)
|
||||
continue
|
||||
|
||||
concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx)
|
||||
|
||||
# Reset for the next file
|
||||
size_in_mb = ep_size_in_mb
|
||||
duration_in_s = ep_duration_in_s
|
||||
paths_to_cat = [ep_path]
|
||||
|
||||
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
|
||||
|
||||
# Write remaining videos if any
|
||||
if paths_to_cat:
|
||||
concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx)
|
||||
|
||||
# Update episodes metadata for the final file
|
||||
for i, _ in enumerate(paths_to_cat):
|
||||
past_ep_idx = ep_idx - len(paths_to_cat) + i
|
||||
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
|
||||
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
|
||||
|
||||
return episodes_metadata
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user