Compare commits

...

31 Commits

Author SHA1 Message Date
fracapuano 1c8f922379 fix: minor things on the aggregation job 2025-11-21 09:30:39 +00:00
fracapuano 2b2ff19366 fix the number of workers to prevent contention 2025-11-21 09:30:39 +00:00
fracapuano c912b1dd03 fix: upload with multiple workers 2025-11-21 09:30:38 +00:00
fracapuano ca1841f5fc add: aggregation util 2025-11-21 09:30:38 +00:00
fracapuano f6755dbf20 add: utils for stabler, large scale upload (ds.push_to_hub may fail) 2025-11-21 09:30:38 +00:00
fracapuano 0846b5704c fix: resources trim 2025-11-21 09:30:38 +00:00
fracapuano f386591be7 fix: jobs for conversion and aggregation 2025-11-21 09:30:38 +00:00
fracapuano f875566e1d add: downloading data utils 2025-11-21 09:30:37 +00:00
fracapuano eaea3806e8 add: util to download behavior data 2025-11-21 09:30:37 +00:00
fracapuano 1ef0f0bb86 remove: unused constants file 2025-11-21 09:30:37 +00:00
fracapuano e70dd620f3 add: final aggregation utils to obtain one dataset only 2025-11-21 09:30:37 +00:00
fracapuano 31274975f0 fix: minor checks 2025-11-21 09:30:37 +00:00
fracapuano edbfa3d3e6 fix: slurm job for parallel conversion on nodes 2025-11-21 09:30:36 +00:00
fracapuano 09e2a55901 fix: add upload to hub option 2025-11-21 09:30:36 +00:00
fracapuano 413c5e01be fix: implement actual conversion for lerobotdataset-v3 compatibility 2025-11-21 09:30:36 +00:00
fracapuano 91a0a4fe7a add: slurm conversion script 2025-11-21 09:30:36 +00:00
fracapuano 7710411d3a remove: unused, useless bespoke dataset format 2025-11-21 09:30:36 +00:00
fracapuano 4a153825ee fix: minor 2025-11-21 09:30:36 +00:00
fracapuano 46606359fc fix: metadata stores the saved 0-based episode index 2025-11-21 09:30:35 +00:00
fracapuano 1d0eb922bd fix: episode index is asserted 0-based in lerobot dataset 2025-11-21 09:30:35 +00:00
fracapuano 1612aa7ac7 fix bug: correctly specify paths 2025-11-21 09:30:35 +00:00
Francesco Capuano c1f5d8f48f fix: add frame idx 2025-11-21 09:30:35 +00:00
Michel Aractingi 14743b896e * refactor behaviour1k_lerobot_dataset.py
* add example scripts to load behaviour 1k data in `load_behaviour1k_dataset.py`
2025-11-21 09:30:35 +00:00
Jade Choghari 624939c71c remove tester 2025-11-21 09:30:34 +00:00
Jade Choghari a276f5b8ac fix style 2025-11-21 09:30:34 +00:00
Jade Choghari 33ff386dbc remove comments 2025-11-21 09:30:34 +00:00
Jade Choghari 50f8cbc392 update changes 2025-11-21 09:30:34 +00:00
Jade Choghari 23999ba40d update
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2025-11-21 09:30:34 +00:00
Jade Choghari dd4837f06e add
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2025-11-21 09:30:34 +00:00
Michel Aractingi 9f00d2c3a2 Modify convert_to_lerobot_v3 script for behaviours dataset to take a single task id and create a dataset outof it 2025-11-21 09:30:33 +00:00
Michel Aractingi 950a6fb83d add scripts for convert behavior-1k to datasetv3 2025-11-21 09:30:33 +00:00
8 changed files with 1036 additions and 0 deletions
+29
View File
@@ -0,0 +1,29 @@
#!/bin/bash
#SBATCH -J b1k-aggregate
#SBATCH -p hopper-cpu
#SBATCH --qos=high
#SBATCH -c 2
#SBATCH -t 20:00:00
#SBATCH --mem=4G
#SBATCH -D /admin/home/francesco_capuano/lerobot
#SBATCH -o /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%j.out
#SBATCH -e /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%j.err
set -euo pipefail
set -x
export PYTHONUNBUFFERED=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK:-1}
source "$HOME/.bashrc" 2>/dev/null || true
if ! command -v conda >/dev/null 2>&1; then
source "$HOME/miniconda3/etc/profile.d/conda.sh" 2>/dev/null || true
source "$HOME/anaconda3/etc/profile.d/conda.sh" 2>/dev/null || true
fi
conda activate lerobot
python examples/behavior_1k/aggregate_tasks_datasets.py \
--task-datasets-dir /fsx/francesco_capuano/behavior1k-v3 \
--aggregated-root /fsx/francesco_capuano/behavior1k-v3/behavior1k \
--num-tasks 50 \
--hf-user fracapuano \
--push-to-hub
@@ -0,0 +1,100 @@
"""Aggregate multiple task-specific LeRobot datasets into a single combined dataset."""
import argparse
import os
from pathlib import Path
from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def main():
parser = argparse.ArgumentParser(
description="Aggregate multiple task-specific datasets into a single LeRobot dataset"
)
parser.add_argument(
"--task-datasets-dir",
type=str,
required=True,
help="Directory containing individual task datasets (e.g., /path/to/behavior1k/)",
)
parser.add_argument(
"--aggregated-root",
type=str,
required=True,
help="Path where the aggregated dataset will be written",
)
parser.add_argument(
"--num-tasks",
type=int,
default=50,
help="Number of tasks to aggregate (default: 50)",
)
parser.add_argument(
"--task-start-idx",
type=int,
default=0,
help="Starting task index (default: 0)",
)
parser.add_argument(
"--hf-user",
type=str,
default=None,
help="HuggingFace username for repo IDs (defaults to HF_USER env var or 'lerobot')",
)
parser.add_argument(
"--aggregated-repo-id",
type=str,
default=None,
help="Repository ID for the aggregated dataset (defaults to {hf_user}/behavior1k)",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Push the aggregated dataset to the Hugging Face Hub",
)
args = parser.parse_args()
# Determine HF user
hf_user = args.hf_user or os.environ.get("HF_USER", "lerobot")
# Set default aggregated repo ID if not provided
aggregated_repo_id = args.aggregated_repo_id or f"{hf_user}/behavior1k"
# Generate task indices
task_indices = range(args.task_start_idx, args.task_start_idx + args.num_tasks)
# Generate repo IDs for individual tasks
repo_ids = [f"{hf_user}/behavior1k-task{i:04d}" for i in task_indices]
# Generate local paths for individual task datasets
task_datasets_dir = Path(args.task_datasets_dir)
roots = [task_datasets_dir / f"behavior1k-task{i:04d}" for i in task_indices]
# Aggregated dataset path
aggregated_root = Path(args.aggregated_root)
print(f"🔹 Aggregating {args.num_tasks} task datasets")
print(f"Task datasets directory: {task_datasets_dir}")
print(f"Aggregated output: {aggregated_root}")
print(f"Aggregated repo ID: {aggregated_repo_id}")
aggregate_datasets(
repo_ids=repo_ids,
roots=roots,
aggr_repo_id=aggregated_repo_id,
aggr_root=aggregated_root,
)
print("✅ Aggregation complete")
if args.push_to_hub:
print(f"📤 Pushing aggregated dataset to {aggregated_repo_id}")
ds = LeRobotDataset(repo_id=aggregated_repo_id, root=aggregated_root)
ds.push_to_hub()
print("✅ Successfully pushed to hub")
if __name__ == "__main__":
main()
+38
View File
@@ -0,0 +1,38 @@
#!/bin/bash
#SBATCH -J b1k-convert
#SBATCH -p hopper-cpu # pick your partition
#SBATCH --qos=high
#SBATCH --array=0-49%8 # 50 tasks, max 8 running concurrently (conversion is I/O bound)
#SBATCH -c 1 # CPUs per conversion (tune as needed)
#SBATCH -t 2:00:00 # Time per conversion
#SBATCH --mem=3G # ~1.75GB for task 0, ~doubled for safety
#SBATCH -D /admin/home/francesco_capuano/lerobot
#SBATCH -o /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A_%a.out
#SBATCH -e /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A_%a.err
set -euo pipefail
set -x
export PYTHONUNBUFFERED=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK:-1} # avoid BLAS oversubscription
DATA_PATH="/fsx/francesco_capuano/behavior1k-2025-v21"
BASE_OUT="/fsx/francesco_capuano/behavior1k-v3"
mkdir -p "$BASE_OUT" logs
i="${SLURM_ARRAY_TASK_ID}"
OUT_DIR="$(printf "%s/behavior1k-task%04d" "$BASE_OUT" "$i")"
# activate your env if needed
source "$HOME/.bashrc" 2>/dev/null || true
if ! command -v conda >/dev/null 2>&1; then
source "$HOME/miniconda3/etc/profile.d/conda.sh" 2>/dev/null || true
source "$HOME/anaconda3/etc/profile.d/conda.sh" 2>/dev/null || true
fi
conda activate lerobot
python examples/behavior_1k/convert_to_lerobot_v3.py \
--data-path "$DATA_PATH" \
--new-repo "$OUT_DIR" \
--task-id "$i" \
--force-conversion \
--push-to-hub
+667
View File
@@ -0,0 +1,667 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convert Behavior Dataset to LeRobotDataset v3.0 format"""
import argparse
import json
import logging
import os
import shutil
from pathlib import Path
import jsonlines
import pandas as pd
import pyarrow as pa
import tqdm
from datasets import Dataset, Features, Image
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
DEFAULT_FEATURES,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH,
LEGACY_EPISODES_PATH,
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy,
flatten_dict,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
get_parquet_num_frames,
load_info,
update_chunk_file_indices,
write_episodes,
write_info,
write_stats,
write_tasks,
)
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
from lerobot.utils.utils import init_logging
# script to convert one single task to v3.1
# TASK = 1
NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb")
def fix_episode_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Performs several fixes to an underlying dataframe to make it LeRobotDataset-v3 compatible"""
# Inject per-episode frame_index if missing (0..N-1 within each episode)
if "frame_index" not in df.columns:
df["frame_index"] = range(len(df))
# Remove variable-length task_info feature (NOTE(fracapuano): change to padding at some point?)
if "observation.task_info" in df.columns:
df = df.drop(columns=["observation.task_info"])
# NOTE(fracapuano): tasks are ordered (and there is one task per file/dataset)
if "task_index" in df.columns:
df["task_index"] = 0
return df
def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int:
"""
Calculates the total number of episodes for a single, specified task.
"""
# Simply load the episodes for the task and count them.
episodes = legacy_load_episodes_task(
local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
)
return len(episodes)
NUM_CAMERAS = 9
def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int:
episodes_metadata = legacy_load_episodes_task(
local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
)
total_frames = 0
# like 'duration'
for ep in episodes_metadata.values():
duration_s = ep["length"]
total_frames += int(duration_s)
return total_frames
def convert_info(
root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step
):
info = load_info(root)
features = {**info["features"], **DEFAULT_FEATURES}
del features[
"observation.task_info"
] # variable-length task_info is not supported in LeRobotDataset v3.0!
info["codebase_version"] = "v3.0"
info["features"] = features
del info["total_videos"]
info["data_files_size_in_mb"] = data_file_size_in_mb
info["video_files_size_in_mb"] = video_file_size_in_mb
info["data_path"] = DEFAULT_DATA_PATH
info["video_path"] = DEFAULT_VIDEO_PATH if info["video_path"] is not None else None
info["fps"] = int(info["fps"])
for key in info["features"]:
if info["features"][key]["dtype"] == "video":
# already has fps in video_info
continue
info["features"][key]["fps"] = info["fps"]
info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step)
info["total_videos"] = info["total_episodes"] * NUM_CAMERAS
info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step)
info["total_tasks"] = 1
write_info(info, new_root)
def load_jsonlines(fpath: Path) -> list[any]:
with jsonlines.open(fpath, "r") as reader:
return list(reader)
def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
# return tasks dict such that
tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
task_to_task_index = {task: task_index for task_index, task in tasks.items()}
return tasks, task_to_task_index
def convert_tasks(root, new_root, task_id: int):
tasks, _ = legacy_load_tasks(root)
if task_id not in tasks:
raise ValueError(f"Task ID {task_id} not found in tasks (available: {list(tasks.keys())})")
tasks = {task_id: tasks[task_id]}
# Tasks are ordered with 0..ntasks-1 in the converted dataset
task_indices = range(len(tasks.keys()))
task_strings = tasks.values()
df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings)
write_tasks(df_tasks, new_root)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = []
for file in paths_to_cat:
df = pd.read_parquet(file)
df = fix_episode_dataframe(df)
dataframes.append(df)
# Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
if len(image_keys) > 0:
schema = pa.Schema.from_pandas(concatenated_df)
features = Features.from_arrow_schema(schema)
for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def get_image_keys(root):
info = load_info(root)
features = info["features"]
image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
return image_keys
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int):
task_dir_name = f"task-{task_index:04d}"
data_dir = root / "data" / task_dir_name
ep_paths = sorted(data_dir.glob("*.parquet"))
image_keys = get_image_keys(root)
ep_idx = 0
chunk_idx = 0
file_idx = 0
size_in_mb = 0
num_frames = 0
paths_to_cat = []
episodes_metadata = []
logging.info(f"Converting data files from {len(ep_paths)} episodes")
for ep_path in tqdm.tqdm(ep_paths, desc="convert data files"):
ep_size_in_mb = get_parquet_file_size_in_mb(ep_path)
ep_num_frames = get_parquet_num_frames(ep_path)
ep_metadata = {
"episode_index": ep_idx,
"data/chunk_index": chunk_idx,
"data/file_index": file_idx,
"dataset_from_index": num_frames,
"dataset_to_index": num_frames + ep_num_frames,
}
size_in_mb += ep_size_in_mb
num_frames += ep_num_frames
episodes_metadata.append(ep_metadata)
# write 0-based episode index instead of custom episode index (otherwise breaks compatibility with LeRobotDataset)
tmp_df = pd.read_parquet(ep_path)
tmp_df["episode_index"] = ep_idx
tmp_df.to_parquet(ep_path)
ep_idx += 1
if size_in_mb < data_file_size_in_mb:
paths_to_cat.append(ep_path)
continue
if paths_to_cat:
concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)
# Reset for the next file
size_in_mb = ep_size_in_mb
paths_to_cat = [ep_path]
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
# Write remaining data if any
if paths_to_cat:
concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)
return episodes_metadata
def convert_videos_of_camera(
root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int
):
# Access old paths to mp4
# videos_dir = root / "videos"
# ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4"))
task_dir_name = f"task-{task_index:04d}"
videos_dir = root / "videos" / task_dir_name / video_key
ep_paths = sorted(videos_dir.glob("*.mp4"))
ep_idx = 0
chunk_idx = 0
file_idx = 0
size_in_mb = 0
duration_in_s = 0.0
paths_to_cat = []
episodes_metadata = []
for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"):
ep_size_in_mb = get_file_size_in_mb(ep_path)
ep_duration_in_s = get_video_duration_in_s(ep_path)
# Check if adding this episode would exceed the limit
if size_in_mb + ep_size_in_mb >= video_file_size_in_mb and len(paths_to_cat) > 0:
# Size limit would be exceeded, save current accumulation WITHOUT this episode
concatenate_video_files(
paths_to_cat,
new_root
/ DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
)
# Update episodes metadata for the file we just saved
for i, _ in enumerate(paths_to_cat):
past_ep_idx = ep_idx - len(paths_to_cat) + i
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
# Move to next file and start fresh with current episode
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
size_in_mb = 0
duration_in_s = 0.0
paths_to_cat = []
# Add current episode metadata
ep_metadata = {
"episode_index": ep_idx,
f"videos/{video_key}/chunk_index": chunk_idx, # Will be updated when file is saved
f"videos/{video_key}/file_index": file_idx, # Will be updated when file is saved
f"videos/{video_key}/from_timestamp": duration_in_s,
f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s,
}
episodes_metadata.append(ep_metadata)
# Add current episode to accumulation
paths_to_cat.append(ep_path)
size_in_mb += ep_size_in_mb
duration_in_s += ep_duration_in_s
ep_idx += 1
# Write remaining videos if any
if paths_to_cat:
concatenate_video_files(
paths_to_cat,
new_root
/ DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
)
# Update episodes metadata for the final file
for i, _ in enumerate(paths_to_cat):
past_ep_idx = ep_idx - len(paths_to_cat) + i
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
return episodes_metadata
def get_video_keys(root):
info = load_info(root)
features = info["features"]
video_keys = [key for key, ft in features.items() if ft["dtype"] == "video"]
return video_keys
def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_id: int):
logging.info(f"Converting videos from {root} to {new_root}")
video_keys = get_video_keys(root)
if len(video_keys) == 0:
return None
video_keys = sorted(video_keys)
eps_metadata_per_cam = []
for camera in video_keys:
eps_metadata = convert_videos_of_camera(root, new_root, camera, video_file_size_in_mb, task_id)
eps_metadata_per_cam.append(eps_metadata)
num_eps_per_cam = [len(eps_cam_map) for eps_cam_map in eps_metadata_per_cam]
if len(set(num_eps_per_cam)) != 1:
raise ValueError(f"All cams dont have same number of episodes ({num_eps_per_cam}).")
episodes_metadata = []
num_cameras = len(video_keys)
num_episodes = num_eps_per_cam[0]
for ep_idx in tqdm.tqdm(range(num_episodes), desc="convert videos"):
# Sanity check
ep_ids = [eps_metadata_per_cam[cam_idx][ep_idx]["episode_index"] for cam_idx in range(num_cameras)]
ep_ids += [ep_idx]
if len(set(ep_ids)) != 1:
raise ValueError(f"All episode indices need to match ({ep_ids}).")
ep_dict = {}
for cam_idx in range(num_cameras):
ep_dict.update(eps_metadata_per_cam[cam_idx][ep_idx])
episodes_metadata.append(ep_dict)
return episodes_metadata
def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
"""
Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task.
Returns a dict:
{ task_id: { "task_string": ..., "ep_start": ..., "ep_end": ... } }
"""
task_ranges = {}
task_id = 0
current_task_str = None
ep_start = None
ep_end = None
with open(episodes_jsonl_path) as f:
for line in f:
if not line.strip():
continue
ep = json.loads(line)
ep_idx = ep["episode_index"]
task_str = ep["tasks"][0] if ep["tasks"] else "UNKNOWN"
if current_task_str is None:
current_task_str = task_str
ep_start = ep_idx
ep_end = ep_idx
elif task_str == current_task_str:
ep_end = ep_idx
else:
# close previous task group
task_ranges[task_id] = {
"task_string": current_task_str,
"ep_start": ep_start,
"ep_end": ep_end,
}
task_id += 1
# start new one
current_task_str = task_str
ep_start = ep_idx
ep_end = ep_idx
# store last task
if current_task_str is not None:
task_ranges[task_id] = {
"task_string": current_task_str,
"ep_start": ep_start,
"ep_end": ep_end,
}
return task_ranges
def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
"""
Load only the episodes belonging to a specific task, inferred automatically from episode ranges.
Args:
local_dir (Path): Root path containing legacy meta/episodes.jsonl
task_id (int): Which task to load (key from the inferred task_ranges dict)
task_ranges (dict): Mapping from infer_task_episode_ranges()
step (int): Episode index step (Behavior-1K = 10)
"""
all_episodes = legacy_load_episodes(local_dir)
# get the range for this task
if task_id not in task_ranges:
raise ValueError(f"Task id {task_id} not found in task_ranges")
ep_start = task_ranges[task_id]["ep_start"]
ep_end = task_ranges[task_id]["ep_end"]
task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes}
def legacy_load_episodes(local_dir: Path) -> dict:
episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
def legacy_load_episodes_stats(local_dir: Path) -> dict:
episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
return {
item["episode_index"]: cast_stats_to_numpy(item["stats"])
for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
}
def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
all_stats = legacy_load_episodes_stats(local_dir)
if task_id not in task_ranges:
raise ValueError(f"Task id {task_id} not found in task_ranges")
ep_start = task_ranges[task_id]["ep_start"]
ep_end = task_ranges[task_id]["ep_end"]
task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_stats[i] for i in task_episode_indices if i in all_stats}
def generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None
):
num_episodes = len(episodes_metadata)
episodes_legacy_metadata_vals = list(episodes_legacy_metadata.values())
episodes_stats_vals = list(episodes_stats.values())
episodes_stats_keys = list(episodes_stats.keys())
for i in range(num_episodes):
ep_legacy_metadata = episodes_legacy_metadata_vals[i]
ep_metadata = episodes_metadata[i]
ep_stats = episodes_stats_vals[i]
ep_ids_set = {
ep_legacy_metadata["episode_index"],
ep_metadata["episode_index"],
episodes_stats_keys[i],
}
if episodes_videos is None:
ep_video = {}
else:
ep_video = episodes_videos[i]
ep_ids_set.add(ep_video["episode_index"])
ep_dict = {
**ep_legacy_metadata,
**ep_video,
**ep_metadata,
**flatten_dict({"stats": ep_stats}),
}
# enforce contiguous indexing 0..n-1, but also stores the legacy episode index
ep_dict["episode_index"] = i
yield ep_dict
def convert_episodes_metadata(
root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None
):
logging.info(f"Converting episodes metadata from {root} to {new_root}")
# filter by task
episodes_legacy_metadata = legacy_load_episodes_task(root, task_id=task_id, task_ranges=task_ranges)
episodes_stats = legacy_load_episodes_stats_task(root, task_id=task_id, task_ranges=task_ranges)
num_eps_set = {len(episodes_legacy_metadata), len(episodes_metadata)}
if episodes_video_metadata is not None:
num_eps_set.add(len(episodes_video_metadata))
if len(num_eps_set) != 1:
raise ValueError(f"Number of episodes is not the same ({num_eps_set}).")
# Single file approach: set meta indices to 0 for all rows and write once
ds_episodes = Dataset.from_generator(
lambda: generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_video_metadata
)
)
num_eps = len(ds_episodes)
# NOTE(fracapuano): for the size of the average dataset this is fine!
ds_episodes = ds_episodes.add_column("meta/episodes/chunk_index", [0] * num_eps)
ds_episodes = ds_episodes.add_column("meta/episodes/file_index", [0] * num_eps)
write_episodes(ds_episodes, new_root)
stats = aggregate_stats(list(episodes_stats.values()))
write_stats(stats, new_root)
def convert_dataset_local(
data_path: Path,
new_repo: Path,
task_id: int,
data_file_size_in_mb: int = DEFAULT_DATA_FILE_SIZE_IN_MB,
video_file_size_in_mb: int = DEFAULT_VIDEO_FILE_SIZE_IN_MB,
force_conversion: bool = False,
):
"""
Convert a local dataset to v3.x format, task-by-task, without using the Hugging Face Hub.
Args:
data_path (Path): path to local dataset root (e.g. /fsx/.../2025-challenge-demos)
new_repo (Path): path where converted dataset will be written (e.g. /fsx/.../behavior1k_v3)
task_id (int): which task to convert (index)
data_file_size_in_mb (int): max size per data chunk
video_file_size_in_mb (int): max size per video chunk
force_conversion (bool): overwrite existing conversion if True
"""
root = Path(data_path)
new_root = Path(new_repo)
# Clean up if needed
if new_root.exists() and force_conversion:
shutil.rmtree(new_root)
new_root.mkdir(parents=True, exist_ok=True)
print(f"🔹 Starting conversion for task {task_id}")
print(f"Input root: {root}")
print(f"Output root: {new_root}")
# Infer task episode ranges
episodes_meta_path = root / "meta" / "episodes.jsonl"
task_ranges = infer_task_episode_ranges(episodes_meta_path)
convert_info(
root,
new_root,
data_file_size_in_mb,
video_file_size_in_mb,
episodes_meta_path,
task_id,
task_ranges,
step=10,
)
convert_tasks(root, new_root, task_id)
episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id)
episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id)
convert_episodes_metadata(
root,
new_root,
episodes_metadata,
task_id=task_id,
task_ranges=task_ranges,
episodes_video_metadata=episodes_videos_metadata,
)
print(f"✅ Conversion complete for task {task_id}")
print(f"Converted dataset written to: {new_root}")
if __name__ == "__main__":
import argparse
from pathlib import Path
init_logging()
parser = argparse.ArgumentParser(
description="Convert Behavior-1K tasks to LeRobot v3 format (local only)"
)
parser.add_argument(
"--data-path",
type=str,
required=True,
help="Path to the local Behavior-1K dataset (e.g. /fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos)",
)
parser.add_argument(
"--new-repo",
type=str,
required=True,
help="Path to the output directory for the converted dataset",
)
parser.add_argument(
"--task-id",
type=int,
required=True,
help="Task index to convert (e.g. 0, 1, 2, ...)",
)
parser.add_argument(
"--data-file-size-in-mb",
type=int,
default=DEFAULT_DATA_FILE_SIZE_IN_MB,
help=f"Maximum size per data chunk (default: {DEFAULT_DATA_FILE_SIZE_IN_MB})",
)
parser.add_argument(
"--video-file-size-in-mb",
type=int,
default=DEFAULT_VIDEO_FILE_SIZE_IN_MB,
help=f"Maximum size per video chunk (default: {DEFAULT_VIDEO_FILE_SIZE_IN_MB})",
)
parser.add_argument(
"--force-conversion",
action="store_true",
help="Force overwrite of existing conversion output if present.",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Push the (converted) dataset to the hub.",
)
args = parser.parse_args()
if args.push_to_hub:
HF_USER = os.environ.get("HF_USER", "fracapuano")
if HF_USER is None:
raise ValueError(
"HF_USER environment variable is not set! Set before converting and pushing to hub."
)
convert_dataset_local(
data_path=Path(args.data_path),
new_repo=Path(args.new_repo),
task_id=args.task_id,
data_file_size_in_mb=args.data_file_size_in_mb,
video_file_size_in_mb=args.video_file_size_in_mb,
force_conversion=args.force_conversion,
)
if args.push_to_hub:
ds = LeRobotDataset(repo_id=f"{HF_USER}/behavior1k-task{args.task_id:04d}", root=args.new_repo)
ds.push_to_hub()
+27
View File
@@ -0,0 +1,27 @@
#!/bin/bash
#SBATCH -J b1k-download
#SBATCH -p hopper-cpu
#SBATCH --qos=high
#SBATCH -c 32 # CPUs per conversion (tune as needed)
#SBATCH -t 20:00:00 # Time per conversion
#SBATCH -D /admin/home/francesco_capuano/lerobot
#SBATCH -o /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A.out
#SBATCH -e /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A.err
set -euo pipefail
set -x
export PYTHONUNBUFFERED=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK:-1}
# activate your env if needed
source "$HOME/.bashrc" 2>/dev/null || true
if ! command -v conda >/dev/null 2>&1; then
source "$HOME/miniconda3/etc/profile.d/conda.sh" 2>/dev/null || true
source "$HOME/anaconda3/etc/profile.d/conda.sh" 2>/dev/null || true
fi
conda activate lerobot
python examples/behavior_1k/download_data.py \
--repo-id "behavior-1k/2025-challenge-demos" \
--local-dir "/fsx/francesco_capuano/behavior1k-2025-v21" \
--max-workers 32
+26
View File
@@ -0,0 +1,26 @@
import shutil
from huggingface_hub import snapshot_download
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--repo-id", type=str, required=True)
parser.add_argument("--max-workers", type=int, default=8)
parser.add_argument("--local-dir", type=str, required=True)
parser.add_argument("--force-download", action="store_true")
args = parser.parse_args()
if args.force_download:
shutil.rmtree(args.local_dir, ignore_errors=True)
snapshot_download(
repo_id=args.repo_id,
repo_type="dataset",
force_download=args.force_download,
max_workers=args.max_workers,
local_dir=args.local_dir,
ignore_patterns=["annotations/*"], # NOTE(fracapuano): Dropping textual annotations right now
)
+41
View File
@@ -0,0 +1,41 @@
#!/bin/bash
#SBATCH -J b1k-upload
#SBATCH -p hopper-cpu
#SBATCH --qos=high
#SBATCH -c 1
#SBATCH -t 48:00:00
#SBATCH --mem=4G
#SBATCH --array=0-49%2
#SBATCH -D /admin/home/francesco_capuano/lerobot
#SBATCH -o /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A_%a.out
#SBATCH -e /admin/home/francesco_capuano/lerobot/examples/behavior_1k/logs/%x-%A_%a.err
set -euo pipefail
set -x
export PYTHONUNBUFFERED=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK:-1}
source "$HOME/.bashrc" 2>/dev/null || true
if ! command -v conda >/dev/null 2>&1; then
source "$HOME/miniconda3/etc/profile.d/conda.sh" 2>/dev/null || true
source "$HOME/anaconda3/etc/profile.d/conda.sh" 2>/dev/null || true
fi
conda activate lerobot
# The SLURM_ARRAY_TASK_ID will be used as the task-id
TASK_ID=${SLURM_ARRAY_TASK_ID}
# Configuration
ROOT_PATH="/fsx/francesco_capuano/behavior1k-v3"
HF_USER="fracapuano"
# Limit upload workers to reduce network contention (default in HF Hub is 4)
# For I/O-bound uploads, 2-4 workers per task is optimal
NUM_WORKERS=2
echo "Task ${TASK_ID}: uploading with ${NUM_WORKERS} workers from ${ROOT_PATH}"
python examples/behavior_1k/upload_folders.py \
--task-id ${TASK_ID} \
--root-path ${ROOT_PATH} \
--hf-user ${HF_USER} \
--num-workers ${NUM_WORKERS}
+108
View File
@@ -0,0 +1,108 @@
import argparse
from pathlib import Path
from huggingface_hub import HfApi, upload_large_folder
def main():
parser = argparse.ArgumentParser(
description="Upload a folder to Hugging Face Hub using upload_large_folder"
)
parser.add_argument(
"--folder-path",
type=str,
required=False,
help="Path to the folder to upload (used if task-id is not provided)",
)
parser.add_argument(
"--repo-id",
type=str,
required=False,
help="Repository ID on Hugging Face Hub (e.g., 'username/repo-name'). If task-id is provided, will be constructed as '{hf-user}/behavior1k-task{task_id:04d}'",
)
parser.add_argument(
"--task-id",
type=int,
required=False,
help="Task index to upload (e.g., 0, 1, 2, ...). When provided, folder-path is constructed from root-path.",
)
parser.add_argument(
"--root-path",
type=str,
required=False,
help="Root path containing task folders (e.g., /fsx/user/behavior1k-v3). Used with --task-id to construct folder path.",
)
parser.add_argument(
"--hf-user",
type=str,
default=None,
help="Hugging Face username for constructing repo-id with task-id (default: from HF_USER env var or 'fracapuano')",
)
parser.add_argument(
"--create-repo", action="store_true", help="Create the repository if it doesn't exist"
)
parser.add_argument(
"--num-workers",
type=int,
default=2,
help="Number of parallel workers for upload (default: 2). For I/O-bound uploads, use 1-4 to avoid network contention.",
)
args = parser.parse_args()
# Construct folder path and repo ID based on task-id or use provided values
if args.task_id is not None:
if not args.root_path:
raise ValueError("--root-path is required when --task-id is provided")
task_folder_name = f"behavior1k-task{args.task_id:04d}"
folder_path = Path(args.root_path) / task_folder_name
repo_id = f"{args.hf_user}/{task_folder_name}"
print(f"Task mode: uploading task {args.task_id}")
else:
if not args.folder_path or not args.repo_id:
raise ValueError(
"Either --task-id with --root-path, or both --folder-path and --repo-id must be provided"
)
folder_path = Path(args.folder_path)
repo_id = args.repo_id
# Validate folder path
if not folder_path.exists():
raise ValueError(f"Folder path does not exist: {folder_path}")
if not folder_path.is_dir():
raise ValueError(f"Path is not a directory: {folder_path}")
print(f"Uploading folder: {folder_path}")
print(f"Repository: {repo_id}")
# Create repository if requested
if args.create_repo:
api = HfApi()
print(f"Creating repository {repo_id}...")
try:
api.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
print("Repository created or already exists. Updating its contents")
except Exception as e:
print(f"Warning: Could not create repository: {e}")
# Upload the folder
print(f"Starting upload with {args.num_workers} parallel workers...")
try:
result = upload_large_folder(
folder_path=str(folder_path),
repo_id=repo_id,
repo_type="dataset",
num_workers=args.num_workers,
)
print("✓ Upload completed successfully!")
print(f"Commit URL: {result}")
except Exception as e:
print(f"✗ Upload failed: {e}")
raise
if __name__ == "__main__":
main()