Compare commits

..

32 Commits

Author SHA1 Message Date
Jade Choghari e29e89e4ed improve script, time saving subtask array
Signed-off-by: Jade Choghari <chogharijade@gmail.com>
2026-03-06 17:07:44 +03:00
root 3d55c5e484 add qwen 3.5 and fix video extraction 2026-03-04 12:22:41 +00:00
Jade Choghari 51b3b31927 more annotation changes 2026-02-12 12:46:45 +00:00
Jade Choghari 4503019d18 clean subtask 2026-02-09 10:55:22 +01:00
Jade Choghari 6aa0cc267f merge branch main 2026-02-09 08:55:11 +01:00
Jade Choghari 6629b454b2 Merge branch 'feat/add-pi05' of github.com:huggingface/lerobot into feat/add-pi05 2026-02-09 08:34:01 +01:00
Jade Choghari 0059ca7924 add cached subtask inference 2026-02-09 07:33:12 +00:00
Jade Choghari 6c94fcd1b1 add KI optional 2026-02-02 15:58:47 +00:00
Jade Choghari 092f4617ca more changes 2026-02-02 09:04:55 +00:00
Jade Choghari 6380c0d0dd example change 2026-01-29 11:21:03 +00:00
Jade Choghari 0947111edd Merge branch 'feat/add-pi05' of github.com:huggingface/lerobot into feat/add-pi05 2026-01-28 21:39:40 +01:00
Jade Choghari 477204d485 add eos to subtask token 2026-01-28 12:32:13 +00:00
Jade Choghari 4eb912da30 Merge remote-tracking branch 'origin/main' into feat/add-pi05 2026-01-27 17:48:22 +01:00
Jade Choghari 99dbbd56c2 add generation inference for subtask 2026-01-27 16:21:44 +00:00
Jade Choghari 6a6912ec37 revert .clone 2026-01-27 16:00:40 +00:00
Jade Choghari 2bf6359d24 more changes 2026-01-27 11:14:22 +00:00
Jade Choghari 4c694e20c7 comments 2026-01-26 09:19:14 +00:00
Jade Choghari 5e609426fd add knowledge insulation 2026-01-26 09:14:39 +00:00
Jade Choghari d0b6a66f34 update subtask annotate 2026-01-21 13:59:16 +00:00
Jade Choghari dc85e9b742 remove brkp 2026-01-20 23:05:44 +00:00
Jade Choghari 90d9698c7e Merge remote-tracking branch 'origin/main' into feat/add-pi05 2026-01-20 11:05:38 +00:00
Jade Choghari bbef8bb077 more 2026-01-20 10:02:59 +00:00
Jade Choghari 80417111d3 handle failed annotations 2026-01-19 16:11:32 +00:00
Jade Choghari d44f3a3bd9 update 2026-01-19 15:48:14 +00:00
Jade Choghari b864c13dfb add docs 2026-01-19 10:36:25 +00:00
Jade Choghari fd917e4fa0 add high/low/normal level annotation 2026-01-15 17:21:52 +00:00
Jade Choghari 966fedfeef add more 2026-01-15 16:35:58 +00:00
Jade Choghari 6e88d6f387 make it work- runnning example 2026-01-15 13:21:17 +00:00
Jade Choghari 83276eeb2f loss naming 2026-01-14 14:53:18 +00:00
Jade Choghari 72b0af4ed7 add three losses: flow_mse, subtask_ce, action_ce 2026-01-14 14:52:32 +00:00
Jade Choghari b57504b89e run inference, attention mask 2026-01-14 11:52:31 +00:00
Jade Choghari 72f7aaedb5 add annotation pipeline 2026-01-13 11:05:26 +00:00
59 changed files with 10412 additions and 867 deletions
+3 -5
@@ -101,11 +101,9 @@ jobs:
runs-on:
group: aws-general-8-plus
if: |
github.repository == 'huggingface/lerobot' && (
(github.event_name == 'pull_request_review' && github.event.review.state == 'approved' && github.event.pull_request.head.repo.fork == false) ||
github.event_name == 'push' ||
github.event_name == 'workflow_dispatch'
)
(github.event_name == 'pull_request_review' && github.event.review.state == 'approved' && github.event.pull_request.head.repo.fork == false) ||
github.event_name == 'push' ||
github.event_name == 'workflow_dispatch'
outputs:
image_tag: ${{ steps.set_tag.outputs.image_tag }}
env:
-1
@@ -91,7 +91,6 @@ jobs:
name: Build and Push Docker
runs-on:
group: aws-general-8-plus
if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME }}
env:
-134
@@ -1,134 +0,0 @@
# Action tokenizer benchmark
## Questions
What is the trade-off between:
- **Compression**: how many tokens are needed to represent an action chunk (e.g. horizon × action_dim floats)?
- **Reconstruction quality**: how well does encode-then-decode preserve the original actions?
- **Speed**: how long does encoding and decoding take per chunk?
How to choose an action tokenizer?
- Which tokenizer architecture (e.g. DCT + BPE)?
- Which **action horizon** and **encoded dimensions** to use?
- Which **normalization** (QUANTILES, MEAN_STD, MIN_MAX) and **delta transform** (relative vs absolute actions)?
- How do reconstruction error and compression ratio vary across datasets and tokenizer settings?
This benchmark loads action chunks from a LeRobot dataset using the same pipeline as `lerobot-train-tokenizer`, runs a trained action tokenizer in encode/decode mode, and reports reconstruction error, compression stats, and timing. Results are saved as JSON under `outputs/` for comparison and analysis.
## Variables
**Dataset & chunking**
- **repo_id**: LeRobot dataset (e.g. `lerobot/pusht`). Action statistics and normalization are taken from the dataset metadata when available.
- **action_horizon**: Number of future steps per action chunk (must match the tokenizer's training).
- **encoded_dims**: Dimension ranges to encode (e.g. `0:6` or `0:6,7:14`). Must match the tokenizer.
- **max_episodes**: Cap on episodes to load (default: all).
- **sample_fraction**: Fraction of chunks to sample per episode (default `0.2`) to keep runtime manageable.
**Transform & normalization**
- **normalization_mode**: `IDENTITY`, `MEAN_STD`, `MIN_MAX`, `QUANTILES`, `QUANTILE10`. Should match the tokenizer's training.
- **delta_dims**: Comma-separated dimension indices for delta (relative) transform.
- **use_delta_transform**: Whether to convert actions to be relative to the current state for those dimensions.
- **state_key**: Dataset key for state (e.g. `observation.state`) used when applying delta transform.
**Tokenizer & evaluation**
- **action_tokenizer_path**: Path or HuggingFace repo id of the trained tokenizer (e.g. `outputs/wavetoken`).
- **max_chunks_for_reconstruction**: Max number of chunks to use for reconstruction and timing (default `500`) to limit runtime.
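The `encoded_dims` string uses the same end-exclusive `start:end` syntax as the benchmark script itself. A minimal sketch of the parsing (the helper name `parse_encoded_dims` is illustrative, not part of the script):

```python
def parse_encoded_dims(encoded_dims: str) -> list[tuple[int, int]]:
    """Parse a ranges string like "0:6,7:14" into end-exclusive (start, end) pairs."""
    ranges = []
    for range_str in encoded_dims.split(","):
        start, end = map(int, range_str.strip().split(":"))
        ranges.append((start, end))
    return ranges

ranges = parse_encoded_dims("0:6,7:14")
total_encoded_dims = sum(end - start for start, end in ranges)  # 6 + 7 = 13 dims kept
```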
### Main parameters
| parameter | default | description |
| -------------------------------- | ---------------------------- | ------------------------------------------------ |
| **action_tokenizer_path** | (required) | Path or Hub id of the trained action tokenizer. |
| **repo_id** | (required) | LeRobot dataset repo id. |
| **action_horizon** | `10` | Future steps per chunk. |
| **encoded_dims** | `0:6` | Dimension ranges to encode (e.g. `0:6,7:14`). |
| **normalization_mode** | `QUANTILES` | Normalization mode for actions. |
| **max_episodes** | all | Max episodes to load. |
| **sample_fraction** | `0.2` | Fraction of chunks sampled per episode. |
| **max_chunks_for_reconstruction**| `500` | Chunks used for reconstruction and timing. |
| **output_dir** | `outputs/action_tokenizer_benchmark` | Directory for results JSON. |
## Metrics
**Reconstruction (lower is better)**
- **reconstruction_mae**: Mean absolute error between original and decoded action chunks.
- **reconstruction_mse**: Mean squared error.
- **reconstruction_rmse**: Root mean squared error.
- **reconstruction_max_abs_error**: Maximum absolute error over all dimensions and samples.
- **per_dimension_mae**: MAE per action dimension (list of length `action_dim`).
**Compression**
- **compression_ratio**: Ratio (action_horizon × action_dim) / mean number of tokens. Higher means more compression.
- **mean_token_length**, **std_token_length**: Mean and standard deviation of token count per chunk.
- **min_token_length**, **max_token_length**: Min and max token count.
- **p50_token_length**, **p99_token_length**: 50th and 99th percentile token counts.
**Timing (seconds per chunk)**
- **mean_encode_time_sec**: Mean time to encode one chunk.
- **mean_decode_time_sec**: Mean time to decode one chunk.
The JSON output also includes **num_chunks_evaluated** and **total_chunks_available** for context.
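The compression and token-length stats follow directly from the per-chunk token counts. A worked sketch with hypothetical numbers (the token counts below are made up for illustration):

```python
import numpy as np

# Hypothetical setup: 10-step chunks of 6-dim actions, and made-up token counts per chunk.
action_horizon, action_dim = 10, 6
token_lengths = np.array([12, 15, 10, 14, 13])

input_size = action_horizon * action_dim  # 60 floats per chunk
compression_ratio = input_size / float(np.mean(token_lengths))  # 60 / 12.8 = 4.6875

stats = {
    "compression_ratio": compression_ratio,
    "mean_token_length": float(np.mean(token_lengths)),
    "std_token_length": float(np.std(token_lengths)),
    "p50_token_length": float(np.percentile(token_lengths, 50)),
}
```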
## How the benchmark works
1. **Load dataset**: LeRobot dataset is loaded for the given `repo_id` and `root`.
2. **Build action chunks**: For each episode (up to `max_episodes`), action chunks are built with the same logic as `lerobot-train-tokenizer`: sliding window of length `action_horizon`, optional delta transform, and per-episode sampling with `sample_fraction`.
3. **Extract and normalize**: Only `encoded_dims` are kept. Normalization is applied using the dataset's action stats when available, according to `normalization_mode`.
4. **Encode / decode**: A random sample of chunks (size `max_chunks_for_reconstruction`) is encoded and then decoded with the tokenizer. Encode and decode times are recorded per chunk.
5. **Compute metrics**: Reconstruction metrics are computed between original and decoded chunks; compression and timing stats are aggregated.
6. **Save results**: A JSON file is written to `output_dir` with name `{timestamp}_{repo_id}_action_tokenizer_results.json`, containing the full config and all metrics.
The pipeline (chunking, dimensions, normalization, delta) must match how the tokenizer was trained; otherwise reconstruction error can be large or the tokenizer may raise an error.
## Caveats
- The tokenizer's **action_horizon** and **action_dim** (and optionally DCT settings) are fixed at training time. The benchmark infers dimensions from the dataset and encoded dims; the tokenizer path must correspond to a model trained with the same horizon and encoded dimensions.
- Reconstruction is evaluated in **normalized space** (the same space the tokenizer sees). For interpretation in raw action space, you would need to invert normalization outside this script.
- Only one tokenizer and one dataset are evaluated per run. To compare tokenizers or datasets, run the script multiple times and compare the saved JSON files.
## Example
Quick run with a local tokenizer and a small number of episodes:
```bash
python benchmarks/tokens/run_action_tokenizer_benchmark.py \
--action-tokenizer-path=outputs/wavetoken \
--repo-id=lerobot/pusht \
--action-horizon=10 \
--max-episodes=50 \
--output-dir=outputs/action_tokenizer_benchmark
```
With delta transform and custom encoded dimensions:
```bash
python benchmarks/tokens/run_action_tokenizer_benchmark.py \
--action-tokenizer-path=outputs/wavetoken \
--repo-id=lerobot/pusht \
--action-horizon=10 \
--encoded-dims=0:6,7:14 \
--delta-dims=0,1,2,3,4,5 \
--use-delta-transform \
--normalization-mode=QUANTILES \
--max-chunks-for-reconstruction=500 \
--output-dir=outputs/action_tokenizer_benchmark
```
Results are written to e.g. `outputs/action_tokenizer_benchmark/2026-02-12_14-30-00_lerobot_pusht_action_tokenizer_results.json`.
## Results
Results are stored as JSON in the directory given by `--output-dir` (default: `outputs/action_tokenizer_benchmark`). Each file contains:
- **config**: All script arguments (tokenizer path, repo_id, action_horizon, encoded_dims, normalization_mode, etc.) for reproducibility.
- **metrics**: All reconstruction, compression, and timing metrics described above.
To compare runs, load and diff or aggregate these JSON files with your own scripts or notebooks.
@@ -1,442 +0,0 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Benchmark action tokenization: reconstruction error, compression ratio, and timing.
Loads action chunks from a LeRobot dataset, encodes/decodes them with a trained action
tokenizer, and reports:
- Reconstruction: MAE, MSE, RMSE, max absolute error, per-dimension MAE
- Jerk: mean absolute jerk (original and reconstructed), jerk reconstruction MAE
- Compression: ratio (input size / mean tokens), token length stats
- Timing: mean encode/decode time per chunk
Results are saved to outputs/action_tokenizer_benchmark/<timestamp>_results.json.
Example:
```bash
python benchmarks/tokens/run_action_tokenizer_benchmark.py \
--action-tokenizer-path=outputs/wavetoken \
--repo-id=lerobot/pusht \
--action-horizon=10 \
--max-episodes=50 \
--output-dir=outputs/action_tokenizer_benchmark
```
"""
import argparse
import json
import time
from pathlib import Path
import numpy as np
from lerobot.configs.types import NormalizationMode
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import ACTION, OBS_STATE
# Reuse the chunking and normalization helpers from the training script to avoid duplication
from lerobot.scripts.lerobot_train_tokenizer import (
apply_normalization,
process_episode,
)
def load_action_chunks(
repo_id: str,
root: str | None,
action_horizon: int,
max_episodes: int | None,
sample_fraction: float,
encoded_dims: str,
delta_dims: str | None,
use_delta_transform: bool,
state_key: str,
normalization_mode: NormalizationMode,
):
"""Load and normalize action chunks from a LeRobot dataset (same pipeline as training)."""
dataset = LeRobotDataset(repo_id=repo_id, root=root)
num_episodes = dataset.num_episodes
if max_episodes is not None:
num_episodes = min(max_episodes, num_episodes)
# Parse encoded dims
encoded_dim_ranges = []
for range_str in encoded_dims.split(","):
start, end = map(int, range_str.strip().split(":"))
encoded_dim_ranges.append((start, end))
total_encoded_dims = sum(end - start for start, end in encoded_dim_ranges)
delta_dim_list = None
if delta_dims is not None and delta_dims.strip():
delta_dim_list = [int(d.strip()) for d in delta_dims.split(",")]
all_chunks = []
for ep_idx in range(num_episodes):
chunks = process_episode(
(
dataset,
ep_idx,
action_horizon,
delta_dim_list,
sample_fraction,
state_key,
use_delta_transform,
)
)
if chunks is not None:
all_chunks.append(chunks)
if not all_chunks:
raise ValueError("No action chunks collected. Check action_horizon and dataset.")
all_chunks = np.concatenate(all_chunks, axis=0)
# Extract encoded dimensions only
encoded_chunks = []
for start, end in encoded_dim_ranges:
encoded_chunks.append(all_chunks[:, :, start:end])
encoded_chunks = np.concatenate(encoded_chunks, axis=-1)
# Normalize
norm_stats = dataset.meta.stats
if norm_stats is not None and ACTION in norm_stats:
action_stats = norm_stats[ACTION]
encoded_dim_indices = []
for start, end in encoded_dim_ranges:
encoded_dim_indices.extend(range(start, end))
encoded_dim_indices = np.array(encoded_dim_indices)
encoded_stats = {}
for stat_name, stat_values in action_stats.items():
if isinstance(stat_values, (list, np.ndarray)):
stat_array = np.array(stat_values)
if len(stat_array) > max(encoded_dim_indices):
encoded_stats[stat_name] = stat_array[encoded_dim_indices]
if encoded_stats:
try:
encoded_chunks = apply_normalization(
encoded_chunks, encoded_stats, normalization_mode, eps=1e-8
)
except ValueError:
pass  # stats incompatible with the chosen normalization mode; fall back to unnormalized chunks
return encoded_chunks, total_encoded_dims, action_horizon, dataset.repo_id
def compute_reconstruction_metrics(original: np.ndarray, reconstructed: np.ndarray):
"""Compute reconstruction error metrics (original and reconstructed same shape [N, T, D])."""
diff = reconstructed - original
mae = float(np.mean(np.abs(diff)))
mse = float(np.mean(diff**2))
rmse = float(np.sqrt(mse))
max_abs_err = float(np.max(np.abs(diff)))
# Per-dimension MAE (over N and T)
per_dim_mae = np.mean(np.abs(diff), axis=(0, 1))
per_dim_mae = per_dim_mae.tolist()
return {
"reconstruction_mae": mae,
"reconstruction_mse": mse,
"reconstruction_rmse": rmse,
"reconstruction_max_abs_error": max_abs_err,
"per_dimension_mae": per_dim_mae,
}
def compute_jerk_metrics(original: np.ndarray, reconstructed: np.ndarray) -> dict:
"""Compute jerk (3rd derivative of action w.r.t. time) metrics.
Args:
original: Action chunks [N, T, D].
reconstructed: Reconstructed action chunks [N, T, D].
Returns:
Dict with mean absolute jerk for original, reconstructed, and jerk reconstruction MAE.
"""
# Jerk = 3rd discrete difference along time axis; need T >= 4
if original.shape[1] < 4:
return {}
jerk_orig = np.diff(original, n=3, axis=1) # (N, T-3, D)
jerk_recon = np.diff(reconstructed, n=3, axis=1)
mae_jerk_orig = float(np.mean(np.abs(jerk_orig)))
mae_jerk_recon = float(np.mean(np.abs(jerk_recon)))
jerk_reconstruction_mae = float(np.mean(np.abs(jerk_recon - jerk_orig)))
return {
"jerk_mae_original": mae_jerk_orig,
"jerk_mae_reconstructed": mae_jerk_recon,
"jerk_reconstruction_mae": jerk_reconstruction_mae,
}
def run_benchmark(
action_chunks: np.ndarray,
action_horizon: int,
action_dim: int,
tokenizer_path: str,
max_chunks_for_reconstruction: int | None = 500,
):
"""Encode/decode action chunks and compute metrics."""
from transformers import AutoProcessor
processor = AutoProcessor.from_pretrained(tokenizer_path, trust_remote_code=True)
n_chunks = len(action_chunks)
sample_size = n_chunks
if max_chunks_for_reconstruction is not None:
sample_size = min(max_chunks_for_reconstruction, n_chunks)
rng = np.random.RandomState(42)
indices = rng.choice(n_chunks, size=sample_size, replace=False)
sample_chunks = action_chunks[indices]
# Encode
token_lengths = []
encode_times = []
all_tokens = []
for i in range(len(sample_chunks)):
chunk = sample_chunks[i : i + 1]
t0 = time.perf_counter()
tokens = processor(chunk)[0]
encode_times.append(time.perf_counter() - t0)
if isinstance(tokens, list):
token_lengths.append(len(tokens))
all_tokens.append(tokens)
else:
n = tokens.shape[0] if hasattr(tokens, "shape") else len(tokens)
token_lengths.append(n)
all_tokens.append(tokens.tolist() if hasattr(tokens, "tolist") else list(tokens))
# Decode (pass time_horizon/action_dim so the processor can reshape tokens back into chunks)
decoded_list = []
decode_times = []
for i, tok_list in enumerate(all_tokens):
t0 = time.perf_counter()
recon = processor.decode(
[tok_list],
time_horizon=action_horizon,
action_dim=action_dim,
)
decode_times.append(time.perf_counter() - t0)
decoded_list.append(recon)
decoded = np.concatenate(decoded_list, axis=0)
# Reconstruction metrics
metrics = compute_reconstruction_metrics(sample_chunks, decoded)
# Jerk metrics (3rd derivative along time)
jerk_metrics = compute_jerk_metrics(sample_chunks, decoded)
metrics.update(jerk_metrics)
# Compression
token_lengths = np.array(token_lengths)
input_size = action_horizon * action_dim
compression_ratio = input_size / float(np.mean(token_lengths))
metrics["compression_ratio"] = compression_ratio
metrics["mean_token_length"] = float(np.mean(token_lengths))
metrics["std_token_length"] = float(np.std(token_lengths))
metrics["min_token_length"] = int(np.min(token_lengths))
metrics["max_token_length"] = int(np.max(token_lengths))
metrics["p50_token_length"] = float(np.percentile(token_lengths, 50))
metrics["p99_token_length"] = float(np.percentile(token_lengths, 99))
# Timing (seconds per chunk)
metrics["mean_encode_time_sec"] = float(np.mean(encode_times))
metrics["mean_decode_time_sec"] = float(np.mean(decode_times))
metrics["num_chunks_evaluated"] = sample_size
metrics["total_chunks_available"] = n_chunks
return metrics
def main(
action_tokenizer_path: str,
repo_id: str,
root: str | None = None,
action_horizon: int = 10,
max_episodes: int | None = None,
sample_fraction: float = 0.2,
encoded_dims: str = "0:6",
delta_dims: str | None = None,
use_delta_transform: bool = False,
state_key: str = OBS_STATE,
normalization_mode: str = "QUANTILES",
max_chunks_for_reconstruction: int | None = 500,
output_dir: str | None = None,
):
if output_dir is None:
output_dir = "outputs/action_tokenizer_benchmark"
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
try:
norm_mode = NormalizationMode(normalization_mode)
except ValueError:
norm_mode = NormalizationMode.QUANTILES  # unknown mode string: fall back to QUANTILES
print("Loading action chunks...")
encoded_chunks, action_dim, horizon, _ = load_action_chunks(
repo_id=repo_id,
root=root,
action_horizon=action_horizon,
max_episodes=max_episodes,
sample_fraction=sample_fraction,
encoded_dims=encoded_dims,
delta_dims=delta_dims,
use_delta_transform=use_delta_transform,
state_key=state_key,
normalization_mode=norm_mode,
)
print(f"Loaded {len(encoded_chunks)} chunks, shape {encoded_chunks.shape} (H={horizon}, D={action_dim})")
print("Running tokenizer benchmark...")
metrics = run_benchmark(
action_chunks=encoded_chunks,
action_horizon=horizon,
action_dim=action_dim,
tokenizer_path=action_tokenizer_path,
max_chunks_for_reconstruction=max_chunks_for_reconstruction,
)
# Attach config for reproducibility
results = {
"config": {
"action_tokenizer_path": action_tokenizer_path,
"repo_id": repo_id,
"action_horizon": action_horizon,
"max_episodes": max_episodes,
"sample_fraction": sample_fraction,
"encoded_dims": encoded_dims,
"delta_dims": delta_dims,
"use_delta_transform": use_delta_transform,
"state_key": state_key,
"normalization_mode": normalization_mode,
},
"metrics": metrics,
}
timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
safe_repo = repo_id.replace("/", "_")
out_file = output_path / f"{timestamp}_{safe_repo}_action_tokenizer_results.json"
with open(out_file, "w") as f:
json.dump(results, f, indent=2)
print(f"Results saved to {out_file}")
print("Metrics:")
for k, v in metrics.items():
if isinstance(v, list):
print(f" {k}: (length {len(v)})")
else:
print(f" {k}: {v}")
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Benchmark action tokenization (reconstruction error, compression, timing)."
)
parser.add_argument(
"--action-tokenizer-path",
type=str,
required=True,
help="Path or HuggingFace repo id of the trained action tokenizer (e.g. outputs/wavetoken).",
)
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="LeRobot dataset repo id (e.g. lerobot/pusht).",
)
parser.add_argument(
"--root",
type=str,
default=None,
help="Root directory for LeRobot datasets.",
)
parser.add_argument(
"--action-horizon",
type=int,
default=10,
help="Number of future steps per action chunk.",
)
parser.add_argument(
"--max-episodes",
type=int,
default=None,
help="Max episodes to use (default: all).",
)
parser.add_argument(
"--sample-fraction",
type=float,
default=0.2,
help="Fraction of chunks to sample per episode.",
)
parser.add_argument(
"--encoded-dims",
type=str,
default="0:6",
help="Dimension ranges to encode (e.g. 0:6,7:14).",
)
parser.add_argument(
"--delta-dims",
type=str,
default=None,
help="Comma-separated dimensions for delta transform.",
)
parser.add_argument(
"--use-delta-transform",
action="store_true",
help="Apply delta (relative) transform to specified dimensions.",
)
parser.add_argument(
"--state-key",
type=str,
default=OBS_STATE,
help="Dataset key for state (for delta transform).",
)
parser.add_argument(
"--normalization-mode",
type=str,
default="QUANTILES",
choices=[m.value for m in NormalizationMode],
help="Normalization mode for actions.",
)
parser.add_argument(
"--max-chunks-for-reconstruction",
type=int,
default=500,
help="Max chunks to use for reconstruction metrics (default: 500).",
)
parser.add_argument(
"--output-dir",
type=str,
default="outputs/action_tokenizer_benchmark",
help="Directory to save results JSON (default: outputs/action_tokenizer_benchmark).",
)
args = parser.parse_args()
main(
action_tokenizer_path=args.action_tokenizer_path,
repo_id=args.repo_id,
root=args.root,
action_horizon=args.action_horizon,
max_episodes=args.max_episodes,
sample_fraction=args.sample_fraction,
encoded_dims=args.encoded_dims,
delta_dims=args.delta_dims,
use_delta_transform=args.use_delta_transform,
state_key=args.state_key,
normalization_mode=args.normalization_mode,
max_chunks_for_reconstruction=args.max_chunks_for_reconstruction,
output_dir=args.output_dir,
)
+2
@@ -27,6 +27,8 @@
title: Porting Large Datasets
- local: using_dataset_tools
title: Using the Dataset Tools
- local: annotation_tools
title: Using the Annotation Tools
- local: dataset_subtask
title: Using Subtasks in the Dataset
title: "Datasets"
+425
@@ -0,0 +1,425 @@
# Dataset Annotation Tools
This guide explains how to use the automatic annotation tools to add skill labels and synthetic dialogue to your LeRobot datasets.
## Overview
The annotation pipeline consists of two main components:
1. **Subtask Annotation** (`subtask_annotate.py`): Automatically segments robot demonstrations into atomic skills using Vision-Language Models (VLMs)
2. **High-Level Annotation** (`high_level_annotate.py`): Generates synthetic user prompts and robot utterances for hierarchical policy training
These tools enable you to transform raw robot demonstration data into richly annotated datasets suitable for training hierarchical policies.
## Installation Requirements
Before using the annotation tools, ensure you have the required dependencies:
```bash
pip install transformers qwen-vl-utils opencv-python rich pandas pyarrow
```
You'll also need FFmpeg for video processing:
```bash
# Ubuntu/Debian
sudo apt-get install ffmpeg
# macOS
brew install ffmpeg
```
## Part 1: Subtask Annotation
### What It Does
The subtask annotator segments each episode into short atomic manipulation skills (1-3 seconds each). For example, a "pick and place" episode might be segmented into:
- "reach towards object" (0.0s - 1.2s)
- "grasp object" (1.2s - 2.1s)
- "lift object" (2.1s - 3.5s)
- "move to target" (3.5s - 5.0s)
- "release object" (5.0s - 6.2s)
### Usage
#### Basic Example
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id your-username/your-dataset \
--video-key observation.images.base \
--output-dir /path/to/output
```
#### With Local Dataset
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--data-dir /path/to/local/dataset \
--video-key observation.images.base \
--output-dir /path/to/output
```
#### Advanced Options
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id your-username/your-dataset \
--video-key observation.images.base \
--model Qwen/Qwen2-VL-7B-Instruct \
--batch-size 16 \
--output-dir /path/to/output \
--push-to-hub
```
### Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--repo-id` | HuggingFace Hub dataset ID | Required (or use --data-dir) |
| `--data-dir` | Path to local dataset | Required (or use --repo-id) |
| `--video-key` | Video observation key | Required |
| `--model` | VLM model to use | `Qwen/Qwen2-VL-7B-Instruct` |
| `--device` | Device to run model on | `cuda` |
| `--dtype` | Model dtype | `bfloat16` |
| `--batch-size` | Episodes per batch | `8` |
| `--episodes` | Specific episodes to annotate | All episodes |
| `--output-dir` | Output directory | Auto-generated |
| `--push-to-hub` | Push to HuggingFace Hub | `False` |
### Supported Models
- **Qwen2-VL**: `Qwen/Qwen2-VL-2B-Instruct`, `Qwen/Qwen2-VL-7B-Instruct`, `Qwen/Qwen2-VL-72B-Instruct`
- **Qwen3-VL**: `Qwen/Qwen3-VL-30B-A3B-Instruct`
### Output Files
The subtask annotation creates the following files in your dataset:
1. **`meta/subtasks.parquet`**: DataFrame with unique subtask names
```python
# Structure:
# Index: subtask name (string)
# Column: subtask_index (int64)
```
2. **`meta/skills.json`**: Raw skill annotations with timestamps
```json
{
"coarse_description": "Pick and place the object",
"skill_to_subtask_index": {
"reach towards object": 0,
"grasp object": 1,
...
},
"episodes": {
"0": {
"episode_index": 0,
"description": "Pick and place the object",
"skills": [
{"name": "reach towards object", "start": 0.0, "end": 1.2},
{"name": "grasp object", "start": 1.2, "end": 2.1},
...
]
}
}
}
```
3. **`subtask_index` feature**: Added to each frame in the dataset
- Type: `int64`
- Shape: `(1,)`
- Maps each frame to its corresponding subtask
### Accessing Subtask Annotations
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Load annotated dataset
dataset = LeRobotDataset(repo_id="your/dataset_with_subtasks")
# Get a frame
frame = dataset[100]
# Get the subtask for this frame
subtask_idx = frame["subtask_index"].item()
subtask_name = dataset.meta.subtasks.iloc[subtask_idx].name
print(f"Frame 100 is performing: {subtask_name}")
# Load all subtasks
subtasks_df = dataset.meta.subtasks
print(subtasks_df)
```
## Part 2: High-Level Annotation
### What It Does
The high-level annotator generates synthetic dialogue for hierarchical policy training. For each skill, it creates:
- **User Prompt** (`_t`): A natural language request from the user
- **Robot Utterance** (`u_t`): A natural language response from the robot
This enables training policies that can understand and respond to human instructions in natural dialogue.
### Prerequisites
**Important**: You must run subtask annotation first! High-level annotation requires the `skills.json` file generated by subtask annotation.
### Usage
#### Image Mode (Default)
Samples frames at regular intervals and passes images to the VLM:
```bash
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--repo-id your/dataset_with_subtasks \
--model Qwen/Qwen2-VL-7B-Instruct \
--image-key observation.images.base \
--output-dir /path/to/output
```
#### Video Mode
Passes entire episode videos to the VLM for better temporal understanding:
```bash
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--repo-id your/dataset_with_subtasks \
--model Qwen/Qwen2-VL-7B-Instruct \
--video-mode \
--video-key observation.images.base \
--video-batch-size 4 \
--output-dir /path/to/output
```
### Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--repo-id` | HuggingFace Hub dataset ID | Required (or use --data-dir) |
| `--data-dir` | Path to local dataset | Required (or use --repo-id) |
| `--model` | VLM model to use | `Qwen/Qwen2-VL-7B-Instruct` |
| `--image-key` | Image observation key (image mode) | First camera key |
| `--video-mode` | Use video instead of images | `False` |
| `--video-key` | Video observation key (video mode) | Auto-detected |
| `--video-batch-size` | Episodes per batch (video mode) | `1` |
| `--sample-interval` | Sampling interval in seconds | `1.0` |
| `--temperature` | Sampling temperature | `0.7` |
| `--output-dir` | Output directory | Auto-generated |
| `--push-to-hub` | Push to HuggingFace Hub | `False` |
### Output Files
The high-level annotation creates:
1. **`meta/tasks_high_level.parquet`**: DataFrame with high-level tasks
```python
# Structure:
# Index: task string (concatenated user_prompt | robot_utterance)
# Columns:
# - task_index: int64
# - user_prompt: string
# - robot_utterance: string
# - skill: string (associated subtask)
# - scenario_type: string
# - response_type: string
```
2. **`meta/syn_annotations.jsonl`**: Debug annotations (JSONL format)
```json
{"episode_id": 0, "timestamp": 1.5, "skill_current": "grasp object", "user_prompt": "Can you pick that up?", "robot_utterance": "Sure, I'll grasp it now", ...}
```
3. **`task_index_high_level` feature**: Added to each frame
- Type: `int64`
- Shape: `(1,)`
- Maps each frame to its high-level task
### Dialogue Types Generated
The system generates diverse interaction types:
**Scenario Types:**
- `specific_object`: "Pick up the red block"
- `negative_task`: "Don't touch the blue one"
- `situated_correction`: "Actually, move to the other box instead"
- `implicit_request`: "I need something red for the tower"
- `constraint_based`: "Make sure to handle it gently"
**Response Types:**
- `confirmation`: "OK, I'll pick it up"
- `clarification`: "Just to confirm, you want me to pick up the red block?"
- `acknowledgment`: "Got it, picking up the red block"
- `constraint_acknowledgment`: "Sure, I'll pick it up gently"
### Accessing High-Level Annotations
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
import pandas as pd
# Load annotated dataset
dataset = LeRobotDataset(repo_id="your/dataset_with_high_level_tasks")
# Get a frame
frame = dataset[100]
# Get the high-level task
task_idx = frame["task_index_high_level"].item()
# Load tasks metadata
tasks_df = pd.read_parquet(dataset.root / "meta" / "tasks_high_level.parquet")
task_row = tasks_df[tasks_df["task_index"] == task_idx].iloc[0]
print(f"User: {task_row['user_prompt']}")
print(f"Robot: {task_row['robot_utterance']}")
print(f"Skill: {task_row['skill']}")
# Use in a DataLoader
import torch
from torch.utils.data import DataLoader
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
batch = next(iter(dataloader))
print(f"Task indices: {batch['task_index_high_level']}")
print(f"User prompts: {batch['user_prompt'][0]}")
print(f"Robot utterances: {batch['robot_utterance'][0]}")
```
## Complete Pipeline Example
Here's how to run both annotation stages:
```bash
#!/bin/bash
REPO_ID="your-username/your-dataset"
MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/path/to/output"
# Step 1: Subtask Annotation
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.base \
--model "$MODEL" \
--batch-size 8 \
--output-dir "${OUTPUT_DIR}/subtasks"
# Step 2: High-Level Annotation (Image Mode)
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "${OUTPUT_DIR}/subtasks" \
--model "$MODEL" \
--image-key observation.images.base \
--sample-interval 1.0 \
--output-dir "${OUTPUT_DIR}/final"
# Or Step 2: High-Level Annotation (Video Mode - Recommended)
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "${OUTPUT_DIR}/subtasks" \
--model "$MODEL" \
--video-mode \
--video-key observation.images.base \
--video-batch-size 4 \
--output-dir "${OUTPUT_DIR}/final"
```
## Performance Tips
### For Faster Processing
1. **Increase batch size**: Use `--batch-size 16` or higher (subtask annotation)
2. **Increase video batch size**: Use `--video-batch-size 8` (high-level annotation in video mode)
3. **Use a larger sampling interval**: `--sample-interval 5.0` samples every 5 seconds instead of every 1, which is much faster for testing
4. **Use smaller models**: `Qwen/Qwen2-VL-2B-Instruct` is faster than `Qwen/Qwen2-VL-7B-Instruct`
5. **Process specific episodes**: Use `--episodes 0 1 2 3` to annotate only a subset
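Several of these flags can be combined for a quick test pass over the subtask stage. This is a sketch only: the repo ID and output directory are placeholders, and the script path and flags follow the pipeline example above.

```shell
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
  --repo-id "your-username/your-dataset" \
  --video-key observation.images.base \
  --model "Qwen/Qwen2-VL-2B-Instruct" \
  --batch-size 16 \
  --episodes 0 1 2 3 \
  --output-dir /tmp/subtasks_test
```

Once the output looks reasonable on the episode subset, drop `--episodes` and switch back to a larger model for the full run.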
### For Better Quality
1. **Use larger models**: `Qwen/Qwen3-VL-30B-A3B-Instruct` or `Qwen/Qwen2-VL-72B-Instruct`
2. **Use video mode**: Provides better temporal context
3. **Use smaller sampling intervals**: `--sample-interval 0.5` produces denser annotations
4. **Adjust temperature**: Use `--temperature 0.9` for more diverse dialogue
## Memory Requirements
| Model | GPU Memory | Recommended Batch Size |
|-------|------------|------------------------|
| Qwen2-VL-2B | ~8 GB | 16-32 |
| Qwen2-VL-7B | ~16 GB | 8-16 |
| Qwen2-VL-72B | ~80 GB | 1-2 |
| Qwen3-VL-30B | ~40 GB | 4-8 |
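The table above can be turned into a rough batch-size picker. A hedged sketch: the memory figures are the approximate guidelines from the table, not measured values, and `suggest_batch_size` is a hypothetical helper.

```python
# Approximate GPU memory needed (GB) and a conservative base batch size,
# taken from the guideline table above. Rough figures, not measurements.
MODEL_REQUIREMENTS = {
    "Qwen/Qwen2-VL-2B-Instruct": (8, 16),
    "Qwen/Qwen2-VL-7B-Instruct": (16, 8),
    "Qwen/Qwen2-VL-72B-Instruct": (80, 1),
    "Qwen/Qwen3-VL-30B-A3B-Instruct": (40, 4),
}

def suggest_batch_size(model: str, available_gb: float) -> int:
    """Return a conservative batch size, or 0 if the model likely will not fit."""
    required_gb, base_batch = MODEL_REQUIREMENTS[model]
    if available_gb < required_gb:
        return 0
    # Scale with the memory headroom, capped at twice the base batch size.
    return min(base_batch * 2, max(1, int(base_batch * available_gb / required_gb)))

print(suggest_batch_size("Qwen/Qwen2-VL-7B-Instruct", 24.0))  # prints 12
```

On a real machine the `available_gb` value would come from your GPU (for example, `torch.cuda.get_device_properties(0).total_memory / 1e9`).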
## Troubleshooting
### "FFmpeg not found"
```bash
# Install FFmpeg
sudo apt-get install ffmpeg  # Ubuntu/Debian
brew install ffmpeg          # macOS
ffmpeg -version              # verify the installation
```
### "CUDA out of memory"
- Reduce batch size: `--batch-size 1` or `--video-batch-size 1`
- Use smaller model: `Qwen/Qwen2-VL-2B-Instruct`
- Use CPU: `--device cpu` (much slower)
### "No skills.json found"
Run subtask annotation before high-level annotation; the subtask stage produces the `meta/skills.json` file that the high-level stage reads.
### "Video key not found"
List available keys:
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
dataset = LeRobotDataset(repo_id="your/dataset")
print("Video keys:", dataset.meta.video_keys)
print("Camera keys:", dataset.meta.camera_keys)
```
## Dataset Structure After Annotation
```
your_dataset_with_high_level_tasks/
├── meta/
│ ├── info.json # Original metadata
│ ├── tasks.parquet # Original tasks (preserved)
│ ├── subtasks.parquet # NEW: Subtask names and indices
│ ├── skills.json # NEW: Raw skill annotations with timestamps
│ ├── tasks_high_level.parquet # NEW: High-level tasks with dialogue
│ └── syn_annotations.jsonl # NEW: Debug annotations
├── data/
│ └── chunk-000/
│ ├── observation.images.base.mp4
│ ├── action.safetensors
│ ├── subtask_index.safetensors # NEW: Subtask per frame
│ └── task_index_high_level.safetensors # NEW: High-level task per frame
└── videos/
└── ...
```
## Citation
If you use these annotation tools in your research, please cite:
```bibtex
@article{lerobot2024,
title={LeRobot: State-of-the-art Machine Learning for Real-World Robotics},
author={LeRobot Contributors},
year={2024},
url={https://github.com/huggingface/lerobot}
}
```
## Next Steps
After annotation, you can:
1. Train hierarchical policies using the subtask and high-level annotations
2. Use the synthetic dialogue for instruction-following policy training
3. Analyze skill distributions and dialogue patterns
4. Share your annotated dataset on HuggingFace Hub with `--push-to-hub`
For training examples, see the [training documentation](../training/).
+3 -5
@@ -1,15 +1,13 @@
# Installation
This guide uses conda (via miniforge) to manage environments. If you prefer another environment manager (e.g. `uv`, `venv`), ensure you have Python >=3.10 and ffmpeg installed with the `libsvtav1` encoder, then skip ahead to [Install LeRobot](#step-3-install-lerobot-).
## Step 1: Install [`miniforge`](https://conda-forge.org/download/)
## Install [`miniforge`](https://conda-forge.org/download/)
```bash
wget "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh"
bash Miniforge3-$(uname)-$(uname -m).sh
```
## Step 2: Environment Setup
## Environment Setup
Create a virtual environment with Python 3.10, using conda:
@@ -40,7 +38,7 @@ conda install ffmpeg -c conda-forge
>
> - _[On Linux only]_ If you want to bring your own ffmpeg: Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
## Step 3: Install LeRobot 🤗
## Install LeRobot 🤗
### From Source
+3 -3
@@ -360,9 +360,9 @@ ignore_errors = false
module = "lerobot.cameras.*"
ignore_errors = false
[[tool.mypy.overrides]]
module = "lerobot.motors.*"
ignore_errors = false
# [[tool.mypy.overrides]]
# module = "lerobot.motors.*"
# ignore_errors = false
# [[tool.mypy.overrides]]
# module = "lerobot.robots.*"
+1 -1
@@ -13,5 +13,5 @@
# limitations under the License.
from .camera import Camera
from .configs import CameraConfig, ColorMode, Cv2Backends, Cv2Rotation
from .configs import CameraConfig, ColorMode, Cv2Rotation
from .utils import make_cameras_from_configs
-23
@@ -25,10 +25,6 @@ class ColorMode(str, Enum):
RGB = "rgb"
BGR = "bgr"
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`color_mode` is expected to be in {list(cls)}, but {value} is provided.")
class Cv2Rotation(int, Enum):
NO_ROTATION = 0
@@ -36,25 +32,6 @@ class Cv2Rotation(int, Enum):
ROTATE_180 = 180
ROTATE_270 = -90
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`rotation` is expected to be in {list(cls)}, but {value} is provided.")
# Subset from https://docs.opencv.org/3.4/d4/d15/group__videoio__flags__base.html
class Cv2Backends(int, Enum):
ANY = 0
V4L2 = 200
DSHOW = 700
PVAPI = 800
ANDROID = 1000
AVFOUNDATION = 1200
MSMF = 1400
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`backend` is expected to be in {list(cls)}, but {value} is provided.")
@dataclass(kw_only=True)
class CameraConfig(draccus.ChoiceRegistry, abc.ABC): # type: ignore # TODO: add type stubs for draccus
+14 -9
@@ -32,11 +32,10 @@ if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 # type: ignore # TODO: add type stubs for OpenCV
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..camera import Camera
from ..utils import get_cv2_rotation
from ..utils import get_cv2_backend, get_cv2_rotation
from .configuration_opencv import ColorMode, OpenCVCameraConfig
# NOTE(Steven): The maximum opencv device index depends on your operating system. For instance,
@@ -118,7 +117,7 @@ class OpenCVCamera(Camera):
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = config.backend
self.backend: int = get_cv2_backend()
if self.height and self.width:
self.capture_width, self.capture_height = self.width, self.height
@@ -133,7 +132,6 @@ class OpenCVCamera(Camera):
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened()
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""
Connects to the OpenCV camera specified in the configuration.
@@ -150,6 +148,8 @@ class OpenCVCamera(Camera):
ConnectionError: If the specified camera index/path is not found or fails to open.
RuntimeError: If the camera opens but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
# Use 1 thread for OpenCV operations to avoid potential conflicts or
# blocking in multi-threaded applications, especially during data collection.
@@ -178,7 +178,6 @@ class OpenCVCamera(Camera):
logger.info(f"{self} connected.")
@check_if_not_connected
def _configure_capture_settings(self) -> None:
"""
Applies the specified FOURCC, FPS, width, and height settings to the connected camera.
@@ -198,6 +197,8 @@ class OpenCVCamera(Camera):
to the requested value.
DeviceNotConnectedError: If the camera is not connected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
# Set FOURCC first (if specified) as it can affect available FPS/resolution options
if self.config.fourcc is not None:
@@ -347,7 +348,6 @@ class OpenCVCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -374,6 +374,9 @@ class OpenCVCamera(Camera):
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -487,7 +490,6 @@ class OpenCVCamera(Camera):
self.latest_timestamp = None
self.new_frame_event.clear()
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
@@ -510,6 +512,8 @@ class OpenCVCamera(Camera):
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -529,7 +533,6 @@ class OpenCVCamera(Camera):
return frame
@check_if_not_connected
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
@@ -545,6 +548,8 @@ class OpenCVCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -15,9 +15,9 @@
from dataclasses import dataclass
from pathlib import Path
from ..configs import CameraConfig, ColorMode, Cv2Backends, Cv2Rotation
from ..configs import CameraConfig, ColorMode, Cv2Rotation
__all__ = ["OpenCVCameraConfig", "ColorMode", "Cv2Rotation", "Cv2Backends"]
__all__ = ["OpenCVCameraConfig", "ColorMode", "Cv2Rotation"]
@CameraConfig.register_subclass("opencv")
@@ -50,7 +50,6 @@ class OpenCVCameraConfig(CameraConfig):
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
warmup_s: Time reading frames before returning from connect (in seconds)
fourcc: FOURCC code for video format (e.g., "MJPG", "YUYV", "I420"). Defaults to None (auto-detect).
backend: OpenCV backend identifier (https://docs.opencv.org/3.4/d4/d15/group__videoio__flags__base.html). Defaults to ANY.
Note:
- Only 3-channel color output (RGB/BGR) is currently supported.
@@ -63,12 +62,22 @@ class OpenCVCameraConfig(CameraConfig):
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
warmup_s: int = 1
fourcc: str | None = None
backend: Cv2Backends = Cv2Backends.ANY
def __post_init__(self) -> None:
self.color_mode = ColorMode(self.color_mode)
self.rotation = Cv2Rotation(self.rotation)
self.backend = Cv2Backends(self.backend)
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
if self.fourcc is not None and (not isinstance(self.fourcc, str) or len(self.fourcc) != 4):
raise ValueError(
@@ -74,4 +74,7 @@ class Reachy2CameraConfig(CameraConfig):
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for depth camera, but {self.image_type} is provided."
)
self.color_mode = ColorMode(self.color_mode)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
@@ -32,7 +32,6 @@ if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"
import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np # type: ignore # TODO: add type stubs for numpy
from lerobot.utils.decorators import check_if_not_connected
from lerobot.utils.import_utils import _reachy2_sdk_available
if TYPE_CHECKING or _reachy2_sdk_available:
@@ -124,7 +123,6 @@ class Reachy2Camera(Camera):
"""
raise NotImplementedError("Camera detection is not implemented for Reachy2 cameras.")
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -138,6 +136,9 @@ class Reachy2Camera(Camera):
"""
start_time = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
@@ -183,7 +184,6 @@ class Reachy2Camera(Camera):
return frame
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Same as read()
@@ -197,10 +197,11 @@ class Reachy2Camera(Camera):
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
return self.read()
@check_if_not_connected
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
@@ -218,6 +219,8 @@ class Reachy2Camera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.latest_frame is None or self.latest_timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
@@ -230,7 +233,6 @@ class Reachy2Camera(Camera):
return self.latest_frame
@check_if_not_connected
def disconnect(self) -> None:
"""
Stops the background read thread (if running).
@@ -238,6 +240,8 @@ class Reachy2Camera(Camera):
Raises:
DeviceNotConnectedError: If the camera is already disconnected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} not connected.")
if self.cam_manager is not None:
self.cam_manager.disconnect()
@@ -30,8 +30,7 @@ try:
except Exception as e:
logging.info(f"Could not import realsense: {e}")
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..camera import Camera
from ..configs import ColorMode
@@ -153,7 +152,6 @@ class RealSenseCamera(Camera):
"""Checks if the camera pipeline is started and streams are active."""
return self.rs_pipeline is not None and self.rs_profile is not None
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""
Connects to the RealSense camera specified in the configuration.
@@ -171,6 +169,8 @@ class RealSenseCamera(Camera):
ConnectionError: If the camera is found but fails to start the pipeline or no RealSense devices are detected at all.
RuntimeError: If the pipeline starts but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
self.rs_pipeline = rs.pipeline()
rs_config = rs.config()
@@ -290,7 +290,6 @@ class RealSenseCamera(Camera):
if self.use_depth:
rs_config.enable_stream(rs.stream.depth)
@check_if_not_connected
def _configure_capture_settings(self) -> None:
"""Sets fps, width, and height from device stream if not already configured.
@@ -300,6 +299,8 @@ class RealSenseCamera(Camera):
Raises:
DeviceNotConnectedError: If device is not connected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot validate settings for {self} as it is not connected.")
if self.rs_profile is None:
raise RuntimeError(f"{self}: rs_profile must be initialized before use.")
@@ -319,7 +320,6 @@ class RealSenseCamera(Camera):
self.width, self.height = actual_width, actual_height
self.capture_width, self.capture_height = actual_width, actual_height
@check_if_not_connected
def read_depth(self, timeout_ms: int = 200) -> NDArray[Any]:
"""
Reads a single frame (depth) synchronously from the camera.
@@ -345,6 +345,9 @@ class RealSenseCamera(Camera):
f"Failed to capture depth frame '.read_depth()'. Depth stream is not enabled for {self}."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -371,7 +374,6 @@ class RealSenseCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 0) -> NDArray[Any]:
"""
Reads a single frame (color) synchronously from the camera.
@@ -401,6 +403,9 @@ class RealSenseCamera(Camera):
f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -529,7 +534,6 @@ class RealSenseCamera(Camera):
self.new_frame_event.clear()
# NOTE(Steven): Missing implementation for depth for now
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame data (color) asynchronously.
@@ -552,6 +556,8 @@ class RealSenseCamera(Camera):
TimeoutError: If no frame data becomes available within the specified timeout.
RuntimeError: If the background thread died unexpectedly or another error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -572,7 +578,6 @@ class RealSenseCamera(Camera):
return frame
# NOTE(Steven): Missing implementation for depth for now
@check_if_not_connected
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent (color) frame captured immediately (Peeking).
@@ -588,6 +593,8 @@ class RealSenseCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -60,8 +60,20 @@ class RealSenseCameraConfig(CameraConfig):
warmup_s: int = 1
def __post_init__(self) -> None:
self.color_mode = ColorMode(self.color_mode)
self.rotation = Cv2Rotation(self.rotation)
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
values = (self.fps, self.width, self.height)
if any(v is not None for v in values) and any(v is None for v in values):
+12
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import platform
from typing import cast
from lerobot.utils.import_utils import make_device_from_device_class
@@ -67,3 +68,14 @@ def get_cv2_rotation(rotation: Cv2Rotation) -> int | None:
return int(cv2.ROTATE_90_COUNTERCLOCKWISE)
else:
return None
def get_cv2_backend() -> int:
import cv2
if platform.system() == "Windows":
return int(cv2.CAP_MSMF) # Use MSMF for Windows instead of AVFOUNDATION
# elif platform.system() == "Darwin": # macOS
# return cv2.CAP_AVFOUNDATION
else: # Linux and others
return int(cv2.CAP_ANY)
+10 -6
@@ -34,8 +34,7 @@ import cv2
import numpy as np
from numpy.typing import NDArray
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..camera import Camera
from ..configs import ColorMode
@@ -105,7 +104,6 @@ class ZMQCamera(Camera):
"""Checks if the ZMQ socket is initialized and connected."""
return self._connected and self.context is not None and self.socket is not None
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""Connect to ZMQ camera server.
@@ -113,6 +111,8 @@ class ZMQCamera(Camera):
warmup (bool): If True, waits for the camera to provide at least one
valid frame before returning. Defaults to True.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
logger.info(f"Connecting to {self}...")
@@ -211,7 +211,6 @@ class ZMQCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -229,6 +228,9 @@ class ZMQCamera(Camera):
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -299,7 +301,6 @@ class ZMQCamera(Camera):
self.latest_timestamp = None
self.new_frame_event.clear()
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
@@ -316,6 +317,8 @@ class ZMQCamera(Camera):
TimeoutError: If no frame data becomes available within the specified timeout.
RuntimeError: If the background thread is not running.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -332,7 +335,6 @@ class ZMQCamera(Camera):
return frame
@check_if_not_connected
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
@@ -348,6 +350,8 @@ class ZMQCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
+4 -1
@@ -32,7 +32,10 @@ class ZMQCameraConfig(CameraConfig):
warmup_s: int = 1
def __post_init__(self) -> None:
self.color_mode = ColorMode(self.color_mode)
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.timeout_ms <= 0:
raise ValueError(f"`timeout_ms` must be positive, but {self.timeout_ms} is provided.")
@@ -0,0 +1,50 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="lerobot/libero_10"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/libero-10-annotate-high"
BATCH_SIZE=16
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0  # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
# python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
# --repo-id "$REPO_ID" \
# --video-key observation.images.image \
# --output-dir "$OUTPUT_DIR" \
# --skip-existing \
# --output-repo-id "jadechoghari/libero10-annotate" \
# --batch-size "$BATCH_SIZE" \
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
python src/lerobot/data_processing/annotations/high_level_annotate.py \
--data-dir "/fsx/jade_choghari/outputs/libero-10-annotate" \
--output-dir "$OUTPUT_DIR" \
--video-mode \
--video-key observation.images.image \
--video-batch-size "$BATCH_SIZE" \
--sample-interval 5.0
File diff suppressed because it is too large
@@ -0,0 +1,52 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.policies.factory import make_pre_post_processors
from lerobot.configs.policies import PreTrainedConfig
# /fsx/jade_choghari/data/libero_10_subtasks_kw_converted
dataset = LeRobotDataset(repo_id="lerobot/libero_10_image_subtask")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=2,
shuffle=True,
)
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
batch = next(iter(dataloader))
breakpoint()
batch1 = pre_processor(batch)
breakpoint()
print(batch.keys())
# print(batch['task_index_high_level'].shape)
# print(batch['task_index_high_level'])
# print(batch['user_prompt'][0])
# print(batch['robot_utterance'][0])
# print(batch['task'][0])
valid_episode_list = []
for episode_idx in range(len(dataset.meta.episodes)):
subtask_index = dataset[episode_idx]["subtask_index"]
valid_episode_list.append(episode_idx)
print(len(valid_episode_list))
# read this parquet /fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet
# import pandas as pd
# tasks_df = pd.read_parquet('/fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet')
# # print all
# print(tasks_df.columns)
# breakpoint()
@@ -0,0 +1,74 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/piper-demo-20260205_103303"
# MODEL="Qwen/Qwen3-VL-30B-A3B-Thinking"
MODEL="Qwen/Qwen3.5-27B"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
BATCH_SIZE=2
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0  # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation.
# To use closed-vocabulary labels, add a line: --subtask-labels "label1" "label2" ...
# Example (add backslash after "$MODEL" and uncomment the next line):
# --model "$MODEL" \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can"
python /home/lerobot/src/lerobot/data_processing/annotations/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.top \
--output-dir "$OUTPUT_DIR" \
--output-repo-id "jadechoghari/piper-demo-annotated1" \
--push-to-hub \
--no-timer-overlay \
--model "$MODEL" \
--subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
--batch-size 2
# Run subtask annotation (image-window: frames as images for better accuracy)
# python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate_image.py \
# --repo-id "$REPO_ID" \
# --camera-key observation.images.wrist \
# --output-dir "$OUTPUT_DIR" \
# --output-repo-id "jadechoghari/piper-demo-annotated1-image" \
# --push-to-hub \
# --model "$MODEL" \
# --window-size 184 \
# --max-frames-per-window 16 \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
# --batch-size 2
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --video-mode \
# --video-key observation.images.up \
# --video-batch-size "$BATCH_SIZE" \
# --sample-interval 1.0
File diff suppressed because it is too large
@@ -0,0 +1,561 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image-window subtask annotation for LeRobot datasets using Qwen VLMs.
This script assigns a subtask to each window of consecutive frames by sending
those frames as images to the VLM (instead of a video) for better accuracy.
Supports Qwen2-VL and Qwen3-VL (same models as subtask_annotate.py).
Pipeline:
1. Load a LeRobot dataset (local or Hub).
2. For each episode, slide a window over frame indices.
3. For each window, load the corresponding images (from image_key or decoded video_key).
4. Send the window of images to the VLM (Qwen2-VL or Qwen3-VL) with the same skill prompt; get one subtask name.
5. Assign that subtask to all frames in the window.
6. Write subtasks.parquet and add subtask_index via add_features (same as subtask_annotate).
Usage:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --stride 8 --output-dir ./output
"""
from __future__ import annotations
import argparse
import random
import textwrap
from pathlib import Path
import numpy as np
import PIL.Image
import torch
from rich.console import Console
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Reuse data structures and save/load from the video-based annotator
from lerobot.data_processing.annotations.subtask_annotate import (
EpisodeSkills,
Skill,
load_skill_annotations,
save_skill_annotations,
)
def create_window_skill_prompt(
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Prompt for labeling a single window of frames with one atomic skill.
If subtask_labels are provided, the model must choose exactly one from that list.
"""
goal_context = f'The overall goal is: "{coarse_goal}".\n\n' if coarse_goal else ""
if subtask_labels:
labels_list = ", ".join(f'"{label}"' for label in subtask_labels)
label_instruction = (
f"You must choose exactly ONE skill from this list: [{labels_list}]. "
"Do not create new labels. Reply with only that label.\n\n"
)
else:
label_instruction = ""
return textwrap.dedent(f"""\
# Role
You are a Robotics Vision System that labels short clips from robot manipulation demonstrations.
# Task
{goal_context}{label_instruction}The following images are consecutive frames from a single short clip of a robot demonstration.
What single atomic manipulation skill is being performed in this clip?
# Requirements
- Reply with ONLY one short skill name (e.g. "pick up object", "move arm left", "release gripper").
- No explanation, no timestamps, no JSON. Just the skill name.
""").strip()
def _run_image_segmenter(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> str:
"""Shared inference for Qwen2-VL and Qwen3-VL image window labeling."""
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
inputs = self.processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
response = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)[0].strip()
skill_name = response.split("\n")[0].strip().strip('."')
return skill_name if skill_name else "unknown"
def _run_image_segmenter_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Run VLM on multiple windows at once; returns one skill name per window."""
if not batch_images:
return []
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
all_texts = []
all_image_inputs = []
all_video_inputs = []
for images in batch_images:
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
all_texts.append(text)
if image_inputs is not None:
all_image_inputs.extend(image_inputs if isinstance(image_inputs, list) else [image_inputs])
if video_inputs is not None:
all_video_inputs.extend(video_inputs if isinstance(video_inputs, list) else [video_inputs])
inputs = self.processor(
text=all_texts,
images=all_image_inputs if all_image_inputs else None,
videos=all_video_inputs if all_video_inputs else None,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
responses = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)
return [
(r.split("\n")[0].strip().strip('."') or "unknown")
for r in responses
]
class Qwen2VLImageSegmenter:
"""Uses Qwen2-VL to assign one skill name to a window of images (same model as subtask_annotate)."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen2-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen2VLForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
class Qwen3VLImageSegmenter:
"""Uses Qwen3-VL (MoE) to assign one skill name to a window of images."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen3VLMoeForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen3-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen3VLMoeForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
def get_image_segmenter(
model_name: str,
device: str = "cuda",
torch_dtype: torch.dtype = torch.bfloat16,
):
"""Return the appropriate image-window segmenter for the model (Qwen2-VL or Qwen3-VL)."""
model_lower = model_name.lower()
if "qwen3" in model_lower:
return Qwen3VLImageSegmenter(model_name, device, torch_dtype)
return Qwen2VLImageSegmenter(model_name, device, torch_dtype)
def frame_to_pil(frame_value) -> PIL.Image.Image:
"""Convert a single frame from dataset (tensor or PIL or path) to PIL.Image."""
if isinstance(frame_value, PIL.Image.Image):
return frame_value
if isinstance(frame_value, (str, Path)):
return PIL.Image.open(frame_value).convert("RGB")
if hasattr(frame_value, "numpy"):
arr = frame_value.numpy()
else:
arr = np.asarray(frame_value)
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if np.issubdtype(arr.dtype, np.floating):
arr = (np.clip(arr, 0, 1) * 255).astype(np.uint8)
elif arr.dtype != np.uint8:
arr = np.clip(arr, 0, 255).astype(np.uint8)
if arr.shape[-1] == 1:
arr = np.repeat(arr, 3, axis=-1)
return PIL.Image.fromarray(arr)
def _sample_window_indices(window_length: int, max_frames: int) -> list[int]:
"""Return indices into a window of length window_length, at most max_frames, in order.
If window_length <= max_frames, returns range(window_length).
Otherwise returns sorted random sample of max_frames indices (temporal order preserved).
"""
if max_frames <= 0 or window_length <= max_frames:
return list(range(window_length))
return sorted(random.sample(range(window_length), max_frames))
class SkillAnnotatorImage:
"""Annotates episodes by sliding a window over frames and labeling each window with the VLM."""
def __init__(
self,
segmenter: Qwen2VLImageSegmenter | Qwen3VLImageSegmenter,
window_size: int = 8,
stride: int | None = None,
batch_size: int = 1,
max_frames_per_window: int | None = None,
console: Console | None = None,
):
self.segmenter = segmenter
self.window_size = window_size
self.stride = stride if stride is not None else window_size
self.batch_size = max(1, batch_size)
self.max_frames_per_window = max_frames_per_window
self.console = console or Console()
def annotate_dataset(
self,
dataset: LeRobotDataset,
camera_key: str,
episodes: list[int] | None = None,
skip_existing: bool = False,
subtask_labels: list[str] | None = None,
) -> dict[int, EpisodeSkills]:
"""Annotate episodes using image windows. camera_key can be an image_key or video_key."""
episode_indices = episodes if episodes is not None else list(range(dataset.meta.total_episodes))
coarse_goal = self._get_coarse_goal(dataset)
annotations: dict[int, EpisodeSkills] = {}
if skip_existing:
existing = load_skill_annotations(dataset.root)
if existing and existing.get("episodes"):
existing_eps = {int(k) for k in existing["episodes"] if existing["episodes"][k].get("skills")}
episode_indices = [i for i in episode_indices if i not in existing_eps]
for ep_idx in episode_indices:
try:
skills = self._annotate_episode(
dataset, ep_idx, camera_key, coarse_goal, subtask_labels
)
if skills:
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
self.console.print(f"[green]✓ Episode {ep_idx}: {len(skills)} window skills[/green]")
else:
self.console.print(f"[yellow]⚠ Episode {ep_idx}: no skills[/yellow]")
except Exception as e:
self.console.print(f"[red]Episode {ep_idx} failed: {e}[/red]")
return annotations
def _get_coarse_goal(self, dataset: LeRobotDataset) -> str:
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
return str(dataset.meta.tasks.index[0])
return "Perform the demonstrated manipulation task."
def _annotate_episode(
self,
dataset: LeRobotDataset,
episode_index: int,
camera_key: str,
coarse_goal: str,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
ep = dataset.meta.episodes[episode_index]
ep_from = int(ep["dataset_from_index"])
ep_to = int(ep["dataset_to_index"])
length = ep_to - ep_from
fps = dataset.meta.fps
if length == 0:
return []
# Collect full windows: (images, t_start, t_end) using frame timestamps.
# If max_frames_per_window is set and window is larger, sample that many frames (order preserved).
window_specs: list[tuple[list[PIL.Image.Image], float, float]] = []
start = 0
while start + self.window_size <= length:
offsets = _sample_window_indices(
self.window_size,
self.max_frames_per_window or self.window_size,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
timestamps = []
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
timestamps.append(float(item["timestamp"].item()))
# Derive the true window start from the first sampled frame; avoids loading that frame twice.
t_start = timestamps[0] - offsets[0] / fps
t_end = t_start + self.window_size / fps
window_specs.append((images, t_start, t_end))
start += self.stride
# Last partial window
if start < length:
partial_len = ep_to - (ep_from + start)
offsets = _sample_window_indices(
partial_len,
self.max_frames_per_window or partial_len,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
timestamps = []
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
timestamps.append(float(item["timestamp"].item()))
# True window start even if frame 0 was not sampled; avoids loading frames twice.
t_start = timestamps[0] - offsets[0] / fps
t_end = timestamps[-1] + 1.0 / fps
window_specs.append((images, t_start, t_end))
# Run in batches
skills: list[Skill] = []
for i in range(0, len(window_specs), self.batch_size):
chunk = window_specs[i : i + self.batch_size]
batch_images = [spec[0] for spec in chunk]
if len(batch_images) > 1:
skill_names = self.segmenter.segment_skill_from_images_batch(
batch_images, coarse_goal, subtask_labels
)
else:
skill_names = [
self.segmenter.segment_skill_from_images(
batch_images[0], coarse_goal, subtask_labels
)
]
for (_, t_start, t_end), name in zip(chunk, skill_names, strict=True):
skills.append(Skill(name=name, start=t_start, end=t_end))
return skills
def main():
parser = argparse.ArgumentParser(
description="Image-window subtask annotation using Qwen VLM (frames as images for better accuracy)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""\
Examples:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --output-dir ./output
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--repo-id user/dataset --camera-key observation.images.base \\
--window-size 6 --stride 3 --model Qwen/Qwen2-VL-7B-Instruct
# Use Qwen3-VL (MoE)
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--model Qwen/Qwen3-VL-30B-A3B-Instruct
"""),
)
data_group = parser.add_mutually_exclusive_group(required=True)
data_group.add_argument("--data-dir", type=str, help="Path to local LeRobot dataset")
data_group.add_argument("--repo-id", type=str, help="HuggingFace Hub dataset repository ID")
parser.add_argument(
"--camera-key",
type=str,
required=True,
help="Image or video observation key (e.g. observation.images.base)",
)
parser.add_argument(
"--model",
type=str,
default="Qwen/Qwen2-VL-7B-Instruct",
help="VLM model: Qwen2-VL or Qwen3-VL (default: Qwen/Qwen2-VL-7B-Instruct)",
)
parser.add_argument(
"--device",
type=str,
default="cuda",
)
parser.add_argument(
"--window-size",
type=int,
default=8,
help="Number of frames per window (default: 8)",
)
parser.add_argument(
"--stride",
type=int,
default=None,
help="Stride for sliding window (default: window_size = non-overlapping)",
)
parser.add_argument(
"--batch-size",
type=int,
default=1,
help="Number of windows to process in one VLM call (default: 1; increase for speed)",
)
parser.add_argument(
"--max-frames-per-window",
type=int,
default=None,
metavar="N",
help="If window has more than N frames, randomly sample N frames (order kept) to avoid OOM (e.g. 16)",
)
parser.add_argument("--episodes", type=int, nargs="+", help="Episode indices to annotate (default: all)")
parser.add_argument("--skip-existing", action="store_true", help="Skip episodes that already have annotations")
parser.add_argument(
"--subtask-labels",
type=str,
nargs="*",
default=None,
help="Closed vocabulary: model must choose only from these labels",
)
parser.add_argument("--output-dir", type=str, help="Output directory for dataset with subtask_index")
parser.add_argument("--output-repo-id", type=str, help="Output repo id (default: <repo_id>_with_subtasks)")
parser.add_argument("--push-to-hub", action="store_true")
args = parser.parse_args()
console = Console()
# Load dataset
console.print("[cyan]Loading dataset...[/cyan]")
if args.data_dir:
dataset = LeRobotDataset(repo_id="local/dataset", root=args.data_dir, download_videos=False)
else:
dataset = LeRobotDataset(repo_id=args.repo_id, download_videos=True)
camera_keys = dataset.meta.camera_keys
if args.camera_key not in camera_keys:
console.print(f"[red]Error: camera key '{args.camera_key}' not in {camera_keys}[/red]")
return
console.print(f"[green]✓ Loaded dataset, {dataset.meta.total_episodes} episodes[/green]")
# Same Qwen VLM as subtask_annotate (Qwen2-VL or Qwen3-VL), image windows instead of video
segmenter = get_image_segmenter(args.model, args.device, torch.bfloat16)
annotator = SkillAnnotatorImage(
segmenter=segmenter,
window_size=args.window_size,
stride=args.stride,
batch_size=args.batch_size,
max_frames_per_window=args.max_frames_per_window,
console=console,
)
annotations = annotator.annotate_dataset(
dataset=dataset,
camera_key=args.camera_key,
episodes=args.episodes,
skip_existing=args.skip_existing,
subtask_labels=args.subtask_labels,
)
if not annotations:
console.print("[yellow]No annotations to save.[/yellow]")
return
output_dir = Path(args.output_dir) if args.output_dir else None
output_repo_id = args.output_repo_id
new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id)
total_skills = sum(len(a.skills) for a in annotations.values())
console.print(f"[bold green]✓ Done.[/bold green] Episodes: {len(annotations)}, total window skills: {total_skills}")
console.print(f" Dataset with subtask_index: {new_dataset.root}")
if args.push_to_hub and not args.data_dir:
console.print("[cyan]Pushing to Hub...[/cyan]")
try:
new_dataset.push_to_hub(push_videos=False)
console.print("[green]✓ Pushed.[/green]")
except Exception as e:
console.print(f"[red]Push failed: {e}[/red]")
if __name__ == "__main__":
main()
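The window arithmetic used by `SkillAnnotatorImage` (full windows of `window_size` frames advanced by `stride`, plus one trailing partial window, each optionally subsampled to `max_frames_per_window`) can be exercised in isolation. The sketch below mirrors that loop; `plan_windows` is a hypothetical helper written for illustration, not part of the script:

```python
import random


def plan_windows(length: int, window_size: int, stride: int) -> list[tuple[int, int]]:
    """Frame ranges [start, end) per window: full windows, then one trailing partial window."""
    windows = []
    start = 0
    while start + window_size <= length:
        windows.append((start, start + window_size))
        start += stride
    if start < length:  # last partial window, as in SkillAnnotatorImage._annotate_episode
        windows.append((start, length))
    return windows


def sample_window_indices(window_length: int, max_frames: int) -> list[int]:
    """At most max_frames sorted indices into the window (temporal order preserved)."""
    if max_frames <= 0 or window_length <= max_frames:
        return list(range(window_length))
    return sorted(random.sample(range(window_length), max_frames))
```

For a 20-frame episode with `window_size=8` and `stride=8`, this yields windows `(0, 8)`, `(8, 16)` and the partial `(16, 20)`; with `stride < window_size` the windows overlap and frames are labeled more than once.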
+22 -1
@@ -59,6 +59,7 @@ from lerobot.datasets.utils import (
load_stats,
load_subtasks,
load_tasks,
load_tasks_high_level,
update_chunk_file_indices,
validate_episode_buffer,
validate_frame,
@@ -163,6 +164,7 @@ class LeRobotDatasetMetadata:
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
self.tasks = load_tasks(self.root)
self.tasks_high_level = load_tasks_high_level(self.root)
self.subtasks = load_subtasks(self.root)
self.episodes = load_episodes(self.root)
self.stats = load_stats(self.root)
@@ -520,6 +522,7 @@ class LeRobotDatasetMetadata:
_validate_feature_names(features)
obj.tasks = None
obj.tasks_high_level = None
obj.subtasks = None
obj.episodes = None
obj.stats = None
@@ -1067,7 +1070,17 @@ class LeRobotDataset(torch.utils.data.Dataset):
if len(self.meta.video_keys) > 0:
current_ts = item["timestamp"].item()
query_timestamps = self._get_query_timestamps(current_ts, query_indices)
video_frames = self._query_videos(query_timestamps, ep_idx)
try:
video_frames = self._query_videos(query_timestamps, ep_idx)
except Exception as e:
print("\n" + "=" * 120)
print("[VIDEO DECODE FAILURE]")
print(f"item={item}")
print(f"query_indices={query_indices}")
print(f"query_timestamps={query_timestamps}")
print(f"ep_idx={ep_idx}")
print("=" * 120 + "\n")
raise
item = {**video_frames, **item}
if self.image_transforms is not None:
@@ -1078,6 +1091,14 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Add task as a string
task_idx = item["task_index"].item()
item["task"] = self.meta.tasks.iloc[task_idx].name
# optionally add high level task index
if "task_index_high_level" in self.features:
high_level_task_idx = item["task_index_high_level"].item()
item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"]
item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"]
# add subtask information if available
if "subtask_index" in self.features and self.meta.subtasks is not None:
+16
@@ -62,6 +62,8 @@ CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
@@ -353,6 +355,20 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame:
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
return tasks
def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame | None:
"""Load high-level tasks from tasks_high_level.parquet if it exists."""
tasks_high_level_path = local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH
if tasks_high_level_path.exists():
return pd.read_parquet(tasks_high_level_path)
return None
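Both helpers added in this hunk follow the same load-if-present pattern: read the parquet when it exists, otherwise return `None` so callers can feature-gate on the metadata. A generic sketch of that pattern (`load_optional_parquet` is a hypothetical name, not in the codebase):

```python
from __future__ import annotations

from pathlib import Path

import pandas as pd


def load_optional_parquet(local_dir: Path, rel_path: str) -> pd.DataFrame | None:
    """Return the metadata DataFrame if the file exists, else None."""
    path = local_dir / rel_path
    return pd.read_parquet(path) if path.exists() else None
```

`load_tasks_high_level` and `load_subtasks` are then thin wrappers over this pattern with `meta/tasks_high_level.parquet` and `meta/subtasks.parquet` respectively.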
def load_subtasks(local_dir: Path) -> pandas.DataFrame | None:
"""Load subtasks from subtasks.parquet if it exists."""
subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH
if subtasks_path.exists():
return pd.read_parquet(subtasks_path)
return None
+2 -7
@@ -112,7 +112,6 @@ class LiberoEnv(gym.Env):
visualization_height: int = 480,
init_states: bool = True,
episode_index: int = 0,
n_envs: int = 1,
camera_name_mapping: dict[str, str] | None = None,
num_steps_wait: int = 10,
control_mode: str = "relative",
@@ -146,9 +145,7 @@ class LiberoEnv(gym.Env):
self.episode_length = episode_length
# Load once and keep
self._init_states = get_task_init_states(task_suite, self.task_id) if self.init_states else None
self._reset_stride = n_envs # when performing a reset, append `_reset_stride` to `init_state_id`.
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._env = self._make_envs_task(task_suite, self.task_id)
default_steps = 500
@@ -298,8 +295,7 @@ class LiberoEnv(gym.Env):
self._env.seed(seed)
raw_obs = self._env.reset()
if self.init_states and self._init_states is not None:
raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)])
self.init_state_id += self._reset_stride # Change init_state_id when reset
raw_obs = self._env.set_init_state(self._init_states[self._init_state_id])
# After reset, objects may be unstable (slightly floating, intersecting, etc.).
# Step the simulator with a no-op action for a few frames so everything settles.
@@ -377,7 +373,6 @@ def _make_env_fns(
init_states=init_states,
episode_length=episode_length,
episode_index=episode_index,
n_envs=n_envs,
control_mode=control_mode,
**local_kwargs,
)
+4 -6
@@ -221,7 +221,7 @@ class RangeFinderGUI:
self.bus = bus
self.groups = groups if groups is not None else {"all": list(bus.motors)}
self.group_names = list(self.groups)
self.group_names = list(groups)
self.current_group = self.group_names[0]
if not bus.is_connected:
@@ -230,20 +230,18 @@ class RangeFinderGUI:
self.calibration = bus.read_calibration()
self.res_table = bus.model_resolution_table
self.present_cache = {
m: bus.read("Present_Position", m, normalize=False)
for motors in self.groups.values()
for m in motors
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
}
pygame.init()
self.font = pygame.font.Font(None, FONT_SIZE)
label_pad = max(self.font.size(m)[0] for ms in self.groups.values() for m in ms)
label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
self.label_pad = label_pad
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
self.controls_bottom = 10 + SAVE_H
self.base_y = self.controls_bottom + TOP_GAP
height = self.base_y + PADDING_Y * len(self.groups[self.current_group]) + 40
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
self.screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Motors range finder")
+15 -41
@@ -23,7 +23,6 @@ from copy import deepcopy
from functools import cached_property
from typing import TYPE_CHECKING, Any, TypedDict
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.import_utils import _can_available
if TYPE_CHECKING or _can_available:
@@ -37,6 +36,7 @@ else:
import numpy as np
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import enter_pressed, move_cursor_up
@@ -155,7 +155,6 @@ class DamiaoMotorsBus(MotorsBusBase):
"""Check if the CAN bus is connected."""
return self._is_connected and self.canbus is not None
@check_if_already_connected
def connect(self, handshake: bool = True) -> None:
"""
Open the CAN bus and initialize communication.
@@ -163,6 +162,10 @@ class DamiaoMotorsBus(MotorsBusBase):
Args:
handshake: If True, ping all motors to verify they're present
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(
f"{self.__class__.__name__}('{self.port}') is already connected."
)
try:
# Auto-detect interface type based on port name
@@ -208,9 +211,6 @@ class DamiaoMotorsBus(MotorsBusBase):
logger.info("Starting handshake with motors...")
# Drain any pending messages
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
while self.canbus.recv(timeout=0.01):
pass
@@ -246,7 +246,6 @@ class DamiaoMotorsBus(MotorsBusBase):
)
logger.info("Handshake successful. All motors ready.")
@check_if_not_connected
def disconnect(self, disable_torque: bool = True) -> None:
"""
Close the CAN bus connection.
@@ -254,6 +253,8 @@ class DamiaoMotorsBus(MotorsBusBase):
Args:
disable_torque: If True, disable torque on all motors before disconnecting
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self.__class__.__name__}('{self.port}') is not connected.")
if disable_torque:
try:
@@ -282,10 +283,6 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg)
@@ -344,10 +341,6 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id)
@@ -363,10 +356,6 @@ class DamiaoMotorsBus(MotorsBusBase):
Returns:
CAN message if received, None otherwise
"""
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
try:
start_time = time.time()
messages_seen = []
@@ -405,13 +394,10 @@ class DamiaoMotorsBus(MotorsBusBase):
Returns:
Dictionary mapping recv_id to CAN message
"""
responses: dict[int, can.Message] = {}
responses = {}
expected_set = set(expected_recv_ids)
start_time = time.time()
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
try:
while len(responses) < len(expected_recv_ids) and (time.time() - start_time) < timeout:
# 100us poll timeout
@@ -475,9 +461,6 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_name = self._get_motor_name(motor)
motor_type = self._motor_types[motor_name]
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
@@ -505,9 +488,6 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id_to_motor: dict[int, str] = {}
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
# Step 1: Send all MIT control commands
for motor, (kp, kd, position_degrees, velocity_deg_per_sec, torque) in commands.items():
motor_id = self._get_motor_id(motor)
@@ -582,9 +562,10 @@ class DamiaoMotorsBus(MotorsBusBase):
except Exception as e:
logger.warning(f"Failed to decode response from {motor}: {e}")
@check_if_not_connected
def read(self, data_name: str, motor: str) -> Value:
"""Read a value from a single motor. Positions are always in degrees."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Refresh motor to get latest state
msg = self._refresh_motor(motor)
@@ -614,7 +595,6 @@ class DamiaoMotorsBus(MotorsBusBase):
raise ValueError(f"Unknown data_name: {data_name}")
return mapping[data_name]
@check_if_not_connected
def write(
self,
data_name: str,
@@ -625,6 +605,8 @@ class DamiaoMotorsBus(MotorsBusBase):
Write a value to a single motor. Positions are always in degrees.
Can write 'Goal_Position', 'Kp', or 'Kd'.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if data_name in ("Kp", "Kd"):
self._gains[motor][data_name.lower()] = float(value)
@@ -674,10 +656,6 @@ class DamiaoMotorsBus(MotorsBusBase):
def _batch_refresh(self, motors: list[str]) -> None:
"""Internal helper to refresh a list of motors and update cache."""
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
# Send refresh commands
for motor in motors:
motor_id = self._get_motor_id(motor)
@@ -700,12 +678,10 @@ class DamiaoMotorsBus(MotorsBusBase):
else:
logger.warning(f"Packet drop: {motor} (ID: 0x{recv_id:02X}). Using last known state.")
@check_if_not_connected
def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
"""
Write values to multiple motors simultaneously. Positions are always in degrees.
"""
if data_name in ("Kp", "Kd"):
key = data_name.lower()
for motor, val in values.items():
@@ -714,8 +690,6 @@ class DamiaoMotorsBus(MotorsBusBase):
elif data_name == "Goal_Position":
# Step 1: Send all MIT control commands
recv_id_to_motor: dict[int, str] = {}
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
for motor, value_degrees in values.items():
motor_id = self._get_motor_id(motor)
motor_name = self._get_motor_name(motor)
@@ -758,9 +732,9 @@ class DamiaoMotorsBus(MotorsBusBase):
def record_ranges_of_motion(
self,
motors: str | list[str] | None = None,
motors: NameOrID | list[NameOrID] | None = None,
display_values: bool = True,
) -> tuple[dict[str, Value], dict[str, Value]]:
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""
Interactively record the min/max values of each motor in degrees.
+8 -8
@@ -181,10 +181,10 @@ class DynamixelMotorsBus(SerialMotorsBus):
for motor, m in self.motors.items():
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=int(drive_modes[motor]),
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
drive_mode=drive_modes[motor],
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
)
return calibration
@@ -198,7 +198,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
if cache:
self.calibration = calibration_dict
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
@@ -206,7 +206,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable")
self._write(addr, length, motor, TorqueMode.DISABLED.value, num_retry=num_retry)
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
@@ -235,7 +235,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
On Dynamixel Motors:
Present_Position = Actual_Position + Homing_Offset
"""
half_turn_homings: dict[NameOrID, Value] = {}
half_turn_homings = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
@@ -258,6 +258,6 @@ class DynamixelMotorsBus(SerialMotorsBus):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return None
return
return {id_: data[0] for id_, data in data_list.items()}
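The half-turn homing logic documented above differs only in sign between the two buses: Dynamixel adds the homing offset to the actual position, Feetech subtracts it. A standalone sketch of that computation (hypothetical helper names; conventions taken from the docstrings, a 12-bit encoder assumed):

```python
def dynamixel_half_turn_homing(pos: int, resolution: int = 4096) -> int:
    # Dynamixel: Present_Position = Actual_Position + Homing_Offset,
    # so the offset that centres the range is half_turn - pos.
    half_turn = (resolution - 1) // 2  # 2047 for a 12-bit encoder
    return half_turn - pos


def feetech_half_turn_homing(pos: int, resolution: int = 4096) -> int:
    # Feetech: Present_Position = Actual_Position - Homing_Offset,
    # so the sign flips: pos - half_turn.
    half_turn = (resolution - 1) // 2
    return pos - half_turn
```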
@@ -126,7 +126,7 @@ class FeetechMotorsBus(SerialMotorsBus):
self.port_handler = scs.PortHandler(self.port)
# HACK: monkeypatch
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__( # type: ignore[method-assign]
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(
self.port_handler, scs.PortHandler
)
self.packet_handler = scs.PacketHandler(protocol_version)
@@ -262,9 +262,9 @@ class FeetechMotorsBus(SerialMotorsBus):
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
)
return calibration
@@ -284,7 +284,7 @@ class FeetechMotorsBus(SerialMotorsBus):
On Feetech Motors:
Present_Position = Actual_Position - Homing_Offset
"""
half_turn_homings: dict[NameOrID, Value] = {}
half_turn_homings = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
@@ -292,7 +292,7 @@ class FeetechMotorsBus(SerialMotorsBus):
return half_turn_homings
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
self.write("Lock", motor, 0, num_retry=num_retry)
@@ -303,7 +303,7 @@ class FeetechMotorsBus(SerialMotorsBus):
addr, length = get_address(self.model_ctrl_table, model, "Lock")
self._write(addr, length, motor, 0, num_retry=num_retry)
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
self.write("Lock", motor, 1, num_retry=num_retry)
@@ -334,7 +334,7 @@ class FeetechMotorsBus(SerialMotorsBus):
def _broadcast_ping(self) -> tuple[dict[int, int], int]:
import scservo_sdk as scs
data_list: dict[int, int] = {}
data_list = {}
status_length = 6
@@ -414,7 +414,7 @@ class FeetechMotorsBus(SerialMotorsBus):
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return None
return
ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
if ids_errors:
@@ -23,7 +23,6 @@ from __future__ import annotations
import abc
import logging
from collections.abc import Sequence
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
@@ -94,7 +93,7 @@ class MotorsBusBase(abc.ABC):
pass
@abc.abstractmethod
def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
"""Write values to multiple motors."""
pass
@@ -180,16 +179,15 @@ class Motor:
class PortHandler(Protocol):
is_open: bool
baudrate: int
packet_start_time: float
packet_timeout: float
tx_time_per_byte: float
is_using: bool
port_name: str
ser: serial.Serial
def __init__(self, port_name: str) -> None: ...
def __init__(self, port_name):
self.is_open: bool
self.baudrate: int
self.packet_start_time: float
self.packet_timeout: float
self.tx_time_per_byte: float
self.is_using: bool
self.port_name: str
self.ser: serial.Serial
def openPort(self): ...
def closePort(self): ...
@@ -242,22 +240,19 @@ class PacketHandler(Protocol):
def regWriteTxRx(self, port, id, address, length, data): ...
def syncReadTx(self, port, start_address, data_length, param, param_length): ...
def syncWriteTxOnly(self, port, start_address, data_length, param, param_length): ...
def broadcastPing(self, port): ...
class GroupSyncRead(Protocol):
port: str
ph: PortHandler
start_address: int
data_length: int
last_result: bool
is_param_changed: bool
param: list
data_dict: dict
def __init__(self, port, ph, start_address, data_length):
self.port: str
self.ph: PortHandler
self.start_address: int
self.data_length: int
self.last_result: bool
self.is_param_changed: bool
self.param: list
self.data_dict: dict
def __init__(
self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
) -> None: ...
def makeParam(self): ...
def addParam(self, id): ...
def removeParam(self, id): ...
@@ -270,17 +265,15 @@ class GroupSyncRead(Protocol):
class GroupSyncWrite(Protocol):
port: str
ph: PortHandler
start_address: int
data_length: int
is_param_changed: bool
param: list
data_dict: dict
def __init__(self, port, ph, start_address, data_length):
self.port: str
self.ph: PortHandler
self.start_address: int
self.data_length: int
self.is_param_changed: bool
self.param: list
self.data_dict: dict
def __init__(
self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
) -> None: ...
def makeParam(self): ...
def addParam(self, id, data): ...
def removeParam(self, id): ...
@@ -407,7 +400,7 @@ class SerialMotorsBus(MotorsBusBase):
else:
raise TypeError(f"'{motor}' should be int, str.")
def _get_motor_model(self, motor: NameOrID) -> str:
def _get_motor_model(self, motor: NameOrID) -> int:
if isinstance(motor, str):
return self.motors[motor].model
elif isinstance(motor, int):
@@ -415,19 +408,17 @@ class SerialMotorsBus(MotorsBusBase):
else:
raise TypeError(f"'{motor}' should be int, str.")
def _get_motors_list(self, motors: NameOrID | Sequence[NameOrID] | None) -> list[str]:
def _get_motors_list(self, motors: str | list[str] | None) -> list[str]:
if motors is None:
return list(self.motors)
elif isinstance(motors, str):
return [motors]
elif isinstance(motors, int):
return [self._id_to_name(motors)]
elif isinstance(motors, Sequence):
return [m if isinstance(m, str) else self._id_to_name(m) for m in motors]
elif isinstance(motors, list):
return motors.copy()
else:
raise TypeError(motors)
def _get_ids_values_dict(self, values: Value | dict[str, Value] | None) -> dict[int, Value]:
def _get_ids_values_dict(self, values: Value | dict[str, Value] | None) -> list[str]:
if isinstance(values, (int | float)):
return dict.fromkeys(self.ids, values)
elif isinstance(values, dict):
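The widened `sync_write` signature (`Value | dict[str, Value]`) relies on this broadcast step: a bare number is expanded to every motor id, while a dict passes through. A simplified, self-contained sketch of that dispatch (not the exact bus code):

```python
def ids_values_dict(values, ids):
    # A single number is broadcast to every motor id; a dict passes through.
    if isinstance(values, (int, float)):
        return dict.fromkeys(ids, values)
    if isinstance(values, dict):
        return values
    raise TypeError(values)
```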
@@ -649,19 +640,18 @@ class SerialMotorsBus(MotorsBusBase):
pass
@abc.abstractmethod
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
"""Enable torque on selected motors.
Args:
motors (int | str | list[str] | None, optional): Same semantics as :pymeth:`disable_torque`.
Defaults to `None`.
motor (int): Same semantics as :pymeth:`disable_torque`. Defaults to `None`.
num_retry (int, optional): Number of additional retry attempts on communication failure.
Defaults to 0.
"""
pass
@contextmanager
def torque_disabled(self, motors: str | list[str] | None = None):
def torque_disabled(self, motors: int | str | list[str] | None = None):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
@@ -738,19 +728,24 @@ class SerialMotorsBus(MotorsBusBase):
"""
pass
def reset_calibration(self, motors: NameOrID | Sequence[NameOrID] | None = None) -> None:
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
"""Restore factory calibration for the selected motors.
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
The in-memory :pyattr:`calibration` is cleared.
Args:
motors (NameOrID | Sequence[NameOrID] | None, optional): Selection of motors. `None` (default)
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
resets every motor.
"""
motor_names = self._get_motors_list(motors)
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
for motor in motor_names:
for motor in motors:
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
self.write("Homing_Offset", motor, 0, normalize=False)
@@ -759,9 +754,7 @@ class SerialMotorsBus(MotorsBusBase):
self.calibration = {}
def set_half_turn_homings(
self, motors: NameOrID | Sequence[NameOrID] | None = None
) -> dict[NameOrID, Value]:
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
"""Centre each motor range around its current position.
The function computes and writes a homing offset such that the present position becomes exactly one
@@ -771,12 +764,17 @@ class SerialMotorsBus(MotorsBusBase):
motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`).
Returns:
dict[str, Value]: Mapping *motor name → written homing offset*.
dict[NameOrID, Value]: Mapping *motor → written homing offset*.
"""
motor_names = self._get_motors_list(motors)
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
self.reset_calibration(motor_names)
actual_positions = self.sync_read("Present_Position", motor_names, normalize=False)
self.reset_calibration(motors)
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
homing_offsets = self._get_half_turn_homings(actual_positions)
for motor, offset in homing_offsets.items():
self.write("Homing_Offset", motor, offset)
@@ -788,8 +786,8 @@ class SerialMotorsBus(MotorsBusBase):
pass
def record_ranges_of_motion(
self, motors: NameOrID | Sequence[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[str, Value], dict[str, Value]]:
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
"""Interactively record the min/max encoder values of each motor.
Move the joints by hand (with torque disabled) while the method streams live positions. Press
@@ -801,25 +799,30 @@ class SerialMotorsBus(MotorsBusBase):
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Returns:
tuple[dict[str, Value], dict[str, Value]]: Two dictionaries *mins* and *maxes* with the
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
extreme values observed for each motor.
"""
motor_names = self._get_motors_list(motors)
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
start_positions = self.sync_read("Present_Position", motor_names, normalize=False)
start_positions = self.sync_read("Present_Position", motors, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
user_pressed_enter = False
while not user_pressed_enter:
positions = self.sync_read("Present_Position", motor_names, normalize=False)
positions = self.sync_read("Present_Position", motors, normalize=False)
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for motor in motor_names:
for motor in motors:
print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}")
if enter_pressed():
@@ -827,9 +830,9 @@ class SerialMotorsBus(MotorsBusBase):
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(motor_names) + 3)
move_cursor_up(len(motors) + 3)
same_min_max = [motor for motor in motor_names if mins[motor] == maxes[motor]]
same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]]
if same_min_max:
raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}")
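The recording loop above reduces to a running min/max per motor as the joints are moved by hand. A standalone sketch of that update step:

```python
def update_ranges(positions, mins, maxes):
    # Fold the latest position reading into the per-motor extremes.
    mins = {motor: min(positions[motor], lo) for motor, lo in mins.items()}
    maxes = {motor: max(positions[motor], hi) for motor, hi in maxes.items()}
    return mins, maxes
```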
@@ -952,12 +955,12 @@ class SerialMotorsBus(MotorsBusBase):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
else:
return None
return
if self._is_error(error):
if raise_on_error:
raise RuntimeError(self.packet_handler.getRxPacketError(error))
else:
return None
return
return model_number
@@ -1004,13 +1007,12 @@ class SerialMotorsBus(MotorsBusBase):
err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
decoded = self._decode_sign(data_name, {id_: value})
id_value = self._decode_sign(data_name, {id_: value})
if normalize and data_name in self.normalized_data:
normalized = self._normalize(decoded)
return normalized[id_]
id_value = self._normalize(id_value)
return decoded[id_]
return id_value[id_]
def _read(
self,
@@ -1021,7 +1023,7 @@ class SerialMotorsBus(MotorsBusBase):
num_retry: int = 0,
raise_on_error: bool = True,
err_msg: str = "",
) -> tuple[int, int, int]:
) -> tuple[int, int]:
if length == 1:
read_fn = self.packet_handler.read1ByteTxRx
elif length == 2:
@@ -1071,14 +1073,13 @@ class SerialMotorsBus(MotorsBusBase):
model = self.motors[motor].model
addr, length = get_address(self.model_ctrl_table, model, data_name)
int_value = int(value)
if normalize and data_name in self.normalized_data:
int_value = self._unnormalize({id_: value})[id_]
value = self._unnormalize({id_: value})[id_]
int_value = self._encode_sign(data_name, {id_: int_value})[id_]
value = self._encode_sign(data_name, {id_: value})[id_]
err_msg = f"Failed to write '{data_name}' on {id_=} with '{int_value}' after {num_retry + 1} tries."
self._write(addr, length, id_, int_value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
def _write(
self,
@@ -1112,7 +1113,7 @@ class SerialMotorsBus(MotorsBusBase):
def sync_read(
self,
data_name: str,
motors: NameOrID | Sequence[NameOrID] | None = None,
motors: str | list[str] | None = None,
*,
normalize: bool = True,
num_retry: int = 0,
@@ -1121,7 +1122,7 @@ class SerialMotorsBus(MotorsBusBase):
Args:
data_name (str): Register name.
motors (NameOrID | Sequence[NameOrID] | None, optional): Motors to query. `None` (default) reads every motor.
motors (str | list[str] | None, optional): Motors to query. `None` (default) reads every motor.
normalize (bool, optional): Normalisation flag. Defaults to `True`.
num_retry (int, optional): Retry attempts. Defaults to `0`.
@@ -1142,17 +1143,16 @@ class SerialMotorsBus(MotorsBusBase):
addr, length = get_address(self.model_ctrl_table, model, data_name)
err_msg = f"Failed to sync read '{data_name}' on {ids=} after {num_retry + 1} tries."
raw_ids_values, _ = self._sync_read(
ids_values, _ = self._sync_read(
addr, length, ids, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
)
decoded = self._decode_sign(data_name, raw_ids_values)
ids_values = self._decode_sign(data_name, ids_values)
if normalize and data_name in self.normalized_data:
normalized = self._normalize(decoded)
return {self._id_to_name(id_): value for id_, value in normalized.items()}
ids_values = self._normalize(ids_values)
return {self._id_to_name(id_): value for id_, value in decoded.items()}
return {self._id_to_name(id_): value for id_, value in ids_values.items()}
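`_decode_sign` above turns raw unsigned register reads into signed values before normalization. One common convention is two's complement over the register width (a sketch under that assumption; the real buses may use per-register sign bits instead):

```python
def decode_sign(raw: int, n_bytes: int) -> int:
    # Reinterpret an unsigned register read as a signed two's-complement value.
    bits = 8 * n_bytes
    return raw - (1 << bits) if raw >= (1 << (bits - 1)) else raw
```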
def _sync_read(
self,
@@ -1224,24 +1224,21 @@ class SerialMotorsBus(MotorsBusBase):
num_retry (int, optional): Retry attempts. Defaults to `0`.
"""
raw_ids_values = self._get_ids_values_dict(values)
models = [self._id_to_model(id_) for id_ in raw_ids_values]
ids_values = self._get_ids_values_dict(values)
models = [self._id_to_model(id_) for id_ in ids_values]
if self._has_different_ctrl_tables:
assert_same_address(self.model_ctrl_table, models, data_name)
model = next(iter(models))
addr, length = get_address(self.model_ctrl_table, model, data_name)
int_ids_values = {id_: int(val) for id_, val in raw_ids_values.items()}
if normalize and data_name in self.normalized_data:
int_ids_values = self._unnormalize(raw_ids_values)
ids_values = self._unnormalize(ids_values)
int_ids_values = self._encode_sign(data_name, int_ids_values)
ids_values = self._encode_sign(data_name, ids_values)
err_msg = f"Failed to sync write '{data_name}' with ids_values={int_ids_values} after {num_retry + 1} tries."
self._sync_write(
addr, length, int_ids_values, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
)
err_msg = f"Failed to sync write '{data_name}' with {ids_values=} after {num_retry + 1} tries."
self._sync_write(addr, length, ids_values, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
def _sync_write(
self,
@@ -34,6 +34,7 @@ from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.policies.groot.configuration_groot import GrootConfig
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.pi05_full.configuration_pi05 import PI05FullConfig
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
@@ -390,6 +391,13 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, PI05FullConfig):
from lerobot.policies.pi05_full.processor_pi05 import make_pi05_full_pre_post_processors
processors = make_pi05_full_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
try:
@@ -0,0 +1,49 @@
# π₀.₅ (pi05)
This repository contains the Hugging Face port of **π₀.₅**, adapted from [OpenPI](https://github.com/Physical-Intelligence/openpi) by Physical Intelligence.
It is designed as a **Vision-Language-Action model with open-world generalization**.
---
## Model Overview
| Feature | π₀ | π₀.₅ |
| -------------------- | ------------------------------------------------------ | ----------------------------------------- |
| Time Conditioning | Concatenates time with actions via `action_time_mlp_*` | Uses `time_mlp_*` for AdaRMS conditioning |
| AdaRMS | Not used | Used in action expert |
| Tokenizer Length | 48 tokens | 200 tokens |
| Discrete State Input | False (Uses `state_proj` layer) | True |
| Parameter Count | Higher (includes state embedding) | Lower (no state embedding) |
---
## Citation
If you use this work, please cite both **OpenPI** and the π₀.₅ paper:
```bibtex
@misc{openpi2024,
author = {Physical Intelligence Lab},
title = {OpenPI: PyTorch Implementation of π0 and π0.5 Policies},
year = {2024},
publisher = {GitHub},
howpublished = {\url{https://github.com/Physical-Intelligence/openpi}},
license = {Apache-2.0}
}
@misc{intelligence2025pi05visionlanguageactionmodelopenworld,
title = {π₀.₅: a Vision-Language-Action Model with Open-World Generalization},
author = {Physical Intelligence and Kevin Black and Noah Brown and James Darpinian and Karan Dhabalia and Danny Driess and Adnan Esmail and Michael Equi and Chelsea Finn and Niccolo Fusai and Manuel Y. Galliker and Dibya Ghosh and Lachy Groom and Karol Hausman and Brian Ichter and Szymon Jakubczak and Tim Jones and Liyiming Ke and Devin LeBlanc and Sergey Levine and Adrian Li-Bell and Mohith Mothukuri and Suraj Nair and Karl Pertsch and Allen Z. Ren and Lucy Xiaoyang Shi and Laura Smith and Jost Tobias Springenberg and Kyle Stachowicz and James Tanner and Quan Vuong and Homer Walke and Anna Walling and Haohuan Wang and Lili Yu and Ury Zhilinsky},
year = {2025},
eprint = {2504.16054},
archivePrefix= {arXiv},
primaryClass = {cs.LG},
url = {https://arxiv.org/abs/2504.16054},
}
```
---
## License
This port follows the **Apache 2.0 License**, consistent with the original [OpenPI repository](https://github.com/Physical-Intelligence/openpi).
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_pi05 import PI05FullConfig
from .modeling_pi05 import PI05FullPolicy
from .processor_pi05 import make_pi05_full_pre_post_processors
__all__ = ["PI05FullConfig", "PI05FullPolicy", "make_pi05_full_pre_post_processors"]
@@ -0,0 +1,50 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="lerobot/libero_10"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/libero-10-annotate-high"
BATCH_SIZE=16
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0  # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
# python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
# --repo-id "$REPO_ID" \
# --video-key observation.images.image \
# --output-dir "$OUTPUT_DIR" \
# --skip-existing \
# --output-repo-id "jadechoghari/libero10-annotate" \
# --batch-size "$BATCH_SIZE" \
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: $BATCH_SIZE episodes at once
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "/fsx/jade_choghari/outputs/libero-10-annotate" \
--output-dir "$OUTPUT_DIR" \
--video-mode \
--video-key observation.images.image \
--video-batch-size "$BATCH_SIZE" \
--sample-interval 5.0
File diff suppressed because it is too large.
@@ -0,0 +1,52 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.policies.factory import make_pre_post_processors
from lerobot.configs.policies import PreTrainedConfig
# /fsx/jade_choghari/data/libero_10_subtasks_kw_converted
dataset = LeRobotDataset(repo_id="lerobot/libero_10_image_subtask")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=2,
shuffle=True,
)
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
batch = next(iter(dataloader))
batch = pre_processor(batch)
print(batch.keys())
# print(batch['task_index_high_level'].shape)
# print(batch['task_index_high_level'])
# print(batch['user_prompt'][0])
# print(batch['robot_utterance'][0])
# print(batch['task'][0])
valid_episode_list = []
for episode_idx in range(len(dataset.meta.episodes)):
subtask_index = dataset[episode_idx]["subtask_index"]
valid_episode_list.append(episode_idx)
print(len(valid_episode_list))
# read this parquet /fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet
# import pandas as pd
# tasks_df = pd.read_parquet('/fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet')
# # print all
# print(tasks_df.columns)
# breakpoint()
@@ -0,0 +1,49 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/collect-data"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
BATCH_SIZE=32
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0  # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.base \
--output-dir "$OUTPUT_DIR" \
--output-repo-id "jadechoghari/collect-data-with-subtasks"
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --video-mode \
# --video-key observation.images.up \
# --video-batch-size "$BATCH_SIZE" \
# --sample-interval 1.0
File diff suppressed because it is too large.
@@ -0,0 +1,183 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi05_full")
@dataclass
class PI05FullConfig(PreTrainedConfig):
paligemma_variant: str = "gemma_2b"
action_expert_variant: str = "gemma_300m"
dtype: str = "float32" # Options: "bfloat16", "float32"
n_obs_steps: int = 1
chunk_size: int = 50 # Number of action steps to predict, in openpi called "action_horizon"
n_action_steps: int = 50 # Number of action steps to execute
# Shorter state and action vectors will be padded to these dimensions
max_state_dim: int = 32
max_action_dim: int = 32
# Flow matching parameters: see openpi `PI0Pytorch`
num_inference_steps: int = 10
time_sampling_beta_alpha: float = 1.5
time_sampling_beta_beta: float = 1.0
time_sampling_scale: float = 0.999
time_sampling_offset: float = 0.001
min_period: float = 4e-3
max_period: float = 4.0
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for state
"ACTION": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for action
}
)
action_tokenizer_name: str = "physical-intelligence/fast"
text_tokenizer_name: str = "google/paligemma-3b-pt-224"
max_action_tokens: int = 256
fast_skip_tokens: int = 128
# subtask stuff
max_decoding_steps: int = 200
temperature: float = 0.0
subtask_regeneration_interval: float = 1.0 # Regenerate subtask tokens every N seconds (0 = every call)
# Training settings
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
device: str | None = None # Device to use for the model (None = auto-detect)
# Finetuning settings
freeze_vision_encoder: bool = False # Freeze only the vision encoder
train_expert_only: bool = False # Freeze entire VLM, train only action expert and projections
knowledge_insulation: bool = True # Enable knowledge insulation in attention (blocks gradients from action to VLM K/V)
# Loss weights (used when knowledge_insulation is enabled)
loss_weight_flow: float = 1.0 # Weight for flow matching MSE loss (continuous actions)
loss_weight_action_ce: float = 1.0 # Weight for FAST action token cross-entropy loss
loss_weight_subtask_ce: float = 1.0 # Weight for subtask token cross-entropy loss
# Optimizer settings: see openpi `AdamW`
optimizer_lr: float = 2.5e-5 # see openpi `CosineDecaySchedule: peak_lr`
optimizer_betas: tuple[float, float] = (0.9, 0.95)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 0.01
optimizer_grad_clip_norm: float = 1.0
# Scheduler settings: see openpi `CosineDecaySchedule`
# Note: These will auto-scale if --steps < scheduler_decay_steps
# For example, --steps=3000 will scale warmup to 100 and decay to 3000
scheduler_warmup_steps: int = 1_000
scheduler_decay_steps: int = 30_000
scheduler_decay_lr: float = 2.5e-6
tokenizer_max_length: int = 48 # see openpi `__post_init__`
def __post_init__(self):
super().__post_init__()
# Validate configuration
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"n_action_steps ({self.n_action_steps}) cannot be greater than chunk_size ({self.chunk_size})"
)
if self.paligemma_variant not in ["gemma_300m", "gemma_2b"]:
raise ValueError(f"Invalid paligemma_variant: {self.paligemma_variant}")
if self.action_expert_variant not in ["gemma_300m", "gemma_2b"]:
raise ValueError(f"Invalid action_expert_variant: {self.action_expert_variant}")
if self.dtype not in ["bfloat16", "float32"]:
raise ValueError(f"Invalid dtype: {self.dtype}")
def validate_features(self) -> None:
"""Validate and set up input/output features."""
for i in range(self.empty_cameras):
key = OBS_IMAGES + f".empty_camera_{i}"
empty_camera = PolicyFeature(
type=FeatureType.VISUAL,
shape=(3, *self.image_resolution), # Use configured image resolution
)
self.input_features[key] = empty_camera
if OBS_STATE not in self.input_features:
state_feature = PolicyFeature(
type=FeatureType.STATE,
shape=(self.max_state_dim,), # Padded to max_state_dim
)
self.input_features[OBS_STATE] = state_feature
if ACTION not in self.output_features:
action_feature = PolicyFeature(
type=FeatureType.ACTION,
shape=(self.max_action_dim,), # Padded to max_action_dim
)
self.output_features[ACTION] = action_feature
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
)
def get_scheduler_preset(self):
return CosineDecayWithWarmupSchedulerConfig(
peak_lr=self.optimizer_lr,
decay_lr=self.scheduler_decay_lr,
num_warmup_steps=self.scheduler_warmup_steps,
num_decay_steps=self.scheduler_decay_steps,
)
@property
def observation_delta_indices(self) -> None:
return None
@property
def action_delta_indices(self) -> list:
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
return None
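The scheduler comment above says warmup and decay shrink proportionally when `--steps` falls below `scheduler_decay_steps` (e.g. `--steps=3000` gives warmup 100, decay 3000). A minimal sketch of that rule, assuming the actual scaling lives in the training script; `scale_schedule` is a hypothetical helper name:

```python
def scale_schedule(steps: int, warmup: int = 1_000, decay: int = 30_000) -> tuple[int, int]:
    """Proportionally shrink warmup and cap decay when steps < scheduler_decay_steps."""
    if steps >= decay:
        return warmup, decay
    # keep the warmup/decay ratio, cap decay at the requested step budget
    return int(warmup * steps / decay), steps

# e.g. scale_schedule(3_000) -> (100, 3_000)
```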
@@ -0,0 +1,92 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.factory import make_policy, make_policy_config
from lerobot.configs.policies import PreTrainedConfig
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
delta_timestamps = {"action": [i / 30 for i in range(50)]}  # 50 action steps at 30 fps: 0.0, 1/30, ..., 49/30
dataset = LeRobotDataset(repo_id="local", root="/fsx/jade_choghari/outputs/pgen_annotations1", delta_timestamps=delta_timestamps)
# rename map --rename_map='{
# "observation.images.side": "observation.images.base_0_rgb",
# "observation.images.up": "observation.images.left_wrist_0_rgb"
# }'
rename_map = {
"observation.images.side": "observation.images.base_0_rgb",
"observation.images.up": "observation.images.left_wrist_0_rgb"
}
policy = make_policy(
cfg=cfg,
ds_meta=dataset.meta,
rename_map=rename_map,
)
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
batch = next(iter(dataloader))
batch = pre_processor(batch)
policy.train()
# run inference
# action = policy.select_action(batch)
loss, loss_dict = policy.forward(batch)
# import requests
# from PIL import Image
# from transformers import AutoProcessor
# model = policy.model.paligemma_with_expert.paligemma
# model = model.to(device="cuda", dtype=torch.bfloat16)
# model.eval()
# prompt = "Describe this image."
# url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/pipeline-cat-chonk.jpeg"
# image = Image.open(requests.get(url, stream=True).raw)
# processor = AutoProcessor.from_pretrained(
# "google/paligemma-3b-pt-224",
# )
# inputs = processor(image, prompt, return_tensors="pt").to(model.device)
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=50,
# use_cache=True, # default dynamic cache
# )
# print(processor.decode(output[0], skip_special_tokens=True))
# # other model
# from transformers import PaliGemmaForConditionalGeneration
# model = PaliGemmaForConditionalGeneration.from_pretrained(
# "google/paligemma2-3b-pt-224",
# torch_dtype=torch.bfloat16,
# device_map="auto",
# )
# model.eval()
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=100,
# use_cache=True, # default dynamic cache
# )
# print("Model 2 output:")
# print(processor.decode(output[0], skip_special_tokens=True))
File diff suppressed because it is too large
@@ -0,0 +1,194 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
from dataclasses import dataclass
from typing import Any
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.policies.pi05_full.configuration_pi05 import PI05FullConfig
from lerobot.policies.pi05_full.modeling_pi05 import pad_vector
from lerobot.processor import (
ActionTokenizerProcessorStep,
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
TokenizerProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.processor.core import EnvTransition, TransitionKey
from lerobot.utils.constants import (
OBS_STATE,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
@ProcessorStepRegistry.register(name="pi05_full_prepare_state_tokenizer_processor_step")
@dataclass
class Pi05FullPrepareStateTokenizerProcessorStep(ProcessorStep):
"""
Processor step to prepare the state and tokenize the language input.
"""
max_state_dim: int = 32
task_key: str = "task"
subtask_key: str = "subtask"
def __call__(self, transition: EnvTransition) -> EnvTransition:
transition = transition.copy()
state = transition.get(TransitionKey.OBSERVATION, {}).get(OBS_STATE)
if state is None:
raise ValueError("State is required for PI05")
user_prompts = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.task_key)
if user_prompts is None:
raise ValueError("No user prompts found in complementary data")
commands = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.subtask_key)
# TODO: check if this is necessary
state = deepcopy(state)
# Prepare state (pad to max_state_dim)
state = pad_vector(state, self.max_state_dim)
# State should already be normalized to [-1, 1] by the NormalizerProcessorStep that runs before this step
# Discretize into 256 bins (see openpi `PaligemmaTokenizer.tokenize()`)
state_np = state.cpu().numpy()
discretized_states = np.digitize(state_np, bins=np.linspace(-1, 1, 256 + 1)[:-1]) - 1
full_prompts = []
for i, user_prompt in enumerate(user_prompts):
cleaned_text = user_prompt.strip().replace("_", " ").replace("\n", " ")
cleaned_text = cleaned_text.lower()  # lowercase (NOTE: added by jadechoghari)
state_str = " ".join(map(str, discretized_states[i]))
full_prompt = f"Task: {cleaned_text}, State: {state_str};\n"
full_prompts.append(full_prompt)
transition[TransitionKey.COMPLEMENTARY_DATA][self.task_key] = full_prompts
# process commands (optional)
if commands is not None:
full_commands = []
for i, command in enumerate(commands):
cleaned_text = command.strip().replace("_", " ").replace("\n", " ")
cleaned_text = cleaned_text.lower()  # lowercase (NOTE: added by jadechoghari)
full_command = f"Subtask: {cleaned_text};\n"
full_commands.append(full_command)
transition[TransitionKey.COMPLEMENTARY_DATA][self.subtask_key] = full_commands
# note: action tokens will be processed in the ActionTokenizerProcessorStep
return transition
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
"""
This step does not alter the feature definitions.
"""
return features
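The state discretization and prompt formatting in `Pi05FullPrepareStateTokenizerProcessorStep.__call__` can be checked in isolation. A sketch with a toy 4-dim normalized state (values chosen to land on bin edges); the real step operates on padded torch tensors:

```python
import numpy as np

# Discretize a [-1, 1]-normalized state into 256 bins (see openpi PaligemmaTokenizer.tokenize()).
state = np.array([-1.0, -0.5, 0.0, 0.999])
bins = np.linspace(-1, 1, 256 + 1)[:-1]          # 256 left bin edges
discretized = np.digitize(state, bins=bins) - 1  # integer bins in [0, 255]

# Fold the discretized state into the text prompt, mirroring the cleaning above.
task = "Pick_up the cube"
cleaned = task.strip().replace("_", " ").replace("\n", " ").lower()
prompt = f"Task: {cleaned}, State: {' '.join(map(str, discretized))};\n"
# prompt -> "Task: pick up the cube, State: 0 64 128 255;\n"
```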
def make_pi05_full_pre_post_processors(
config: PI05FullConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""
Constructs pre-processor and post-processor pipelines for the PI05 policy.
The pre-processing pipeline prepares input data for the model by:
1. Renaming features to match pretrained configurations.
2. Adding a batch dimension.
3. Normalizing input and output features based on dataset statistics.
4. Discretizing the normalized state and folding it into the text prompt.
5. Tokenizing the prompt (and subtask, if present) with the PaliGemma tokenizer.
6. Tokenizing actions with the FAST action tokenizer.
7. Moving all data to the specified device.
The post-processing pipeline handles the model's output by:
1. Moving data to the CPU.
2. Unnormalizing the output features to their original scale.
Args:
config: The configuration object for the PI05 policy.
dataset_stats: A dictionary of statistics for normalization.
Returns:
A tuple containing the configured pre-processor and post-processor pipelines.
"""
# Add remaining processors
input_steps: list[ProcessorStep] = [
RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one
AddBatchDimensionProcessorStep(),
# NOTE: NormalizerProcessorStep MUST come before Pi05PrepareStateTokenizerProcessorStep
# because the tokenizer step expects normalized state in [-1, 1] range for discretization
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
Pi05FullPrepareStateTokenizerProcessorStep(max_state_dim=config.max_state_dim),
TokenizerProcessorStep(
tokenizer_name=config.text_tokenizer_name,
max_length=config.tokenizer_max_length,
padding_side="right",
padding="max_length",
),
ActionTokenizerProcessorStep(
action_tokenizer_name=config.action_tokenizer_name,
max_action_tokens=config.max_action_tokens,
fast_skip_tokens=config.fast_skip_tokens,
paligemma_tokenizer_name=config.text_tokenizer_name,
),
DeviceProcessorStep(device=config.device),
]
output_steps: list[ProcessorStep] = [
UnnormalizerProcessorStep(
features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
@@ -378,16 +378,16 @@ class SmolVLAPolicy(PreTrainedPolicy):
actions_is_pad = batch.get("actions_is_pad")
loss_dict = {}
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time)
loss_dict["losses_after_forward"] = losses.clone().mean().item()
loss_dict["losses_after_forward"] = losses.clone()
if actions_is_pad is not None:
in_episode_bound = ~actions_is_pad
losses = losses * in_episode_bound.unsqueeze(-1)
loss_dict["losses_after_in_ep_bound"] = losses.clone().mean().item()
loss_dict["losses_after_in_ep_bound"] = losses.clone()
# Remove padding
losses = losses[:, :, : self.config.max_action_dim]
loss_dict["losses_after_rm_padding"] = losses.clone().mean().item()
loss_dict["losses_after_rm_padding"] = losses.clone()
if reduction == "none":
# Return per-sample losses (B,) by averaging over time and action dims
@@ -171,9 +171,11 @@ def _extract_complementary_data(batch: dict[str, Any]) -> dict[str, Any]:
subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
index_key = {"index": batch["index"]} if "index" in batch else {}
task_index_key = {"task_index": batch["task_index"]} if "task_index" in batch else {}
user_prompt_key = {"user_prompt": batch["user_prompt"]} if "user_prompt" in batch else {}
subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
episode_index_key = {"episode_index": batch["episode_index"]} if "episode_index" in batch else {}
return {**pad_keys, **task_key, **subtask_key, **index_key, **task_index_key, **episode_index_key}
return {**pad_keys, **task_key, **index_key, **task_index_key, **episode_index_key, **user_prompt_key, **subtask_key}
def create_transition(
@@ -17,7 +17,7 @@ from dataclasses import dataclass
import torch
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.utils.constants import OBS_IMAGES, OBS_PREFIX, OBS_STATE, OBS_STR
from .pipeline import ObservationProcessorStep, ProcessorStepRegistry
@@ -92,7 +92,7 @@ class LiberoProcessorStep(ObservationProcessorStep):
# copy over non-STATE features
for ft, feats in features.items():
if ft != FeatureType.STATE:
if ft != PipelineFeatureType.STATE:
new_features[ft] = feats.copy()
# rebuild STATE features
@@ -100,11 +100,13 @@ class LiberoProcessorStep(ObservationProcessorStep):
# add our new flattened state
state_feats[OBS_STATE] = PolicyFeature(
type=FeatureType.STATE,
key=OBS_STATE,
shape=(8,), # [eef_pos(3), axis_angle(3), gripper(2)]
dtype="float32",
description=("Concatenated end-effector position (3), axis-angle (3), and gripper qpos (2)."),
)
new_features[FeatureType.STATE] = state_feats
new_features[PipelineFeatureType.STATE] = state_feats
return new_features
@@ -37,6 +37,9 @@ from lerobot.utils.constants import (
OBS_LANGUAGE_SUBTASK_ATTENTION_MASK,
OBS_LANGUAGE_SUBTASK_TOKENS,
OBS_LANGUAGE_TOKENS,
OBS_LANGUAGE_USER_PROMPT,
OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK,
OBS_LANGUAGE_USER_PROMPT_TOKENS,
)
from lerobot.utils.import_utils import _transformers_available
@@ -141,6 +144,32 @@ class TokenizerProcessorStep(ObservationProcessorStep):
return None
def get_user_prompt(self, transition: EnvTransition) -> list[str] | None:
"""
Extracts the user_prompt from the transition's complementary data.
Args:
transition: The environment transition.
Returns:
A list of user_prompt strings, or None if the user_prompt key is not found or the value is None.
"""
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if complementary_data is None:
return None
user_prompt = complementary_data.get("user_prompt")
if user_prompt is None:
return None
# Standardize to a list of strings for the tokenizer
if isinstance(user_prompt, str):
return [user_prompt]
elif isinstance(user_prompt, list) and all(isinstance(t, str) for t in user_prompt):
return user_prompt
return None
def get_subtask(self, transition: EnvTransition) -> list[str] | None:
"""
Extracts the subtask from the transition's complementary data.
@@ -169,16 +198,16 @@ class TokenizerProcessorStep(ObservationProcessorStep):
def observation(self, observation: RobotObservation) -> RobotObservation:
"""
Tokenizes the task description and adds it to the observation dictionary.
Tokenizes the task description and user_prompt (if available) and adds them to the observation dictionary.
This method retrieves the task, tokenizes it, moves the resulting tensors to the
This method retrieves the task and user_prompt, tokenizes them, moves the resulting tensors to the
same device as other data in the transition, and updates the observation.
Args:
observation: The original observation dictionary.
Returns:
The updated observation dictionary including token IDs and an attention mask.
The updated observation dictionary including token IDs and attention masks.
"""
task = self.get_task(self.transition)
if task is None:
@@ -204,11 +233,45 @@ class TokenizerProcessorStep(ObservationProcessorStep):
new_observation[OBS_LANGUAGE_TOKENS] = tokenized_prompt["input_ids"]
new_observation[OBS_LANGUAGE_ATTENTION_MASK] = tokenized_prompt["attention_mask"].to(dtype=torch.bool)
# Tokenize user_prompt if available
user_prompt = self.get_user_prompt(self.transition)
if user_prompt is not None:
tokenized_user_prompt = self._tokenize_text(user_prompt)
# Move new tokenized tensors to the detected device
if target_device is not None:
tokenized_user_prompt = {
k: v.to(target_device) if isinstance(v, torch.Tensor) else v
for k, v in tokenized_user_prompt.items()
}
# Add tokenized user_prompt to the observation
new_observation[OBS_LANGUAGE_USER_PROMPT_TOKENS] = tokenized_user_prompt["input_ids"]
new_observation[OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK] = tokenized_user_prompt["attention_mask"].to(dtype=torch.bool)
# Tokenize subtask if available
subtask = self.get_subtask(self.transition)
if subtask is not None:
tokenized_subtask = self._tokenize_text(subtask)
# Add EOS token at the end of each subtask sequence (before padding)
eos_token_id = self.input_tokenizer.eos_token_id
input_ids = tokenized_subtask["input_ids"]
attention_mask = tokenized_subtask["attention_mask"]
for i in range(input_ids.size(0)):
# Find the length of actual tokens (sum of attention mask)
seq_len = attention_mask[i].sum().item()
max_len = input_ids.size(1)
if seq_len >= max_len:
raise ValueError(
f"No room to append EOS: seq_len={seq_len} equals max_length={max_len}. "
"Increase max_length or tokenize with padding=False, then pad after adding EOS."
)
# Add EOS token at the end
input_ids[i, seq_len] = eos_token_id
attention_mask[i, seq_len] = 1
# Move new tokenized tensors to the detected device
if target_device is not None:
tokenized_subtask = {
@@ -320,6 +383,28 @@ class TokenizerProcessorStep(ObservationProcessorStep):
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
# Add features for user_prompt tokens and attention mask if they don't already exist
if OBS_LANGUAGE_USER_PROMPT_TOKENS not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_USER_PROMPT_TOKENS] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
if OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
# Add features for subtask tokens and attention mask if they don't already exist
if OBS_LANGUAGE_SUBTASK_TOKENS not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
if OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
return features
@@ -573,4 +658,4 @@ class ActionTokenizerProcessorStep(ActionProcessorStep):
Returns:
The updated dictionary of policy features.
"""
return features
return features
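The EOS-append logic in `TokenizerProcessorStep` writes the EOS id into the first padding slot of each right-padded sequence. A dependency-free sketch with toy ids (`eos_id=1` is illustrative, not necessarily the PaliGemma value):

```python
eos_id = 1
input_ids = [[5, 6, 0, 0], [7, 0, 0, 0]]          # right-padded with 0
attention_mask = [[1, 1, 0, 0], [1, 0, 0, 0]]

for ids, mask in zip(input_ids, attention_mask):
    seq_len = sum(mask)                            # length of the real tokens
    if seq_len >= len(ids):
        raise ValueError("no room to append EOS; increase max_length")
    ids[seq_len] = eos_id                          # overwrite first pad slot
    mask[seq_len] = 1

# input_ids -> [[5, 6, 1, 0], [7, 1, 0, 0]]
```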
@@ -19,7 +19,6 @@ from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..robot import Robot
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
@@ -113,7 +112,6 @@ class BiOpenArmFollower(Robot):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -135,7 +133,6 @@ class BiOpenArmFollower(Robot):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
obs_dict = {}
@@ -149,7 +146,6 @@ class BiOpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
self,
action: RobotAction,
@@ -174,7 +170,6 @@ class BiOpenArmFollower(Robot):
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
@check_if_not_connected
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -19,7 +19,6 @@ from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.so_follower import SOFollower, SOFollowerRobotConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..robot import Robot
from .config_bi_so_follower import BiSOFollowerConfig
@@ -97,7 +96,6 @@ class BiSOFollower(Robot):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -118,7 +116,6 @@ class BiSOFollower(Robot):
self.left_arm.setup_motors()
self.right_arm.setup_motors()
@check_if_not_connected
def get_observation(self) -> RobotObservation:
obs_dict = {}
@@ -132,7 +129,6 @@ class BiSOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
# Remove "left_" prefix
left_action = {
@@ -152,7 +148,6 @@ class BiSOFollower(Robot):
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
@check_if_not_connected
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -23,7 +23,7 @@ from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction, RobotObservation
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
@@ -119,7 +119,6 @@ class OpenArmFollower(Robot):
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
@@ -127,6 +126,8 @@ class OpenArmFollower(Robot):
We assume that at connection time, the arms are in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
@@ -218,7 +219,6 @@ class OpenArmFollower(Robot):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
@@ -228,6 +228,9 @@ class OpenArmFollower(Robot):
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict: dict[str, Any] = {}
states = self.bus.sync_read_all_states()
@@ -250,7 +253,6 @@ class OpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
self,
action: RobotAction,
@@ -270,6 +272,8 @@ class OpenArmFollower(Robot):
Returns:
The action actually sent (potentially clipped)
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
@@ -329,9 +333,10 @@ class OpenArmFollower(Robot):
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
@check_if_not_connected
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
@@ -45,7 +45,7 @@ from dataclasses import dataclass, field
import draccus
from lerobot.utils.import_utils import _can_available
from lerobot.utils.import_utils import is_package_available
MOTOR_NAMES = {
0x01: "joint_1",
@@ -336,7 +336,7 @@ def run_speed(cfg: CANSetupConfig):
@draccus.wrap()
def setup_can(cfg: CANSetupConfig):
if not _can_available:
if not is_package_available("can"):
print("Error: python-can not installed. Install with: pip install python-can")
sys.exit(1)
@@ -338,11 +338,21 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
# create dataloader for offline training
if hasattr(cfg.policy, "drop_n_last_frames"):
# loop over the dataset's subtask annotations to find episode indices with a valid subtask (subtask_index != -1)
# valid_episode_list is passed to episode_indices_to_use below
valid_episode_list = []
for episode_idx in range(len(dataset.meta.episodes)):
subtask_index = dataset[episode_idx]["subtask_index"]
if subtask_index != -1:
valid_episode_list.append(episode_idx)
episode_indices_to_use = valid_episode_list
shuffle = False
sampler = EpisodeAwareSampler(
dataset.meta.episodes["dataset_from_index"],
dataset.meta.episodes["dataset_to_index"],
episode_indices_to_use=dataset.episodes,
episode_indices_to_use=episode_indices_to_use,
drop_n_last_frames=cfg.policy.drop_n_last_frames,
shuffle=True,
)
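The episode filtering above keeps only episodes whose subtask annotation succeeded. The selection itself reduces to a list comprehension; `episodes` here is a toy stand-in for `dataset.meta.episodes`:

```python
# Keep episode indices whose subtask annotation succeeded (subtask_index != -1).
episodes = [{"subtask_index": 3}, {"subtask_index": -1}, {"subtask_index": 0}]
valid_episode_list = [i for i, ep in enumerate(episodes) if ep["subtask_index"] != -1]
# valid_episode_list -> [0, 2]
```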
@@ -166,9 +166,9 @@ def apply_normalization(
if q01 is None or q99 is None:
raise ValueError("QUANTILES mode requires 'q01' and 'q99' in stats")
denom = np.maximum(q99 - q01, eps)
# No clipping: match training pipeline NormalizerProcessorStep so tokenizer
# is fit on the full range of normalized values (including tails outside [-1, 1]).
return 2.0 * (data - q01) / denom - 1.0
# Clip to quantile range then normalize to [-1, 1]
clipped = np.clip(data, q01, q99)
return 2.0 * (clipped - q01) / denom - 1.0
if mode == NormalizationMode.QUANTILE10:
q10 = stats.get("q10")
@@ -176,8 +176,9 @@ def apply_normalization(
if q10 is None or q90 is None:
raise ValueError("QUANTILE10 mode requires 'q10' and 'q90' in stats")
denom = np.maximum(q90 - q10, eps)
# No clipping: match training pipeline NormalizerProcessorStep.
return 2.0 * (data - q10) / denom - 1.0
# Clip to quantile range then normalize to [-1, 1]
clipped = np.clip(data, q10, q90)
return 2.0 * (clipped - q10) / denom - 1.0
raise ValueError(f"Unsupported normalization mode: {mode}")
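After this change both quantile branches clip before scaling, so normalized values stay in [-1, 1]. A standalone sketch of the QUANTILES branch under that assumption:

```python
import numpy as np

def quantile_normalize(data: np.ndarray, q01: float, q99: float, eps: float = 1e-8) -> np.ndarray:
    # Clip to the [q01, q99] range, then map linearly to [-1, 1].
    denom = np.maximum(q99 - q01, eps)
    clipped = np.clip(data, q01, q99)
    return 2.0 * (clipped - q01) / denom - 1.0

out = quantile_normalize(np.array([-5.0, 0.0, 5.0, 20.0]), q01=0.0, q99=10.0)
# out -> [-1., -1., 0., 1.]
```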
@@ -305,7 +306,7 @@ def train_fast_tokenizer(
# download the tokenizer source code (not pretrained weights)
# we'll train a new tokenizer on our own data
base_tokenizer = AutoProcessor.from_pretrained("/fsx/jade_choghari/outputs/libero_tokenizer_wavetoken1", trust_remote_code=True)
base_tokenizer = AutoProcessor.from_pretrained("physical-intelligence/fast", trust_remote_code=True)
# convert action_chunks array to list of arrays (expected by .fit())
action_data_list = [action_chunks[i] for i in range(len(action_chunks))]
@@ -319,8 +320,6 @@ def train_fast_tokenizer(
vocab_size=vocab_size,
time_horizon=action_chunks.shape[1], # action_horizon
action_dim=action_chunks.shape[2], # encoded dimensions
wavelet="dmey",
level=1,
)
print("✓ Tokenizer training complete!")
@@ -19,7 +19,6 @@ from functools import cached_property
from lerobot.processor import RobotAction
from lerobot.teleoperators.openarm_leader import OpenArmLeaderConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..openarm_leader import OpenArmLeader
from ..teleoperator import Teleoperator
@@ -89,7 +88,6 @@ class BiOpenArmLeader(Teleoperator):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -111,7 +109,6 @@ class BiOpenArmLeader(Teleoperator):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_action(self) -> RobotAction:
action_dict = {}
@@ -129,7 +126,6 @@ class BiOpenArmLeader(Teleoperator):
# TODO: Implement force feedback
raise NotImplementedError
@check_if_not_connected
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -18,7 +18,7 @@ import logging
from functools import cached_property
from lerobot.teleoperators.so_leader import SOLeaderTeleopConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.decorators import check_if_not_connected
from ..so_leader import SOLeader
from ..teleoperator import Teleoperator
@@ -72,7 +72,6 @@ class BiSOLeader(Teleoperator):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -111,7 +110,6 @@ class BiSOLeader(Teleoperator):
# TODO: Implement force feedback
raise NotImplementedError
@check_if_not_connected
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -21,7 +21,7 @@ from typing import Any
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_openarm_leader import OpenArmLeaderConfig
@@ -84,7 +84,6 @@ class OpenArmLeader(Teleoperator):
"""Check if teleoperator is connected."""
return self.bus.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the teleoperator.
@@ -92,6 +91,8 @@ class OpenArmLeader(Teleoperator):
For manual control, we disable torque after connecting so the
arm can be moved by hand.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
@@ -182,7 +183,6 @@ class OpenArmLeader(Teleoperator):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_action(self) -> RobotAction:
"""
Get current action from the leader arm.
@@ -193,6 +193,8 @@ class OpenArmLeader(Teleoperator):
Reads all motor states (pos/vel/torque) in one CAN refresh cycle.
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
action_dict: dict[str, Any] = {}
@@ -212,9 +214,10 @@ class OpenArmLeader(Teleoperator):
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
@check_if_not_connected
def disconnect(self) -> None:
"""Disconnect from teleoperator."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
# For manual control, ensure torque is disabled before disconnecting
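The hunks above replace manual `is_connected` checks with `@check_if_already_connected` / `@check_if_not_connected` decorators. A plausible sketch of what such a guard decorator might look like (an illustration under assumed names, not the actual `lerobot.utils.decorators` implementation):

```python
import functools


class DeviceNotConnectedError(ConnectionError):
    pass


def check_if_not_connected(method):
    """Raise before calling the wrapped method if the device is not connected."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.is_connected:
            raise DeviceNotConnectedError(f"{self} is not connected.")
        return method(self, *args, **kwargs)
    return wrapper


class FakeArm:
    """Hypothetical stand-in for a teleoperator, used only to exercise the guard."""
    def __init__(self):
        self.is_connected = False

    @check_if_not_connected
    def disconnect(self):
        self.is_connected = False
        return "disconnected"
```

Centralizing the check in a decorator keeps each `connect`/`disconnect`/`get_action` body free of repeated boilerplate, which is what the diff removes.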
@@ -26,6 +26,9 @@ OBS_IMAGES = OBS_IMAGE + "s"
OBS_LANGUAGE = OBS_STR + ".language"
OBS_LANGUAGE_TOKENS = OBS_LANGUAGE + ".tokens"
OBS_LANGUAGE_ATTENTION_MASK = OBS_LANGUAGE + ".attention_mask"
OBS_LANGUAGE_USER_PROMPT = OBS_STR + ".user_prompt"
OBS_LANGUAGE_USER_PROMPT_TOKENS = OBS_LANGUAGE_USER_PROMPT + ".tokens"
OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK = OBS_LANGUAGE_USER_PROMPT + ".attention_mask"
OBS_LANGUAGE_SUBTASK = OBS_STR + ".subtask"
OBS_LANGUAGE_SUBTASK_TOKENS = OBS_LANGUAGE_SUBTASK + ".tokens"
OBS_LANGUAGE_SUBTASK_ATTENTION_MASK = OBS_LANGUAGE_SUBTASK + ".attention_mask"
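The new constants compose dotted observation keys from a shared base. A minimal sketch of the composition, assuming `OBS_STR` is `"observation"` as in lerobot's constants module:

```python
# Assumed base prefix; in lerobot's constants module OBS_STR is "observation".
OBS_STR = "observation"

OBS_LANGUAGE_USER_PROMPT = OBS_STR + ".user_prompt"
OBS_LANGUAGE_USER_PROMPT_TOKENS = OBS_LANGUAGE_USER_PROMPT + ".tokens"
OBS_LANGUAGE_SUBTASK = OBS_STR + ".subtask"
OBS_LANGUAGE_SUBTASK_TOKENS = OBS_LANGUAGE_SUBTASK + ".tokens"

print(OBS_LANGUAGE_USER_PROMPT_TOKENS)  # observation.user_prompt.tokens
print(OBS_LANGUAGE_SUBTASK_TOKENS)      # observation.subtask.tokens
```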
@@ -89,3 +92,101 @@ LIBERO_KEY_JOINTS_POS = "robot_state/joints/pos"
LIBERO_KEY_JOINTS_VEL = "robot_state/joints/vel"
LIBERO_KEY_PIXELS_AGENTVIEW = "pixels/agentview_image"
LIBERO_KEY_PIXELS_EYE_IN_HAND = "pixels/robot0_eye_in_hand_image"
# Skill segmentation prompt template for VLM-based subtask annotation
# Placeholders: {video_duration_seconds}, {video_duration_mm_ss}, {goal_context}, {subtask_labels_section}
# When subtask_labels are provided, use format_subtask_labels_section() to fill {subtask_labels_section}.
def format_subtask_labels_section(subtask_labels: list[str]) -> str:
"""Format a list of subtask labels for insertion into SKILL_SEGMENTATION_PROMPT_TEMPLATE.
The model will be instructed to choose only from these labels.
"""
if not subtask_labels:
return ""
return "\n".join(f' "{label}",' for label in subtask_labels).rstrip(",")
SKILL_SEGMENTATION_PROMPT_TEMPLATE = """# Role
You are a Robotics Vision System specializing in temporal action segmentation for robot manipulation demonstrations.
# Video duration (critical)
The total video length is **{video_duration_seconds} seconds** ({video_duration_mm_ss}). All "start" and "end" values in your JSON must be numeric seconds in the range [0.0, {video_duration_seconds}]. The last skill's "end" must be exactly **{video_duration_seconds}**. Do not stop earlier.
# Task
{goal_context}Segment this robot demonstration video into short atomic manipulation skills. Each skill should:
- Last approximately 1-3 seconds (or longer if the action takes longer)
- Describe a clear, single action (e.g., "pick up object", "move arm left", "release gripper")
- Have precise start and end timestamps in seconds (float)
# Requirements
1. **Atomic Actions**: Each skill should be a single, indivisible action
2. **Complete Coverage**: Skills must cover the entire video from 0.0 to {video_duration_seconds} seconds with no gaps
3. **Boundary Consistency**: The end of one skill equals the start of the next
4. **Natural Language**: Use clear, descriptive names for each skill
5. **Timestamps**: Use seconds as floats (e.g. 12.5) for all timestamps; the last "end" must be {video_duration_seconds}. If the video has a visible timer in the corner showing elapsed time in seconds, use it to report accurate start and end times for each skill.
# Subtask Label Set (Closed Vocabulary)
You MUST strictly identify the video segments using ONLY the following labels. Do not create new labels or modify existing ones:
[
{subtask_labels_section}
]
The video shows one successful execution of all subtasks in a logical order.
# Ground-Truth Semantics (Very Important)
Use **visual state changes** to define when a subtask starts and ends. Do NOT assume equal durations for the subtasks.
- A subtask **starts** at the first frame where the robot's motion clearly initiates that subtask.
- A subtask **ends** at the first frame where that specific action is visually completed and the manipulated object reaches a temporary, stable configuration.
If there are short pauses or micro-motions that don't clearly correspond to a new subtask, they belong to the **current** subtask.
# Hard Constraints & Logic
1. **Continuous Coverage (No Gaps):**
- The entire video from 0.0 to {video_duration_seconds} seconds must be covered by subtasks.
- There can be no gaps between subtasks.
- If there is any idle or ambiguous time between clear actions, extend the *preceding* subtask to cover it.
2. **Boundary Consistency:**
- The `"end"` timestamp of one subtask must be exactly equal to the `"start"` timestamp of the next subtask.
- Boundaries must coincide with a real visual state transition, not just a convenient time split.
3. **Chronological Order, One Occurrence Each:**
- This is a single successful demonstration.
- Each subtask from the vocabulary appears **exactly once**, in the correct logical order.
- **Durations may be very different** between subtasks. Never assume they are similar lengths. Base all boundaries only on the video.
4. **Reject Uniform Segmentation (Important):**
- Do NOT simply divide the video into equal or nearly equal time chunks.
- If your boundaries would result in subtasks with similar durations (e.g. all around 5 seconds), treat this as evidence that your segmentation is wrong and refine the boundaries.
- Only use nearly equal durations if the video truly shows each subtask taking the same amount of time (this is very rare).
5. **Timestamps (critical):**
- Use numeric seconds (float) in the JSON, e.g. 0.0, 5.2, 12.8.
- The first subtask always starts at 0.0.
- The last subtask must end at exactly {video_duration_seconds} (the full video length).
- **Time is displayed inside the video**: a visible timer in one corner shows the elapsed time in seconds (from 0.0 to the end). Use this on-screen timer to set accurate start and end times for each skill.
# Output Format
Output ONLY valid JSON with this exact structure. The last skill's "end" MUST be exactly {video_duration_seconds}. Use the timestamps you read from the visible timer in the video:
```json
{{
"skills": [
{{"name": "first skill", "start": 0.0, "end": 5.0}},
{{"name": "second skill", "start": 5.0, "end": 12.0}},
{{"name": "last skill", "start": 12.0, "end": {video_duration_seconds}}}
]
}}
```
The first skill must start at 0.0 and the last skill must end at **{video_duration_seconds}** (the total video duration in seconds).
# Strict Structural Rule
This video contains exactly one occurrence of each subtask in the closed vocabulary above.
You MUST output exactly one segment per label.
Each segment must use a unique label from the vocabulary.
No label may be repeated or omitted.
"""