Compare commits

...

6 Commits

Author SHA1 Message Date
pepijn 1f658023f1 add trim_episode_start dataset edit operation
Add a dataset edit operation to trim the first N seconds from episodes while rebuilding frame and episode indices and metadata consistently. Skip episodes that are too short to trim and cover parsing plus metadata invariants with focused tests.

Made-with: Cursor
2026-03-06 14:00:15 +00:00
Steven Palma a225127527 chore(dependencies): sync intelrealsense + added notes (#3094) 2026-03-06 10:50:46 +01:00
Steven Palma e489ba24fc feat(dependencies): require Python 3.12+ as minimum version (#3023)
* feat(dependecies): upgrade to python3.12

* fix(test): processor regex message

* fix(test): processor regex message

* fix(dependecies): resolve all tags in python 3.12

* fix(dependecies): add more hints to faster resolve

* chore(dependecies): remove cli tag huggingface-hub dep

* refactor(policy): update eagle for python3.12

* chore(docs): update policy creation for python 3.12

* chore(test): skip failing tests in macos
2026-03-06 10:15:13 +01:00
Steven Palma d324ffe810 fix(ci): test only multi-gpu tests in multi-gpu runner (#3092) 2026-03-05 19:53:40 +01:00
Pepijn 1a24f770d3 Feat/slurm compute rabc script (#3041)
* Add SLURM SARM progress annotation script.

Provide a standalone two-stage compute/aggregate pipeline for RA-BC progress generation so large datasets can be processed in parallel and optionally uploaded to the Hub.

Made-with: Cursor

* fix pr comments

* remove comments
2026-03-05 18:27:58 +01:00
Caroline Pascal 92fba37225 fix(num_frames): fixing redundant frames count in conversion script (#3091) 2026-03-05 15:49:50 +01:00
39 changed files with 1065 additions and 110 deletions
+1 -1
View File
@@ -44,7 +44,7 @@ permissions:
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
# Ensures that only the latest commit for a PR or branch is built, canceling older runs.
concurrency:
+2 -2
View File
@@ -37,7 +37,7 @@ permissions:
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu
# Ensures that only the latest action is built, canceling older runs.
@@ -185,7 +185,7 @@ jobs:
hf auth login --token "$HF_USER_TOKEN" --add-to-git-credential
hf auth whoami
- name: Fix ptxas permissions
run: chmod +x /lerobot/.venv/lib/python3.10/site-packages/triton/backends/nvidia/bin/ptxas
run: chmod +x /lerobot/.venv/lib/python3.12/site-packages/triton/backends/nvidia/bin/ptxas
- name: Run pytest on GPU
run: pytest tests -vv --maxfail=10
- name: Run end-to-end tests
+2 -3
View File
@@ -28,7 +28,7 @@ on:
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
DOCKER_IMAGE_NAME_CPU: huggingface/lerobot-cpu:latest
DOCKER_IMAGE_NAME_GPU: huggingface/lerobot-gpu:latest
@@ -206,5 +206,4 @@ jobs:
python -c "import torch; print(f'PyTorch CUDA available: {torch.cuda.is_available()}'); print(f'Number of GPUs: {torch.cuda.device_count()}')"
- name: Run multi-GPU training tests
# TODO(Steven): Investigate why motors tests are failing in multi-GPU setup
run: pytest tests -vv --maxfail=10 --ignore=tests/motors/
run: pytest -vv tests/training/
+1 -1
View File
@@ -50,7 +50,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
python-version: '3.12'
- name: Run pre-commit hooks
uses: pre-commit/action@v3.0.1 # zizmor: ignore[unpinned-uses]
+2 -2
View File
@@ -22,7 +22,7 @@ on:
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
jobs:
# This job builds the Python package and publishes it to PyPI
@@ -45,7 +45,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
python-version: '3.12'
- name: Extract Version
id: extract_info
+1 -1
View File
@@ -29,7 +29,7 @@ permissions:
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
PYTHON_VERSION: "3.12"
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu:unbound
# Ensures that only the latest action is built, canceling older runs.
+2 -2
View File
@@ -13,7 +13,7 @@
# limitations under the License.
default_language_version:
python: python3.10
python: python3.12
exclude: "tests/artifacts/.*\\.safetensors$"
@@ -55,7 +55,7 @@ repos:
rev: v3.21.0
hooks:
- id: pyupgrade
args: [--py310-plus]
args: [--py312-plus]
##### Markdown Quality #####
- repo: https://github.com/rbubley/mirrors-prettier
+1 -1
View File
@@ -24,7 +24,7 @@ ARG OS_VERSION=22.04
FROM nvidia/cuda:${CUDA_VERSION}-base-ubuntu${OS_VERSION}
# Define Python version argument
ARG PYTHON_VERSION=3.10
ARG PYTHON_VERSION=3.12
# Configure environment variables
ENV DEBIAN_FRONTEND=noninteractive \
+1 -1
View File
@@ -19,7 +19,7 @@
# docker run -it --rm lerobot-user
# Configure the base image
ARG PYTHON_VERSION=3.10
ARG PYTHON_VERSION=3.12
FROM python:${PYTHON_VERSION}-slim
# Configure environment variables
+4 -4
View File
@@ -32,7 +32,7 @@ version = "0.1.0"
dependencies = [
# your policy-specific dependencies
]
requires-python = ">= 3.11"
requires-python = ">= 3.12"
[build-system]
build-backend = # your-build-backend
@@ -82,7 +82,7 @@ Create your policy implementation by inheriting from LeRobot's base `PreTrainedP
# modeling_my_custom_policy.py
import torch
import torch.nn as nn
from typing import Dict, Any
from typing import Any
from lerobot.policies.pretrained import PreTrainedPolicy
from .configuration_my_custom_policy import MyCustomPolicyConfig
@@ -91,7 +91,7 @@ class MyCustomPolicy(PreTrainedPolicy):
config_class = MyCustomPolicyConfig
name = "my_custom_policy"
def __init__(self, config: MyCustomPolicyConfig, dataset_stats: Dict[str, Any] = None):
def __init__(self, config: MyCustomPolicyConfig, dataset_stats: dict[str, Any] = None):
super().__init__(config, dataset_stats)
...
```
@@ -102,7 +102,7 @@ Create processor functions:
```python
# processor_my_custom_policy.py
from typing import Dict, Any
from typing import Any
import torch
+1 -1
View File
@@ -13,7 +13,7 @@ The EarthRover Mini Plus is a fully open source mobile robot that connects throu
### Hardware
- EarthRover Mini robot
- Computer with Python 3.10 or newer
- Computer with Python 3.12 or newer
- Internet connection
### Setting Up the Frodobots SDK
+3 -3
View File
@@ -1,6 +1,6 @@
# Installation
This guide uses conda (via miniforge) to manage environments. If you prefer another environment manager (e.g. `uv`, `venv`), ensure you have Python >=3.10 and ffmpeg installed with the `libsvtav1` encoder, then skip ahead to [Install LeRobot](#step-3-install-lerobot-).
This guide uses conda (via miniforge) to manage environments. If you prefer another environment manager (e.g. `uv`, `venv`), ensure you have Python >=3.12 and ffmpeg installed with the `libsvtav1` encoder, then skip ahead to [Install LeRobot](#step-3-install-lerobot-).
## Step 1: Install [`miniforge`](https://conda-forge.org/download/)
@@ -11,10 +11,10 @@ bash Miniforge3-$(uname)-$(uname -m).sh
## Step 2: Environment Setup
Create a virtual environment with Python 3.10, using conda:
Create a virtual environment with Python 3.12, using conda:
```bash
conda create -y -n lerobot python=3.10
conda create -y -n lerobot python=3.12
```
Then activate your conda environment, you have to do this each time you open a shell to use lerobot:
+2 -2
View File
@@ -123,7 +123,7 @@ SSH into the robot and install LeRobot:
```bash
ssh unitree@<YOUR_ROBOT_IP>
conda create -y -n lerobot python=3.10
conda create -y -n lerobot python=3.12
conda activate lerobot
git clone https://github.com/huggingface/lerobot.git
cd lerobot
@@ -153,7 +153,7 @@ With the robot server running, you can now control the robot remotely. Let's lau
### Step 1: Install LeRobot on your machine
```bash
conda create -y -n lerobot python=3.10
conda create -y -n lerobot python=3.12
conda activate lerobot
git clone https://github.com/huggingface/lerobot.git
cd lerobot
+490
View File
@@ -0,0 +1,490 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SLURM-distributed SARM RA-BC annotation pipeline.
Computes SARM progress values for all frames in a dataset, distributed across
SLURM workers, then merges the shards into a single sarm_progress.parquet.
Two subcommands, each a separate SLURM submission:
compute N workers, each computes progress for a subset of episodes
aggregate 1 worker, merges N shards into sarm_progress.parquet, pushes to hub
Usage:
python slurm_compute_rabc.py compute \\
--repo-id user/dataset --reward-model-path user/sarm_model \\
--stride 10 --device cpu --workers 50 --partition cpu
python slurm_compute_rabc.py aggregate \\
--repo-id user/dataset --reward-model-path user/sarm_model \\
--partition cpu --push-to-hub
"""
import argparse
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class ComputeProgressShards(PipelineStep):
"""Each worker computes SARM progress for its assigned episodes."""
def __init__(
self, repo_id, reward_model_path, stride=1, head_mode="sparse", device="cpu", shard_dir="rabc_shards"
):
super().__init__()
if stride < 1:
raise ValueError(f"stride must be >= 1, got {stride}")
self.repo_id = repo_id
self.reward_model_path = reward_model_path
self.stride = stride
self.head_mode = head_mode
self.device = device
self.shard_dir = shard_dir
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
from pathlib import Path
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from tqdm import tqdm
from lerobot.policies.sarm.compute_rabc_weights import (
generate_all_frame_indices,
interpolate_progress,
load_sarm_resources,
)
from lerobot.utils.utils import init_logging
init_logging()
dataset, reward_model, preprocess = load_sarm_resources(
self.repo_id,
self.reward_model_path,
self.device,
)
if hasattr(preprocess, "eval"):
preprocess.eval()
for step in preprocess.steps:
if hasattr(step, "eval"):
step.eval()
image_key = reward_model.config.image_key
state_key = reward_model.config.state_key
frame_gap = reward_model.config.frame_gap
center_idx = reward_model.config.n_obs_steps // 2
dual_mode = reward_model.config.uses_dual_heads
compute_sparse = self.head_mode in ("sparse", "both") or not dual_mode
compute_dense = self.head_mode in ("dense", "both") and dual_mode
my_episodes = list(range(dataset.num_episodes))[rank::world_size]
if not my_episodes:
logging.info(f"Rank {rank}: no episodes assigned")
return
logging.info(f"Rank {rank}: {len(my_episodes)} / {dataset.num_episodes} episodes")
all_rows = []
for ep_idx in tqdm(my_episodes, desc=f"Rank {rank}"):
ep = dataset.meta.episodes[ep_idx]
ep_start, ep_end = ep["dataset_from_index"], ep["dataset_to_index"]
task = dataset[ep_start].get("task", "perform the task")
all_ep_indices = generate_all_frame_indices(ep_start, ep_end, frame_gap)
if self.stride > 1:
compute_indices = [i for i in all_ep_indices if (i - ep_start) % self.stride == 0]
if (ep_end - 1) not in compute_indices:
compute_indices.append(ep_end - 1)
compute_indices = sorted(set(compute_indices))
else:
compute_indices = all_ep_indices
frame_results = {}
for qi in tqdm(compute_indices, desc=f" Ep {ep_idx}", leave=False):
try:
sample = dataset[qi]
batch = {
image_key: sample[image_key],
"task": task,
"index": qi,
"episode_index": ep_idx,
}
if state_key in sample:
batch[state_key] = sample[state_key]
with torch.no_grad():
processed = preprocess(batch)
vf = processed["video_features"].to(self.device)
tf = processed["text_features"].to(self.device)
sf = processed.get("state_features")
if sf is not None:
sf = sf.to(self.device)
lengths = processed.get("lengths")
sparse_val = dense_val = np.nan
if compute_sparse:
r = reward_model.calculate_rewards(
text_embeddings=tf,
video_embeddings=vf,
state_features=sf,
lengths=lengths,
return_all_frames=True,
head_mode="sparse",
)
sparse_val = float(r[0, center_idx] if r.ndim == 2 else r[center_idx])
if compute_dense:
r = reward_model.calculate_rewards(
text_embeddings=tf,
video_embeddings=vf,
state_features=sf,
lengths=lengths,
return_all_frames=True,
head_mode="dense",
)
dense_val = float(r[0, center_idx] if r.ndim == 2 else r[center_idx])
frame_results[qi] = (sparse_val, dense_val)
except Exception as e:
logging.warning(f"Failed frame {qi}: {e}")
if not frame_results:
logging.warning(f"Episode {ep_idx}: all frames failed, skipping")
continue
# Interpolate to all frames in this episode
computed_idx = np.array(sorted(frame_results.keys()))
all_frame_arr = np.arange(ep_start, ep_end)
sparse_vals = np.array([frame_results[i][0] for i in computed_idx]) if compute_sparse else None
dense_vals = np.array([frame_results[i][1] for i in computed_idx]) if compute_dense else None
if self.stride > 1 and len(computed_idx) > 1:
if compute_sparse:
sparse_vals = interpolate_progress(computed_idx, sparse_vals, all_frame_arr)
if compute_dense:
dense_vals = interpolate_progress(computed_idx, dense_vals, all_frame_arr)
output_frames = all_frame_arr
else:
# Use only successfully computed frames to avoid indexing mismatch on failures
output_frames = computed_idx
for i, fi in enumerate(output_frames):
row = {"index": int(fi), "episode_index": ep_idx, "frame_index": int(fi - ep_start)}
if compute_sparse:
row["progress_sparse"] = float(sparse_vals[i])
if compute_dense:
row["progress_dense"] = float(dense_vals[i])
all_rows.append(row)
if all_rows:
import pandas as pd
df = pd.DataFrame(all_rows).sort_values("index").reset_index(drop=True)
table = pa.Table.from_pandas(df, preserve_index=False)
table = table.replace_schema_metadata({b"reward_model_path": self.reward_model_path.encode()})
shard_dir = Path(self.shard_dir)
shard_dir.mkdir(parents=True, exist_ok=True)
out = shard_dir / f"shard_{rank:05d}.parquet"
pq.write_table(table, out)
logging.info(f"Rank {rank}: saved {len(df)} rows to {out}")
class AggregateProgress(PipelineStep):
"""Merge all shard parquets into final sarm_progress.parquet."""
def __init__(self, repo_id, reward_model_path, shard_dir="rabc_shards", push_to_hub=False):
super().__init__()
self.repo_id = repo_id
self.reward_model_path = reward_model_path
self.shard_dir = shard_dir
self.push_to_hub = push_to_hub
def run(self, data=None, rank: int = 0, world_size: int = 1):
import datetime
import logging
import os
from pathlib import Path
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.utils import init_logging
init_logging()
if rank != 0:
return
shard_dir = Path(self.shard_dir)
shards = sorted(shard_dir.glob("shard_*.parquet"))
if not shards:
raise FileNotFoundError(f"No shards found in {shard_dir}")
# Log shard modification time range to help detect stale files
mtimes = [os.path.getmtime(s) for s in shards]
oldest = datetime.datetime.fromtimestamp(min(mtimes)).isoformat(timespec="seconds")
newest = datetime.datetime.fromtimestamp(max(mtimes)).isoformat(timespec="seconds")
logging.info(f"Aggregating {len(shards)} shards (oldest: {oldest}, newest: {newest})")
df = pd.concat([pd.read_parquet(s) for s in shards], ignore_index=True)
df = df.sort_values("index").reset_index(drop=True)
table = pa.Table.from_pandas(df, preserve_index=False)
table = table.replace_schema_metadata({b"reward_model_path": self.reward_model_path.encode()})
temp_ds = LeRobotDataset(self.repo_id, download_videos=False)
out_path = Path(temp_ds.root) / "sarm_progress.parquet"
out_path.parent.mkdir(parents=True, exist_ok=True)
pq.write_table(table, out_path)
logging.info(f"Saved {len(df)} rows to {out_path}")
for col in ["progress_sparse", "progress_dense"]:
if col in df.columns:
v = df[col].dropna()
logging.info(
f"{col}: mean={v.mean():.4f} std={v.std():.4f} min={v.min():.4f} max={v.max():.4f}"
)
if self.push_to_hub:
from huggingface_hub import HfApi
api = HfApi()
hub_path = "sarm_progress.parquet"
logging.info(f"Uploading to {self.repo_id}/{hub_path}")
api.upload_file(
path_or_fileobj=str(out_path),
path_in_repo=hub_path,
repo_id=self.repo_id,
repo_type="dataset",
)
logging.info(f"Uploaded: https://huggingface.co/datasets/{self.repo_id}/blob/main/{hub_path}")
def make_compute_executor(
repo_id,
reward_model_path,
stride,
head_mode,
device,
shard_dir,
logs_dir,
job_name,
slurm,
workers,
partition,
cpus_per_task,
mem_per_cpu,
):
kwargs = {
"pipeline": [
ComputeProgressShards(repo_id, reward_model_path, stride, head_mode, device, str(shard_dir)),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": workers,
"workers": workers,
"time": "24:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
return SlurmPipelineExecutor(**kwargs)
kwargs.update({"tasks": workers, "workers": 1})
return LocalPipelineExecutor(**kwargs)
def make_aggregate_executor(
repo_id,
reward_model_path,
shard_dir,
logs_dir,
job_name,
slurm,
partition,
cpus_per_task,
mem_per_cpu,
push_to_hub,
):
kwargs = {
"pipeline": [
AggregateProgress(repo_id, reward_model_path, str(shard_dir), push_to_hub),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": 1,
"workers": 1,
"time": "02:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
return SlurmPipelineExecutor(**kwargs)
kwargs.update({"tasks": 1, "workers": 1})
return LocalPipelineExecutor(**kwargs)
def _add_shared_args(p):
p.add_argument(
"--repo-id",
type=str,
required=True,
help="Hugging Face repository identifier, e.g. 'user/dataset'.",
)
p.add_argument(
"--shard-dir",
type=Path,
default=Path("rabc_shards"),
help="Directory to read/write per-rank parquet shards.",
)
p.add_argument(
"--logs-dir",
type=Path,
default=Path("logs"),
help="Directory for datatrove logs.",
)
p.add_argument(
"--job-name",
type=str,
default=None,
help="SLURM job name (defaults to rabc_<subcommand>).",
)
p.add_argument(
"--slurm",
type=int,
default=1,
help="1 = submit via SLURM; 0 = run locally (useful for debugging).",
)
p.add_argument(
"--partition",
type=str,
default=None,
help="SLURM partition to submit to.",
)
p.add_argument(
"--cpus-per-task",
type=int,
default=4,
help="Number of CPUs per SLURM task.",
)
p.add_argument(
"--mem-per-cpu",
type=str,
default="4G",
help="Memory per CPU, e.g. '4G' or '1950M'.",
)
def main():
parser = argparse.ArgumentParser(
description="SLURM-distributed SARM RA-BC annotation pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = parser.add_subparsers(dest="command", required=True)
# compute subcommand
cp = sub.add_parser(
"compute",
help="Distribute progress computation across SLURM workers.",
)
_add_shared_args(cp)
cp.add_argument(
"--reward-model-path",
type=str,
required=True,
help="Path or HF repo id of the SARM reward model.",
)
cp.add_argument(
"--stride",
type=int,
default=1,
help="Compute every Nth frame; intermediate frames are interpolated (must be >= 1).",
)
cp.add_argument(
"--head-mode",
type=str,
default="sparse",
choices=["sparse", "dense", "both"],
help="Which reward head(s) to compute.",
)
cp.add_argument(
"--device",
type=str,
default="cpu",
help="Device for reward model inference, e.g. 'cpu' or 'cuda'.",
)
cp.add_argument(
"--workers",
type=int,
default=50,
help="Number of parallel SLURM tasks (one shard per worker).",
)
# aggregate subcommand
ap = sub.add_parser(
"aggregate",
help="Merge per-rank shards into a single sarm_progress.parquet.",
)
_add_shared_args(ap)
ap.add_argument(
"--reward-model-path",
type=str,
required=True,
help="Path or HF repo id of the SARM reward model (stored in parquet metadata).",
)
ap.add_argument(
"--push-to-hub",
action="store_true",
help="Upload sarm_progress.parquet to the Hugging Face Hub after aggregation.",
)
args = parser.parse_args()
job_name = args.job_name or f"rabc_{args.command}"
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
kwargs["job_name"] = job_name
command = kwargs.pop("command")
executor = make_compute_executor(**kwargs) if command == "compute" else make_aggregate_executor(**kwargs)
executor.run()
if __name__ == "__main__":
main()
+33 -23
View File
@@ -29,7 +29,7 @@ version = "0.4.5"
description = "🤗 LeRobot: State-of-the-art Machine Learning for Real-World Robotics in Pytorch"
dynamic = ["readme"]
license = { text = "Apache-2.0" }
requires-python = ">=3.10"
requires-python = ">=3.12"
authors = [
{ name = "Rémi Cadène", email = "re.cadene@gmail.com" },
{ name = "Simon Alibert", email = "alibert.sim@gmail.com" },
@@ -50,7 +50,8 @@ classifiers = [
"Intended Audience :: Education",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Build Tools",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
]
@@ -61,26 +62,28 @@ dependencies = [
# Hugging Face dependencies
"datasets>=4.0.0,<5.0.0",
"diffusers>=0.27.2,<0.36.0",
"huggingface-hub[cli]>=1.0.0,<2.0.0",
"huggingface-hub>=1.0.0,<2.0.0",
"accelerate>=1.10.0,<2.0.0",
# Core dependencies
"numpy>=2.0.0,<2.3.0", # NOTE: Explicitly listing numpy helps the resolver converge faster. Upper bound imposed by opencv-python-headless.
"setuptools>=71.0.0,<81.0.0",
"cmake>=3.29.0.1,<4.2.0",
"packaging>=24.2,<26.0",
"torch>=2.2.1,<2.11.0",
"torchcodec>=0.2.1,<0.11.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')",
"torchvision>=0.21.0,<0.26.0",
"einops>=0.8.0,<0.9.0",
"opencv-python-headless>=4.9.0,<4.13.0",
"av>=15.0.0,<16.0.0",
"jsonlines>=4.0.0,<5.0.0",
"packaging>=24.2,<26.0",
"pynput>=1.7.7,<1.9.0",
"pynput>=1.7.8,<1.9.0",
"pyserial>=3.5,<4.0",
"wandb>=0.24.0,<0.25.0",
"torch>=2.2.1,<2.11.0", # TODO: Bump dependency
"torchcodec>=0.2.1,<0.11.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bump dependency
"torchvision>=0.21.0,<0.26.0", # TODO: Bump dependency
"draccus==0.10.0", # TODO: Remove ==
"draccus==0.10.0", # TODO: Relax version constraint
"gymnasium>=1.1.1,<2.0.0",
"rerun-sdk>=0.24.0,<0.27.0",
@@ -95,13 +98,14 @@ dependencies = [
# Common
pygame-dep = ["pygame>=2.5.1,<2.7.0"]
placo-dep = ["placo>=0.9.6,<0.10.0"]
placo-dep = ["placo>=0.9.6,<0.9.17"]
transformers-dep = ["transformers>=5.3.0,<6.0.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf>=6.31.1,<6.32.0"]
can-dep = ["python-can>=4.2.0,<5.0.0"]
peft-dep = ["peft>=0.18.0,<1.0.0"]
scipy-dep = ["scipy>=1.14.0,<2.0.0"]
qwen-vl-utils-dep = ["qwen-vl-utils>=0.0.11,<0.1.0"]
matplotlib-dep = ["matplotlib>=3.10.3,<4.0.0", "contourpy>=1.3.0,<2.0.0"] # NOTE: Explicitly listing contourpy helps the resolver converge faster.
# Motors
feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"]
@@ -119,16 +123,16 @@ unitree_g1 = [
"onnxruntime>=1.16.0,<2.0.0",
"pin>=3.0.0,<4.0.0",
"meshcat>=0.3.0,<0.4.0",
"matplotlib>=3.9.0,<4.0.0",
"lerobot[matplotlib-dep]",
"casadi>=3.6.0,<4.0.0",
]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486,<2.57.0 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54,<2.55.0 ; sys_platform == 'darwin'",
"pyrealsense2-macosx>=2.54,<2.57.0 ; sys_platform == 'darwin'",
]
phone = ["hebi-py>=2.8.0,<2.12.0", "teleop>=0.1.0,<0.2.0", "fastapi<1.0"]
phone = ["hebi-py>=2.8.0,<2.12.0", "teleop>=0.1.0,<0.2.0", "fastapi<1.0", "lerobot[scipy-dep]"]
# Policies
wallx = [
@@ -151,12 +155,12 @@ groot = [
"ninja>=1.11.1,<2.0.0",
"flash-attn>=2.5.9,<3.0.0 ; sys_platform != 'darwin'"
]
sarm = ["lerobot[transformers-dep]", "faker>=33.0.0,<35.0.0", "matplotlib>=3.10.3,<4.0.0", "lerobot[qwen-vl-utils-dep]"]
sarm = ["lerobot[transformers-dep]", "faker>=33.0.0,<35.0.0", "lerobot[matplotlib-dep]", "lerobot[qwen-vl-utils-dep]"]
xvla = ["lerobot[transformers-dep]"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
# Features
async = ["lerobot[grpcio-dep]", "matplotlib>=3.10.3,<4.0.0"]
async = ["lerobot[grpcio-dep]", "lerobot[matplotlib-dep]"]
peft = ["lerobot[transformers-dep]", "lerobot[peft-dep]"]
# Development
@@ -165,13 +169,19 @@ test = ["pytest>=8.1.0,<9.0.0", "pytest-timeout>=2.4.0,<3.0.0", "pytest-cov>=5.0
video_benchmark = ["scikit-image>=0.23.2,<0.26.0", "pandas>=2.2.2,<2.4.0"]
# Simulation
aloha = ["gym-aloha>=0.1.2,<0.2.0"]
# NOTE: Explicitly listing scipy helps flatten the dependecy tree.
aloha = ["gym-aloha>=0.1.2,<0.2.0", "lerobot[scipy-dep]"]
pusht = ["gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk version in gym-pusht instead
libero = ["lerobot[transformers-dep]", "hf-libero>=0.1.3,<0.2.0"]
metaworld = ["metaworld==3.0.0"]
libero = ["lerobot[transformers-dep]", "hf-libero>=0.1.3,<0.2.0; sys_platform == 'linux'", "lerobot[scipy-dep]"]
metaworld = ["metaworld==3.0.0", "lerobot[scipy-dep]"]
# All
all = [
# NOTE(resolver hint): scipy is pulled in transitively via lerobot[scipy-dep] through
# multiple extras (aloha, metaworld, pi, wallx, phone). Listing it explicitly
# helps pip's resolver converge by constraining scipy early, before it encounters
# the loose scipy requirements from transitive deps like dm-control and metaworld.
"scipy>=1.14.0,<2.0.0",
"lerobot[dynamixel]",
"lerobot[gamepad]",
"lerobot[hopejr]",
@@ -192,7 +202,7 @@ all = [
"lerobot[aloha]",
"lerobot[pusht]",
"lerobot[phone]",
"lerobot[libero]",
"lerobot[libero]; sys_platform == 'linux'",
"lerobot[metaworld]",
"lerobot[sarm]",
"lerobot[peft]",
@@ -224,7 +234,7 @@ lerobot = ["envs/*.json"]
where = ["src"]
[tool.ruff]
target-version = "py310"
target-version = "py312"
line-length = 110
exclude = ["tests/artifacts/**/*.safetensors", "*_pb2.py", "*_pb2_grpc.py"]
@@ -316,7 +326,7 @@ default.extend-ignore-identifiers-re = [
# Uncomment [tool.mypy] first, then uncomment individual module overrides as they get proper type annotations
[tool.mypy]
python_version = "3.10"
python_version = "3.12"
ignore_missing_imports = true
follow_imports = "skip"
# warn_return_any = true
+312
View File
@@ -25,6 +25,7 @@ This module provides utilities for:
import logging
import shutil
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
@@ -45,6 +46,8 @@ from lerobot.datasets.utils import (
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
DEFAULT_EPISODES_PATH,
DEFAULT_SUBTASKS_PATH,
flatten_dict,
get_parquet_file_size_in_mb,
load_episodes,
update_chunk_file_indices,
@@ -141,6 +144,315 @@ def delete_episodes(
return new_dataset
def trim_episode_start(
dataset: LeRobotDataset,
seconds: float,
episode_indices: list[int] | None = None,
output_dir: str | Path | None = None,
repo_id: str | None = None,
) -> LeRobotDataset:
"""Trim the first N seconds from selected episodes and create a new dataset.
The operation rewrites data parquet files and updates episode metadata so that:
- frame_index starts at 0 for each trimmed episode
- timestamp starts at 0 for each trimmed episode
- global index remains contiguous across the full dataset
- dataset_from_index / dataset_to_index reflect new frame ranges
Video files are copied as-is and per-episode video timestamps are shifted forward
for trimmed episodes.
Episodes selected for trimming that are too short (length <= trim_frames) are skipped
from the output dataset.
Args:
dataset: The source LeRobotDataset.
seconds: Number of seconds to remove from episode starts.
episode_indices: Optional list of episode indices to trim. If None, trims all episodes.
output_dir: Directory to save the new dataset. If None, uses default location.
repo_id: Repository ID for the new dataset. If None, appends "_trimmed" to original.
"""
if seconds <= 0:
raise ValueError(f"seconds must be strictly positive, got {seconds}")
if dataset.meta.episodes is None:
dataset.meta.episodes = load_episodes(dataset.meta.root)
trim_frames = int(seconds * dataset.meta.fps)
if trim_frames <= 0:
raise ValueError(
f"seconds={seconds} corresponds to 0 frames at fps={dataset.meta.fps}. "
"Increase seconds so at least one frame is trimmed."
)
if episode_indices is None:
episode_indices = list(range(dataset.meta.total_episodes))
if len(episode_indices) == 0:
raise ValueError("No episodes specified to trim")
episode_indices = sorted(set(episode_indices))
valid_indices = set(range(dataset.meta.total_episodes))
invalid = set(episode_indices) - valid_indices
if invalid:
raise ValueError(f"Invalid episode indices: {invalid}")
too_short = sorted(
ep_idx for ep_idx in episode_indices if int(dataset.meta.episodes[ep_idx]["length"]) <= trim_frames
)
trim_set = set(episode_indices)
skipped_set = set(too_short)
trim_set -= skipped_set
if too_short:
logging.warning(
f"Skipping {len(too_short)} episode(s) that are too short to trim "
f"({trim_frames} frames): {too_short}"
)
episodes_to_keep = [ep_idx for ep_idx in range(dataset.meta.total_episodes) if ep_idx not in skipped_set]
if not episodes_to_keep:
raise ValueError(
"All episodes selected for trimming are too short and would be skipped. "
"Try a smaller trim duration."
)
logging.info(
f"Trimming {len(trim_set)} episode(s) by {seconds}s and keeping {len(episodes_to_keep)} "
f"episode(s) in output"
)
if repo_id is None:
repo_id = f"{dataset.repo_id}_trimmed"
output_dir = Path(output_dir) if output_dir is not None else HF_LEROBOT_HOME / repo_id
new_meta = LeRobotDatasetMetadata.create(
repo_id=repo_id,
fps=dataset.meta.fps,
features=dataset.meta.features,
robot_type=dataset.meta.robot_type,
root=output_dir,
use_videos=len(dataset.meta.video_keys) > 0,
chunks_size=dataset.meta.chunks_size,
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
)
if dataset.meta.tasks is not None:
write_tasks(dataset.meta.tasks, new_meta.root)
new_meta.tasks = dataset.meta.tasks.copy()
subtasks_path = dataset.root / DEFAULT_SUBTASKS_PATH
if subtasks_path.exists():
dst_subtasks_path = new_meta.root / DEFAULT_SUBTASKS_PATH
dst_subtasks_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(subtasks_path, dst_subtasks_path)
episode_mapping = {old_idx: new_idx for new_idx, old_idx in enumerate(episodes_to_keep)}
trim_duration_s = trim_frames / dataset.meta.fps
episode_lengths: dict[int, int] = {}
episode_ranges: dict[int, tuple[int, int]] = {}
total_frames = 0
for old_ep_idx in episodes_to_keep:
new_ep_idx = episode_mapping[old_ep_idx]
src_length = int(dataset.meta.episodes[old_ep_idx]["length"])
new_length = src_length - trim_frames if old_ep_idx in trim_set else src_length
episode_lengths[new_ep_idx] = new_length
episode_ranges[new_ep_idx] = (total_frames, total_frames + new_length)
total_frames += new_length
numeric_features = {
k: v
for k, v in dataset.meta.features.items()
if v["dtype"] not in ["image", "video", "string"]
}
episode_stats_parts: dict[int, list[dict[str, dict]]] = defaultdict(list)
episode_file_metadata: dict[int, dict[str, int]] = {}
data_dir = dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
for src_path in tqdm(parquet_files, desc="Trimming data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
if len(df) == 0:
continue
if skipped_set:
keep_mask = ~df["episode_index"].isin(skipped_set)
if not keep_mask.all():
df = df.loc[keep_mask].copy().reset_index(drop=True)
if len(df) == 0:
continue
if trim_set:
trim_mask = df["episode_index"].isin(trim_set) & (df["frame_index"] < trim_frames)
if trim_mask.any():
df = df.loc[~trim_mask].copy().reset_index(drop=True)
if len(df) == 0:
continue
relative_path = src_path.relative_to(dataset.root)
chunk_idx = int(relative_path.parts[1].split("-")[1])
file_idx = int(relative_path.parts[2].split("-")[1].split(".")[0])
for old_ep_idx in sorted(df["episode_index"].unique().tolist()):
ep_mask = df["episode_index"] == old_ep_idx
new_ep_idx = episode_mapping[old_ep_idx]
if old_ep_idx in trim_set:
df.loc[ep_mask, "frame_index"] = df.loc[ep_mask, "frame_index"] - trim_frames
shifted_timestamps = df.loc[ep_mask, "timestamp"].to_numpy(dtype=np.float64) - trim_duration_s
df.loc[ep_mask, "timestamp"] = np.clip(shifted_timestamps, a_min=0.0, a_max=None)
df.loc[ep_mask, "episode_index"] = new_ep_idx
ep_start, _ = episode_ranges[new_ep_idx]
new_indices = ep_start + df.loc[ep_mask, "frame_index"].to_numpy(dtype=np.int64)
df.loc[ep_mask, "index"] = new_indices
if new_ep_idx in episode_file_metadata:
existing = episode_file_metadata[new_ep_idx]
if (
existing["data/chunk_index"] != chunk_idx
or existing["data/file_index"] != file_idx
):
raise ValueError(
f"Episode {old_ep_idx} spans multiple data files. "
"trim_episode_start currently expects one data file per episode."
)
else:
episode_file_metadata[new_ep_idx] = {
"data/chunk_index": chunk_idx,
"data/file_index": file_idx,
}
if numeric_features:
ep_df = df.loc[ep_mask]
episode_data: dict[str, np.ndarray] = {}
episode_feature_spec: dict[str, dict] = {}
for key, feature in numeric_features.items():
if key not in ep_df.columns:
continue
values = ep_df[key].to_numpy()
if len(values) == 0:
continue
first_value = values[0]
if isinstance(first_value, np.ndarray):
episode_data[key] = np.stack(values)
elif isinstance(first_value, (list, tuple)):
episode_data[key] = np.stack(values)
else:
episode_data[key] = np.asarray(values)
episode_feature_spec[key] = feature
if episode_data:
episode_stats_parts[new_ep_idx].append(
compute_episode_stats(episode_data, episode_feature_spec)
)
df["index"] = df["index"].astype(np.int64)
if "frame_index" in df.columns:
df["frame_index"] = df["frame_index"].astype(np.int64)
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
dst_path.parent.mkdir(parents=True, exist_ok=True)
_write_parquet(df, dst_path, new_meta)
all_episode_stats = []
for old_ep_idx in tqdm(episodes_to_keep, desc="Writing episode metadata"):
new_ep_idx = episode_mapping[old_ep_idx]
if new_ep_idx not in episode_file_metadata:
raise ValueError(f"Missing data file metadata for episode {old_ep_idx}")
from_idx, to_idx = episode_ranges[new_ep_idx]
src_episode = dataset.meta.episodes[old_ep_idx]
ep_data_meta = episode_file_metadata[new_ep_idx]
stats_parts = episode_stats_parts.get(new_ep_idx, [])
ep_stats = aggregate_stats(stats_parts) if len(stats_parts) > 1 else (stats_parts[0] if stats_parts else {})
if ep_stats:
all_episode_stats.append(ep_stats)
episode_meta = {
"data/chunk_index": ep_data_meta["data/chunk_index"],
"data/file_index": ep_data_meta["data/file_index"],
"dataset_from_index": from_idx,
"dataset_to_index": to_idx,
}
for video_key in dataset.meta.video_keys:
from_ts = src_episode[f"videos/{video_key}/from_timestamp"]
if old_ep_idx in trim_set:
from_ts += trim_duration_s
episode_meta.update(
{
f"videos/{video_key}/chunk_index": src_episode[f"videos/{video_key}/chunk_index"],
f"videos/{video_key}/file_index": src_episode[f"videos/{video_key}/file_index"],
f"videos/{video_key}/from_timestamp": from_ts,
f"videos/{video_key}/to_timestamp": src_episode[f"videos/{video_key}/to_timestamp"],
}
)
episode_dict = {
"episode_index": new_ep_idx,
"tasks": src_episode["tasks"],
"length": episode_lengths[new_ep_idx],
}
episode_dict.update(episode_meta)
if ep_stats:
episode_dict.update(flatten_dict({"stats": ep_stats}))
new_meta._save_episode_metadata(episode_dict)
new_meta._close_writer()
if new_meta.video_keys:
_copy_videos(dataset, new_meta)
new_meta.info.update(
{
"total_episodes": len(episodes_to_keep),
"total_frames": total_frames,
"total_tasks": len(new_meta.tasks) if new_meta.tasks is not None else 0,
"splits": {"train": f"0:{len(episodes_to_keep)}"},
}
)
if new_meta.video_keys and dataset.meta.video_keys:
for key in new_meta.video_keys:
if key in dataset.meta.features:
new_meta.info["features"][key]["info"] = dataset.meta.info["features"][key].get("info", {})
write_info(new_meta.info, new_meta.root)
merged_stats = aggregate_stats(all_episode_stats) if all_episode_stats else {}
if dataset.meta.stats:
for key, value in dataset.meta.stats.items():
if key not in merged_stats:
merged_stats[key] = value
if merged_stats:
write_stats(merged_stats, new_meta.root)
return LeRobotDataset(
repo_id=repo_id,
root=output_dir,
image_transforms=dataset.image_transforms,
delta_timestamps=dataset.delta_timestamps,
tolerance_s=dataset.tolerance_s,
)
def split_dataset(
dataset: LeRobotDataset,
splits: dict[str, float | list[int]],
+2 -4
View File
@@ -21,7 +21,7 @@ from collections import deque
from collections.abc import Iterable, Iterator
from pathlib import Path
from pprint import pformat
from typing import Any, Generic, TypeVar
from typing import Any
import datasets
import numpy as np
@@ -78,8 +78,6 @@ DEFAULT_FEATURES = {
"task_index": {"dtype": "int64", "shape": (1,), "names": None},
}
T = TypeVar("T")
def get_parquet_file_size_in_mb(parquet_path: str | Path) -> float:
metadata = pq.read_metadata(parquet_path)
@@ -1234,7 +1232,7 @@ class LookAheadError(Exception):
pass
class Backtrackable(Generic[T]):
class Backtrackable[T]:
"""
Wrap any iterator/iterable so you can step back up to `history` items
and look ahead up to `lookahead` items.
@@ -228,7 +228,6 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
# Reset for the next file
size_in_mb = 0
num_frames += ep_num_frames # Still need to accumulate total frames
paths_to_cat = []
# Now create metadata with correct chunk/file indices
+4 -4
View File
@@ -29,7 +29,7 @@ from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from pprint import pformat
from typing import Protocol, TypeAlias
from typing import Protocol
import serial
from deepdiff import DeepDiff
@@ -38,8 +38,8 @@ from tqdm import tqdm
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.utils import enter_pressed, move_cursor_up
NameOrID: TypeAlias = str | int
Value: TypeAlias = int | float
type NameOrID = str | int
type Value = int | float
logger = logging.getLogger(__name__)
@@ -1277,4 +1277,4 @@ class SerialMotorsBus(MotorsBusBase):
# Backward compatibility alias
MotorsBus: TypeAlias = SerialMotorsBus
MotorsBus = SerialMotorsBus
+1 -2
View File
@@ -18,10 +18,9 @@ from __future__ import annotations
import importlib
import logging
from typing import Any, TypedDict
from typing import Any, TypedDict, Unpack
import torch
from typing_extensions import Unpack
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType
@@ -4,10 +4,9 @@
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
from __future__ import annotations
# copy from https://github.com/huggingface/transformers/blob/main/src/transformers/models/llava_onevision/image_processing_llava_onevision_fast.py
from typing import Optional
from transformers.image_processing_utils import (
BatchFeature,
get_patch_output_size,
@@ -165,11 +164,11 @@ class Eagle25VLImageProcessorFast(BaseImageProcessorFast):
def _resize_for_patching(
self,
image: "torch.Tensor",
image: torch.Tensor,
target_resolution: tuple,
interpolation: "F.InterpolationMode",
interpolation: F.InterpolationMode,
input_data_format: ChannelDimension,
) -> "torch.Tensor":
) -> torch.Tensor:
"""
Resizes an image to a target resolution while maintaining aspect ratio.
@@ -219,8 +218,8 @@ class Eagle25VLImageProcessorFast(BaseImageProcessorFast):
return best_ratio
def _pad_for_patching(
self, image: "torch.Tensor", target_resolution: tuple, input_data_format: ChannelDimension
) -> "torch.Tensor":
self, image: torch.Tensor, target_resolution: tuple, input_data_format: ChannelDimension
) -> torch.Tensor:
"""
Pad an image to a target resolution while maintaining aspect ratio.
"""
@@ -236,15 +235,15 @@ class Eagle25VLImageProcessorFast(BaseImageProcessorFast):
def _get_image_patches(
self,
image: "torch.Tensor",
image: torch.Tensor,
min_num: int,
max_num: int,
size: tuple,
tile_size: int,
use_thumbnail: bool,
interpolation: "F.InterpolationMode",
interpolation: F.InterpolationMode,
pad_during_tiling: bool,
) -> list["torch.Tensor"]:
) -> list[torch.Tensor]:
image_size = get_image_size(image, channel_dim=ChannelDimension.FIRST)
orig_height, orig_width = image_size
aspect_ratio = orig_width / orig_height
@@ -305,8 +304,8 @@ class Eagle25VLImageProcessorFast(BaseImageProcessorFast):
def _pad_for_batching(
self,
pixel_values: list["torch.Tensor"],
) -> list["torch.Tensor"]:
pixel_values: list[torch.Tensor],
) -> list[torch.Tensor]:
"""
Pads images on the `num_of_patches` dimension with zeros to form a batch of same number of patches.
@@ -327,14 +326,14 @@ class Eagle25VLImageProcessorFast(BaseImageProcessorFast):
def _preprocess(
self,
images: list["torch.Tensor"],
images: list[torch.Tensor],
do_resize: bool,
size: SizeDict,
max_dynamic_tiles: int,
min_dynamic_tiles: int,
use_thumbnail: bool,
pad_during_tiling: bool,
interpolation: Optional["F.InterpolationMode"],
interpolation: F.InterpolationMode | None,
do_center_crop: bool,
crop_size: SizeDict,
do_rescale: bool,
+1 -2
View File
@@ -20,12 +20,11 @@ import logging
import math
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Literal, TypedDict
from typing import TYPE_CHECKING, Literal, TypedDict, Unpack
import torch
import torch.nn.functional as F # noqa: N812
from torch import Tensor, nn
from typing_extensions import Unpack
from lerobot.utils.import_utils import _transformers_available
+1 -2
View File
@@ -20,12 +20,11 @@ import logging
import math
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Literal, TypedDict
from typing import TYPE_CHECKING, Literal, TypedDict, Unpack
import torch
import torch.nn.functional as F # noqa: N812
from torch import Tensor, nn
from typing_extensions import Unpack
from lerobot.utils.import_utils import _transformers_available
@@ -19,13 +19,12 @@ import logging
import math
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Literal, TypedDict
from typing import TYPE_CHECKING, Literal, TypedDict, Unpack
import numpy as np
import torch
import torch.nn.functional as F # noqa: N812
from torch import Tensor, nn
from typing_extensions import Unpack
from lerobot.utils.import_utils import _scipy_available, _transformers_available
+1 -2
View File
@@ -19,7 +19,7 @@ import os
from importlib.resources import files
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TypedDict, TypeVar
from typing import TypedDict, TypeVar, Unpack
import packaging
import safetensors
@@ -28,7 +28,6 @@ from huggingface_hub.constants import SAFETENSORS_SINGLE_FILE
from huggingface_hub.errors import HfHubHTTPError
from safetensors.torch import load_model as load_model_as_safetensor, save_model as save_model_as_safetensor
from torch import Tensor, nn
from typing_extensions import Unpack
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.train import TrainPipelineConfig
@@ -54,12 +54,11 @@ policy = SmolVLAPolicy.from_pretrained("lerobot/smolvla_base")
import math
from collections import deque
from typing import TypedDict
from typing import TypedDict, Unpack
import torch
import torch.nn.functional as F # noqa: N812
from torch import Tensor, nn
from typing_extensions import Unpack
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
+5 -5
View File
@@ -17,7 +17,7 @@
from __future__ import annotations
from enum import Enum
from typing import Any, TypeAlias, TypedDict
from typing import Any, TypedDict
import numpy as np
import torch
@@ -36,10 +36,10 @@ class TransitionKey(str, Enum):
COMPLEMENTARY_DATA = "complementary_data"
PolicyAction: TypeAlias = torch.Tensor
RobotAction: TypeAlias = dict[str, Any]
EnvAction: TypeAlias = np.ndarray
RobotObservation: TypeAlias = dict[str, Any]
PolicyAction = torch.Tensor
RobotAction = dict[str, Any]
EnvAction = np.ndarray
RobotObservation = dict[str, Any]
EnvTransition = TypedDict(
+4 -4
View File
@@ -39,7 +39,7 @@ from collections.abc import Callable, Iterable, Sequence
from copy import deepcopy
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Generic, TypeAlias, TypedDict, TypeVar, cast
from typing import Any, TypedDict, TypeVar, cast
import torch
from huggingface_hub import hf_hub_download
@@ -251,7 +251,7 @@ class ProcessorMigrationError(Exception):
@dataclass
class DataProcessorPipeline(HubMixin, Generic[TInput, TOutput]):
class DataProcessorPipeline[TInput, TOutput](HubMixin):
"""A sequential pipeline for processing data, integrated with the Hugging Face Hub.
This class chains together multiple `ProcessorStep` instances to form a complete
@@ -1432,8 +1432,8 @@ class DataProcessorPipeline(HubMixin, Generic[TInput, TOutput]):
# Type aliases for semantic clarity.
RobotProcessorPipeline: TypeAlias = DataProcessorPipeline[TInput, TOutput]
PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline[TInput, TOutput]
RobotProcessorPipeline = DataProcessorPipeline[TInput, TOutput]
PolicyProcessorPipeline = DataProcessorPipeline[TInput, TOutput]
class ObservationProcessorStep(ProcessorStep, ABC):
@@ -15,7 +15,6 @@
# limitations under the License.
from dataclasses import dataclass, field
from typing import TypeAlias
from lerobot.cameras import CameraConfig
@@ -50,5 +49,5 @@ class SOFollowerRobotConfig(RobotConfig, SOFollowerConfig):
pass
SO100FollowerConfig: TypeAlias = SOFollowerRobotConfig
SO101FollowerConfig: TypeAlias = SOFollowerRobotConfig
SO100FollowerConfig = SOFollowerRobotConfig
SO101FollowerConfig = SOFollowerRobotConfig
@@ -17,7 +17,6 @@
import logging
import time
from functools import cached_property
from typing import TypeAlias
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
@@ -230,5 +229,5 @@ class SOFollower(Robot):
logger.info(f"{self} disconnected.")
SO100Follower: TypeAlias = SOFollower
SO101Follower: TypeAlias = SOFollower
SO100Follower = SOFollower
SO101Follower = SOFollower
@@ -117,6 +117,13 @@ Modify tasks - set default task with overrides for specific episodes (WARNING: m
--operation.new_task "Default task" \
--operation.episode_tasks '{"5": "Special task for episode 5"}'
Trim first 3 seconds from all episodes:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_trim3s \
--operation.type trim_episode_start \
--operation.seconds 3.0
Convert image dataset to video format and save locally:
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
@@ -170,6 +177,7 @@ from lerobot.datasets.dataset_tools import (
modify_tasks,
remove_feature,
split_dataset,
trim_episode_start,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import HF_LEROBOT_HOME
@@ -215,6 +223,13 @@ class ModifyTasksConfig(OperationConfig):
episode_tasks: dict[str, str] | None = None
@OperationConfig.register_subclass("trim_episode_start")
@dataclass
class TrimEpisodeStartConfig(OperationConfig):
seconds: float | None = None
episode_indices: list[int] | None = None
@OperationConfig.register_subclass("convert_image_to_video")
@dataclass
class ConvertImageToVideoConfig(OperationConfig):
@@ -464,6 +479,41 @@ def handle_modify_tasks(cfg: EditDatasetConfig) -> None:
modified_dataset.push_to_hub()
def handle_trim_episode_start(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, TrimEpisodeStartConfig):
raise ValueError("Operation config must be TrimEpisodeStartConfig")
if cfg.operation.seconds is None:
raise ValueError("seconds must be specified for trim_episode_start operation")
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
output_repo_id, output_dir = get_output_path(
cfg.repo_id, cfg.new_repo_id, Path(cfg.root) if cfg.root else None
)
if cfg.new_repo_id is None:
dataset.root = Path(str(dataset.root) + "_old")
logging.info(
f"Trimming first {cfg.operation.seconds}s from episodes "
f"{cfg.operation.episode_indices if cfg.operation.episode_indices else 'ALL'} in {cfg.repo_id}"
)
new_dataset = trim_episode_start(
dataset=dataset,
seconds=cfg.operation.seconds,
episode_indices=cfg.operation.episode_indices,
output_dir=output_dir,
repo_id=output_repo_id,
)
logging.info(f"Dataset saved to {output_dir}")
logging.info(f"Episodes: {new_dataset.meta.total_episodes}, Frames: {new_dataset.meta.total_frames}")
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {output_repo_id}")
LeRobotDataset(output_repo_id, root=output_dir).push_to_hub()
def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None:
# Note: Parser may create any config type with the right fields, so we access fields directly
# instead of checking isinstance()
@@ -594,6 +644,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
handle_remove_feature(cfg)
elif operation_type == "modify_tasks":
handle_modify_tasks(cfg)
elif operation_type == "trim_episode_start":
handle_trim_episode_start(cfg)
elif operation_type == "convert_image_to_video":
handle_convert_image_to_video(cfg)
elif operation_type == "info":
@@ -15,7 +15,6 @@
# limitations under the License.
from dataclasses import dataclass
from typing import TypeAlias
from ..config import TeleoperatorConfig
@@ -38,5 +37,5 @@ class SOLeaderTeleopConfig(TeleoperatorConfig, SOLeaderConfig):
pass
SO100LeaderConfig: TypeAlias = SOLeaderTeleopConfig
SO101LeaderConfig: TypeAlias = SOLeaderTeleopConfig
SO100LeaderConfig = SOLeaderTeleopConfig
SO101LeaderConfig = SOLeaderTeleopConfig
@@ -16,7 +16,6 @@
import logging
import time
from typing import TypeAlias
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.feetech import (
@@ -156,5 +155,5 @@ class SOLeader(Teleoperator):
logger.info(f"{self} disconnected.")
SO100Leader: TypeAlias = SOLeader
SO101Leader: TypeAlias = SOLeader
SO100Leader = SOLeader
SO101Leader = SOLeader
+1 -3
View File
@@ -16,12 +16,10 @@
import json
import warnings
from pathlib import Path
from typing import TypeVar
import imageio
JsonLike = str | int | float | bool | None | list["JsonLike"] | dict[str, "JsonLike"] | tuple["JsonLike", ...]
T = TypeVar("T", bound=JsonLike)
def write_video(video_path, stacked_frames, fps):
@@ -33,7 +31,7 @@ def write_video(video_path, stacked_frames, fps):
imageio.mimsave(video_path, stacked_frames, fps=fps)
def deserialize_json_into_object(fpath: Path, obj: T) -> T:
def deserialize_json_into_object[T: JsonLike](fpath: Path, obj: T) -> T:
"""
Loads the JSON data from `fpath` and recursively fills `obj` with the
corresponding values (strictly matching structure and types).
+99
View File
@@ -29,6 +29,7 @@ from lerobot.datasets.dataset_tools import (
modify_tasks,
remove_feature,
split_dataset,
trim_episode_start,
)
from lerobot.scripts.lerobot_edit_dataset import convert_image_to_video_dataset
@@ -142,6 +143,104 @@ def test_delete_empty_list(sample_dataset, tmp_path):
)
def test_trim_episode_start_updates_indices(sample_dataset, tmp_path):
"""Test trimming episode starts updates frame/timestamp/index metadata consistently."""
output_dir = tmp_path / "trimmed"
trim_seconds = 0.1 # 3 frames at 30 FPS
trim_frames = int(trim_seconds * sample_dataset.meta.fps)
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = trim_episode_start(
sample_dataset,
seconds=trim_seconds,
output_dir=output_dir,
)
expected_length = 10 - trim_frames
assert new_dataset.meta.total_episodes == sample_dataset.meta.total_episodes
assert new_dataset.meta.total_frames == sample_dataset.meta.total_episodes * expected_length
indices = [int(i.item()) for i in new_dataset.hf_dataset["index"]]
assert indices == list(range(new_dataset.meta.total_frames))
episode_indices = [int(i.item()) for i in new_dataset.hf_dataset["episode_index"]]
frame_indices = [int(i.item()) for i in new_dataset.hf_dataset["frame_index"]]
timestamps = [float(i.item()) for i in new_dataset.hf_dataset["timestamp"]]
for ep_idx in range(sample_dataset.meta.total_episodes):
ep_frame_indices = [f for e, f in zip(episode_indices, frame_indices, strict=False) if e == ep_idx]
ep_timestamps = [t for e, t in zip(episode_indices, timestamps, strict=False) if e == ep_idx]
assert len(ep_frame_indices) == expected_length
assert ep_frame_indices == list(range(expected_length))
assert ep_timestamps[0] == pytest.approx(0.0)
assert ep_timestamps[-1] == pytest.approx((expected_length - 1) / sample_dataset.meta.fps)
ep_meta = new_dataset.meta.episodes[ep_idx]
assert int(ep_meta["length"]) == expected_length
assert int(ep_meta["dataset_from_index"]) == ep_idx * expected_length
assert int(ep_meta["dataset_to_index"]) == (ep_idx + 1) * expected_length
def test_trim_episode_start_skips_too_short_episodes(tmp_path, empty_lerobot_dataset_factory):
"""Test too-short episodes are skipped and remaining episodes are reindexed."""
features = {
"action": {"dtype": "float32", "shape": (2,), "names": None},
"observation.state": {"dtype": "float32", "shape": (2,), "names": None},
"observation.images.top": {"dtype": "image", "shape": (32, 32, 3), "names": None},
}
dataset = empty_lerobot_dataset_factory(root=tmp_path / "source", features=features)
for ep_len in [10, 2, 10]:
for _ in range(ep_len):
dataset.add_frame(
{
"action": np.random.randn(2).astype(np.float32),
"observation.state": np.random.randn(2).astype(np.float32),
"observation.images.top": np.random.randint(0, 255, size=(32, 32, 3), dtype=np.uint8),
"task": "task",
}
)
dataset.save_episode()
dataset.finalize()
trim_seconds = 0.1 # 3 frames at 30 FPS
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "trimmed")
new_dataset = trim_episode_start(
dataset,
seconds=trim_seconds,
output_dir=tmp_path / "trimmed",
)
# Episode 1 is too short and gets skipped. Remaining episodes are trimmed and reindexed.
assert new_dataset.meta.total_episodes == 2
assert new_dataset.meta.total_frames == 14
assert sorted({int(idx.item()) for idx in new_dataset.hf_dataset["episode_index"]}) == [0, 1]
assert [int(ep["length"]) for ep in new_dataset.meta.episodes] == [7, 7]
def test_trim_episode_start_rejects_when_all_selected_are_too_short(sample_dataset, tmp_path):
"""Test trimming fails when all selected episodes are too short and would be skipped."""
with pytest.raises(ValueError, match="All episodes selected for trimming are too short"):
trim_episode_start(
sample_dataset,
seconds=1.0, # 30 frames > 10-frame episodes
output_dir=tmp_path / "trimmed",
)
def test_split_by_episodes(sample_dataset, tmp_path):
"""Test splitting dataset by specific episode indices."""
splits = {
+7
View File
@@ -143,12 +143,18 @@ def test_policy(ds_repo_id, env_name, env_kwargs, policy_name, policy_kwargs):
Note: We test various combinations of policy and dataset. The combinations are by no means exhaustive,
and for now we add tests as we see fit.
"""
if policy_name == "vqbet" and DEVICE == "mps":
pytest.skip("VQBet does not support MPS backend")
if policy_name == "act" and "aloha" in ds_repo_id and DEVICE == "mps":
pytest.skip("ACT with aloha has batch mutation issues on MPS")
train_cfg = TrainPipelineConfig(
# TODO(rcadene, aliberts): remove dataset download
dataset=DatasetConfig(repo_id=ds_repo_id, episodes=[0]),
policy=make_policy_config(policy_name, push_to_hub=False, **policy_kwargs),
env=make_env_config(env_name, **env_kwargs),
)
train_cfg.policy.device = DEVICE
train_cfg.validate()
# Check that we can make the policy object.
@@ -227,6 +233,7 @@ def test_act_backbone_lr():
dataset=DatasetConfig(repo_id="lerobot/aloha_sim_insertion_scripted", episodes=[0]),
policy=make_policy_config("act", optimizer_lr=0.01, optimizer_lr_backbone=0.001, push_to_hub=False),
)
cfg.policy.device = DEVICE
cfg.validate() # Needed for auto-setting some parameters
assert cfg.policy.optimizer_lr == 0.01
+1 -3
View File
@@ -1870,9 +1870,7 @@ class NonCallableStep(ProcessorStep):
def test_construction_rejects_step_without_call():
"""Test that DataProcessorPipeline rejects steps that don't inherit from ProcessorStep."""
with pytest.raises(
TypeError, match=r"Can't instantiate abstract class NonCallableStep with abstract method __call_"
):
with pytest.raises(TypeError, match=r"Can't instantiate abstract class NonCallableStep"):
DataProcessorPipeline([NonCallableStep()])
with pytest.raises(TypeError, match=r"must inherit from ProcessorStep"):
@@ -28,6 +28,7 @@ from lerobot.scripts.lerobot_edit_dataset import (
RemoveFeatureConfig,
SplitConfig,
_validate_config,
TrimEpisodeStartConfig,
)
@@ -47,6 +48,7 @@ class TestOperationTypeParsing:
("merge", MergeConfig),
("remove_feature", RemoveFeatureConfig),
("modify_tasks", ModifyTasksConfig),
("trim_episode_start", TrimEpisodeStartConfig),
("convert_image_to_video", ConvertImageToVideoConfig),
("info", InfoConfig),
],
@@ -77,6 +79,7 @@ class TestOperationTypeParsing:
("merge", MergeConfig),
("remove_feature", RemoveFeatureConfig),
("modify_tasks", ModifyTasksConfig),
("trim_episode_start", TrimEpisodeStartConfig),
("convert_image_to_video", ConvertImageToVideoConfig),
("info", InfoConfig),
],
+2 -1
View File
@@ -22,8 +22,9 @@ import torch
from lerobot import available_cameras, available_motors, available_robots
from lerobot.utils.import_utils import is_package_available
from lerobot.utils.utils import auto_select_torch_device
DEVICE = os.environ.get("LEROBOT_TEST_DEVICE", "cuda") if torch.cuda.is_available() else "cpu"
DEVICE = os.environ.get("LEROBOT_TEST_DEVICE", str(auto_select_torch_device()))
TEST_ROBOT_TYPES = []
for robot_type in available_robots: