From 0a1da47527ceb653436f233232788103c7f527ca Mon Sep 17 00:00:00 2001
From: Michel Aractingi
Date: Mon, 30 Jun 2025 17:24:43 +0200
Subject: [PATCH] fix(precommit): solve precommit issues

---
 .../port_datasets/agibot_hdf5/download.py     | 184 ------------------
 .../port_datasets/agibot_hdf5/port_agibot.py  | 110 ++++++-----
 .../agibot_hdf5/slurm_port_shards.py          |  31 +--
 .../droid_rlds/display_error_files.py         |  35 ++--
 .../droid_rlds/slurm_port_shards.py           |   4 +-
 5 files changed, 102 insertions(+), 262 deletions(-)
 delete mode 100644 examples/port_datasets/agibot_hdf5/download.py

diff --git a/examples/port_datasets/agibot_hdf5/download.py b/examples/port_datasets/agibot_hdf5/download.py
deleted file mode 100644
index 2b7938566..000000000
--- a/examples/port_datasets/agibot_hdf5/download.py
+++ /dev/null
@@ -1,184 +0,0 @@
-
-
-import json
-import logging
-from pathlib import Path
-import shutil
-from huggingface_hub import snapshot_download
-import tarfile
-
-import tqdm
-from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
-from lerobot.common.constants import HF_LEROBOT_HOME
-from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.common.utils.utils import init_logging
-from huggingface_hub import HfApi, HfFileSystem
-
-RAW_REPO_ID = "agibot-world/AgiBotWorld-Alpha"
-
-
-def download(raw_dir, allow_patterns=None, ignore_patterns=None):
-    snapshot_download(
-        RAW_REPO_ID,
-        repo_type="dataset",
-        local_dir=str(raw_dir),
-        allow_patterns=allow_patterns,
-        ignore_patterns=ignore_patterns,
-    )
-
-def download_proprio_stats(raw_dir):
-    proprio_stats_dir = raw_dir / "proprio_stats"
-
-    if proprio_stats_dir.exists():
-        logging.info("Skipping download proprio stats")
-        return
-
-    download(raw_dir, allow_patterns="proprio_stats/*.tar")
-
-    for path in proprio_stats_dir.glob("*.tar"):
-        logging.info(f"Untar-ing {path}...")
-        with tarfile.open(path, 'r') as tar:
-            tar.extractall(path=proprio_stats_dir)
-
-        logging.info(f"Deleting {path}...")
-        path.unlink()
-
-
-def download_parameters(raw_dir):
-    params_dir = raw_dir / "parameters"
-
-    if params_dir.exists():
-        logging.info("Skipping download parameters")
-        return
-
-    download(raw_dir, allow_patterns="parameters/*.tar")
-
-    for path in params_dir.glob("*.tar"):
-        logging.info(f"Untar-ing {path}...")
-        with tarfile.open(path, 'r') as tar:
-            tar.extractall(path=params_dir)
-
-        logging.info(f"Deleting {path}...")
-        path.unlink()
-
-
-def get_observations_files(raw_dir, raw_repo_id):
-    files_json_path = raw_dir / "observations_files.json"
-    sizes_json_path = raw_dir / "observations_sizes.json"
-    if files_json_path.exists() and sizes_json_path.exists():
-        with open(files_json_path) as f:
-            files = json.load(f)
-        with open(sizes_json_path) as f:
-            sizes = json.load(f)
-        return files, sizes
-
-    api = HfApi()
-    files = api.list_repo_files(repo_id=raw_repo_id, repo_type="dataset")
-    files = [file for file in files if "observations/" in file]
-
-    fs = HfFileSystem()
-    sizes = []
-    for file in tqdm.tqdm(files, desc="Downloading file sizes"):
-        file_info = fs.info(f"datasets/{raw_repo_id}/{file}")
-        size = file_info["size"] / 1000**3
-        sizes.append(size)
-
-    # Sort ASC to start with smaller size files
-    sizes, files = zip(*sorted(zip(sizes, files)))
-
-    with open(files_json_path, "w") as f:
-        json.dump(files, f)
-    with open(sizes_json_path, "w") as f:
-        json.dump(sizes, f)
-    return files, sizes
-
-def display_observations_sizes(files, sizes):
-    size_per_task = {}
-    for i, (file, size) in enumerate(zip(files, sizes)):
logging.info(f"{i}/{len(files)}: {file} {size:.2f}GB") - - task = int(file.split('/')[1]) - - if task not in size_per_task: - size_per_task[task] = 0 - - size_per_task[task] += size - - for task, size in size_per_task.items(): - logging.info(f"{task} {size:.2f}GB") - - total_size = sum(list(size_per_task.values())) - logging.info(f"Total size: {total_size:.2f}GB") - - -def download_meta_data(raw_dir): - # Download task data - download(raw_dir, allow_patterns="task_info/task_*.json") - - # Download all camera parameters ~170 GB - download_parameters(raw_dir) - - # Download all proprio stats ~26 GB - download_proprio_stats(raw_dir) - -def no_depth(tarinfo, path): - """ Utility to not untar depth data""" - if "depth" in tarinfo.name: - return None - return tarinfo - -def main(): - init_logging() - - repo_id = "cadene/agibot_alpha_v30" - raw_dir = Path("/fsx/remi_cadene/data/AgiBotWorld-Alpha") - download_meta_data(raw_dir) - # Get list of tar files containing observation data (containing several episodes each) - obs_files, obs_sizes = get_observations_files(raw_dir, RAW_REPO_ID) - display_observations_sizes(obs_files, obs_sizes) - - shard_indices = range(len(obs_files)) - num_shards = len(obs_files) - - # TOOD: remove - obs_files = obs_files[:2] - shard_indices = [0,1] - - # Iterate on each subset of episodes - for shard_index, obs_file in zip(shard_indices, obs_files): - - shard_repo_id = f"{repo_id}_world_{num_shards}_rank_{shard_index}" - dataset_dir = HF_LEROBOT_HOME / shard_repo_id - if dataset_dir.exists(): - shutil.rmtree(dataset_dir) - - # Download subset - download(raw_dir, allow_patterns=obs_file) - - tar_path = raw_dir / obs_file - with tarfile.open(tar_path, 'r') as tar: - extracted_files = tar.getnames() - - task_index = int(tar_path.parent.name) - episode_names = [int(p) for p in extracted_files if '/' not in p] - - # Untar if needed - if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]): - logging.info(f"Untar-ing {tar_path}...") - with tarfile.open(tar_path, 'r') as tar: - tar.extractall(path=tar_path.parent, filter=no_depth) - - port_agibot(raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False) - - for ep_name in episode_names: - shutil.rmtree(tar_path.parent / f"{ep_name}") - - tar_path.unlink() - - # dataset = LeRobotDataset(shard_repo_id, root=dataset_dir) - # lol=1 - - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/examples/port_datasets/agibot_hdf5/port_agibot.py b/examples/port_datasets/agibot_hdf5/port_agibot.py index 83af6d01a..e9b1a61a0 100644 --- a/examples/port_datasets/agibot_hdf5/port_agibot.py +++ b/examples/port_datasets/agibot_hdf5/port_agibot.py @@ -1,18 +1,27 @@ - import json import logging -from pathlib import Path import shutil import time -import numpy as np +from pathlib import Path + import h5py +import numpy as np import pandas as pd from lerobot.common.datasets.lerobot_dataset import LeRobotDataset -from lerobot.common.datasets.utils import DEFAULT_CHUNK_SIZE, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_VIDEO_PATH, EPISODES_DIR, concat_video_files, get_video_duration_in_s, get_video_size_in_mb, update_chunk_file_indices, write_info +from lerobot.common.datasets.utils import ( + DEFAULT_CHUNK_SIZE, + DEFAULT_VIDEO_FILE_SIZE_IN_MB, + DEFAULT_VIDEO_PATH, + EPISODES_DIR, + concat_video_files, + get_video_duration_in_s, + get_video_size_in_mb, + update_chunk_file_indices, + write_info, +) from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds - 
 
 AGIBOT_FPS = 30
 AGIBOT_ROBOT_TYPE = "AgiBot_A2D"
 AGIBOT_FEATURES = {
@@ -77,12 +86,12 @@ AGIBOT_FEATURES = {
         "dtype": "float32",
         "shape": (20,),
         "names": {
-            "axes": ["head_yaw", "head_pitch"] + \
-                    [f"left_joint_{i}" for i in range(7)] + \
-                    ["left_gripper"] + \
-                    [f"right_joint_{i}" for i in range(7)] + \
-                    ["right_gripper"] + \
-                    ["waist_pitch", "waist_lift"],
+            "axes": ["head_yaw", "head_pitch"]
+            + [f"left_joint_{i}" for i in range(7)]
+            + ["left_gripper"]
+            + [f"right_joint_{i}" for i in range(7)]
+            + ["right_gripper"]
+            + ["waist_pitch", "waist_lift"],
         },
     },
     # gripper open range in mm (0 for pull open, 1 for full close)
@@ -145,13 +154,13 @@ AGIBOT_FEATURES = {
         "dtype": "float32",
         "shape": (22,),
         "names": {
-            "axes": ["head_yaw", "head_pitch"] + \
-                    [f"left_joint_{i}" for i in range(7)] + \
-                    ["left_gripper"] + \
-                    [f"right_joint_{i}" for i in range(7)] + \
-                    ["right_gripper"] + \
-                    ["waist_pitch", "waist_lift"] + \
-                    ["velocity_x", "yaw_rate"],
+            "axes": ["head_yaw", "head_pitch"]
+            + [f"left_joint_{i}" for i in range(7)]
+            + ["left_gripper"]
+            + [f"right_joint_{i}" for i in range(7)]
+            + ["right_gripper"]
+            + ["waist_pitch", "waist_lift"]
+            + ["velocity_x", "yaw_rate"],
         },
     },
     # episode level annotation
@@ -217,11 +226,12 @@ AGIBOT_IMAGES_FEATURES = {
     },
 }
 
