Compare commits

..

11 Commits

Author SHA1 Message Date
Jade Choghari 03cce79c88 Merge branch 'main' into feat/behavior-1k 2025-12-04 18:50:56 +01:00
Michel Aractingi 3918ab7882 Merge branch 'main' into feat/behavior-1k 2025-11-03 13:28:31 +01:00
Michel Aractingi 65b0e73ae4 * refactor behaviour1k_lerobot_dataset.py
* add example scripts to load behaviour 1k data in `load_behaviour1k_dataset.py`
2025-11-03 12:23:12 +00:00
Jade Choghari ca7c5fcdfe remove tester 2025-10-30 18:14:09 +01:00
Jade Choghari 28f8098df4 fix style 2025-10-30 18:12:50 +01:00
Jade Choghari db7d501281 remove comments 2025-10-30 18:12:06 +01:00
Jade Choghari 88380fe34e update changes 2025-10-30 18:11:27 +01:00
Jade Choghari 154abfd233 update
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2025-10-27 17:52:21 +01:00
Jade Choghari dc14266762 add
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2025-10-27 16:44:58 +01:00
Michel Aractingi fd623e0cc5 Modify convert_to_lerobot_v3 script for behaviours dataset to take a single task id and create a dataset outof it 2025-10-24 17:06:21 +02:00
Michel Aractingi a52e88d349 add scripts for convert behavior-1k to datasetv3 2025-10-24 14:17:30 +02:00
59 changed files with 1647 additions and 14193 deletions
BIN
View File
Binary file not shown.

Before

Width:  |  Height:  |  Size: 51 KiB

