fix style

This commit is contained in:
Jade Choghari
2025-10-30 18:12:50 +01:00
parent db7d501281
commit 28f8098df4
2 changed files with 127 additions and 43 deletions
+60 -22
View File
@@ -14,8 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Convert Behavior Dataset to LeRobotDataset v3.0 format""" """Convert Behavior Dataset to LeRobotDataset v3.0 format"""
from pathlib import Path
import jsonlines
import argparse import argparse
import logging import logging
import shutil import shutil
@@ -50,32 +49,30 @@ from lerobot.datasets.utils import (
write_tasks, write_tasks,
) )
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.utils.utils import init_logging from lerobot.utils.utils import init_logging
# script to convert one single task to v3.0 # script to convert one single task to v3.0
# TASK = 1 # TASK = 1
NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb") NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb")
from lerobot.datasets.utils import load_info
def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int: def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int:
""" """
Calculates the total number of episodes for a single, specified task. Calculates the total number of episodes for a single, specified task.
""" """
# Simply load the episodes for the task and count them. # Simply load the episodes for the task and count them.
episodes = legacy_load_episodes_task( episodes = legacy_load_episodes_task(
local_dir=local_dir, local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
task_id=task_id,
task_ranges=task_ranges,
step=step
) )
return len(episodes) return len(episodes)
NUM_CAMERAS = 9 NUM_CAMERAS = 9
def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int: def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int:
episodes_metadata = legacy_load_episodes_task( episodes_metadata = legacy_load_episodes_task(
local_dir=local_dir, local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
task_id=task_id,
task_ranges=task_ranges,
step=step
) )
total_frames = 0 total_frames = 0
# like 'duration' # like 'duration'
@@ -84,7 +81,10 @@ def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict,
total_frames += int(duration_s) total_frames += int(duration_s)
return total_frames return total_frames
def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step):
def convert_info(
root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step
):
info = load_info(root) info = load_info(root)
info["codebase_version"] = "v3.0" info["codebase_version"] = "v3.0"
del info["total_videos"] del info["total_videos"]
@@ -98,24 +98,27 @@ def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, me
# already has fps in video_info # already has fps in video_info
continue continue
info["features"][key]["fps"] = info["fps"] info["features"][key]["fps"] = info["fps"]
info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step) info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step)
info["total_videos"] = info["total_episodes"] * NUM_CAMERAS info["total_videos"] = info["total_episodes"] * NUM_CAMERAS
info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step) info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step)
info["total_tasks"] = 1 info["total_tasks"] = 1
write_info(info, new_root) write_info(info, new_root)
def load_jsonlines(fpath: Path) -> list[any]: def load_jsonlines(fpath: Path) -> list[any]:
with jsonlines.open(fpath, "r") as reader: with jsonlines.open(fpath, "r") as reader:
return list(reader) return list(reader)
def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
# return tasks dict mapping task_index -> task, ordered by task_index # return tasks dict mapping task_index -> task, ordered by task_index
tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
task_to_task_index = {task: task_index for task_index, task in tasks.items()} task_to_task_index = {task: task_index for task_index, task in tasks.items()}
return tasks, task_to_task_index return tasks, task_to_task_index
def convert_tasks(root, new_root, task_id: int): def convert_tasks(root, new_root, task_id: int):
tasks, _ = legacy_load_tasks(root) tasks, _ = legacy_load_tasks(root)
if task_id not in tasks: if task_id not in tasks:
@@ -146,12 +149,14 @@ def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
concatenated_df.to_parquet(path, index=False, schema=schema) concatenated_df.to_parquet(path, index=False, schema=schema)
def get_image_keys(root): def get_image_keys(root):
info = load_info(root) info = load_info(root)
features = info["features"] features = info["features"]
image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"] image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
return image_keys return image_keys
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int): def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int):
task_dir_name = f"task-00{task_index}" task_dir_name = f"task-00{task_index}"
data_dir = root / "data" / task_dir_name data_dir = root / "data" / task_dir_name
@@ -202,7 +207,10 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_ind
return episodes_metadata return episodes_metadata
def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int):
def convert_videos_of_camera(
root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int
):
# Access old paths to mp4 # Access old paths to mp4
# videos_dir = root / "videos" # videos_dir = root / "videos"
# ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4")) # ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4"))
@@ -275,6 +283,7 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_f
return episodes_metadata return episodes_metadata
def get_video_keys(root): def get_video_keys(root):
info = load_info(root) info = load_info(root)
features = info["features"] features = info["features"]
@@ -321,6 +330,7 @@ def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_
import json import json
from pathlib import Path from pathlib import Path
def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
""" """
Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task.
@@ -333,7 +343,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
ep_start = None ep_start = None
ep_end = None ep_end = None
with open(episodes_jsonl_path, "r") as f: with open(episodes_jsonl_path) as f:
for line in f: for line in f:
if not line.strip(): if not line.strip():
continue continue
@@ -370,6 +380,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
return task_ranges return task_ranges
def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
""" """
Load only the episodes belonging to a specific task, inferred automatically from episode ranges. Load only the episodes belonging to a specific task, inferred automatically from episode ranges.
@@ -392,10 +403,12 @@ def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict,
task_episode_indices = range(ep_start, ep_end + step, step) task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes} return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes}
def legacy_load_episodes(local_dir: Path) -> dict: def legacy_load_episodes(local_dir: Path) -> dict:
episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH) episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])} return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
def legacy_load_episodes_stats(local_dir: Path) -> dict: def legacy_load_episodes_stats(local_dir: Path) -> dict:
episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH) episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
return { return {
@@ -403,6 +416,7 @@ def legacy_load_episodes_stats(local_dir: Path) -> dict:
for item in sorted(episodes_stats, key=lambda x: x["episode_index"]) for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
} }
def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
all_stats = legacy_load_episodes_stats(local_dir) all_stats = legacy_load_episodes_stats(local_dir)
@@ -415,6 +429,7 @@ def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges:
task_episode_indices = range(ep_start, ep_end + step, step) task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_stats[i] for i in task_episode_indices if i in all_stats} return {i: all_stats[i] for i in task_episode_indices if i in all_stats}
def generate_episode_metadata_dict( def generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None
): ):
@@ -448,7 +463,10 @@ def generate_episode_metadata_dict(
ep_dict["meta/episodes/file_index"] = 0 ep_dict["meta/episodes/file_index"] = 0
yield ep_dict yield ep_dict
def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None):
def convert_episodes_metadata(
root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None
):
logging.info(f"Converting episodes metadata from {root} to {new_root}") logging.info(f"Converting episodes metadata from {root} to {new_root}")
# filter by task # filter by task
@@ -472,9 +490,10 @@ def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, t
stats = aggregate_stats(list(episodes_stats.values())) stats = aggregate_stats(list(episodes_stats.values()))
write_stats(stats, new_root) write_stats(stats, new_root)
import shutil
from pathlib import Path from pathlib import Path
def convert_dataset_local( def convert_dataset_local(
data_path: Path, data_path: Path,
new_repo: Path, new_repo: Path,
@@ -511,22 +530,41 @@ def convert_dataset_local(
EPISODES_META_PATH = root / "meta" / "episodes.jsonl" EPISODES_META_PATH = root / "meta" / "episodes.jsonl"
task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) task_ranges = infer_task_episode_ranges(EPISODES_META_PATH)
# def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step):
convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, EPISODES_META_PATH, task_id, task_ranges, STEP) convert_info(
root,
new_root,
data_file_size_in_mb,
video_file_size_in_mb,
EPISODES_META_PATH,
task_id,
task_ranges,
STEP,
)
convert_tasks(root, new_root, task_id) convert_tasks(root, new_root, task_id)
episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id)
episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id) episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id)
convert_episodes_metadata(root, new_root, episodes_metadata, task_id=task_id, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata) convert_episodes_metadata(
root,
new_root,
episodes_metadata,
task_id=task_id,
task_ranges=task_ranges,
episodes_video_metadata=episodes_videos_metadata,
)
print(f"✅ Conversion complete for task {task_id}") print(f"✅ Conversion complete for task {task_id}")
print(f"Converted dataset written to: {new_root}") print(f"Converted dataset written to: {new_root}")
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse
from pathlib import Path from pathlib import Path
init_logging() init_logging()
parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)") parser = argparse.ArgumentParser(
description="Convert Behavior-1K tasks to LeRobot v3 format (local only)"
)
parser.add_argument( parser.add_argument(
"--data-path", "--data-path",
type=str, type=str,
+67 -21
View File
@@ -1,14 +1,14 @@
from pathlib import Path from pathlib import Path
import jsonlines import jsonlines
DATA_PATH = Path("/fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos") DATA_PATH = Path("/fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos")
NEW_PATH = Path("/fsx/jade_choghari/.cache/behavior-1k-task0/") NEW_PATH = Path("/fsx/jade_choghari/.cache/behavior-1k-task0/")
import argparse import argparse
import logging import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import Any
import jsonlines
import pandas as pd import pandas as pd
import pyarrow as pa import pyarrow as pa
import tqdm import tqdm
@@ -42,30 +42,29 @@ from lerobot.datasets.utils import (
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
from lerobot.utils.constants import HF_LEROBOT_HOME from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.utils.utils import init_logging from lerobot.utils.utils import init_logging
# script to convert one single task to v3.0 # script to convert one single task to v3.0
# TASK = 1 # TASK = 1
NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb") NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb")
from lerobot.datasets.utils import load_info
def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int: def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int:
""" """
Calculates the total number of episodes for a single, specified task. Calculates the total number of episodes for a single, specified task.
""" """
# Simply load the episodes for the task and count them. # Simply load the episodes for the task and count them.
episodes = legacy_load_episodes_task( episodes = legacy_load_episodes_task(
local_dir=local_dir, local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
task_id=task_id,
task_ranges=task_ranges,
step=step
) )
return len(episodes) return len(episodes)
NUM_CAMERAS = 9 NUM_CAMERAS = 9
def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int: def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int:
episodes_metadata = legacy_load_episodes_task( episodes_metadata = legacy_load_episodes_task(
local_dir=local_dir, local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
task_id=task_id,
task_ranges=task_ranges,
step=step
) )
total_frames = 0 total_frames = 0
# like 'duration' # like 'duration'
@@ -74,7 +73,10 @@ def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict,
total_frames += int(duration_s) total_frames += int(duration_s)
return total_frames return total_frames
def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step):
def convert_info(
root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step
):
info = load_info(root) info = load_info(root)
info["codebase_version"] = "v3.0" info["codebase_version"] = "v3.0"
del info["total_videos"] del info["total_videos"]
@@ -88,26 +90,30 @@ def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, me
# already has fps in video_info # already has fps in video_info
continue continue
info["features"][key]["fps"] = info["fps"] info["features"][key]["fps"] = info["fps"]
info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step) info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step)
info["total_videos"] = info["total_episodes"] * NUM_CAMERAS info["total_videos"] = info["total_episodes"] * NUM_CAMERAS
info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step) info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step)
info["total_tasks"] = 1 info["total_tasks"] = 1
write_info(info, new_root) write_info(info, new_root)
# convert_info(DATA_PATH, 12, 24) # convert_info(DATA_PATH, 12, 24)
def load_jsonlines(fpath: Path) -> list[any]: def load_jsonlines(fpath: Path) -> list[any]:
with jsonlines.open(fpath, "r") as reader: with jsonlines.open(fpath, "r") as reader:
return list(reader) return list(reader)
def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]: def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH) tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
# return tasks dict mapping task_index -> task, ordered by task_index # return tasks dict mapping task_index -> task, ordered by task_index
tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])} tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
task_to_task_index = {task: task_index for task_index, task in tasks.items()} task_to_task_index = {task: task_index for task_index, task in tasks.items()}
return tasks, task_to_task_index return tasks, task_to_task_index
def convert_tasks(root, new_root, task_id: int): def convert_tasks(root, new_root, task_id: int):
tasks, _ = legacy_load_tasks(root) tasks, _ = legacy_load_tasks(root)
if task_id not in tasks: if task_id not in tasks:
@@ -118,8 +124,10 @@ def convert_tasks(root, new_root, task_id: int):
df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings) df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings)
write_tasks(df_tasks, new_root) write_tasks(df_tasks, new_root)
# convert_tasks(DATA_PATH) # convert_tasks(DATA_PATH)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = [pd.read_parquet(file) for file in paths_to_cat] dataframes = [pd.read_parquet(file) for file in paths_to_cat]
@@ -139,12 +147,14 @@ def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
concatenated_df.to_parquet(path, index=False, schema=schema) concatenated_df.to_parquet(path, index=False, schema=schema)
def get_image_keys(root): def get_image_keys(root):
info = load_info(root) info = load_info(root)
features = info["features"] features = info["features"]
image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"] image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
return image_keys return image_keys
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int): def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int):
task_dir_name = f"task-000{task_index}" task_dir_name = f"task-000{task_index}"
data_dir = root / "data" / task_dir_name data_dir = root / "data" / task_dir_name
@@ -198,10 +208,14 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_ind
return episodes_metadata return episodes_metadata
# episodes_metadata = convert_data(DATA_PATH, NEW_ROOT, 50, TASK) # episodes_metadata = convert_data(DATA_PATH, NEW_ROOT, 50, TASK)
# print("episodes meta: ", episodes_metadata) # print("episodes meta: ", episodes_metadata)
def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int):
def convert_videos_of_camera(
root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int
):
# Access old paths to mp4 # Access old paths to mp4
# videos_dir = root / "videos" # videos_dir = root / "videos"
# ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4")) # ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4"))
@@ -274,6 +288,7 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key: str, video_f
return episodes_metadata return episodes_metadata
def get_video_keys(root): def get_video_keys(root):
info = load_info(root) info = load_info(root)
features = info["features"] features = info["features"]
@@ -320,6 +335,7 @@ def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_
import json import json
from pathlib import Path from pathlib import Path
def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict: def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
""" """
Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task. Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task.
@@ -332,7 +348,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
ep_start = None ep_start = None
ep_end = None ep_end = None
with open(episodes_jsonl_path, "r") as f: with open(episodes_jsonl_path) as f:
for line in f: for line in f:
if not line.strip(): if not line.strip():
continue continue
@@ -369,6 +385,7 @@ def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
return task_ranges return task_ranges
def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
""" """
Load only the episodes belonging to a specific task, inferred automatically from episode ranges. Load only the episodes belonging to a specific task, inferred automatically from episode ranges.
@@ -391,10 +408,12 @@ def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict,
task_episode_indices = range(ep_start, ep_end + step, step) task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes} return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes}
def legacy_load_episodes(local_dir: Path) -> dict: def legacy_load_episodes(local_dir: Path) -> dict:
episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH) episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])} return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
# episodes_videos_metadata = convert_videos(DATA_PATH, NEW_ROOT, 50) # episodes_videos_metadata = convert_videos(DATA_PATH, NEW_ROOT, 50)
# episodes_legacy_metadata = legacy_load_episodes(DATA_PATH) # episodes_legacy_metadata = legacy_load_episodes(DATA_PATH)
# episodes_task_0 = legacy_load_episodes_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges) # episodes_task_0 = legacy_load_episodes_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges)
@@ -407,6 +426,7 @@ def legacy_load_episodes_stats(local_dir: Path) -> dict:
for item in sorted(episodes_stats, key=lambda x: x["episode_index"]) for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
} }
def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict: def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
all_stats = legacy_load_episodes_stats(local_dir) all_stats = legacy_load_episodes_stats(local_dir)
@@ -419,6 +439,7 @@ def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges:
task_episode_indices = range(ep_start, ep_end + step, step) task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_stats[i] for i in task_episode_indices if i in all_stats} return {i: all_stats[i] for i in task_episode_indices if i in all_stats}
# ep = legacy_load_episodes_stats_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges) # ep = legacy_load_episodes_stats_task(DATA_PATH, task_id=TASK, task_ranges=task_ranges)
def generate_episode_metadata_dict( def generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None
@@ -452,7 +473,10 @@ def generate_episode_metadata_dict(
ep_dict["meta/episodes/file_index"] = 0 ep_dict["meta/episodes/file_index"] = 0
yield ep_dict yield ep_dict
def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None):
def convert_episodes_metadata(
root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None
):
logging.info(f"Converting episodes metadata from {root} to {new_root}") logging.info(f"Converting episodes metadata from {root} to {new_root}")
# filter by task # filter by task
@@ -475,9 +499,11 @@ def convert_episodes_metadata(root, new_root, episodes_metadata, task_id: int, t
stats = aggregate_stats(list(episodes_stats.values())) stats = aggregate_stats(list(episodes_stats.values()))
write_stats(stats, new_root) write_stats(stats, new_root)
import shutil
from pathlib import Path from pathlib import Path
def convert_dataset_local( def convert_dataset_local(
data_path: Path, data_path: Path,
new_repo: Path, new_repo: Path,
@@ -514,22 +540,41 @@ def convert_dataset_local(
EPISODES_META_PATH = DATA_PATH / "meta" / "episodes.jsonl" EPISODES_META_PATH = DATA_PATH / "meta" / "episodes.jsonl"
task_ranges = infer_task_episode_ranges(EPISODES_META_PATH) task_ranges = infer_task_episode_ranges(EPISODES_META_PATH)
# def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step): # def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step):
convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb, EPISODES_META_PATH, task_id, task_ranges, STEP) convert_info(
root,
new_root,
data_file_size_in_mb,
video_file_size_in_mb,
EPISODES_META_PATH,
task_id,
task_ranges,
STEP,
)
convert_tasks(root, new_root, task_id) convert_tasks(root, new_root, task_id)
episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id) episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id)
episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id) episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id)
convert_episodes_metadata(root, new_root, episodes_metadata, task_id=task_id, task_ranges=task_ranges, episodes_video_metadata=episodes_videos_metadata) convert_episodes_metadata(
root,
new_root,
episodes_metadata,
task_id=task_id,
task_ranges=task_ranges,
episodes_video_metadata=episodes_videos_metadata,
)
print(f"✅ Conversion complete for task {task_id}") print(f"✅ Conversion complete for task {task_id}")
print(f"Converted dataset written to: {new_root}") print(f"Converted dataset written to: {new_root}")
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse
from pathlib import Path from pathlib import Path
init_logging() init_logging()
parser = argparse.ArgumentParser(description="Convert Behavior-1K tasks to LeRobot v3 format (local only)") parser = argparse.ArgumentParser(
description="Convert Behavior-1K tasks to LeRobot v3 format (local only)"
)
parser.add_argument( parser.add_argument(
"--data-path", "--data-path",
type=str, type=str,
@@ -577,6 +622,7 @@ if __name__ == "__main__":
force_conversion=args.force_conversion, force_conversion=args.force_conversion,
) )
def convert_dataset( def convert_dataset(
repo_id: str, repo_id: str,
branch: str | None = None, branch: str | None = None,