+
 def load_info_per_task(raw_dir):
     info_per_task = {}
     task_info_dir = raw_dir / "task_info"
     for path in task_info_dir.glob("task_*.json"):
-        task_index = int(path.name.replace("task_","").replace(".json",""))
+        task_index = int(path.name.replace("task_", "").replace(".json", ""))
         with open(path) as f:
             task_info = json.load(f)
 
@@ -230,6 +240,7 @@ def load_info_per_task(raw_dir):
 
     return info_per_task
 
+
 def create_frame_idx_to_frames_label_idx(ep_info):
     frame_idx_to_frames_label_idx = {}
     for label_idx, frames_label in enumerate(ep_info["label_info"]["action_config"]):
@@ -237,9 +248,9 @@ def create_frame_idx_to_frames_label_idx(ep_info):
             frame_idx_to_frames_label_idx[frame_idx] = label_idx
     return frame_idx_to_frames_label_idx
 
+
 def generate_lerobot_frames(raw_dir: Path, task_index: int, episode_index: int):
-    """ /!\ The frames dont contain observation.cameras.*
-    """
+    r"""/!\ The frames don't contain observation.cameras.*"""
     info_per_task = load_info_per_task(raw_dir)
     ep_info = info_per_task[task_index][episode_index]
     frame_idx_to_frames_label_idx = create_frame_idx_to_frames_label_idx(ep_info)
@@ -297,7 +308,9 @@ def generate_lerobot_frames(raw_dir: Path, task_index: int, episode_index: int):
         for h5_key in keys_mapping.values():
             col_num_frames = h5[h5_key].shape[0]
             if col_num_frames != num_frames:
-                raise ValueError(f"HDF5 column '{h5_key}' is expected to have {num_frames} but has {col_num_frames}' frames instead.")
+                raise ValueError(
+                    f"HDF5 column '{h5_key}' is expected to have {num_frames} frames but has {col_num_frames} frames instead."
+                )
 
         for i in range(num_frames):
             # Create frame
@@ -308,26 +321,30 @@ def generate_lerobot_frames(raw_dir: Path, task_index: int, episode_index: int):
 
             f["observation.state.end.position"] = f["observation.state.end.position"].reshape(6)
             f["observation.state.end.orientation"] = f["observation.state.end.orientation"].reshape(8)
-            f["observation.state"] = np.concatenate([
-                f["observation.state.head.position"],
-                f["observation.state.joint.position"][:7], # left
-                f["observation.state.effector.position"][[0]], # left
-                f["observation.state.joint.position"][7:], # right
-                f["observation.state.effector.position"][[1]], # right
-                f["observation.state.waist.position"],
-            ])
+            f["observation.state"] = np.concatenate(
+                [
+                    f["observation.state.head.position"],
+                    f["observation.state.joint.position"][:7],  # left
+                    f["observation.state.effector.position"][[0]],  # left
+                    f["observation.state.joint.position"][7:],  # right
+                    f["observation.state.effector.position"][[1]],  # right
+                    f["observation.state.waist.position"],
+                ]
+            )
 
             f["action.end.position"] = f["action.end.position"].reshape(6)
             f["action.end.orientation"] = f["action.end.orientation"].reshape(8)
-            f["action"] = np.concatenate([
-                f["action.head.position"],
-                f["action.joint.position"][:7], # left
-                f["action.effector.position"][[0]], # left
-                f["action.joint.position"][7:], # right
-                f["action.effector.position"][[1]], # right
-                f["action.waist.position"],
-                f["action.robot.velocity"],
-            ])
+            f["action"] = np.concatenate(
+                [
+                    f["action.head.position"],
+                    f["action.joint.position"][:7],  # left
+                    f["action.effector.position"][[0]],  # left
+                    f["action.joint.position"][7:],  # right
+                    f["action.effector.position"][[1]],  # right
+                    f["action.waist.position"],
+                    f["action.robot.velocity"],
+                ]
+            )
 
             # episode level annotation
             f["task"] = ep_info["task_name"]
@@ -361,6 +378,7 @@ def update_meta_data(
 
     return df.apply(_update, axis=1)
 
+
 def move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episode_names):
     keys_mapping = {
         "observation.images.top_head": "head_color",
@@ -378,7 +396,6 @@ def move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episo
         if key not in lerobot_dataset.meta.info["features"]:
             raise ValueError(f"Key '{key}' not found in features.")
 
-
     video_keys = keys_mapping.keys()
     chunk_idx = dict.fromkeys(video_keys, 0)
    file_idx = dict.fromkeys(video_keys, 0)
@@ -438,12 +455,15 @@ def move_videos_to_lerobot_directory(lerobot_dataset, raw_dir, task_index, episo
         latest_duration_in_s[key] += ep_duration_in_s
 
     # Update episodes meta data
-    for meta_path in (lerobot_dataset.root / EPISODES_DIR).glob("chunk-*/file-*.parquet"): 
+    for meta_path in (lerobot_dataset.root / EPISODES_DIR).glob("chunk-*/file-*.parquet"):
         df = pd.read_parquet(meta_path)
         df = update_meta_data(df, ep_to_meta)
         df.to_parquet(meta_path)
 
-def port_agibot(raw_dir: Path, repo_id: str, task_index: int, episode_indices: list[int], push_to_hub: bool = False):
+
+def port_agibot(
+    raw_dir: Path, repo_id: str, task_index: int, episode_indices: list[int], push_to_hub: bool = False
+):
     lerobot_dataset = LeRobotDataset.create(
         repo_id=repo_id,
         robot_type=AGIBOT_ROBOT_TYPE,
@@ -459,7 +479,9 @@ def port_agibot(raw_dir: Path, repo_id: str, task_index: int, episode_indices: l
 
         elapsed_time = time.time() - start_time
         d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
-        logging.info(f"{i} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)")
+        logging.info(
+            f"{i} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)"
+        )
 
         for frame in generate_lerobot_frames(raw_dir, task_index, episode_index):
             lerobot_dataset.add_frame(frame)
@@ -478,4 +500,4 @@ def port_agibot(raw_dir: Path, repo_id: str, task_index: int, episode_indices: l
             # Add agibot tag, since it belongs to the agibot collection of datasets
             tags=["agibot"],
             private=False,
-        )
\ No newline at end of file
+        )
diff --git a/examples/port_datasets/agibot_hdf5/slurm_port_shards.py b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py
index 2fd4f5115..c9d3dad93 100644
--- a/examples/port_datasets/agibot_hdf5/slurm_port_shards.py
+++ b/examples/port_datasets/agibot_hdf5/slurm_port_shards.py
@@ -1,14 +1,17 @@
 import argparse
 import logging
-from pathlib import Path
 import tarfile
+from pathlib import Path
 
 from datatrove.executor import LocalPipelineExecutor
 from datatrove.executor.slurm import SlurmPipelineExecutor
 from datatrove.pipeline.base import PipelineStep
 
-from examples.port_datasets.agibot_hdf5.download import RAW_REPO_ID, download_meta_data, get_observations_files
-
+from examples.port_datasets.agibot_hdf5.download import (
+    RAW_REPO_ID,
+    download_meta_data,
+    get_observations_files,
+)
 
 
 class PortAgiBotShards(PipelineStep):
@@ -23,14 +26,18 @@ class PortAgiBotShards(PipelineStep):
     def run(self, data=None, rank: int = 0, world_size: int = 1):
         import shutil
-        import logging
-        import tarfile
+
         from datasets.utils.tqdm import disable_progress_bars
 
-        from lerobot.common.constants import HF_LEROBOT_HOME
+        from examples.port_datasets.agibot_hdf5.download import (
+            RAW_REPO_ID,
+            download,
+            get_observations_files,
+            no_depth,
+        )
         from examples.port_datasets.agibot_hdf5.port_agibot import port_agibot
-        from examples.port_datasets.agibot_hdf5.download import get_observations_files, download, no_depth, RAW_REPO_ID
         from examples.port_datasets.droid_rlds.port_droid import validate_dataset
+        from lerobot.common.constants import HF_LEROBOT_HOME
         from lerobot.common.utils.utils import init_logging
 
         init_logging()
 
@@ -49,17 +56,17 @@ class PortAgiBotShards(PipelineStep):
         download(self.raw_dir, allow_patterns=obs_file)
 
         tar_path = self.raw_dir / obs_file
-        with tarfile.open(tar_path, 'r') as tar:
+        with tarfile.open(tar_path, "r") as tar:
             extracted_files = tar.getnames()
 
         task_index = int(tar_path.parent.name)
-        episode_names = [int(p) for p in extracted_files if '/' not in p]
+        episode_names = [int(p) for p in extracted_files if "/" not in p]
 
         # Untar if needed
-        if not all([(tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names]):
+        if not all((tar_path.parent / f"{ep_name}").exists() for ep_name in episode_names):
             logging.info(f"Untar-ing {tar_path}...")
-            with tarfile.open(tar_path, 'r') as tar:
-                tar.extractall(path=tar_path.parent, filter=no_depth)
+            with tarfile.open(tar_path, "r") as tar:
+                tar.extractall(path=tar_path.parent, filter=no_depth)  # nosec B202
 
         port_agibot(self.raw_dir, shard_repo_id, task_index, episode_names, push_to_hub=False)
 
diff --git a/examples/port_datasets/droid_rlds/display_error_files.py b/examples/port_datasets/droid_rlds/display_error_files.py
index a01f11c7b..cc6395481 100644
--- a/examples/port_datasets/droid_rlds/display_error_files.py
+++ b/examples/port_datasets/droid_rlds/display_error_files.py
@@ -1,30 +1,31 @@
 import argparse
-from pathlib import Path
 import json
+from pathlib import Path
 
-def find_missings(completions_dir, world_size):
-    """ Find workers that are not completed and returns their indices.
- """ + +def find_missing_workers(completions_dir, world_size): + """Find workers that are not completed and returns their indices.""" full = list(range(world_size)) completed = [] for path in completions_dir.glob("*"): - if path.name in ['.', '..']: + if path.name in [".", ".."]: continue - index = path.name.lstrip('0') + index = path.name.lstrip("0") index = 0 if index == "" else int(index) completed.append(index) - missings = set(full) - set(completed) - return missings + missing_workers = set(full) - set(completed) + return missing_workers + def find_output_files(slurm_dir, worker_indices): - """ Find output files associated to worker indices, and return tuples + """Find output files associated to worker indices, and return tuples of (worker index, output file path) """ out_files = [] for path in slurm_dir.glob("*.out"): - _, worker_id = path.name.replace(".out", "").split('_') + _, worker_id = path.name.replace(".out", "").split("_") worker_id = int(worker_id) if worker_id in worker_indices: out_files.append((worker_id, path)) @@ -34,22 +35,15 @@ def find_output_files(slurm_dir, worker_indices): def display_error_files(logs_dir, job_name): executor_path = Path(logs_dir) / job_name / "executor.json" completions_dir = Path(logs_dir) / job_name / "completions" - slurm_dir = Path(logs_dir) / job_name / "slurm_logs" with open(executor_path) as f: executor = json.load(f) - missings = find_missings(completions_dir, executor["world_size"]) + missing_workers = find_missing_workers(completions_dir, executor["world_size"]) - for missing in sorted(list(missings))[::-1]: + for missing in sorted(missing_workers)[::-1]: print(missing) - # error_files = find_output_files(slurm_dir, missings) - # error_files = sorted(error_files, key=lambda x: x[0]) - - # for _, path in error_files[::-1]: - # print(path) - def main(): parser = argparse.ArgumentParser() @@ -70,5 +64,6 @@ def main(): display_error_files(**vars(args)) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/examples/port_datasets/droid_rlds/slurm_port_shards.py b/examples/port_datasets/droid_rlds/slurm_port_shards.py index 36b4d795b..093194ce6 100644 --- a/examples/port_datasets/droid_rlds/slurm_port_shards.py +++ b/examples/port_datasets/droid_rlds/slurm_port_shards.py @@ -32,8 +32,8 @@ class PortDroidShards(PipelineStep): try: validate_dataset(shard_repo_id) return - except: - pass + except Exception: + pass # nosec B110 - Dataset doesn't exist yet, continue with porting port_droid( self.raw_dir,