add missing files for porting agibot

This commit is contained in:
Michel Aractingi
2025-06-30 15:16:53 +02:00
parent 8ad085d882
commit e9e16d77f5
4 changed files with 915 additions and 0 deletions
@@ -0,0 +1,184 @@
import json
import logging
from pathlib import Path
import shutil
from huggingface_hub import snapshot_download
import tarfile
import tqdm
from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.utils.utils import init_logging
from huggingface_hub import HfApi, HfFileSystem
RAW_REPO_ID = "agibot-world/AgiBotWorld-Alpha"
def download(raw_dir, allow_patterns=None, ignore_patterns=None):
snapshot_download(
RAW_REPO_ID,
repo_type="dataset",
local_dir=str(raw_dir),
allow_patterns=allow_patterns,
ignore_patterns=ignore_patterns,
)
def download_proprio_stats(raw_dir):
proprio_stats_dir = raw_dir / "proprio_stats"
if proprio_stats_dir.exists():
logging.info("Skipping download proprio stats")
return
download(raw_dir, allow_patterns="proprio_stats/*.tar")
for path in proprio_stats_dir.glob("*.tar"):
logging.info(f"Untar-ing {path}...")
with tarfile.open(path, 'r') as tar:
tar.extractall(path=proprio_stats_dir)
logging.info(f"Deleting {path}...")
path.unlink()
def download_parameters(raw_dir):
params_dir = raw_dir / "parameters"
if params_dir.exists():
logging.info("Skipping download parameters")
return
download(raw_dir, allow_patterns="parameters/*.tar")
for path in params_dir.glob("*.tar"):
logging.info(f"Untar-ing {path}...")
with tarfile.open(path, 'r') as tar:
tar.extractall(path=params_dir)
logging.info(f"Deleting {path}...")
path.unlink()
def get_observations_files(raw_dir, raw_repo_id):
files_json_path = raw_dir / "observations_files.json"
sizes_json_path = raw_dir / "observations_sizes.json"
if files_json_path.exists() and sizes_json_path.exists():
with open(files_json_path) as f:
files = json.load(f)
with open(sizes_json_path) as f:
sizes = json.load(f)
return files, sizes
api = HfApi()
files = api.list_repo_files(repo_id=raw_repo_id, repo_type="dataset")
files = [file for file in files if "observations/" in file]
fs = HfFileSystem()
sizes = []
for file in tqdm.tqdm(files, desc="Downloading file sizes"):
file_info = fs.info(f"datasets/{raw_repo_id}/{file}")
size = file_info["size"] / 1000**3
sizes.append(size)
# Sort ASC to start with smaller size files
sizes, files = zip(*sorted(zip(sizes, files)))
with open(files_json_path, "w") as f:
json.dump(files, f)
with open(sizes_json_path, "w") as f:
json.dump(sizes, f)
return files, sizes
def display_observations_sizes(files, sizes):
size_per_task = {}
for i, (file, size) in enumerate(zip(files, sizes)):
logging.info(f"{i}/{len(files)}: {file} {size:.2f}GB")
task = int(file.split('/')[1])
if task not in size_per_task:
size_per_task[task] = 0
size_per_task[task] += size
for task, size in size_per_task.items():
logging.info(f"{task} {size:.2f}GB")
total_size = sum(list(size_per_task.values()))
logging.info(f"Total size: {total_size:.2f}GB")
def download_meta_data(raw_dir):
# Download task data
download(raw_dir, allow_patterns="task_info/task_*.json")
# Download all camera parameters ~170 GB
download_parameters(raw_dir)
# Download all proprio stats ~26 GB
download_proprio_stats(raw_dir)
def no_depth(tarinfo, path):
""" Utility to not untar depth data"""
if "depth" in tarinfo.name:
return None
return tarinfo
def main():
init_logging()
repo_id = "cadene/agibot_alpha_v30"
raw_dir = Path("/fsx/remi_cadene/data/AgiBotWorld-Alpha")
download_meta_data(raw_dir)
# Get list of tar files containing observation data (containing several episodes each)
obs_files, obs_sizes = get_observations_files(raw_dir, RAW_REPO_ID)
display_observations_sizes(obs_files, obs_sizes)
shard_indices = range(len(obs_files))
num_shards = len(obs_files)
# TOOD: remove
obs_files = obs_files[:2]
shard_indices = [0,1]
# Iterate on each subset of episodes
for shard_index, obs_file in zip(shard_indices, obs_files):
shard_repo_id = f"{repo_id}_world_{num_shards}_rank_{shard_index}"
dataset_dir = HF_LEROBOT_HOME / shard_repo_id
if dataset_dir.exists():
shutil.rmtree(dataset_dir)
# Download subset
download(raw_dir, allow_patterns=obs_file)
tar_path = raw_dir / obs_file
with tarfile.open(tar_path, 'r') as tar:
extracted_files = tar.getnames()
task_index = int(tar_path.parent.name)
episode_names = [int(p) for p in extracted_files if '/' not in p]
# Untar if needed
if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]):
logging.info(f"Untar-ing {tar_path}...")
with tarfile.open(tar_path, 'r') as tar:
tar.extractall(path=tar_path.parent, filter=no_depth)
port_agibot(raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False)
for ep_name in episode_names:
shutil.rmtree(tar_path.parent / f"{ep_name}")
tar_path.unlink()
# dataset = LeRobotDataset(shard_repo_id, root=dataset_dir)
# lol=1
if __name__ == "__main__":
main()
@@ -0,0 +1,481 @@
import json
import logging
from pathlib import Path
import shutil
import time
import numpy as np
import h5py
import pandas as pd
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.datasets.utils import DEFAULT_CHUNK_SIZE, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_PATH, EPISODES_DIR, concat_video_files, get_video_duration_in_s, get_video_size_in_mb, update_chunk_file_indices, write_info
from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds
AGIBOT_FPS = 30
AGIBOT_ROBOT_TYPE = "AgiBot_A2D"
AGIBOT_FEATURES = {
# gripper open range in mm (0 for pull open, 1 for full close)
"observation.state.effector.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["left_gripper", "right_gripper"],
},
},
# flange xyz in meters
"observation.state.end.position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["left_x", "left_y", "left_z", "right_x", "right_y", "right_z"],
},
},
# flange quaternion with xyzw
"observation.state.end.orientation": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["left_x", "left_y", "left_z", "left_w", "right_x", "right_y", "right_z", "right_w"],
},
},
# in radians
"observation.state.head.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["yaw", "pitch"],
},
},
# in motor steps
"observation.state.joint.current_value": {
"dtype": "float32",
"shape": (14,),
"names": {
"axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
},
},
# same as current_value but in radians
"observation.state.joint.position": {
"dtype": "float32",
"shape": (14,),
"names": {
"axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
},
},
# pitch in radians, lift in meters
"observation.state.waist.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["pitch", "lift"],
},
},
# concatenation of head.position, joint.position, effector.position, waist.position
"observation.state": {
"dtype": "float32",
"shape": (20,),
"names": {
"axes": ["head_yaw", "head_pitch"] + \
[f"left_joint_{i}" for i in range(7)] + \
["left_gripper"] + \
[f"right_joint_{i}" for i in range(7)] + \
["right_gripper"] + \
["waist_pitch", "waist_lift"],
},
},
# gripper open range in mm (0 for pull open, 1 for full close)
"action.effector.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["left_gripper", "right_gripper"],
},
},
# flange xyz in meters
"action.end.position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["left_x", "left_y", "left_z", "right_x", "right_y", "right_z"],
},
},
# flange quaternion with xyzw
"action.end.orientation": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["left_x", "left_y", "left_z", "left_w", "right_x", "right_y", "right_z", "right_w"],
},
},
# in radians
"action.head.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["yaw", "pitch"],
},
},
# goal joint position in radians
"action.joint.position": {
"dtype": "float32",
"shape": (14,),
"names": {
"axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
},
},
"action.robot.velocity": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["velocity_x", "yaw_rate"],
},
},
# pitch in radians, lift in meters
"action.waist.position": {
"dtype": "float32",
"shape": (2,),
"names": {
"axes": ["pitch", "lift"],
},
},
# concatenation of head.position, joint.position, effector.position, waist.position, robot.velocity
"action": {
"dtype": "float32",
"shape": (22,),
"names": {
"axes": ["head_yaw", "head_pitch"] + \
[f"left_joint_{i}" for i in range(7)] + \
["left_gripper"] + \
[f"right_joint_{i}" for i in range(7)] + \
["right_gripper"] + \
["waist_pitch", "waist_lift"] + \
["velocity_x", "yaw_rate"],
},
},
# episode level annotation
"init_scene_text": {
"dtype": "string",
"shape": (1,),
"names": None,
},
# frame level annotation
"action_text": {
"dtype": "string",
"shape": (1,),
"names": None,
},
# frame level annotation
"skill": {
"dtype": "string",
"shape": (1,),
"names": None,
},
}
AGIBOT_IMAGES_FEATURES = {
"observation.images.top_head": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channel"],
},
"observation.images.hand_left": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channel"],
},
"observation.images.hand_right": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channel"],
},
"observation.images.head_center_fisheye": {
"dtype": "video",
"shape": (748, 960, 3),
"names": ["height", "width", "channel"],
},
"observation.images.head_left_fisheye": {
"dtype": "video",
"shape": (748, 960, 3),
"names": ["height", "width", "channel"],
},
"observation.images.head_right_fisheye": {
"dtype": "video",
"shape": (748, 960, 3),
"names": ["height", "width", "channel"],
},
"observation.images.back_left_fisheye": {
"dtype": "video",
"shape": (748, 960, 3),
"names": ["height", "width", "channel"],
},
"observation.images.back_right_fisheye": {
"dtype": "video",
"shape": (748, 960, 3),
"names": ["height", "width", "channel"],
},
}
def load_info_per_task(raw_dir):
info_per_task = {}
task_info_dir = raw_dir / "task_info"
for path in task_info_dir.glob("task_*.json"):
task_index = int(path.name.replace("task_","").replace(".json",""))
with open(path) as f:
task_info = json.load(f)
task_info = {ep["episode_id"]: ep for ep in task_info}
info_per_task[task_index] = task_info
return info_per_task
def create_frame_idx_to_frames_label_idx(ep_info):
frame_idx_to_frames_label_idx = {}
for label_idx, frames_label in enumerate(ep_info["label_info"]["action_config"]):
for frame_idx in range(frames_label["start_frame"], frames_label["end_frame"]):
frame_idx_to_frames_label_idx[frame_idx] = label_idx
return frame_idx_to_frames_label_idx
def generate_lerobot_frames(raw_dir: Path, task_index: int, episode_index: int):
""" /!\ The frames dont contain observation.cameras.*
"""
info_per_task = load_info_per_task(raw_dir)
ep_info = info_per_task[task_index][episode_index]
frame_idx_to_frames_label_idx = create_frame_idx_to_frames_label_idx(ep_info)
# Empty features are commented out.
keys_mapping = {
# STATE
# "observation.state.effector.force": "state/effector/force",
"observation.state.effector.position": "state/effector/position",
# "observation.state.end.angular": "state/end/angular",
"observation.state.end.position": "state/end/position",
"observation.state.end.orientation": "state/end/orientation",
# "observation.state.end.velocity": "state/end/velocity",
# "observation.state.end.wrench": "state/end/wrench",
# "observation.state.head.effort": "state/head/effort",
"observation.state.head.position": "state/head/position",
# "observation.state.head.velocity": "state/head/velocity",
"observation.state.joint.current_value": "state/joint/current_value",
# "observation.state.joint.effort": "state/joint/effort",
"observation.state.joint.position": "state/joint/position",
# "observation.state.joint.velocity": "state/joint/velocity",
# "observation.state.robot.orientation": "state/robot/orientation",
# "observation.state.robot.orientation_drift": "state/robot/orientation_drift",
# "observation.state.robot.position": "state/robot/position",
# "observation.state.robot.position_drift": "state/robot/position_drift",
# "observation.state.waist.effort": "state/waist/effort",
"observation.state.waist.position": "state/waist/position",
# "observation.state.waist.velocity": "state/waist/velocity",
# ----- ACTION (index are also commented out) -----
# "action.effector.index": "action/effector/index",
"action.effector.position": "action/effector/position",
# "action.effector.force": "action/effector/force",
# "action.end.index": "action/end/index",
"action.end.position": "action/end/position",
"action.end.orientation": "action/end/orientation",
# "action.head.index": "action/head/index",
"action.head.position": "action/head/position",
# "action.joint.index": "action/joint/index",
"action.joint.position": "action/joint/position",
# "action.joint.effort": "action/joint/effort",
# "action.joint.velocity": "action/joint/velocity",
# "action.robot.index": "action/robot/index",
# "action.robot.position": "action/robot/position",
# "action.robot.orientation": "action/robot/orientation",
# "action.robot.angular": "action/robot/angular",
"action.robot.velocity": "action/robot/velocity",
# "action.waist.index": "action/waist/index",
"action.waist.position": "action/waist/position",
}
h5_path = raw_dir / f"proprio_stats/{task_index}/{episode_index}/proprio_stats.h5"
with h5py.File(h5_path) as h5:
num_frames = len(h5["state/joint/position"])
for h5_key in keys_mapping.values():
col_num_frames = h5[h5_key].shape[0]
if col_num_frames != num_frames:
raise ValueError(f"HDF5 column '{h5_key}' is expected to have {num_frames} but has {col_num_frames}' frames instead.")
for i in range(num_frames):
# Create frame
f = {new_key: h5[h5_key][i] for new_key, h5_key in keys_mapping.items()}
for key in f:
f[key] = np.array(f[key]).astype(np.float32)
f["observation.state.end.position"] = f["observation.state.end.position"].reshape(6)
f["observation.state.end.orientation"] = f["observation.state.end.orientation"].reshape(8)
f["observation.state"] = np.concatenate([
f["observation.state.head.position"],
f["observation.state.joint.position"][:7], # left
f["observation.state.effector.position"][[0]], # left
f["observation.state.joint.position"][7:], # right
f["observation.state.effector.position"][[1]], # right
f["observation.state.waist.position"],
])
f["action.end.position"] = f["action.end.position"].reshape(6)
f["action.end.orientation"] = f["action.end.orientation"].reshape(8)
f["action"] = np.concatenate([
f["action.head.position"],
f["action.joint.position"][:7], # left
f["action.effector.position"][[0]], # left
f["action.joint.position"][7:], # right
f["action.effector.position"][[1]], # right
f["action.waist.position"],
f["action.robot.velocity"],
])
# episode level annotation
f["task"] = ep_info["task_name"]
f["init_scene_text"] = ep_info["init_scene_text"]
# frame level annotation
if i in frame_idx_to_frames_label_idx:
frames_label_idx = frame_idx_to_frames_label_idx[i]
frames_label = ep_info["label_info"]["action_config"][frames_label_idx]
f["action_text"] = frames_label["action_text"]
f["skill"] = frames_label["skill"]
else:
f["action_text"] = ""
f["skill"] = ""
yield f
def update_meta_data(
df,
ep_to_meta,
):
def _update(row):
ep_idx = row["episode_index"]
for key, meta in ep_to_meta[ep_idx].items():
row[f"videos/{key}/chunk_index"] = meta["chunk_index"]
row[f"videos/{key}/file_index"] = meta["file_index"]
row[f"videos/{key}/from_timestamp"] = meta["from_timestamp"]
row[f"videos/{key}/to_timestamp"] = meta["to_timestamp"]
return row
return df.apply(_update, axis=1)
def move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episode_names):
keys_mapping = {
"observation.images.top_head": "head_color",
"observation.images.hand_left": "hand_left_color",
"observation.images.hand_right": "hand_right_color",
"observation.images.head_center_fisheye": "head_center_fisheye_color",
"observation.images.head_left_fisheye": "head_left_fisheye_color",
"observation.images.head_right_fisheye": "head_right_fisheye_color",
"observation.images.back_left_fisheye": "back_left_fisheye_color",
"observation.images.back_right_fisheye": "back_right_fisheye_color",
}
# sanity check
for key in keys_mapping:
if key not in lerobot_dataset.meta.info["features"]:
raise ValueError(f"Key '{key}' not found in features.")
video_keys = keys_mapping.keys()
chunk_idx = dict.fromkeys(video_keys, 0)
file_idx = dict.fromkeys(video_keys, 0)
latest_duration_in_s = dict.fromkeys(video_keys, 0)
ep_to_meta = {}
for ep_idx, ep_name in enumerate(episode_names):
for key in video_keys:
raw_videos_dir = raw_dir / f"observations/{task_index}/{ep_name}/videos"
old_key = keys_mapping[key]
ep_path = raw_videos_dir / f"{old_key}.mp4"
ep_duration_in_s = get_video_duration_in_s(ep_path)
aggr_path = lerobot_dataset.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=chunk_idx[key],
file_index=file_idx[key],
)
if not aggr_path.exists():
# First video
aggr_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(ep_path), str(aggr_path))
else:
size_in_mb = get_video_size_in_mb(ep_path)
aggr_size_in_mb = get_video_size_in_mb(aggr_path)
if aggr_size_in_mb + size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB:
# Size limit is reached, prepare new parquet file
chunk_idx[key], file_idx[key] = update_chunk_file_indices(
chunk_idx[key], file_idx[key], DEFAULT_CHUNK_SIZE
)
aggr_path = lerobot_dataset.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=chunk_idx[key],
file_index=file_idx[key],
)
aggr_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(ep_path), str(aggr_path))
latest_duration_in_s[key] = 0
else:
# Update the existing parquet file with new rows
concat_video_files(
[aggr_path, ep_path],
lerobot_dataset.root,
key,
chunk_idx[key],
file_idx[key],
)
if ep_idx not in ep_to_meta:
ep_to_meta[ep_idx] = {}
ep_to_meta[ep_idx][key] = {
"chunk_index": chunk_idx[key],
"file_index": file_idx[key],
"from_timestamp": latest_duration_in_s[key],
"to_timestamp": latest_duration_in_s[key] + ep_duration_in_s,
}
latest_duration_in_s[key] += ep_duration_in_s
# Update episodes meta data
for meta_path in (lerobot_dataset.root / EPISODES_DIR).glob("chunk-*/file-*.parquet"):
df = pd.read_parquet(meta_path)
df = update_meta_data(df, ep_to_meta)
df.to_parquet(meta_path)
def port_agibot(raw_dir: Path, repo_id: str, task_index: int, episode_indices: list[int], push_to_hub: bool = False):
lerobot_dataset = LeRobotDataset.create(
repo_id=repo_id,
robot_type=AGIBOT_ROBOT_TYPE,
fps=AGIBOT_FPS,
features=AGIBOT_FEATURES,
)
start_time = time.time()
num_episodes = len(episode_indices)
logging.info(f"Number of episodes {num_episodes}")
for i, episode_index in enumerate(episode_indices):
elapsed_time = time.time() - start_time
d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
logging.info(f"{i} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)")
for frame in generate_lerobot_frames(raw_dir, task_index, episode_index):
lerobot_dataset.add_frame(frame)
lerobot_dataset.save_episode()
logging.info("Save_episode")
# Videos have already been encoded with the proper format, so we rely on hacks
# HACK: Add extra images features
lerobot_dataset.meta.info["features"].update(AGIBOT_IMAGES_FEATURES)
write_info(lerobot_dataset.meta.info, lerobot_dataset.meta.root)
move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episode_indices)
if push_to_hub:
lerobot_dataset.push_to_hub(
# Add agibot tag, since it belongs to the agibot collection of datasets
tags=["agibot"],
private=False,
)
@@ -0,0 +1,176 @@
import argparse
import logging
from pathlib import Path
import tarfile
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.agibot_hdf5.download import RAW_REPO_ID, download_meta_data, get_observations_files
class PortAgiBotShards(PipelineStep):
def __init__(
self,
raw_dir: Path | str,
repo_id: str = None,
):
super().__init__()
self.raw_dir = Path(raw_dir)
self.repo_id = repo_id
def run(self, data=None, rank: int = 0, world_size: int = 1):
import shutil
import logging
import tarfile
from datasets.utils.tqdm import disable_progress_bars
from lerobot.common.constants import HF_LEROBOT_HOME
from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
from examples.port_datasets.agibot_hdf5.download import get_observations_files, download, no_depth, RAW_REPO_ID
from examples.port_datasets.droid_rlds.port_droid import validate_dataset
from lerobot.common.utils.utils import init_logging
init_logging()
disable_progress_bars()
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
dataset_dir = HF_LEROBOT_HOME / shard_repo_id
if dataset_dir.exists():
shutil.rmtree(dataset_dir)
obs_files, _ = get_observations_files(self.raw_dir, RAW_REPO_ID)
obs_file = obs_files[rank]
# Download subset
download(self.raw_dir, allow_patterns=obs_file)
tar_path = self.raw_dir / obs_file
with tarfile.open(tar_path, 'r') as tar:
extracted_files = tar.getnames()
task_index = int(tar_path.parent.name)
episode_names = [int(p) for p in extracted_files if '/' not in p]
# Untar if needed
if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]):
logging.info(f"Untar-ing {tar_path}...")
with tarfile.open(tar_path, 'r') as tar:
tar.extractall(path=tar_path.parent, filter=no_depth)
port_agibot(self.raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False)
for ep_name in episode_names:
shutil.rmtree(str(tar_path.parent / f"{ep_name}"))
tar_path.unlink()
validate_dataset(shard_repo_id)
def make_port_executor(
raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
download_meta_data(raw_dir)
obs_files, _ = get_observations_files(raw_dir, RAW_REPO_ID)
num_shards = len(obs_files)
kwargs = {
"pipeline": [
PortAgiBotShards(raw_dir, repo_id),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": num_shards,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": num_shards,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=Path,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
port_executor = make_port_executor(**kwargs)
port_executor.run()
if __name__ == "__main__":
main()
@@ -0,0 +1,74 @@
import argparse
from pathlib import Path
import json
def find_missings(completions_dir, world_size):
""" Find workers that are not completed and returns their indices.
"""
full = list(range(world_size))
completed = []
for path in completions_dir.glob("*"):
if path.name in ['.', '..']:
continue
index = path.name.lstrip('0')
index = 0 if index == "" else int(index)
completed.append(index)
missings = set(full) - set(completed)
return missings
def find_output_files(slurm_dir, worker_indices):
""" Find output files associated to worker indices, and return tuples
of (worker index, output file path)
"""
out_files = []
for path in slurm_dir.glob("*.out"):
_, worker_id = path.name.replace(".out", "").split('_')
worker_id = int(worker_id)
if worker_id in worker_indices:
out_files.append((worker_id, path))
return out_files
def display_error_files(logs_dir, job_name):
executor_path = Path(logs_dir) / job_name / "executor.json"
completions_dir = Path(logs_dir) / job_name / "completions"
slurm_dir = Path(logs_dir) / job_name / "slurm_logs"
with open(executor_path) as f:
executor = json.load(f)
missings = find_missings(completions_dir, executor["world_size"])
for missing in sorted(list(missings))[::-1]:
print(missing)
# error_files = find_output_files(slurm_dir, missings)
# error_files = sorted(error_files, key=lambda x: x[0])
# for _, path in error_files[::-1]:
# print(path)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--logs-dir",
type=str,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
args = parser.parse_args()
display_error_files(**vars(args))
if __name__ == "__main__":
main()