mirror of
https://github.com/Tavish9/any4lerobot.git
synced 2026-05-15 22:19:41 +00:00
ad1381915c
* update agibot2lerobot * update libero2lerobot * update robomind2lerobot * fix robomind2lerobot
396 lines
16 KiB
Python
396 lines
16 KiB
Python
import argparse
|
|
import concurrent.futures
|
|
import inspect
|
|
import json
|
|
import logging
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
import pyarrow.parquet as pq
|
|
import ray
|
|
from lerobot.datasets import LeRobotDataset, LeRobotDatasetMetadata
|
|
from lerobot.datasets.compute_stats import aggregate_stats
|
|
from lerobot.datasets.dataset_writer import DatasetWriter, _encode_video_worker
|
|
from lerobot.datasets.feature_utils import validate_episode_buffer
|
|
from lerobot.datasets.io_utils import write_info, write_stats
|
|
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, flatten_dict
|
|
from ray.runtime_env import RuntimeEnv
|
|
from robomind_uitls.configs import ROBOMIND_CONFIG
|
|
from robomind_uitls.lerobot_uitls import compute_episode_stats, generate_features_from_config
|
|
from robomind_uitls.robomind_uitls import load_local_dataset
|
|
|
|
# Configure root logging at INFO level with a "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
|
|
|
|
|
class RoboMINDDatasetMetadata(LeRobotDatasetMetadata):
    """Dataset metadata whose ``save_episode`` also takes the split name.

    On top of the per-episode bookkeeping it maintains ``info["splits"]`` so the
    train and validation episode ranges are recorded as ``"start:end"`` strings.
    """

    def _flush_metadata_buffer(self) -> None:
        """Write all buffered episode metadata to parquet file.

        Does nothing when no buffer exists or the buffer is empty. When no parquet
        writer is open yet, one is created at the path derived from the chunk and
        file index of the first buffered episode; otherwise the open writer's
        schema is reused for the new table.
        """
        if not hasattr(self, "_metadata_buffer") or len(self._metadata_buffer) == 0:
            return

        combined_dict = {}
        for episode_dict in self._metadata_buffer:
            for key, value in episode_dict.items():
                # Extract value and serialize numpy arrays
                # because PyArrow's from_pydict function doesn't support numpy arrays
                val = value[0] if isinstance(value, list) else value
                combined_dict.setdefault(key, []).append(val.tolist() if isinstance(val, np.ndarray) else val)

        first_ep = self._metadata_buffer[0]
        chunk_idx = first_ep["meta/episodes/chunk_index"][0]
        file_idx = first_ep["meta/episodes/file_index"][0]

        schema = None if not self._pq_writer else self._pq_writer.schema
        table = pa.Table.from_pydict(combined_dict, schema=schema)

        if not self._pq_writer:
            path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
            path.parent.mkdir(parents=True, exist_ok=True)
            self._pq_writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)

        self._pq_writer.write_table(table)

        self.latest_episode = self._metadata_buffer[-1]
        self._metadata_buffer.clear()

    def save_episode(
        self,
        split,
        episode_index: int,
        episode_length: int,
        episode_tasks: list[str],
        episode_stats: dict[str, dict],
        episode_metadata: dict,
    ) -> None:
        """Record one episode's metadata and refresh the info and stats files.

        Args:
            split: Split name. ``"train"`` extends the train range; any name
                containing ``"val"`` extends the validation range; other names
                leave ``info["splits"]`` untouched.
            episode_index: Index of the episode being saved.
            episode_length: Number of frames in the episode.
            episode_tasks: Task strings that occur in the episode.
            episode_stats: Per-feature statistics of the episode.
            episode_metadata: Extra entries merged into the episode record.
        """
        episode_dict = {
            "episode_index": episode_index,
            "tasks": episode_tasks,
            "length": episode_length,
        }
        episode_dict.update(episode_metadata)
        episode_dict.update(flatten_dict({"stats": episode_stats}))
        self._save_episode_metadata(episode_dict)

        # Update info
        self.info["total_episodes"] += 1
        self.info["total_frames"] += episode_length
        self.info["total_tasks"] = len(self.tasks)
        if split == "train":
            self.info["splits"]["train"] = f"0:{self.info['total_episodes']}"
            self.train_count = self.info["total_episodes"]
        elif "val" in split:
            # `train_count` is only assigned once a train episode has been saved. Fall
            # back to 0 so a task without any train episode does not raise
            # AttributeError on its first validation episode.
            train_count = getattr(self, "train_count", 0)
            self.info["splits"]["validation"] = f"{train_count}:{self.info['total_episodes']}"

        write_info(self.info, self.root)

        self.stats = aggregate_stats([self.stats, episode_stats]) if self.stats is not None else episode_stats
        write_stats(self.stats, self.root)
|
|
|
|
|
|
class RoboMINDDatasetWriter(DatasetWriter):
    """Dataset writer whose ``save_episode`` also records a split and an ``action_config``."""

    def save_episode(
        self,
        split,
        action_config: dict,
        episode_data: dict | None = None,
        parallel_encoding: bool = True,
    ) -> None:
        """Persist one episode: frame data, videos, stats and metadata.

        Args:
            split: Split name forwarded to the metadata object's ``save_episode``.
            action_config: Stored under the ``"action_config"`` key of the episode metadata.
            episode_data: Episode buffer to save; ``self.episode_buffer`` is used when it is None.
            parallel_encoding: When True and more than one video key exists, the
                non-streaming, non-batched path encodes each video key in its own
                worker process.
        """
        meta = self._meta
        buffer = self.episode_buffer if episode_data is None else episode_data

        validate_episode_buffer(buffer, meta.total_episodes, meta.features)

        # "size" and "task" are bookkeeping entries that are removed before the data is written.
        num_frames = buffer.pop("size")
        frame_tasks = buffer.pop("task")
        unique_tasks = list(set(frame_tasks))
        ep_index = buffer["episode_index"]

        first_frame = meta.total_frames
        buffer["index"] = np.arange(first_frame, first_frame + num_frames)
        buffer["episode_index"] = np.full((num_frames,), ep_index)

        # Register the episode's tasks first so every task string resolves to an index below.
        meta.save_episode_tasks(unique_tasks)
        buffer["task_index"] = np.array([meta.get_task_index(task) for task in frame_tasks])

        # Stack the per-frame values of every remaining non-video feature into one array.
        index_columns = ("index", "episode_index", "task_index")
        for name, spec in meta.features.items():
            if name not in index_columns and spec["dtype"] != "video":
                buffer[name] = np.stack(buffer[name]).squeeze()

        # Images must be fully written before statistics over them can be computed.
        self._wait_image_writer()
        has_video_keys = len(meta.video_keys) > 0
        use_streaming = self._streaming_encoder is not None and has_video_keys
        use_batched_encoding = self._batch_encoding_size > 1

        if use_streaming:
            # With the streaming encoder, video statistics come from the encoder itself,
            # so they are left out of the computation here.
            stats_buffer = {
                k: v for k, v in buffer.items() if meta.features.get(k, {}).get("dtype") not in ("video",)
            }
            stats_features = {k: v for k, v in meta.features.items() if v["dtype"] != "video"}
        else:
            stats_buffer, stats_features = buffer, meta.features
        ep_stats = compute_episode_stats(stats_buffer, stats_features)

        ep_metadata = self._save_episode_data(buffer)

        if use_streaming:
            streaming_results = self._streaming_encoder.finish_episode()
            for video_key in meta.video_keys:
                temp_path, video_stats = streaming_results[video_key]
                if video_stats is not None:
                    rescaled = {}
                    for stat_name, stat_value in video_stats.items():
                        if stat_name == "count":
                            rescaled[stat_name] = stat_value
                        else:
                            # Reshape to (1, -1, 1, 1), divide by 255 and drop the leading axis.
                            rescaled[stat_name] = np.squeeze(stat_value.reshape(1, -1, 1, 1) / 255.0, axis=0)
                    ep_stats[video_key] = rescaled
                ep_metadata.update(self._save_episode_video(video_key, ep_index, temp_path=temp_path))
        elif has_video_keys and not use_batched_encoding:
            num_cameras = len(meta.video_keys)
            if parallel_encoding and num_cameras > 1:
                with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor:
                    future_to_key = {}
                    for video_key in meta.video_keys:
                        job = executor.submit(
                            _encode_video_worker,
                            video_key,
                            ep_index,
                            self._root,
                            meta.fps,
                            self._vcodec,
                            self._encoder_threads,
                        )
                        future_to_key[job] = video_key

                    results = {}
                    for future in concurrent.futures.as_completed(future_to_key):
                        video_key = future_to_key[future]
                        try:
                            results[video_key] = future.result()
                        except Exception as exc:
                            logging.error(f"Video encoding failed for {video_key}: {exc}")
                            raise exc

                # Save in video-key order, independent of the order in which encoding finished.
                for video_key in meta.video_keys:
                    ep_metadata.update(self._save_episode_video(video_key, ep_index, temp_path=results[video_key]))
            else:
                for video_key in meta.video_keys:
                    ep_metadata.update(self._save_episode_video(video_key, ep_index))

        # The metadata must be saved only after the videos have been encoded.
        ep_metadata["action_config"] = action_config
        meta.save_episode(split, ep_index, num_frames, unique_tasks, ep_stats, ep_metadata)

        if has_video_keys and use_batched_encoding:
            self._episodes_since_last_encoding += 1
            if self._episodes_since_last_encoding == self._batch_encoding_size:
                end_ep = meta.total_episodes
                self._batch_save_episode_video(end_ep - self._batch_encoding_size, end_ep)
                self._episodes_since_last_encoding = 0

        # Reset the internal buffer when no (or empty) external episode data was supplied.
        if not episode_data:
            self.clear_episode_buffer(delete_images=len(meta.image_keys) > 0)