-4
View File
@@ -92,10 +92,6 @@
- local: phone_teleop
title: Phone
title: "Teleoperators"
- sections:
- local: torch_accelerators
title: PyTorch accelerators
title: "Supported Hardware"
- sections:
- local: notebooks
title: Notebooks
-42
View File
@@ -1,42 +0,0 @@
# PyTorch accelerators
LeRobot supports multiple hardware acceleration options for both training and inference.
These options include:
- **CPU**: CPU executes all computations, no dedicated accelerator is used
- **CUDA**: acceleration with NVIDIA & AMD GPUs
- **MPS**: acceleration with Apple Silicon GPUs
- **XPU**: acceleration with Intel integrated and discrete GPUs
## Getting Started
To use particular accelerator, a suitable version of PyTorch should be installed.
For CPU, CUDA, and MPS backends follow instructions provided on [PyTorch installation page](https://pytorch.org/get-started/locally).
For XPU backend, follow instructions from [PyTorch documentation](https://docs.pytorch.org/docs/stable/notes/get_start_xpu.html).
### Verifying the installation
After installation, accelerator availability can be verified by running
```python
import torch
print(torch.<backend_name>.is_available()) # <backend_name> is cuda, mps, or xpu
```
## How to run training or evaluation
To select the desired accelerator, use the `--policy.device` flag when running `lerobot-train` or `lerobot-eval`. For example, to use MPS on Apple Silicon, run:
```bash
lerobot-train
--policy.device=mps ...
```
```bash
lerobot-eval \
--policy.device=mps ...
```
However, in most cases, presence of an accelerator is detected automatically and `policy.device` parameter can be omitted from CLI commands.
@@ -0,0 +1,464 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BehaviorLeRobotDatasetV3: A wrapper around LeRobotDataset v3.0 for loading BEHAVIOR-1K data.
This wrapper extends LeRobotDataset to support BEHAVIOR-1K specific features:
- Modality and camera selection (rgb, depth, seg_instance_id)
- Efficient chunk streaming mode with keyframe access
- Additional BEHAVIOR-1K metadata (cam_rel_poses, task_info, etc.)
"""
import logging
from collections.abc import Callable
from pathlib import Path
import datasets
import numpy as np
from behaviour_1k_constants import ROBOT_CAMERA_NAMES, ROBOT_TYPE
from torch.utils.data import Dataset, get_worker_info
from lerobot.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
check_delta_timestamps,
get_delta_indices,
get_safe_version,
hf_transform_to_torch,
)
from lerobot.datasets.video_utils import decode_video_frames, get_safe_default_codec
from lerobot.utils.constants import HF_LEROBOT_HOME
logger = logging.getLogger(__name__)
class BehaviorLeRobotDatasetMetadata(LeRobotDatasetMetadata):
"""
Extended metadata class for BEHAVIOR-1K datasets.
Adds support for:
- Modality and camera filtering
- Custom metainfo and annotation paths
"""
def __init__(
self,
repo_id: str,
root: str | Path | None = None,
revision: str | None = None,
force_cache_sync: bool = False,
metadata_buffer_size: int = 10,
modalities: set[str] | None = None,
cameras: set[str] | None = None,
):
self.modalities = set(modalities) if modalities else {"rgb", "depth", "seg_instance_id"}
self.camera_names = set(cameras) if cameras else {"head", "left_wrist", "right_wrist"}
assert self.modalities.issubset({"rgb", "depth", "seg_instance_id"}), (
f"Modalities must be subset of ['rgb', 'depth', 'seg_instance_id'], got {self.modalities}"
)
assert self.camera_names.issubset(set(ROBOT_CAMERA_NAMES[ROBOT_TYPE])), (
f"Camera names must be subset of {list(ROBOT_CAMERA_NAMES[ROBOT_TYPE])}, got {self.camera_names}"
)
super().__init__(repo_id, root, revision, force_cache_sync, metadata_buffer_size)
@property
def filtered_features(self) -> dict[str, dict]:
"""Return only features matching selected modalities and cameras."""
features = {}
for name, feature_info in self.features.items():
if not name.startswith("observation.images."):
features[name] = feature_info
continue
parts = name.split(".")
if len(parts) >= 4:
modality = parts[2]
camera = parts[3]
if modality in self.modalities and camera in self.camera_names:
features[name] = feature_info
return features
@property
def video_keys(self) -> list[str]:
"""Return only video keys for selected modalities and cameras."""
all_video_keys = super().video_keys
filtered_keys = []
for key in all_video_keys:
parts = key.split(".")
if len(parts) >= 4:
modality = parts[2]
camera = parts[3]
if modality in self.modalities and camera in self.camera_names:
filtered_keys.append(key)
return filtered_keys
def get_metainfo_path(self, ep_index: int) -> Path:
"""Get path to episode metainfo file."""
if "metainfo_path" in self.info:
fpath = self.info["metainfo_path"].format(episode_index=ep_index)
return Path(fpath)
return None
def get_annotation_path(self, ep_index: int) -> Path:
"""Get path to episode annotation file."""
if "annotation_path" in self.info:
fpath = self.info["annotation_path"].format(episode_index=ep_index)
return Path(fpath)
return None
class BehaviorLeRobotDatasetV3(LeRobotDataset):
"""
BEHAVIOR-1K wrapper for LeRobotDataset v3.0.
Each BEHAVIOR-1K dataset contains a single task (e.g., behavior1k-task0000).
See https://huggingface.co/collections/lerobot/behavior-1k for all available tasks.
Key features:
- Modality and camera selection
- Efficient chunk streaming with keyframe access (recommended for B1K with GOP=250)
- Support for BEHAVIOR-1K specific observations (cam_rel_poses, task_info, task_index)
"""
def __init__(
self,
repo_id: str,
root: str | Path | None = None,
episodes: list[int] | None = None,
image_transforms: Callable | None = None,
delta_timestamps: dict[list[float]] | None = None,
tolerance_s: float = 1e-4,
revision: str | None = None,
force_cache_sync: bool = False,
download_videos: bool = True,
video_backend: str | None = None,
batch_encoding_size: int = 1,
# BEHAVIOR-1K specific arguments
modalities: list[str] | None = None,
cameras: list[str] | None = None,
check_timestamp_sync: bool = True,
chunk_streaming_using_keyframe: bool = True,
shuffle: bool = True,
seed: int = 42,
):
"""
Initialize BEHAVIOR-1K dataset.
Args:
repo_id: HuggingFace repository ID (e.g., "lerobot/behavior1k-task0000")
root: Local directory for dataset storage
episodes: List of episode indices to load (for train/val split)
image_transforms: Torchvision v2 transforms for images
delta_timestamps: Temporal offsets for history/future frames
tolerance_s: Tolerance for timestamp synchronization
revision: Git revision/branch to load
force_cache_sync: Force re-download from hub
download_videos: Whether to download video files
video_backend: Video decoder ('pyav' or 'torchcodec')
batch_encoding_size: Batch size for video encoding
modalities: List of modalities to load (None = all: rgb, depth, seg_instance_id)
cameras: List of cameras to load (None = all: head, left_wrist, right_wrist)
check_timestamp_sync: Verify timestamp synchronization (can be slow)
chunk_streaming_using_keyframe: Use keyframe-based streaming (STRONGLY RECOMMENDED for B1K)
shuffle: Shuffle chunks in streaming mode
seed: Random seed for shuffling
"""
Dataset.__init__(self)
self.repo_id = repo_id
if root:
self.root = Path(root)
else:
dataset_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id
self.root = HF_LEROBOT_HOME / dataset_name
self.image_transforms = image_transforms
self.delta_timestamps = delta_timestamps
self.tolerance_s = tolerance_s
self.revision = revision if revision else CODEBASE_VERSION
self.video_backend = video_backend if video_backend else get_safe_default_codec()
self.delta_indices = None
self.batch_encoding_size = batch_encoding_size
self.episodes_since_last_encoding = 0
self.seed = seed
self.image_writer = None
self.episode_buffer = None
self.writer = None
self.latest_episode = None
self._current_file_start_frame = None
self.root.mkdir(exist_ok=True, parents=True)
if modalities is None:
modalities = ["rgb", "depth", "seg_instance_id"]
if "seg_instance_id" in modalities:
assert chunk_streaming_using_keyframe, (
"For performance, seg_instance_id requires chunk_streaming_using_keyframe=True"
)
if "depth" in modalities:
assert self.video_backend == "pyav", "Depth videos require video_backend='pyav'"
if cameras is None:
cameras = ["head", "left_wrist", "right_wrist"]
self.meta = BehaviorLeRobotDatasetMetadata(
repo_id=self.repo_id,
root=self.root,
revision=self.revision,
force_cache_sync=force_cache_sync,
modalities=modalities,
cameras=cameras,
)
if episodes is not None:
self.episodes = sorted([i for i in episodes if i < len(self.meta.episodes)])
else:
self.episodes = list(range(len(self.meta.episodes)))
logger.info(f"Total episodes: {len(self.episodes)}")
self._chunk_streaming_using_keyframe = chunk_streaming_using_keyframe
if self._chunk_streaming_using_keyframe:
if not shuffle:
logger.warning("Chunk streaming enabled but shuffle=False. This may reduce randomness.")
self.chunks = self._get_keyframe_chunk_indices()
self.current_streaming_chunk_idx = None if shuffle else 0
self.current_streaming_frame_idx = None if shuffle else self.chunks[0][0] if self.chunks else 0
self.obs_loaders = {}
self._should_obs_loaders_reload = True
self._lazy_loading = False
self._recorded_frames = self.meta.total_frames
self._writer_closed_for_reading = False
try:
if force_cache_sync:
raise FileNotFoundError
self.hf_dataset = self.load_hf_dataset()
except (AssertionError, FileNotFoundError, NotADirectoryError):
self.revision = get_safe_version(self.repo_id, self.revision)
self.download_episodes(download_videos)
self.hf_dataset = self.load_hf_dataset()
if self.delta_timestamps is not None:
check_delta_timestamps(self.delta_timestamps, self.meta.fps, self.tolerance_s)
self.delta_indices = get_delta_indices(self.delta_timestamps, self.meta.fps)
@property
def fps(self) -> int:
"""Frames per second."""
return self.meta.fps
@property
def features(self) -> dict:
"""Dataset features (filtered by modalities/cameras)."""
return self.meta.filtered_features
@property
def num_episodes(self) -> int:
"""Number of episodes."""
return len(self.episodes)
@property
def num_frames(self) -> int:
"""Total number of frames."""
return len(self.hf_dataset)
def get_episodes_file_paths(self) -> list[str]:
"""
Get download patterns for requested episodes.
Returns glob patterns for download rather than specific file paths.
Note: Unlike the base LeRobotDataset, this method cannot filter downloads to only
requested episodes because:
1. BEHAVIOR-1K episode indices are encoded (e.g., 10010 for task 1, episode 10)
2. Episodes are chunked across multiple parquet/video files
3. The parquet files are organized by chunk, not by episode
Therefore, we download full data/meta/video directories and rely on
`self.load_hf_dataset()` to filter to requested episodes from the loaded data.
"""
allow_patterns = ["data/**", "meta/**"]
# Filter by modalities and cameras for video patterns
if len(self.meta.video_keys) > 0:
if len(self.meta.modalities) != 3 or len(self.meta.camera_names) != 3:
# Only download specific modality/camera combinations
for modality in self.meta.modalities:
for camera in self.meta.camera_names:
allow_patterns.append(f"**/observation.images.{modality}.{camera}/**")
else:
# Download all videos (no filtering needed)
allow_patterns.append("videos/**")
return allow_patterns
def download_episodes(self, download_videos: bool = True) -> None:
"""
Download episodes with modality/camera filtering.
Follows the same pattern as base LeRobotDataset.download() but uses
get_episodes_file_paths() which returns patterns for modality/camera filtering.
"""
ignore_patterns = None if download_videos else "videos/"
files = self.get_episodes_file_paths()
self.pull_from_repo(allow_patterns=files, ignore_patterns=ignore_patterns)
def pull_from_repo(
self,
allow_patterns: list[str] | str | None = None,
ignore_patterns: list[str] | str | None = None,
) -> None:
"""Pull dataset from HuggingFace Hub."""
from huggingface_hub import snapshot_download
logger.info(f"Pulling dataset {self.repo_id} from HuggingFace Hub...")
snapshot_download(
self.repo_id,
repo_type="dataset",
revision=self.revision,
local_dir=self.root,
allow_patterns=allow_patterns,
ignore_patterns=ignore_patterns,
)
def load_hf_dataset(self) -> datasets.Dataset:
"""Load dataset from parquet files."""
from datasets import load_dataset
path = str(self.root / "data")
hf_dataset = load_dataset("parquet", data_dir=path, split="train")
hf_dataset.set_transform(hf_transform_to_torch)
return hf_dataset
def _get_keyframe_chunk_indices(self, chunk_size: int = 250) -> list[tuple[int, int, int]]:
"""
Divide episodes into chunks based on GOP size (keyframe interval).
For BEHAVIOR-1K, GOP size is 250 frames for efficient storage.
Returns:
List of (start_index, end_index, local_start_index) tuples
"""
chunks = []
offset = 0
for ep_array_idx in self.episodes:
# self.episodes contains array indices, so access directly
ep = self.meta.episodes[ep_array_idx]
length = ep["length"]
local_starts = list(range(0, length, chunk_size))
local_ends = local_starts[1:] + [length]
for local_start, local_end in zip(local_starts, local_ends, strict=True):
chunks.append((offset + local_start, offset + local_end, local_start))
offset += length
return chunks
def __getitem__(self, idx: int) -> dict:
"""Get item by index, with optional chunk streaming."""
if not self._chunk_streaming_using_keyframe:
item = self.hf_dataset[idx]
for key in self.meta.video_keys:
if key in self.features:
ep_idx = item["episode_index"].item()
timestamp = item["timestamp"].item()
video_path = self.root / self.meta.get_video_file_path(ep_idx, key)
frames = decode_video_frames(
video_path, [timestamp], self.tolerance_s, self.video_backend
)
item[key] = frames.squeeze(0)
if self.image_transforms is not None:
for key in self.features:
if key.startswith("observation.images."):
item[key] = self.image_transforms(item[key])
if "task_index" in item:
task_idx = item["task_index"].item()
try:
item["task"] = self.meta.tasks.iloc[task_idx].name
except (IndexError, AttributeError):
item["task"] = f"task_{task_idx}"
return item
return self._get_item_streaming(idx)
def _get_item_streaming(self, idx: int) -> dict:
"""Get item in chunk streaming mode."""
if self.current_streaming_chunk_idx is None:
worker_info = get_worker_info()
worker_id = 0 if worker_info is None else worker_info.id
rng = np.random.default_rng(self.seed + worker_id)
rng.shuffle(self.chunks)
self.current_streaming_chunk_idx = rng.integers(0, len(self.chunks)).item()
self.current_streaming_frame_idx = self.chunks[self.current_streaming_chunk_idx][0]
if self.current_streaming_frame_idx >= self.chunks[self.current_streaming_chunk_idx][1]:
self.current_streaming_chunk_idx += 1
if self.current_streaming_chunk_idx >= len(self.chunks):
self.current_streaming_chunk_idx = 0
self.current_streaming_frame_idx = self.chunks[self.current_streaming_chunk_idx][0]
self._should_obs_loaders_reload = True
item = self.hf_dataset[self.current_streaming_frame_idx]
ep_idx = item["episode_index"].item()
if self._should_obs_loaders_reload:
for loader in self.obs_loaders.values():
if hasattr(loader, "close"):
loader.close()
self.obs_loaders = {}
self.current_streaming_episode_idx = ep_idx
self._should_obs_loaders_reload = False
for key in self.meta.video_keys:
if key in self.features:
timestamp = item["timestamp"].item()
video_path = self.root / self.meta.get_video_file_path(ep_idx, key)
frames = decode_video_frames(video_path, [timestamp], self.tolerance_s, self.video_backend)
item[key] = frames.squeeze(0)
if self.image_transforms is not None:
for key in self.features:
if key.startswith("observation.images."):
item[key] = self.image_transforms(item[key])
if "task_index" in item:
task_idx = item["task_index"].item()
try:
item["task"] = self.meta.tasks.iloc[task_idx].name
except (IndexError, AttributeError):
item["task"] = f"task_{task_idx}"
self.current_streaming_frame_idx += 1
return item
def __len__(self) -> int:
"""Total number of frames."""
return len(self.hf_dataset)
@@ -0,0 +1,350 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
import numpy as np
import torch as th
ROBOT_TYPE = "R1Pro"
FPS = 30
ROBOT_CAMERA_NAMES = {
"A1": {
"external": "external::external_camera",
"wrist": "external::wrist_camera",
},
"R1Pro": {
"left_wrist": "robot_r1::robot_r1:left_realsense_link:Camera:0",
"right_wrist": "robot_r1::robot_r1:right_realsense_link:Camera:0",
"head": "robot_r1::robot_r1:zed_link:Camera:0",
},
}
# Camera resolutions and corresponding intrinstics
HEAD_RESOLUTION = (720, 720)
WRIST_RESOLUTION = (480, 480)
# TODO: Fix A1
CAMERA_INTRINSICS = {
"A1": {
"external": np.array(
[[306.0, 0.0, 360.0], [0.0, 306.0, 360.0], [0.0, 0.0, 1.0]], dtype=np.float32
), # 240x240
"wrist": np.array(
[[388.6639, 0.0, 240.0], [0.0, 388.6639, 240.0], [0.0, 0.0, 1.0]], dtype=np.float32
), # 240x240
},
"R1Pro": {
"head": np.array(
[[306.0, 0.0, 360.0], [0.0, 306.0, 360.0], [0.0, 0.0, 1.0]], dtype=np.float32
), # 720x720
"left_wrist": np.array(
[[388.6639, 0.0, 240.0], [0.0, 388.6639, 240.0], [0.0, 0.0, 1.0]], dtype=np.float32
), # 480x480
"right_wrist": np.array(
[[388.6639, 0.0, 240.0], [0.0, 388.6639, 240.0], [0.0, 0.0, 1.0]], dtype=np.float32
), # 480x480
},
}
# Dataset features for BEHAVIOR-1K LeRobotDataset v3.0
BEHAVIOR_DATASET_FEATURES = {
# Actions
"action": {
"dtype": "float32",
"shape": (23,), # 23-dimensional action space for R1Pro
"names": None,
},
# Proprioception
"observation.state": {
"dtype": "float32",
"shape": (256,), # Full proprioception state
"names": None,
},
# Camera relative poses
"observation.cam_rel_poses": {
"dtype": "float32",
"shape": (21,), # 3 cameras * 7 (pos + quat)
"names": None,
},
# Task information
"observation.task_info": {
"dtype": "float32",
"shape": (None,), # Variable size
"names": None,
},
# RGB images
"observation.images.rgb.head": {
"dtype": "video",
"shape": [720, 720, 3],
"names": ["height", "width", "channels"],
},
"observation.images.rgb.left_wrist": {
"dtype": "video",
"shape": [480, 480, 3],
"names": ["height", "width", "channels"],
},
"observation.images.rgb.right_wrist": {
"dtype": "video",
"shape": [480, 480, 3],
"names": ["height", "width", "channels"],
},
# Depth images
"observation.images.depth.head": {
"dtype": "video",
"shape": [720, 720, 1],
"names": ["height", "width", "channels"],
},
"observation.images.depth.left_wrist": {
"dtype": "video",
"shape": [480, 480, 1],
"names": ["height", "width", "channels"],
},
"observation.images.depth.right_wrist": {
"dtype": "video",
"shape": [480, 480, 1],
"names": ["height", "width", "channels"],
},
# Segmentation instance ID images
"observation.images.seg_instance_id.head": {
"dtype": "video",
"shape": [720, 720, 1],
"names": ["height", "width", "channels"],
},
"observation.images.seg_instance_id.left_wrist": {
"dtype": "video",
"shape": [480, 480, 1],
"names": ["height", "width", "channels"],
},
"observation.images.seg_instance_id.right_wrist": {
"dtype": "video",
"shape": [480, 480, 1],
"names": ["height", "width", "channels"],
},
}
# Action indices
ACTION_QPOS_INDICES = {
"A1": OrderedDict(
{
"arm": np.s_[0:6],
"gripper": np.s_[6:7],
}
),
"R1Pro": OrderedDict(
{
"base": np.s_[0:3],
"torso": np.s_[3:7],
"left_arm": np.s_[7:14],
"left_gripper": np.s_[14:15],
"right_arm": np.s_[15:22],
"right_gripper": np.s_[22:23],
}
),
}
# Proprioception configuration
PROPRIOCEPTION_INDICES = {
"A1": OrderedDict(
{
"joint_qpos": np.s_[0:8],
"joint_qpos_sin": np.s_[8:16],
"joint_qpos_cos": np.s_[16:24],
"joint_qvel": np.s_[24:32],
"joint_qeffort": np.s_[32:40],
"eef_0_pos": np.s_[40:43],
"eef_0_quat": np.s_[43:47],
"grasp_0": np.s_[47:48],
"gripper_0_qpos": np.s_[48:50],
"gripper_0_qvel": np.s_[50:52],
}
),
"R1Pro": OrderedDict(
{
"joint_qpos": np.s_[
0:28
], # Full robot joint positions, the first 6 are base joints, which is NOT allowed in standard track
"joint_qpos_sin": np.s_[
28:56
], # Full robot joint positions, the first 6 are base joints, which is NOT allowed in standard track
"joint_qpos_cos": np.s_[
56:84
], # Full robot joint positions, the first 6 are base joints, which is NOT allowed in standard track
"joint_qvel": np.s_[84:112],
"joint_qeffort": np.s_[112:140],
"robot_pos": np.s_[140:143], # Global pos, this is NOT allowed in standard track
"robot_ori_cos": np.s_[143:146], # Global ori, this is NOT allowed in standard track
"robot_ori_sin": np.s_[146:149], # Global ori, this is NOT allowed in standard track
"robot_2d_ori": np.s_[149:150], # 2D global ori, this is NOT allowed in standard track
"robot_2d_ori_cos": np.s_[150:151], # 2D global ori, this is NOT allowed in standard track
"robot_2d_ori_sin": np.s_[151:152], # 2D global ori, this is NOT allowed in standard track
"robot_lin_vel": np.s_[152:155],
"robot_ang_vel": np.s_[155:158],
"arm_left_qpos": np.s_[158:165],
"arm_left_qpos_sin": np.s_[165:172],
"arm_left_qpos_cos": np.s_[172:179],
"arm_left_qvel": np.s_[179:186],
"eef_left_pos": np.s_[186:189],
"eef_left_quat": np.s_[189:193],
"gripper_left_qpos": np.s_[193:195],
"gripper_left_qvel": np.s_[195:197],
"arm_right_qpos": np.s_[197:204],
"arm_right_qpos_sin": np.s_[204:211],
"arm_right_qpos_cos": np.s_[211:218],
"arm_right_qvel": np.s_[218:225],
"eef_right_pos": np.s_[225:228],
"eef_right_quat": np.s_[228:232],
"gripper_right_qpos": np.s_[232:234],
"gripper_right_qvel": np.s_[234:236],
"trunk_qpos": np.s_[236:240],
"trunk_qvel": np.s_[240:244],
"base_qpos": np.s_[244:247], # Base joint position, this is NOT allowed in standard track
"base_qpos_sin": np.s_[247:250], # Base joint position, this is NOT allowed in standard track
"base_qpos_cos": np.s_[250:253], # Base joint position, this is NOT allowed in standard track
"base_qvel": np.s_[253:256],
}
),
}
# Proprioception indices
PROPRIO_QPOS_INDICES = {
"A1": OrderedDict(
{
"arm": np.s_[0:6],
"gripper": np.s_[6:8],
}
),
"R1Pro": OrderedDict(
{
"torso": np.s_[6:10],
"left_arm": np.s_[10:24:2],
"right_arm": np.s_[11:24:2],
"left_gripper": np.s_[24:26],
"right_gripper": np.s_[26:28],
}
),
}
# Joint limits (lower, upper)
JOINT_RANGE = {
"A1": {
"arm": (
th.tensor([-2.8798, 0.0, -3.3161, -2.8798, -1.6581, -2.8798], dtype=th.float32),
th.tensor([2.8798, 3.1415, 0.0, 2.8798, 1.6581, 2.8798], dtype=th.float32),
),
"gripper": (th.tensor([0.00], dtype=th.float32), th.tensor([0.03], dtype=th.float32)),
},
"R1Pro": {
"base": (
th.tensor([-0.75, -0.75, -1.0], dtype=th.float32),
th.tensor([0.75, 0.75, 1.0], dtype=th.float32),
),
"torso": (
th.tensor([-1.1345, -2.7925, -1.8326, -3.0543], dtype=th.float32),
th.tensor([1.8326, 2.5307, 1.5708, 3.0543], dtype=th.float32),
),
"left_arm": (
th.tensor([-4.4506, -0.1745, -2.3562, -2.0944, -2.3562, -1.0472, -1.5708], dtype=th.float32),
th.tensor([1.3090, 3.1416, 2.3562, 0.3491, 2.3562, 1.0472, 1.5708], dtype=th.float32),
),
"left_gripper": (th.tensor([0.00], dtype=th.float32), th.tensor([0.05], dtype=th.float32)),
"right_arm": (
th.tensor([-4.4506, -3.1416, -2.3562, -2.0944, -2.3562, -1.0472, -1.5708], dtype=th.float32),
th.tensor([1.3090, 0.1745, 2.3562, 0.3491, 2.3562, 1.0472, 1.5708], dtype=th.float32),
),
"right_gripper": (th.tensor([0.00], dtype=th.float32), th.tensor([0.05], dtype=th.float32)),
},
}
EEF_POSITION_RANGE = {
"A1": {
"0": (th.tensor([0.0, -0.7, 0.0], dtype=th.float32), th.tensor([0.7, 0.7, 0.7], dtype=th.float32)),
},
"R1Pro": {
"left": (
th.tensor([0.0, -0.65, 0.0], dtype=th.float32),
th.tensor([0.65, 0.65, 2.5], dtype=th.float32),
),
"right": (
th.tensor([0.0, -0.65, 0.0], dtype=th.float32),
th.tensor([0.65, 0.65, 2.5], dtype=th.float32),
),
},
}
TASK_NAMES_TO_INDICES = {
# B10
"turning_on_radio": 0,
"picking_up_trash": 1,
"putting_away_Halloween_decorations": 2,
"cleaning_up_plates_and_food": 3,
"can_meat": 4,
"setting_mousetraps": 5,
"hiding_Easter_eggs": 6,
"picking_up_toys": 7,
"rearranging_kitchen_furniture": 8,
"putting_up_Christmas_decorations_inside": 9,
# B20
"set_up_a_coffee_station_in_your_kitchen": 10,
"putting_dishes_away_after_cleaning": 11,
"preparing_lunch_box": 12,
"loading_the_car": 13,
"carrying_in_groceries": 14,
"bringing_in_wood": 15,
"moving_boxes_to_storage": 16,
"bringing_water": 17,
"tidying_bedroom": 18,
"outfit_a_basic_toolbox": 19,
# B30
"sorting_vegetables": 20,
"collecting_childrens_toys": 21,
"putting_shoes_on_rack": 22,
"boxing_books_up_for_storage": 23,
"storing_food": 24,
"clearing_food_from_table_into_fridge": 25,
"assembling_gift_baskets": 26,
"sorting_household_items": 27,
"getting_organized_for_work": 28,
"clean_up_your_desk": 29,
# B40
"setting_the_fire": 30,
"clean_boxing_gloves": 31,
"wash_a_baseball_cap": 32,
"wash_dog_toys": 33,
"hanging_pictures": 34,
"attach_a_camera_to_a_tripod": 35,
"clean_a_patio": 36,
"clean_a_trumpet": 37,
"spraying_for_bugs": 38,
"spraying_fruit_trees": 39,
# B50
"make_microwave_popcorn": 40,
"cook_cabbage": 41,
"chop_an_onion": 42,
"slicing_vegetables": 43,
"chopping_wood": 44,
"cook_hot_dogs": 45,
"cook_bacon": 46,
"freeze_pies": 47,
"canning_food": 48,
"make_pizza": 49,
}
TASK_INDICES_TO_NAMES = {v: k for k, v in TASK_NAMES_TO_INDICES.items()}
+605
View File
@@ -0,0 +1,605 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convert Behavior Dataset to LeRobotDataset v3.0 format"""
import argparse
import json
import logging
import shutil
from pathlib import Path
import jsonlines
import pandas as pd
import pyarrow as pa
import tqdm
from datasets import Dataset, Features, Image
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.utils import (
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH,
LEGACY_EPISODES_PATH,
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy,
flatten_dict,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
get_parquet_num_frames,
load_info,
update_chunk_file_indices,
write_episodes,
write_info,
write_stats,
write_tasks,
)
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
from lerobot.utils.utils import init_logging
# script to convert one single task to v3.1
# TASK = 1
NEW_ROOT = Path("/fsx/jade_choghari/tmp/bb")
def get_total_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step) -> int:
"""
Calculates the total number of episodes for a single, specified task.
"""
# Simply load the episodes for the task and count them.
episodes = legacy_load_episodes_task(
local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
)
return len(episodes)
NUM_CAMERAS = 9
def get_total_frames_task(local_dir, meta_path, task_id: int, task_ranges: dict, step: int) -> int:
episodes_metadata = legacy_load_episodes_task(
local_dir=local_dir, task_id=task_id, task_ranges=task_ranges, step=step
)
total_frames = 0
# like 'duration'
for ep in episodes_metadata.values():
duration_s = ep["length"]
total_frames += int(duration_s)
return total_frames
def convert_info(
root, new_root, data_file_size_in_mb, video_file_size_in_mb, meta_path, task_id: int, task_ranges, step
):
info = load_info(root)
info["codebase_version"] = "v3.0"
del info["total_videos"]
info["data_files_size_in_mb"] = data_file_size_in_mb
info["video_files_size_in_mb"] = video_file_size_in_mb
info["data_path"] = DEFAULT_DATA_PATH
info["video_path"] = DEFAULT_VIDEO_PATH if info["video_path"] is not None else None
info["fps"] = int(info["fps"])
for key in info["features"]:
if info["features"][key]["dtype"] == "video":
# already has fps in video_info
continue
info["features"][key]["fps"] = info["fps"]
info["total_episodes"] = get_total_episodes_task(root, task_id, task_ranges, step)
info["total_videos"] = info["total_episodes"] * NUM_CAMERAS
info["total_frames"] = get_total_frames_task(root, meta_path, task_id, task_ranges, step)
info["total_tasks"] = 1
write_info(info, new_root)
def load_jsonlines(fpath: Path) -> list[any]:
with jsonlines.open(fpath, "r") as reader:
return list(reader)
def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
# return tasks dict such that
tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
task_to_task_index = {task: task_index for task_index, task in tasks.items()}
return tasks, task_to_task_index
def convert_tasks(root, new_root, task_id: int):
tasks, _ = legacy_load_tasks(root)
if task_id not in tasks:
raise ValueError(f"Task ID {task_id} not found in tasks (available: {list(tasks.keys())})")
tasks = {task_id: tasks[task_id]}
task_indices = tasks.keys()
task_strings = tasks.values()
df_tasks = pd.DataFrame({"task_index": task_indices}, index=task_strings)
write_tasks(df_tasks, new_root)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
# Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
if len(image_keys) > 0:
schema = pa.Schema.from_pandas(concatenated_df)
features = Features.from_arrow_schema(schema)
for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def get_image_keys(root):
info = load_info(root)
features = info["features"]
image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
return image_keys
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int, task_index: int):
task_dir_name = f"task-00{task_index}"
data_dir = root / "data" / task_dir_name
ep_paths = sorted(data_dir.glob("*.parquet"))
image_keys = get_image_keys(root)
ep_idx = 0
chunk_idx = 0
file_idx = 0
size_in_mb = 0
num_frames = 0
paths_to_cat = []
episodes_metadata = []
logging.info(f"Converting data files from {len(ep_paths)} episodes")
for ep_path in tqdm.tqdm(ep_paths, desc="convert data files"):
ep_size_in_mb = get_parquet_file_size_in_mb(ep_path)
ep_num_frames = get_parquet_num_frames(ep_path)
ep_metadata = {
"episode_index": ep_idx,
"data/chunk_index": chunk_idx,
"data/file_index": file_idx,
"dataset_from_index": num_frames,
"dataset_to_index": num_frames + ep_num_frames,
}
size_in_mb += ep_size_in_mb
num_frames += ep_num_frames
episodes_metadata.append(ep_metadata)
ep_idx += 1
if size_in_mb < data_file_size_in_mb:
paths_to_cat.append(ep_path)
continue
if paths_to_cat:
concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)
# Reset for the next file
size_in_mb = ep_size_in_mb
paths_to_cat = [ep_path]
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
# Write remaining data if any
if paths_to_cat:
concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys)
return episodes_metadata
def convert_videos_of_camera(
root: Path, new_root: Path, video_key: str, video_file_size_in_mb: int, task_index: int
):
# Access old paths to mp4
# videos_dir = root / "videos"
# ep_paths = sorted(videos_dir.glob(f"*/{video_key}/*.mp4"))
task_dir_name = f"task-00{task_index}"
videos_dir = root / "videos" / task_dir_name / video_key
ep_paths = sorted(videos_dir.glob("*.mp4"))
print("ep_paths", ep_paths)
ep_idx = 0
chunk_idx = 0
file_idx = 0
size_in_mb = 0
duration_in_s = 0.0
paths_to_cat = []
episodes_metadata = []
for ep_path in tqdm.tqdm(ep_paths, desc=f"convert videos of {video_key}"):
ep_size_in_mb = get_file_size_in_mb(ep_path)
ep_duration_in_s = get_video_duration_in_s(ep_path)
# Check if adding this episode would exceed the limit
if size_in_mb + ep_size_in_mb >= video_file_size_in_mb and len(paths_to_cat) > 0:
# Size limit would be exceeded, save current accumulation WITHOUT this episode
concatenate_video_files(
paths_to_cat,
new_root
/ DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
)
# Update episodes metadata for the file we just saved
for i, _ in enumerate(paths_to_cat):
past_ep_idx = ep_idx - len(paths_to_cat) + i
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
# Move to next file and start fresh with current episode
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
size_in_mb = 0
duration_in_s = 0.0
paths_to_cat = []
# Add current episode metadata
ep_metadata = {
"episode_index": ep_idx,
f"videos/{video_key}/chunk_index": chunk_idx, # Will be updated when file is saved
f"videos/{video_key}/file_index": file_idx, # Will be updated when file is saved
f"videos/{video_key}/from_timestamp": duration_in_s,
f"videos/{video_key}/to_timestamp": duration_in_s + ep_duration_in_s,
}
episodes_metadata.append(ep_metadata)
# Add current episode to accumulation
paths_to_cat.append(ep_path)
size_in_mb += ep_size_in_mb
duration_in_s += ep_duration_in_s
ep_idx += 1
# Write remaining videos if any
if paths_to_cat:
concatenate_video_files(
paths_to_cat,
new_root
/ DEFAULT_VIDEO_PATH.format(video_key=video_key, chunk_index=chunk_idx, file_index=file_idx),
)
# Update episodes metadata for the final file
for i, _ in enumerate(paths_to_cat):
past_ep_idx = ep_idx - len(paths_to_cat) + i
episodes_metadata[past_ep_idx][f"videos/{video_key}/chunk_index"] = chunk_idx
episodes_metadata[past_ep_idx][f"videos/{video_key}/file_index"] = file_idx
return episodes_metadata
def get_video_keys(root):
info = load_info(root)
features = info["features"]
video_keys = [key for key, ft in features.items() if ft["dtype"] == "video"]
return video_keys
def convert_videos(root: Path, new_root: Path, video_file_size_in_mb: int, task_id: int):
logging.info(f"Converting videos from {root} to {new_root}")
video_keys = get_video_keys(root)
if len(video_keys) == 0:
return None
video_keys = sorted(video_keys)
eps_metadata_per_cam = []
for camera in video_keys:
eps_metadata = convert_videos_of_camera(root, new_root, camera, video_file_size_in_mb, task_id)
eps_metadata_per_cam.append(eps_metadata)
num_eps_per_cam = [len(eps_cam_map) for eps_cam_map in eps_metadata_per_cam]
if len(set(num_eps_per_cam)) != 1:
raise ValueError(f"All cams dont have same number of episodes ({num_eps_per_cam}).")
episods_metadata = []
num_cameras = len(video_keys)
num_episodes = num_eps_per_cam[0]
for ep_idx in tqdm.tqdm(range(num_episodes), desc="convert videos"):
# Sanity check
ep_ids = [eps_metadata_per_cam[cam_idx][ep_idx]["episode_index"] for cam_idx in range(num_cameras)]
ep_ids += [ep_idx]
if len(set(ep_ids)) != 1:
raise ValueError(f"All episode indices need to match ({ep_ids}).")
ep_dict = {}
for cam_idx in range(num_cameras):
ep_dict.update(eps_metadata_per_cam[cam_idx][ep_idx])
episods_metadata.append(ep_dict)
return episods_metadata
def infer_task_episode_ranges(episodes_jsonl_path: Path) -> dict:
"""
Parse the Behavior-1K episodes.jsonl metadata and infer contiguous episode ranges per unique task.
Returns a dict:
{ task_id: { "task_string": ..., "ep_start": ..., "ep_end": ... } }
"""
task_ranges = {}
task_id = 0
current_task_str = None
ep_start = None
ep_end = None
with open(episodes_jsonl_path) as f:
for line in f:
if not line.strip():
continue
ep = json.loads(line)
ep_idx = ep["episode_index"]
task_str = ep["tasks"][0] if ep["tasks"] else "UNKNOWN"
if current_task_str is None:
current_task_str = task_str
ep_start = ep_idx
ep_end = ep_idx
elif task_str == current_task_str:
ep_end = ep_idx
else:
# close previous task group
task_ranges[task_id] = {
"task_string": current_task_str,
"ep_start": ep_start,
"ep_end": ep_end,
}
task_id += 1
# start new one
current_task_str = task_str
ep_start = ep_idx
ep_end = ep_idx
# store last task
if current_task_str is not None:
task_ranges[task_id] = {
"task_string": current_task_str,
"ep_start": ep_start,
"ep_end": ep_end,
}
return task_ranges
def legacy_load_episodes_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
"""
Load only the episodes belonging to a specific task, inferred automatically from episode ranges.
Args:
local_dir (Path): Root path containing legacy meta/episodes.jsonl
task_id (int): Which task to load (key from the inferred task_ranges dict)
task_ranges (dict): Mapping from infer_task_episode_ranges()
step (int): Episode index step (Behavior-1K = 10)
"""
all_episodes = legacy_load_episodes(local_dir)
# get the range for this task
if task_id not in task_ranges:
raise ValueError(f"Task id {task_id} not found in task_ranges")
ep_start = task_ranges[task_id]["ep_start"]
ep_end = task_ranges[task_id]["ep_end"]
task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_episodes[i] for i in task_episode_indices if i in all_episodes}
def legacy_load_episodes(local_dir: Path) -> dict:
episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
def legacy_load_episodes_stats(local_dir: Path) -> dict:
episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
return {
item["episode_index"]: cast_stats_to_numpy(item["stats"])
for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
}
def legacy_load_episodes_stats_task(local_dir: Path, task_id: int, task_ranges: dict, step: int = 10) -> dict:
all_stats = legacy_load_episodes_stats(local_dir)
if task_id not in task_ranges:
raise ValueError(f"Task id {task_id} not found in task_ranges")
ep_start = task_ranges[task_id]["ep_start"]
ep_end = task_ranges[task_id]["ep_end"]
task_episode_indices = range(ep_start, ep_end + step, step)
return {i: all_stats[i] for i in task_episode_indices if i in all_stats}
def generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_videos=None
):
num_episodes = len(episodes_metadata)
episodes_legacy_metadata_vals = list(episodes_legacy_metadata.values())
episodes_stats_vals = list(episodes_stats.values())
episodes_stats_keys = list(episodes_stats.keys())
for i in range(num_episodes):
ep_legacy_metadata = episodes_legacy_metadata_vals[i]
ep_metadata = episodes_metadata[i]
ep_stats = episodes_stats_vals[i]
ep_ids_set = {
ep_legacy_metadata["episode_index"],
ep_metadata["episode_index"],
episodes_stats_keys[i],
}
if episodes_videos is None:
ep_video = {}
else:
ep_video = episodes_videos[i]
ep_ids_set.add(ep_video["episode_index"])
# we skip this check because ep_ids have a step of 10, whereas we convert with a step of 1
# if len(ep_ids_set) != 1:
# raise ValueError(f"Number of episodes is not the same ({ep_ids_set}).")
ep_dict = {**ep_metadata, **ep_video, **ep_legacy_metadata, **flatten_dict({"stats": ep_stats})}
ep_dict["meta/episodes/chunk_index"] = 0
ep_dict["meta/episodes/file_index"] = 0
yield ep_dict
def convert_episodes_metadata(
root, new_root, episodes_metadata, task_id: int, task_ranges, episodes_video_metadata=None
):
logging.info(f"Converting episodes metadata from {root} to {new_root}")
# filter by task
episodes_legacy_metadata = legacy_load_episodes_task(root, task_id=task_id, task_ranges=task_ranges)
episodes_stats = legacy_load_episodes_stats_task(root, task_id=task_id, task_ranges=task_ranges)
num_eps_set = {len(episodes_legacy_metadata), len(episodes_metadata)}
if episodes_video_metadata is not None:
num_eps_set.add(len(episodes_video_metadata))
if len(num_eps_set) != 1:
raise ValueError(f"Number of episodes is not the same ({num_eps_set}).")
ds_episodes = Dataset.from_generator(
lambda: generate_episode_metadata_dict(
episodes_legacy_metadata, episodes_metadata, episodes_stats, episodes_video_metadata
)
)
write_episodes(ds_episodes, new_root)
stats = aggregate_stats(list(episodes_stats.values()))
write_stats(stats, new_root)
def convert_dataset_local(
data_path: Path,
new_repo: Path,
task_id: int,
data_file_size_in_mb: int = DEFAULT_DATA_FILE_SIZE_IN_MB,
video_file_size_in_mb: int = DEFAULT_VIDEO_FILE_SIZE_IN_MB,
force_conversion: bool = False,
):
"""
Convert a local dataset to v3.x format, task-by-task, without using the Hugging Face Hub.
Args:
data_path (Path): path to local dataset root (e.g. /fsx/.../2025-challenge-demos)
new_repo (Path): path where converted dataset will be written (e.g. /fsx/.../behavior1k_v3)
task_id (int): which task to convert (index)
data_file_size_in_mb (int): max size per data chunk
video_file_size_in_mb (int): max size per video chunk
force_conversion (bool): overwrite existing conversion if True
"""
root = Path(data_path)
new_root = Path(new_repo)
# Clean up if needed
if new_root.exists() and force_conversion:
shutil.rmtree(new_root)
new_root.mkdir(parents=True, exist_ok=True)
print(f"🔹 Starting conversion for task {task_id}")
print(f"Input root: {root}")
print(f"Output root: {new_root}")
# Infer task episode ranges
episodes_meta_path = root / "meta" / "episodes.jsonl"
task_ranges = infer_task_episode_ranges(episodes_meta_path)
convert_info(
root,
new_root,
data_file_size_in_mb,
video_file_size_in_mb,
episodes_meta_path,
task_id,
task_ranges,
step=10,
)
convert_tasks(root, new_root, task_id)
episodes_metadata = convert_data(root, new_root, data_file_size_in_mb, task_index=task_id)
episodes_videos_metadata = convert_videos(root, new_root, video_file_size_in_mb, task_id=task_id)
convert_episodes_metadata(
root,
new_root,
episodes_metadata,
task_id=task_id,
task_ranges=task_ranges,
episodes_video_metadata=episodes_videos_metadata,
)
print(f"✅ Conversion complete for task {task_id}")
print(f"Converted dataset written to: {new_root}")
if __name__ == "__main__":
import argparse
from pathlib import Path
init_logging()
parser = argparse.ArgumentParser(
description="Convert Behavior-1K tasks to LeRobot v3 format (local only)"
)
parser.add_argument(
"--data-path",
type=str,
required=True,
help="Path to the local Behavior-1K dataset (e.g. /fsx/francesco_capuano/.cache/behavior-1k/2025-challenge-demos)",
)
parser.add_argument(
"--new-repo",
type=str,
required=True,
help="Path to the output directory for the converted dataset",
)
parser.add_argument(
"--task-id",
type=int,
required=True,
help="Task index to convert (e.g. 0, 1, 2, ...)",
)
parser.add_argument(
"--data-file-size-in-mb",
type=int,
default=DEFAULT_DATA_FILE_SIZE_IN_MB,
help=f"Maximum size per data chunk (default: {DEFAULT_DATA_FILE_SIZE_IN_MB})",
)
parser.add_argument(
"--video-file-size-in-mb",
type=int,
default=DEFAULT_VIDEO_FILE_SIZE_IN_MB,
help=f"Maximum size per video chunk (default: {DEFAULT_VIDEO_FILE_SIZE_IN_MB})",
)
parser.add_argument(
"--force-conversion",
action="store_true",
help="Force overwrite of existing conversion output if present.",
)
args = parser.parse_args()
convert_dataset_local(
data_path=Path(args.data_path),
new_repo=Path(args.new_repo),
task_id=args.task_id,
data_file_size_in_mb=args.data_file_size_in_mb,
video_file_size_in_mb=args.video_file_size_in_mb,
force_conversion=args.force_conversion,
)
@@ -0,0 +1,130 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test script to verify BEHAVIOR-1K dataset loading with v3.0 wrapper.
"""
import argparse
import logging
from behavior_lerobot_dataset_v3 import BehaviorLeRobotDatasetV3
from lerobot.utils.utils import init_logging
init_logging()
def load_behavior1k_dataset(repo_id, root):
"""Test basic dataset loading."""
logging.info("=" * 80)
logging.info("Testing BEHAVIOR-1K dataset loading")
logging.info("=" * 80)
logging.info(f"\n1. Loading dataset with repo_id: {repo_id}")
dataset = BehaviorLeRobotDatasetV3(
repo_id=repo_id,
root=root,
modalities=["rgb"],
cameras=["head"],
chunk_streaming_using_keyframe=False,
check_timestamp_sync=False,
)
logging.info("\n2. Dataset loaded successfully!")
logging.info(f" - Number of episodes: {dataset.num_episodes}")
logging.info(f" - Number of frames: {dataset.num_frames}")
logging.info(f" - FPS: {dataset.fps}")
logging.info(f" - Features: {list(dataset.features)}")
return dataset
def load_behavior1k_dataset_with_multiple_modalities(repo_id, root):
"""Test loading multiple modalities and cameras."""
logging.info("\n" + "=" * 80)
logging.info("Testing multi-modality loading with repo_id: {repo_id}")
logging.info("=" * 80)
logging.info(f"\n1. Loading dataset with RGB + Depth with repo_id: {repo_id}")
dataset = BehaviorLeRobotDatasetV3(
repo_id=repo_id,
root=root,
modalities=["rgb", "depth"],
cameras=["head", "left_wrist", "right_wrist"],
chunk_streaming_using_keyframe=False,
check_timestamp_sync=False,
video_backend="pyav",
)
logging.info(f"\n2. Dataset loaded with modalities: {list(dataset.features)}")
logging.info(f" - Total features: {len(dataset.features)}")
rgb_keys = [k for k in dataset.features if "rgb" in k]
depth_keys = [k for k in dataset.features if "depth" in k]
logging.info(f" - RGB features: {rgb_keys}")
logging.info(f" - Depth features: {depth_keys}")
logging.info("\n3. SUCCESS! Multi-modality loading works.")
return dataset
def stream_behavior1k_dataset(repo_id, root):
"""Test chunk streaming mode."""
logging.info("\n" + "=" * 80)
logging.info("Testing chunk streaming mode")
logging.info("=" * 80)
logging.info("\n1. Loading dataset with chunk streaming...")
dataset = BehaviorLeRobotDatasetV3(
repo_id=repo_id,
root=root,
modalities=["rgb"],
cameras=["head"],
chunk_streaming_using_keyframe=True,
shuffle=True,
seed=42,
check_timestamp_sync=False,
)
logging.info("\n2. Dataset loaded in streaming mode")
logging.info(f" - Number of chunks: {len(dataset.chunks)}")
logging.info(f" - First chunk range: {dataset.chunks[0]}")
logging.info("\n3. Testing frame access in streaming mode...")
for i in range(min(3, len(dataset))):
frame = dataset[i]
logging.info(
f" - Frame {i}: episode_index={frame['episode_index'].item()}, "
f"task_index={frame['task_index'].item()}"
)
logging.info("\n4. SUCCESS! Chunk streaming works.")
return dataset
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--repo-id", type=str, default=None)
parser.add_argument("--root", type=str, default=None)
args = parser.parse_args()
load_behavior1k_dataset(args.repo_id, args.root)
load_behavior1k_dataset_with_multiple_modalities(args.repo_id, args.root)
stream_behavior1k_dataset(args.repo_id, args.root)
-243
View File
@@ -1,243 +0,0 @@
# Synthetic Data Generation Script - Summary
## ✅ What Was Created
### Main Script: `annotate_pgen.py` (717 lines)
A production-ready script implementing the Hi-Robot synthetic data generation pipeline.
**Key Features:**
- ✅ Loads LeRobot datasets with skill annotations
- ✅ Generates synthetic user prompts and robot utterances using Qwen VLM
-**Temporal sampling** - generates dialogue every N seconds (default: 1s)
- ✅ Adds `task_index_high_level` feature to dataset parquets
- ✅ Saves high-level tasks to `meta/tasks_high_level.parquet`
- ✅ Exports debug JSONL for quality analysis
- ✅ Supports both Qwen2-VL and Qwen3-VL models
- ✅ Multi-view camera support
- ✅ Episode-aware processing with automatic first-frame sampling
- ✅ Modular architecture for easy extension
### Supporting Files Created
1. **`run_pgen.sh`** - Convenience script with sensible defaults
2. **`README_PGEN.md`** - Comprehensive documentation with examples
3. **`example_pgen_usage.md`** - Practical examples and performance estimates
4. **`SAMPLING_DIAGRAM.md`** - Visual explanation of temporal sampling strategy
5. **`PGEN_SUMMARY.md`** - This file
## 🚀 Key Innovation: Temporal Sampling
The script processes **ALL episodes** in the dataset efficiently via `--sample-interval`:
```bash
# Instead of calling VLM for every frame (expensive):
# 15,000 frames × VLM call = ~5 hours
# Generate dialogue every 1 second (efficient):
python annotate_pgen.py --repo-id dataset --model qwen --sample-interval 1.0
# 15,000 frames processed, only ~500 VLM calls (30x speedup!)
```
**How it works:**
- Process ALL frames in ALL episodes (complete coverage)
- Generate dialogue at sampled timepoints (e.g., every 1 second)
- Propagate task indices to intermediate frames
- Always sample first frame of each episode
- All frames get labeled, but VLM is only called for samples
- No dummy values or skipped episodes
**Benefits:**
- 30-100x speedup depending on interval
- Maintains temporal coherence
- Reduces cost without losing quality
- Configurable based on skill duration
## 📊 Efficiency Comparison
For a typical 15,000 frame dataset at 30 fps:
| Method | VLM Calls | Time | Cost |
|--------|-----------|------|------|
| Every frame | 15,000 | ~5 hours | $$$$ |
| Every 0.5s | 1,000 | ~20 min | $$$ |
| **Every 1s** (default) | **500** | **~10 min** | **$$** |
| Every 2s | 250 | ~5 min | $ |
## 🎯 Usage
### Quick Test (5s sampling for fast iteration)
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 5.0 \
--output-dir ./outputs/test_quick
```
### Production Run (Recommended Settings)
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 1.0 \
--output-dir ./outputs/full_pgen
```
### High-Quality with Qwen3
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--sample-interval 0.5 \
--temperature 0.6 \
--output-dir ./outputs/high_quality
```
## 📦 Output Structure
After running, you'll have:
```
dataset_root/
├── meta/
│ ├── tasks_high_level.parquet # High-level tasks with prompts/utterances
│ └── syn_annotations.jsonl # Debug: full context for each sample
└── data/
└── chunk-000/
└── file-000.parquet # Updated with task_index_high_level
```
**New feature added to all parquet files:**
- `task_index_high_level` (int64): Links to tasks_high_level.parquet
## 🔧 All Parameters
| Parameter | Default | Description |
|-----------|---------|-------------|
| `--repo-id` / `--data-dir` | - | Dataset source |
| `--model` | Qwen/Qwen2-VL-7B-Instruct | VLM model |
| `--device` | cuda | Device to use |
| `--dtype` | bfloat16 | Model precision |
| `--temperature` | 0.7 | Sampling temperature |
| **`--sample-interval`** | **1.0** | **Generate every N seconds (all episodes processed)** |
| `--num-image-views-per-sample` | 1 | Number of cameras |
| `--batch-size` | 1 | Batch size (currently unused) |
| `--output-dir` | None | Output directory |
| `--push-to-hub` | False | Push to HuggingFace |
## 🎨 Generated Data Format
Each sampled frame produces:
```json
{
"scenario_type": "specific_object",
"response_type": "confirmation",
"user_prompt": "Can you pick up the pink brick?",
"robot_utterance": "Sure, I'll grab the pink lego brick.",
"skill": "robot arm picks up pink lego brick",
"episode_id": 0,
"frame_index": 45,
"timestamp": 1.5,
"skill_history": ["robot arm moves towards pink lego brick"],
"task_description": "pink lego brick into the transparent box"
}
```
**Scenario Types:**
- specific_object, negative_task, situated_correction, implicit_request, constraint_based
**Response Types:**
- confirmation, clarification, acknowledgment, constraint_acknowledgment
## 🔬 Code Architecture
```python
# Main components (modular design)
class QwenPgen:
"""VLM wrapper supporting Qwen2/3"""
def call_qwen(images, prompt) -> dict
def construct_prompt(task, history, skill) -> str:
"""Build contextual prompt with history"""
def annotate_sample(pgen, images, ...) -> dict:
"""Generate dialogue for one sample"""
def generate_synthetic_data(dataset, pgen, ...) -> tuple:
"""Process entire dataset with temporal sampling"""
# Core sampling logic:
# - Track last_sample_timestamp per episode
# - Sample if time_elapsed >= sample_interval
# - Always sample first frame of episodes
# - Propagate task_index to intermediate frames
def main():
"""CLI entrypoint with argparse"""
```
## ✨ Next Steps
1. **Quick test with large interval:**
```bash
# Fast iteration - samples every 5 seconds
python examples/dataset/annotate_pgen.py \
--data-dir /path/to/dataset \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 5.0 \
--output-dir ./outputs/quick_test
```
2. **Verify output quality:**
```bash
head outputs/quick_test/meta/syn_annotations.jsonl
```
3. **Production run:**
```bash
# Standard 1 second sampling for production
bash examples/dataset/run_pgen.sh
```
4. **Use in training:**
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
ds = LeRobotDataset(repo_id="...", root="outputs/pgen_annotations")
# Access high-level task for each frame
frame = ds[100]
task_idx = frame["task_index_high_level"].item()
```
## 📚 Documentation Files
- **`README_PGEN.md`**: Full API reference and troubleshooting
- **`example_pgen_usage.md`**: Practical examples with performance estimates
- **`SAMPLING_DIAGRAM.md`**: Visual explanation of temporal sampling
- **`PGEN_SUMMARY.md`**: This overview document
## 🎯 Success Criteria
✅ Script generates synthetic dialogue using Qwen VLM
✅ Adds `task_index_high_level` feature to dataset
✅ Saves tasks to `tasks_high_level.parquet`
✅ Implements efficient temporal sampling (30-100x speedup)
✅ Handles episode boundaries correctly
✅ Produces diverse interaction types (scenarios + responses)
✅ Maintains temporal coherence within episodes
✅ Includes comprehensive documentation and examples
✅ Ready for production use on real datasets
## 💡 Key Takeaway
**The script processes ALL episodes with intelligent sampling:**
- `--sample-interval` controls how often VLM is called (default: 1.0s)
- ALL frames in ALL episodes get labeled (complete coverage)
- Intermediate frames inherit from most recent sample (temporal coherence)
- Achieves 30-100x speedup while maintaining quality
- Adjust interval based on use case: 5.0s for testing, 1.0s for production, 0.5s for fine detail
This makes the synthetic data generation **practical, scalable, and complete** for real-world datasets!
-243
View File
@@ -1,243 +0,0 @@
# Synthetic Data Generation for Hierarchical Robot Policies
This directory contains `annotate_pgen.py`, a script for generating synthetic user prompts and robot utterances for hierarchical policy training using Vision-Language Models (VLMs).
## Overview
The script implements the synthetic data generation pipeline described in the Hi-Robot paper:
1. **Load** a LeRobot dataset with skill annotations (from `annotate.py`)
2. **Generate** synthetic dialogue using Qwen VLM:
- User prompts (_t): Natural requests that lead to specific skills
- Robot utterances (u_t): Acknowledgments and clarifications
3. **Save** results as a new dataset feature `task_index_high_level`
## Prerequisites
1. First, annotate your dataset with skills using `annotate.py`:
```bash
python examples/dataset/annotate.py \
--repo-id lerobot/svla_so101_pickplace \
--video-key observation.images.base \
--model Qwen/Qwen2-VL-7B-Instruct
```
This creates `meta/skills.json` with skill segmentation for each episode.
## Usage
### Basic Usage
```bash
python examples/dataset/annotate_pgen.py \
--repo-id lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 1.0 \
--output-dir ./outputs/pgen_dataset
```
**Note**: The script processes **all episodes** in the dataset. It generates dialogue every 1 second (`--sample-interval 1.0`) using temporal sampling. Frames between samples reuse the last generated dialogue. This makes the process efficient while ensuring complete dataset coverage.
### Advanced Options
```bash
python examples/dataset/annotate_pgen.py \
--repo-id lerobot/svla_so101_pickplace \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--temperature 0.8 \
--sample-interval 0.5 \
--num-image-views-per-sample 2 \
--output-dir ./outputs/pgen_dataset \
--push-to-hub
```
This example uses a more powerful model and samples every 0.5 seconds for finer granularity.
### Fast Testing (larger interval)
```bash
python examples/dataset/annotate_pgen.py \
--repo-id lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 5.0 \
--output-dir ./outputs/pgen_quick_test
```
Use a larger interval (5.0 seconds) for rapid iteration during development. All episodes are still processed.
### Using Local Dataset
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--output-dir ./outputs/pgen_dataset
```
## Output Files
The script produces several outputs:
1. **`meta/tasks_high_level.parquet`**: High-level tasks with user prompts and robot utterances
- Columns: task_index, user_prompt, robot_utterance, skill, scenario_type, response_type
2. **`meta/syn_annotations.jsonl`**: Debug file with all generated dialogues
- One JSON object per line with full context for each frame
3. **Modified dataset**: New dataset with `task_index_high_level` feature added to all parquet files
## Scenario and Response Types
The generator produces diverse interaction types:
### Scenario Types
- **specific_object**: Direct specification of objects/actions
- **negative_task**: Instructions about what NOT to do
- **situated_correction**: Adjustments based on current state
- **implicit_request**: Implied needs without direct commands
- **constraint_based**: Specific constraints or preferences
### Response Types
- **confirmation**: Simple acknowledgment ("OK, I'll do X")
- **clarification**: Seeking confirmation ("Just to confirm...")
- **acknowledgment**: Action acknowledgment ("Got it, doing X")
- **constraint_acknowledgment**: Acknowledging constraints ("Sure, I'll X while Y")
## Example Generated Data
```json
{
"episode_id": 0,
"frame_index": 45,
"timestamp": 2.5,
"skill_current": "robot arm picks up pink lego brick",
"skill_history": ["robot arm moves towards pink lego brick"],
"task_description": "pink lego brick into the transparent box",
"scenario_type": "specific_object",
"response_type": "confirmation",
"user_prompt": "Can you grab the pink brick?",
"robot_utterance": "Sure, I'll pick up the pink lego brick."
}
```
## Accessing the Data
After running the script, access the synthetic data in your code:
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
import pandas as pd
# Load modified dataset
dataset = LeRobotDataset(repo_id="lerobot/svla_so101_pickplace_with_high_level_tasks")
# Access frame with high-level task
frame = dataset[100]
high_level_task_idx = frame["task_index_high_level"].item()
# Load high-level tasks
tasks_df = pd.read_parquet(dataset.root / "meta" / "tasks_high_level.parquet")
task_info = tasks_df.iloc[high_level_task_idx]
print(f"User prompt: {task_info['user_prompt']}")
print(f"Robot utterance: {task_info['robot_utterance']}")
print(f"Skill: {task_info['skill']}")
```
## Architecture
The script is modular and extensible:
```python
# Core components
class QwenPgen:
"""VLM wrapper for generation"""
def call_qwen(images, prompt) -> dict
def construct_prompt(task, history, skill) -> str
"""Build prompt for VLM"""
def annotate_sample(pgen, images, ...) -> dict
"""Generate dialogue for one sample"""
def generate_synthetic_data(dataset, pgen, ...) -> tuple
"""Process entire dataset"""
```
## Parameters
| Parameter | Default | Description |
|-----------|---------|-------------|
| `--repo-id` | - | HuggingFace dataset ID |
| `--data-dir` | - | Local dataset path |
| `--model` | Qwen/Qwen2-VL-7B-Instruct | VLM model name |
| `--device` | cuda | Device (cuda/cpu) |
| `--dtype` | bfloat16 | Model precision |
| `--temperature` | 0.7 | Sampling temperature |
| `--sample-interval` | 1.0 | Generate dialogue every N seconds (all episodes processed) |
| `--num-image-views-per-sample` | 1 | Number of cameras |
| `--output-dir` | None | Output directory |
| `--push-to-hub` | False | Push to HuggingFace Hub |
## Sampling Strategy
The script uses **temporal sampling** to efficiently generate dialogue:
- **Default**: Generate dialogue every 1 second (`--sample-interval 1.0`)
- **Efficiency**: If a dataset runs at 30fps, this samples ~3% of frames
- **Propagation**: Frames between samples reuse the last generated task_index
- **Episode-aware**: Always samples the first frame of each episode
### Example with 30 fps dataset:
```bash
# Sample every 1 second (every 30 frames)
--sample-interval 1.0 # ~3,000 generations for a 100 episode dataset (3 sec/episode)
# Sample every 0.5 seconds (every 15 frames)
--sample-interval 0.5 # ~6,000 generations (more granular)
# Sample every 2 seconds (every 60 frames)
--sample-interval 2.0 # ~1,500 generations (more efficient)
```
### Why sampling works:
- Skills typically last 1-3 seconds
- Dialogue doesn't need to change every frame
- Reduces computational cost by 30-100x
- Still provides good coverage for training
## Tips
1. **Quick testing**: Use larger `--sample-interval` (e.g., 5.0 or 10.0) for rapid iteration
2. **Monitor GPU**: VLM inference is memory-intensive
3. **Check outputs**: Review `syn_annotations.jsonl` for quality
4. **Adjust temperature**: Higher = more diverse, lower = more consistent
5. **Multiple views**: Use `--num-image-views-per-sample 2+` for better context
6. **Tune sampling**: Start with 1.0s, increase for speed (testing), decrease for granularity (production)
## Troubleshooting
### No skills.json found
Run `annotate.py` first to generate skill annotations.
### Out of memory
- Reduce batch size to 1
- Use smaller model (Qwen2-VL-7B instead of Qwen3-VL-30B)
- Process fewer samples at a time
### Poor quality generations
- Adjust temperature (try 0.6-0.9)
- Check that skills.json has good annotations
- Ensure images are loading correctly
## Citation
Based on the Hi-Robot paper's synthetic data generation approach:
```
@article{hirobot2024,
title={Hi-Robot: Hierarchical Robot Learning with Vision-Language Models},
year={2024}
}
```
-141
View File
@@ -1,141 +0,0 @@
# Temporal Sampling Strategy Visualization
## How `--sample-interval` Works
### Example: 30 fps dataset, `--sample-interval 1.0` (1 second)
```
Timeline (seconds): 0.0 0.5 1.0 1.5 2.0 2.5 3.0
│ │ │ │ │ │ │
Frames: 0───15───30───45───60───75───90───105──120──135──150
│ │ │ │ │ │ │
▼ ▼ ▼ ▼
Sampled: YES NO YES NO YES NO YES
│ │ │ │
Task Index: [0]──────────────>[1]──────────────>[2]──────────────>[3]
│ │ │ │
VLM Called: ✓ Gen ✓ Gen ✓ Gen ✓ Gen
dialogue dialogue dialogue dialogue
│ │ │ │
Frames 0-29 ─────┘ │ │ │
get task 0 │ │ │
│ │ │
Frames 30-59 ────────────────────────┘ │ │
get task 1 │ │
│ │
Frames 60-89 ──────────────────────────────────────────┘ │
get task 2 │
Frames 90-119 ────────────────────────────────────────────────────────────┘
get task 3
```
## Comparison: Different Sampling Intervals
### `--sample-interval 2.0` (every 2 seconds)
```
Timeline: 0.0 1.0 2.0 3.0 4.0 5.0 6.0
│ │ │ │ │ │ │
Sampled: YES NO YES NO YES NO YES
│ │ │ │
Tasks: [0]───────────────>[1]───────────────>[2]───────────────>[3]
VLM Calls: 4 (fewer calls, faster but less granular)
```
### `--sample-interval 1.0` (every 1 second) - **DEFAULT**
```
Timeline: 0.0 0.5 1.0 1.5 2.0 2.5 3.0 3.5 4.0 4.5 5.0 5.5 6.0
│ │ │ │ │ │ │ │ │ │ │ │ │
Sampled: YES NO YES NO YES NO YES NO YES NO YES NO YES
│ │ │ │ │ │ │
Tasks: [0]─────────>[1]─────────>[2]─────────>[3]─────────>[4]─────────>[5]─────>[6]
VLM Calls: 7 (balanced coverage and speed)
```
### `--sample-interval 0.5` (every 0.5 seconds)
```
Timeline: 0.0 0.5 1.0 1.5 2.0 2.5 3.0 3.5 4.0 4.5 5.0 5.5 6.0
│ │ │ │ │ │ │ │ │ │ │ │ │
Sampled: YES YES YES YES YES YES YES YES YES YES YES YES YES
│ │ │ │ │ │ │ │ │ │ │ │ │
Tasks: [0]─>[1]─>[2]─>[3]─>[4]─>[5]─>[6]─>[7]─>[8]─>[9]─>[10]>[11]>[12]
VLM Calls: 13 (high granularity, slower but more detailed)
```
## Episode Boundaries
The script always samples the **first frame** of each episode:
```
Episode 0 Episode 1 Episode 2
├─────────────────────────────────┤├─────────────────────────────────┤├──────...
│ ││ ││
Frame: 0 30 60 90 120 130 160 190 220 250 260 290 320
Time: 0.0 1.0 2.0 3.0 4.0 0.0 1.0 2.0 3.0 4.0 0.0 1.0 2.0
│ │ │ │ │ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
Sample:YES YES YES YES YES YES YES YES YES YES YES YES YES
│ │ │ │ │ │ │ │ │ │ │ │ │
Task: 0────1─────2─────3────4 5─────6─────7─────8────9 10────11───12
Note: Frames 0, 130, 260 are ALWAYS sampled (episode starts)
Even if they're within the sample-interval window
```
## Real-World Example: svla_so101_pickplace Dataset
Typical stats:
- **Total episodes**: 50
- **Avg episode length**: 300 frames (10 seconds at 30 fps)
- **Total frames**: 15,000
### Without Sampling (every frame)
```
Frames processed: 15,000
VLM calls: 15,000
Time estimate: ~5 hours
Unique tasks: ~12,000 (lots of duplicates)
```
### With `--sample-interval 1.0` (every 1 second)
```
Frames processed: 15,000 ✓
VLM calls: 500
Time estimate: ~10 minutes
Unique tasks: ~450 (meaningful variety)
Efficiency gain: 30x faster
```
### With `--sample-interval 2.0` (every 2 seconds)
```
Frames processed: 15,000 ✓
VLM calls: 250
Time estimate: ~5 minutes
Unique tasks: ~220
Efficiency gain: 60x faster
```
## Key Points
1. **All frames get labeled**: Every frame gets a `task_index_high_level`
2. **Only sampled frames call VLM**: Huge efficiency gain
3. **Temporal coherence**: Nearby frames share the same task
4. **Episode-aware**: Always samples episode starts
5. **Configurable**: Adjust `--sample-interval` based on your needs
## Choosing Your Sampling Interval
| Use Case | Recommended Interval | Why |
|----------|---------------------|-----|
| Quick testing | 2.0s | Fastest iteration |
| Standard training | 1.0s | Good balance |
| High-quality dataset | 0.5s | Better coverage |
| Fine-grained control | 0.33s | Very detailed |
| Dense annotations | 0.1s | Nearly every frame |
**Rule of thumb**: Match your sampling interval to your typical skill duration.
If skills last 1-3 seconds, sampling every 1 second captures each skill multiple times.
@@ -1,138 +0,0 @@
#!/usr/bin/env python
"""
Example demonstrating how to use the ActionTokenizerProcessorStep to tokenize actions.
This example shows how to:
1. Load a dataset with action data
2. Apply the action tokenizer processor to tokenize actions with proper padding/truncation
3. Access both the tokenized actions and the attention mask
4. Decode tokenized actions back to their original form
"""
import torch
from transformers import AutoProcessor
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor.core import EnvTransition, TransitionKey
from lerobot.processor.tokenizer_processor import ActionTokenizerProcessorStep
from lerobot.utils.constants import ACTION_TOKEN_MASK
# Define delta timestamps for the dataset
delta_timestamps = {
'action': [
0.0, 0.03333333333333333, 0.06666666666666667, 0.1, 0.13333333333333333,
0.16666666666666666, 0.2, 0.23333333333333334, 0.26666666666666666, 0.3,
0.3333333333333333, 0.36666666666666664, 0.4, 0.43333333333333335,
0.4666666666666667, 0.5, 0.5333333333333333, 0.5666666666666667, 0.6,
0.6333333333333333, 0.6666666666666666, 0.7, 0.7333333333333333,
0.7666666666666667, 0.8, 0.8333333333333334, 0.8666666666666667, 0.9,
0.9333333333333333, 0.9666666666666667, 1.0, 1.0333333333333334,
1.0666666666666667, 1.1, 1.1333333333333333, 1.1666666666666667, 1.2,
1.2333333333333334, 1.2666666666666666, 1.3, 1.3333333333333333,
1.3666666666666667, 1.4, 1.4333333333333333, 1.4666666666666666, 1.5,
1.5333333333333334, 1.5666666666666667, 1.6, 1.6333333333333333
]
}
# Load the dataset
print("Loading dataset...")
dataset = LeRobotDataset(
repo_id="local",
root="/fsx/jade_choghari/outputs/pgen_annotations1",
delta_timestamps=delta_timestamps
)
# Create a dataloader
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
# Get a batch of data
batch = next(iter(dataloader))
action_data = batch["action"] # Shape: (batch_size, action_horizon, action_dim)
print(f"\nOriginal action shape: {action_data.shape}")
print(f"Original action data (first sample, first timestep):\n{action_data[0, 0]}")
# Method 1: Using the tokenizer directly (as in fast_tokenize.py)
print("\n" + "="*80)
print("Method 1: Direct tokenizer usage")
print("="*80)
tokenizer = AutoProcessor.from_pretrained("physical-intelligence/fast", trust_remote_code=True)
# Tokenize directly
tokens = tokenizer(action_data)
print(f"\nDirect tokenization result type: {type(tokens)}")
print(f"Tokens shape/length: {tokens.shape if isinstance(tokens, torch.Tensor) else len(tokens)}")
# Decode
decoded_actions = tokenizer.decode(tokens)
print(f"Decoded actions shape: {decoded_actions.shape}")
reconstruction_error = torch.abs(action_data - decoded_actions).mean()
print(f"Mean absolute reconstruction error: {reconstruction_error.item():.6f}")
# Method 2: Using the ActionTokenizerProcessorStep with proper padding/truncation
print("\n" + "="*80)
print("Method 2: Using ActionTokenizerProcessorStep (with padding & mask)")
print("="*80)
# Create the action tokenizer processor step
action_tokenizer_processor = ActionTokenizerProcessorStep(
tokenizer_name="physical-intelligence/fast",
trust_remote_code=True,
max_action_tokens=32, # Maximum number of tokens per action
)
# Create a transition with the action data
transition = {
TransitionKey.ACTION: action_data,
TransitionKey.OBSERVATION: {}, # Empty for this example
}
# Apply the processor
processed_transition = action_tokenizer_processor(transition)
# Extract tokenized actions and mask
tokenized_actions = processed_transition[TransitionKey.ACTION]
complementary_data = processed_transition[TransitionKey.COMPLEMENTARY_DATA]
action_mask = complementary_data[ACTION_TOKEN_MASK]
print(f"\nTokenized actions shape: {tokenized_actions.shape}") # (batch_size, max_action_tokens)
print(f"Action mask shape: {action_mask.shape}") # (batch_size, max_action_tokens)
print(f"Tokenized actions dtype: {tokenized_actions.dtype}")
print(f"Action mask dtype: {action_mask.dtype}")
# Show token statistics
print(f"\nFirst sample tokens: {tokenized_actions[0]}")
print(f"First sample mask: {action_mask[0]}")
num_real_tokens = action_mask[0].sum().item()
print(f"Number of real tokens (non-padding): {num_real_tokens}")
print(f"Number of padding tokens: {action_mask.shape[1] - num_real_tokens}")
# Decode using the mask
print("\nDecoding tokenized actions...")
decoded_with_processor = tokenizer.decode(tokenized_actions)
print(f"Decoded actions shape: {decoded_with_processor.shape}")
# Calculate reconstruction error
reconstruction_error_processor = torch.abs(action_data - decoded_with_processor).mean()
print(f"Mean absolute reconstruction error: {reconstruction_error_processor.item():.6f}")
# Show that masking works correctly
print("\n" + "="*80)
print("Mask demonstration")
print("="*80)
for i in range(min(4, tokenized_actions.shape[0])):
mask_i = action_mask[i]
num_real = mask_i.sum().item()
print(f"Sample {i}: {num_real} real tokens, {len(mask_i) - num_real} padding tokens")
print("\n" + "="*80)
print("Action tokenization example completed successfully!")
print("="*80)
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
-143
View File
@@ -1,143 +0,0 @@
# Example: Synthetic Data Generation with Sampling
## Quick Start
### 1. Test with 100 frames and 1 second sampling
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--num-samples 100 \
--sample-interval 1.0 \
--output-dir ./outputs/test_pgen
```
**Expected behavior** (assuming 30 fps):
- Total frames: 100
- Frames sampled: ~4 (every 30 frames = 1 second)
- Efficiency: 96% fewer VLM calls
- Output: All 100 frames get `task_index_high_level`, but only 4 unique dialogues generated
### 2. Process full dataset with different sampling rates
#### Conservative (every 2 seconds)
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 2.0 \
--output-dir ./outputs/pgen_2s
```
#### Standard (every 1 second) - **RECOMMENDED**
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 1.0 \
--output-dir ./outputs/pgen_1s
```
#### Fine-grained (every 0.5 seconds)
```bash
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen2-VL-7B-Instruct \
--sample-interval 0.5 \
--output-dir ./outputs/pgen_0.5s
```
## Performance Estimates
For a dataset with:
- 100 episodes
- 10 seconds per episode (average)
- 30 fps
- Total frames: 30,000
| Sampling Interval | Frames Sampled | % Sampled | Speedup | Time Estimate |
|-------------------|----------------|-----------|---------|---------------|
| Every frame (0.033s) | 30,000 | 100% | 1x | ~10 hours |
| 0.5 seconds | 2,000 | 6.7% | 15x | ~40 min |
| **1.0 seconds** | **1,000** | **3.3%** | **30x** | **~20 min** |
| 2.0 seconds | 500 | 1.7% | 60x | ~10 min |
*Note: Times are approximate and depend on GPU, model size, and generation speed*
## Understanding the Output
### Console Output Example
```
[cyan]Generating synthetic data for 30000 frames...[/cyan]
[cyan]Sampling interval: 1.0s (fps: 30)[/cyan]
Generating synthetic dialogue: 100%|████████| 30000/30000 [20:15<00:00, 24.68it/s]
[green]✓ Sampled 1000 frames out of 30000 (3.3%)[/green]
[green]✓ Generated 450 unique high-level tasks[/green]
```
### What happens:
1. **Frame 0 (t=0.0s)**: Generate dialogue → Task index 0
2. **Frames 1-29 (t=0.033s-0.967s)**: Reuse task index 0
3. **Frame 30 (t=1.0s)**: Generate new dialogue → Task index 1
4. **Frames 31-59 (t=1.033s-1.967s)**: Reuse task index 1
5. And so on...
### Result:
- Every frame has a `task_index_high_level`
- Only sampled frames have unique dialogues generated
- Intermediate frames inherit from the most recent sample
- Maintains temporal coherence within episodes
## Checking Your Results
After running, verify the output:
```bash
# Check the generated tasks
python -c "
import pandas as pd
from pathlib import Path
tasks = pd.read_parquet('outputs/test_pgen/meta/tasks_high_level.parquet')
print(f'Total unique tasks: {len(tasks)}')
print(f'Sample tasks:')
print(tasks[['user_prompt', 'robot_utterance', 'skill']].head())
"
# Check debug output
head outputs/test_pgen/meta/syn_annotations.jsonl
# Load and verify dataset
python -c "
from lerobot.datasets.lerobot_dataset import LeRobotDataset
ds = LeRobotDataset(repo_id='local_with_high_level_tasks',
root='outputs/test_pgen')
print(f'Dataset has {len(ds)} frames')
print(f'Features: {list(ds.features.keys())}')
assert 'task_index_high_level' in ds.features
print('✓ task_index_high_level feature added successfully!')
"
```
## Common Use Cases
### Development/Testing
```bash
--sample-interval 2.0 # Fast iteration
--num-samples 500 # Small subset
```
### Production Training
```bash
--sample-interval 1.0 # Good coverage
# Process all samples (no --num-samples)
```
### High-Quality Dataset
```bash
--sample-interval 0.5 # Fine-grained
--temperature 0.6 # More consistent
--model Qwen/Qwen3-VL-30B-A3B-Instruct # Larger model
```
-25
View File
@@ -1,25 +0,0 @@
import numpy as np
from transformers import AutoProcessor
import torch
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
delta_timestamps = {'action': [0.0, 0.03333333333333333, 0.06666666666666667, 0.1, 0.13333333333333333, 0.16666666666666666, 0.2, 0.23333333333333334, 0.26666666666666666, 0.3, 0.3333333333333333, 0.36666666666666664, 0.4, 0.43333333333333335, 0.4666666666666667, 0.5, 0.5333333333333333, 0.5666666666666667, 0.6, 0.6333333333333333, 0.6666666666666666, 0.7, 0.7333333333333333, 0.7666666666666667, 0.8, 0.8333333333333334, 0.8666666666666667, 0.9, 0.9333333333333333, 0.9666666666666667, 1.0, 1.0333333333333334, 1.0666666666666667, 1.1, 1.1333333333333333, 1.1666666666666667, 1.2, 1.2333333333333334, 1.2666666666666666, 1.3, 1.3333333333333333, 1.3666666666666667, 1.4, 1.4333333333333333, 1.4666666666666666, 1.5, 1.5333333333333334, 1.5666666666666667, 1.6, 1.6333333333333333]}
dataset = LeRobotDataset(repo_id="local", root="/fsx/jade_choghari/outputs/pgen_annotations1", delta_timestamps=delta_timestamps)
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
batch = next(iter(dataloader))
# Load the tokenizer from the Hugging Face hub
tokenizer = AutoProcessor.from_pretrained("physical-intelligence/fast", trust_remote_code=True)
# Tokenize & decode action chunks (we use dummy data here)
action_data = batch["action"] # one batch of action chunks
tokens = tokenizer(action_data) # tokens = list[int]
decoded_actions = tokenizer.decode(tokens)
print("tokenized actions: ", tokens)
-17
View File
@@ -1,17 +0,0 @@
from transformers import AutoProcessor, PaliGemmaForConditionalGeneration
model_id = "google/paligemma-3b-pt-224"
model = PaliGemmaForConditionalGeneration.from_pretrained(model_id)
processor = AutoProcessor.from_pretrained(model_id)
breakpoint()
prefix_output = model.language_model.forward(
inputs_embeds=inputs_embeds[0],
attention_mask=attention_mask,
position_ids=position_ids,
adarms_cond=adarms_cond[0] if adarms_cond is not None else None,
)
prefix_past_key_values = prefix_output.past_key_values
# prefix_output to be used for the language head
# shape: [batch_size, seq_len, hidden_size] with hidden_size = 2048
prefix_output = prefix_output.last_hidden_state
-91
View File
@@ -1,91 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
# import make_pre_post_processors
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.factory import make_policy, make_policy_config
from lerobot.configs.policies import PreTrainedConfig
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/outputs/pi0_training/checkpoints/last/pretrained_model",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/outputs/pi0_training/checkpoints/last/pretrained_model",
)
delta_timestamps = {'action': [0.0, 0.03333333333333333, 0.06666666666666667, 0.1, 0.13333333333333333, 0.16666666666666666, 0.2, 0.23333333333333334, 0.26666666666666666, 0.3, 0.3333333333333333, 0.36666666666666664, 0.4, 0.43333333333333335, 0.4666666666666667, 0.5, 0.5333333333333333, 0.5666666666666667, 0.6, 0.6333333333333333, 0.6666666666666666, 0.7, 0.7333333333333333, 0.7666666666666667, 0.8, 0.8333333333333334, 0.8666666666666667, 0.9, 0.9333333333333333, 0.9666666666666667, 1.0, 1.0333333333333334, 1.0666666666666667, 1.1, 1.1333333333333333, 1.1666666666666667, 1.2, 1.2333333333333334, 1.2666666666666666, 1.3, 1.3333333333333333, 1.3666666666666667, 1.4, 1.4333333333333333, 1.4666666666666666, 1.5, 1.5333333333333334, 1.5666666666666667, 1.6, 1.6333333333333333]}
dataset = LeRobotDataset(repo_id="local", root="/fsx/jade_choghari/outputs/pgen_annotations1", delta_timestamps=delta_timestamps)
# rename map --rename_map='{
# "observation.images.side": "observation.images.base_0_rgb",
# "observation.images.up": "observation.images.left_wrist_0_rgb"
# }'
rename_map = {
"observation.images.side": "observation.images.base_0_rgb",
"observation.images.up": "observation.images.left_wrist_0_rgb"
}
policy = make_policy(
cfg=cfg,
ds_meta=dataset.meta,
rename_map=rename_map,
)
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
batch = next(iter(dataloader))
batch = pre_processor(batch)
policy.train()
# run inference
# action = policy.select_action(batch)
loss, loss_dict = policy.forward(batch)
breakpoint()
# import requests
# from PIL import Image
# from transformers import AutoProcessor
# model = policy.model.paligemma_with_expert.paligemma
# model = model.to(device="cuda", dtype=torch.bfloat16)
# model.eval()
# prompt = "Describe this image."
# url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/pipeline-cat-chonk.jpeg"
# image = Image.open(requests.get(url, stream=True).raw)
# processor = AutoProcessor.from_pretrained(
# "google/paligemma-3b-pt-224",
# )
# inputs = processor(image, prompt, return_tensors="pt").to(model.device)
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=50,
# use_cache=True, # default dynamic cache
# )
# print(processor.decode(output[0], skip_special_tokens=True))
# # other model
# from transformers import PaliGemmaForConditionalGeneration
# model = PaliGemmaForConditionalGeneration.from_pretrained(
# "google/paligemma2-3b-pt-224",
# torch_dtype=torch.bfloat16,
# device_map="auto",
# )
# model.eval()
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=100,
# use_cache=True, # default dynamic cache
# )
# print("Model 2 output:")
# print(processor.decode(output[0], skip_special_tokens=True))
-23
View File
@@ -1,23 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
dataset = LeRobotDataset(repo_id="local", root="/fsx/jade_choghari/outputs/pgen_annotations1")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=32,
shuffle=True,
)
batch = next(iter(dataloader))
print(batch.keys())
print(batch['task_index_high_level'].shape)
print(batch['task_index_high_level'])
print(batch['user_prompt'][0])
print(batch['robot_utterance'][0])
print(batch['task'][0])
breakpoint()
-18
View File
@@ -1,18 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
dataset = LeRobotDataset(repo_id="lerobot/libero")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
batch = next(iter(dataloader))
print(batch.keys())
breakpoint()
-159
View File
@@ -1,159 +0,0 @@
## One-sentence answer
> `make_att_2d_masks(prefix_pad_masks, prefix_att_masks)` builds the **actual 2D attention mask** `[B, L, L]` that tells the transformer **which token positions may attend to which others**, combining **padding** and **causality**.
Everything else youve seen so far was just metadata.
---
## What goes in
### Inputs
```python
prefix_pad_masks # shape [B, L]
prefix_att_masks # shape [B, L]
```
Where:
* `prefix_pad_masks[b, i] = True`
→ token `i` exists (not padding)
* `prefix_att_masks[b, i] = False`
→ token `i` is **bidirectional**
* `prefix_att_masks[b, i] = True`
→ token `i` is **causal (autoregressive)**
---
## What comes out
```python
att_2d_prefix # shape [B, L, L]
```
Each entry:
```text
att_2d_prefix[b, i, j] = True
```
means:
> “In batch `b`, **token i (query)** is allowed to attend to **token j (key)**.”
---
## How it is constructed (conceptually)
For **each batch b**, **each query position i**, **each key position j**:
```python
if not prefix_pad_masks[b, j]:
att[b, i, j] = False # cannot attend to padding
else if not prefix_att_masks[b, i]:
att[b, i, j] = True # bidirectional token → can see all real tokens
else:
att[b, i, j] = (j <= i) # causal token → can see only past + itself
```
Thats it.
---
## Tiny concrete example (exactly matching your code)
Suppose:
```python
prefix_pad_masks[0] = [T, T, T, T, T, F]
prefix_att_masks[0] = [F, F, F, T, T, T]
```
Tokens:
```
0: IMG
1: IMG
2: LANG
3: SUB0
4: SUB1
5: PAD
```
---
### Resulting `att_2d_prefix[0]`
`✓ = True, ✗ = False`
| Q \ K | 0 | 1 | 2 | 3 | 4 | 5 |
| ---------- | - | - | - | - | - | - |
| 0 (bi) | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ |
| 1 (bi) | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ |
| 2 (bi) | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ |
| 3 (causal) | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ |
| 4 (causal) | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ |
| 5 (pad) | ✗ | ✗ | ✗ | ✗ | ✗ | ✗ |
---
## Why this matters for your training code
This line:
```python
att_2d_prefix_4d = self._prepare_attention_masks_4d(att_2d_prefix)
```
Converts `[B, L, L] → [B, 1, L, L]` and possibly flips True/False to `0/-inf`.
This is **exactly what Paligemma uses inside self-attention**.
---
## Key implications (VERY important)
### 1️⃣ This mask does **not isolate token groups**
* Bidirectional tokens can attend to **everything**
* Causal tokens only restrict *their own row*
So **flow/action tokens must be blocked separately**.
---
### 2️⃣ This is why your AR subtask prediction works
* Subtask tokens are causal
* Output at position `i` predicts token `i+1`
* Padding is fully ignored
---
### 3️⃣ Inference behavior
When `subtask_tokens = None`:
* `prefix_att_masks` contains only `False`
* `att_2d_prefix` becomes **fully bidirectional**
* No AR behavior remains
Exactly what you want.
---
## One-sentence takeaway (commit this)
> `make_att_2d_masks` fuses **padding** and **causality** into a concrete `[B, L, L]` attention matrix that the transformer actually uses.
If you want next, I can:
* inspect `make_att_2d_masks()` source with you
* show how to block **flow → subtask** attention
* explain how this changes when suffix tokens are added
* help you refactor this into a cleaner “grouped attention” API
Youre now at the point where the models behavior should feel *predictable*, not magical.
-334
View File
@@ -1,334 +0,0 @@
Generate annotate_pgen.py using Qwen for synthetic data generation
You are writing a Python script called annotate_pgen.py.
This script generates synthetic user prompts (_t) and robot utterances (u_t) for Hi Robotstyle hierarchical policy training, using Qwen 3vl as the generator model (pgen).
SCRIPT PURPOSE
The script must:
Load Dlabeled which is a LeRobot Dataset that has been annotate using the annotate.py script, which contains:
images: list of image paths at time t
skill_current: the annotated skill label (ℓ̂_t)
skill_history: list of previous skill labels (ℓ̂₀ … ℓ̂_{t1}), those where annotated, and you can find details on them stored in teh dataset inside the the DATA_PATH/meta/skills.json
you will find something like
{
"coarse_description": "pink lego brick into the transparent box",
"skill_to_task_index": {
"robot arm picks up pink lego brick": 19,
"robot arm approaches transparent box": 3,
"robot arm retracts from transparent box": 28,
"robot arm moves towards pink lego brick": 12,
"robot arm releases red lego brick into box": 26,
"robot arm releases red lego brick into transparent box": 27,
"robot arm closes gripper to pick up the pink lego brick": 5,
"robot arm lifts the pink lego brick": 7,
etc..
},
"episodes": {
"0": {
"episode_index": 0,
"description": "pink lego brick into the transparent box",
"skills": [
{
"name": "robot arm moves towards pink lego brick",
"start": 0.0,
"end": 1.8
},
{
"name": "robot arm picks up pink lego brick",
"start": 1.8,
"end": 3.1
},
{
"name": "robot arm moves towards transparent box",
"start": 3.1,
"end": 5.5
},
{
"name": "robot arm releases pink lego brick into transparent box",
"start": 5.5,
"end": 7.0
},
{
"name": "robot arm retracts from transparent box",
"start": 7.0,
"end": 10.1
}
]
},
"1": {
"episode_index": 1,
"description": "pink lego brick into the transparent box",
"skills": [
{
"name": "robot arm moves towards red lego brick",
"start": 0.0,
"end": 1.2
},
{
"name": "robot arm picks up red lego brick",
"start": 1.2,
"end": 2.0
},
{
"name": "robot arm moves towards transparent box",
"start": 2.0,
"end": 3.8
},
{
"name": "robot arm places red lego brick into transparent box",
"start": 3.8,
"end": 5.0
},
{
"name": "robot arm moves away from transparent box",
"start": 5.0,
"end": 8.9
}
]
},
notice how task_description: is a high-level description (e.g., "make a sandwich") stored in description for each episode
For each sample, call Qwen VLM to generate:
synthetic user prompt _t
synthetic robot response u_t
Save results to D_syn in Parquet format insdie DATA_PATH/meta/tasks.parquet ; note tasks.parquet already contains the other tasks, so you need to update
Should be modular, clean, easy to extend, with:
a PGEN_PROMPT_TEMPLATE
a construct_prompt() method
a call_qwen() method
a annotate_sample() method
a CLI entrypoint (if __name__ == "__main__":)
📦 INPUT FORMAT (Dlabeled)
The script should expect Dlabeled as a .jsonl file where each line has:
{
"episode_id": "ep_001",
"t": 37,
"images": ["path/to/cam0_t.jpg", "path/to/cam1_t.jpg"],
"skill_current": "pick up the KitKat",
"skill_history": ["open fridge", "pick up lettuce", "place lettuce"],
"task_description": "making a sandwich"
}
📤 OUTPUT FORMAT (D_syn)
Each line of synthetically generated data should be:
{
"episode_id": "ep_001",
"t": 37,
"images": ["path/to/cam0_t.jpg", "path/to/cam1_t.jpg"],
"skill_current": "pick up the KitKat",
"skill_history": [...],
"user_prompt": "Can you grab me something sweet?",
"robot_utterance": "Sure, I can pick up the KitKat.",
"task_description": "making a sandwich"
}
Store as syn_annotations.jsonl. for debugging
🧠 pgen MODEL (Qwen) REQUIREMENTS
Use HuggingFace Transformers:
Qwen/Qwen2-VL-7B-Instruct (or any Qwen2-VL Vision-Language model available)
Use the image + text chat interface
Vision inputs should be loaded with PIL
Use a single forward pass that outputs BOTH _t and u_t in a structured JSON
📝 PROMPT FORMAT FOR pgen
Create a template like:
You are a robot-assistant dialogue generator for hierarchical robot policies.
You will receive:
- A list of images showing the current robot scene.
- The high-level task: {task_description}
- Previous skill steps completed: {skill_history}
- The next skill to be performed by the robot: {skill_current}
Generate two things in JSON:
1. "user_prompt": a natural-sounding user request that logically leads to the robot performing the skill "{skill_current}" given the task and history.
2. "robot_utterance": a natural robot reply acknowledging or clarifying the request.
The responses must be grounded in the visual scene, the task, and the skill history.
Respond ONLY in JSON:
{
"user_prompt": "...",
"robot_utterance": "..."
}
This resposne will have a corresponsing task_index, and the task will be saved in task.parqeut and you must update each dataset parquet in for example /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace/data/chunk-000/
file-000.parquet to include this new feature called task_index_high_level consider udpatign the metadata in info.json as well
📌 LOGIC REQUIRED
construct_prompt(sample)
Loads sample dict
Inserts:
task_description
skill_history
skill_current
Returns a full text prompt string
call_qwen(images, prompt)
Loads images into Qwen-VL multimodal input format
Calls model.generate
Parses JSON output
annotate_sample(sample)
Builds prompt
Calls Qwen
Returns augmented sample with user_prompt + robot_utterance
🚀 CLI Usage
The script should run as:
python annotate_pgen.py \
--output-dir PATH \
--model Qwen/Qwen2-VL-7B-Instruct \
--repo-id lerobot/svla_so101_pickplace \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--batch-size 1
Include arguments via argparse.
🔧 OTHER REQUIREMENTS
Use tqdm for progress bars
Log errors gracefully and continue
Support GPU acceleration (device="cuda")
Cache model loading so it's not reloaded every call
Make the prompt deterministic but allow temperature parameter
Add a flag --num-image-views-per-sample
Add automatic JSON parsing with helpful error messages
🎯 FINAL DELIVERABLE
Cursor must now generate:
A full Python file named annotate_pgen.py implementing the above functionality end-to-end.
It should be production-ready, runnable on real data, cleanly structured, and easy to modify.
from the paper:
Next, we use a large vision-language model (VLM) pgen
to produce synthetic user prompts and interjections t,
and corresponding robot utterance ut. Given Dlabeled, we
prompt pgen with both the visual context I1
t ,...,In
t and the
skill labelˆ
t (e.g., pick up the lettuce). pgen then imag-
ines an appropriate interaction that might have led toˆ
t in a
real user interaction: it generates possible user prompts t
(e.g., “Can you add some lettuce for me?”) along with the
robots verbal responses and clarifications ut. We detail the
A. Synthetic Data Generation
A.1. Scenario and Response Categorization
To ensure the quality and diversity of the synthetic data,
we incorporate structured scenario classification and re-
sponse categorization into the prompt design for pgen, fol-
lowing (Stephan et al., 2024). Specifically, we classify
interactions into different scenario types, such as nega-
tive task (where the user instructs the robot what not to
do), situated correction (where the user adjusts an earlier
command based on the evolving task state), and specific
constraint (where the user specifies particular constraints,
such as dietary preferences). In addition, we categorize
the robots responses into types such as simple confirma-
tions, clarifications, and error handling. These classifica-
tions guide the generation process to ensure a broad range
of user-robot interactions.
A.2. Prompt Construction for Contextual Grounding
In prompt P, we include a detailed description of the task
(e.g., bussing a table, making a sandwich, grocery shop-
ping) and instruct the model to ground responses in visual
observations and prior context. A key advantage of lever-
aging large pretrained VLMs is their ability to incorporate
world knowledge when generating interactions. For in-
stance, the model can infer dietary constraints when gener-
ating prompts for sandwich-making, producing user com-
mands such as “Can you make a sandwich for me? Im
lactose intolerant” and an appropriate robot response like
“Sure, I wont put cheese on it.” Similarly, it can reason
over ambiguous or implicit requests, such as inferring that
“I want something sweet” in a grocery shopping scenario
should lead to suggestions like chocolate or candy.
To maintain consistency in multi-step tasks, we condition
pgen on prior skill labels within an episodeˆ
ˆ
0,...,
t1,
allowing it to generate coherent user commands that
account for past actions. For instance, if the robot
has already placed lettuce and tomato on a sandwich,
the generated user prompt might request additional in-
gredients that logically follow. This ensures that the
synthetic interactions reflect realistic task progression
rather than isolated commands. As such, we leverage
ˆ
ˆ
ˆ
pgen(t,ut|I1
t ,...,In
t ,
0,...,
t1,
t,P) to produce a richer,
more diverse synthetic dataset Dsyn that provides mean-
ingful supervision for training our high-level policy.
While in this work we generate a separate Dsyn and train
a separate high-level policy for each task (e.g., sandwich
making vs. table cleaning) for clarity and ease of bench-
marking, the architecture is readily amenable to a unified
multi-task formulation. In principle, the same hierarchical
approach could be used to train a single high-level policy
across a multitude of tasks, facilitating knowledge transfer
The result should be a new LeRobotDataset with a new feature called task_index_high_level inside each dataset parquet
-11
View File
@@ -1,11 +0,0 @@
python examples/dataset/annotate.py \
--repo-id jadechoghari/collect-data \
--video-key observation.images.base \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--episodes 16 22
# python examples/dataset/annotate.py \
# --repo-id lerobot/svla_so101_pickplace \
# --video-key observation.images.side \
# --model Qwen/Qwen3-VL-30B-A3B-Instruct \
# --episodes 5
-43
View File
@@ -1,43 +0,0 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/collect-data"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# Alternative: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen"
BATCH_SIZE=32
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # Generate dialogue every 1 second (all episodes processed)
# Run synthetic data generation (processes ALL episodes)
python examples/dataset/annotate_pgen.py \
--repo-id "$REPO_ID" \
--model "$MODEL" \
--output-dir "$OUTPUT_DIR" \
--temperature "$TEMPERATURE" \
--batch-size "$BATCH_SIZE" \
--sample-interval "$SAMPLE_INTERVAL" \
--image-key observation.images.base \
--num-image-views-per-sample 1
# For faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# To push to hub after generation:
# Add --push-to-hub flag
# Efficient batch processing: 4 episodes at once
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --video-mode \
# --video-key observation.images.up \
# --video-batch-size "$BATCH_SIZE" \
# --sample-interval 1.0
-802
View File
@@ -1,802 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SARM Subtask Annotation using local GPU (Qwen3-VL).
This script implements the annotation approach from the SARM paper using local GPU inference:
"SARM: Stage-Aware Reward Modeling for Long Horizon Robot Manipulation"
Paper: https://arxiv.org/pdf/2509.25358
What it does:
1. Takes videos from a LeRobot dataset
2. Uses Qwen3-VL running locally on GPU to identify when subtasks occur
3. Saves subtask timestamps to the dataset metadata
4. Optionally pushes the annotated dataset to HuggingFace Hub
SARM trains reward models that predict:
- Stage: Which subtask is currently being executed (discrete classification)
- Progress: How far along the subtask we are (continuous 0-1)
Supports three annotation modes:
1. No annotations (no args): Auto-creates single sparse "task" stage covering full episode.
Use with SARM config annotation_mode="single_stage" for simple tasks.
2. Dense-only (--dense-only --dense-subtasks): Dense annotations from VLM, auto-generated
single sparse "task" stage. Use with annotation_mode="dense_only".
3. Dual mode (--sparse-subtasks + --dense-subtasks): Both sparse and dense annotations
from VLM. Use with annotation_mode="dual".
Requirements:
- GPU with sufficient VRAM (16GB+ recommended for 30B model)
- `pip install transformers, torch, qwen-vl-utils`
Run with:
```bash
python examples/dataset_annotation/subtask_annotation.py \
--repo-id your-username/your-dataset \
--sparse-subtasks "Do ..." \
--dense-subtasks "Do task 1, Do task 2, Do task 3" \
--video-key observation.images.base \
--push-to-hub
```
"""
import argparse
import json
import multiprocessing as mp
import re
import subprocess
import tempfile
import textwrap
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import cv2
import pandas as pd
import torch
from qwen_vl_utils import process_vision_info
from rich.console import Console
from transformers import AutoProcessor, Qwen3VLMoeForConditionalGeneration
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.sarm.sarm_utils import (
Subtask,
SubtaskAnnotation,
Timestamp,
compute_temporal_proportions,
)
def create_sarm_prompt(subtask_list: list[str]) -> str:
subtask_str = "\n".join([f" - {name}" for name in subtask_list])
return textwrap.dedent(f"""\
# Role
You are a Robotics Vision System specializing in temporal action localization for robot manipulation. Your job is to segment a single demonstration video into distinct, non-overlapping atomic actions from a fixed subtask list.
# Subtask Label Set (Closed Vocabulary)
You must strictly identify the video segments using ONLY the following labels. Do not create new labels or modify existing ones:
[
{subtask_str}
]
The video shows one successful execution of all subtasks in a logical order.
# Ground-Truth Semantics (Very Important)
Use **visual state changes** to define when a subtask starts and ends. Do NOT assume equal durations for the subtasks.
- A subtask **starts** at the first frame where the robot's motion clearly initiates that subtask.
- A subtask **ends** at the first frame where that specific action is visually completed and the manipulated object reaches a temporary, stable configuration.
If there are short pauses or micro-motions that don't clearly correspond to a new subtask, they belong to the **current** subtask.
# Hard Constraints & Logic
1. **Continuous Coverage (No Gaps):**
- The entire video duration from "00:00" to the final timestamp must be covered by subtasks.
- There can be no gaps between subtasks.
- If there is any idle or ambiguous time between clear actions, extend the *preceding* subtask to cover it.
2. **Boundary Consistency:**
- The `"end"` timestamp of one subtask must be exactly equal to the `"start"` timestamp of the next subtask.
- Boundaries must coincide with a real visual state transition, not just a convenient time split.
3. **Chronological Order, One Occurrence Each:**
- This is a single successful demonstration.
- Each subtask from the vocabulary appears **exactly once**, in the correct logical order.
- **Durations may be very different** between subtasks. Never assume they are similar lengths. Base all boundaries only on the video.
4. **Reject Uniform Segmentation (Important):**
- Do NOT simply divide the video into equal or nearly equal time chunks.
- If your boundaries would result in subtasks with similar durations (e.g. all around 5 seconds), treat this as evidence that your segmentation is wrong and refine the boundaries.
- Only use nearly equal durations if the video truly shows each subtask taking the same amount of time (this is very rare).
5. **Timestamps:**
- Timestamps must be in `"MM:SS"` format.
- The first subtask always starts at `"00:00"`.
- The last subtask ends at the final visible frame of the video.
# Step 1 — Textual Timeline (must do this first)
First, write a extensive and detailed textual timeline describing what happens in the video with approximate timestamps.
For each subtask, include:
- its name
- an approximate start and end time,
- an description of the visual event at the boundary (e.g. "shirt fully folded to the left", "robot rotates folded shirt 90 degrees").
Format this as a bullet list.
# Step 2 — JSON Output (final answer)
After the textual timeline, output **only** valid JSON with this structure.
The JSON **must** be consistent with the textual timeline above:
{{
"subtasks": [
{{
"name": "EXACT_NAME_FROM_LIST",
"timestamps": {{
"start": "MM:SS",
"end": "MM:SS"
}}
}},
{{
"name": "EXACT_NAME_FROM_LIST",
"timestamps": {{
"start": "MM:SS",
"end": "MM:SS"
}}
}}
]
}}
Do not add any extra keys to the JSON.
""")
class VideoAnnotator:
"""Annotates robot manipulation videos using local Qwen3-VL model on GPU"""
def __init__(
self,
subtask_list: list[str],
model_name: str = "Qwen/Qwen3-VL-30B-A3B-Instruct",
device: str = "cuda",
torch_dtype: torch.dtype = torch.bfloat16,
model: "Qwen3VLMoeForConditionalGeneration | None" = None,
processor: "AutoProcessor | None" = None,
):
"""
Initialize the video annotator with local model.
Args:
subtask_list: List of allowed subtask names (for consistency)
model_name: Hugging Face model name (default: Qwen/Qwen3-VL-30B-A3B-Instruct)
device: Device to use (cuda, cpu)
torch_dtype: Data type for model (bfloat16, float16, float32)
model: Pre-loaded model instance (optional, to share between annotators)
processor: Pre-loaded processor instance (optional, to share between annotators)
"""
self.subtask_list = subtask_list
self.prompt = create_sarm_prompt(subtask_list)
self.console = Console()
self.device = device
# Use provided model/processor or load new ones
if model is not None and processor is not None:
self.model = model
self.processor = processor
self.console.print(f"[green]✓ Using shared model on {device}[/green]")
else:
self.console.print(f"[cyan]Loading model: {model_name}...[/cyan]")
self.model = Qwen3VLMoeForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded successfully on {device}[/green]")
def extract_episode_segment(
self, file_path: Path, start_timestamp: float, end_timestamp: float, target_fps: int = 1
) -> Path:
"""
Extract a specific episode segment from concatenated video.
Uses minimal compression to preserve quality for local inference.
Args:
file_path: Path to the concatenated video file
start_timestamp: Starting timestamp in seconds (within this video file)
end_timestamp: Ending timestamp in seconds (within this video file)
target_fps: Target FPS (default: 1 for faster processing)
Returns:
Path to extracted video file
"""
# Create temporary file for extracted video
tmp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
tmp_path = Path(tmp_file.name)
tmp_file.close()
try:
# Check if ffmpeg is available
subprocess.run(
["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
except (subprocess.CalledProcessError, FileNotFoundError):
raise RuntimeError("ffmpeg not found, cannot extract episode segment") from e
try:
# Calculate duration
duration = end_timestamp - start_timestamp
self.console.print(
f"[cyan]Extracting episode: {start_timestamp:.1f}s-{end_timestamp:.1f}s ({duration:.1f}s)[/cyan]"
)
# Use ffmpeg to extract segment with minimal quality loss
cmd = [
"ffmpeg",
"-i",
str(file_path),
"-ss",
str(start_timestamp),
"-t",
str(duration),
"-r",
str(target_fps),
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"23",
"-an",
"-y",
str(tmp_path),
]
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
# Verify the output file was created and is not empty
if not tmp_path.exists() or tmp_path.stat().st_size == 0:
self.console.print("[red]✗ Video extraction failed (0 bytes) - skipping episode[/red]")
if tmp_path.exists():
tmp_path.unlink()
raise RuntimeError("FFmpeg produced empty video file")
# Show extraction results
file_size_mb = tmp_path.stat().st_size / (1024 * 1024)
# Fail if file is too small (< 100KB likely means extraction failed)
if file_size_mb < 0.1:
self.console.print(
f"[red]✗ Extracted video too small ({file_size_mb:.2f}MB) - skipping episode[/red]"
)
tmp_path.unlink()
raise RuntimeError(f"Video extraction produced invalid file ({file_size_mb:.2f}MB)")
self.console.print(f"[green]✓ Extracted: {file_size_mb:.1f}MB ({target_fps} FPS)[/green]")
return tmp_path
except subprocess.CalledProcessError as e:
raise RuntimeError(f"ffmpeg failed ({e})") from e
def annotate(
self,
file_path: str | Path,
fps: int,
start_timestamp: float = 0.0,
end_timestamp: float | None = None,
max_retries: int = 3,
) -> SubtaskAnnotation:
"""Annotate a video segment using local GPU."""
file_path = Path(file_path)
if end_timestamp is None:
cap = cv2.VideoCapture(str(file_path))
end_timestamp = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / (cap.get(cv2.CAP_PROP_FPS) or 1)
cap.release()
duration = end_timestamp - start_timestamp
duration_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}"
extracted_path = self.extract_episode_segment(file_path, start_timestamp, end_timestamp, 1)
is_extracted = extracted_path != file_path
try:
messages = [
{"role": "system", "content": [{"type": "text", "text": self.prompt}]},
{
"role": "user",
"content": [
{"type": "video", "video": str(extracted_path), "fps": 1.0},
{
"type": "text",
"text": f"Video is {duration_str} (~{duration:.1f}s). Follow instructions.",
},
],
},
]
for attempt in range(max_retries):
try:
text = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = self.processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(
**inputs, max_new_tokens=1024, do_sample=True, temperature=0.7
)
response = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)[0].strip()
# Extract JSON
if "```json" in response:
response = response.split("```json")[1].split("```")[0]
elif "```" in response:
response = response.split("```")[1].split("```")[0]
try:
return SubtaskAnnotation.model_validate(json.loads(response))
except json.JSONDecodeError:
match = re.search(r"\{.*\}", response, re.DOTALL)
if match:
return SubtaskAnnotation.model_validate(json.loads(match.group()))
raise ValueError("No JSON found")
except Exception as e:
if attempt == max_retries - 1:
raise RuntimeError(f"Failed after {max_retries} attempts") from e
time.sleep(1)
finally:
if is_extracted and extracted_path.exists():
extracted_path.unlink()
def display_annotation(
annotation: SubtaskAnnotation, console: Console, episode_idx: int, fps: int, prefix: str = ""
):
"""Display annotation summary."""
subtask_summary = ", ".join(
f"{s.name}({s.timestamps.start}-{s.timestamps.end})" for s in annotation.subtasks
)
console.print(
f"[green]Episode {episode_idx} {prefix}: {len(annotation.subtasks)} subtasks - {subtask_summary}[/green]"
)
def timestamp_to_seconds(timestamp: str) -> float:
"""Convert MM:SS or SS timestamp to seconds"""
parts = timestamp.split(":")
if len(parts) == 2:
return int(parts[0]) * 60 + int(parts[1])
else:
return int(parts[0])
def save_annotations_to_dataset(
dataset_path: Path, annotations: dict[int, SubtaskAnnotation], fps: int, prefix: str = "sparse"
):
"""Save annotations to LeRobot dataset parquet format."""
from lerobot.datasets.utils import DEFAULT_EPISODES_PATH, load_episodes
episodes_dataset = load_episodes(dataset_path)
if not episodes_dataset or len(episodes_dataset) == 0:
return
episodes_df = episodes_dataset.to_pandas()
cols = [
f"{prefix}_{c}"
for c in [
"subtask_names",
"subtask_start_times",
"subtask_end_times",
"subtask_start_frames",
"subtask_end_frames",
]
]
for col in cols:
episodes_df[col] = None
for ep_idx, ann in annotations.items():
if ep_idx >= len(episodes_df):
continue
names, starts, ends, start_frames, end_frames = [], [], [], [], []
for s in ann.subtasks:
names.append(s.name)
st, et = timestamp_to_seconds(s.timestamps.start), timestamp_to_seconds(s.timestamps.end)
starts.append(st)
ends.append(et)
start_frames.append(int(st * fps))
end_frames.append(int(et * fps))
episodes_df.at[ep_idx, cols[0]] = names
episodes_df.at[ep_idx, cols[1]] = starts
episodes_df.at[ep_idx, cols[2]] = ends
episodes_df.at[ep_idx, cols[3]] = start_frames
episodes_df.at[ep_idx, cols[4]] = end_frames
# Group by file and write
for ep_idx in episodes_df.index:
key = (
episodes_df.loc[ep_idx, "meta/episodes/chunk_index"],
episodes_df.loc[ep_idx, "meta/episodes/file_index"],
)
path = dataset_path / DEFAULT_EPISODES_PATH.format(chunk_index=key[0], file_index=key[1])
if path.exists():
file_df = pd.read_parquet(path)
for col in cols + (
[
"subtask_names",
"subtask_start_times",
"subtask_end_times",
"subtask_start_frames",
"subtask_end_frames",
]
if prefix == "sparse"
else []
):
if col not in file_df.columns:
file_df[col] = None
if ep_idx in annotations:
for col in cols:
file_df.at[ep_idx, col] = episodes_df.loc[ep_idx, col]
if prefix == "sparse": # Legacy columns
for i, legacy in enumerate(
[
"subtask_names",
"subtask_start_times",
"subtask_end_times",
"subtask_start_frames",
"subtask_end_frames",
]
):
file_df.at[ep_idx, legacy] = episodes_df.loc[ep_idx, cols[i]]
file_df.to_parquet(path, engine="pyarrow", compression="snappy")
def generate_auto_sparse_annotations(
dataset: LeRobotDataset, episode_indices: list[int], video_key: str
) -> dict[int, SubtaskAnnotation]:
"""Auto-generate single 'task' stage annotations for all episodes."""
annotations = {}
for ep_idx in episode_indices:
start = float(dataset.meta.episodes[f"videos/{video_key}/from_timestamp"][ep_idx])
end = float(dataset.meta.episodes[f"videos/{video_key}/to_timestamp"][ep_idx])
duration = end - start
end_str = f"{int(duration // 60):02d}:{int(duration % 60):02d}"
annotations[ep_idx] = SubtaskAnnotation(
subtasks=[Subtask(name="task", timestamps=Timestamp(start="00:00", end=end_str))]
)
return annotations
def load_annotations_from_dataset(dataset_path: Path, prefix: str = "sparse") -> dict[int, SubtaskAnnotation]:
"""Load annotations from LeRobot dataset parquet files."""
from lerobot.datasets.utils import load_episodes
episodes_dataset = load_episodes(dataset_path)
if not episodes_dataset or len(episodes_dataset) == 0:
return {}
col_names = f"{prefix}_subtask_names"
col_start = f"{prefix}_subtask_start_times"
col_end = f"{prefix}_subtask_end_times"
# Fall back to legacy columns for sparse
if col_names not in episodes_dataset.column_names:
if prefix == "sparse" and "subtask_names" in episodes_dataset.column_names:
col_names, col_start, col_end = "subtask_names", "subtask_start_times", "subtask_end_times"
else:
return {}
df = episodes_dataset.to_pandas()
annotations = {}
for ep_idx in df.index:
names = df.loc[ep_idx, col_names]
if names is None or (isinstance(names, float) and pd.isna(names)):
continue
starts, ends = df.loc[ep_idx, col_start], df.loc[ep_idx, col_end]
annotations[int(ep_idx)] = SubtaskAnnotation(
subtasks=[
Subtask(
name=n,
timestamps=Timestamp(
start=f"{int(s) // 60:02d}:{int(s) % 60:02d}",
end=f"{int(e) // 60:02d}:{int(e) % 60:02d}",
),
)
for n, s, e in zip(names, starts, ends)
]
)
return annotations
def process_single_episode(
ep_idx: int,
dataset_root: Path,
dataset_meta,
video_key: str,
fps: int,
annotator: VideoAnnotator,
console: Console,
) -> tuple[int, SubtaskAnnotation | None, str | None]:
"""Process a single episode annotation."""
try:
video_path = dataset_root / dataset_meta.get_video_file_path(ep_idx, video_key)
if not video_path.exists():
return ep_idx, None, f"Video not found: {video_path}"
start = float(dataset_meta.episodes[f"videos/{video_key}/from_timestamp"][ep_idx])
end = float(dataset_meta.episodes[f"videos/{video_key}/to_timestamp"][ep_idx])
return ep_idx, annotator.annotate(video_path, fps, start, end), None
except Exception as e:
return ep_idx, None, str(e)
def worker_process_episodes(
worker_id: int,
gpu_id: int,
episode_indices: list[int],
repo_id: str,
video_key: str,
sparse_subtask_list: list[str],
dense_subtask_list: list[str] | None,
model_name: str,
torch_dtype: torch.dtype,
) -> tuple[dict, dict | None]:
"""Worker for parallel processing across GPUs."""
device = f"cuda:{gpu_id}"
console = Console()
dataset = LeRobotDataset(repo_id, download_videos=False)
sparse_annotator = VideoAnnotator(sparse_subtask_list, model_name, device, torch_dtype)
dense_annotator = (
VideoAnnotator(
dense_subtask_list,
model_name,
device,
torch_dtype,
sparse_annotator.model,
sparse_annotator.processor,
)
if dense_subtask_list
else None
)
sparse_annotations, dense_annotations = {}, {} if dense_subtask_list else None
for ep_idx in episode_indices:
_, sparse_ann, err = process_single_episode(
ep_idx, dataset.root, dataset.meta, video_key, dataset.fps, sparse_annotator, console
)
if sparse_ann:
sparse_annotations[ep_idx] = sparse_ann
if dense_annotator:
_, dense_ann, _ = process_single_episode(
ep_idx, dataset.root, dataset.meta, video_key, dataset.fps, dense_annotator, console
)
if dense_ann:
dense_annotations[ep_idx] = dense_ann
return sparse_annotations, dense_annotations
def main():
parser = argparse.ArgumentParser(description="SARM-style subtask annotation using local GPU (Qwen3-VL)")
parser.add_argument("--repo-id", type=str, required=True, help="HuggingFace dataset repository ID")
parser.add_argument(
"--sparse-subtasks", type=str, default=None, help="Comma-separated sparse subtask names"
)
parser.add_argument(
"--dense-subtasks", type=str, default=None, help="Comma-separated dense subtask names"
)
parser.add_argument(
"--dense-only", action="store_true", help="Dense-only mode with auto-generated sparse 'task' stage"
)
parser.add_argument("--episodes", type=int, nargs="+", default=None, help="Episode indices to annotate")
parser.add_argument("--model", type=str, default="Qwen/Qwen3-VL-30B-A3B-Instruct", help="VLM model")
parser.add_argument("--skip-existing", action="store_true", help="Skip already annotated episodes")
parser.add_argument("--video-key", type=str, default=None, help="Video key (default: first available)")
parser.add_argument("--push-to-hub", action="store_true", help="Push to HuggingFace Hub")
parser.add_argument("--output-repo-id", type=str, default=None, help="Output repo ID for push")
parser.add_argument("--device", type=str, default="cuda", help="Device (cuda/cpu)")
parser.add_argument("--dtype", type=str, default="bfloat16", choices=["bfloat16", "float16", "float32"])
parser.add_argument("--num-workers", type=int, default=1, help="Parallel workers for multi-GPU")
parser.add_argument("--gpu-ids", type=int, nargs="+", default=None, help="GPU IDs to use")
args = parser.parse_args()
console = Console()
# Validate arguments
if args.dense_only and not args.dense_subtasks:
return console.print("[red]Error: --dense-only requires --dense-subtasks[/red]")
if args.dense_subtasks and not args.sparse_subtasks and not args.dense_only:
return console.print("[red]Error: --dense-subtasks requires --sparse-subtasks or --dense-only[/red]")
sparse_subtask_list = (
[s.strip() for s in args.sparse_subtasks.split(",")] if args.sparse_subtasks else None
)
dense_subtask_list = [s.strip() for s in args.dense_subtasks.split(",")] if args.dense_subtasks else None
auto_sparse = sparse_subtask_list is None
dense_mode = dense_subtask_list is not None
torch_dtype = {"bfloat16": torch.bfloat16, "float16": torch.float16, "float32": torch.float32}[args.dtype]
console.print(f"[cyan]Loading dataset: {args.repo_id}[/cyan]")
dataset = LeRobotDataset(args.repo_id, download_videos=True)
fps = dataset.fps
if not dataset.meta.video_keys:
raise ValueError("No video keys found")
video_key = (
args.video_key if args.video_key in (dataset.meta.video_keys or []) else dataset.meta.video_keys[0]
)
console.print(f"[cyan]Using camera: {video_key}, FPS: {fps}[/cyan]")
# Determine episodes
episode_indices = args.episodes or list(range(dataset.meta.total_episodes))
existing_annotations = load_annotations_from_dataset(dataset.root, prefix="sparse")
if args.skip_existing:
episode_indices = [ep for ep in episode_indices if ep not in existing_annotations]
if not episode_indices:
return console.print("[green]All episodes already annotated![/green]")
console.print(f"[cyan]Annotating {len(episode_indices)} episodes[/cyan]")
# GPU setup
gpu_ids = args.gpu_ids or list(
range(min(args.num_workers, torch.cuda.device_count() if torch.cuda.is_available() else 1))
)
args.num_workers = len(gpu_ids)
sparse_annotations = existing_annotations.copy()
dense_annotations = {} if dense_mode else None
# Auto-sparse mode
if auto_sparse:
sparse_annotations.update(generate_auto_sparse_annotations(dataset, episode_indices, video_key))
save_annotations_to_dataset(dataset.root, sparse_annotations, fps, prefix="sparse")
console.print(f"[green]Auto-generated {len(episode_indices)} sparse 'task' annotations[/green]")
# VLM annotation (for sparse if not auto, and for dense)
need_vlm = (not auto_sparse) or dense_mode
if need_vlm:
if args.num_workers > 1 and not auto_sparse:
# Parallel processing
console.print(f"[cyan]Parallel processing with {args.num_workers} workers[/cyan]")
episodes_per_worker = [[] for _ in range(args.num_workers)]
for i, ep_idx in enumerate(episode_indices):
episodes_per_worker[i % args.num_workers].append(ep_idx)
with ProcessPoolExecutor(
max_workers=args.num_workers, mp_context=mp.get_context("spawn")
) as executor:
futures = [
executor.submit(
worker_process_episodes,
w,
gpu_ids[w],
episodes_per_worker[w],
args.repo_id,
video_key,
sparse_subtask_list,
dense_subtask_list,
args.model,
torch_dtype,
)
for w in range(args.num_workers)
if episodes_per_worker[w]
]
for future in as_completed(futures):
try:
worker_sparse, worker_dense = future.result()
sparse_annotations.update(worker_sparse)
if dense_mode and worker_dense:
dense_annotations.update(worker_dense)
save_annotations_to_dataset(dataset.root, sparse_annotations, fps, prefix="sparse")
if dense_mode:
save_annotations_to_dataset(dataset.root, dense_annotations, fps, prefix="dense")
except Exception as e:
raise RuntimeError(f"Worker failed: {e}") from e
else:
# Sequential processing
sparse_annotator = (
VideoAnnotator(sparse_subtask_list, args.model, args.device, torch_dtype)
if not auto_sparse and sparse_subtask_list
else None
)
dense_annotator = (
VideoAnnotator(
dense_subtask_list,
args.model,
args.device,
torch_dtype,
sparse_annotator.model if sparse_annotator else None,
sparse_annotator.processor if sparse_annotator else None,
)
if dense_mode
else None
)
for i, ep_idx in enumerate(episode_indices):
console.print(f"[cyan]Episode {ep_idx} ({i + 1}/{len(episode_indices)})[/cyan]")
if sparse_annotator:
_, sparse_ann, err = process_single_episode(
ep_idx, dataset.root, dataset.meta, video_key, fps, sparse_annotator, console
)
if sparse_ann:
sparse_annotations[ep_idx] = sparse_ann
save_annotations_to_dataset(dataset.root, sparse_annotations, fps, prefix="sparse")
elif err:
console.print(f"[red]Sparse failed: {err}[/red]")
if dense_annotator:
_, dense_ann, err = process_single_episode(
ep_idx, dataset.root, dataset.meta, video_key, fps, dense_annotator, console
)
if dense_ann:
dense_annotations[ep_idx] = dense_ann
save_annotations_to_dataset(dataset.root, dense_annotations, fps, prefix="dense")
elif err:
console.print(f"[red]Dense failed: {err}[/red]")
# Save temporal proportions
def save_proportions(annotations, prefix, is_auto=False):
props: dict[str, float] = {"task": 1.0} if is_auto else compute_temporal_proportions(annotations, fps)
path = dataset.root / "meta" / f"temporal_proportions_{prefix}.json"
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(props, f, indent=2)
console.print(f"[green]Saved {prefix} temporal proportions[/green]")
save_proportions(sparse_annotations, "sparse", auto_sparse)
if dense_mode and dense_annotations:
save_proportions(dense_annotations, "dense")
console.print(
f"\n[bold green]Complete! {len(sparse_annotations)} sparse, {len(dense_annotations or {})} dense annotations[/bold green]"
)
if args.push_to_hub:
try:
dataset.push_to_hub(push_videos=True)
console.print(f"[green]Pushed to {args.output_repo_id or args.repo_id}[/green]")
except Exception as e:
console.print(f"[red]Push failed: {e}[/red]")
if __name__ == "__main__":
main()
-1
View File
@@ -1 +0,0 @@
srun --time 12:00:00 --qos=high --gres=gpu:1 --mem=24G --partition=hopper-prod --container-image /fsx/michel_aractingi/docker_images/huggingface+lerobot-gpu+dev.sqsh --container-mounts /fsx/jade_choghari
-44
View File
@@ -1,44 +0,0 @@
#!/bin/bash
# Quick test to verify the fix for task_indices length mismatch
# This should now work correctly even with --num-samples < full dataset length
echo "Testing annotate_pgen.py with --num-samples=100 on full dataset..."
python examples/dataset/annotate_pgen.py \
--data-dir /fsx/jade_choghari/.cache/huggingface/lerobot/lerobot/svla_so101_pickplace \
--model Qwen/Qwen3-VL-30B-A3B-Instruct \
--num-samples 100 \
--sample-interval 1.0 \
--output-dir /fsx/jade_choghari/outputs/pgen_test_fixed
if [ $? -eq 0 ]; then
echo "✓ SUCCESS: Script completed without errors!"
echo ""
echo "Verifying output..."
# Check that all frames have task_index_high_level
python -c "
from lerobot.datasets.lerobot_dataset import LeRobotDataset
import numpy as np
ds = LeRobotDataset(repo_id='local_test', root='/fsx/jade_choghari/outputs/pgen_test_fixed')
print(f'Dataset has {len(ds)} frames')
print(f'Features: {list(ds.features.keys())}')
# Check that task_index_high_level exists
assert 'task_index_high_level' in ds.features, 'task_index_high_level not in features!'
# Sample some frames
for idx in [0, 50, 99, 100, 500, 1000, 11938]:
if idx < len(ds):
frame = ds[idx]
task_idx = frame['task_index_high_level'].item()
print(f'Frame {idx}: task_index_high_level = {task_idx}')
print('✓ All checks passed!')
"
else
echo "✗ FAILED: Script exited with error code $?"
fi
-47
View File
@@ -1,47 +0,0 @@
# Voice Assistant Examples
Voice-enabled robot assistant examples using speech-to-text (STT), and text-to-speech (TTS).
## Overview
These examples demonstrate how to build a voice interface for robot control:
1. **Hold SPACE** → Push-to-talk recording starts
2. **Release SPACE** → Recording stops
3. **STT (Whisper)** → Converts speech to text (high-level task prompt)
4. **Pi0.5** → Generates robot response/utterance
5. **TTS (Kokoro)** → Speaks the response back
## Requirements
```bash
pip install torch transformers sounddevice numpy pynput kokoro>=0.9.2
```
## Usage
### With Pi0.5 Model
```bash
python examples/voice_assistant/voice_assistant_pi05.py \
--pretrained_path path/to/pi05/checkpoint
```
## How It Works
### Pi0.5 Voice Integration
Pi0.5 can generate robot utterances as part of its subtask prediction. The flow:
1. **High-level prompt**: User voice command is transcribed and formatted as a task prompt
2. **Subtask generation**: Pi0.5 autoregressively generates a response
3. **Utterance extraction**: If the response contains `<utterance>...</utterance>` tags, the content is extracted
4. **TTS output**: The response is spoken back to the user
## Configuration Options
| Option | Default | Description |
|--------|---------|-------------|
| `--pretrained_path` | None | Path to Pi0.5 checkpoint |
| `--record_seconds` | 5.0 | Audio recording duration |
| `--max_response_tokens` | 100 | Max tokens in generated response |
@@ -1,336 +0,0 @@
#!/usr/bin/env python
"""
Voice Assistant with Pi0.5: Microphone → STT → Pi0.5 → TTS → Speaker
This example demonstrates how to use Pi0.5 as a conversational robot assistant:
1. Hold SPACE to record your voice command
2. Speech-to-text (Whisper) converts speech to text
3. Text is fed as a high-level prompt to Pi0.5
4. Pi0.5 generates a response (robot utterance)
5. Text-to-speech (Kokoro) speaks the response back
Requirements:
pip install torch transformers sounddevice numpy pynput kokoro>=0.9.2
Usage:
python examples/voice_assistant/voice_assistant_pi05.py \
--pretrained_path lerobot/pi0.5-base
"""
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import argparse
import re
import subprocess
import threading
import time
import numpy as np
import sounddevice as sd
import torch
from pynput import keyboard
from transformers import AutoTokenizer, WhisperForConditionalGeneration, WhisperProcessor
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.pi05.modeling_pi05 import PI05Pytorch
SAMPLE_RATE = 16000
def get_device():
if torch.cuda.is_available():
return torch.device("cuda")
elif torch.backends.mps.is_available():
return torch.device("mps")
return torch.device("cpu")
class Pi05VoiceAssistant:
"""Voice assistant using Pi0.5 for generating robot utterances."""
def __init__(
self,
pretrained_path: str | None = None,
max_response_tokens: int = 100,
max_record_seconds: float = 30.0,
):
self.device = get_device()
self.dtype = torch.float32 if self.device.type == "mps" else torch.bfloat16
self.max_response_tokens = max_response_tokens
self.max_record_seconds = max_record_seconds
# Push-to-talk state
self._recording = False
self._audio_chunks: list[np.ndarray] = []
self._stream: sd.InputStream | None = None
print(f"Using device: {self.device}")
self._load_models(pretrained_path)
def _load_models(self, pretrained_path: str | None):
print("Loading STT (Whisper tiny)...")
self.stt_processor = WhisperProcessor.from_pretrained("openai/whisper-tiny.en")
self.stt_model = WhisperForConditionalGeneration.from_pretrained(
"openai/whisper-tiny.en", torch_dtype=self.dtype
).to(self.device)
print("Loading Pi0.5 model...")
self._load_pi05(pretrained_path)
print("Loading tokenizer...")
self.tokenizer = AutoTokenizer.from_pretrained("google/paligemma-3b-pt-224")
self._load_tts()
print("Ready!\n")
def _load_pi05(self, pretrained_path: str | None):
"""Load Pi0.5 model for utterance generation."""
config = PI05Config()
config.dtype = "float32" if self.device.type == "mps" else "bfloat16"
self.pi05_model = PI05Pytorch(config)
if pretrained_path:
try:
from safetensors.torch import load_file
state_dict = load_file(f"{pretrained_path}/model.safetensors")
self.pi05_model.load_state_dict(state_dict, strict=False)
print(f"✓ Loaded Pi0.5 weights from {pretrained_path}")
except Exception as e:
print(f"Warning: Could not load pretrained weights: {e}")
print("Using randomly initialized model for demo purposes")
self.pi05_model = self.pi05_model.to(self.device)
self.pi05_model.eval()
def _load_tts(self):
try:
print("Loading TTS (Kokoro 82M)...")
from kokoro import KPipeline
self.tts_pipeline = KPipeline(lang_code="a") # American English
self.tts_voice = "af_heart"
self.tts_type = "kokoro"
print("Kokoro loaded!")
except Exception as e:
print(f"Kokoro not available ({e})")
print("Using macOS `say` for TTS")
self.tts_pipeline = None
self.tts_type = "system"
def _audio_callback(self, indata, frames, time_info, status):
"""Callback for audio stream - collects chunks while recording."""
if self._recording:
self._audio_chunks.append(indata.copy())
def _start_recording(self):
"""Start recording audio."""
if self._recording:
return
self._recording = True
self._audio_chunks = []
print("🎤 Recording... (release SPACE to stop)")
def _stop_recording(self) -> np.ndarray | None:
"""Stop recording and return the audio."""
if not self._recording:
return None
self._recording = False
if not self._audio_chunks:
return None
audio = np.concatenate(self._audio_chunks, axis=0).flatten()
duration = len(audio) / SAMPLE_RATE
volume = np.abs(audio).max()
print(f"Recorded {duration:.1f}s, volume: {volume:.4f}")
if volume < 0.001:
print("⚠️ Very low audio - check microphone permissions!")
return None
return audio
def wait_for_spacebar(self) -> np.ndarray | None:
"""Wait for spacebar press, record while held, return audio on release."""
audio_result = None
recording_done = threading.Event()
def on_press(key):
if key == keyboard.Key.space:
self._start_recording()
def on_release(key):
nonlocal audio_result
if key == keyboard.Key.space and self._recording:
audio_result = self._stop_recording()
recording_done.set()
return False # Stop listener
# Start audio stream
self._stream = sd.InputStream(
samplerate=SAMPLE_RATE,
channels=1,
dtype="float32",
callback=self._audio_callback,
blocksize=int(SAMPLE_RATE * 0.1), # 100ms blocks
)
with self._stream:
print("\n⏳ Press and hold SPACE to speak...")
with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
# Wait for recording to complete or timeout
recording_done.wait(timeout=self.max_record_seconds)
if self._recording:
audio_result = self._stop_recording()
return audio_result
def transcribe(self, audio: np.ndarray) -> str:
start = time.perf_counter()
inputs = self.stt_processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt")
input_features = inputs.input_features.to(self.device, dtype=self.dtype)
tokens = self.stt_model.generate(input_features)
text = self.stt_processor.batch_decode(tokens, skip_special_tokens=True)[0]
print(f"STT: {time.perf_counter() - start:.2f}s")
return text.strip()
def _create_dummy_images(self, batch_size: int = 1) -> tuple[list[torch.Tensor], list[torch.Tensor]]:
"""Create placeholder images for Pi0.5 when no camera is available."""
image_shape = (batch_size, 3, 224, 224)
dummy_image = torch.zeros(image_shape, dtype=torch.float32, device=self.device)
dummy_mask = torch.ones(batch_size, dtype=torch.bool, device=self.device)
return [dummy_image], [dummy_mask]
def _tokenize_prompt(self, text: str) -> tuple[torch.Tensor, torch.Tensor]:
"""Tokenize the user prompt for Pi0.5."""
prompt = f"User request: {text}\nRobot response:"
tokenized = self.tokenizer(
[prompt],
max_length=200,
truncation=True,
padding="max_length",
return_tensors="pt",
)
tokens = tokenized["input_ids"].to(self.device)
masks = tokenized["attention_mask"].to(self.device, dtype=torch.bool)
return tokens, masks
def generate_response(self, user_text: str) -> str:
"""Generate robot utterance using Pi0.5's language generation."""
start = time.perf_counter()
images, img_masks = self._create_dummy_images()
tokens, masks = self._tokenize_prompt(user_text)
with torch.no_grad():
generated_tokens = self.pi05_model._generate_subtask_tokens(
images=images,
img_masks=img_masks,
tokens=tokens,
masks=masks,
tokenizer=self.tokenizer,
max_length=self.max_response_tokens,
device=self.device,
)
# Decode generated tokens
valid_tokens = generated_tokens[0][generated_tokens[0] != 0]
response = self.tokenizer.decode(valid_tokens, skip_special_tokens=True)
# Extract utterance if marked with special tokens
response = self._extract_utterance(response)
print(f"Pi0.5: {time.perf_counter() - start:.2f}s")
return response.strip()
def _extract_utterance(self, text: str) -> str:
"""Extract utterance from between <utterance> tokens if present."""
pattern = r"<utterance>(.*?)</utterance>"
match = re.search(pattern, text, re.DOTALL)
if match:
return match.group(1).strip()
return text
def speak(self, text: str):
start = time.perf_counter()
if self.tts_type == "kokoro":
generator = self.tts_pipeline(text, voice=self.tts_voice)
audio_chunks = [audio for _, _, audio in generator]
if audio_chunks:
audio = np.concatenate(audio_chunks)
sd.play(audio, 24000)
sd.wait()
else:
subprocess.run(["say", text], check=True)
print(f"TTS: {time.perf_counter() - start:.2f}s")
def run(self):
print("=" * 50)
print("Pi0.5 Voice Assistant")
print("=" * 50)
print("• Hold SPACE to record your voice command")
print("• Release SPACE when done speaking")
print("• Press Ctrl+C to exit")
print("=" * 50)
while True:
try:
audio = self.wait_for_spacebar()
if audio is None:
print("(no audio captured)\n")
continue
user_text = self.transcribe(audio)
if not user_text:
print("(no speech detected)\n")
continue
print(f"You: {user_text}")
response = self.generate_response(user_text)
print(f"Robot: {response}\n")
self.speak(response)
except KeyboardInterrupt:
print("\nGoodbye!")
break
def main():
parser = argparse.ArgumentParser(description="Pi0.5 Voice Assistant")
parser.add_argument(
"--pretrained_path",
type=str,
default=None,
help="Path to pretrained Pi0.5 model (optional)",
)
parser.add_argument(
"--max_response_tokens",
type=int,
default=100,
help="Maximum tokens in generated response",
)
parser.add_argument(
"--max_record_seconds",
type=float,
default=30.0,
help="Maximum recording duration in seconds",
)
args = parser.parse_args()
assistant = Pi05VoiceAssistant(
pretrained_path=args.pretrained_path,
max_response_tokens=args.max_response_tokens,
max_record_seconds=args.max_record_seconds,
)
assistant.run()
if __name__ == "__main__":
main()
-27
View File
@@ -1,27 +0,0 @@
{
"repo_id": "local",
"vocab_size": 1024,
"scale": 10.0,
"encoded_dims": "0:7",
"encoded_dim_ranges": [
[
0,
7
]
],
"total_encoded_dims": 7,
"delta_dims": null,
"delta_dim_list": null,
"use_delta_transform": false,
"state_key": "observation.state",
"normalization_mode": "QUANTILES",
"action_horizon": 10,
"num_training_chunks": 25065,
"compression_stats": {
"compression_ratio": 3.464660463274599,
"mean_token_length": 20.204,
"p99_token_length": 36.00999999999999,
"min_token_length": 5.0,
"max_token_length": 38.0
}
}
@@ -1,158 +0,0 @@
import logging
from typing import ClassVar
import numpy as np
from scipy.fft import dct
from scipy.fft import idct
from tokenizers import ByteLevelBPETokenizer
from tokenizers.trainers import BpeTrainer
from transformers import PreTrainedTokenizerFast
from transformers.processing_utils import ProcessorMixin
class UniversalActionProcessor(ProcessorMixin):
attributes: ClassVar[list[str]] = ["bpe_tokenizer"]
bpe_tokenizer_class: str = "AutoTokenizer"
def __init__(
self,
bpe_tokenizer: PreTrainedTokenizerFast,
scale: float = 10,
vocab_size: int = 1024,
min_token: int = 0,
*,
action_dim: int | None = None,
time_horizon: int | None = None,
):
self.scale = scale
self.vocab_size = vocab_size
self.min_token = min_token
# Action horizon and dimension needed during decoding. These can be specified
# in three ways (in order of priority):
# 1. passed in as kwargs to decode()
# 2. in the constructor
# 3. cached from the last time decode() was called
self.time_horizon = time_horizon
self.action_dim = action_dim
self.called_time_horizon = time_horizon
self.called_action_dim = action_dim
super().__init__(bpe_tokenizer)
def __call__(self, action_chunk: np.array) -> np.array:
assert action_chunk.ndim <= 3, "Only 3 dimensions supported: [batch, timesteps, action_dim]"
if action_chunk.ndim == 2:
action_chunk = action_chunk[None, ...]
# Cache the time horizon and action dimension for decoding
self.called_time_horizon = action_chunk.shape[-2]
self.called_action_dim = action_chunk.shape[-1]
dct_coeff = dct(action_chunk, axis=1, norm="ortho")
dct_coeff = np.around(dct_coeff * self.scale)
tokens = []
for elem in dct_coeff:
token_str = "".join(map(chr, np.maximum(elem.flatten() - self.min_token, 0).astype(int)))
tokens.append(self.bpe_tokenizer(token_str)["input_ids"])
return tokens
def decode(
self,
tokens: list[list[int]],
*,
time_horizon: int | None = None,
action_dim: int | None = None,
) -> np.array:
self.time_horizon = time_horizon or self.time_horizon or self.called_time_horizon
self.action_dim = action_dim or self.action_dim or self.called_action_dim
# Cache the time horizon and action dimension for the next call
self.called_time_horizon = self.time_horizon
self.called_action_dim = self.action_dim
assert (
self.time_horizon is not None and self.action_dim is not None
), "Tokenizer not initialized, call encode() once or pass in time_horizon and action_dim."
decoded_actions = []
for token in tokens:
try:
decoded_tokens = self.bpe_tokenizer.decode(token)
decoded_dct_coeff = np.array(list(map(ord, decoded_tokens))) + self.min_token
decoded_dct_coeff = decoded_dct_coeff.reshape(-1, self.action_dim)
assert (
decoded_dct_coeff.shape
== (
self.time_horizon,
self.action_dim,
)
), f"Decoded DCT coefficients have shape {decoded_dct_coeff.shape}, expected ({self.time_horizon}, {self.action_dim})"
except Exception as e:
print(f"Error decoding tokens: {e}")
print(f"Tokens: {token}")
decoded_dct_coeff = np.zeros((self.time_horizon, self.action_dim))
decoded_actions.append(idct(decoded_dct_coeff / self.scale, axis=0, norm="ortho"))
return np.stack(decoded_actions)
@classmethod
def fit(
cls,
action_data: list[np.array],
scale: float = 10,
vocab_size: int = 1024,
*,
time_horizon: int | None = None,
action_dim: int | None = None,
) -> "UniversalActionProcessor":
# Run DCT over all inputs
dct_tokens = [dct(a, axis=0, norm="ortho").flatten() for a in action_data]
# Quantize and find min token
max_token = int(np.around(np.concatenate(dct_tokens) * scale).max())
min_token = int(np.around(np.concatenate(dct_tokens) * scale).min())
min_vocab_size = max_token - min_token
assert (
min_vocab_size <= vocab_size
), f"Vocab size {vocab_size} is too small for the range of tokens {min_vocab_size}"
if min_vocab_size + 100 > vocab_size:
logging.warning(
f"Initial alphabet size {min_vocab_size} is almost as large as the vocab"
f"size {vocab_size}, consider increasing vocab size"
)
# Make token iterator for BPE training
def _token_iter():
for tokens in dct_tokens:
rounded_tokens = np.around(tokens * scale) - min_token
rounded_tokens = rounded_tokens.astype(int)
string = "".join(map(chr, rounded_tokens))
yield string
# Train BPE tokenizer
bpe = ByteLevelBPETokenizer()
# Set up the entire range of possible tokens as the initial alphabet
alphabet = [chr(i) for i in range(max_token - min_token + 1)]
trainer = BpeTrainer(
vocab_size=vocab_size,
min_frequency=2,
show_progress=True,
special_tokens=[],
initial_alphabet=alphabet,
max_token_length=10000,
)
# Train the inner tokenizer (don't use ByteLevelBPETokenizer.train_from_iterator()
# because it doesn't support custom alphabets)
bpe._tokenizer.train_from_iterator(_token_iter(), trainer=trainer)
return cls(
PreTrainedTokenizerFast(tokenizer_object=bpe, clean_up_tokenization_spaces=False),
scale=scale,
vocab_size=vocab_size,
min_token=min_token,
time_horizon=time_horizon,
action_dim=action_dim,
)
@@ -1,11 +0,0 @@
{
"action_dim": 7,
"auto_map": {
"AutoProcessor": "processing_action_tokenizer.UniversalActionProcessor"
},
"min_token": -32,
"processor_class": "UniversalActionProcessor",
"scale": 10.0,
"time_horizon": 10,
"vocab_size": 1024
}
@@ -1 +0,0 @@
{}
File diff suppressed because it is too large Load Diff
@@ -1,11 +0,0 @@
{
"added_tokens_decoder": {},
"auto_map": {
"AutoProcessor": "processing_action_tokenizer.UniversalActionProcessor"
},
"clean_up_tokenization_spaces": false,
"extra_special_tokens": {},
"model_max_length": 1000000000000000019884624838656,
"processor_class": "UniversalActionProcessor",
"tokenizer_class": "PreTrainedTokenizerFast"
}
+20 -57
View File
@@ -136,40 +136,21 @@ def update_meta_data(
df["_orig_chunk"] = df[orig_chunk_col].copy()
df["_orig_file"] = df[orig_file_col].copy()
# Get mappings for this video key
# Update chunk and file indices to point to destination
df[orig_chunk_col] = video_idx["chunk"]
df[orig_file_col] = video_idx["file"]
# Apply per-source-file timestamp offsets
src_to_offset = video_idx.get("src_to_offset", {})
src_to_dst = video_idx.get("src_to_dst", {})
# Apply per-source-file mappings
if src_to_dst:
# Map each episode to its correct destination file and apply offset
if src_to_offset:
# Apply offset based on original source file
for idx in df.index:
# Convert to Python int to avoid numpy type mismatch in dict lookup
src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"]))
# Get destination chunk/file for this source file
dst_chunk, dst_file = src_to_dst.get(src_key, (video_idx["chunk"], video_idx["file"]))
df.at[idx, orig_chunk_col] = dst_chunk
df.at[idx, orig_file_col] = dst_file
# Apply timestamp offset
offset = src_to_offset.get(src_key, 0)
df.at[idx, f"videos/{key}/from_timestamp"] += offset
df.at[idx, f"videos/{key}/to_timestamp"] += offset
elif src_to_offset:
# Fallback: use same destination for all, but apply per-file offsets
df[orig_chunk_col] = video_idx["chunk"]
df[orig_file_col] = video_idx["file"]
for idx in df.index:
# Convert to Python int to avoid numpy type mismatch in dict lookup
src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"]))
src_key = (df.at[idx, "_orig_chunk"], df.at[idx, "_orig_file"])
offset = src_to_offset.get(src_key, 0)
df.at[idx, f"videos/{key}/from_timestamp"] += offset
df.at[idx, f"videos/{key}/to_timestamp"] += offset
else:
# Fallback to simple offset (for backward compatibility)
df[orig_chunk_col] = video_idx["chunk"]
df[orig_file_col] = video_idx["file"]
df[f"videos/{key}/from_timestamp"] = (
df[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"]
)
@@ -287,12 +268,6 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
videos_idx[key]["episode_duration"] = 0
# Track offset for each source (chunk, file) pair
videos_idx[key]["src_to_offset"] = {}
# Track destination (chunk, file) for each source (chunk, file) pair
videos_idx[key]["src_to_dst"] = {}
# Initialize dst_file_durations if not present
# dst_file_durations tracks duration of each destination file
if "dst_file_durations" not in videos_idx[key]:
videos_idx[key]["dst_file_durations"] = {}
for key, video_idx in videos_idx.items():
unique_chunk_file_pairs = {
@@ -307,13 +282,9 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
chunk_idx = video_idx["chunk"]
file_idx = video_idx["file"]
dst_file_durations = video_idx["dst_file_durations"]
current_offset = video_idx["latest_duration"]
for src_chunk_idx, src_file_idx in unique_chunk_file_pairs:
# Convert to Python int to ensure consistent dict keys
src_chunk_idx = int(src_chunk_idx)
src_file_idx = int(src_file_idx)
src_path = src_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=src_chunk_idx,
@@ -327,17 +298,14 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
)
src_duration = get_video_duration_in_s(src_path)
dst_key = (chunk_idx, file_idx)
if not dst_path.exists():
# New destination file: offset is 0
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0
videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key
# Store offset before incrementing
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
# Track duration of this destination file
dst_file_durations[dst_key] = src_duration
videos_idx[key]["episode_duration"] += src_duration
current_offset += src_duration
continue
# Check file sizes before appending
@@ -345,11 +313,10 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
dst_size = get_file_size_in_mb(dst_path)
if dst_size + src_size >= video_files_size_in_mb:
# Rotate to a new file - offset is 0
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, chunk_size)
dst_key = (chunk_idx, file_idx)
# Rotate to a new file, this source becomes start of new destination
# So its offset should be 0
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0
videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, chunk_size)
dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=chunk_idx,
@@ -357,20 +324,16 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
)
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
# Track duration of this new destination file
dst_file_durations[dst_key] = src_duration
# Reset offset for next file
current_offset = src_duration
else:
# Append to existing destination file
# Offset is the current duration of this destination file
current_dst_duration = dst_file_durations.get(dst_key, 0)
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_dst_duration
videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key
# Append to existing video file - use current accumulated offset
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset
concatenate_video_files(
[dst_path, src_path],
dst_path,
)
# Update duration of this destination file
dst_file_durations[dst_key] = current_dst_duration + src_duration
current_offset += src_duration
videos_idx[key]["episode_duration"] += src_duration
-8
View File
@@ -58,7 +58,6 @@ from lerobot.datasets.utils import (
load_nested_dataset,
load_stats,
load_tasks,
load_tasks_high_level,
update_chunk_file_indices,
validate_episode_buffer,
validate_frame,
@@ -162,7 +161,6 @@ class LeRobotDatasetMetadata:
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
self.tasks = load_tasks(self.root)
# self.tasks_high_level = load_tasks_high_level(self.root)
self.episodes = load_episodes(self.root)
self.stats = load_stats(self.root)
@@ -1052,12 +1050,6 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Add task as a string
task_idx = item["task_index"].item()
item["task"] = self.meta.tasks.iloc[task_idx].name
# Optionally add high level task index
if "task_index_high_level" in self.features:
high_level_task_idx = item["task_index_high_level"].item()
item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"]
item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"]
return item
def __repr__(self):
-4
View File
@@ -60,7 +60,6 @@ VIDEO_DIR = "videos"
CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
@@ -353,9 +352,6 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame:
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
return tasks
def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame:
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH)
return tasks
def write_episodes(episodes: Dataset, local_dir: Path) -> None:
"""Write episode metadata to a parquet file in the LeRobot v3.0 format.
@@ -1,196 +0,0 @@
# FAST Tokenizer Training for LeRobotDataset
This directory contains tools for training a FAST (Factorized Action Sequence Tokenizer) on LeRobot datasets.
## Files
- **`train_fast_tokenizer.py`**: Main training script (refactored for LeRobotDataset)
- **`train_fast_tokenizer_example.md`**: Usage examples and parameter documentation
- **`MIGRATION_NOTES.md`**: Migration guide from B1K to LeRobotDataset
## Quick Start
```bash
# Basic usage
python train_fast_tokenizer.py \
--repo_id "lerobot/aloha_sim_insertion_human" \
--action_horizon 10 \
--encoded_dims "0:14"
# With delta transform
python train_fast_tokenizer.py \
--repo_id "lerobot/aloha_sim_insertion_human" \
--action_horizon 10 \
--encoded_dims "0:14" \
--delta_dims "0,1,2,3,4,5,6,7,8,9,10,11,12,13" \
--state_key "observation.state" \
--vocab_size 1024
```
## What is FAST?
FAST is a tokenizer for robotic action sequences that:
1. Applies DCT (Discrete Cosine Transform) to action chunks
2. Quantizes DCT coefficients
3. Uses BPE (Byte-Pair Encoding) to compress the quantized sequence
4. Achieves high compression ratios (e.g., 10-20x) while maintaining accuracy
This enables efficient storage and processing of long action sequences in vision-language-action models.
## Requirements
- Python 3.10+
- LeRobot dataset (either local or from HuggingFace Hub)
- transformers (for AutoProcessor)
- numpy
- torch
- tyro
## Workflow
```
LeRobotDataset → Extract Episodes → Apply Delta Transform
Select Dimensions → Normalize (q01, q99) → Create Chunks
Train FAST Tokenizer → Compute Stats → Save
```
## Parameters Guide
### Essential Parameters
- **`repo_id`**: HuggingFace dataset repository ID
- Example: `"lerobot/aloha_sim_insertion_human"`
- **`action_horizon`**: Length of action sequences to tokenize
- Typical: 10-16 steps
- **`encoded_dims`**: Which action dimensions to encode
- Format: `"start:end,start:end"`
- Example: `"0:7"` = dimensions 0-6
- Example: `"0:3,7:10"` = dimensions 0-2 and 7-9
### Optional Parameters
- **`delta_dims`**: Apply delta transform (action - state) to these dimensions
- Format: `"0,1,2,3,4,5"`
- Use for position-based actions
- **`state_key`**: Dataset key containing state observations
- Default: `"observation.state"`
- **`vocab_size`**: BPE vocabulary size
- Default: 1024
- Larger = better compression but more memory
- **`scale`**: DCT quantization scale
- Default: 10.0
- Smaller = finer quantization, larger = coarser
- **`sample_fraction`**: Fraction of action chunks to use per episode
- Default: 0.1 (10%)
- Increase for small datasets, decrease for large datasets
## Output
The script creates a directory (default: `./fast_tokenizer_{repo_id}`) containing:
1. **Tokenizer files**: Can be loaded with `AutoProcessor.from_pretrained()`
2. **`metadata.json`**: Contains:
- Training configuration
- Compression statistics
- Dataset information
## Example Output
```
Loading dataset: lerobot/aloha_sim_insertion_human
Dataset loaded: 50 episodes, 5000 frames
Encoding 14 dimensions: 0:14
Delta dimensions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
Action horizon: 10
Processing 50 episodes...
Collected 4500 action chunks
Extracted 14 encoded dimensions
Before normalization - overall stats:
Min: -2.3451, Max: 3.1234, Mean: 0.0234, Std: 0.8765
Applied quantile normalization [q01, q99] → [-1, 1]
After normalization - overall stats:
Min: -1.0000, Max: 1.0000, Mean: 0.0156, Std: 0.4321
Training FAST tokenizer on 4500 action chunks...
Action chunk shape: (4500, 10, 14)
Vocab size: 1024
DCT scale: 10.0
✓ Tokenizer training complete!
Compression Statistics:
Average compression ratio: 14.23x
Mean token length: 9.8
P99 token length: 15
Min token length: 6
Max token length: 18
✅ Saved FAST tokenizer to ./fast_tokenizer_lerobot_aloha_sim_insertion_human
```
## Using the Trained Tokenizer
```python
from transformers import AutoProcessor
# Load tokenizer
tokenizer = AutoProcessor.from_pretrained(
"./fast_tokenizer_lerobot_aloha_sim_insertion_human",
trust_remote_code=True
)
# Encode action chunk [horizon, action_dim]
action_chunk = np.random.randn(10, 14) # Example
tokens = tokenizer(action_chunk[None])[0] # Returns token IDs
# Decode tokens back to actions
reconstructed = tokenizer.decode(tokens)
```
## Tips
1. **Start Small**: Use `--max_episodes 10` for initial testing
2. **Check Dimensions**: Verify encoded dimensions match your robot's action space
3. **Delta Transform**: Use for position-based actions, not velocity-based
4. **Normalization**: Ensure dataset has proper statistics computed
5. **Compression Ratio**: Aim for 10-20x for good balance of compression and accuracy
## Troubleshooting
**Issue**: "No normalization stats found"
- **Solution**: Compute dataset statistics first, or use raw actions
**Issue**: "Episode too short for action horizon"
- **Solution**: Reduce `--action_horizon` or filter short episodes
**Issue**: "State key not found"
- **Solution**: Check dataset features and use correct `--state_key`
**Issue**: Memory error with large datasets
- **Solution**: Reduce `--sample_fraction` or `--max_episodes`
## Citation
If you use FAST in your research, please cite:
```bibtex
@article{black2023fast,
title={FAST: Factorized Action Sequence Tokenizer for Vision-Language-Action Models},
author={Black, Kevin and others},
journal={arXiv preprint},
year={2023}
}
```
@@ -37,11 +37,6 @@ class PI05Config(PreTrainedConfig):
# Shorter state and action vectors will be padded to these dimensions
max_state_dim: int = 32
max_action_dim: int = 32
max_action_tokens: int = 32
fast_vocab_size: int = 2048
# FAST-only mode: train with only discrete action token prediction (no flow matching, no subtask)
fast_only: bool = False
# Flow matching parameters: see openpi `PI0Pytorch`
num_inference_steps: int = 10
@@ -65,8 +60,8 @@ class PI05Config(PreTrainedConfig):
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for state
"ACTION": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for action
"STATE": NormalizationMode.QUANTILES, # Pi0.5 uses quantiles for state
"ACTION": NormalizationMode.QUANTILES, # Pi0.5 uses quantiles for action
}
)
-21
View File
@@ -1,21 +0,0 @@
lerobot-train \
--dataset.repo_id=lerobot \
--dataset.root=/fsx/jade_choghari/outputs/collect-data-pgen \
--output_dir=/fsx/jade_choghari/outputs/pi0test1 \
--job_name=pi0_training \
--policy.repo_id=jade_choghari/pi0-base \
--policy.path=/fsx/jade_choghari/outputs/pi0_fast_fruit1/checkpoints/last/pretrained_model \
--policy.dtype=bfloat16 \
--steps=3000 \
--save_freq=1000 \
--rename_map='{
"observation.images.base": "observation.images.base_0_rgb",
"observation.images.left_wrist": "observation.images.left_wrist_0_rgb",
"observation.images.right_wrist": "observation.images.right_wrist_0_rgb",
}' \
--batch_size=4 \
--policy.device=cuda \
# --wandb.enable=true \
# --wandb.disable_artifact=true \
# --wandb.project=pi05hi-training \
File diff suppressed because it is too large Load Diff
+11 -44
View File
@@ -33,7 +33,6 @@ from lerobot.processor import (
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
ActionTokenizerProcessorStep,
TokenizerProcessorStep,
UnnormalizerProcessorStep,
)
@@ -48,15 +47,13 @@ from lerobot.utils.constants import (
@ProcessorStepRegistry.register(name="pi05_prepare_state_tokenizer_processor_step")
@dataclass
class Pi05PrepareStateAndLanguageTokenizerProcessorStep(ProcessorStep):
class Pi05PrepareStateTokenizerProcessorStep(ProcessorStep):
"""
Processor step to prepare the state and tokenize the language input.
"""
max_state_dim: int = 32
task_key: str = "task"
high_level_task_key: str = "user_prompt"
subtask_only_key: str = "subtask"
def __call__(self, transition: EnvTransition) -> EnvTransition:
transition = transition.copy()
@@ -67,8 +64,6 @@ class Pi05PrepareStateAndLanguageTokenizerProcessorStep(ProcessorStep):
tasks = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.task_key)
if tasks is None:
raise ValueError("No task found in complementary data")
high_level_tasks = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.high_level_task_key)
# TODO: check if this necessary
state = deepcopy(state)
@@ -81,42 +76,16 @@ class Pi05PrepareStateAndLanguageTokenizerProcessorStep(ProcessorStep):
state_np = state.cpu().numpy()
discretized_states = np.digitize(state_np, bins=np.linspace(-1, 1, 256 + 1)[:-1]) - 1
# Clean high level tasks first (if available)
cleaned_high_level_tasks = []
if high_level_tasks is not None:
for high_level_task in high_level_tasks:
cleaned_high_level_tasks.append(high_level_task.strip().replace("_", " ").replace("\n", " "))
# Process low level tasks with state information
low_level_prompts = []
subtask_only_prompts = [] # Store only the subtask text for prediction
full_prompts = []
for i, task in enumerate(tasks):
cleaned_text = task.strip().replace("_", " ").replace("\n", " ")
state_str = " ".join(map(str, discretized_states[i]))
# Store only the subtask text (used as prediction target)
subtask_only_prompts.append(cleaned_text)
if cleaned_high_level_tasks:
cleaned_high_level_task = cleaned_high_level_tasks[i]
full_prompt = f"High level task: {cleaned_high_level_task}; State: {state_str}; Subtask: {cleaned_text}"
else:
full_prompt = f"Task: {cleaned_text}, State: {state_str};\n" #remove Action by jade
full_prompt = f"Task: {cleaned_text}, State: {state_str};\nAction: "
full_prompts.append(full_prompt)
low_level_prompts.append(full_prompt)
transition[TransitionKey.COMPLEMENTARY_DATA][self.task_key] = low_level_prompts
transition[TransitionKey.COMPLEMENTARY_DATA][self.subtask_only_key] = subtask_only_prompts
# Process high level tasks without state information (if available)
if high_level_tasks is not None:
high_level_prompts = []
for i, cleaned_high_level_task in enumerate(cleaned_high_level_tasks):
state_str = " ".join(map(str, discretized_states[i]))
full_prompt = f"High level task: {cleaned_high_level_task}; State: {state_str}; Subtask:"
high_level_prompts.append(full_prompt)
transition[TransitionKey.COMPLEMENTARY_DATA][self.high_level_task_key] = high_level_prompts
transition[TransitionKey.COMPLEMENTARY_DATA][self.task_key] = full_prompts
# Normalize state to [-1, 1] range if needed (assuming it's already normalized by normalizer processor step!!)
# Discretize into 256 bins (see openpi `PaligemmaTokenizer.tokenize()`)
return transition
def transform_features(
@@ -159,27 +128,25 @@ def make_pi05_pre_post_processors(
Returns:
A tuple containing the configured pre-processor and post-processor pipelines.
"""
# Add remaining processors
input_steps: list[ProcessorStep] = [
RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one
AddBatchDimensionProcessorStep(),
# NOTE: NormalizerProcessorStep MUST come before Pi05PrepareStateAndLanguageTokenizerProcessorStep
# NOTE: NormalizerProcessorStep MUST come before Pi05PrepareStateTokenizerProcessorStep
# because the tokenizer step expects normalized state in [-1, 1] range for discretization
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
Pi05PrepareStateAndLanguageTokenizerProcessorStep(max_state_dim=config.max_state_dim),
Pi05PrepareStateTokenizerProcessorStep(max_state_dim=config.max_state_dim),
TokenizerProcessorStep(
tokenizer_name="google/paligemma-3b-pt-224",
max_length=config.tokenizer_max_length,
padding_side="right",
padding="max_length",
),
ActionTokenizerProcessorStep(
tokenizer_name="/fsx/jade_choghari/outputs/fast_tokenizer", # TODO: jade put the PI
),
DeviceProcessorStep(device=config.device),
]
@@ -189,7 +156,7 @@ def make_pi05_pre_post_processors(
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
-23
View File
@@ -1,23 +0,0 @@
export CUDA_LAUNCH_BLOCKING=1
lerobot-train \
--dataset.repo_id=local \
--dataset.root=/fsx/jade_choghari/outputs/collect-data-pgen \
--output_dir=/fsx/jade_choghari/outputs/pi0_fast_fruit2 \
--job_name=pi0_training \
--policy.repo_id=jade_choghari/pi0-base1 \
--policy.path=lerobot/pi05_base \
--policy.dtype=bfloat16 \
--steps=200000 \
--save_freq=5000 \
--rename_map='{
"observation.images.base": "observation.images.base_0_rgb",
"observation.images.left_wrist": "observation.images.left_wrist_0_rgb",
"observation.images.right_wrist": "observation.images.right_wrist_0_rgb",
}' \
--batch_size=16 \
--policy.device=cuda \
--policy.fast_only=true \
# --wandb.enable=true \
# --wandb.disable_artifact=true \
# --wandb.project=pi05hi-training \
# /fsx/jade_choghari/.cache/huggingface/lerobot/jadechoghari/collect-data
-13
View File
@@ -1,13 +0,0 @@
rm -rf /fsx/jade_choghari/outputs/pi0_multi_training
lerobot-train \
--dataset.repo_id=local\
--dataset.root=/fsx/jade_choghari/data/libero \
--output_dir=/fsx/jade_choghari/outputs/pi0_multi_training \
--job_name=pi0_multi_training \
--policy.repo_id=jadechoghari/pi0-base1 \
--policy.path=/fsx/jade_choghari/outputs/libero_training_fast_6/checkpoints/last/pretrained_model/ \
--policy.dtype=bfloat16 \
--steps=50000 \
--save_freq=5000 \
--batch_size=4 \
--policy.device=cuda \
-12
View File
@@ -1,12 +0,0 @@
python src/lerobot/policies/pi05/train_fast_tokenizer.py \
--repo_id "local" \
--root /fsx/jade_choghari/data/libero \
--action_horizon 10 \
--encoded_dims "0:7" \
--vocab_size 1024 \
--push_to_hub \
--hub_repo_id jadechoghari/fast-libero-tokenizer-quantiles \
--normalization_mode QUANTILES \
# python train_fast_tokenizer.py --repo_id my_dataset
@@ -1,533 +0,0 @@
"""Train FAST tokenizer for action encoding.
This script:
1. Loads action chunks from LeRobotDataset (with sampling)
2. Applies delta transforms and per-timestamp normalization
3. Trains FAST tokenizer on specified action dimensions
4. Saves tokenizer to assets directory
5. Reports compression statistics
"""
import json
import numpy as np
import tyro
from pathlib import Path
from transformers import AutoProcessor
import torch
from huggingface_hub import HfApi
from lerobot.configs.types import NormalizationMode
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def apply_delta_transform(state: np.ndarray, actions: np.ndarray, delta_dims: list[int] | None) -> np.ndarray:
"""Apply delta transform to specified dimensions.
Args:
state: Current state [D]
actions: Future actions [D]
delta_dims: List of dimension indices to apply delta transform to
Returns:
Transformed actions [D]
"""
if delta_dims is None or len(delta_dims) == 0:
return actions
delta_actions = actions.copy()
for dim in delta_dims:
delta_actions[dim] = actions[dim] - state[dim]
return delta_actions
def apply_normalization(
data: np.ndarray,
stats: dict[str, np.ndarray],
mode: NormalizationMode,
eps: float = 1e-8,
) -> np.ndarray:
"""Apply normalization to data based on the specified mode.
Args:
data: Data to normalize [N, H, D] or [D]
stats: Dictionary of statistics (mean, std, min, max, q01, q99, q10, q90)
mode: Normalization mode to apply
eps: Small epsilon for numerical stability
Returns:
Normalized data with the same shape as input
"""
if mode == NormalizationMode.IDENTITY:
return data
if mode == NormalizationMode.MEAN_STD:
mean = stats.get("mean")
std = stats.get("std")
if mean is None or std is None:
raise ValueError("MEAN_STD mode requires 'mean' and 'std' in stats")
return (data - mean) / np.maximum(std, eps)
if mode == NormalizationMode.MIN_MAX:
min_val = stats.get("min")
max_val = stats.get("max")
if min_val is None or max_val is None:
raise ValueError("MIN_MAX mode requires 'min' and 'max' in stats")
denom = np.maximum(max_val - min_val, eps)
return 2.0 * (data - min_val) / denom - 1.0
if mode == NormalizationMode.QUANTILES:
q01 = stats.get("q01")
q99 = stats.get("q99")
if q01 is None or q99 is None:
raise ValueError("QUANTILES mode requires 'q01' and 'q99' in stats")
denom = np.maximum(q99 - q01, eps)
# Clip to quantile range then normalize to [-1, 1]
clipped = np.clip(data, q01, q99)
return 2.0 * (clipped - q01) / denom - 1.0
if mode == NormalizationMode.QUANTILE10:
q10 = stats.get("q10")
q90 = stats.get("q90")
if q10 is None or q90 is None:
raise ValueError("QUANTILE10 mode requires 'q10' and 'q90' in stats")
denom = np.maximum(q90 - q10, eps)
# Clip to quantile range then normalize to [-1, 1]
clipped = np.clip(data, q10, q90)
return 2.0 * (clipped - q10) / denom - 1.0
raise ValueError(f"Unsupported normalization mode: {mode}")
def process_episode(args):
"""Process single episode and return action chunks."""
dataset, ep_idx, action_horizon, delta_dims, sample_fraction, state_key, use_delta_transform = args
try:
# Get episode info
ep_info = dataset.meta.episodes[ep_idx]
from_idx = ep_info["dataset_from_index"]
to_idx = ep_info["dataset_to_index"]
ep_length = to_idx - from_idx
if ep_length < action_horizon:
return None
# Load all frames in episode
# If dataset has episode filtering, we need to use the mapping
states = []
actions = []
for abs_idx in range(from_idx, to_idx):
# Map absolute index to relative index if needed
if dataset._absolute_to_relative_idx is not None:
if abs_idx not in dataset._absolute_to_relative_idx:
# This episode's frames aren't in the filtered dataset
return None
rel_idx = dataset._absolute_to_relative_idx[abs_idx]
else:
rel_idx = abs_idx
frame = dataset.hf_dataset[rel_idx]
# Get state (could be from observation.state or other state key)
if state_key in frame:
state = frame[state_key].numpy() if torch.is_tensor(frame[state_key]) else np.array(frame[state_key])
else:
# If no state key, use zeros (no delta transform)
state = np.zeros_like(frame["action"].numpy() if torch.is_tensor(frame["action"]) else np.array(frame["action"]))
action = frame["action"].numpy() if torch.is_tensor(frame["action"]) else np.array(frame["action"])
states.append(state)
actions.append(action)
states = np.array(states)
actions = np.array(actions)
# Create action chunks (sliding window)
# All actions in a chunk are relative to the FIRST state in that chunk
action_chunks = []
for i in range(len(states) - action_horizon + 1):
current_state = states[i] # First state in chunk
future_absolute_actions = actions[i:i + action_horizon]
if use_delta_transform:
# Relative actions
delta_chunk = np.zeros_like(future_absolute_actions)
for t in range(action_horizon):
delta_chunk[t] = apply_delta_transform(
current_state,
future_absolute_actions[t],
delta_dims,
)
action_chunks.append(delta_chunk)
else:
# Absolute actions (NO delta)
action_chunks.append(future_absolute_actions)
if len(action_chunks) == 0:
return None
action_chunks = np.array(action_chunks)
# Sample chunks
if sample_fraction < 1.0:
n_chunks = len(action_chunks)
n_samples = max(1, int(n_chunks * sample_fraction))
episode_seed = hash(ep_idx) % (2**31)
rng = np.random.RandomState(episode_seed)
indices = rng.choice(n_chunks, size=n_samples, replace=False)
action_chunks = action_chunks[indices]
return action_chunks
except Exception as e:
print(f"Error processing episode {ep_idx}: {e}")
import traceback
traceback.print_exc()
return None
def train_fast_tokenizer(
action_chunks: np.ndarray,
vocab_size: int = 1024,
scale: float = 10.0,
) -> AutoProcessor:
"""
Train FAST tokenizer (BPE on DCT coefficients) on action chunks.
Uses the .fit() method to train a new tokenizer on the provided data.
Args:
action_chunks: Array of action chunks [N, H, D] where N=num_chunks, H=horizon, D=action_dim
vocab_size: BPE vocabulary size
scale: DCT scaling factor for quantization
Returns:
Trained FAST tokenizer
"""
print(f"Training FAST tokenizer on {len(action_chunks)} action chunks...")
print(f"Action chunk shape: {action_chunks.shape}")
print(f"Vocab size: {vocab_size}")
print(f"DCT scale: {scale}")
# Download the tokenizer source code (not pretrained weights)
# We'll train a new tokenizer on our own data
base_tokenizer = AutoProcessor.from_pretrained(
"physical-intelligence/fast",
trust_remote_code=True
)
# Convert action_chunks array to list of arrays (expected by .fit())
action_data_list = [action_chunks[i] for i in range(len(action_chunks))]
# Train the new tokenizer on our action data using .fit()
# This trains the BPE tokenizer on DCT coefficients
print("Training new tokenizer (this may take a few minutes)...")
tokenizer = base_tokenizer.fit(
action_data_list,
scale=scale,
vocab_size=vocab_size,
time_horizon=action_chunks.shape[1], # action_horizon
action_dim=action_chunks.shape[2], # encoded dimensions
)
print("✓ Tokenizer training complete!")
# Validate it works
sample_chunk = action_chunks[0]
encoded = tokenizer(sample_chunk[None])[0]
if isinstance(encoded, list):
encoded = np.array(encoded)
print(f"Sample encoding: {len(encoded)} tokens for chunk shape {sample_chunk.shape}")
return tokenizer
def compute_compression_stats(tokenizer, action_chunks: np.ndarray):
"""Compute compression statistics."""
print("\nComputing compression statistics...")
# Sample for stats (use max 1000 chunks for speed)
sample_size = min(1000, len(action_chunks))
sample_indices = np.random.RandomState(42).choice(len(action_chunks), size=sample_size, replace=False)
sample_chunks = action_chunks[sample_indices]
token_lengths = []
for chunk in sample_chunks:
encoded = tokenizer(chunk[None])[0]
if isinstance(encoded, list):
token_lengths.append(len(encoded))
else:
token_lengths.append(encoded.shape[0] if hasattr(encoded, 'shape') else len(encoded))
token_lengths = np.array(token_lengths)
# Compression ratio: (H * D) / avg_tokens
input_size = action_chunks.shape[1] * action_chunks.shape[2]
avg_tokens = np.mean(token_lengths)
compression_ratio = input_size / avg_tokens
stats = {
'compression_ratio': float(compression_ratio),
'mean_token_length': float(np.mean(token_lengths)),
'p99_token_length': float(np.percentile(token_lengths, 99)),
'min_token_length': float(np.min(token_lengths)),
'max_token_length': float(np.max(token_lengths)),
}
print(f"Compression Statistics:")
print(f" Average compression ratio: {stats['compression_ratio']:.2f}x")
print(f" Mean token length: {stats['mean_token_length']:.1f}")
print(f" P99 token length: {stats['p99_token_length']:.0f}")
print(f" Min token length: {stats['min_token_length']:.0f}")
print(f" Max token length: {stats['max_token_length']:.0f}")
return stats
def main(
repo_id: str,
root: str | None = None,
action_horizon: int = 10,
max_episodes: int | None = None,
sample_fraction: float = 0.1,
encoded_dims: str = "0:6,7:23",
delta_dims: str | None = None,
use_delta_transform: bool = False,
state_key: str = "observation.state",
normalization_mode: str = "QUANTILES",
vocab_size: int = 1024,
scale: float = 10.0,
output_dir: str | None = None,
push_to_hub: bool = False,
hub_repo_id: str | None = None,
hub_private: bool = False,
):
"""
Train FAST tokenizer for action encoding.
Args:
repo_id: LeRobot dataset repository ID
root: Root directory for dataset (default: ~/.cache/huggingface/lerobot)
action_horizon: Number of future actions in each chunk
max_episodes: Max episodes to use (None = all episodes in dataset)
sample_fraction: Fraction of chunks to sample per episode
encoded_dims: Comma-separated dimension ranges to encode (e.g., "0:6,7:23")
delta_dims: Comma-separated dimension indices for delta transform (e.g., "0,1,2,3,4,5")
use_delta_transform: Whether to apply delta transform (relative actions vs absolute actions)
state_key: Dataset key for state observations (default: "observation.state")
normalization_mode: Normalization mode (MEAN_STD, MIN_MAX, QUANTILES, QUANTILE10, IDENTITY)
vocab_size: FAST vocabulary size (BPE vocab size)
scale: DCT scaling factor (default: 10.0)
output_dir: Directory to save tokenizer (default: ./fast_tokenizer_{repo_id})
push_to_hub: Whether to push the tokenizer to Hugging Face Hub
hub_repo_id: Hub repository ID (e.g., "username/tokenizer-name"). If None, uses output_dir name
hub_private: Whether to create a private repository on the Hub
"""
# Load dataset
print(f"Loading dataset: {repo_id}")
dataset = LeRobotDataset(repo_id=repo_id, root=root)
print(f"Dataset loaded: {dataset.num_episodes} episodes, {dataset.num_frames} frames")
# Parse normalization mode
try:
norm_mode = NormalizationMode(normalization_mode)
except ValueError:
raise ValueError(
f"Invalid normalization_mode: {normalization_mode}. "
f"Must be one of: {', '.join([m.value for m in NormalizationMode])}"
)
print(f"Normalization mode: {norm_mode.value}")
# Parse encoded dimensions
encoded_dim_ranges = []
for range_str in encoded_dims.split(','):
start, end = map(int, range_str.strip().split(':'))
encoded_dim_ranges.append((start, end))
total_encoded_dims = sum(end - start for start, end in encoded_dim_ranges)
print(f"Encoding {total_encoded_dims} dimensions: {encoded_dims}")
# Parse delta dimensions
delta_dim_list = None
if delta_dims is not None and delta_dims.strip():
delta_dim_list = [int(d.strip()) for d in delta_dims.split(',')]
print(f"Delta dimensions: {delta_dim_list}")
else:
print("No delta dimensions specified")
print(f"Use delta transform: {use_delta_transform}")
if use_delta_transform and (delta_dim_list is None or len(delta_dim_list) == 0):
print("Warning: use_delta_transform=True but no delta_dims specified. No delta will be applied.")
print(f"Action horizon: {action_horizon}")
print(f"State key: {state_key}")
# Determine episodes to process
num_episodes = dataset.num_episodes
if max_episodes is not None:
num_episodes = min(max_episodes, num_episodes)
print(f"Processing {num_episodes} episodes...")
# Process episodes sequentially (to avoid pickling issues with dataset)
all_chunks = []
for ep_idx in range(num_episodes):
if ep_idx % 10 == 0:
print(f" Processing episode {ep_idx}/{num_episodes}...")
chunks = process_episode(
(dataset, ep_idx, action_horizon, delta_dim_list, sample_fraction, state_key, use_delta_transform)
)
if chunks is not None:
all_chunks.append(chunks)
# Concatenate all chunks
all_chunks = np.concatenate(all_chunks, axis=0)
print(f"Collected {len(all_chunks)} action chunks")
# Extract only encoded dimensions FIRST (before normalization)
encoded_chunks = []
for start, end in encoded_dim_ranges:
encoded_chunks.append(all_chunks[:, :, start:end])
encoded_chunks = np.concatenate(encoded_chunks, axis=-1) # [N, H, D_encoded]
print(f"Extracted {encoded_chunks.shape[-1]} encoded dimensions")
# Apply normalization to encoded dimensions
print(f"\nBefore normalization - overall stats:")
print(f" Min: {np.min(encoded_chunks):.4f}, Max: {np.max(encoded_chunks):.4f}")
print(f" Mean: {np.mean(encoded_chunks):.4f}, Std: {np.std(encoded_chunks):.4f}")
# Get normalization stats from dataset
norm_stats = dataset.meta.stats
if norm_stats is not None and "action" in norm_stats:
action_stats = norm_stats["action"]
# Build encoded dimension indices
encoded_dim_indices = []
for start, end in encoded_dim_ranges:
encoded_dim_indices.extend(range(start, end))
encoded_dim_indices = np.array(encoded_dim_indices)
# Extract stats for encoded dimensions only
encoded_stats = {}
for stat_name, stat_values in action_stats.items():
if isinstance(stat_values, (list, np.ndarray)):
stat_array = np.array(stat_values)
if len(stat_array) > max(encoded_dim_indices):
encoded_stats[stat_name] = stat_array[encoded_dim_indices]
if encoded_stats:
print(f"\nNormalization stats for encoded dimensions (mode: {norm_mode.value}):")
for stat_name, stat_values in encoded_stats.items():
print(f" {stat_name}: shape={stat_values.shape}, "
f"range=[{np.min(stat_values):.4f}, {np.max(stat_values):.4f}]")
# Apply normalization based on mode
try:
encoded_chunks = apply_normalization(
encoded_chunks,
encoded_stats,
norm_mode,
eps=1e-8
)
print(f"\nApplied {norm_mode.value} normalization")
except ValueError as e:
print(f"Warning: {e}. Using raw actions without normalization.")
print(f"\nAfter normalization - overall stats:")
print(f" Min: {np.min(encoded_chunks):.4f}, Max: {np.max(encoded_chunks):.4f}")
print(f" Mean: {np.mean(encoded_chunks):.4f}, Std: {np.std(encoded_chunks):.4f}")
print(f"\nPer-dimension stats (after normalization):")
for d in range(encoded_chunks.shape[-1]):
dim_data = encoded_chunks[:, :, d]
print(f" Dim {d}: min={np.min(dim_data):7.4f}, max={np.max(dim_data):7.4f}, "
f"mean={np.mean(dim_data):7.4f}, std={np.std(dim_data):7.4f}")
else:
print("Warning: Could not extract stats for encoded dimensions, using raw actions")
else:
print("Warning: No normalization stats found in dataset, using raw actions")
print(f"Encoded chunks shape: {encoded_chunks.shape}")
# Train FAST tokenizer
tokenizer = train_fast_tokenizer(
encoded_chunks,
vocab_size=vocab_size,
scale=scale,
)
# Compute compression statistics
compression_stats = compute_compression_stats(tokenizer, encoded_chunks)
# Save tokenizer
if output_dir is None:
output_dir = f"fast_tokenizer_{repo_id.replace('/', '_')}"
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
tokenizer.save_pretrained(output_path)
# Save metadata
metadata = {
'repo_id': repo_id,
'vocab_size': vocab_size,
'scale': scale,
'encoded_dims': encoded_dims,
'encoded_dim_ranges': encoded_dim_ranges,
'total_encoded_dims': total_encoded_dims,
'delta_dims': delta_dims,
'delta_dim_list': delta_dim_list,
'use_delta_transform': use_delta_transform,
'state_key': state_key,
'normalization_mode': norm_mode.value,
'action_horizon': action_horizon,
'num_training_chunks': len(encoded_chunks),
'compression_stats': compression_stats,
}
with open(output_path / "metadata.json", 'w') as f:
json.dump(metadata, f, indent=2)
print(f"\nSaved FAST tokenizer to {output_path}")
print(f"Metadata: {json.dumps(metadata, indent=2)}")
# Push to Hugging Face Hub if requested
if push_to_hub:
# Determine the hub repository ID
if hub_repo_id is None:
hub_repo_id = output_path.name
print(f"\nNo hub_repo_id provided, using: {hub_repo_id}")
print(f"\nPushing tokenizer to Hugging Face Hub: {hub_repo_id}")
print(f" Private: {hub_private}")
try:
# Use the tokenizer's push_to_hub method
tokenizer.push_to_hub(
repo_id=hub_repo_id,
private=hub_private,
commit_message=f"Upload FAST tokenizer trained on {repo_id}"
)
# Also upload the metadata.json file separately
api = HfApi()
api.upload_file(
path_or_fileobj=str(output_path / "metadata.json"),
path_in_repo="metadata.json",
repo_id=hub_repo_id,
repo_type="model",
commit_message="Upload tokenizer metadata"
)
print(f"Successfully pushed tokenizer to: https://huggingface.co/{hub_repo_id}")
except Exception as e:
print(f"Error pushing to hub: {e}")
print(" Make sure you're logged in with `huggingface-cli login`")
if __name__ == "__main__":
tyro.cli(main)
@@ -1,101 +0,0 @@
# Train FAST Tokenizer - Usage Examples
This script trains a FAST (Factorized Action Sequence Tokenizer) on LeRobotDataset action data.
## Basic Usage
```bash
python src/lerobot/policies/pi05/train_fast_tokenizer.py \
--repo_id "lerobot/aloha_sim_insertion_human" \
--action_horizon 10 \
--encoded_dims "0:7" \
--vocab_size 1024 \
--scale 10.0
```
## Parameters
### Required
- `--repo_id`: LeRobot dataset repository ID (e.g., "lerobot/aloha_sim_insertion_human")
### Optional
- `--root`: Root directory for dataset (default: ~/.cache/huggingface/lerobot)
- `--action_horizon`: Number of future actions in each chunk (default: 10)
- `--max_episodes`: Maximum number of episodes to use (default: None = all)
- `--sample_fraction`: Fraction of chunks to sample per episode (default: 0.1)
- `--encoded_dims`: Comma-separated dimension ranges to encode (default: "0:6,7:23")
- Example: "0:7" encodes dimensions 0-6
- Example: "0:3,6:9" encodes dimensions 0-2 and 6-8
- `--delta_dims`: Comma-separated dimension indices for delta transform (default: None)
- Example: "0,1,2,3,4,5" applies delta transform to first 6 dimensions
- Delta transform: action[i] - state[i] for specified dimensions
- `--state_key`: Dataset key for state observations (default: "observation.state")
- `--vocab_size`: FAST vocabulary size / BPE vocab size (default: 1024)
- `--scale`: DCT scaling factor (default: 10.0)
- `--output_dir`: Directory to save tokenizer (default: ./fast_tokenizer_{repo_id})
## Examples
### Example 1: Train on full action space
```bash
python src/lerobot/policies/pi05/train_fast_tokenizer.py \
--repo_id "lerobot/pusht" \
--action_horizon 16 \
--encoded_dims "0:2" \
--vocab_size 512 \
--max_episodes 100
```
### Example 2: Train with delta transform
```bash
python src/lerobot/policies/pi05/train_fast_tokenizer.py \
--repo_id "lerobot/aloha_sim_insertion_human" \
--action_horizon 10 \
--encoded_dims "0:14" \
--delta_dims "0,1,2,3,4,5,6,7,8,9,10,11,12,13" \
--state_key "observation.state" \
--vocab_size 1024 \
--scale 10.0 \
--sample_fraction 0.2
```
### Example 3: Train on subset of dimensions
```bash
python src/lerobot/policies/pi05/train_fast_tokenizer.py \
--repo_id "lerobot/aloha_sim_insertion_human" \
--action_horizon 10 \
--encoded_dims "0:7" \
--vocab_size 1024 \
--output_dir "./my_tokenizer"
```
## Output
The script saves:
1. **Tokenizer files**: Trained FAST tokenizer (can be loaded with `AutoProcessor.from_pretrained()`)
2. **metadata.json**: Contains:
- Configuration parameters
- Compression statistics (compression ratio, token lengths)
- Training dataset information
## Understanding the Process
1. **Load Dataset**: Loads the LeRobotDataset from HuggingFace
2. **Extract Action Chunks**: Creates sliding windows of actions with specified horizon
3. **Apply Delta Transform**: (Optional) Computes action deltas relative to current state
4. **Select Encoded Dimensions**: Extracts only the dimensions to be encoded
5. **Normalize**: Applies quantile normalization ([q01, q99] → [-1, 1])
6. **Train Tokenizer**: Trains BPE tokenizer on DCT coefficients
7. **Compute Stats**: Reports compression ratio and token length statistics
8. **Save**: Saves tokenizer and metadata
## Notes
- **Normalization**: The script uses quantile normalization (q01, q99) from the dataset's statistics
- **Sampling**: To speed up training, you can sample a fraction of chunks per episode
- **Delta Transform**: Applied per-dimension to make actions relative to current state
- **Compression**: FAST uses DCT + BPE to compress action sequences efficiently
-28
View File
@@ -1,28 +0,0 @@
#!/bin/bash
# FSDP training script for PI05 with aggressive memory optimization
# Use this for large models that OOM with standard DDP
accelerate launch --config_file /admin/home/jade_choghari/lerobot/fsdp_config.yaml \
$(which lerobot-train) \
--dataset.repo_id=local \
--dataset.root=/fsx/jade_choghari/data/libero \
--output_dir=/fsx/jade_choghari/outputs/libero_training_fsdp \
--job_name=libero_training_fsdp \
--policy.repo_id=jade_choghari/pi05-fast-libero-fsdp \
--policy.path=/fsx/jade_choghari/models/libero-pi-fast \
--policy.dtype=bfloat16 \
--steps=100000 \
--save_freq=10 \
--batch_size=8 \
--policy.device=cuda \
--policy.fast_only=true \
--policy.scheduler_warmup_steps=2000 \
--policy.scheduler_decay_steps=60000 \
--policy.scheduler_decay_lr=1e-5 \
--policy.gradient_checkpointing=false \
--wandb.enable=true \
--wandb.disable_artifact=true \
--wandb.project=pi05-libero-training-fsdp
-24
View File
@@ -1,24 +0,0 @@
export CUDA_LAUNCH_BLOCKING=1
lerobot-train \
--dataset.repo_id=local \
--dataset.root=/fsx/jade_choghari/data/libero \
--output_dir=/fsx/jade_choghari/outputs/libero_training_fast_4 \
--job_name=libero_training_fast \
--policy.repo_id=jade_choghari/pi05-fast-libero \
--policy.path=/fsx/jade_choghari/models/pi05-base \
--policy.dtype=bfloat16 \
--steps=100000 \
--save_freq=20000 \
--batch_size=4 \
--policy.device=cuda \
--policy.fast_only=true \
--policy.scheduler_warmup_steps=1000 \
--policy.scheduler_decay_steps=30000 \
--policy.scheduler_decay_lr=1e-5 \
--policy.gradient_checkpointing=true \
--rename_map='{
"observation.images.image1": "observation.images.base_0_rgb",
"observation.images.image2": "observation.images.left_wrist_0_rgb",
}' \
--policy.empty_cameras=1 \
# /fsx/jade_choghari/.cache/huggingface/lerobot/jadechoghari/collect-data
@@ -1,15 +0,0 @@
#!/bin/bash
#SBATCH --job-name=pi05-train
#SBATCH --time=24:00:00
#SBATCH --qos=high
#SBATCH --gres=gpu:8
#SBATCH --mem=256G
#SBATCH --partition=hopper-prod
#SBATCH --output=/fsx/jade_choghari/logs/%x-%j.out
#SBATCH --error=/fsx/jade_choghari/logs/%x-%j.err
srun \
--container-image=/fsx/michel_aractingi/docker_images/huggingface+lerobot-gpu+dev.sqsh \
--container-mounts=/fsx/jade_choghari \
--container-workdir=$HOME/lerobot \
bash /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05/train_multi.sh
-36
View File
@@ -1,36 +0,0 @@
#!/bin/bash
set -euxo pipefail
# Source YOUR Miniforge conda (mounted from FSX)
source /fsx/jade_choghari/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
accelerate launch --mixed_precision=bf16 --multi_gpu --num_processes=8 \
$(which lerobot-train) \
--dataset.repo_id=local \
--dataset.root=/fsx/jade_choghari/data/libero \
--output_dir=/fsx/jade_choghari/outputs/libero_training_fast_mean_1 \
--job_name=libero_training_fast \
--policy.repo_id=jade_choghari/pi05-fast-libero \
--policy.path=/fsx/jade_choghari/models/pi05-base \
--policy.dtype=bfloat16 \
--steps=100000 \
--save_freq=20000 \
--batch_size=4 \
--policy.device=cuda \
--policy.fast_only=true \
--policy.scheduler_warmup_steps=4000 \
--policy.scheduler_decay_steps=100000 \
--policy.scheduler_decay_lr=1e-5 \
--policy.gradient_checkpointing=true \
--policy.chunk_size=10 \
--policy.n_action_steps=10 \
--policy.max_action_tokens=256 \
--rename_map='{
"observation.images.image1": "observation.images.base_0_rgb",
"observation.images.image2": "observation.images.left_wrist_0_rgb",
}' \
--policy.empty_cameras=1 \
--wandb.enable=true \
--wandb.disable_artifact=true \
--wandb.project=pi05-libero-training \
+1 -1
View File
@@ -75,7 +75,7 @@ from .policy_robot_bridge import (
RobotActionToPolicyActionProcessorStep,
)
from .rename_processor import RenameObservationsProcessorStep
from .tokenizer_processor import TokenizerProcessorStep, ActionTokenizerProcessorStep
from .tokenizer_processor import TokenizerProcessorStep
__all__ = [
"ActionProcessorStep",
+1 -3
View File
@@ -168,12 +168,10 @@ def _extract_complementary_data(batch: dict[str, Any]) -> dict[str, Any]:
"""
pad_keys = {k: v for k, v in batch.items() if "_is_pad" in k}
task_key = {"task": batch["task"]} if "task" in batch else {}
user_prompt_key = {"user_prompt": batch["user_prompt"]} if "user_prompt" in batch else {}
subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
index_key = {"index": batch["index"]} if "index" in batch else {}
task_index_key = {"task_index": batch["task_index"]} if "task_index" in batch else {}
return {**pad_keys, **task_key, **index_key, **task_index_key, **user_prompt_key, **subtask_key}
return {**pad_keys, **task_key, **index_key, **task_index_key}
def create_transition(
@@ -47,6 +47,7 @@ class RenameObservationsProcessorStep(ObservationProcessorStep):
processed_obs[self.rename_map[key]] = value
else:
processed_obs[key] = value
return processed_obs
def get_config(self) -> dict[str, Any]:
File diff suppressed because it is too large Load Diff
-1
View File
@@ -173,7 +173,6 @@ def rollout(
observation = env_preprocessor(observation)
observation = preprocessor(observation)
with torch.inference_mode():
action = policy.select_action(observation)
action = postprocessor(action)
+1 -9
View File
@@ -62,7 +62,6 @@ def update_policy(
accelerator: Accelerator,
lr_scheduler=None,
lock=None,
postprocessor = None,
) -> tuple[MetricsTracker, dict]:
"""
Performs a single training step to update the policy's weights.
@@ -91,10 +90,6 @@ def update_policy(
# Let accelerator handle mixed precision
with accelerator.autocast():
loss, output_dict = policy.forward(batch)
# action = policy.predict_action_chunk(batch)
# if postprocessor is not None:
# action = postprocessor(action)
# breakpoint()
# TODO(rcadene): policy.unnormalize_outputs(out_dict)
# Use accelerator's backward method
@@ -156,7 +151,7 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
from accelerate.utils import DistributedDataParallelKwargs
ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True)
accelerator = Accelerator(step_scheduler_with_optimizer=False, gradient_accumulation_steps=4, kwargs_handlers=[ddp_kwargs])
accelerator = Accelerator(step_scheduler_with_optimizer=False, kwargs_handlers=[ddp_kwargs])
init_logging(accelerator=accelerator)
@@ -211,7 +206,6 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
ds_meta=dataset.meta,
rename_map=cfg.rename_map,
)
# Wait for all processes to finish policy creation before continuing
accelerator.wait_for_everyone()
@@ -250,7 +244,6 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
**postprocessor_kwargs,
)
if is_main_process:
logging.info("Creating optimizer and scheduler")
optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy)
@@ -350,7 +343,6 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
cfg.optimizer.grad_clip_norm,
accelerator=accelerator,
lr_scheduler=lr_scheduler,
postprocessor=postprocessor,
)
# Note: eval and checkpoint happens *after* the `step`th training update has completed, so we
+1 -8
View File
@@ -26,15 +26,8 @@ OBS_IMAGES = OBS_IMAGE + "s"
OBS_LANGUAGE = OBS_STR + ".language"
OBS_LANGUAGE_TOKENS = OBS_LANGUAGE + ".tokens"
OBS_LANGUAGE_ATTENTION_MASK = OBS_LANGUAGE + ".attention_mask"
OBS_LANGUAGE_HIGH_LEVEL_TASK = OBS_STR + ".user_prompt"
OBS_LANGUAGE_HIGH_LEVEL_TASK_TOKENS = OBS_LANGUAGE_HIGH_LEVEL_TASK + ".tokens"
OBS_LANGUAGE_HIGH_LEVEL_TASK_ATTENTION_MASK = OBS_LANGUAGE_HIGH_LEVEL_TASK + ".attention_mask"
OBS_LANGUAGE_SUBTASK_ONLY = OBS_STR + ".subtask"
OBS_LANGUAGE_SUBTASK_ONLY_TOKENS = OBS_LANGUAGE_SUBTASK_ONLY + ".tokens"
OBS_LANGUAGE_SUBTASK_ONLY_ATTENTION_MASK = OBS_LANGUAGE_SUBTASK_ONLY + ".attention_mask"
ACTION = "action"
ACTION_TOKENS = ACTION + ".tokens"
ACTION_TOKEN_MASK = ACTION + ".token_mask"
REWARD = "next.reward"
TRUNCATED = "next.truncated"
DONE = "next.done"
@@ -266,7 +266,7 @@ def create_original_observation_with_openpi_preprocessing(batch):
elif len(tasks) == 1:
tasks = tasks * batch_size
# Use pi05 state and input tokenizer logic (same as Pi05PrepareStateAndLanguageTokenizerProcessorStep)
# Use pi05 state and input tokenizer logic (same as Pi05PrepareStateTokenizerProcessorStep)
state = batch["observation.state"]
state = deepcopy(state)