add support for libero2lerobot (#42)

* add libero2lerobot readme

* use datatrove for libero2lerobot

* update libero2lerobot readme

* update README.md

* Update libero2lerobot/README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update libero2lerobot/README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix

* set upload_large_folder to false

* use vectorized operations for faster transform

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Qizhi Chen
2025-06-27 11:36:25 +08:00
committed by GitHub
parent e7ee6a0052
commit 4dc21b9b70
8 changed files with 932 additions and 1 deletions
@@ -0,0 +1,233 @@
# LIBERO to LeRobot
LIBERO consists of 4 task suites and 130 tasks for studying lifelong learning in decision making (LLDM). Specifically, the tasks in 3 of the 4 task suites vary in only one type of knowledge, while the last task suite requires transfer of entangled knowledge. (Copied from the [docs](https://lifelong-robot-learning.github.io/LIBERO/html/getting_started/overview.html))
## 🚀 What's New in This Script
This script makes several key improvements:
- **OpenVLA-based LIBERO Regeneration**: resolution enhancement, no-op action filtering, 180° RGB frame rotation, and failed-trajectory filtering.
- **State Data Preservation**: keeps the native LIBERO state information (accessible via `observation.states.ee_state`, `observation.states.joint_state`, etc.).
- **Robust Conversion Pipeline**: uses the DataTrove framework for high-speed dataset transformation and automatic failure recovery during conversion.
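The no-op filtering mentioned above keeps transitions where a stationary arm opens or closes its gripper; a self-contained sketch of the criterion (mirroring `is_noop` in `libero_utils/regenerate_libero_dataset.py`):

```python
import numpy as np

def is_noop(action, prev_action=None, threshold=1e-4):
    """An action is a no-op if the arm barely moves AND the gripper command repeats."""
    if prev_action is None:
        # First action of the episode: only the arm-motion criterion applies.
        return bool(np.linalg.norm(action[:-1]) < threshold)
    return bool(np.linalg.norm(action[:-1]) < threshold and action[-1] == prev_action[-1])

still_close = np.array([0, 0, 0, 0, 0, 0, 1.0])   # arm still, gripper closing
still_open = np.array([0, 0, 0, 0, 0, 0, -1.0])   # arm still, gripper opening

print(is_noop(still_close, still_open))   # False: gripper toggles, transition is kept
print(is_noop(still_close, still_close))  # True: genuine no-op, filtered out
```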
Structure of `meta/info.json`:
```json
{
    "codebase_version": "v2.1", // latest lerobot format
    "robot_type": "franka", // specific robot type
    "fps": 20, // control frequency
    "features": {
        "observation.images.image": {
            "dtype": "video",
            "shape": [256, 256, 3],
            "names": ["height", "width", "rgb"],
            "info": {
                "video.height": 256,
                "video.width": 256,
                "video.codec": "av1",
                "video.pix_fmt": "yuv420p",
                "video.is_depth_map": false,
                "video.fps": 20,
                "video.channels": 3,
                "has_audio": false
            }
        },
        // for more state keys, see libero_utils/config.py
        "observation.state": {
            "dtype": "float32",
            "shape": [8],
            "names": {
                "motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper", "gripper"]
            }
        },
        ...
        "action": {
            "dtype": "float32",
            "shape": [7],
            "names": {
                "motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"]
            }
        },
        ...
    }
}
```
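After conversion, a few of these fields can be sanity-checked programmatically. A minimal sketch (the stub directory below stands in for a real dataset root):

```python
import json
import tempfile
from pathlib import Path

def check_info(root: Path) -> dict:
    """Load meta/info.json and verify basic invariants of the converted dataset."""
    info = json.loads((root / "meta" / "info.json").read_text())
    assert info["codebase_version"] == "v2.1"
    assert info["fps"] == 20
    # observation.state packs ee_state (6) + gripper_state (2)
    assert info["features"]["observation.state"]["shape"] == [8]
    assert info["features"]["action"]["shape"] == [7]
    return info

# Demo on a stub meta/info.json (a real dataset root works the same way):
root = Path(tempfile.mkdtemp())
(root / "meta").mkdir()
(root / "meta" / "info.json").write_text(json.dumps({
    "codebase_version": "v2.1",
    "fps": 20,
    "robot_type": "franka",
    "features": {
        "observation.state": {"dtype": "float32", "shape": [8]},
        "action": {"dtype": "float32", "shape": [7]},
    },
}))
info = check_info(root)
print(info["robot_type"])  # franka
```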
## Installation
1. Install LeRobot:
Follow instructions in [official repo](https://github.com/huggingface/lerobot?tab=readme-ov-file#installation).
2. Install other dependencies:
We use `datatrove[ray]` for parallel conversion, which significantly speeds up data processing by distributing the workload across multiple cores or nodes (if any).
```bash
pip install h5py
pip install -U datatrove
pip install -U "datatrove[ray]" # if you want ray features
```
## Get started
> [!NOTE]
> This script converts the original HDF5 files to LeRobot format. To convert from RLDS to LeRobot, see [openx2lerobot](../openx2lerobot/README.md).
### Download source code:
```bash
git clone https://github.com/Tavish9/any4lerobot.git
```
### Regenerate LIBERO Trajectories:
1. [Install the LIBERO dependencies](https://github.com/Lifelong-Robot-Learning/LIBERO?tab=readme-ov-file#installtion)
2. Replace `libero_90` with your target LIBERO task suite.
```bash
python libero_utils/regenerate_libero_dataset.py \
--resolution 256 \
--libero_task_suite libero_90 \
--libero_raw_data_dir /path/to/libero/datasets/libero_90 \
--libero_target_dir /path/to/libero/datasets/libero_90_no_noops
```
### Modify `convert.sh`:
1. If you have installed `datatrove[ray]`, we recommend the `ray` executor for faster conversion.
2. Increase `workers` and `tasks-per-job` if you have sufficient computing resources.
3. To merge several datasets into one, specify multiple paths: `--src-paths /path/libero_10 /path/libero_90`
4. To resume a previous conversion, pass the corresponding log directories via `--resume-from-save` and `--resume-from-aggregate`.
5. If you want a different image resolution, regenerate the trajectories and change the [config](./libero_utils/config.py) accordingly (do NOT simply resize).
```bash
python libero_h5.py \
--src-paths /path/to/libero/ \
--output-path /path/to/local \
--executor local \
--tasks-per-job 3 \
--workers 10
```
### Execute the script:
#### For single node
```bash
bash convert.sh
```
#### For multiple nodes (install Ray first)
**Direct Access to Nodes (example with 2 nodes)**
On Node 1:
```bash
ray start --head --port=6379
```
On Node 2:
```bash
ray start --address='node_1_ip:6379'
```
On either node, check the Ray cluster status and start the script:
```bash
ray status
bash convert.sh
```
**Slurm-managed System**
```bash
#!/bin/bash
#SBATCH --job-name=ray-cluster
#SBATCH --ntasks=2
#SBATCH --nodes=2
#SBATCH --partition=partition

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# if we detect a space character in the head node IP, we'll
# convert it to an ipv4 address. This step is optional.
if [[ "$head_node_ip" == *" "* ]]; then
    IFS=' ' read -ra ADDR <<<"$head_node_ip"
    if [[ ${#ADDR[0]} -gt 16 ]]; then
        head_node_ip=${ADDR[1]}
    else
        head_node_ip=${ADDR[0]}
    fi
    echo "IPV6 address detected. We split the IPV4 address as $head_node_ip"
fi

port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head \
    --node-ip-address="$head_node_ip" \
    --port=$port \
    --block &

sleep 10

# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))
for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start \
        --address "$ip_head" \
        --block &
    sleep 5
done

sleep 10

bash convert.sh
```
**Other Community Supported Cluster Managers**
See the [doc](https://docs.ray.io/en/latest/cluster/vms/user-guides/community/index.html) for more details.
@@ -0,0 +1,10 @@
export SVT_LOG=1
export HF_DATASETS_DISABLE_PROGRESS_BARS=TRUE
export HDF5_USE_FILE_LOCKING=FALSE
python libero_h5.py \
--src-paths /path/to/libero/ \
--output-path /path/to/local \
--executor local \
--tasks-per-job 3 \
--workers 10
@@ -0,0 +1,313 @@
import argparse
import os
import re
import shutil
from pathlib import Path

import pandas as pd
import ray
from datatrove.executor import LocalPipelineExecutor, RayPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import (
    write_episode,
    write_episode_stats,
    write_info,
    write_task,
)
from libero_utils.config import LIBERO_FEATURES
from libero_utils.lerobot_utils import validate_all_metadata
from libero_utils.libero_utils import load_local_episodes
from ray.runtime_env import RuntimeEnv
from tqdm import tqdm


def setup_logger():
    import sys

    from datatrove.utils.logging import logger

    logger.remove()
    logger.add(sys.stdout, level="INFO", colorize=True)
    return logger
class SaveLerobotDataset(PipelineStep):
    def __init__(self, tasks: list[tuple[Path, Path, str]]):
        self.tasks = tasks

    def run(self, data=None, rank: int = 0, world_size: int = 1):
        logger = setup_logger()
        input_h5, output_path, task_instruction = self.tasks[rank]
        if output_path.exists():
            shutil.rmtree(output_path)
        dataset = LeRobotDataset.create(
            repo_id=f"{input_h5.parent.name}/{input_h5.name}",
            root=output_path,
            fps=20,
            robot_type="franka",
            features=LIBERO_FEATURES,
        )
        logger.info(f"start processing for {input_h5}, saving to {output_path}")
        raw_dataset = load_local_episodes(input_h5)
        for episode_index, episode_data in enumerate(raw_dataset):
            for frame_data in episode_data:
                dataset.add_frame(
                    frame_data,
                    task=task_instruction,
                )
            dataset.save_episode()
            logger.info(f"process done for {dataset.repo_id}, episode {episode_index}, len {len(episode_data)}")
class AggregateDatasets(PipelineStep):
    def __init__(
        self,
        raw_dirs: list[Path],
        aggregated_dir: Path,
    ):
        super().__init__()
        self.raw_dirs = raw_dirs
        self.aggregated_dir = aggregated_dir
        self.create_aggr_dataset()

    def create_aggr_dataset(self):
        logger = setup_logger()
        all_metadata = [LeRobotDatasetMetadata("", root=raw_dir) for raw_dir in self.raw_dirs]
        fps, robot_type, features = validate_all_metadata(all_metadata)
        if self.aggregated_dir.exists():
            shutil.rmtree(self.aggregated_dir)
        aggr_meta = LeRobotDatasetMetadata.create(
            repo_id=f"{self.aggregated_dir.parent.name}/{self.aggregated_dir.name}",
            root=self.aggregated_dir,
            fps=fps,
            robot_type=robot_type,
            features=features,
        )

        datasets_task_index_to_aggr_task_index = {}
        aggr_task_index = 0
        for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate tasks index")):
            task_index_to_aggr_task_index = {}
            for task_index, task in meta.tasks.items():
                if task not in aggr_meta.task_to_task_index:
                    # add the task to aggr tasks mappings
                    aggr_meta.tasks[aggr_task_index] = task
                    aggr_meta.task_to_task_index[task] = aggr_task_index
                    aggr_task_index += 1
                task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
            datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index

        datasets_ep_idx_to_aggr_ep_idx = {}
        datasets_aggr_episode_index_shift = {}
        datasets_aggr_index_shift = {}
        aggr_episode_index_shift = 0
        for dataset_index, meta in enumerate(tqdm(all_metadata, desc="Aggregate episodes and global index")):
            ep_idx_to_aggr_ep_idx = {}
            for episode_index in range(meta.total_episodes):
                aggr_episode_index = episode_index + aggr_episode_index_shift
                ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index
            datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx
            datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift
            datasets_aggr_index_shift[dataset_index] = aggr_meta.total_frames

            # populate episodes
            for episode_index, episode_dict in meta.episodes.items():
                aggr_episode_index = episode_index + aggr_episode_index_shift
                episode_dict["episode_index"] = aggr_episode_index
                aggr_meta.episodes[aggr_episode_index] = episode_dict

            # populate episodes_stats
            for episode_index, episode_stats in meta.episodes_stats.items():
                aggr_episode_index = episode_index + aggr_episode_index_shift
                aggr_meta.episodes_stats[aggr_episode_index] = episode_stats

            # populate info
            aggr_meta.info["total_episodes"] += meta.total_episodes
            aggr_meta.info["total_frames"] += meta.total_frames
            aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes

            aggr_episode_index_shift += meta.total_episodes

        logger.info("Write meta data")
        aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
        aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
        aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}

        # create a new episodes jsonl with updated episode_index using write_episode
        for episode_dict in tqdm(aggr_meta.episodes.values(), desc="Write episodes info"):
            write_episode(episode_dict, aggr_meta.root)
        # create a new episode_stats jsonl with updated episode_index using write_episode_stats
        for episode_index, episode_stats in tqdm(aggr_meta.episodes_stats.items(), desc="Write episodes stats info"):
            write_episode_stats(episode_index, episode_stats, aggr_meta.root)
        # create a new task jsonl with updated episode_index using write_task
        for task_index, task in tqdm(aggr_meta.tasks.items(), desc="Write tasks info"):
            write_task(task_index, task, aggr_meta.root)
        write_info(aggr_meta.info, aggr_meta.root)

        self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
        self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx
        self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
        self.datasets_aggr_index_shift = datasets_aggr_index_shift
        logger.info("Meta data done writing")

    def run(self, data=None, rank: int = 0, world_size: int = 1):
        logger = setup_logger()
        dataset_index = rank
        aggr_meta = LeRobotDatasetMetadata("", root=self.aggregated_dir)
        meta = LeRobotDatasetMetadata("", root=self.raw_dirs[dataset_index])
        aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
        aggr_index_shift = self.datasets_aggr_index_shift[dataset_index]
        task_index_to_aggr_task_index = self.datasets_task_index_to_aggr_task_index[dataset_index]

        logger.info("Copy data")
        for episode_index in range(meta.total_episodes):
            aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index]
            data_path = meta.root / meta.get_data_file_path(episode_index)
            aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
            aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
            # update index, episode_index and task_index
            df = pd.read_parquet(data_path)
            df["index"] += aggr_index_shift
            df["episode_index"] += aggr_episode_index_shift
            df["task_index"] = df["task_index"].map(task_index_to_aggr_task_index)
            df.to_parquet(aggr_data_path)

        logger.info("Copy videos")
        for episode_index in range(meta.total_episodes):
            aggr_episode_index = episode_index + aggr_episode_index_shift
            for vid_key in meta.video_keys:
                video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
                aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
                aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(video_path, aggr_video_path)

        logger.info("Remove original data")
        shutil.rmtree(meta.root)
def main(
    src_paths: list[Path],
    output_path: Path,
    executor: str,
    cpus_per_task: int,
    tasks_per_job: int,
    workers: int,
    resume_from_save: Path,
    resume_from_aggregate: Path,
    debug: bool = False,
    repo_id: str = None,
    push_to_hub: bool = False,
):
    tasks = []
    pattern = re.compile(r"_SCENE\d+_(.*?)_demo\.hdf5")
    for src_path in src_paths:
        for input_h5 in src_path.glob("*.hdf5"):
            match = pattern.search(input_h5.name)
            if match is None:
                continue
            tasks.append(
                (
                    input_h5,
                    (output_path / (src_path.name + "_temp") / input_h5.stem).resolve(),
                    match.group(1).replace("_", " "),
                )
            )

    if len(src_paths) > 1:
        aggregate_output_path = output_path / ("_".join([src_path.name for src_path in src_paths]) + "_aggregated_lerobot")
    else:
        aggregate_output_path = output_path / f"{src_paths[0].name}_lerobot"

    if debug:
        SaveLerobotDataset([tasks[0]]).run()
    else:
        save_config = {
            "tasks": len(tasks),
            "workers": workers,
            "logging_dir": resume_from_save,
        }
        aggregate_config = {
            "tasks": len(tasks),
            "workers": workers,
            "logging_dir": resume_from_aggregate,
        }
        match executor:
            case "local":
                workers = os.cpu_count() // cpus_per_task if workers == -1 else workers
                save_config["workers"] = workers
                aggregate_config["workers"] = workers
                executor = LocalPipelineExecutor
            case "ray":
                runtime_env = RuntimeEnv(
                    env_vars={
                        "HDF5_USE_FILE_LOCKING": "FALSE",
                        "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE",
                        "SVT_LOG": "1",
                    },
                )
                ray.init(runtime_env=runtime_env)
                save_config.update({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job})
                aggregate_config.update({"cpus_per_task": cpus_per_task, "tasks_per_job": tasks_per_job})
                executor = RayPipelineExecutor
            case _:
                raise ValueError(f"Executor {executor} not supported")

        executor(pipeline=[SaveLerobotDataset(tasks)], **save_config).run()
        executor(pipeline=[AggregateDatasets([task[1] for task in tasks], aggregate_output_path)], **aggregate_config).run()
        for task in tasks:
            shutil.rmtree(task[1].parent, ignore_errors=True)

    if push_to_hub:
        assert repo_id is not None
        tags = ["LeRobot", "libero", "franka"]
        tags.extend([src_path.name for src_path in src_paths])
        LeRobotDataset(
            repo_id=repo_id,
            root=aggregate_output_path,
        ).push_to_hub(
            tags=tags,
            private=False,
            push_videos=True,
            license="apache-2.0",
            upload_large_folder=False,
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--src-paths", type=Path, nargs="+", required=True)
    parser.add_argument("--output-path", type=Path, required=True)
    parser.add_argument("--executor", type=str, choices=["local", "ray"], default="local")
    parser.add_argument("--cpus-per-task", type=int, default=1)
    parser.add_argument("--tasks-per-job", type=int, default=1, help="number of concurrent tasks per job, only used for ray")
    parser.add_argument("--workers", type=int, default=-1, help="number of concurrent jobs to run")
    parser.add_argument("--resume-from-save", type=Path, help="logs directory to resume from save step")
    parser.add_argument("--resume-from-aggregate", type=Path, help="logs directory to resume from aggregate step")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--repo-id", type=str, help="required when push-to-hub is True")
    parser.add_argument("--push-to-hub", action="store_true", help="upload to hub")
    args = parser.parse_args()
    main(**vars(args))
@@ -0,0 +1,37 @@
LIBERO_FEATURES = {
    "observation.images.image": {
        "dtype": "video",
        "shape": (256, 256, 3),
        "names": ["height", "width", "rgb"],
    },
    "observation.images.wrist_image": {
        "dtype": "video",
        "shape": (256, 256, 3),
        "names": ["height", "width", "rgb"],
    },
    "observation.state": {
        "dtype": "float32",
        "shape": (8,),
        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper", "gripper"]},
    },
    "observation.states.ee_state": {
        "dtype": "float32",
        "shape": (6,),
        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw"]},
    },
    "observation.states.joint_state": {
        "dtype": "float32",
        "shape": (7,),
        "names": {"motors": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]},
    },
    "observation.states.gripper_state": {
        "dtype": "float32",
        "shape": (2,),
        "names": {"motors": ["gripper", "gripper"]},
    },
    "action": {
        "dtype": "float32",
        "shape": (7,),
        "names": {"motors": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"]},
    },
}
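# Illustrative sketch (not part of the repo): one dummy frame with the dtypes and
# shapes implied by LIBERO_FEATURES above. Image features are stored per frame as
# HxWxC uint8 arrays; state/action features are float32 vectors.
import numpy as np

STATE_FEATURES = {
    "observation.state": (8,),
    "observation.states.ee_state": (6,),
    "observation.states.joint_state": (7,),
    "observation.states.gripper_state": (2,),
    "action": (7,),
}


def make_dummy_frame() -> dict:
    """Build one frame with the dtypes/shapes the conversion pipeline expects."""
    frame = {key: np.zeros(shape, dtype=np.float32) for key, shape in STATE_FEATURES.items()}
    frame["observation.images.image"] = np.zeros((256, 256, 3), dtype=np.uint8)
    frame["observation.images.wrist_image"] = np.zeros((256, 256, 3), dtype=np.uint8)
    return frame


frame = make_dummy_frame()
assert frame["observation.state"].shape == (8,)  # ee_state (6) + gripper_state (2)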
@@ -0,0 +1,23 @@
import tqdm
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata


def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
    """
    implemented by @Cadene
    """
    # validate same fps, robot_type, features
    fps = all_metadata[0].fps
    robot_type = all_metadata[0].robot_type
    features = all_metadata[0].features
    for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"):
        if fps != meta.fps:
            raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.")
        if robot_type != meta.robot_type:
            raise ValueError(f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}.")
        if features != meta.features:
            raise ValueError(f"Same features is expected, but got features={meta.features} instead of {features}.")
    return fps, robot_type, features
@@ -0,0 +1,36 @@
from pathlib import Path

import numpy as np
from h5py import File


def load_local_episodes(input_h5: Path):
    with File(input_h5, "r") as f:
        for demo in f["data"].values():
            demo_len = len(demo["obs/agentview_rgb"])
            # (-1: open, 1: close) -> (0: close, 1: open)
            action = np.array(demo["actions"])
            action = np.concatenate(
                [
                    action[:, :6],
                    (1 - np.clip(action[:, -1], 0, 1))[:, None],
                ],
                axis=1,
            )
            state = np.concatenate(
                [
                    np.array(demo["obs/ee_states"]),
                    np.array(demo["obs/gripper_states"]),
                ],
                axis=1,
            )
            episode = {
                "observation.images.image": np.array(demo["obs/agentview_rgb"]),
                "observation.images.wrist_image": np.array(demo["obs/eye_in_hand_rgb"]),
                "observation.state": np.array(state, dtype=np.float32),
                "observation.states.ee_state": np.array(demo["obs/ee_states"], dtype=np.float32),
                "observation.states.joint_state": np.array(demo["obs/joint_states"], dtype=np.float32),
                "observation.states.gripper_state": np.array(demo["obs/gripper_states"], dtype=np.float32),
                "action": np.array(action, dtype=np.float32),
            }
            yield [{k: v[i] for k, v in episode.items()} for i in range(demo_len)]
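# Illustrative check (not part of the repo) of the vectorized gripper remap used
# above: raw commands in {-1: open, 1: close} become {1: open, 0: close} via
# 1 - clip(a, 0, 1), applied to the whole episode at once instead of per frame.
import numpy as np

raw = np.array([-1.0, 1.0, 0.5, -0.2])
remapped = 1 - np.clip(raw, 0, 1)
print(remapped.tolist())  # [1.0, 0.0, 0.5, 1.0]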
@@ -0,0 +1,278 @@
"""
Adapted from https://github.com/openvla/openvla/blob/main/experiments/robot/libero/regenerate_libero_dataset.py
Regenerates a LIBERO dataset (HDF5 files) by replaying demonstrations in the environments.
Notes:
- We save image observations at 256x256px resolution (instead of 128x128).
- We filter out transitions with "no-op" (zero) actions that do not change the robot's state.
- We filter out unsuccessful demonstrations.
- In the LIBERO HDF5 data -> RLDS data conversion (not shown here), we rotate the images by
180 degrees because we observe that the environments return images that are upside down
on our platform.
Usage:
python experiments/robot/libero/regenerate_libero_dataset.py \
--libero_task_suite [ libero_spatial | libero_object | libero_goal | libero_10 ] \
--libero_raw_data_dir <PATH TO RAW HDF5 DATASET DIR> \
--libero_target_dir <PATH TO TARGET DIR>
Example (LIBERO-Spatial):
python experiments/robot/libero/regenerate_libero_dataset.py \
--libero_task_suite libero_spatial \
--libero_raw_data_dir ./LIBERO/libero/datasets/libero_spatial \
--libero_target_dir ./LIBERO/libero/datasets/libero_spatial_no_noops
"""
import argparse
import json
import os
import h5py
import numpy as np
import robosuite.utils.transform_utils as T
import tqdm
from libero.libero import benchmark, get_libero_path
from libero.libero.envs import OffScreenRenderEnv
def get_libero_dummy_action(model_family: str):
"""Get dummy/no-op action, used to roll out the simulation while the robot does nothing."""
return [0, 0, 0, 0, 0, 0, -1]
def get_libero_env(task, model_family, resolution=256):
"""Initializes and returns the LIBERO environment, along with the task description."""
task_description = task.language
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
env_args = {"bddl_file_name": task_bddl_file, "camera_heights": resolution, "camera_widths": resolution}
env = OffScreenRenderEnv(**env_args)
env.seed(0) # IMPORTANT: seed seems to affect object positions even when using fixed initial state
return env, task_description
def is_noop(action, prev_action=None, threshold=1e-4):
"""
Returns whether an action is a no-op action.
A no-op action satisfies two criteria:
(1) All action dimensions, except for the last one (gripper action), are near zero.
(2) The gripper action is equal to the previous timestep's gripper action.
Explanation of (2):
Naively filtering out actions with just criterion (1) is not good because you will
remove actions where the robot is staying still but opening/closing its gripper.
So you also need to consider the current state (by checking the previous timestep's
gripper action as a proxy) to determine whether the action really is a no-op.
"""
# Special case: Previous action is None if this is the first action in the episode
# Then we only care about criterion (1)
if prev_action is None:
return np.linalg.norm(action[:-1]) < threshold
# Normal case: Check both criteria (1) and (2)
gripper_action = action[-1]
prev_gripper_action = prev_action[-1]
return np.linalg.norm(action[:-1]) < threshold and gripper_action == prev_gripper_action
def main(args):
    print(f"Regenerating {args.libero_task_suite} dataset!")

    # Create target directory
    if os.path.isdir(args.libero_target_dir):
        user_input = input(
            f"Target directory already exists at path: {args.libero_target_dir}\nEnter 'y' to overwrite the directory, or anything else to exit: "
        )
        if user_input != "y":
            exit()
    os.makedirs(args.libero_target_dir, exist_ok=True)

    # Prepare JSON file to record success/false and initial states per episode
    metainfo_json_dict = {}
    metainfo_json_out_path = f"./experiments/robot/libero/{args.libero_task_suite}_metainfo.json"
    with open(metainfo_json_out_path, "w") as f:
        # Just test that we can write to this file (we overwrite it later)
        json.dump(metainfo_json_dict, f)

    # Get task suite
    benchmark_dict = benchmark.get_benchmark_dict()
    task_suite = benchmark_dict[args.libero_task_suite]()
    num_tasks_in_suite = task_suite.n_tasks

    # Setup
    num_replays = 0
    num_success = 0
    num_noops = 0

    for task_id in tqdm.tqdm(range(num_tasks_in_suite)):
        # Get task in suite
        task = task_suite.get_task(task_id)
        env, task_description = get_libero_env(task, "llava", resolution=args.resolution)

        # Get dataset for task
        orig_data_path = os.path.join(args.libero_raw_data_dir, f"{task.name}_demo.hdf5")
        assert os.path.exists(orig_data_path), f"Cannot find raw data file {orig_data_path}."
        orig_data_file = h5py.File(orig_data_path, "r")
        orig_data = orig_data_file["data"]

        # Create new HDF5 file for regenerated demos
        new_data_path = os.path.join(args.libero_target_dir, f"{task.name}_demo.hdf5")
        new_data_file = h5py.File(new_data_path, "w")
        grp = new_data_file.create_group("data")

        for i in range(len(orig_data.keys())):
            # Get demo data
            demo_data = orig_data[f"demo_{i}"]
            orig_actions = demo_data["actions"][()]
            orig_states = demo_data["states"][()]

            # Reset environment, set initial state, and wait a few steps for environment to settle
            env.reset()
            env.set_init_state(orig_states[0])
            for _ in range(10):
                obs, reward, done, info = env.step(get_libero_dummy_action("llava"))

            # Set up new data lists
            states = []
            actions = []
            ee_states = []
            gripper_states = []
            joint_states = []
            robot_states = []
            agentview_images = []
            eye_in_hand_images = []

            # Replay original demo actions in environment and record observations
            for _, action in enumerate(orig_actions):
                # Skip transitions with no-op actions
                prev_action = actions[-1] if len(actions) > 0 else None
                if is_noop(action, prev_action):
                    print(f"\tSkipping no-op action: {action}")
                    num_noops += 1
                    continue

                if states == []:
                    # In the first timestep, since we're using the original initial state to initialize the environment,
                    # copy the initial state (first state in episode) over from the original HDF5 to the new one
                    states.append(orig_states[0])
                    robot_states.append(demo_data["robot_states"][0])
                else:
                    # For all other timesteps, get state from environment and record it
                    states.append(env.sim.get_state().flatten())
                    robot_states.append(
                        np.concatenate([obs["robot0_gripper_qpos"], obs["robot0_eef_pos"], obs["robot0_eef_quat"]])
                    )

                # Record original action (from demo)
                actions.append(action)

                # Record data returned by environment
                if "robot0_gripper_qpos" in obs:
                    gripper_states.append(obs["robot0_gripper_qpos"])
                joint_states.append(obs["robot0_joint_pos"])
                ee_states.append(
                    np.hstack(
                        (
                            obs["robot0_eef_pos"],
                            T.quat2axisangle(obs["robot0_eef_quat"]),
                        )
                    )
                )
                agentview_images.append(np.ascontiguousarray(obs["agentview_image"][::-1, ::-1]))
                eye_in_hand_images.append(np.ascontiguousarray(obs["robot0_eye_in_hand_image"][::-1, ::-1]))

                # Execute demo action in environment
                obs, reward, done, info = env.step(action.tolist())

            # At end of episode, save replayed trajectories to new HDF5 files (only keep successes)
            if done:
                dones = np.zeros(len(actions)).astype(np.uint8)
                dones[-1] = 1
                rewards = np.zeros(len(actions)).astype(np.uint8)
                rewards[-1] = 1

                assert len(actions) == len(agentview_images)

                ep_data_grp = grp.create_group(f"demo_{i}")
                obs_grp = ep_data_grp.create_group("obs")
                obs_grp.create_dataset("gripper_states", data=np.stack(gripper_states, axis=0))
                obs_grp.create_dataset("joint_states", data=np.stack(joint_states, axis=0))
                obs_grp.create_dataset("ee_states", data=np.stack(ee_states, axis=0))
                obs_grp.create_dataset("ee_pos", data=np.stack(ee_states, axis=0)[:, :3])
                obs_grp.create_dataset("ee_ori", data=np.stack(ee_states, axis=0)[:, 3:])
                obs_grp.create_dataset("agentview_rgb", data=np.stack(agentview_images, axis=0))
                obs_grp.create_dataset("eye_in_hand_rgb", data=np.stack(eye_in_hand_images, axis=0))
                ep_data_grp.create_dataset("actions", data=actions)
                ep_data_grp.create_dataset("states", data=np.stack(states))
                ep_data_grp.create_dataset("robot_states", data=np.stack(robot_states, axis=0))
                ep_data_grp.create_dataset("rewards", data=rewards)
                ep_data_grp.create_dataset("dones", data=dones)
                num_success += 1
            num_replays += 1

            # Record success/false and initial environment state in metainfo dict
            task_key = task_description.replace(" ", "_")
            episode_key = f"demo_{i}"
            if task_key not in metainfo_json_dict:
                metainfo_json_dict[task_key] = {}
            if episode_key not in metainfo_json_dict[task_key]:
                metainfo_json_dict[task_key][episode_key] = {}
            metainfo_json_dict[task_key][episode_key]["success"] = bool(done)
            metainfo_json_dict[task_key][episode_key]["initial_state"] = orig_states[0].tolist()

            # Write metainfo dict to JSON file
            # (We repeatedly overwrite, rather than doing this once at the end, just in case the script crashes midway)
            with open(metainfo_json_out_path, "w") as f:
                json.dump(metainfo_json_dict, f, indent=2)

            # Count total number of successful replays so far
            print(
                f"Total # episodes replayed: {num_replays}, Total # successes: {num_success} ({num_success / num_replays * 100:.1f} %)"
            )
            # Report total number of no-op actions filtered out so far
            print(f" Total # no-op actions filtered out: {num_noops}")

        # Close HDF5 files
        orig_data_file.close()
        if len(new_data_file["data"]) == 0:
            new_data_file.close()
            os.remove(new_data_path)
        else:
            new_data_file.close()
            print(f"Saved regenerated demos for task '{task_description}' at: {new_data_path}")

    print(f"Dataset regeneration complete! Saved new dataset at: {args.libero_target_dir}")
    print(f"Saved metainfo JSON at: {metainfo_json_out_path}")
if __name__ == "__main__":
    # Parse command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--resolution", type=int, default=256, help="Resolution of the images. Example: 256")
    parser.add_argument(
        "--libero_task_suite",
        type=str,
        choices=["libero_spatial", "libero_object", "libero_goal", "libero_10", "libero_90"],
        help="LIBERO task suite. Example: libero_spatial",
        required=True,
    )
    parser.add_argument(
        "--libero_raw_data_dir",
        type=str,
        help="Path to directory containing raw HDF5 dataset. Example: ./LIBERO/libero/datasets/libero_spatial",
        required=True,
    )
    parser.add_argument(
        "--libero_target_dir",
        type=str,
        help="Path to regenerated dataset directory. Example: ./LIBERO/libero/datasets/libero_spatial_no_noops",
        required=True,
    )
    args = parser.parse_args()

    # Start data regeneration
    main(args)