From a276f5b8ac0554d050c132415420ba7e17fd82e8 Mon Sep 17 00:00:00 2001 From: Jade Choghari Date: Thu, 30 Oct 2025 18:12:50 +0100 Subject: [PATCH] fix style --- examples/behavior_1k/convert_to_lerobot_v3.py | 82 ++++++++++++----- examples/behavior_1k/tester.py | 88 ++++++++++++++----- 2 files changed, 127 insertions(+), 43 deletions(-) diff --git a/examples/behavior_1k/convert_to_lerobot_v3.py b/examples/behavior_1k/convert_to_lerobot_v3.py index 37ca835ce..abc00fe39 100755 --- a/examples/behavior_1k/convert_to_lerobot_v3.py +++ b/examples/behavior_1k/convert_to_lerobot_v3.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Convert Behavior Dataset to LeRobotDataset v3.0 format""" -from pathlib import Path -import jsonlines + import argparse import logging import shutil @@ -50,32 +49,30 @@ from lerobot.datasets.utils import ( write_tasks, ) from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s -from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.utils import init_logging + # script to convert one single task to v3.1 # TASK = 1 NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb") -from lerobot.datasets.utils import load_info + + def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int: """ Calculates the total number of episodes for a single, specified task. """ # Simply load the episodes for the task and count them. episodes = legacy_load_episodes_task( - local_dir=local_dir, - task_id=task_id, - task_ranges=task_ranges, - step=step + local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step ) return len(episodes) + NUM_CAMERAS = 9 + + def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int: episodes_metadata = legacy_load_episodes_task( - local_dir=local_dir, - task_id=task_id, - task_ranges=task_ranges, - step=step + local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step ) total_frames = 0 # like 'duration' @@ -84,7 +81,10 @@ def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, total_frames += int(duration_s) return total_frames -def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): + +def convert_info( + root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step +): info = load_info(root) info["codebase_version"] = "v3.0" del info["total_videos"] @@ -98,24 +98,27 @@ def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, me # already has fps in video_info continue info["features"][key]["fps"] = info["fps"] - + info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step) info["total_videos"] = info["total_episodes"] * NUM_CAMERAS info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step) info["total_tasks"] = 1 write_info(info, new_root) + def load_jsonlines(fpath: Path) -> list[any]: with jsonlines.open(fpath, "r") as reader: return list(reader) + def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) - # return tasks dict such that + # return tasks dict such that tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} task_to_task_index = {task: task_index for task_index, task in tasks.items()} return tasks, task_to_task_index + def convert_tasks(root, new_root, task_id: int): tasks, _ = legacy_load_tasks(root) if task_id not in tasks: @@ -146,12 +149,14 @@ def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): concatenated_df.to_parquet(path, index=False, schema=schema) + def get_image_keys(root): info = load_info(root) features = info["features"] image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"] return image_keys + def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int): task_dir_name = f"task-00{task_index}" data_dir = root / "data" / task_dir_name @@ -202,7 +207,10 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_ind return episodes_metadata -def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int): + +def convert_videos_of_camera( + root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int +): # Access old paths to mp4 # videos_dir = root / "videos" # ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4")) @@ -275,6 +283,7 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_f return episodes_metadata + def get_video_keys(root): info = load_info(root) features = info["features"] @@ -321,6 +330,7 @@ def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_ import json from pathlib import Path + def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: """ Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. @@ -333,7 +343,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: ep_start = None ep_end = None - with open(episodes_jsonl_path, "r") as f: + with open(episodes_jsonl_path) as f: for line in f: if not line.strip(): continue @@ -370,6 +380,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: return task_ranges + def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: """ Load only the episodes belonging to a specific task, inferred automatically from episode ranges. @@ -392,10 +403,12 @@ def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, task_episode_indices = range(ep_start, ep_end + step, step) return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes} + def legacy_load_episodes(local_dir: Path) -> dict: episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH) return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])} + def legacy_load_episodes_stats(local_dir: Path) -> dict: episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH) return { @@ -403,6 +416,7 @@ def legacy_load_episodes_stats(local_dir: Path) -> dict: for item in sorted(episodes_stats, key=lambda x: x["episode_index"]) } + def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: all_stats = legacy_load_episodes_stats(local_dir) @@ -415,6 +429,7 @@ def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: task_episode_indices = range(ep_start, ep_end + step, step) return {i: all_stats[i] for i in task_episode_indices if i in all_stats} + def generate_episode_metadata_dict( episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None ): @@ -448,7 +463,10 @@ def generate_episode_metadata_dict( ep_dict["meta/episodes/file_index"] = 0 yield ep_dict -def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None): + +def convert_episodes_metadata( + root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None +): logging.info(f"Converting episodes metadata from {root} to {new_root}") # filter by task @@ -472,9 +490,10 @@ def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, t stats = aggregate_stats(list(episodes_stats.values())) write_stats(stats, new_root) -import shutil + from pathlib import Path + def convert_dataset_local( data_path: Path, new_repo: Path, @@ -511,22 +530,41 @@ def convert_dataset_local( EPISODES_META_PATH = root / "meta" / "episodes.jsonl" task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): - convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, EPISODES_META_PATH, task_id, task_ranges, STEP) + convert_info( + root, + new_root, + data_file_size_in_mb, + video_file_size_in_mb, + EPISODES_META_PATH, + task_id, + task_ranges, + STEP, + ) convert_tasks(root, new_root, task_id) episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id) - convert_episodes_metadata(root, new_root, episodes_metadata, task_id=task_id, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata) + convert_episodes_metadata( + root, + new_root, + episodes_metadata, + task_id=task_id, + task_ranges=task_ranges, + episodes_video_metadata=episodes_videos_metadata, + ) print(f"✅ Conversion complete for task {task_id}") print(f"Converted dataset written to: {new_root}") + if __name__ == "__main__": import argparse from pathlib import Path init_logging() - parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)") + parser = argparse.ArgumentParser( + description="Convert Behavior-1K tasks to LeRobot v3 format (local only)" + ) parser.add_argument( "--data-path", type=str, diff --git a/examples/behavior_1k/tester.py b/examples/behavior_1k/tester.py index d4096cd4a..54a93b79f 100644 --- a/examples/behavior_1k/tester.py +++ b/examples/behavior_1k/tester.py @@ -1,14 +1,14 @@ from pathlib import Path + import jsonlines + DATA_PATH = Path("/fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos") NEW_PATH = Path("/fsx/jade_choghari/.cache/behavior-1k-task0/") import argparse import logging import shutil from pathlib import Path -from typing import Any -import jsonlines import pandas as pd import pyarrow as pa import tqdm @@ -42,30 +42,29 @@ from lerobot.datasets.utils import ( from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.utils import init_logging + # script to convert one single task to v3.1 # TASK = 1 NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb") -from lerobot.datasets.utils import load_info + + def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int: """ Calculates the total number of episodes for a single, specified task. """ # Simply load the episodes for the task and count them. episodes = legacy_load_episodes_task( - local_dir=local_dir, - task_id=task_id, - task_ranges=task_ranges, - step=step + local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step ) return len(episodes) + NUM_CAMERAS = 9 + + def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int: episodes_metadata = legacy_load_episodes_task( - local_dir=local_dir, - task_id=task_id, - task_ranges=task_ranges, - step=step + local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step ) total_frames = 0 # like 'duration' @@ -74,7 +73,10 @@ def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, total_frames += int(duration_s) return total_frames -def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): + +def convert_info( + root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step +): info = load_info(root) info["codebase_version"] = "v3.0" del info["total_videos"] @@ -88,26 +90,30 @@ def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, me # already has fps in video_info continue info["features"][key]["fps"] = info["fps"] - + info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step) info["total_videos"] = info["total_episodes"] * NUM_CAMERAS info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step) info["total_tasks"] = 1 write_info(info, new_root) + # convert_info(DATA_PATH, 12, 24) + def load_jsonlines(fpath: Path) -> list[any]: with jsonlines.open(fpath, "r") as reader: return list(reader) + def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) - # return tasks dict such that + # return tasks dict such that tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} task_to_task_index = {task: task_index for task_index, task in tasks.items()} return tasks, task_to_task_index + def convert_tasks(root, new_root, task_id: int): tasks, _ = legacy_load_tasks(root) if task_id not in tasks: @@ -118,8 +124,10 @@ def convert_tasks(root, new_root, task_id: int): df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings) write_tasks(df_tasks, new_root) + # convert_tasks(DATA_PATH) + def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets dataframes = [pd.read_parquet(file) for file in paths_to_cat] @@ -139,12 +147,14 @@ def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): concatenated_df.to_parquet(path, index=False, schema=schema) + def get_image_keys(root): info = load_info(root) features = info["features"] image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"] return image_keys + def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int): task_dir_name = f"task-000{task_index}" data_dir = root / "data" / task_dir_name @@ -198,10 +208,14 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_ind return episodes_metadata + # episodes_metadata = convert_data(DATA_PATH, NEW_ROOT, 50, TASK) # print("episodes meta: ", episodes_metadata) -def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int): + +def convert_videos_of_camera( + root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int +): # Access old paths to mp4 # videos_dir = root / "videos" # ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4")) @@ -274,6 +288,7 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_f return episodes_metadata + def get_video_keys(root): info = load_info(root) features = info["features"] @@ -320,6 +335,7 @@ def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_ import json from pathlib import Path + def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: """ Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. @@ -332,7 +348,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: ep_start = None ep_end = None - with open(episodes_jsonl_path, "r") as f: + with open(episodes_jsonl_path) as f: for line in f: if not line.strip(): continue @@ -369,6 +385,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: return task_ranges + def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: """ Load only the episodes belonging to a specific task, inferred automatically from episode ranges. @@ -391,10 +408,12 @@ def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, task_episode_indices = range(ep_start, ep_end + step, step) return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes} + def legacy_load_episodes(local_dir: Path) -> dict: episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH) return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])} + # episodes_videos_metadata = convert_videos(DATA_PATH, NEW_ROOT, 50) # episodes_legacy_metadata = legacy_load_episodes(DATA_PATH) # episodes_task_0 = legacy_load_episodes_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges) @@ -407,6 +426,7 @@ def legacy_load_episodes_stats(local_dir: Path) -> dict: for item in sorted(episodes_stats, key=lambda x: x["episode_index"]) } + def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: all_stats = legacy_load_episodes_stats(local_dir) @@ -419,6 +439,7 @@ def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: task_episode_indices = range(ep_start, ep_end + step, step) return {i: all_stats[i] for i in task_episode_indices if i in all_stats} + # ep = legacy_load_episodes_stats_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges) def generate_episode_metadata_dict( episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None @@ -452,7 +473,10 @@ def generate_episode_metadata_dict( ep_dict["meta/episodes/file_index"] = 0 yield ep_dict -def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None): + +def convert_episodes_metadata( + root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None +): logging.info(f"Converting episodes metadata from {root} to {new_root}") # filter by task @@ -475,9 +499,11 @@ def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, t stats = aggregate_stats(list(episodes_stats.values())) write_stats(stats, new_root) -import shutil + + from pathlib import Path + def convert_dataset_local( data_path: Path, new_repo: Path, @@ -514,22 +540,41 @@ def convert_dataset_local( EPISODES_META_PATH = DATA_PATH / "meta" / "episodes.jsonl" task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): - convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, EPISODES_META_PATH, task_id, task_ranges, STEP) + convert_info( + root, + new_root, + data_file_size_in_mb, + video_file_size_in_mb, + EPISODES_META_PATH, + task_id, + task_ranges, + STEP, + ) convert_tasks(root, new_root, task_id) episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id) - convert_episodes_metadata(root, new_root, episodes_metadata, task_id=task_id, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata) + convert_episodes_metadata( + root, + new_root, + episodes_metadata, + task_id=task_id, + task_ranges=task_ranges, + episodes_video_metadata=episodes_videos_metadata, + ) print(f"✅ Conversion complete for task {task_id}") print(f"Converted dataset written to: {new_root}") + if __name__ == "__main__": import argparse from pathlib import Path init_logging() - parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)") + parser = argparse.ArgumentParser( + description="Convert Behavior-1K tasks to LeRobot v3 format (local only)" + ) parser.add_argument( "--data-path", type=str, @@ -577,6 +622,7 @@ if __name__ == "__main__": force_conversion=args.force_conversion, ) + def convert_dataset( repo_id: str, branch: str | None = None,