|
|
|
|
|
|
class RoboMINDDataset(LeRobotDataset):
    """LeRobot dataset wired to the RoboMIND metadata and writer classes."""

    @classmethod
    def create(cls, *args, **kwargs) -> "RoboMINDDataset":
        """Create a dataset through the parent ``create``, then swap in RoboMIND components.

        The arguments are bound against the parent signature (defaults applied) so
        the same values can be reused to rebuild the metadata. The root directory
        is removed before the metadata is created again.
        """
        parent_create = super().create
        call = inspect.signature(parent_create).bind_partial(*args, **kwargs)
        call.apply_defaults()
        resolved = call.arguments

        obj = parent_create(*args, **kwargs)

        # Remove what the parent call left under the root before creating the metadata anew.
        shutil.rmtree(resolved["root"], ignore_errors=True)
        meta_kwargs = {
            name: resolved[name]
            for name in (
                "repo_id",
                "fps",
                "robot_type",
                "features",
                "root",
                "use_videos",
                "metadata_buffer_size",
            )
        }
        obj.meta = RoboMINDDatasetMetadata.create(**meta_kwargs)
        obj.writer = RoboMINDDatasetWriter(
            meta=obj.meta,
            root=obj.root,
            vcodec=obj._vcodec,
            encoder_threads=obj._encoder_threads,
            batch_encoding_size=obj._batch_encoding_size,
        )
        return obj

    def save_episode(
        self, split, action_config: dict, episode_data: dict | None = None, parallel_encoding: bool = True
    ) -> None:
        """Forward the episode to the writer after checking that a writer is available."""
        self._require_writer("save_episode")
        self.writer.save_episode(split, action_config, episode_data, parallel_encoding)
