diff --git a/README.md b/README.md
index 5a6b833..ecc13ee 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
 ## 🚀 What's New
 
+- **\[2025.06.27\]** We have supported Data Conversion from LIBERO to LeRobot! 🔥🔥🔥
 - **\[2025.05.16\]** We have supported Data Conversion from LeRobot to RLDS! 🔥🔥🔥
 - **\[2025.05.12\]** We have supported Data Conversion from RoboMIND to LeRobot! 🔥🔥🔥
 - **\[2025.04.20\]** We add Dataset Version Converter for LeRobotv2.0 to LeRobotv2.1! 🔥🔥🔥
@@ -36,7 +37,7 @@ A curated collection of utilities for [LeRobot Projects](https://github.com/hugg
   - [x] [AgiBot-World to LeRobot](./agibot2lerobot/README.md)
   - [x] [RoboMIND to LeRobot](./robomind2lerobot/README.md)
   - [x] [LeRobot to RLDS](./lerobot2rlds/README.md)
-  - [ ] LIBERO to LeRobot
+  - [x] [LIBERO to LeRobot](./libero2lerobot/README.md)
 
 - **Training**:
diff --git a/libero2lerobot/README.md b/libero2lerobot/README.md
new file mode 100644
index 0000000..550b3b7
--- /dev/null
+++ b/libero2lerobot/README.md
@@ -0,0 +1,233 @@
+# LIBERO to LeRobot
+
+LIBERO consists of 4 task suites and 130 tasks for studying lifelong learning in decision making (LLDM). Specifically, the tasks in 3 of the 4 task suites vary in only one type of knowledge, while the last task suite requires transfer of entangled knowledge. (Copied from the [docs](https://lifelong-robot-learning.github.io/LIBERO/html/getting_started/overview.html))
+
+## 🚀 What's New in This Script
+
+In this conversion script, we have made several key improvements:
+
+- **OpenVLA-based LIBERO Regeneration**: resolution enhancement, no-op action filtering, 180° RGB frame rotation, and failed-trajectory filtering.
+- **State Data Preservation**: maintains the native LIBERO state information (accessible via `states.ee_state`, `states.joint_state`, etc.).
+- **Robust Conversion Pipeline**: uses the DataTrove framework for high-speed dataset transformation and automatic failure recovery during conversion.
+
+Dataset Structure of `meta/info.json`:
+
+```json
+{
+    "codebase_version": "v2.1", // latest LeRobot format
+    "robot_type": "franka", // specific robot type
+    "fps": 20, // control frequency
+    "features": {
+        "observation.images.image": {
+            "dtype": "video",
+            "shape": [
+                256,
+                256,
+                3
+            ],
+            "names": [
+                "height",
+                "width",
+                "rgb"
+            ],
+            "info": {
+                "video.height": 256,
+                "video.width": 256,
+                "video.codec": "av1",
+                "video.pix_fmt": "yuv420p",
+                "video.is_depth_map": false,
+                "video.fps": 20,
+                "video.channels": 3,
+                "has_audio": false
+            }
+        },
+        // for more state keys, see libero_utils/config.py
+        "observation.state": {
+            "dtype": "float32",
+            "shape": [
+                8
+            ],
+            "names": {
+                "motors": [
+                    "x",
+                    "y",
+                    "z",
+                    "roll",
+                    "pitch",
+                    "yaw",
+                    "gripper",
+                    "gripper"
+                ]
+            }
+        },
+        ...
+        "action": {
+            "dtype": "float32",
+            "shape": [
+                7
+            ],
+            "names": {
+                "motors": [
+                    "x",
+                    "y",
+                    "z",
+                    "roll",
+                    "pitch",
+                    "yaw",
+                    "gripper"
+                ]
+            }
+        },
+        ...
+    }
+}
+```
+
+## Installation
+
+1. Install LeRobot:
+   Follow the instructions in the [official repo](https://github.com/huggingface/lerobot?tab=readme-ov-file#installation).
+
+2. Install other dependencies:
+   We use `datatrove[ray]` for parallel conversion, significantly speeding up data processing by distributing the workload across multiple cores or nodes (if available).
+   ```bash
+   pip install h5py
+   pip install -U datatrove
+   pip install -U "datatrove[ray]" # if you want ray features
+   ```
+
+## Get started
+
+> [!NOTE]
+> This script converts the original LIBERO HDF5 files to the LeRobot format.
+> If you want to convert from RLDS to LeRobot, check [openx2lerobot](../openx2lerobot/README.md).
+
+### Download source code:
+
+```bash
+git clone https://github.com/Tavish9/any4lerobot.git
+```
+
+### Regenerate LIBERO Trajectories:
+
+1. [Install the LIBERO dependencies](https://github.com/Lifelong-Robot-Learning/LIBERO?tab=readme-ov-file#installtion)
+2. Replace `libero_90` with your target LIBERO task suite.
+
+```bash
+python libero_utils/regenerate_libero_dataset.py \
+    --resolution 256 \
+    --libero_task_suite libero_90 \
+    --libero_raw_data_dir /path/to/libero/datasets/libero_90 \
+    --libero_target_dir /path/to/libero/datasets/libero_90_no_noops
+```
+
+### Modify `convert.sh`:
+
+1. If you have installed `datatrove[ray]`, we recommend using the `ray` executor for faster conversion.
+2. Increase `workers` and `tasks-per-job` if you have sufficient computing resources.
+3. To merge multiple task suites into one dataset, specify all source paths, e.g. `--src-paths /path/libero_10 /path/libero_90`.
+4. To resume from a previous conversion, provide the corresponding log directories via `--resume-from-save` and `--resume-from-aggregate`.
+5. If you want a different image resolution, regenerate the trajectories and change the [config](./libero_utils/config.py). (DO NOT resize the rendered images instead.)
+
+```bash
+python libero_h5.py \
+    --src-paths /path/to/libero/ \
+    --output-path /path/to/local \
+    --executor local \
+    --tasks-per-job 3 \
+    --workers 10
+```
+
+### Execute the script:
+
+#### For single node
+
+```bash
+bash convert.sh
+```
+
+#### For multiple nodes (install Ray first)
+
+**Direct Access to Nodes (2 nodes in this example)**
+
+On Node 1:
+
+```bash
+ray start --head --port=6379
+```
+
+On Node 2:
+
+```bash
+ray start --address='node_1_ip:6379'
+```
+
+On either node, check the Ray cluster status and start the script:
+
+```bash
+ray status
+bash convert.sh
+```
+
+**Slurm-managed System**
+
+```bash
+#!/bin/bash
+#SBATCH --job-name=ray-cluster
+#SBATCH --ntasks=2
+#SBATCH --nodes=2
+#SBATCH --partition=partition
+
+# Getting the node names
+nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
+nodes_array=($nodes)
+
+head_node=${nodes_array[0]}
+head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
+
+# if we detect a space character in the head node IP, we'll
+# convert it to an ipv4 address. This step is optional.
+if [[ "$head_node_ip" == *" "* ]]; then
+IFS=' ' read -ra ADDR <<<"$head_node_ip"
+if [[ ${#ADDR[0]} -gt 16 ]]; then
+    head_node_ip=${ADDR[1]}
+else
+    head_node_ip=${ADDR[0]}
+fi
+echo "IPV6 address detected. We split the IPV4 address as $head_node_ip"
+fi
+
+port=6379
+ip_head=$head_node_ip:$port
+export ip_head
+echo "IP Head: $ip_head"
+
+echo "Starting HEAD at $head_node"
+srun --nodes=1 --ntasks=1 -w "$head_node" \
+    ray start --head \
+    --node-ip-address="$head_node_ip" \
+    --port=$port \
+    --block &
+
+sleep 10
+
+# number of nodes other than the head node
+worker_num=$((SLURM_JOB_NUM_NODES - 1))
+
+for ((i = 1; i <= worker_num; i++)); do
+    node_i=${nodes_array[$i]}
+    echo "Starting WORKER $i at $node_i"
+    srun --nodes=1 --ntasks=1 -w "$node_i" \
+        ray start \
+        --address "$ip_head" \
+        --block &
+    sleep 5
+done
+
+sleep 10
+
+bash convert.sh
+```
+
+**Other Community-Supported Cluster Managers**
+
+See the [docs](https://docs.ray.io/en/latest/cluster/vms/user-guides/community/index.html) for more details.
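+
+### Verify the output (optional):
+
+A minimal sanity check after conversion. This is a sketch: `local/libero_90_lerobot` and the `root` path are placeholders for your aggregated output directory (e.g. `/path/to/local/libero_90_lerobot`).
+
+```python
+from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
+
+# Load the aggregated dataset from disk (the repo_id is only used for naming here)
+dataset = LeRobotDataset(repo_id="local/libero_90_lerobot", root="/path/to/local/libero_90_lerobot")
+print(dataset.meta.total_episodes, dataset.meta.total_frames)
+
+# Inspect one frame: the 8-dim state, the preserved native states, and the 7-dim action
+frame = dataset[0]
+print(frame["observation.state"].shape)  # torch.Size([8])
+print(frame["observation.states.joint_state"].shape)  # torch.Size([7])
+print(frame["action"].shape)  # torch.Size([7])
+```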
diff --git a/libero2lerobot/convert.sh b/libero2lerobot/convert.sh
new file mode 100644
index 0000000..e775941
--- /dev/null
+++ b/libero2lerobot/convert.sh
@@ -0,0 +1,10 @@
+export SVT_LOG=1
+export HF_DATASETS_DISABLE_PROGRESS_BARS=TRUE
+export HDF5_USE_FILE_LOCKING=FALSE
+
+python libero_h5.py \
+    --src-paths /path/to/libero/ \
+    --output-path /path/to/local \
+    --executor local \
+    --tasks-per-job 3 \
+    --workers 10
diff --git a/libero2lerobot/libero_h5.py b/libero2lerobot/libero_h5.py
new file mode 100644
index 0000000..bbff34b
--- /dev/null
+++ b/libero2lerobot/libero_h5.py
@@ -0,0 +1,313 @@
+import argparse
+import os
+import re
+import shutil
+from pathlib import Path
+
+import pandas as pd
+import ray
+from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
+from datatrove.pipeline.base import PipelineStep
+from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
+from lerobot.common.datasets.utils import (
+    write_episode,
+    write_episode_stats,
+    write_info,
+    write_task,
+)
+from libero_utils.config import LIBERO_FEATURES
+from libero_utils.lerobot_utils import validate_all_metadata
+from libero_utils.libero_utils import load_local_episodes
+from ray.runtime_env import RuntimeEnv
+from tqdm import tqdm
+
+
+def setup_logger():
+    import sys
+
+    from datatrove.utils.logging import logger
+
+    logger.remove()
+    logger.add(sys.stdout, level="INFO", colorize=True)
+    return logger
+
+
+class SaveLerobotDataset(PipelineStep):
+    def __init__(self, tasks: list[tuple[Path, Path, str]]):
+        super().__init__()  # initialize PipelineStep internals (stats tracking)
+        self.tasks = tasks
+
+    def run(self, data=None, rank: int = 0, world_size: int = 1):
+        logger = setup_logger()
+
+        input_h5, output_path, task_instruction = self.tasks[rank]
+
+        if output_path.exists():
+            shutil.rmtree(output_path)
+
+        dataset = LeRobotDataset.create(
+            repo_id=f"{input_h5.parent.name}/{input_h5.name}",
+            root=output_path,
+            fps=20,
+            robot_type="franka",
+            features=LIBERO_FEATURES,
+        )
+
+        logger.info(f"Start processing {input_h5}, saving to {output_path}")
+
+        raw_dataset = load_local_episodes(input_h5)
+        for episode_index, episode_data in enumerate(raw_dataset):
+            for frame_data in episode_data:
+                dataset.add_frame(
+                    frame_data,
+                    task=task_instruction,
+                )
+            dataset.save_episode()
+            logger.info(f"Processing done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
+
+
+class AggregateDatasets(PipelineStep):
+    def __init__(
+        self,
+        raw_dirs: list[Path],
+        aggregated_dir: Path,
+    ):
+        super().__init__()
+        self.raw_dirs = raw_dirs
+        self.aggregated_dir = aggregated_dir
+
+        self.create_aggr_dataset()
+
+    def create_aggr_dataset(self):
+        logger = setup_logger()
+
+        all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in self.raw_dirs]
+
+        fps, robot_type, features = validate_all_metadata(all_metadata)
+
+        if self.aggregated_dir.exists():
+            shutil.rmtree(self.aggregated_dir)
+
+        aggr_meta = LeRobotDatasetMetadata.create(
+            repo_id=f"{self.aggregated_dir.parent.name}/{self.aggregated_dir.name}",
+            root=self.aggregated_dir,
+            fps=fps,
+            robot_type=robot_type,
+            features=features,
+        )
+
+        datasets_task_index_to_aggr_task_index = {}
+        aggr_task_index = 0
+        for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate tasks index")):
+            task_index_to_aggr_task_index = {}
+
+            for task_index, task in meta.tasks.items():
+                if task not in aggr_meta.task_to_task_index:
+                    # add the task to the aggregated task mappings
+                    aggr_meta.tasks[aggr_task_index] = task
+                    aggr_meta.task_to_task_index[task] = aggr_task_index
+                    aggr_task_index += 1
+
+                task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
+
+            datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
+
+        datasets_ep_idx_to_aggr_ep_idx = {}
+        datasets_aggr_episode_index_shift = {}
+        datasets_aggr_index_shift = {}
+        aggr_episode_index_shift = 0
+        for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate episodes and global index")):
+            ep_idx_to_aggr_ep_idx = {}
+
+            for episode_index in range(meta.total_episodes):
+                aggr_episode_index = episode_index + aggr_episode_index_shift
+                ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index
+
+            datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx
+            datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift
+            datasets_aggr_index_shift[dataset_index] = aggr_meta.total_frames
+
+            # populate episodes
+            for episode_index, episode_dict in meta.episodes.items():
+                aggr_episode_index = episode_index + aggr_episode_index_shift
+                episode_dict["episode_index"] = aggr_episode_index
+                aggr_meta.episodes[aggr_episode_index] = episode_dict
+
+            # populate episodes_stats
+            for episode_index, episode_stats in meta.episodes_stats.items():
+                aggr_episode_index = episode_index + aggr_episode_index_shift
+                aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
+
+            # populate info
+            aggr_meta.info["total_episodes"] += meta.total_episodes
+            aggr_meta.info["total_frames"] += meta.total_frames
+            aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
+
+            aggr_episode_index_shift += meta.total_episodes
+
+        logger.info("Write meta data")
+        aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
+        aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
+        aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
+
+        # create a new episodes jsonl with updated episode_index using write_episode
+        for episode_dict in tqdm(aggr_meta.episodes.values(), desc="Write episodes info"):
+            write_episode(episode_dict, aggr_meta.root)
+
+        # create a new episodes_stats jsonl with updated episode_index using write_episode_stats
+        for episode_index, episode_stats in tqdm(aggr_meta.episodes_stats.items(), desc="Write episodes stats info"):
+            write_episode_stats(episode_index, episode_stats, aggr_meta.root)
+
+        # create a new tasks jsonl with updated task_index using write_task
+        for task_index, task in tqdm(aggr_meta.tasks.items(), desc="Write tasks info"):
+            write_task(task_index, task, aggr_meta.root)
+
+        write_info(aggr_meta.info, aggr_meta.root)
+
+        self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
+        self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx
+        self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
+        self.datasets_aggr_index_shift = datasets_aggr_index_shift
+
+        logger.info("Meta data done writing")
+
+    def run(self, data=None, rank: int = 0, world_size: int = 1):
+        logger = setup_logger()
+
+        dataset_index = rank
+        aggr_meta = LeRobotDatasetMetadata("", root=self.aggregated_dir)
+        meta = LeRobotDatasetMetadata("", root=self.raw_dirs[dataset_index])
+        aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
+        aggr_index_shift = self.datasets_aggr_index_shift[dataset_index]
+        task_index_to_aggr_task_index = self.datasets_task_index_to_aggr_task_index[dataset_index]
+
+        logger.info("Copy data")
+        for episode_index in range(meta.total_episodes):
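+            # Map this dataset's local episode index into the aggregated numbering;
+            # the per-frame index / episode_index / task_index columns are rewritten below.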
+            aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index]
+            data_path = meta.root / meta.get_data_file_path(episode_index)
+            aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
+            aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
+
+            # update index, episode_index and task_index
+            df = pd.read_parquet(data_path)
+            df["index"] += aggr_index_shift
+            df["episode_index"] += aggr_episode_index_shift
+            df["task_index"] = df["task_index"].map(task_index_to_aggr_task_index)
+            df.to_parquet(aggr_data_path)
+
+        logger.info("Copy videos")
+        for episode_index in range(meta.total_episodes):
+            aggr_episode_index = episode_index + aggr_episode_index_shift
+            for vid_key in meta.video_keys:
+                video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
+                aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
+                aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy(video_path, aggr_video_path)
+
+        logger.info("Remove original data")
+        shutil.rmtree(meta.root)
+
+
+def main(
+    src_paths: list[Path],
+    output_path: Path,
+    executor: str,
+    cpus_per_task: int,
+    tasks_per_job: int,
+    workers: int,
+    resume_from_save: Path,
+    resume_from_aggregate: Path,
+    debug: bool = False,
+    repo_id: str = None,
+    push_to_hub: bool = False,
+):
+    tasks = []
+    # task files look like {SCENE_NAME}_SCENE{N}_{instruction}_demo.hdf5; the capture group is the language instruction
+    pattern = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
+    for src_path in src_paths:
+        for input_h5 in src_path.glob("*.hdf5"):
+            match = pattern.search(input_h5.name)
+            if match is None:
+                continue
+            tasks.append(
+                (
+                    input_h5,
+                    (output_path / (src_path.name + "_temp") / input_h5.stem).resolve(),
+                    match.group(1).replace("_", " "),
+                )
+            )
+    if len(src_paths) > 1:
+        aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot")
+    else:
+        aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"
+    if debug:
+        SaveLerobotDataset([tasks[0]]).run()
+    else:
+        save_config = {
+            "tasks": len(tasks),
+            "workers": workers,
+            "logging_dir": resume_from_save,
+        }
+        aggregate_config = {
+            "tasks": len(tasks),
+            "workers": workers,
+            "logging_dir": resume_from_aggregate,
+        }
+
+        match executor:
+            case "local":
+                workers = os.cpu_count() // cpus_per_task if workers == -1 else workers
+                save_config["workers"] = workers
+                aggregate_config["workers"] = workers
+                executor = LocalPipelineExecutor
+            case "ray":
+                runtime_env = RuntimeEnv(
+                    env_vars={
+                        "HDF5_USE_FILE_LOCKING": "FALSE",
+                        "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
+                        "SVT_LOG": "1",
+                    },
+                )
+                ray.init(runtime_env=runtime_env)
+                save_config.update({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job})
+                aggregate_config.update({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job})
+                executor = RayPipelineExecutor
+            case _:
+                raise ValueError(f"Executor {executor} not supported")
+
+        executor(pipeline=[SaveLerobotDataset(tasks)], **save_config).run()
+        executor(pipeline=[AggregateDatasets([task[1] for task in tasks], aggregate_output_path)], **aggregate_config).run()
+
+        for task in tasks:
+            shutil.rmtree(task[1].parent, ignore_errors=True)
+
+    if push_to_hub:
+        assert repo_id is not None
+        tags = ["LeRobot", "libero", "franka"]
+        tags.extend([src_path.name for src_path in src_paths])
+        LeRobotDataset(
+            repo_id=repo_id,
+            root=aggregate_output_path,
+        ).push_to_hub(
+            tags=tags,
+            private=False,
+            push_videos=True,
+            license="apache-2.0",
+            upload_large_folder=False,
+        )
+
+
+if __name__ == "__main__":
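+    # Command-line entry point; convert.sh wraps this invocation with the recommended env vars.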
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--src-paths", type=Path, nargs="+", required=True)
+    parser.add_argument("--output-path", type=Path, required=True)
+    parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local")
+    parser.add_argument("--cpus-per-task", type=int, default=1)
+    parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
+    parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
+    parser.add_argument("--resume-from-save", type=Path, help="logs directory to resume from the save step")
+    parser.add_argument("--resume-from-aggregate", type=Path, help="logs directory to resume from the aggregate step")
+    parser.add_argument("--debug", action="store_true")
+    parser.add_argument("--repo-id", type=str, help="required when --push-to-hub is set")
+    parser.add_argument("--push-to-hub", action="store_true", help="upload to hub")
+    args = parser.parse_args()
+
+    main(**vars(args))
diff --git a/libero2lerobot/libero_utils/config.py b/libero2lerobot/libero_utils/config.py
new file mode 100644
index 0000000..35250fa
--- /dev/null
+++ b/libero2lerobot/libero_utils/config.py
@@ -0,0 +1,37 @@
+LIBERO_FEATURES = {
+    "observation.images.image": {
+        "dtype": "video",
+        "shape": (256, 256, 3),
+        "names": ["height", "width", "rgb"],
+    },
+    "observation.images.wrist_image": {
+        "dtype": "video",
+        "shape": (256, 256, 3),
+        "names": ["height", "width", "rgb"],
+    },
+    "observation.state": {
+        "dtype": "float32",
+        "shape": (8,),
+        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper", "gripper"]},
+    },
+    "observation.states.ee_state": {
+        "dtype": "float32",
+        "shape": (6,),
+        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw"]},
+    },
+    "observation.states.joint_state": {
+        "dtype": "float32",
+        "shape": (7,),
+        "names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
+    },
+    "observation.states.gripper_state": {
+        "dtype": "float32",
+        "shape": (2,),
+        "names": {"motors": ["gripper", "gripper"]},
+    },
+    "action": {
+        "dtype": "float32",
+        "shape": (7,),
+        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"]},
+    },
+}
diff --git a/libero2lerobot/libero_utils/lerobot_utils.py b/libero2lerobot/libero_utils/lerobot_utils.py
new file mode 100644
index 0000000..228ede0
--- /dev/null
+++ b/libero2lerobot/libero_utils/lerobot_utils.py
@@ -0,0 +1,23 @@
+import tqdm
+from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
+
+
+def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
+    """
+    implemented by @Cadene
+    """
+    # validate that all datasets share the same fps, robot_type and features
+
+    fps = all_metadata[0].fps
+    robot_type = all_metadata[0].robot_type
+    features = all_metadata[0].features
+
+    for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"):
+        if fps != meta.fps:
+            raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.")
+        if robot_type != meta.robot_type:
+            raise ValueError(f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}.")
+        if features != meta.features:
+            raise ValueError(f"Same features are expected, but got features={meta.features} instead of {features}.")
+
+    return fps, robot_type, features
diff --git a/libero2lerobot/libero_utils/libero_utils.py b/libero2lerobot/libero_utils/libero_utils.py
new file mode 100644
index 0000000..8d689a2
--- /dev/null
+++ b/libero2lerobot/libero_utils/libero_utils.py
@@ -0,0 +1,36 @@
+from pathlib import Path
+
+import numpy as np
+from h5py import File
+
+
+def load_local_episodes(input_h5: Path):
+    with File(input_h5, "r") as f:
+        for demo in f["data"].values():
+            demo_len = len(demo["obs/agentview_rgb"])
+            # remap the gripper action: (-1: open, 1: close) -> (0: close, 1: open)
+            action = np.array(demo["actions"])
+            action = np.concatenate(
+                [
+                    action[:, :6],
+                    (1 - np.clip(action[:, -1], 0, 1))[:, None],
+                ],
+                axis=1,
+            )
+            # 8-dim state: 6-dim end-effector pose + 2-dim gripper position
+            state = np.concatenate(
+                [
+                    np.array(demo["obs/ee_states"]),
+                    np.array(demo["obs/gripper_states"]),
+                ],
+                axis=1,
+            )
+            episode = {
+                "observation.images.image": np.array(demo["obs/agentview_rgb"]),
+                "observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
+                "observation.state": np.array(state, dtype=np.float32),
+                "observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
+                "observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
+                "observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
+                "action": np.array(action, dtype=np.float32),
+            }
+            # yield the episode as a list of per-frame dicts
+            yield [{k: v[i] for k, v in episode.items()} for i in range(demo_len)]
diff --git a/libero2lerobot/libero_utils/regenerate_libero_dataset.py b/libero2lerobot/libero_utils/regenerate_libero_dataset.py
new file mode 100644
index 0000000..5143466
--- /dev/null
+++ b/libero2lerobot/libero_utils/regenerate_libero_dataset.py
@@ -0,0 +1,278 @@
+"""
+Adapted from https://github.com/openvla/openvla/blob/main/experiments/robot/libero/regenerate_libero_dataset.py
+
+Regenerates a LIBERO dataset (HDF5 files) by replaying demonstrations in the environments.
+
+Notes:
+    - We save image observations at 256x256px resolution (instead of 128x128).
+    - We filter out transitions with "no-op" (zero) actions that do not change the robot's state.
+    - We filter out unsuccessful demonstrations.
+    - In the LIBERO HDF5 data -> RLDS data conversion (not shown here), we rotate the images by
+      180 degrees because we observe that the environments return images that are upside down
+      on our platform.
+
+Usage:
+    python libero_utils/regenerate_libero_dataset.py \
+        --libero_task_suite [ libero_spatial | libero_object | libero_goal | libero_10 ] \
+        --libero_raw_data_dir <path to raw HDF5 dataset dir> \
+        --libero_target_dir <path to target dir>
+
+    Example (LIBERO-Spatial):
+    python libero_utils/regenerate_libero_dataset.py \
+        --libero_task_suite libero_spatial \
+        --libero_raw_data_dir ./LIBERO/libero/datasets/libero_spatial \
+        --libero_target_dir ./LIBERO/libero/datasets/libero_spatial_no_noops
+
+"""
+
+import argparse
+import json
+import os
+
+import h5py
+import numpy as np
+import robosuite.utils.transform_utils as T
+import tqdm
+from libero.libero import benchmark, get_libero_path
+from libero.libero.envs import OffScreenRenderEnv
+
+
+def get_libero_dummy_action(model_family: str):
+    """Get dummy/no-op action, used to roll out the simulation while the robot does nothing."""
+    return [0, 0, 0, 0, 0, 0, -1]
+
+
+def get_libero_env(task, model_family, resolution=256):
+    """Initializes and returns the LIBERO environment, along with the task description."""
+    task_description = task.language
+    task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
+    env_args = {"bddl_file_name": task_bddl_file, "camera_heights": resolution, "camera_widths": resolution}
+    env = OffScreenRenderEnv(**env_args)
+    env.seed(0)  # IMPORTANT: seed seems to affect object positions even when using fixed initial state
+    return env, task_description
+
+
+def is_noop(action, prev_action=None, threshold=1e-4):
+    """
+    Returns whether an action is a no-op action.
+
+    A no-op action satisfies two criteria:
+        (1) All action dimensions, except for the last one (gripper action), are near zero.
+        (2) The gripper action is equal to the previous timestep's gripper action.
+
+    Explanation of (2):
+        Naively filtering out actions with just criterion (1) is not good because you would
+        also remove actions where the robot is staying still while opening/closing its gripper.
+        So you also need to consider the current state (using the previous timestep's
+        gripper action as a proxy) to determine whether the action really is a no-op.
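+
+    Example (threshold=1e-4):
+        is_noop([1e-5, 0, 0, 0, 0, 0, -1], prev_action=None)  # True: arm nearly still on the first action
+        is_noop([0, 0, 0, 0, 0, 0, 1], prev_action=[0, 0, 0, 0, 0, 0, -1])  # False: the gripper toggled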
+ """ + # Special case: Previous action is None if this is the first action in the episode + # Then we only care about criterion (1) + if prev_action is None: + return np.linalg.norm(action[:-1]) < threshold + + # Normal case: Check both criteria (1) and (2) + gripper_action = action[-1] + prev_gripper_action = prev_action[-1] + return np.linalg.norm(action[:-1]) < threshold and gripper_action == prev_gripper_action + + +def main(args): + print(f"Regenerating {args.libero_task_suite} dataset!") + + # Create target directory + if os.path.isdir(args.libero_target_dir): + user_input = input( + f"Target directory already exists at path: {args.libero_target_dir}\nEnter 'y' to overwrite the directory, or anything else to exit: " + ) + if user_input != "y": + exit() + os.makedirs(args.libero_target_dir, exist_ok=True) + + # Prepare JSON file to record success/false and initial states per episode + metainfo_json_dict = {} + metainfo_json_out_path = f"./experiments/robot/libero/{args.libero_task_suite}_metainfo.json" + with open(metainfo_json_out_path, "w") as f: + # Just test that we can write to this file (we overwrite it later) + json.dump(metainfo_json_dict, f) + + # Get task suite + benchmark_dict = benchmark.get_benchmark_dict() + task_suite = benchmark_dict[args.libero_task_suite]() + num_tasks_in_suite = task_suite.n_tasks + + # Setup + num_replays = 0 + num_success = 0 + num_noops = 0 + + for task_id in tqdm.tqdm(range(num_tasks_in_suite)): + # Get task in suite + task = task_suite.get_task(task_id) + env, task_description = get_libero_env(task, "llava", resolution=args.resolution) + + # Get dataset for task + orig_data_path = os.path.join(args.libero_raw_data_dir, f"{task.name}_demo.hdf5") + assert os.path.exists(orig_data_path), f"Cannot find raw data file {orig_data_path}." 
+        orig_data_file = h5py.File(orig_data_path, "r")
+        orig_data = orig_data_file["data"]
+
+        # Create new HDF5 file for regenerated demos
+        new_data_path = os.path.join(args.libero_target_dir, f"{task.name}_demo.hdf5")
+        new_data_file = h5py.File(new_data_path, "w")
+        grp = new_data_file.create_group("data")
+
+        for i in range(len(orig_data.keys())):
+            # Get demo data
+            demo_data = orig_data[f"demo_{i}"]
+            orig_actions = demo_data["actions"][()]
+            orig_states = demo_data["states"][()]
+
+            # Reset environment, set initial state, and wait a few steps for environment to settle
+            env.reset()
+            env.set_init_state(orig_states[0])
+            for _ in range(10):
+                obs, reward, done, info = env.step(get_libero_dummy_action("llava"))
+
+            # Set up new data lists
+            states = []
+            actions = []
+            ee_states = []
+            gripper_states = []
+            joint_states = []
+            robot_states = []
+            agentview_images = []
+            eye_in_hand_images = []
+
+            # Replay original demo actions in environment and record observations
+            for action in orig_actions:
+                # Skip transitions with no-op actions
+                prev_action = actions[-1] if len(actions) > 0 else None
+                if is_noop(action, prev_action):
+                    print(f"\tSkipping no-op action: {action}")
+                    num_noops += 1
+                    continue
+
+                if not states:
+                    # In the first timestep, since we're using the original initial state to initialize the environment,
+                    # copy the initial state (first state in episode) over from the original HDF5 to the new one
+                    states.append(orig_states[0])
+                    robot_states.append(demo_data["robot_states"][0])
+                else:
+                    # For all other timesteps, get state from environment and record it
+                    states.append(env.sim.get_state().flatten())
+                    robot_states.append(
+                        np.concatenate([obs["robot0_gripper_qpos"], obs["robot0_eef_pos"], obs["robot0_eef_quat"]])
+                    )
+
+                # Record original action (from demo)
+                actions.append(action)
+
+                # Record data returned by environment
+                if "robot0_gripper_qpos" in obs:
+                    gripper_states.append(obs["robot0_gripper_qpos"])
+                joint_states.append(obs["robot0_joint_pos"])
+                ee_states.append(
+                    np.hstack(
+                        (
+                            obs["robot0_eef_pos"],
+                            T.quat2axisangle(obs["robot0_eef_quat"]),
+                        )
+                    )
+                )
+                agentview_images.append(np.ascontiguousarray(obs["agentview_image"][::-1, ::-1]))
+                eye_in_hand_images.append(np.ascontiguousarray(obs["robot0_eye_in_hand_image"][::-1, ::-1]))
+
+                # Execute demo action in environment
+                obs, reward, done, info = env.step(action.tolist())
+
+            # At end of episode, save replayed trajectories to new HDF5 files (only keep successes)
+            if done:
+                dones = np.zeros(len(actions)).astype(np.uint8)
+                dones[-1] = 1
+                rewards = np.zeros(len(actions)).astype(np.uint8)
+                rewards[-1] = 1
+                assert len(actions) == len(agentview_images)
+
+                ep_data_grp = grp.create_group(f"demo_{i}")
+                obs_grp = ep_data_grp.create_group("obs")
+                obs_grp.create_dataset("gripper_states", data=np.stack(gripper_states, axis=0))
+                obs_grp.create_dataset("joint_states", data=np.stack(joint_states, axis=0))
+                obs_grp.create_dataset("ee_states", data=np.stack(ee_states, axis=0))
+                obs_grp.create_dataset("ee_pos", data=np.stack(ee_states, axis=0)[:, :3])
+                obs_grp.create_dataset("ee_ori", data=np.stack(ee_states, axis=0)[:, 3:])
+                obs_grp.create_dataset("agentview_rgb", data=np.stack(agentview_images, axis=0))
+                obs_grp.create_dataset("eye_in_hand_rgb", data=np.stack(eye_in_hand_images, axis=0))
+                ep_data_grp.create_dataset("actions", data=actions)
+                ep_data_grp.create_dataset("states", data=np.stack(states))
+                ep_data_grp.create_dataset("robot_states", data=np.stack(robot_states, axis=0))
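+                # Sparse supervision signals: reward/done are 1 only on the final (successful) step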
+                ep_data_grp.create_dataset("rewards", data=rewards)
+                ep_data_grp.create_dataset("dones", data=dones)
+
+                num_success += 1
+
+            num_replays += 1
+
+            # Record success/failure and initial environment state in metainfo dict
+            task_key = task_description.replace(" ", "_")
+            episode_key = f"demo_{i}"
+            if task_key not in metainfo_json_dict:
+                metainfo_json_dict[task_key] = {}
+            if episode_key not in metainfo_json_dict[task_key]:
+                metainfo_json_dict[task_key][episode_key] = {}
+            metainfo_json_dict[task_key][episode_key]["success"] = bool(done)
+            metainfo_json_dict[task_key][episode_key]["initial_state"] = orig_states[0].tolist()
+
+            # Write metainfo dict to JSON file
+            # (We repeatedly overwrite, rather than doing this once at the end, just in case the script crashes midway)
+            with open(metainfo_json_out_path, "w") as f:
+                json.dump(metainfo_json_dict, f, indent=2)
+
+            # Count total number of successful replays so far
+            print(
+                f"Total # episodes replayed: {num_replays}, Total # successes: {num_success} ({num_success / num_replays * 100:.1f} %)"
+            )
+
+            # Report total number of no-op actions filtered out so far
+            print(f"  Total # no-op actions filtered out: {num_noops}")
+
+        # Close HDF5 files
+        orig_data_file.close()
+        if len(new_data_file["data"]) == 0:
+            new_data_file.close()
+            os.remove(new_data_path)
+        else:
+            new_data_file.close()
+            print(f"Saved regenerated demos for task '{task_description}' at: {new_data_path}")
+
+    print(f"Dataset regeneration complete! Saved new dataset at: {args.libero_target_dir}")
+    print(f"Saved metainfo JSON at: {metainfo_json_out_path}")
+
+
+if __name__ == "__main__":
+    # Parse command-line arguments
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--resolution", type=int, default=256, help="Resolution of the images. Example: 256")
+    parser.add_argument(
+        "--libero_task_suite",
+        type=str,
+        choices=["libero_spatial", "libero_object", "libero_goal", "libero_10", "libero_90"],
+        help="LIBERO task suite. Example: libero_spatial",
+        required=True,
+    )
+    parser.add_argument(
+        "--libero_raw_data_dir",
+        type=str,
+        help="Path to directory containing the raw HDF5 dataset. Example: ./LIBERO/libero/datasets/libero_spatial",
+        required=True,
+    )
+    parser.add_argument(
+        "--libero_target_dir",
+        type=str,
+        help="Path to regenerated dataset directory. Example: ./LIBERO/libero/datasets/libero_spatial_no_noops",
+        required=True,
+    )
+    args = parser.parse_args()
+
+    # Start data regeneration
+    main(args)