Compare commits


6 Commits

Author SHA1 Message Date
Pepijn 9c74cbe599 push to specific repo 2025-12-02 18:35:16 +01:00
Pepijn fa3919a0ff add push to hub 2025-12-02 18:30:11 +01:00
Pepijn e38346316b add aggregate 2025-12-02 18:27:50 +01:00
Pepijn 2a2b648891 fix use local dir 2025-12-02 18:11:20 +01:00
Pepijn cf36f4b873 add localdir 2025-12-02 17:26:44 +01:00
Pepijn e1ae51b02a Add conversion script 2025-12-02 16:51:36 +01:00
3 changed files with 817 additions and 0 deletions
+245
@@ -0,0 +1,245 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Aggregate EgoDex shards into a single dataset.

After distributed porting has produced multiple shards (one per worker), this
script combines them into one unified LeRobot dataset.

Reference: https://arxiv.org/abs/2505.11709, https://github.com/apple/ml-egodex
"""
import argparse
import logging
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class AggregateEgoDexDatasets(PipelineStep):
"""Datatrove pipeline step for aggregating EgoDex shards."""
def __init__(
self,
repo_ids: list[str],
aggregated_repo_id: str,
local_dir: Path | str | None = None,
push_to_hub: bool = False,
hf_repo_id: str | None = None,
):
super().__init__()
self.repo_ids = repo_ids
self.aggr_repo_id = aggregated_repo_id
self.local_dir = Path(local_dir) if local_dir else None
self.push_to_hub = push_to_hub
self.hf_repo_id = hf_repo_id if hf_repo_id else aggregated_repo_id
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.utils import init_logging
init_logging()
# Only worker 0 performs aggregation (aggregate_datasets handles parallelism internally)
if rank == 0:
logging.info(f"Starting aggregation of {len(self.repo_ids)} shards into {self.aggr_repo_id}")
# Build roots list if local_dir is specified
roots = None
if self.local_dir:
roots = [self.local_dir / repo_id for repo_id in self.repo_ids]
# Filter to only existing directories
existing_roots = [r for r in roots if r.exists()]
if len(existing_roots) != len(self.repo_ids):
logging.warning(
f"Only {len(existing_roots)} of {len(self.repo_ids)} shard directories found. "
"Missing shards will be skipped."
)
# Update repo_ids to match existing roots
existing_repo_ids = [
repo_id for repo_id, r in zip(self.repo_ids, roots, strict=False) if r.exists()
]
roots = existing_roots
self.repo_ids = existing_repo_ids
if len(self.repo_ids) == 0:
logging.error("No shard directories found. Nothing to aggregate.")
return
aggr_root = self.local_dir / self.aggr_repo_id if self.local_dir else None
aggregate_datasets(
repo_ids=self.repo_ids,
aggr_repo_id=self.aggr_repo_id,
roots=roots,
aggr_root=aggr_root,
)
logging.info("Aggregation complete!")
# Push to Hugging Face Hub if requested
if self.push_to_hub:
logging.info(f"Pushing to Hugging Face Hub as {self.hf_repo_id}...")
dataset = LeRobotDataset(
repo_id=self.aggr_repo_id,
root=aggr_root,
)
# Update repo_id for pushing to different HF account if specified
dataset.repo_id = self.hf_repo_id
dataset.push_to_hub(
tags=["egodex", "hand", "dexterous", "lerobot"],
license="cc-by-nc-nd-4.0",
)
logging.info("Push to hub complete!")
else:
logging.info(f"Worker {rank} skipping - only worker 0 performs aggregation")
def make_aggregate_executor(
repo_id,
num_shards,
job_name,
logs_dir,
partition,
cpus_per_task,
mem_per_cpu,
local_dir,
push_to_hub,
hf_repo_id,
slurm=True,
):
"""Create executor for aggregating EgoDex shards."""
# Generate repo IDs for all shards
repo_ids = [f"{repo_id}_world_{num_shards}_rank_{rank}" for rank in range(num_shards)]
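    # e.g. repo_id="pepijn/egodex-test" with num_shards=3 yields:
    #   pepijn/egodex-test_world_3_rank_0 ... pepijn/egodex-test_world_3_rank_2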
kwargs = {
"pipeline": [
AggregateEgoDexDatasets(repo_ids, repo_id, local_dir, push_to_hub, hf_repo_id),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": 1, # Only need 1 task for aggregation
"workers": 1, # Only need 1 worker
"time": "24:00:00", # 24 hours for aggregation
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": 1,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Aggregate EgoDex dataset shards into a single unified dataset."
)
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="Repository identifier (base name without shard suffix, e.g., pepijn/egodex-test)",
)
parser.add_argument(
"--num-shards",
type=int,
required=True,
help="Number of shards to aggregate (must match --workers from slurm_port_egodex.py)",
)
parser.add_argument(
"--logs-dir",
type=Path,
default=Path("logs"),
help="Path to logs directory for datatrove",
)
parser.add_argument(
"--job-name",
type=str,
default="aggr_egodex",
help="Job name used in SLURM",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM. Use --slurm 0 to launch locally (for debugging)",
)
parser.add_argument(
"--partition",
type=str,
help="SLURM partition (ideally CPU partition)",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=16,
help="Number of CPUs for aggregation task",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="8G",
help="Memory per CPU for aggregation",
)
parser.add_argument(
"--local-dir",
type=Path,
default=None,
help="Local directory where shards are stored. If not specified, uses default HF cache.",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Push aggregated dataset to Hugging Face Hub after aggregation.",
)
parser.add_argument(
"--hf-repo-id",
type=str,
default=None,
help="Hugging Face repo ID for upload (e.g., username/dataset-name). Defaults to --repo-id.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
aggregate_executor = make_aggregate_executor(**kwargs)
aggregate_executor.run()
if __name__ == "__main__":
main()
+129
@@ -0,0 +1,129 @@
#!/bin/bash
# Download EgoDex dataset
# Reference: https://arxiv.org/abs/2505.11709, https://github.com/apple/ml-egodex
#
# Usage: ./download_egodex.sh <output_dir> <parts...>
#
# Examples:
# ./download_egodex.sh ./data test # Download test set only (16 GB)
# ./download_egodex.sh ./data part1 part2 # Download training parts 1 and 2
# ./download_egodex.sh ./data all # Download everything (~1.7 TB)
#
# Available parts:
# test - Test set (16 GB)
# part1 - Training set part 1 (300 GB)
# part2 - Training set part 2 (300 GB)
# part3 - Training set part 3 (300 GB)
# part4 - Training set part 4 (300 GB)
# part5 - Training set part 5 (300 GB)
# extra - Additional data (200 GB)
# all - Download all parts (~1.7 TB total)
set -e
BASE_URL="https://ml-site.cdn-apple.com/datasets/egodex"
# Map part names to filenames
declare -A PART_FILES=(
["test"]="test.zip"
["part1"]="part1.zip"
["part2"]="part2.zip"
["part3"]="part3.zip"
["part4"]="part4.zip"
["part5"]="part5.zip"
["extra"]="extra.zip"
)
ALL_PARTS=("test" "part1" "part2" "part3" "part4" "part5" "extra")
usage() {
echo "Usage: $0 <output_dir> <parts...>"
echo ""
echo "Examples:"
echo " $0 ./data test # Download test set only (16 GB)"
echo " $0 ./data part1 part2 # Download training parts 1 and 2"
echo " $0 ./data all # Download everything (~1.7 TB)"
echo ""
echo "Available parts: test, part1, part2, part3, part4, part5, extra, all"
exit 1
}
download_part() {
local output_dir="$1"
local part="$2"
local filename="${PART_FILES[$part]}"
local url="${BASE_URL}/${filename}"
local output_file="${output_dir}/${filename}"
echo "----------------------------------------"
echo "Downloading: ${part} (${filename})"
echo "URL: ${url}"
echo "Output: ${output_file}"
echo "----------------------------------------"
# Download with curl, showing progress
curl -L --progress-bar "${url}" -o "${output_file}"
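    # Note: for flaky connections, adding -C - would let curl resume a partial
    # download (not enabled here, to keep the behavior simple and predictable).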
# Unzip
echo "Extracting ${filename}..."
unzip -q "${output_file}" -d "${output_dir}"
# Optionally remove zip file to save space
# Uncomment the next line if you want to delete zips after extraction
# rm "${output_file}"
echo "Done: ${part}"
echo ""
}
# Check arguments
if [ $# -lt 2 ]; then
usage
fi
OUTPUT_DIR="$1"
shift
# Create output directory
mkdir -p "${OUTPUT_DIR}"
# Determine which parts to download
PARTS_TO_DOWNLOAD=()
for arg in "$@"; do
if [ "$arg" == "all" ]; then
PARTS_TO_DOWNLOAD=("${ALL_PARTS[@]}")
break
elif [ -n "${PART_FILES[$arg]}" ]; then
PARTS_TO_DOWNLOAD+=("$arg")
else
echo "Error: Unknown part '${arg}'"
echo "Available parts: test, part1, part2, part3, part4, part5, extra, all"
exit 1
fi
done
if [ ${#PARTS_TO_DOWNLOAD[@]} -eq 0 ]; then
echo "Error: No valid parts specified"
usage
fi
echo "========================================"
echo "EgoDex Dataset Download"
echo "========================================"
echo "Output directory: ${OUTPUT_DIR}"
echo "Parts to download: ${PARTS_TO_DOWNLOAD[*]}"
echo "========================================"
echo ""
# Download each part
for part in "${PARTS_TO_DOWNLOAD[@]}"; do
download_part "${OUTPUT_DIR}" "${part}"
done
echo "========================================"
echo "Download complete!"
echo "Data saved to: ${OUTPUT_DIR}"
echo "========================================"
+443
@@ -0,0 +1,443 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Distributed EgoDex dataset porting using SLURM and datatrove.

EgoDex is a large-scale dataset for egocentric dexterous manipulation collected
with ARKit on Apple Vision Pro. This script converts EgoDex data to LeRobot format.

Reference: https://arxiv.org/abs/2505.11709, https://github.com/apple/ml-egodex
"""
import argparse
from pathlib import Path
import cv2
import h5py
import mediapy as mpy
import numpy as np
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Image dimensions
DEFAULT_IMAGE_HEIGHT = 1080
DEFAULT_IMAGE_WIDTH = 1920
class PortEgoDexShards(PipelineStep):
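    """Datatrove pipeline step that ports EgoDex demos into per-rank LeRobot dataset shards."""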
def __init__(
self,
raw_dir: Path | str,
repo_id: str,
        local_dir: Path | str | None = None,
percentage: float = 100.0,
):
super().__init__()
self.raw_dir = Path(raw_dir)
self.repo_id = repo_id
self.local_dir = Path(local_dir) if local_dir else Path("data/local_datasets")
self.percentage = percentage
def run(self, data=None, rank: int = 0, world_size: int = 1):
from pathlib import Path
import cv2
import h5py
import mediapy as mpy
import numpy as np
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.utils import init_logging
def _get_state_for_single_frame(transforms_group, frame_idx):
"""
Construct 48D hand state representation from EgoDex.
State vector composition (per hand = 24D, total = 48D):
- Wrist 3D position (3)
- Wrist orientation in 6D representation (6)
- 5 fingertip 3D positions (15)
"""
state_vector = []
fingertip_joints = {
"left": [
"leftThumbTip",
"leftIndexFingerTip",
"leftMiddleFingerTip",
"leftRingFingerTip",
"leftLittleFingerTip",
],
"right": [
"rightThumbTip",
"rightIndexFingerTip",
"rightMiddleFingerTip",
"rightRingFingerTip",
"rightLittleFingerTip",
],
}
for hand_side in ["left", "right"]:
hand_key = f"{hand_side}Hand"
hand_transform = transforms_group[hand_key][frame_idx]
# 1. Wrist 3D position
hand_position = hand_transform[:3, 3]
state_vector.extend(hand_position)
# 2. Wrist orientation in compact 6D representation
rotation_matrix = hand_transform[:3, :3]
rotation_6d = np.concatenate([rotation_matrix[:, 0], rotation_matrix[:, 1]])
state_vector.extend(rotation_6d)
# 3. 3D positions of 5 fingertips
for fingertip in fingertip_joints[hand_side]:
fingertip_transform = transforms_group[fingertip][frame_idx]
fingertip_pos = fingertip_transform[:3, 3]
state_vector.extend(fingertip_pos)
# Also return camera extrinsics for optional coordinate frame transformations
return np.array(state_vector, dtype=np.float32), transforms_group["camera"][frame_idx]
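        # A minimal sketch (not called by this pipeline) of how a downstream
        # consumer could recover the full 3x3 rotation matrix from the 6D
        # representation stored above, via Gram-Schmidt orthonormalization of
        # the two stored columns:
        def _rotation_6d_to_matrix(rot6d: np.ndarray) -> np.ndarray:
            a1, a2 = rot6d[:3], rot6d[3:]
            b1 = a1 / np.linalg.norm(a1)
            b2 = a2 - np.dot(b1, a2) * b1
            b2 = b2 / np.linalg.norm(b2)
            b3 = np.cross(b1, b2)
            return np.stack([b1, b2, b3], axis=-1)  # columns: b1, b2, b3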
def get_state_and_action_from_egodex_annotations(demo):
"""
Convert EgoDex demo annotations into states and actions.
The "action" is the state at time t+1 (next-pose prediction).
"""
transforms_group = demo["transforms"]
total_frames = list(transforms_group.values())[0].shape[0]
states_list, extrinsics_list = [], []
for frame_idx in range(total_frames):
state_vector, extrinsics = _get_state_for_single_frame(transforms_group, frame_idx)
states_list.append(state_vector)
extrinsics_list.append(extrinsics.flatten()) # Flatten 4x4 to 16D
state = np.array(states_list, dtype=np.float32)
extrinsics = np.array(extrinsics_list, dtype=np.float32)
            # Shift by 1 timestep so action[t] = state[t + 1] (next-pose prediction).
            # Note that np.roll wraps around: the final action equals state[0].
            action = np.roll(state, -1, axis=0)
return state, action, extrinsics
def process_demo(hdf5_file_path, video_path):
"""Process a single EgoDex demo and return frames for LeRobot."""
video = mpy.read_video(str(video_path))
video = np.asarray(video)
num_frames = video.shape[0]
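            # Note: mediapy loads the entire clip into memory; the caller frees
            # each demo's frames after saving the episode to avoid OOM.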
frames = []
with h5py.File(hdf5_file_path, "r") as demo:
state, action, extrinsics = get_state_and_action_from_egodex_annotations(demo)
# Get natural language task description
if demo.attrs.get("llm_type") == "reversible":
direction = demo.attrs.get("which_llm_description", "1")
lang_instruction = demo.attrs.get(
"llm_description" if direction == "1" else "llm_description2",
"manipulation task",
)
else:
lang_instruction = demo.attrs.get("llm_description", "manipulation task")
                # Guard against small length mismatches between the video and
                # the HDF5 annotations (e.g. an off-by-one frame count).
                for step_idx in range(min(num_frames, state.shape[0])):
# Resize image to default dimensions
image_resized = cv2.resize(
video[step_idx],
(DEFAULT_IMAGE_WIDTH, DEFAULT_IMAGE_HEIGHT),
interpolation=cv2.INTER_AREA,
)
frame = {
"task": lang_instruction,
"observation.image": image_resized,
"observation.state": state[step_idx],
"observation.extrinsics": extrinsics[step_idx],
"action": action[step_idx],
}
frames.append(frame)
return frames
init_logging()
# Define EgoDex features
EGODEX_FEATURES = {
"observation.image": {
"dtype": "video",
"shape": (DEFAULT_IMAGE_HEIGHT, DEFAULT_IMAGE_WIDTH, 3),
"names": ["height", "width", "rgb"],
},
"observation.state": {
"dtype": "float32",
"shape": (48,),
"names": [
# Left hand wrist position (3)
"left_wrist_x",
"left_wrist_y",
"left_wrist_z",
# Left hand wrist rotation 6D (6)
"left_rot_0",
"left_rot_1",
"left_rot_2",
"left_rot_3",
"left_rot_4",
"left_rot_5",
# Left fingertips (15)
"left_thumb_x",
"left_thumb_y",
"left_thumb_z",
"left_index_x",
"left_index_y",
"left_index_z",
"left_middle_x",
"left_middle_y",
"left_middle_z",
"left_ring_x",
"left_ring_y",
"left_ring_z",
"left_little_x",
"left_little_y",
"left_little_z",
# Right hand wrist position (3)
"right_wrist_x",
"right_wrist_y",
"right_wrist_z",
# Right hand wrist rotation 6D (6)
"right_rot_0",
"right_rot_1",
"right_rot_2",
"right_rot_3",
"right_rot_4",
"right_rot_5",
# Right fingertips (15)
"right_thumb_x",
"right_thumb_y",
"right_thumb_z",
"right_index_x",
"right_index_y",
"right_index_z",
"right_middle_x",
"right_middle_y",
"right_middle_z",
"right_ring_x",
"right_ring_y",
"right_ring_z",
"right_little_x",
"right_little_y",
"right_little_z",
],
},
"observation.extrinsics": {
"dtype": "float32",
"shape": (16,),
"names": [f"extrinsic_{i}" for i in range(16)],
},
"action": {
"dtype": "float32",
"shape": (48,),
"names": [f"action_{i}" for i in range(48)],
},
}
# 1. Discover all HDF5 files
        files = sorted(self.raw_dir.rglob("*.hdf5"))
if not files:
print(f"No HDF5 files found in {self.raw_dir}")
return
# 2. Apply percentage filter
if self.percentage < 100:
num_files = max(1, int(len(files) * self.percentage / 100))
files = files[:num_files]
print(f"Processing {self.percentage}% of dataset: {num_files} files")
# 3. Assign files to this worker
my_files = files[rank::world_size]
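        # Round-robin split: e.g. with world_size=4, rank 0 processes files
        # 0, 4, 8, ... so shards stay balanced without any coordination.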
if not my_files:
print(f"Rank {rank} has no files to process.")
return
print(f"Rank {rank} processing {len(my_files)} files out of {len(files)} total.")
# 4. Create a LeRobot dataset for this shard
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
shard_root = self.local_dir / shard_repo_id if self.local_dir else None
dataset = LeRobotDataset.create(
repo_id=shard_repo_id,
fps=30,
robot_type="hand",
features=EGODEX_FEATURES,
root=shard_root,
)
# 5. Process each file
for input_h5 in my_files:
try:
# Derive corresponding video path
video_path = input_h5.with_suffix(".mp4")
if not video_path.exists():
print(f"Warning: Video file not found for {input_h5}, skipping.")
continue
# Process demo and add frames
frames = process_demo(input_h5, video_path)
for frame in frames:
dataset.add_frame(frame)
dataset.save_episode()
# Clean up to avoid OOM
del frames
except Exception as e:
print(f"Error processing {input_h5}: {e}")
continue
# 6. Finalize the dataset
dataset.finalize()
def make_port_executor(
raw_dir,
repo_id,
job_name,
logs_dir,
workers,
partition,
cpus_per_task,
mem_per_cpu,
local_dir,
percentage,
slurm=True,
):
kwargs = {
"pipeline": [
PortEgoDexShards(raw_dir, repo_id, local_dir, percentage),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": workers,
"workers": workers,
"time": "10:00:00", # EgoDex is large, allow more time
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": workers,
"workers": 1, # Run locally sequentially for debugging
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Convert EgoDex dataset to LeRobot format using SLURM."
)
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input EgoDex data (HDF5 + MP4 files).",
)
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="Repository identifier (e.g., user/egodex-lerobot).",
)
parser.add_argument(
"--logs-dir",
type=Path,
default=Path("logs"),
help="Path to logs directory.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_egodex",
help="Job name used in SLURM.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM. Use --slurm 0 to launch sequentially (useful for debugging).",
)
parser.add_argument(
"--workers",
type=int,
default=50,
help="Number of SLURM workers.",
)
parser.add_argument(
"--partition",
type=str,
help="SLURM partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=4,
help="Number of CPUs per worker.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="4G",
help="Memory per CPU.",
)
parser.add_argument(
"--percentage",
type=float,
default=100.0,
help="Percentage of dataset to process (e.g., 1.0 for 1%%). Useful for testing.",
)
parser.add_argument(
"--local-dir",
type=Path,
default=None,
help="Local directory to save the LeRobot dataset. Defaults to data/local_datasets.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
port_executor = make_port_executor(**kwargs)
port_executor.run()
if __name__ == "__main__":
main()