|
|
|
|
|
|
def get_all_tasks(src_path: Path, output_path: Path, embodiment: str):
    """Yield one conversion job per entry of ``src_path / "h5_<embodiment>"``.

    Each job is a tuple ``(task_name, splits, output_dir, instruction)`` where
    ``splits`` maps ``"train"`` / ``"val"`` to the entry's ``success_episodes``
    sub-directories, ``output_dir`` is the resolved
    ``output_path / src_path.name / embodiment / task_name`` and ``instruction``
    is looked up by task name in ``RoboMIND_v1_2_instr.csv`` (read from the
    parent directory of ``src_path``). Nothing is yielded when the embodiment
    directory does not exist.
    """
    embodiment_dir = src_path / f"h5_{embodiment}"
    output_root = output_path / src_path.name / embodiment

    if not embodiment_dir.exists():
        return

    instructions_csv = embodiment_dir.parent.parent / "RoboMIND_v1_2_instr.csv"
    table = pd.read_csv(instructions_csv, index_col=0).drop_duplicates()
    instruction_by_task = table.set_index("task")["instruction"].to_dict()

    for task_dir in embodiment_dir.iterdir():
        episodes_dir = task_dir / "success_episodes"
        split_dirs = {"train": episodes_dir / "train", "val": episodes_dir / "val"}
        yield (
            task_dir.name,
            split_dirs,
            (output_root / task_dir.name).resolve(),
            instruction_by_task[task_dir.name],
        )
|
|
|
|
|
|
def save_as_lerobot_dataset(task: tuple[str, dict, Path, str], src_path, benchmark, embodiment, save_depth):
    """Convert every usable episode of one task into a dataset under the task's output directory.

    Args:
        task: ``(task_type, splits, local_dir, task_instruction)``; ``splits`` maps a
            split name to the directory searched for ``trajectory.hdf5`` files.
        src_path: Root directory that contains ``language_description_annotation_json``.
        benchmark: Benchmark name, used for logging only.
        embodiment: Key into ``ROBOMIND_CONFIG``; also used as the robot type.
        save_depth: When False, every feature whose name contains ``"depth"`` is dropped.

    Episodes that fail to load or have fewer than 50 frames are skipped with a
    warning. If adding or saving an episode raises, the ``camera_top`` image
    shape in the config is toggled between 720x1280 and 480x640 and the whole
    task is converted again from scratch. An output directory that ends up with
    zero episodes is removed.
    """
    task_type, splits, local_dir, task_instruction = task

    config = ROBOMIND_CONFIG[embodiment]
    features = generate_features_from_config(config)

    # [HACK]: franka and ur image is bgr...
    bgr2rgb = embodiment in ["franka_1rgb", "franka_3rgb", "franka_fr3_dual", "ur_1rgb"]

    if local_dir.exists():
        shutil.rmtree(local_dir)

    if not save_depth:
        features = {name: ft for name, ft in features.items() if "depth" not in name}

    dataset: RoboMINDDataset = RoboMINDDataset.create(
        repo_id=f"{embodiment}/{local_dir.name}",
        root=local_dir,
        fps=30,
        robot_type=embodiment,
        features=features,
    )

    logging.info(f"start processing for {benchmark}, {embodiment}, {task_type}, saving to {local_dir}")

    # The annotation file does not depend on the split, so read it once and close
    # it deterministically instead of leaking one open handle per split.
    action_config_path = src_path / "language_description_annotation_json" / f"h5_{embodiment}.json"
    if action_config_path.exists():
        with open(action_config_path) as f:
            annotations = json.load(f)
    else:
        annotations = []

    for split, path in splits.items():
        # Keep only this task's annotations for the current split, keyed by the
        # name of the directory that contains the annotated path.
        action_config = {
            Path(entry["id"]).parent.name: entry["response"]
            for entry in annotations
            if local_dir.name in entry["id"] and split in entry["id"]
        }
        for episode_path in path.glob("**/trajectory.hdf5"):
            status, raw_dataset, err = load_local_dataset(episode_path, config, save_depth, bgr2rgb)
            if status and len(raw_dataset) >= 50:
                try:
                    for frame_data in raw_dataset:
                        frame_data["task"] = task_instruction
                        dataset.add_frame(frame_data)
                    dataset.save_episode(
                        split, action_config.get(episode_path.parent.parent.name, {"task_summary": None, "steps": None})
                    )
                    logging.info(f"process done for {path}, len {len(raw_dataset)}")
                except Exception:
                    # Report the failure before retrying instead of discarding it silently.
                    logging.warning(
                        f"Episode {episode_path} failed; retrying {task_type} with the alternate camera_top shape",
                        exc_info=True,
                    )
                    # [HACK]: not consistent image shape...
                    # NOTE(review): this mutates the shared ROBOMIND_CONFIG entry and the retry
                    # is unbounded; a failure unrelated to the image shape recurses until the
                    # interpreter's recursion limit is hit — confirm whether a cap is wanted.
                    if config["images"]["camera_top"]["shape"] == (720, 1280, 3):
                        config["images"]["camera_top"]["shape"] = (480, 640, 3)
                        config["images"]["camera_top_depth"]["shape"] = (480, 640, 1)
                    else:
                        config["images"]["camera_top"]["shape"] = (720, 1280, 3)
                        config["images"]["camera_top_depth"]["shape"] = (720, 1280, 1)
                    save_as_lerobot_dataset(task, src_path, benchmark, embodiment, save_depth)
                    return
            else:
                logging.warning(f"Skipped {episode_path}: len of dataset:{len(raw_dataset)} or {str(err)}")

    dataset.finalize()

    if dataset.meta.total_episodes == 0:
        shutil.rmtree(local_dir)
