Most unit tests are passing

This commit is contained in:
Remi Cadene
2025-04-11 14:04:22 +02:00
parent c1b28f0b58
commit 34c5d4ce07
6 changed files with 391 additions and 322 deletions
+128 -107
View File
@@ -16,17 +16,18 @@
import contextlib
import logging
import shutil
from pathlib import Path
import tempfile
from pathlib import Path
from typing import Callable
import datasets
import numpy as np
import packaging.version
import PIL.Image
import pandas as pd
import PIL.Image
import torch
import torch.utils
from datasets import concatenate_datasets, load_dataset, Dataset
from datasets import Dataset
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.constants import REPOCARD_NAME
from huggingface_hub.errors import RevisionNotFoundError
@@ -35,52 +36,36 @@ from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.compute_stats import aggregate_stats, compute_episode_stats
from lerobot.common.datasets.image_writer import AsyncImageWriter, write_image
from lerobot.common.datasets.utils import (
DEFAULT_DATA_PATH,
DEFAULT_EPISODES_PATH,
DEFAULT_EPISODES_STATS_PATH,
DEFAULT_FEATURES,
DEFAULT_IMAGE_PATH,
EPISODES_DIR,
EPISODES_STATS_DIR,
INFO_PATH,
LEGACY_TASKS_PATH,
append_jsonlines,
backward_compatible_episodes_stats,
check_delta_timestamps,
check_timestamps_sync,
check_version_compatibility,
concat_video_files,
create_empty_dataset_info,
create_lerobot_dataset_card,
embed_images,
flatten_dict,
get_chunk_file_indices,
get_delta_indices,
get_episode_data_index,
get_features_from_robot,
get_hf_dataset_size_in_mb,
get_hf_features_from_features,
get_latest_parquet_path,
get_latest_video_path,
get_parquet_num_frames,
get_pd_dataframe_size_in_mb,
get_safe_version,
get_video_duration_in_s,
get_video_size_in_mb,
hf_transform_to_torch,
is_valid_version,
legacy_load_episodes,
legacy_load_episodes_stats,
load_episodes,
load_episodes_stats,
load_info,
load_nested_dataset,
load_stats,
legacy_load_tasks,
load_tasks,
update_chunk_file_indices,
validate_episode_buffer,
validate_frame,
write_episode,
legacy_write_episode_stats,
write_info,
write_json,
write_tasks,
@@ -118,15 +103,17 @@ class LeRobotDatasetMetadata:
self.revision = get_safe_version(self.repo_id, self.revision)
(self.root / "meta").mkdir(exist_ok=True, parents=True)
# TODO(rcadene): instead of downloading all episodes metadata files,
# download only the ones associated to the requested episodes. This would
# require adding `episodes: list[int]` as argument.
self.pull_from_repo(allow_patterns="meta/")
self.load_metadata()
def load_metadata(self):
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
self.tasks, self.task_to_task_index = load_tasks(self.root)
self.tasks = load_tasks(self.root)
self.episodes = load_episodes(self.root)
self.episodes_stats = load_episodes_stats(self.root)
# TODO(rcadene): https://huggingface.slack.com/archives/C02V51Q3800/p1743517952388249?thread_ts=1742896075.499119&cid=C02V51Q3800
# self.stats = aggregate_stats(list(self.episodes_stats.values()))
@@ -150,8 +137,8 @@ class LeRobotDatasetMetadata:
return packaging.version.parse(self.info["codebase_version"])
def get_data_file_path(self, ep_index: int) -> Path:
chunk_idx = self.episodes[f"data/chunk_index"][ep_index]
file_idx = self.episodes[f"data/file_index"][ep_index]
chunk_idx = self.episodes["data/chunk_index"][ep_index]
file_idx = self.episodes["data/file_index"][ep_index]
fpath = self.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
return Path(fpath)
@@ -161,9 +148,6 @@ class LeRobotDatasetMetadata:
fpath = self.video_path.format(video_key=vid_key, chunk_index=chunk_idx, file_index=file_idx)
return Path(fpath)
# def get_episode_chunk(self, ep_index: int) -> int:
# return ep_index // self.chunks_size
@property
def data_path(self) -> str:
"""Formattable string for the parquet files."""
@@ -233,7 +217,7 @@ class LeRobotDatasetMetadata:
def chunks_size(self) -> int:
"""Max number of files per chunk."""
return self.info["chunks_size"]
@property
def files_size_in_mb(self) -> int:
"""Max size of file in mega bytes."""
@@ -244,71 +228,84 @@ class LeRobotDatasetMetadata:
Given a task in natural language, returns its task_index if the task already exists in the dataset,
otherwise return None.
"""
return self.tasks.index[task] if task in self.tasks.index else None
def has_task(self, task: str) -> bool:
return task in self.task_to_task_index
if task in self.tasks.index:
return int(self.tasks.loc[task].task_index)
else:
return None
def save_episode_tasks(self, tasks: list[str]):
new_tasks = [task for task in tasks if not self.has_task(task)]
if len(set(tasks)) != len(tasks):
raise ValueError(f"Tasks are not unique: {tasks}")
for task in new_tasks:
task_index = len(self.tasks)
self.tasks.loc[task] = task_index
if self.tasks is None:
new_tasks = tasks
task_indices = range(len(tasks))
self.tasks = pd.DataFrame({"task_index": task_indices}, index=tasks)
else:
new_tasks = [task for task in tasks if task not in self.tasks.index]
new_task_indices = range(len(self.tasks), len(new_tasks))
for task_idx, task in zip(new_task_indices, new_tasks, strict=False):
self.tasks.loc[task] = task_idx
if len(new_tasks) > 0:
# Update on disk
write_tasks(self.tasks, self.root)
def _save_episode(self, episode_dict: dict):
def _save_episode_metadata(self, episode_dict: dict) -> None:
"""Save episode metadata to a parquet file and update the Hugging Face dataset of episodes metadata.
This function processes episodes metadata from a dictionary, converts it into a Hugging Face dataset,
and saves it as a parquet file. It handles both the creation of new parquet files and the
updating of existing ones based on size constraints. After saving the metadata, it reloads
the Hugging Face dataset to ensure it is up-to-date.
Notes: We both need to update parquet files and HF dataset:
- `pandas` loads parquet file in RAM
- `datasets` relies on a memory mapping from pyarrow (no RAM). It either converts parquet files to a pyarrow cache on disk,
or loads directly from pyarrow cache.
"""
# Convert buffer into HF Dataset
episode_dict = {key: [value] for key, value in episode_dict.items()}
ep_dataset = Dataset.from_dict(episode_dict)
ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
df = pd.DataFrame(ep_dataset)
# Access latest parquet file information
latest_path = get_latest_parquet_path(self.root / EPISODES_DIR)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
chunk_idx, file_idx = get_chunk_file_indices(latest_path)
if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
# Create new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
new_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
new_path.parent.mkdir(parents=True, exist_ok=True)
ep_df.to_parquet(new_path, index=False)
if self.episodes is None:
# Initialize indices and frame count for a new dataset made of the first episode data
chunk_idx, file_idx = 0, 0
df["meta/episodes/chunk_index"] = [chunk_idx]
df["meta/episodes/file_index"] = [file_idx]
else:
# Update latest parquet file with new row
ep_df = pd.DataFrame(ep_dataset)
latest_df = pd.concat([latest_df, ep_df], ignore_index=True) # RAM
latest_df.to_parquet(latest_path, index=False)
# Retrieve information from the latest parquet file
latest_ep = self.episodes.with_format(columns=["chunk_index", "file_index"])[-1]
chunk_idx, file_idx = latest_ep["chunk_index"], latest_ep["file_index"]
latest_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
# Determine if a new parquet file is needed
if latest_size_in_mb + ep_size_in_mb >= self.files_size_in_mb:
# Size limit is reached, prepare new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
df["meta/episodes/chunk_index"] = [chunk_idx]
df["meta/episodes/file_index"] = [file_idx]
else:
# Update the existing parquet file with new row
df["meta/episodes/chunk_index"] = [chunk_idx]
df["meta/episodes/file_index"] = [file_idx]
latest_df = pd.read_parquet(latest_path)
latest_df = pd.concat([latest_df, df], ignore_index=True)
# Write the resulting dataframe from RAM to disk
path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(path, index=False)
# Update the Hugging Face dataset by reloading it.
# This process should be fast because only the latest Parquet file has been modified.
# Therefore, only this file needs to be converted to PyArrow; the rest is loaded from the PyArrow memory-mapped cache.
self.episodes = load_episodes(self.root)
def _save_episode_stats(self, episodes_stats: dict):
ep_dataset = Dataset.from_dict(episodes_stats)
ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
# Access latest parquet file information
latest_path = get_latest_parquet_path(self.root / EPISODES_STATS_DIR)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
chunk_idx, file_idx = get_chunk_file_indices(latest_path)
if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
# Create new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
new_path = self.root / DEFAULT_EPISODES_STATS_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
new_path.parent.mkdir(parents=True, exist_ok=True)
ep_df.to_parquet(new_path, index=False)
else:
# Update latest parquet file with new row
ep_df = pd.DataFrame(ep_dataset)
latest_df = pd.concat([latest_df, ep_df], ignore_index=True) # RAM
latest_df.to_parquet(latest_path, index=False)
self.episodes_stats = load_episodes_stats(self.root)
def save_episode(
self,
episode_index: int,
@@ -331,8 +328,8 @@ class LeRobotDatasetMetadata:
"length": episode_length,
}
episode_dict.update(episode_metadata)
self._save_episode(episode_dict)
self._save_episode_stats(episode_stats)
episode_dict.update(flatten_dict({"stats": episode_stats}))
self._save_episode_metadata(episode_dict)
self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats else episode_stats
# TODO: write stats
@@ -401,7 +398,6 @@ class LeRobotDatasetMetadata:
features = {**features, **DEFAULT_FEATURES}
obj.tasks = None
obj.episodes_stats = None
obj.episodes = None
# TODO(rcadene) stats
obj.stats = {}
@@ -557,7 +553,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.hf_dataset = self.load_hf_dataset()
except (AssertionError, FileNotFoundError, NotADirectoryError):
self.revision = get_safe_version(self.repo_id, self.revision)
self.download_episodes(download_videos)
self.download(download_videos)
self.hf_dataset = self.load_hf_dataset()
# Setup delta_indices
@@ -635,7 +631,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
ignore_patterns=ignore_patterns,
)
def download_episodes(self, download_videos: bool = True) -> None:
def download(self, download_videos: bool = True) -> None:
"""Downloads the dataset from the given 'repo_id' at the provided version. If 'episodes' is given, this
will only download those episodes (selected by their episode_index). If 'episodes' is None, the whole
dataset will be downloaded. Thanks to the behavior of snapshot_download, if the files are already present
@@ -795,10 +791,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Add task as a string
task_idx = item["task_index"].item()
if self.meta.tasks["task_index"][task_idx] != task_idx:
raise ValueError("Sanity check on task index failed.")
item["task"] = self.meta.tasks["task"][task_idx]
item["task"] = self.meta.tasks.iloc[task_idx].name
return item
def __repr__(self):
@@ -827,7 +820,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
image_key=image_key, episode_index=episode_index, frame_index=frame_index
)
return self.root / fpath
def _get_image_file_dir(self, episode_index: int, image_key: str) -> Path:
return self._get_image_file_path(episode_index, image_key, frame_index=0).parent
@@ -926,11 +919,11 @@ class LeRobotDataset(torch.utils.data.Dataset):
self._wait_image_writer()
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer, episode_index)
ep_metadata = self._save_episode_data(episode_buffer)
for video_key in self.meta.video_keys:
ep_metadata.update(self._save_episode_video(video_key, episode_index))
# `meta.save_episode` neeed to be executed after encoding the videos
# `meta.save_episode` need to be executed after encoding the videos
self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
# TODO(rcadene): remove? there is only one episode in the episode buffer, no need for ep_data_index
@@ -954,31 +947,57 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Reset episode buffer
self.episode_buffer = self.create_episode_buffer()
def _save_episode_data(self, episode_buffer: dict) -> None:
def _save_episode_data(self, episode_buffer: dict) -> dict:
"""Save episode data to a parquet file and update the Hugging Face dataset of frames data.
This function processes episodes data from a buffer, converts it into a Hugging Face dataset,
and saves it as a parquet file. It handles both the creation of new parquet files and the
updating of existing ones based on size constraints. After saving the data, it reloads
the Hugging Face dataset to ensure it is up-to-date.
Notes: We both need to update parquet files and HF dataset:
- `pandas` loads parquet file in RAM
- `datasets` relies on a memory mapping from pyarrow (no RAM). It either converts parquet files to a pyarrow cache on disk,
or loads directly from pyarrow cache.
"""
# Convert buffer into HF Dataset
ep_dict = {key: episode_buffer[key] for key in self.hf_features}
ep_dataset = datasets.Dataset.from_dict(ep_dict, features=self.hf_features, split="train")
ep_dataset = embed_images(ep_dataset)
ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
ep_num_frames = len(ep_dataset)
df = pd.DataFrame(ep_dataset)
# Access latest parquet file information
latest_path = get_latest_parquet_path(self.root / "data")
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
latest_num_frames = get_parquet_num_frames(latest_path)
chunk_idx, file_idx = get_chunk_file_indices(latest_path)
if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
# Create new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
new_path = self.meta.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
new_path.parent.mkdir(parents=True, exist_ok=True)
ep_df.to_parquet(new_path, index=False)
if self.meta.episodes is None:
# Initialize indices and frame count for a new dataset made of the first episode data
chunk_idx, file_idx = 0, 0
latest_num_frames = 0
else:
# Update latest parquet file with new rows
ep_df = pd.DataFrame(ep_dataset)
latest_df = pd.concat([latest_df, ep_df], ignore_index=True) # RAM
latest_df.to_parquet(latest_path, index=False)
# Retrieve information from the latest parquet file
latest_ep = self.meta.episodes.with_format(columns=["data/chunk_index", "data/file_index"])[-1]
chunk_idx, file_idx = latest_ep["data/chunk_index"], latest_ep["data/file_index"]
latest_path = self.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
latest_num_frames = get_parquet_num_frames(latest_path)
# Determine if a new parquet file is needed
if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
# Size limit is reached, prepare new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
latest_num_frames = 0
else:
# Update the existing parquet file with new rows
latest_df = pd.read_parquet(latest_path)
df = pd.concat([latest_df, df], ignore_index=True)
# Write the resulting dataframe from RAM to disk
path = self.root / self.meta.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
if len(self.meta.image_keys) > 0:
datasets.Dataset.from_dict(df.to_dict(orient="list")).to_parquet(path)
else:
df.to_parquet(path)
# Update the Hugging Face dataset by reloading it.
# This process should be fast because only the latest Parquet file has been modified.
@@ -1008,7 +1027,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
# Move temporary episode video to a new video file in the dataset
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
new_path = self.meta.video_path.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx)
new_path = self.meta.video_path.format(
video_key=video_key, chunk_index=chunk_idx, file_index=file_idx
)
new_path.parent.mkdir(parents=True, exist_ok=True)
ep_path.replace(new_path)
else: