diff --git a/examples/dataset/config.py b/examples/dataset/config.py new file mode 100644 index 000000000..9ccd23ebd --- /dev/null +++ b/examples/dataset/config.py @@ -0,0 +1,93 @@ +XVLA_SOFT_FOLD_FEATURES = { + "observation.images.cam_high": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (256, 256, 3), + "names": ["height", "width", "rgb"], + }, + "observation.images.cam_left_wrist": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (256, 256, 3), + "names": ["height", "width", "rgb"], + }, + "observation.images.cam_right_wrist": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (256, 256, 3), + "names": ["height", "width", "rgb"], + }, + + "observation.states.eef_euler": { + "dtype": "float32", + "shape": (14,), # 14 = 7 joints per arm × 2 arms OR 14-d state representation + "names": {"values": [f"eef_euler_{i}" for i in range(14)]}, + }, + + "observation.states.eef_quaternion": { + "dtype": "float32", + "shape": (16,), # 16 = 8 quaternion floats per arm × 2 arms + "names": {"values": [f"eef_quat_{i}" for i in range(16)]}, + }, + + "observation.states.eef_6d": { + "dtype": "float32", + "shape": (20,), # 20 = pos(3) + rot6d(6) + extra dims + "names": {"values": [f"eef6d_{i}" for i in range(20)]}, + }, + + "observation.states.eef_left_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["eef_left_time"]}, + }, + + "observation.states.eef_right_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["eef_right_time"]}, + }, + + "observation.states.qpos": { + "dtype": "float32", + "shape": (14,), # 7 per arm × 2 arms + "names": {"motors": [f"qpos_{i}" for i in range(14)]}, + }, + + "observation.states.qvel": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"qvel_{i}" for i in range(14)]}, + }, + + "observation.states.effort": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"effort_{i}" for i in range(14)]}, + }, + + "observation.states.qpos_left_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["qpos_left_time"]}, + }, + + "observation.states.qpos_right_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["qpos_right_time"]}, + }, + + "action": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"joint_action_{i}" for i in range(14)]}, + }, + + "time_stamp": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["global_timestamp"]}, + }, + +} diff --git a/examples/dataset/convert.sh b/examples/dataset/convert.sh new file mode 100644 index 000000000..e9f6ee039 --- /dev/null +++ b/examples/dataset/convert.sh @@ -0,0 +1,6 @@ +python ./examples/dataset/convert_hdf5_lerobot.py \ + --src-paths /fsx/jade_choghari/XVLA-Soft-Fold/0808_12am_stage_1_stage2new_new_cam_very_slow_no_sleeve \ + --output-path /fsx/jade_choghari/new-data \ + --executor local \ + --tasks-per-job 3 \ + --workers 10 diff --git a/examples/dataset/convert_hdf5_lerobot.py b/examples/dataset/convert_hdf5_lerobot.py new file mode 100644 index 000000000..37b98abdd --- /dev/null +++ b/examples/dataset/convert_hdf5_lerobot.py @@ -0,0 +1,437 @@ +import argparse +import os +import re +import shutil +from pathlib import Path + +import pandas as pd +# import ray +# from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor +from datatrove.executor import LocalPipelineExecutor +from datatrove.pipeline.base import PipelineStep +from lerobot.datasets.aggregate import ( + aggregate_data, + aggregate_metadata, + aggregate_stats, + aggregate_videos, + validate_all_metadata, +) +from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata +from lerobot.datasets.utils import ( + DEFAULT_CHUNK_SIZE, + DEFAULT_DATA_FILE_SIZE_IN_MB, + DEFAULT_VIDEO_FILE_SIZE_IN_MB, + write_info, + write_stats, + write_tasks, +) +XVLA_SOFT_FOLD_FEATURES = { + "observation.images.cam_high": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (480, 640, 3), + "names": ["height", "width", "rgb"], + }, + "observation.images.cam_left_wrist": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (480, 640, 3), + "names": ["height", "width", "rgb"], + }, + "observation.images.cam_right_wrist": { + "dtype": "video", + "names": ["height", "width", "channels"], + "shape": (480, 640, 3), + "names": ["height", "width", "rgb"], + }, + + "observation.states.eef_euler": { + "dtype": "float32", + "shape": (14,), # 14 = 7 joints per arm × 2 arms OR 14-d state representation + "names": {"values": [f"eef_euler_{i}" for i in range(14)]}, + }, + + "observation.states.eef_quaternion": { + "dtype": "float32", + "shape": (16,), # 16 = 8 quaternion floats per arm × 2 arms + "names": {"values": [f"eef_quat_{i}" for i in range(16)]}, + }, + + "observation.states.eef_6d": { + "dtype": "float32", + "shape": (20,), # 20 = pos(3) + rot6d(6) + extra dims + "names": {"values": [f"eef6d_{i}" for i in range(20)]}, + }, + + "observation.states.eef_left_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["eef_left_time"]}, + }, + + "observation.states.eef_right_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["eef_right_time"]}, + }, + + "observation.states.qpos": { + "dtype": "float32", + "shape": (14,), # 7 per arm × 2 arms + "names": {"motors": [f"qpos_{i}" for i in range(14)]}, + }, + + "observation.states.qvel": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"qvel_{i}" for i in range(14)]}, + }, + + "observation.states.effort": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"effort_{i}" for i in range(14)]}, + }, + + "observation.states.qpos_left_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["qpos_left_time"]}, + }, + + "observation.states.qpos_right_time": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["qpos_right_time"]}, + }, + + "action": { + "dtype": "float32", + "shape": (14,), + "names": {"motors": [f"joint_action_{i}" for i in range(14)]}, + }, + + "time_stamp": { + "dtype": "float32", + "shape": (1,), + "names": {"values": ["global_timestamp"]}, + }, +} +import cv2 +import numpy as np + +def decode_image(encoded_array): + # HDF5 gives you an array of uint8 → convert to raw bytes + data = np.asarray(encoded_array, dtype=np.uint8) + img = cv2.imdecode(data, cv2.IMREAD_COLOR) # returns HWC BGR + img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # convert to RGB + return img + +from pathlib import Path + +import numpy as np +from h5py import File + + +def load_local_episodes(input_h5: Path): + """ + Load one XVLA Soft-Fold episode from a single .hdf5 file. + This dataset stores ONE episode per file, NOT a /data/ group. + """ + + import h5py + import numpy as np + + with h5py.File(input_h5, "r") as f: + + # Determine episode length from any observation vector + episode_len = f["observations/eef_6d"].shape[0] + + episode = [] + + for i in range(episode_len): + frame = { + # ---------------------- + # ROOT-LEVEL + # ---------------------- + "task": "fold the cloth", + "time_stamp": np.array([f["time_stamp"][i]], dtype=np.float32), + + # ---------------------- + # OBSERVATIONS + # ---------------------- + "observation": { + "images": { + "cam_high": f["observations/images/cam_high"][i], + "cam_left_wrist": f["observations/images/cam_left_wrist"][i], + "cam_right_wrist": f["observations/images/cam_right_wrist"][i], + }, + "states": { + "eef_euler": f["observations/eef"][i], + "eef_quaternion": f["observations/eef_quaternion"][i], + "eef_6d": f["observations/eef_6d"][i], + + "eef_left_time": np.array([f["observations/eef_left_time"][i]], dtype=np.float32), + "eef_right_time": np.array([f["observations/eef_right_time"][i]], dtype=np.float32), + + "qpos": f["observations/qpos"][i], + "qvel": f["observations/qvel"][i], + "effort": f["observations/effort"][i], + + "qpos_left_time": np.array([f["observations/qpos_left_time"][i]], dtype=np.float32), + "qpos_right_time": np.array([f["observations/qpos_right_time"][i]], dtype=np.float32), + }, + }, + + # ---------------------- + # ACTION (your joint 14-D) + # ---------------------- + "action": f["action"][i].astype(np.float32), + } + + episode.append(frame) + + yield episode + +# from ray.runtime_env import RuntimeEnv +from tqdm import tqdm + + +def setup_logger(): + import sys + + from datatrove.utils.logging import logger + + logger.remove() + logger.add(sys.stdout, level="INFO", colorize=True) + return logger + + +class SaveLerobotDataset(PipelineStep): + name = "Save Temp LerobotDataset" + type = "libero2lerobot" + + def __init__(self, tasks: list[tuple[Path, Path, str]]): + super().__init__() + self.tasks = tasks + + def run(self, data=None, rank: int = 0, world_size: int = 1): + logger = setup_logger() + + input_h5, output_path, task_instruction = self.tasks[rank] + + if output_path.exists(): + shutil.rmtree(output_path) + + dataset = LeRobotDataset.create( + repo_id=f"{input_h5.parent.name}/{input_h5.name}", + root=output_path, + fps=20, + robot_type="franka", + features=XVLA_SOFT_FOLD_FEATURES, + ) + + logger.info(f"start processing for {input_h5}, saving to {output_path}") + + raw_dataset = load_local_episodes(input_h5) + for episode_index, episode_data in enumerate(raw_dataset): + with self.track_time("saving episode"): + + for raw_frame in episode_data: + frame_data = { + "task": task_instruction, + + # ---------------------- IMAGES ---------------------- + "observation.images.cam_high": decode_image(raw_frame["observation"]["images"]["cam_high"]), + "observation.images.cam_left_wrist": decode_image(raw_frame["observation"]["images"]["cam_left_wrist"]), + "observation.images.cam_right_wrist": decode_image(raw_frame["observation"]["images"]["cam_right_wrist"]), + + # ---------------------- EEF STATES ---------------------- + "observation.states.eef_euler": raw_frame["observation"]["states"]["eef_euler"], + "observation.states.eef_quaternion": raw_frame["observation"]["states"]["eef_quaternion"], + "observation.states.eef_6d": raw_frame["observation"]["states"]["eef_6d"], + + "observation.states.eef_left_time": raw_frame["observation"]["states"]["eef_left_time"], + "observation.states.eef_right_time": raw_frame["observation"]["states"]["eef_right_time"], + + # ---------------------- JOINT STATES ---------------------- + "observation.states.qpos": raw_frame["observation"]["states"]["qpos"], + "observation.states.qvel": raw_frame["observation"]["states"]["qvel"], + "observation.states.effort": raw_frame["observation"]["states"]["effort"], + + "observation.states.qpos_left_time": raw_frame["observation"]["states"]["qpos_left_time"], + "observation.states.qpos_right_time": raw_frame["observation"]["states"]["qpos_right_time"], + + # ---------------------- ACTION ---------------------- + "action": raw_frame["action"], + + # ---------------------- TIME ---------------------- + "time_stamp": raw_frame["time_stamp"], + } + + dataset.add_frame(frame_data) + + dataset.save_episode() + logger.info(f"Processed {dataset.repo_id}, episode {episode_index}, len={len(episode_data)}") + + +def create_aggr_dataset(raw_dirs: list[Path], aggregated_dir: Path): + logger = setup_logger() + + all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in raw_dirs] + + fps, robot_type, features = validate_all_metadata(all_metadata) + + if aggregated_dir.exists(): + shutil.rmtree(aggregated_dir) + + aggr_meta = LeRobotDatasetMetadata.create( + repo_id=f"{aggregated_dir.parent.name}/{aggregated_dir.name}", + root=aggregated_dir, + fps=fps, + robot_type=robot_type, + features=features, + ) + + video_keys = [key for key in features if features[key]["dtype"] == "video"] + unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique() + aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) + + meta_idx = {"chunk": 0, "file": 0} + data_idx = {"chunk": 0, "file": 0} + videos_idx = {key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys} + + aggr_meta.episodes = {} + + for src_meta in tqdm(all_metadata, desc="Copy data and videos"): + videos_idx = aggregate_videos( + src_meta, aggr_meta, videos_idx, DEFAULT_VIDEO_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE + ) + data_idx = aggregate_data(src_meta, aggr_meta, data_idx, DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_CHUNK_SIZE) + + meta_idx = aggregate_metadata(src_meta, aggr_meta, meta_idx, data_idx, videos_idx) + + aggr_meta.info["total_episodes"] += src_meta.total_episodes + aggr_meta.info["total_frames"] += src_meta.total_frames + + logger.info("write tasks") + write_tasks(aggr_meta.tasks, aggr_meta.root) + + logger.info("write info") + aggr_meta.info.update( + { + "total_tasks": len(aggr_meta.tasks), + "total_episodes": sum(m.total_episodes for m in all_metadata), + "total_frames": sum(m.total_frames for m in all_metadata), + "splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"}, + } + ) + write_info(aggr_meta.info, aggr_meta.root) + + logger.info("write stats") + aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata]) + write_stats(aggr_meta.stats, aggr_meta.root) + + +def delete_temp_data(temp_dirs: list[Path]): + logger = setup_logger() + logger.info("Delete temp data_dir") + for temp_dir in temp_dirs: + shutil.rmtree(temp_dir) + + +def main( + src_paths: list[Path], + output_path: Path, + executor: str, + cpus_per_task: int, + tasks_per_job: int, + workers: int, + resume_dir: Path = None, + debug: bool = False, + repo_id: str = None, + push_to_hub: bool = False, +): + tasks = [] + for src_path in src_paths: + for input_h5 in src_path.glob("*.hdf5"): + tasks.append( + ( + input_h5, + (output_path / (src_path.name + "_temp") / input_h5.stem).resolve(), + "fold the cloth", # fixed single task + ) + ) + if len(src_paths) > 1: + aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot") + else: + aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot" + aggregate_output_path = aggregate_output_path.resolve() + + if debug: + executor = "local" + workers = 1 + tasks = tasks[:2] + push_to_hub = False + + match executor: + case "local": + workers = os.cpu_count() // cpus_per_task if workers == -1 else workers + executor = LocalPipelineExecutor + # case "ray": + # runtime_env = RuntimeEnv( + # env_vars={ + # "HDF5_USE_FILE_LOCKING": "FALSE", + # "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE", + # "SVT_LOG": "1", + # }, + # ) + # ray.init(runtime_env=runtime_env) + # executor = RayPipelineExecutor + case _: + raise ValueError(f"Executor {executor} not supported") + + executor_config = { + "tasks": len(tasks), + "workers": workers, + **({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job} if False else {}), + } + + executor(pipeline=[SaveLerobotDataset(tasks)], **executor_config, logging_dir=resume_dir).run() + create_aggr_dataset([task[1] for task in tasks], aggregate_output_path) + delete_temp_data([task[1] for task in tasks]) + + for task in tasks: + shutil.rmtree(task[1].parent, ignore_errors=True) + + if push_to_hub: + assert repo_id is not None + tags = ["LeRobot", "libero", "franka"] + tags.extend([src_path.name for src_path in src_paths]) + LeRobotDataset( + repo_id=repo_id, + root=aggregate_output_path, + ).push_to_hub( + tags=tags, + private=False, + push_videos=True, + license="apache-2.0", + upload_large_folder=False, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--src-paths", type=Path, nargs="+", required=True) + parser.add_argument("--output-path", type=Path, required=True) + parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local") + parser.add_argument("--cpus-per-task", type=int, default=1) + parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray") + parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run") + parser.add_argument("--resume-dir", type=Path, help="logs directory to resume") + parser.add_argument("--debug", action="store_true") + parser.add_argument("--repo-id", type=str, help="required when push-to-hub is True") + parser.add_argument("--push-to-hub", action="store_true", help="upload to hub") + args = parser.parse_args() + + main(**vars(args)) \ No newline at end of file diff --git a/examples/dataset/draft.txt b/examples/dataset/draft.txt new file mode 100644 index 000000000..ea592fa6d --- /dev/null +++ b/examples/dataset/draft.txt @@ -0,0 +1,59 @@ +LIBERO_FEATURES = { + "observation.images.image": { + "dtype": "video", + "shape": (256, 256, 3), + "names": ["height", "width", "rgb"], + }, + "observation.images.wrist_image": { + "dtype": "video", + "shape": (256, 256, 3), + "names": ["height", "width", "rgb"], + }, + "observation.state": { + "dtype": "float32", + "shape": (8,), + "names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper", "gripper"]}, + }, + "observation.states.ee_state": { + "dtype": "float32", + "shape": (6,), + "names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3"]}, + }, + "observation.states.joint_state": { + "dtype": "float32", + "shape": (7,), + "names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]}, + }, + "observation.states.gripper_state": { + "dtype": "float32", + "shape": (2,), + "names": {"motors": ["gripper", "gripper"]}, + }, + "action": { + "dtype": "float32", + "shape": (7,), + "names": {"motors": ["x", "y", "z", "axis_angle1", "axis_angle2", "axis_angle3", "gripper"]}, + }, +} + +Everything are float32 except for language_instructions and images , are +├── action # nx14 absolute bimanual joints, not used in our paper +├── base_action # nx2 chassis actions, not used in our paper +├── language_instruction # 🌟"fold the cloth" +├── observations +│ ├── eef # nx14 absolute eef pos using euler angles to represent the rotation, not used in our paper +│ │ eef_quaternion # nx16 absolute eef pos using quaternion to represent the rotation, not used in our paper +│ │ eef_6d # 🌟nx20 absolute eef pos using rotate6d to represent the rotation +│ │ eef_left_time # 🌟nx1 the time stamp for left arm eef pos, can be used for resample or interpolation +│ │ eef_right_time # 🌟nx1 the time stamp for right arm eef pos, can be used for resample or interpolation +│ ├── qpos # nx14 absolute bimanual joints, not used in our paper +│ ├── qpos_left_time # nx1 the time stamp for left arm joint pos, can be used for resample or interpolation, not used in our paper +│ ├── qpos_right_time # nx1 the time stamp for right arm joint pos, can be used for resample or interpolation, not used in our paper +│ ├── qvel # nx14 bimanual joint velocity, not used in our paper +│ ├── effort # nx14 bimanual joint effort, not used in our paper +│ ├── images +│ │ ├── cam_high # 🌟the encoded head cam view, should be decoded using cv2 +│ │ ├── cam_left_wrist # 🌟the encoded left wrist view, should be decoded using cv2 +│ │ ├── cam_right_wrist # 🌟the encoded right wrist view, should be decoded using cv2 +├── time_stamp # the time stamp for each sample, not used in our paper + diff --git a/examples/dataset/utils.py b/examples/dataset/utils.py new file mode 100644 index 000000000..27e08f000 --- /dev/null +++ b/examples/dataset/utils.py @@ -0,0 +1,36 @@ +from pathlib import Path + +import numpy as np +from h5py import File + + +def load_local_episodes(input_h5: Path): + with File(input_h5, "r") as f: + for demo in f["data"].values(): + demo_len = len(demo["obs/agentview_rgb"]) + # (-1: open, 1: close) -> (0: close, 1: open) + action = np.array(demo["actions"]) + action = np.concatenate( + [ + action[:, :6], + (1 - np.clip(action[:, -1], 0, 1))[:, None], + ], + axis=1, + ) + state = np.concatenate( + [ + np.array(demo["obs/ee_states"]), + np.array(demo["obs/gripper_states"]), + ], + axis=1, + ) + episode = { + "observation.images.image": np.array(demo["obs/agentview_rgb"]), + "observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]), + "observation.state": np.array(state, dtype=np.float32), + "observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32), + "observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32), + "observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32), + "action": np.array(action, dtype=np.float32), + } + yield [{**{k: v[i] for k, v in episode.items()}} for i in range(demo_len)] \ No newline at end of file