|
|
|
|
|
|
def main(
    src_path: Path,
    output_path: Path,
    benchmark: str,
    embodiments: list[str],
    cpus_per_task: int,
    save_depth: bool,
    debug: bool = False,
):
    """Convert RoboMIND tasks, either one task in-process or all tasks through Ray.

    In debug mode only the first task of the first embodiment is converted in
    the current process. Otherwise every task of every embodiment is submitted
    as a Ray task that reserves ``cpus_per_task`` CPUs; a failed task is logged
    and appended to ``output.txt`` without stopping the remaining ones.
    """
    benchmark_dir = src_path / benchmark

    if debug:
        first_embodiment = embodiments[0]
        first_task = next(get_all_tasks(benchmark_dir, output_path, first_embodiment))
        save_as_lerobot_dataset(first_task, src_path, benchmark, first_embodiment, save_depth)
        return

    worker_env = {"HDF5_USE_FILE_LOCKING": "FALSE", "HF_DATASETS_DISABLE_PROGRESS_BARS": "TRUE"}
    ray.init(runtime_env=RuntimeEnv(env_vars=worker_env))
    cpus = int(ray.available_resources()["CPU"])

    logging.info(f"Available CPUs: {cpus}, num_cpus_per_task: {cpus_per_task}")
    remote_task = ray.remote(save_as_lerobot_dataset).options(num_cpus=cpus_per_task)

    # Submit everything up front; each entry pairs the task's split directories with its future.
    futures = [
        (task[1], remote_task.remote(task, src_path, benchmark, embodiment, save_depth))
        for embodiment in embodiments
        for task in get_all_tasks(benchmark_dir, output_path, embodiment)
    ]

    for task_path, future in futures:
        try:
            ray.get(future)
        except Exception as e:
            logging.error(f"Exception occurred for {task_path['train']}")
            with open("output.txt", "a") as f:
                f.write(f"{task_path['train']}, exception details: {str(e)}\n")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to `main` as keyword arguments.
    benchmark_choices = ["benchmark1_0_release", "benchmark1_1_release", "benchmark1_2_release"]
    # Shown as the help text of --embodiments; not enforced as choices.
    embodiment_names = [
        "agilex_3rgb",
        "franka_1rgb",
        "franka_3rgb",
        "franka_fr3_dual",
        "tienkung_gello_1rgb",
        "tienkung_prod1_gello_1rgb",
        "tienkung_xsens_1rgb",
        "ur_1rgb",
    ]

    parser = argparse.ArgumentParser()
    parser.add_argument("--src-path", type=Path, required=True)
    parser.add_argument("--benchmark", type=str, choices=benchmark_choices, default="benchmark1_1_release")
    parser.add_argument("--output-path", type=Path, required=True)
    parser.add_argument(
        "--embodiments",
        type=str,
        nargs="+",
        help=str(embodiment_names),
        default=["agilex_3rgb"],
    )
    parser.add_argument("--cpus-per-task", type=int, default=2)
    parser.add_argument("--save-depth", action="store_true")
    parser.add_argument("--debug", action="store_true")

    cli_args = parser.parse_args()
    main(**vars(cli_args))
|