diff --git a/examples/port_datasets/agibot_hdf5/download.py b/examples/port_datasets/agibot_hdf5/download.py
new file mode 100644
index 000000000..2b7938566
--- /dev/null
+++ b/examples/port_datasets/agibot_hdf5/download.py
@@ -0,0 +1,184 @@
+
+
+import json
+import logging
+from pathlib import Path
+import shutil
+from huggingface_hub import snapshot_download
+import tarfile
+
+import tqdm
+from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
+from lerobot.common.constants import HF_LEROBOT_HOME
+from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
+from lerobot.common.utils.utils import init_logging
+from huggingface_hub import HfApi, HfFileSystem
+
+RAW_REPO_ID = "agibot-world/AgiBotWorld-Alpha"
+
+
+def download(raw_dir, allow_patterns=None, ignore_patterns=None):
+    snapshot_download(
+        RAW_REPO_ID,
+        repo_type="dataset",
+        local_dir=str(raw_dir),
+        allow_patterns=allow_patterns,
+        ignore_patterns=ignore_patterns,
+    )
+
+
+def download_proprio_stats(raw_dir):
+    proprio_stats_dir = raw_dir / "proprio_stats"
+
+    if proprio_stats_dir.exists():
+        logging.info("Skipping proprio stats download (already extracted)")
+        return
+
+    download(raw_dir, allow_patterns="proprio_stats/*.tar")
+
+    for path in proprio_stats_dir.glob("*.tar"):
+        logging.info(f"Untar-ing {path}...")
+        with tarfile.open(path, 'r') as tar:
+            tar.extractall(path=proprio_stats_dir)
+
+        logging.info(f"Deleting {path}...")
+        path.unlink()
+
+
+def download_parameters(raw_dir):
+    params_dir = raw_dir / "parameters"
+
+    if params_dir.exists():
+        logging.info("Skipping parameters download (already extracted)")
+        return
+
+    download(raw_dir, allow_patterns="parameters/*.tar")
+
+    for path in params_dir.glob("*.tar"):
+        logging.info(f"Untar-ing {path}...")
+        with tarfile.open(path, 'r') as tar:
+            tar.extractall(path=params_dir)
+
+        logging.info(f"Deleting {path}...")
+        path.unlink()
+
+
+def get_observations_files(raw_dir, raw_repo_id):
+    files_json_path = raw_dir / "observations_files.json"
+    sizes_json_path = raw_dir / "observations_sizes.json"
+    if files_json_path.exists() and sizes_json_path.exists():
+        with open(files_json_path) as f:
+            files = json.load(f)
+        with open(sizes_json_path) as f:
+            sizes = json.load(f)
+        return files, sizes
+
+    api = HfApi()
+    files = api.list_repo_files(repo_id=raw_repo_id, repo_type="dataset")
+    files = [file for file in files if "observations/" in file]
+
+    fs = HfFileSystem()
+    sizes = []
+    for file in tqdm.tqdm(files, desc="Fetching file sizes"):
+        file_info = fs.info(f"datasets/{raw_repo_id}/{file}")
+        size = file_info["size"] / 1000**3
+        sizes.append(size)
+
+    # Sort ascending to start with the smaller files
+    sizes, files = zip(*sorted(zip(sizes, files)))
+
+    with open(files_json_path, "w") as f:
+        json.dump(files, f)
+    with open(sizes_json_path, "w") as f:
+        json.dump(sizes, f)
+    return files, sizes
+
+
+def display_observations_sizes(files, sizes):
+    size_per_task = {}
+    for i, (file, size) in enumerate(zip(files, sizes)):
+        logging.info(f"{i}/{len(files)}: {file} {size:.2f}GB")
+
+        task = int(file.split('/')[1])
+
+        if task not in size_per_task:
+            size_per_task[task] = 0
+
+        size_per_task[task] += size
+
+    for task, size in size_per_task.items():
+        logging.info(f"{task} {size:.2f}GB")
+
+    total_size = sum(list(size_per_task.values()))
+    logging.info(f"Total size: {total_size:.2f}GB")
+
+
+def download_meta_data(raw_dir):
+    # Download task data
+    download(raw_dir, allow_patterns="task_info/task_*.json")
+
+    # Download all camera parameters ~170 GB
+    download_parameters(raw_dir)
+
+    # Download all proprio stats ~26 GB
+    download_proprio_stats(raw_dir)
+
+
+def no_depth(tarinfo, path):
+    """Tar extraction filter that skips depth data."""
+    if "depth" in tarinfo.name:
+        return None
+    return tarinfo
+
+
+def main():
+    init_logging()
+
+    repo_id = "cadene/agibot_alpha_v30"
+    raw_dir = Path("/fsx/remi_cadene/data/AgiBotWorld-Alpha")
+    download_meta_data(raw_dir)
+    # Get the list of tar files containing observation data (several episodes each)
+    obs_files, obs_sizes = get_observations_files(raw_dir, RAW_REPO_ID)
+    display_observations_sizes(obs_files, obs_sizes)
+
+    shard_indices = range(len(obs_files))
+    num_shards = len(obs_files)
+
+    # TODO: remove this debug truncation (only processes the first two shards)
+    obs_files = obs_files[:2]
+    shard_indices = [0, 1]
+
+    # Iterate on each subset of episodes
+    for shard_index, obs_file in zip(shard_indices, obs_files):
+        shard_repo_id = f"{repo_id}_world_{num_shards}_rank_{shard_index}"
+        dataset_dir = HF_LEROBOT_HOME / shard_repo_id
+        if dataset_dir.exists():
+            shutil.rmtree(dataset_dir)
+
+        # Download subset
+        download(raw_dir, allow_patterns=obs_file)
+
+        tar_path = raw_dir / obs_file
+        with tarfile.open(tar_path, 'r') as tar:
+            extracted_files = tar.getnames()
+
+        task_index = int(tar_path.parent.name)
+        episode_names = [int(p) for p in extracted_files if '/' not in p]
+
+        # Untar if needed
+        if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]):
+            logging.info(f"Untar-ing {tar_path}...")
+            with tarfile.open(tar_path, 'r') as tar:
+                tar.extractall(path=tar_path.parent, filter=no_depth)
+
+        port_agibot(raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False)
+
+        for ep_name in episode_names:
+            shutil.rmtree(tar_path.parent / f"{ep_name}")
+
+        tar_path.unlink()
+
+        # Optionally reload the shard to sanity check it:
+        # dataset = LeRobotDataset(shard_repo_id, root=dataset_dir)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
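`get_observations_files` lists the observation tar files on the Hub once and caches the result as JSON next to the raw data, so later shard workers can reuse it. A minimal sketch of exercising these helpers on their own to inspect shard sizes before launching a full port (the raw directory path below is a placeholder, not the author's setup):

```python
from pathlib import Path

from examples.port_datasets.agibot_hdf5.download import (
    RAW_REPO_ID,
    display_observations_sizes,
    download_meta_data,
    get_observations_files,
)

raw_dir = Path("/path/to/AgiBotWorld-Alpha")  # placeholder location for the raw dump
download_meta_data(raw_dir)  # task_info jsons, camera parameters, proprio stats

# The first call hits the Hub to list tar files and their sizes, then caches the
# result in observations_files.json / observations_sizes.json inside raw_dir.
obs_files, obs_sizes = get_observations_files(raw_dir, RAW_REPO_ID)
display_observations_sizes(obs_files, obs_sizes)
```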
diff --git a/examples/port_datasets/agibot_hdf5/port_agibot.py b/examples/port_datasets/agibot_hdf5/port_agibot.py
new file mode 100644
index 000000000..83af6d01a
--- /dev/null
+++ b/examples/port_datasets/agibot_hdf5/port_agibot.py
@@ -0,0 +1,481 @@
+
+import json
+import logging
+from pathlib import Path
+import shutil
+import time
+import numpy as np
+import h5py
+import pandas as pd
+
+from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
+from lerobot.common.datasets.utils import DEFAULT_CHUNK_SIZE, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_PATH, EPISODES_DIR, concat_video_files, get_video_duration_in_s, get_video_size_in_mb, update_chunk_file_indices, write_info
+from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds
+
+
+AGIBOT_FPS = 30
+AGIBOT_ROBOT_TYPE = "AgiBot_A2D"
+AGIBOT_FEATURES = {
+    # gripper open range in mm (0 for full open, 1 for full close)
+    "observation.state.effector.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["left_gripper", "right_gripper"],
+        },
+    },
+    # flange xyz in meters
+    "observation.state.end.position": {
+        "dtype": "float32",
+        "shape": (6,),
+        "names": {
+            "axes": ["left_x", "left_y", "left_z", "right_x", "right_y", "right_z"],
+        },
+    },
+    # flange quaternion with xyzw
+    "observation.state.end.orientation": {
+        "dtype": "float32",
+        "shape": (8,),
+        "names": {
+            "axes": ["left_x", "left_y", "left_z", "left_w", "right_x", "right_y", "right_z", "right_w"],
+        },
+    },
+    # in radians
+    "observation.state.head.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["yaw", "pitch"],
+        },
+    },
+    # in motor steps
+    "observation.state.joint.current_value": {
+        "dtype": "float32",
+        "shape": (14,),
+        "names": {
+            "axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
+        },
+    },
+    # same as current_value but in radians
+    "observation.state.joint.position": {
+        "dtype": "float32",
+        "shape": (14,),
+        "names": {
+            "axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
+        },
+    },
+    # pitch in radians, lift in meters
+    "observation.state.waist.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["pitch", "lift"],
+        },
+    },
+    # concatenation of head.position, joint.position, effector.position, waist.position
+    "observation.state": {
+        "dtype": "float32",
+        "shape": (20,),
+        "names": {
+            "axes": ["head_yaw", "head_pitch"] + \
+                [f"left_joint_{i}" for i in range(7)] + \
+                ["left_gripper"] + \
+                [f"right_joint_{i}" for i in range(7)] + \
+                ["right_gripper"] + \
+                ["waist_pitch", "waist_lift"],
+        },
+    },
+    # gripper open range in mm (0 for full open, 1 for full close)
+    "action.effector.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["left_gripper", "right_gripper"],
+        },
+    },
+    # flange xyz in meters
+    "action.end.position": {
+        "dtype": "float32",
+        "shape": (6,),
+        "names": {
+            "axes": ["left_x", "left_y", "left_z", "right_x", "right_y", "right_z"],
+        },
+    },
+    # flange quaternion with xyzw
+    "action.end.orientation": {
+        "dtype": "float32",
+        "shape": (8,),
+        "names": {
+            "axes": ["left_x", "left_y", "left_z", "left_w", "right_x", "right_y", "right_z", "right_w"],
+        },
+    },
+    # in radians
+    "action.head.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["yaw", "pitch"],
+        },
+    },
+    # goal joint position in radians
+    "action.joint.position": {
+        "dtype": "float32",
+        "shape": (14,),
+        "names": {
+            "axes": [f"left_joint_{i}" for i in range(7)] + [f"right_joint_{i}" for i in range(7)],
+        },
+    },
+    "action.robot.velocity": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["velocity_x", "yaw_rate"],
+        },
+    },
+    # pitch in radians, lift in meters
+    "action.waist.position": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {
+            "axes": ["pitch", "lift"],
+        },
+    },
+    # concatenation of head.position, joint.position, effector.position, waist.position, robot.velocity
+    "action": {
+        "dtype": "float32",
+        "shape": (22,),
+        "names": {
+            "axes": ["head_yaw", "head_pitch"] + \
+                [f"left_joint_{i}" for i in range(7)] + \
+                ["left_gripper"] + \
+                [f"right_joint_{i}" for i in range(7)] + \
+                ["right_gripper"] + \
+                ["waist_pitch", "waist_lift"] + \
+                ["velocity_x", "yaw_rate"],
+        },
+    },
+    # episode level annotation
+    "init_scene_text": {
+        "dtype": "string",
+        "shape": (1,),
+        "names": None,
+    },
+    # frame level annotation
+    "action_text": {
+        "dtype": "string",
+        "shape": (1,),
+        "names": None,
+    },
+    # frame level annotation
+    "skill": {
+        "dtype": "string",
+        "shape": (1,),
+        "names": None,
+    },
+}
+
+AGIBOT_IMAGES_FEATURES = {
+    "observation.images.top_head": {
+        "dtype": "video",
+        "shape": (480, 640, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.hand_left": {
+        "dtype": "video",
+        "shape": (480, 640, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.hand_right": {
+        "dtype": "video",
+        "shape": (480, 640, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.head_center_fisheye": {
+        "dtype": "video",
+        "shape": (748, 960, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.head_left_fisheye": {
+        "dtype": "video",
+        "shape": (748, 960, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.head_right_fisheye": {
+        "dtype": "video",
+        "shape": (748, 960, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.back_left_fisheye": {
+        "dtype": "video",
+        "shape": (748, 960, 3),
+        "names": ["height", "width", "channel"],
+    },
+    "observation.images.back_right_fisheye": {
+        "dtype": "video",
+        "shape": (748, 960, 3),
+        "names": ["height", "width", "channel"],
+    },
+}
+
+
+def load_info_per_task(raw_dir):
+    info_per_task = {}
+    task_info_dir = raw_dir / "task_info"
+    for path in task_info_dir.glob("task_*.json"):
+        task_index = int(path.name.replace("task_", "").replace(".json", ""))
+        with open(path) as f:
+            task_info = json.load(f)
+
+        task_info = {ep["episode_id"]: ep for ep in task_info}
+        info_per_task[task_index] = task_info
+
+    return info_per_task
+
+
+def create_frame_idx_to_frames_label_idx(ep_info):
+    frame_idx_to_frames_label_idx = {}
+    for label_idx, frames_label in enumerate(ep_info["label_info"]["action_config"]):
+        for frame_idx in range(frames_label["start_frame"], frames_label["end_frame"]):
+            frame_idx_to_frames_label_idx[frame_idx] = label_idx
+    return frame_idx_to_frames_label_idx
+
+
+def generate_lerobot_frames(raw_dir: Path, task_index: int, episode_index: int):
+    """Note: the generated frames do not contain the observation.images.* keys."""
+    info_per_task = load_info_per_task(raw_dir)
+    ep_info = info_per_task[task_index][episode_index]
+    frame_idx_to_frames_label_idx = create_frame_idx_to_frames_label_idx(ep_info)
+
+    # Empty features are commented out.
+    keys_mapping = {
+        # ----- STATE -----
+        # "observation.state.effector.force": "state/effector/force",
+        "observation.state.effector.position": "state/effector/position",
+        # "observation.state.end.angular": "state/end/angular",
+        "observation.state.end.position": "state/end/position",
+        "observation.state.end.orientation": "state/end/orientation",
+        # "observation.state.end.velocity": "state/end/velocity",
+        # "observation.state.end.wrench": "state/end/wrench",
+        # "observation.state.head.effort": "state/head/effort",
+        "observation.state.head.position": "state/head/position",
+        # "observation.state.head.velocity": "state/head/velocity",
+        "observation.state.joint.current_value": "state/joint/current_value",
+        # "observation.state.joint.effort": "state/joint/effort",
+        "observation.state.joint.position": "state/joint/position",
+        # "observation.state.joint.velocity": "state/joint/velocity",
+        # "observation.state.robot.orientation": "state/robot/orientation",
+        # "observation.state.robot.orientation_drift": "state/robot/orientation_drift",
+        # "observation.state.robot.position": "state/robot/position",
+        # "observation.state.robot.position_drift": "state/robot/position_drift",
+        # "observation.state.waist.effort": "state/waist/effort",
+        "observation.state.waist.position": "state/waist/position",
+        # "observation.state.waist.velocity": "state/waist/velocity",
+        # ----- ACTION (index keys are also commented out) -----
+        # "action.effector.index": "action/effector/index",
+        "action.effector.position": "action/effector/position",
+        # "action.effector.force": "action/effector/force",
+        # "action.end.index": "action/end/index",
+        "action.end.position": "action/end/position",
+        "action.end.orientation": "action/end/orientation",
+        # "action.head.index": "action/head/index",
+        "action.head.position": "action/head/position",
+        # "action.joint.index": "action/joint/index",
+        "action.joint.position": "action/joint/position",
+        # "action.joint.effort": "action/joint/effort",
+        # "action.joint.velocity": "action/joint/velocity",
+        # "action.robot.index": "action/robot/index",
+        # "action.robot.position": "action/robot/position",
+        # "action.robot.orientation": "action/robot/orientation",
+        # "action.robot.angular": "action/robot/angular",
+        "action.robot.velocity": "action/robot/velocity",
+        # "action.waist.index": "action/waist/index",
+        "action.waist.position": "action/waist/position",
+    }
+
+    h5_path = raw_dir / f"proprio_stats/{task_index}/{episode_index}/proprio_stats.h5"
+    with h5py.File(h5_path) as h5:
+        num_frames = len(h5["state/joint/position"])
+
+        for h5_key in keys_mapping.values():
+            col_num_frames = h5[h5_key].shape[0]
+            if col_num_frames != num_frames:
+                raise ValueError(
+                    f"HDF5 column '{h5_key}' is expected to have {num_frames} frames but has {col_num_frames} instead."
+                )
+
+        for i in range(num_frames):
+            # Create frame
+            f = {new_key: h5[h5_key][i] for new_key, h5_key in keys_mapping.items()}
+
+            for key in f:
+                f[key] = np.array(f[key]).astype(np.float32)
+
+            f["observation.state.end.position"] = f["observation.state.end.position"].reshape(6)
+            f["observation.state.end.orientation"] = f["observation.state.end.orientation"].reshape(8)
+            f["observation.state"] = np.concatenate([
+                f["observation.state.head.position"],
+                f["observation.state.joint.position"][:7],  # left
+                f["observation.state.effector.position"][[0]],  # left
+                f["observation.state.joint.position"][7:],  # right
+                f["observation.state.effector.position"][[1]],  # right
+                f["observation.state.waist.position"],
+            ])
+
+            f["action.end.position"] = f["action.end.position"].reshape(6)
+            f["action.end.orientation"] = f["action.end.orientation"].reshape(8)
+            f["action"] = np.concatenate([
+                f["action.head.position"],
+                f["action.joint.position"][:7],  # left
+                f["action.effector.position"][[0]],  # left
+                f["action.joint.position"][7:],  # right
+                f["action.effector.position"][[1]],  # right
+                f["action.waist.position"],
+                f["action.robot.velocity"],
+            ])
+
+            # episode level annotation
+            f["task"] = ep_info["task_name"]
+            f["init_scene_text"] = ep_info["init_scene_text"]
+
+            # frame level annotation
+            if i in frame_idx_to_frames_label_idx:
+                frames_label_idx = frame_idx_to_frames_label_idx[i]
+                frames_label = ep_info["label_info"]["action_config"][frames_label_idx]
+                f["action_text"] = frames_label["action_text"]
+                f["skill"] = frames_label["skill"]
+            else:
+                f["action_text"] = ""
+                f["skill"] = ""
+
+            yield f
"observation.images.back_right_fisheye": "back_right_fisheye_color", + } + + # sanity check + for key in keys_mapping: + if key not in lerobot_dataset.meta.info["features"]: + raise ValueError(f"Key '{key}' not found in features.") + + + video_keys = keys_mapping.keys() + chunk_idx = dict.fromkeys(video_keys, 0) + file_idx = dict.fromkeys(video_keys, 0) + latest_duration_in_s = dict.fromkeys(video_keys, 0) + ep_to_meta = {} + for ep_idx, ep_name in enumerate(episode_names): + for key in video_keys: + raw_videos_dir = raw_dir / f"observations/{task_index}/{ep_name}/videos" + old_key = keys_mapping[key] + ep_path = raw_videos_dir / f"{old_key}.mp4" + ep_duration_in_s = get_video_duration_in_s(ep_path) + + aggr_path = lerobot_dataset.root / DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=chunk_idx[key], + file_index=file_idx[key], + ) + if not aggr_path.exists(): + # First video + aggr_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(ep_path), str(aggr_path)) + else: + size_in_mb = get_video_size_in_mb(ep_path) + aggr_size_in_mb = get_video_size_in_mb(aggr_path) + + if aggr_size_in_mb + size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB: + # Size limit is reached, prepare new parquet file + chunk_idx[key], file_idx[key] = update_chunk_file_indices( + chunk_idx[key], file_idx[key], DEFAULT_CHUNK_SIZE + ) + aggr_path = lerobot_dataset.root / DEFAULT_VIDEO_PATH.format( + video_key=key, + chunk_index=chunk_idx[key], + file_index=file_idx[key], + ) + aggr_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(str(ep_path), str(aggr_path)) + latest_duration_in_s[key] = 0 + else: + # Update the existing parquet file with new rows + concat_video_files( + [aggr_path, ep_path], + lerobot_dataset.root, + key, + chunk_idx[key], + file_idx[key], + ) + + if ep_idx not in ep_to_meta: + ep_to_meta[ep_idx] = {} + ep_to_meta[ep_idx][key] = { + "chunk_index": chunk_idx[key], + "file_index": file_idx[key], + "from_timestamp": latest_duration_in_s[key], + "to_timestamp": latest_duration_in_s[key] + ep_duration_in_s, + } + latest_duration_in_s[key] += ep_duration_in_s + + # Update episodes meta data + for meta_path in (lerobot_dataset.root / EPISODES_DIR).glob("chunk-*/file-*.parquet"): + df = pd.read_parquet(meta_path) + df = update_meta_data(df, ep_to_meta) + df.to_parquet(meta_path) + +def port_agibot(raw_dir: Path, repo_id: str, task_index: int, episode_indices: list[int], push_to_hub: bool = False): + lerobot_dataset = LeRobotDataset.create( + repo_id=repo_id, + robot_type=AGIBOT_ROBOT_TYPE, + fps=AGIBOT_FPS, + features=AGIBOT_FEATURES, + ) + + start_time = time.time() + num_episodes = len(episode_indices) + logging.info(f"Number of episodes {num_episodes}") + + for i, episode_index in enumerate(episode_indices): + elapsed_time = time.time() - start_time + d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time) + + logging.info(f"{i} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)") + + for frame in generate_lerobot_frames(raw_dir, task_index, episode_index): + lerobot_dataset.add_frame(frame) + + lerobot_dataset.save_episode() + logging.info("Save_episode") + + # Videos have already been encoded with the proper format, so we rely on hacks + # HACK: Add extra images features + lerobot_dataset.meta.info["features"].update(AGIBOT_IMAGES_FEATURES) + write_info(lerobot_dataset.meta.info, lerobot_dataset.meta.root) + move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episode_indices) + + if push_to_hub: + 
lerobot_dataset.push_to_hub( + # Add agibot tag, since it belongs to the agibot collection of datasets + tags=["agibot"], + private=False, + ) \ No newline at end of file diff --git a/examples/port_datasets/agibot_hdf5/slurm_port_shards.py b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py new file mode 100644 index 000000000..2fd4f5115 --- /dev/null +++ b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py @@ -0,0 +1,176 @@ +import argparse +import logging +from pathlib import Path +import tarfile + +from datatrove.executor import LocalPipelineExecutor +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.base import PipelineStep + +from examples.port_datasets.agibot_hdf5.download import RAW_REPO_ID, download_meta_data, get_observations_files + + + +class PortAgiBotShards(PipelineStep): + def __init__( + self, + raw_dir: Path | str, + repo_id: str = None, + ): + super().__init__() + self.raw_dir = Path(raw_dir) + self.repo_id = repo_id + + def run(self, data=None, rank: int = 0, world_size: int = 1): + import shutil + import logging + import tarfile + from datasets.utils.tqdm import disable_progress_bars + + from lerobot.common.constants import HF_LEROBOT_HOME + from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot + from examples.port_datasets.agibot_hdf5.download import get_observations_files, download, no_depth, RAW_REPO_ID + from examples.port_datasets.droid_rlds.port_droid import validate_dataset + from lerobot.common.utils.utils import init_logging + + init_logging() + disable_progress_bars() + + shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}" + + dataset_dir = HF_LEROBOT_HOME / shard_repo_id + if dataset_dir.exists(): + shutil.rmtree(dataset_dir) + + obs_files, _ = get_observations_files(self.raw_dir, RAW_REPO_ID) + obs_file = obs_files[rank] + + # Download subset + download(self.raw_dir, allow_patterns=obs_file) + + tar_path = self.raw_dir / obs_file + with tarfile.open(tar_path, 'r') as tar: + extracted_files = tar.getnames() + + task_index = int(tar_path.parent.name) + episode_names = [int(p) for p in extracted_files if '/' not in p] + + # Untar if needed + if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]): + logging.info(f"Untar-ing {tar_path}...") + with tarfile.open(tar_path, 'r') as tar: + tar.extractall(path=tar_path.parent, filter=no_depth) + + port_agibot(self.raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False) + + for ep_name in episode_names: + shutil.rmtree(str(tar_path.parent / f"{ep_name}")) + + tar_path.unlink() + + validate_dataset(shard_repo_id) + + +def make_port_executor( + raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True +): + download_meta_data(raw_dir) + obs_files, _ = get_observations_files(raw_dir, RAW_REPO_ID) + num_shards = len(obs_files) + + kwargs = { + "pipeline": [ + PortAgiBotShards(raw_dir, repo_id), + ], + "logging_dir": str(logs_dir / job_name), + } + + if slurm: + kwargs.update( + { + "job_name": job_name, + "tasks": num_shards, + "workers": workers, + "time": "08:00:00", + "partition": partition, + "cpus_per_task": cpus_per_task, + "sbatch_args": {"mem-per-cpu": mem_per_cpu}, + } + ) + executor = SlurmPipelineExecutor(**kwargs) + else: + kwargs.update( + { + "tasks": num_shards, + "workers": 1, + } + ) + executor = LocalPipelineExecutor(**kwargs) + + return executor + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--raw-dir", + type=Path, 
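Since `port_agibot` writes a self-contained LeRobotDataset shard under `HF_LEROBOT_HOME`, a quick way to eyeball a ported shard is to reload it and inspect one frame. A minimal sketch, assuming a shard named as in `download.py` (the repo id below is a placeholder):

```python
# Minimal sketch; the shard repo id is hypothetical, adapt it to your own run.
from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

shard_repo_id = "your_user/agibot_alpha_v30_world_123_rank_0"  # hypothetical shard id
dataset = LeRobotDataset(shard_repo_id, root=HF_LEROBOT_HOME / shard_repo_id)

print(dataset.meta.info["features"].keys())  # states, actions, annotations, videos
frame = dataset[0]
print(frame["observation.state"].shape)  # expected: (20,)
print(frame["action"].shape)             # expected: (22,)
```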
diff --git a/examples/port_datasets/agibot_hdf5/slurm_port_shards.py b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py
new file mode 100644
index 000000000..2fd4f5115
--- /dev/null
+++ b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py
@@ -0,0 +1,176 @@
+import argparse
+import logging
+from pathlib import Path
+import tarfile
+
+from datatrove.executor import LocalPipelineExecutor
+from datatrove.executor.slurm import SlurmPipelineExecutor
+from datatrove.pipeline.base import PipelineStep
+
+from examples.port_datasets.agibot_hdf5.download import RAW_REPO_ID, download_meta_data, get_observations_files
+
+
+class PortAgiBotShards(PipelineStep):
+    def __init__(
+        self,
+        raw_dir: Path | str,
+        repo_id: str | None = None,
+    ):
+        super().__init__()
+        self.raw_dir = Path(raw_dir)
+        self.repo_id = repo_id
+
+    def run(self, data=None, rank: int = 0, world_size: int = 1):
+        import shutil
+        import logging
+        import tarfile
+        from datasets.utils.tqdm import disable_progress_bars
+
+        from lerobot.common.constants import HF_LEROBOT_HOME
+        from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
+        from examples.port_datasets.agibot_hdf5.download import get_observations_files, download, no_depth, RAW_REPO_ID
+        from examples.port_datasets.droid_rlds.port_droid import validate_dataset
+        from lerobot.common.utils.utils import init_logging
+
+        init_logging()
+        disable_progress_bars()
+
+        shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
+
+        dataset_dir = HF_LEROBOT_HOME / shard_repo_id
+        if dataset_dir.exists():
+            shutil.rmtree(dataset_dir)
+
+        obs_files, _ = get_observations_files(self.raw_dir, RAW_REPO_ID)
+        obs_file = obs_files[rank]
+
+        # Download subset
+        download(self.raw_dir, allow_patterns=obs_file)
+
+        tar_path = self.raw_dir / obs_file
+        with tarfile.open(tar_path, 'r') as tar:
+            extracted_files = tar.getnames()
+
+        task_index = int(tar_path.parent.name)
+        episode_names = [int(p) for p in extracted_files if '/' not in p]
+
+        # Untar if needed
+        if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]):
+            logging.info(f"Untar-ing {tar_path}...")
+            with tarfile.open(tar_path, 'r') as tar:
+                tar.extractall(path=tar_path.parent, filter=no_depth)
+
+        port_agibot(self.raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False)
+
+        for ep_name in episode_names:
+            shutil.rmtree(str(tar_path.parent / f"{ep_name}"))
+
+        tar_path.unlink()
+
+        validate_dataset(shard_repo_id)
+
+
+def make_port_executor(
+    raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
+):
+    download_meta_data(raw_dir)
+    obs_files, _ = get_observations_files(raw_dir, RAW_REPO_ID)
+    num_shards = len(obs_files)
+
+    kwargs = {
+        "pipeline": [
+            PortAgiBotShards(raw_dir, repo_id),
+        ],
+        "logging_dir": str(logs_dir / job_name),
+    }
+
+    if slurm:
+        kwargs.update(
+            {
+                "job_name": job_name,
+                "tasks": num_shards,
+                "workers": workers,
+                "time": "08:00:00",
+                "partition": partition,
+                "cpus_per_task": cpus_per_task,
+                "sbatch_args": {"mem-per-cpu": mem_per_cpu},
+            }
+        )
+        executor = SlurmPipelineExecutor(**kwargs)
+    else:
+        kwargs.update(
+            {
+                "tasks": num_shards,
+                "workers": 1,
+            }
+        )
+        executor = LocalPipelineExecutor(**kwargs)
+
+    return executor
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "--raw-dir",
+        type=Path,
+        required=True,
+        help="Directory containing the input raw dataset (e.g. `path/to/dataset` or `path/to/dataset/version`).",
+    )
+    parser.add_argument(
+        "--repo-id",
+        type=str,
+        help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset, used as a prefix for the per-shard repo ids.",
+    )
+    parser.add_argument(
+        "--logs-dir",
+        type=Path,
+        help="Path to logs directory for `datatrove`.",
+    )
+    parser.add_argument(
+        "--job-name",
+        type=str,
+        default="port_agibot",
+        help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
+    )
+    parser.add_argument(
+        "--slurm",
+        type=int,
+        default=1,
+        help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
+    )
+    parser.add_argument(
+        "--workers",
+        type=int,
+        default=2048,
+        help="Number of slurm workers. It should be less than the maximum number of shards.",
+    )
+    parser.add_argument(
+        "--partition",
+        type=str,
+        help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
+    )
+    parser.add_argument(
+        "--cpus-per-task",
+        type=int,
+        default=8,
+        help="Number of cpus that each slurm worker will use.",
+    )
+    parser.add_argument(
+        "--mem-per-cpu",
+        type=str,
+        default="1950M",
+        help="Memory per cpu that each worker will use.",
+    )
+
+    args = parser.parse_args()
+    kwargs = vars(args)
+    kwargs["slurm"] = kwargs.pop("slurm") == 1
+    port_executor = make_port_executor(**kwargs)
+    port_executor.run()
+
+
+if __name__ == "__main__":
+    main()
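For debugging outside of slurm, the executor can also be built and run directly from Python, which mirrors what `--slurm 0` does. A minimal sketch using the helpers defined above (all paths and the repo id are placeholders, not the author's setup):

```python
from pathlib import Path

from examples.port_datasets.agibot_hdf5.slurm_port_shards import make_port_executor

executor = make_port_executor(
    raw_dir=Path("/path/to/AgiBotWorld-Alpha"),   # placeholder raw data location
    repo_id="your_user/agibot_alpha_v30",         # placeholder shard prefix
    job_name="port_agibot",
    logs_dir=Path("/path/to/datatrove_logs"),
    workers=1,            # ignored in local mode, kept for signature parity
    partition=None,       # only used when slurm=True
    cpus_per_task=8,
    mem_per_cpu="1950M",
    slurm=False,          # LocalPipelineExecutor: runs shards sequentially
)
executor.run()
```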
diff --git a/examples/port_datasets/droid_rlds/display_error_files.py b/examples/port_datasets/droid_rlds/display_error_files.py
new file mode 100644
index 000000000..a01f11c7b
--- /dev/null
+++ b/examples/port_datasets/droid_rlds/display_error_files.py
@@ -0,0 +1,74 @@
+import argparse
+from pathlib import Path
+import json
+
+
+def find_missings(completions_dir, world_size):
+    """Find workers that have not completed and return their indices."""
+    full = list(range(world_size))
+
+    completed = []
+    for path in completions_dir.glob("*"):
+        if path.name in ['.', '..']:
+            continue
+        index = path.name.lstrip('0')
+        index = 0 if index == "" else int(index)
+        completed.append(index)
+
+    missings = set(full) - set(completed)
+    return missings
+
+
+def find_output_files(slurm_dir, worker_indices):
+    """Find the output files associated with the given worker indices, and return
+    tuples of (worker index, output file path).
+    """
+    out_files = []
+    for path in slurm_dir.glob("*.out"):
+        _, worker_id = path.name.replace(".out", "").split('_')
+        worker_id = int(worker_id)
+        if worker_id in worker_indices:
+            out_files.append((worker_id, path))
+    return out_files
+
+
+def display_error_files(logs_dir, job_name):
+    executor_path = Path(logs_dir) / job_name / "executor.json"
+    completions_dir = Path(logs_dir) / job_name / "completions"
+    slurm_dir = Path(logs_dir) / job_name / "slurm_logs"
+
+    with open(executor_path) as f:
+        executor = json.load(f)
+
+    missings = find_missings(completions_dir, executor["world_size"])
+
+    for missing in sorted(list(missings))[::-1]:
+        print(missing)
+
+    # error_files = find_output_files(slurm_dir, missings)
+    # error_files = sorted(error_files, key=lambda x: x[0])
+
+    # for _, path in error_files[::-1]:
+    #     print(path)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "--logs-dir",
+        type=str,
+        help="Path to logs directory for `datatrove`.",
+    )
+    parser.add_argument(
+        "--job-name",
+        type=str,
+        default="port_droid",
+        help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
+    )
+
+    args = parser.parse_args()
+
+    display_error_files(**vars(args))
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
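After a slurm run, the two helpers above can be combined to point directly at the logs of the shards that did not finish. A minimal sketch of that debugging loop (the logs directory and job name are placeholders for your own `datatrove` run):

```python
import json
from pathlib import Path

from examples.port_datasets.droid_rlds.display_error_files import find_missings, find_output_files

logs_dir = Path("/path/to/datatrove_logs")  # placeholder datatrove logging_dir
job_name = "port_agibot"                    # placeholder job name

# executor.json stores the world_size used for the run
with open(logs_dir / job_name / "executor.json") as f:
    world_size = json.load(f)["world_size"]

missing = find_missings(logs_dir / job_name / "completions", world_size)
for rank, out_path in sorted(find_output_files(logs_dir / job_name / "slurm_logs", missing)):
    print(f"rank {rank} did not complete, see {out_path}")
```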