Compare commits

22 Commits

Author SHA1 Message Date
javadcc_mac f9b8f297b4 Fix EVO1 LIBERO rollout processors 2026-06-09 15:10:10 +08:00
javadcc_mac 95527f6051 Merge remote-tracking branch 'upstream/main' into codex/add-evo1-policy 2026-05-12 17:40:59 +08:00
javadcc_mac 407ee867b9 docs(evo1): format results table 2026-05-12 17:40:18 +08:00
Steven Palma 26ff40ddd7 chore(deps): cap torch ceiling at <2.12, pin Linux wheels to cu128 (#3570)
* chore(deps): ceiling + cuda

* ci: bump cuda version docker image

* ci: add cpu wheel to release workflow

* chore(deps): update uv.lock

* docs: update installation with cuda note
2026-05-11 19:47:55 +02:00
javadcc_mac a5e6409985 fix(evo1): finalize policy guide alignment 2026-05-11 21:51:41 +08:00
Maxime Ellerbach 6d269b28c8 docs(omx): adding some examples and scripts (#3566)
* docs(omx): adding some examples and scripts

* cleaning up and reviewing the cli args

* adding __init__.py to example folder, adjusting the examples

* adding reference to pretrained act policy

* moving `.send_action` before `dataset.add_frame` for consistency

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* adjusting docstring

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* addressing hardcoded dataset fps

* removed init as it worked without

---------

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>
2026-05-11 15:36:32 +02:00
Steven Palma b607c8458e docs: add policy & compute guide (#3534)
* docs(policy): contributing a policy guide

* docs(training): HW compute guide

* chore(docs): add to readme and index

* Apply suggestions from code review

Co-authored-by: Haoming Song <1847575517@qq.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* chore(docs): slight improvements

* refactor(docs): consolidate add policy docs

* chore(style): fix pre-commit

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Haoming Song <1847575517@qq.com>
2026-05-11 15:19:12 +02:00
Jash Shah 9e83510c99 fix(datasets): close file handle on VideoDecoder init failure in cache (#3542)
If VideoDecoder() raises during initialization, the fsspec file handle
was leaked since it was opened via __enter__() but never closed on the
exception path. Now explicitly closes the handle before re-raising.
2026-05-10 17:30:37 +02:00
javadcc_mac 1c9fbba9a9 chore(evo1): align with policy contribution guide conventions
- Add `src/lerobot/policies/evo1/README.md` symlink into `docs/source/evo1.mdx`
  to match the in-tree README convention (mirroring the EO-1 layout).
- Convert `transformers` import in `internvl3_embedder.py` to the standard
  `TYPE_CHECKING + _transformers_available` two-step gating used by other
  optional-backbone policies (e.g. diffusion). The previous lazy-in-`__init__`
  import was functionally equivalent for runtime gating but didn't expose the
  real symbols to type checkers.
- Add `lerobot[evo1]` to the `all` extra in `pyproject.toml` so
  `pip install 'lerobot[all]'` keeps installing every optional policy.

Per the guidance in https://moon-ci-docs.huggingface.co/docs/lerobot/pr_3534/en/contributing_a_policy.
2026-05-10 23:14:23 +08:00
javadcc_mac 6a1b5ceb9d Merge remote-tracking branch 'upstream/main' into codex/add-evo1-policy
# Conflicts:
#	uv.lock
2026-05-10 22:48:17 +08:00
javadcc_mac daa4c4dd30 chore(lock): regenerate uv.lock for evo1 extra
Adds the `evo1` entry to `[package.metadata.requires-dist]` and the
`provides-extras` list so that `uv sync --locked --extra test` (used by
fast_tests.yml) no longer reports the lockfile as stale.

Generated with `uv 0.8.0` (matching `UV_VERSION` in fast_tests.yml).
The non-evo1 marker tweaks are produced by `uv lock` re-resolving the
existing dep graph and are not introduced by this PR.
2026-05-10 22:43:26 +08:00
Anthony Shoumikhin 1f7b03f5f2 chore(deps): allow torch 2.11/2.12 and fix autocast deprecation (#3435)
* chore(deps): allow torch 2.11/2.12 and fix autocast deprecation

- Bump torch to >=2.7,<2.13 (was <2.11), torchvision to <0.28 (was <0.26),
  and torchcodec to <0.13 (was <0.11) to allow installs against the latest
  stable torch 2.11 and the upcoming 2.12 line.
- Replace removed torch.get_autocast_gpu_dtype() with torch.get_autocast_dtype("cuda")
  in Florence2 and Qwen2.5-VL-MoE FlashAttention paths (the former is removed in 2.11+).
- Refresh uv.lock for the new resolution (torch 2.11.0+cu130, torchvision 0.26.0+cu130,
  torchcodec 0.11.1, full CUDA 13 stack).

Verified locally with `uv sync --locked` from a clean .venv and the lerobot
test suite (pytest -n 8 --dist=loadfile --timeout=300). Failure set is
identical to the pre-bump baseline: 18 pre-existing failures
(test_sac_policy*, test_pi0_rtc*, test_pi05_rtc*, test_replay_buffer*),
0 new, 0 fixed.

AI assistance: this change was authored with Claude Code per AI_POLICY.md.

* fix(policies): use device-agnostic autocast dtype lookup

Pass query_states.device.type to torch.get_autocast_dtype() instead of
hardcoding 'cuda', so the cast matches the active autocast context when
running under CPU/MPS/XPU autocast.

---------

Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-05-10 13:05:35 +02:00
Yiming Wang ff992a7a1d Merge branch 'main' into codex/add-evo1-policy 2026-05-10 18:54:35 +08:00
Steven Palma cb8edf17e6 chore(dependencies): update uv.lock (#3475) 2026-05-10 12:24:22 +02:00
Steven Palma 5699f6cbf4 chore(ci): disable auto-stale (#3550) 2026-05-10 11:49:31 +02:00
javadcc_mac 48269dddb3 fix(evo1): infer batch size after normalizing image dims
`_collect_image_batches` read `batch_size = batch[camera_keys[0]].shape[0]`
before normalizing per-camera tensors to `(B, C, H, W)`. For an unbatched
`(C, H, W)` input (which the function tries to support via the `image.dim() == 3`
branch), this picked up the channel count `C` instead of the real batch size,
making the subsequent per-sample loop iterate `C` times and indexing go
out of bounds.

Normalize each camera tensor up-front, then read `batch_size` from the
normalized batch dim. Adds `test_collect_image_batches_handles_unbatched_chw`
covering the regression.

Reported by Copilot review on huggingface/lerobot#3545.
2026-05-10 11:29:23 +08:00
javadcc_mac 8df8d3d866 feat(policies): add EVO1 policy 2026-05-09 21:39:19 +08:00
masato-ka 0e6114ac36 fix(train): restrict legacy RA-BC migration to JSON checkpoints only (#3490)
* fix(train): restrict legacy RA-BC migration to JSON checkpoints only

_migrate_legacy_rabc_fields was called for all config files, causing
json.load to raise DecodeError when a YAML/TOML config was passed to
lerobot-train for a new training run. Guard the block with an
.endswith(".json") check so migration only runs when resuming from
a JSON checkpoint.
2026-05-08 20:27:01 +02:00
Steven Palma c8ce413d73 fix(robots): align lekiwi default with so100 use_degrees (#3531) 2026-05-07 17:52:34 +02:00
Pepijn 82dffde7fa fix(ci): speed up multi-task benchmark evals (parallelize + cap VLABench steps) (#3529)
* fix(ci): run multi-task benchmark evals 5-at-a-time in parallel

The eval script supports running tasks concurrently via a
ThreadPoolExecutor (env.max_parallel_tasks). Apply it to the four
multi-task benchmark CI jobs (RoboTwin, RoboCasa, RoboMME, LIBERO-plus
— 8-10 tasks/task_ids each) so they finish in ~2 waves of 5 instead of
running sequentially. Single-task jobs (Libero, MetaWorld, RoboCerebra)
are unchanged.

* fix(ci): cap VLABench smoke eval at 50 steps per task

VLABench's default episode_length is 500 steps; with 10 tasks at ~1 it/s
the smoke eval took ~80 minutes of rollouts on top of the image build.
The eval is a pipeline smoke test (running_success_rate stays at 0% on
this short rollout anyway), so we don't need full episodes — cap each
task at 50 steps to bring total rollout time down ~10x.

* fix(ci): run VLABench tasks 5-at-a-time in parallel

The eval script already supports running multiple tasks concurrently via
a ThreadPoolExecutor (env.max_parallel_tasks). Set it to 5 so the 10
VLABench tasks finish in ~2 waves instead of running sequentially.
2026-05-07 13:37:16 +02:00
Ville Kuosmanen eaf0218bc8 feat(policy): use pretrained vision encoder weights by default for diffusion and vqbet (#3202)
* feat: add pretrained vision encoder weights for diffusion and vqbet

* fix test by re-generating artifacts

---------

Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-05-07 12:10:38 +02:00
Pepijn a0e52d52fe fix(ci): bump robotwin benchmark image to CUDA 12.6 (#3525)
The robotwin benchmark Dockerfile still installed cuda-nvcc-12-4 and
cuda-cudart-dev-12-4 after #3505 upgraded the base image to CUDA 12.6.3
on Ubuntu 24.04. Those packages aren't available in the ubuntu2404 CUDA
repo, so the build failed at apt-get install. Bumping both to -12-6 to
match the base image.
2026-05-07 11:11:12 +02:00
51 changed files with 4618 additions and 983 deletions
+6
@@ -382,6 +382,7 @@ jobs:
--policy.path=\"\$ROBOTWIN_POLICY\" \
--env.type=robotwin \
--env.task=\"\$ROBOTWIN_TASKS\" \
--env.max_parallel_tasks=5 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval.use_async_envs=false \
@@ -482,6 +483,7 @@ jobs:
--policy.path=lerobot/smolvla_robocasa \
--env.type=robocasa \
--env.task=CloseFridge,OpenCabinet,OpenDrawer,TurnOnMicrowave,TurnOffStove,CloseToasterOvenDoor,SlideDishwasherRack,TurnOnSinkFaucet,NavigateKitchen,TurnOnElectricKettle \
--env.max_parallel_tasks=5 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval.use_async_envs=false \
@@ -693,6 +695,7 @@ jobs:
--env.task=\"\$ROBOMME_TASKS\" \
--env.dataset_split=test \
--env.task_ids=[0] \
--env.max_parallel_tasks=5 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval.use_async_envs=false \
@@ -800,6 +803,7 @@ jobs:
--env.type=libero_plus \
--env.task=\"\$LIBERO_PLUS_SUITE\" \
--env.task_ids=\"\$LIBERO_PLUS_TASK_IDS\" \
--env.max_parallel_tasks=5 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval.use_async_envs=false \
@@ -900,6 +904,8 @@ jobs:
--policy.path=lerobot/smolvla_vlabench \
--env.type=vlabench \
--env.task=select_fruit,select_toy,select_book,select_painting,select_drink,select_ingredient,select_billiards,select_poker,add_condiment,insert_flower \
--env.episode_length=50 \
--env.max_parallel_tasks=5 \
--eval.batch_size=1 \
--eval.n_episodes=1 \
--eval.use_async_envs=false \
+2 -1
@@ -152,13 +152,14 @@ jobs:
BASE_VERSION="${VERSION%%-*}"
echo "Installing pre-release version $BASE_VERSION from TestPyPI..."
uv pip install \
--torch-backend cpu \
--index-url https://test.pypi.org/simple/ \
--extra-index-url https://pypi.org/simple \
--index-strategy unsafe-best-match \
"lerobot[all]==$BASE_VERSION"
else
echo "Installing release version $VERSION from PyPI..."
uv pip install "lerobot[all]==$VERSION"
uv pip install --torch-backend cpu "lerobot[all]==$VERSION"
fi
- name: Check lerobot version
run: uv run python -c "import lerobot; print(lerobot.__version__)"
+2 -2
@@ -19,8 +19,8 @@ on:
workflow_dispatch:
# Runs at 02:00
schedule:
- cron: "0 2 * * *"
# schedule:
# - cron: "0 2 * * *"
env:
CLOSE_ISSUE_MESSAGE: >
+2
@@ -232,6 +232,8 @@ Match the policy to the user's **GPU memory** and **time budget**. Numbers below
All policies typically train for **5–10 epochs** (see §7).
> **Human-facing version:** the [Compute Hardware Guide](./docs/source/hardware_guide.mdx) reuses the table below and adds a cloud-GPU tier guide and a Hugging Face Jobs pointer.
| Policy | Batch | Update (ms) | Peak GPU mem (GB) | Best for |
| ----------- | ----: | ----------: | ----------------: | ------------------------------------------------------------------------------------------------ |
| `act` | 4 | **83.9** | **0.94** | First-time users, laptops, single-task. Fast and reliable. |
+1 -1
@@ -109,7 +109,7 @@ lerobot-train \
As with hardware, you can implement your own policy, leverage LeRobot's data collection, training, and visualization tools, and share your model on the HF Hub.
For detailed policy setup guides, see the [Policy Documentation](https://huggingface.co/docs/lerobot/bring_your_own_policies).
For detailed policy setup guides, see the [Policy Documentation](https://huggingface.co/docs/lerobot/bring_your_own_policies). For GPU/RAM requirements and expected training time per policy, see the [Compute Hardware Guide](https://huggingface.co/docs/lerobot/hardware_guide).
## Inference & Evaluation
+1 -1
@@ -35,7 +35,7 @@ USER root
ARG ROBOTWIN_SHA=0aeea2d669c0f8516f4d5785f0aa33ba812c14b4
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-nvcc-12-4 cuda-cudart-dev-12-4 \
cuda-nvcc-12-8 cuda-cudart-dev-12-8 \
libvulkan1 vulkan-tools \
&& mkdir -p /usr/share/vulkan/icd.d \
&& echo '{"file_format_version":"1.0.0","ICD":{"library_path":"libGLX_nvidia.so.0","api_version":"1.3.0"}}' \
+1 -1
@@ -18,7 +18,7 @@
# docker build -f docker/Dockerfile.internal -t lerobot-internal .
# Configure the base image for CI with GPU access
ARG CUDA_VERSION=12.6.3
ARG CUDA_VERSION=12.8.1
ARG OS_VERSION=24.04
FROM nvidia/cuda:${CUDA_VERSION}-base-ubuntu${OS_VERSION}
+9 -5
@@ -8,7 +8,7 @@
- local: il_robots
title: Imitation Learning for Robots
- local: bring_your_own_policies
title: Bring Your Own Policies
title: Adding a Policy
- local: integrate_hardware
title: Bring Your Own Hardware
- local: hilserl
@@ -24,6 +24,12 @@
- local: rename_map
title: Using Rename Map and Empty Cameras
title: "Tutorials"
- sections:
- local: hardware_guide
title: Compute Hardware Guide
- local: torch_accelerators
title: PyTorch accelerators
title: "Compute & Hardware"
- sections:
- local: lerobot-dataset-v3
title: Using LeRobotDataset
@@ -49,6 +55,8 @@
title: π₀.₅ (Pi05)
- local: eo1
title: EO-1
- local: evo1
title: EVO1
- local: groot
title: NVIDIA GR00T N1.5
- local: xvla
@@ -142,10 +150,6 @@
- local: cameras
title: Cameras
title: "Sensors"
- sections:
- local: torch_accelerators
title: PyTorch accelerators
title: "Supported Hardware"
- sections:
- local: notebooks
title: Notebooks
+220 -81
@@ -1,60 +1,37 @@
# Bring Your Own Policies
# Adding a Policy
This tutorial explains how to integrate your own custom policy implementations into the LeRobot ecosystem, allowing you to leverage all LeRobot tools for training, evaluation, and deployment while using your own algorithms.
This guide walks you through implementing a custom policy and getting it to work with LeRobot's training, evaluation, and deployment tools. There are two paths:
## Step 1: Create a Policy Package
- **Plugin (out-of-tree)** — ship your policy as a standalone `lerobot_policy_*` package. Faster, no PR required, easy to iterate. Right for experimentation, internal use, or when you want to publish independently.
- **In-tree (contributed to LeRobot)** — land your policy directly in `src/lerobot/policies/`. Requires a PR, but makes your policy a first-class citizen of the library.
Your custom policy should be organized as an installable Python package following LeRobot's plugin conventions.
The plugin route is usually the right starting point — promote to in-tree once the policy has stabilized and there's clear value in shipping it with the library.
### Package Structure
Either way, the building blocks are the same: a configuration class, a policy class, and a processor factory. The first half of this guide covers those shared pieces; the second half covers the path-specific scaffolding ([Path A](#path-a-out-of-tree-plugin), [Path B](#path-b-contributing-in-tree)).
Create a package with the prefix `lerobot_policy_` (IMPORTANT!) followed by your policy name:
A note on tone: robot-learning is an actively evolving field, and "what a policy looks like" can shift with each new architecture. The conventions described here exist because they let `lerobot-train` and `lerobot-eval` work uniformly across very different models. When a new policy genuinely doesn't fit them, raise it (in your PR, or an issue) — the conventions are not sacred.
```bash
lerobot_policy_my_custom_policy/
├── pyproject.toml
└── src/
└── lerobot_policy_my_custom_policy/
├── __init__.py
├── configuration_my_custom_policy.py
├── modeling_my_custom_policy.py
└── processor_my_custom_policy.py
```
---
### Package Configuration
## Anatomy of a policy
Set up your `pyproject.toml`:
Three building blocks make up every policy. The names below use `my_policy` as a placeholder — replace with your policy's name. That name is load-bearing: it must match the string you pass to `@PreTrainedConfig.register_subclass`, the `MyPolicy.name` class attribute, and the `make_<name>_pre_post_processors` factory function (more on each below).
```toml
[project]
name = "lerobot_policy_my_custom_policy"
version = "0.1.0"
dependencies = [
# your policy-specific dependencies
]
requires-python = ">= 3.12"
### Configuration class
[build-system]
build-backend = # your-build-backend
requires = # your-build-system
```
## Step 2: Define the Policy Configuration
Create a configuration class that inherits from [`PreTrainedConfig`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/configs/policies.py) and registers your policy type:
Here is a template to get you started, customize the parameters and methods as needed for your policy's architecture and training requirements.
Inherit from [`PreTrainedConfig`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/configs/policies.py) and register your policy type. Here is a template — customize the parameters and methods as needed for your policy's architecture and training requirements.
```python
# configuration_my_custom_policy.py
# configuration_my_policy.py
from dataclasses import dataclass, field
from lerobot.configs import PreTrainedConfig
from lerobot.optim import AdamWConfig
from lerobot.optim import CosineDecayWithWarmupSchedulerConfig
@PreTrainedConfig.register_subclass("my_custom_policy")
@PreTrainedConfig.register_subclass("my_policy")
@dataclass
class MyCustomPolicyConfig(PreTrainedConfig):
"""Configuration class for MyCustomPolicy.
class MyPolicyConfig(PreTrainedConfig):
"""Configuration class for MyPolicy.
Args:
n_obs_steps: Number of observation steps to use as input
@@ -77,16 +54,20 @@ class MyCustomPolicyConfig(PreTrainedConfig):
raise ValueError("n_action_steps cannot exceed horizon")
def validate_features(self) -> None:
"""Validate input/output feature compatibility."""
"""Validate input/output feature compatibility.
Call this explicitly from your policy's __init__ — the base class does not.
"""
if not self.image_features:
raise ValueError("MyCustomPolicy requires at least one image feature.")
raise ValueError("MyPolicy requires at least one image feature.")
if self.action_feature is None:
raise ValueError("MyCustomPolicy requires 'action' in output_features.")
raise ValueError("MyPolicy requires 'action' in output_features.")
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(lr=self.optimizer_lr, weight_decay=self.optimizer_weight_decay)
def get_scheduler_preset(self):
"""Return a LRSchedulerConfig from lerobot.optim, or None."""
return None
@property
@@ -101,8 +82,7 @@ class MyCustomPolicyConfig(PreTrainedConfig):
@property
def action_delta_indices(self) -> list[int]:
"""Relative timestep offsets for the action chunk the dataset loader returns.
"""
"""Relative timestep offsets for the action chunk the dataset loader returns."""
return list(range(self.horizon))
@property
@@ -110,32 +90,34 @@ class MyCustomPolicyConfig(PreTrainedConfig):
return None
```
## Step 3: Implement the Policy Class
The string you pass to `@register_subclass` must match `MyPolicy.name` (next section) and is what users supply as `--policy.type` on the CLI. Default to `AdamW` from `lerobot.optim` for `get_optimizer_preset` unless you genuinely need otherwise.
Create your policy implementation by inheriting from [`PreTrainedPolicy`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/pretrained.py):
### Policy class
Inherit from [`PreTrainedPolicy`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/pretrained.py) and set two class attributes — both are checked by `__init_subclass__`:
```python
# modeling_my_custom_policy.py
# modeling_my_policy.py
import torch
import torch.nn as nn
from typing import Any
from lerobot.policies import PreTrainedPolicy
from lerobot.utils.constants import ACTION
from .configuration_my_custom_policy import MyCustomPolicyConfig
from .configuration_my_policy import MyPolicyConfig
class MyCustomPolicy(PreTrainedPolicy):
config_class = MyCustomPolicyConfig # must match the string in @register_subclass
name = "my_custom_policy"
class MyPolicy(PreTrainedPolicy):
config_class = MyPolicyConfig
name = "my_policy"  # must match the string in @register_subclass
def __init__(self, config: MyCustomPolicyConfig, dataset_stats: dict[str, Any] = None):
def __init__(self, config: MyPolicyConfig, dataset_stats: dict[str, Any] | None = None):
super().__init__(config, dataset_stats)
config.validate_features() # not called automatically by the base class
self.config = config
self.model = ... # your nn.Module here
def reset(self):
"""Reset episode state."""
"""Reset per-episode state. Called by lerobot-eval at the start of each episode."""
...
def get_optim_params(self) -> dict:
@@ -147,35 +129,51 @@ class MyCustomPolicy(PreTrainedPolicy):
...
def select_action(self, batch: dict[str, torch.Tensor], **kwargs) -> torch.Tensor:
"""Return a single action for the current timestep (called at inference)."""
"""Return a single action for the current timestep (called every step at inference)."""
...
def forward(self, batch: dict[str, torch.Tensor]) -> dict[str, torch.Tensor]:
def forward(self, batch: dict[str, torch.Tensor]) -> tuple[torch.Tensor, dict | None]:
"""Compute the training loss.
Returns `(loss, output_dict)`. `output_dict` may be `None`; everything in it must be
logging-friendly Python natives (no tensors with gradients).
`batch["action_is_pad"]` is a bool mask of shape (B, horizon) that marks
timesteps padded because the episode ended before `horizon` steps, you
timesteps padded because the episode ended before `horizon` steps; you
can exclude those from your loss.
"""
actions = batch[ACTION]
action_is_pad = batch.get("action_is_pad")
...
return {"loss": ...}
return loss, {"some_loss_component": some_loss_component.item()}
```
## Step 4: Add Data Processors
The methods called by the train/eval loops:
Create processor functions. For a concrete reference, see [processor_act.py](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/act/processor_act.py) or [processor_diffusion.py](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/diffusion/processor_diffusion.py).
| Method | Used by | What it does |
| ----------------------------------------------------------------- | ----------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `reset() -> None` | `lerobot-eval` | Clear per-episode state at the start of each episode. |
| `select_action(batch, **kwargs) -> Tensor` | `lerobot-eval` | Return the next action `(B, action_dim)`. Called every step. |
| `predict_action_chunk(batch, **kwargs) -> Tensor` | the policy itself | Return an action chunk `(B, chunk_size, action_dim)`. Currently abstract on the base class — raise `NotImplementedError` if your policy doesn't chunk. |
| `forward(batch, reduction="mean") -> tuple[Tensor, dict \| None]` | `lerobot-train` | Return `(loss, output_dict)`. Accept `reduction="none"` if you want to support per-sample weighting. |
| `get_optim_params() -> dict` | the optimizer | Return `self.parameters()` for simple policies; return a named parameter dict for [multi-optimizer policies](https://github.com/huggingface/lerobot/blob/ecd38c50d7d15b4184cf42649ff1185ee2e11eeb/src/lerobot/policies/sac/modeling_sac.py#L61-L73). |
| `update() -> None` _(optional)_ | `lerobot-train` | Called after each optimizer step _if defined_. Use for EMA, target nets, replay buffers (TDMPC uses this). |
Batches are flat dictionaries keyed by the constants in [`lerobot.utils.constants`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/utils/constants.py): `OBS_STATE` (`observation.state.<motor>`), `OBS_IMAGES` (`observation.images.<camera>`), `OBS_LANGUAGE`, `ACTION`, etc. Reuse the constants — don't invent new prefixes.
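As a rough illustration of the `action_is_pad` mask mentioned in the `forward` docstring, the sketch below averages a per-timestep loss over unpadded steps only. It uses plain Python lists so it runs without PyTorch. `masked_mean_loss` is a hypothetical helper, not part of LeRobot, and a real policy would most likely do the same thing with tensor operations.

```python
def masked_mean_loss(
    per_step_loss: list[list[float]],
    action_is_pad: list[list[bool]],
) -> float:
    """Average per-timestep losses over the steps that are not padding.

    Both arguments have shape (B, horizon). Hypothetical helper for illustration.
    """
    total = 0.0
    count = 0
    for losses, pads in zip(per_step_loss, action_is_pad):
        for loss, is_pad in zip(losses, pads):
            if not is_pad:
                total += loss
                count += 1
    # Guard against a batch that consists entirely of padding.
    return total / count if count else 0.0


# Two samples, horizon 3; the second episode ended after one step,
# so its last two timesteps are padding and must not affect the loss.
losses = [[1.0, 2.0, 3.0], [4.0, 100.0, 100.0]]
pads = [[False, False, False], [False, True, True]]
print(masked_mean_loss(losses, pads))
```

Without the mask, the two padded steps would dominate the average even though they carry no real supervision.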
### Processor functions
LeRobot uses `PolicyProcessorPipeline`s to normalize inputs and de-normalize outputs around your policy. For a concrete reference, see [`processor_act.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/act/processor_act.py) or [`processor_diffusion.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/diffusion/processor_diffusion.py).
```python
# processor_my_custom_policy.py
# processor_my_policy.py
from typing import Any
import torch
from lerobot.processor import PolicyAction, PolicyProcessorPipeline
def make_my_custom_policy_pre_post_processors(
def make_my_policy_pre_post_processors(
config,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
@@ -187,11 +185,48 @@ def make_my_custom_policy_pre_post_processors(
return preprocessor, postprocessor
```
**Important - function naming:** LeRobot discovers your processor by name. The function **must** be called `make_{policy_name}_pre_post_processors` (matching the string you passed to `@PreTrainedConfig.register_subclass`).
**Important – function naming:** LeRobot discovers your processor by name. The function **must** be called `make_{policy_name}_pre_post_processors` (matching the string you passed to `@PreTrainedConfig.register_subclass`).
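To make the naming rule concrete, here is a minimal sketch of what discovery by name can look like. It is not LeRobot's actual lookup code: `find_processor_factory` and the stand-in `plugin` module are assumptions made for illustration, and only the `make_<policy_name>_pre_post_processors` pattern comes from this guide.

```python
import types


def find_processor_factory(module: types.ModuleType, policy_name: str):
    """Look up `make_<policy_name>_pre_post_processors` on a module by name."""
    factory_name = f"make_{policy_name}_pre_post_processors"
    factory = getattr(module, factory_name, None)
    if factory is None:
        raise AttributeError(f"{module.__name__} does not define {factory_name}()")
    return factory


# Stand-in for a plugin package; a real one would be imported normally.
plugin = types.ModuleType("lerobot_policy_my_policy")
plugin.make_my_policy_pre_post_processors = (
    lambda config, dataset_stats=None: ("pre", "post")
)

factory = find_processor_factory(plugin, "my_policy")
print(factory(config=None))
```

A factory named `make_mypolicy_pre_post_processors`, for example, would not be found under the name `my_policy`, which is why the registered string and the function name have to agree exactly.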
## Step 5: Package Initialization
---
Expose your classes in the package's `__init__.py`:
## Path A: Out-of-tree plugin
The fastest way to ship a policy: package it as a standalone Python distribution and install it alongside LeRobot. No PR required, you own the release cycle, and you can publish to PyPI under your own namespace.
### Package structure
Create a package with the prefix `lerobot_policy_` (IMPORTANT!) followed by your policy name:
```bash
lerobot_policy_my_policy/
├── pyproject.toml
└── src/
└── lerobot_policy_my_policy/
├── __init__.py
├── configuration_my_policy.py
├── modeling_my_policy.py
└── processor_my_policy.py
```
### `pyproject.toml`
```toml
[project]
name = "lerobot_policy_my_policy"
version = "0.1.0"
dependencies = [
# your policy-specific dependencies
]
requires-python = ">= 3.12"
[build-system]
build-backend = # your-build-backend
requires = # your-build-system
```
### Package `__init__.py`
Expose your classes in the package's `__init__.py` and guard against missing `lerobot`:
```python
# __init__.py
@@ -204,44 +239,148 @@ except ImportError:
"lerobot is not installed. Please install lerobot to use this policy package."
)
from .configuration_my_custom_policy import MyCustomPolicyConfig
from .modeling_my_custom_policy import MyCustomPolicy
from .processor_my_custom_policy import make_my_custom_policy_pre_post_processors
from .configuration_my_policy import MyPolicyConfig
from .modeling_my_policy import MyPolicy
from .processor_my_policy import make_my_policy_pre_post_processors
__all__ = [
"MyCustomPolicyConfig",
"MyCustomPolicy",
"make_my_custom_policy_pre_post_processors",
"MyPolicyConfig",
"MyPolicy",
"make_my_policy_pre_post_processors",
]
```
## Step 6: Installation and Usage
### Install Your Policy Package
### Install and use
```bash
cd lerobot_policy_my_custom_policy
cd lerobot_policy_my_policy
pip install -e .
# Or install from PyPI if published
pip install lerobot_policy_my_custom_policy
pip install lerobot_policy_my_policy
```
### Use Your Policy
Once installed, your policy automatically integrates with LeRobot's training and evaluation tools:
```bash
lerobot-train \
--policy.type my_custom_policy \
--policy.type my_policy \
--env.type pusht \
--steps 200000
```
## Examples and Community Contributions
---
## Path B: Contributing in-tree
When your policy has stabilized and there's clear value in shipping it with the library, you can land it directly in LeRobot. Read the general [contribution guide](./contributing) and the [PR template](https://github.com/huggingface/lerobot/blob/main/.github/PULL_REQUEST_TEMPLATE.md) first — that's where you'll find the testing/quality expectations every PR has to meet (`pre-commit run -a`, `pytest`, the community-review rule, etc.). What's below is the policy-specific layer on top of that.
### In-tree layout
```
src/lerobot/policies/my_policy/
├── __init__.py # re-exports config + modeling + processor factory
├── configuration_my_policy.py # MyPolicyConfig + @register_subclass
├── modeling_my_policy.py # MyPolicy(PreTrainedPolicy)
├── processor_my_policy.py # make_my_policy_pre_post_processors
└── README.md # symlink → ../../../../docs/source/policy_my_policy_README.md
```
Two notes:
- The `README.md` next to the source is a **symlink** into `docs/source/policy_<name>_README.md` — the actual file lives under `docs/`. Existing policies (act, smolvla, diffusion, …) all do this; copy one of those symlinks. The policy README is conventionally minimal: paper link + BibTeX citation.
- The user-facing tutorial — what to install, how to train, hyperparameters, benchmark numbers — lives separately at `docs/source/<my_policy>.mdx` and is registered in `_toctree.yml` under "Policies".
The file names are load-bearing: the factory does lazy imports by name, and the processor is discovered by the `make_<policy_name>_pre_post_processors` convention.
### Wiring
Three places need to know about your policy. All by name.
1. **`policies/__init__.py`** — re-export `MyPolicyConfig` and add it to `__all__`. **Don't** re-export the modeling class; it loads lazily through the factory (so `import lerobot` stays fast).
2. **`factory.py:get_policy_class`** — add a branch returning `MyPolicy` from a lazy import.
3. **`factory.py:make_policy_config`** and **`factory.py:make_pre_post_processors`** — same idea, two more branches.
Mirror an existing policy that's structurally similar to yours; the diff is small.
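The lazy-import idea behind the factory can be sketched as follows. This is an assumption-laden illustration, not the code in `factory.py`: it uses a hypothetical name-to-module table instead of explicit branches, and standard-library classes stand in for policy classes so the snippet runs anywhere.

```python
import importlib


def get_policy_class_sketch(name: str, registry: dict[str, tuple[str, str]]):
    """Resolve a class by name, importing its module only when requested."""
    try:
        module_path, class_name = registry[name]
    except KeyError:
        raise ValueError(f"Unknown policy type: {name!r}") from None
    module = importlib.import_module(module_path)  # deferred until first use
    return getattr(module, class_name)


# Hypothetical registry: each entry is (module path, class name).
REGISTRY = {
    "ordered": ("collections", "OrderedDict"),
    "fraction": ("fractions", "Fraction"),
}

cls = get_policy_class_sketch("fraction", REGISTRY)
print(cls.__name__)
```

Because the modeling module is imported only inside the lookup, importing the top-level package does not pull in heavy dependencies for policies nobody asked for.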
### Heavy / optional dependencies
Most policies need a heavy backbone (transformers, diffusers, a specific VLM SDK). The convention is **two-step gating**: a `TYPE_CHECKING`-guarded import at module top, and a `require_package` runtime check in the constructor. [`modeling_diffusion.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/diffusion/modeling_diffusion.py) is the canonical reference:
```python
from typing import TYPE_CHECKING
from lerobot.utils.import_utils import _diffusers_available, require_package
if TYPE_CHECKING or _diffusers_available:
from diffusers.schedulers.scheduling_ddim import DDIMScheduler
else:
DDIMScheduler = None # keeps the symbol bindable at import time
class DiffusionPolicy(PreTrainedPolicy):
def __init__(self, config):
require_package("diffusers", extra="diffusion")
super().__init__(config)
...
```
This way:
- `import lerobot.policies` keeps working without the extra installed (the symbol is just bound to `None`).
- Type checkers see the real symbol.
- Instantiating the policy without the extra raises a clear `ImportError` pointing at `pip install 'lerobot[diffusion]'`.
Add a matching extra to [`pyproject.toml`](https://github.com/huggingface/lerobot/blob/main/pyproject.toml) `[project.optional-dependencies]` and include it in the `all` extra so `pip install 'lerobot[all]'` keeps installing everything.
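For intuition about the runtime half of the gating, the sketch below shows one way such a check could be written with the standard library. `require_package_sketch` is a hypothetical stand-in; LeRobot's own `require_package` may be implemented differently and may produce a different message.

```python
import importlib.util


def require_package_sketch(package: str, extra: str) -> None:
    """Raise a helpful ImportError if an optional dependency is missing."""
    if importlib.util.find_spec(package) is None:
        raise ImportError(
            f"'{package}' is required for this policy. "
            f"Install it with: pip install 'lerobot[{extra}]'"
        )


# A standard-library module is always present, so this call passes silently.
require_package_sketch("json", extra="example")

# A package that is not installed triggers the actionable error message.
try:
    require_package_sketch("surely_not_installed_pkg_xyz", extra="diffusion")
except ImportError as err:
    print(err)
```

Checking in the constructor rather than at import time is what lets the module load without the extra while still failing loudly when the policy is actually used.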
### Benchmarks and a published checkpoint
A new policy is much easier to review — and far more useful — when it ships with a working checkpoint and at least one number you can reproduce.
**Pick at least one in-tree benchmark.** LeRobot ships sim benchmarks with per-benchmark Docker images (LIBERO, LIBERO-plus, Meta-World, RoboTwin 2.0, RoboCasa365, RoboCerebra, RoboMME, VLABench and more). Pick the one that matches your policy's modality — VLAs usually go to LIBERO or VLABench; image-only BC to LIBERO or Meta-World. The full list lives under [Benchmarks](./libero) in the docs sidebar.
**Push the checkpoint & processors** to the Hub under `lerobot/<policy>_<benchmark>` (or your namespace if you don't have write access; a maintainer can mirror it). Use `PreTrainedPolicy.push_model_to_hub` so the repo gets `config.json`, `model.safetensors`, and a model card.
**Report results in your policy's MDX**, with the exact `lerobot-eval` command and hardware so anyone can re-run:
```markdown
## Results
Evaluated on LIBERO with `lerobot/<policy>_libero`:
| Suite | Success rate | n_episodes |
| -------------- | -----------: | ---------: |
| libero_spatial | 87.5% | 50 |
| libero_object | 93.0% | 50 |
| libero_goal | 81.5% | 50 |
| libero_10 | 62.0% | 50 |
| **average** | **81.0%** | 200 |
Reproduce: `lerobot-eval --policy.path=lerobot/<policy>_libero --env.type=libero --env.task=libero_spatial --eval.n_episodes=50` (1× A100 40 GB).
```
Use `n_episodes ≥ 50` per suite for stable success-rate estimates.
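The episode-count recommendation can be motivated by binomial sampling noise. The sketch below is an illustration rather than part of `lerobot-eval`: it assumes episodes are independent trials and uses the plain binomial standard error, with `success_rate_stderr` as a hypothetical helper name.

```python
import math


def success_rate_stderr(success_rate: float, n_episodes: int) -> float:
    """Binomial standard error of a success rate estimated from n_episodes rollouts."""
    return math.sqrt(success_rate * (1.0 - success_rate) / n_episodes)


for n in (10, 50, 200):
    se = success_rate_stderr(0.8, n)
    print(f"n_episodes={n:>3}: 80% +/- {100 * se:.1f} points (1 sigma)")
```

Under these assumptions, a true 80% success rate carries roughly 13 points of one-sigma noise at 10 episodes and roughly 6 points at 50, which is why small evaluations struggle to separate policies that differ by a few points.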
If your policy is real-robot-only and no sim benchmark applies, swap the sim eval for: a public training dataset on the Hub, the `lerobot-train` command, the checkpoint, and a real-robot success rate over ≥10 episodes via `lerobot-rollout --policy.path=...`.
### PR checklist
The general expectations are in [`CONTRIBUTING.md`](https://github.com/huggingface/lerobot/blob/main/CONTRIBUTING.md) and the [PR template](https://github.com/huggingface/lerobot/blob/main/.github/PULL_REQUEST_TEMPLATE.md). On top of those, reviewers will look for:
- [ ] `MyPolicy` and `MyPolicyConfig` cover the surface above; `__init_subclass__` accepts the class.
- [ ] `factory.py` and `policies/__init__.py` are wired (lazy imports for modeling).
- [ ] `make_my_policy_pre_post_processors` follows the naming convention.
- [ ] Optional deps live behind a `[project.optional-dependencies]` extra and the `TYPE_CHECKING + require_package` guard.
- [ ] `tests/policies/` updated: backward-compat artifact committed and policy-specific tests added.
- [ ] `src/lerobot/policies/<name>/README.md` symlinked into `docs/source/policy_<name>_README.md`; user-facing `docs/source/<name>.mdx` written and added to `_toctree.yml`.
- [ ] At least one reproducible benchmark eval in the policy MDX with a published checkpoint (sim benchmark, or real-robot dataset + checkpoint).
The fastest way to get a clean PR is to copy the directory of the existing policy closest to yours, rename, and replace contents method by method. Don't wait until everything is polished — open a draft PR early and iterate with us; reviewers would much rather give feedback on a half-finished branch than a fully-merged one.
---
## Examples and community contributions
Check out these example policy implementations:
- [DiTFlow Policy](https://github.com/danielsanjosepro/lerobot_policy_ditflow) - Diffusion Transformer policy with flow-matching objective. Try it out in this example: [DiTFlow Example](https://github.com/danielsanjosepro/test_lerobot_policy_ditflow)
Share your policy implementations with the community! 🤗
Thanks for taking the time to bring a new policy into LeRobot. Every architecture that lands in `main` — and every plugin published by the community — makes the library a little more useful for the next person, and a little more representative of where robot learning is going. We're looking forward to seeing what you ship. 🤗
@@ -0,0 +1,186 @@
# EVO1
EVO1 is a Vision-Language-Action policy for robot control built around an InternVL3 backbone and a continuous flow-matching action head. This LeRobot integration exposes EVO1 as a standard policy type so it can be trained and evaluated with the usual LeRobot dataset, checkpoint, and processor APIs.
## Model Overview
The policy embeds one or more camera images and the language task prompt with InternVL3, pads robot state/action vectors to fixed maximum dimensions, and predicts future action chunks with a flow-matching action head. During inference, the policy samples an action chunk and returns `n_action_steps` actions from that chunk before sampling again.
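The chunked inference loop described above can be sketched with a small action queue. This is an illustration of the general pattern, not EVO1's actual code: `ChunkedActionSelector` and the toy `sample_chunk` callable are hypothetical, and scalar actions stand in for real action vectors.

```python
from collections import deque
from typing import Callable, Deque, List


class ChunkedActionSelector:
    """Serve actions one at a time from a sampled chunk (illustrative only)."""

    def __init__(self, sample_chunk: Callable[[], List[float]], n_action_steps: int) -> None:
        self.sample_chunk = sample_chunk
        self.n_action_steps = n_action_steps
        self.queue: Deque[float] = deque()
        self.num_samples = 0  # how many times a new chunk was sampled

    def select_action(self) -> float:
        # Sample a new chunk only when the previous one is exhausted.
        if not self.queue:
            chunk = self.sample_chunk()
            self.num_samples += 1
            self.queue.extend(chunk[: self.n_action_steps])
        return self.queue.popleft()


# Toy sampler: always returns the same 4-step chunk.
selector = ChunkedActionSelector(lambda: [0.0, 0.1, 0.2, 0.3], n_action_steps=2)
actions = [selector.select_action() for _ in range(5)]
print(actions, selector.num_samples)
```

With `n_action_steps` smaller than the chunk length, the tail of each chunk is discarded and the model is queried more often; setting both to the same value consumes every predicted action.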
### What the LeRobot Integration Covers
- Standard `policy.type=evo1` configuration through LeRobot
- InternVL3 image/text embedding with optional FlashAttention fallback
- Stage-based finetuning controls for action-head-only and VLM finetuning runs
- Continuous flow-matching action prediction
- Checkpoint save/load through LeRobot policy APIs
- Training with `lerobot-train` and evaluation with standard policy inference APIs
The broader EVO1 project may include additional training scripts and dataset tooling. This page focuses on the LeRobot robot-control policy path.
## Installation Requirements
1. Install LeRobot by following the [Installation Guide](./installation).
2. Install EVO1 dependencies:
```bash
pip install -e ".[evo1]"
```
For LIBERO evaluation, install the LIBERO extra as well:
```bash
pip install -e ".[evo1,libero]"
```
3. Install a `flash-attn` wheel only if it is compatible with your Python, PyTorch, CUDA, and GPU stack. EVO1 falls back to standard attention when `flash_attn` is not available, but reproducing the official LIBERO checkpoint conversion result below requires the same FlashAttention path used by the original EVO1 checkpoint.
EVO1 uses InternVL3 through the Hugging Face `transformers` remote-code path, so the first run may download the configured VLM checkpoint unless `policy.vlm_model_name` points to a local model directory.
## Data Requirements
EVO1 expects a LeRobot dataset with:
- One to `policy.max_views` visual observations, for example `observation.images.image`
- `observation.state`
- `action`
- A language task instruction in the dataset `task` field, or another field configured with `policy.task_field`
State and action vectors are padded to `policy.max_state_dim` and `policy.max_action_dim`. Predictions are cropped back to the dataset action dimension before being returned.
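The pad-then-crop round trip can be sketched in a few lines. This is a minimal illustration and not the EVO1 implementation: zero padding on the right is an assumption (the text only says vectors are padded), and `pad_to_dim` and `crop_to_dim` are hypothetical helper names.

```python
from typing import List, Sequence


def pad_to_dim(vector: Sequence[float], max_dim: int) -> List[float]:
    """Right-pad a vector with zeros up to max_dim (zero fill is an assumption)."""
    if len(vector) > max_dim:
        raise ValueError(f"dimension {len(vector)} exceeds max_dim {max_dim}")
    return list(vector) + [0.0] * (max_dim - len(vector))


def crop_to_dim(vector: Sequence[float], dim: int) -> List[float]:
    """Drop the padded tail so the vector matches the dataset dimension again."""
    return list(vector[:dim])


state = [0.1 * i for i in range(8)]      # e.g. an 8-dimensional robot state
padded = pad_to_dim(state, max_dim=24)   # what the model would see
restored = crop_to_dim(padded, dim=8)    # what is returned to the caller
print(len(padded), restored == state)
```

A dataset whose state or action dimension exceeds the configured maximum cannot be padded, so the sketch raises in that case; the configured maxima need to be at least as large as the dataset dimensions.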
## Usage
To use EVO1 in a LeRobot configuration, specify:
```python
policy.type=evo1
```
By default, a new EVO1 policy initializes its VLM from:
```python
policy.vlm_model_name=OpenGVLab/InternVL3-1B
```
Once a LeRobot-format EVO1 checkpoint is available, load it with:
```python
policy.path=your-org/your-evo1-checkpoint
```
The converted LIBERO checkpoint used for this PR is available at:
```python
policy.path=javadcc/evo1-libero-lerobot
```
## Training
### Stage 1
Stage 1 freezes the VLM and trains the action head:
```bash
lerobot-train \
--dataset.repo_id=your_org/your_dataset \
--policy.type=evo1 \
--policy.training_stage=stage1 \
--policy.vlm_model_name=OpenGVLab/InternVL3-1B \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.n_action_steps=50 \
--policy.max_state_dim=24 \
--policy.max_action_dim=24 \
--policy.optimizer_lr=1e-5 \
--batch_size=4 \
--steps=5000 \
--output_dir=./outputs/evo1_stage1
```
### Stage 2
Stage 2 finetunes the VLM branches and action head. A common workflow starts from a Stage 1 checkpoint:
```bash
lerobot-train \
--dataset.repo_id=your_org/your_dataset \
--policy.path=./outputs/evo1_stage1/checkpoints/005000/pretrained_model \
--policy.training_stage=stage2 \
--policy.vlm_model_name=OpenGVLab/InternVL3-1B \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.n_action_steps=50 \
--policy.max_state_dim=24 \
--policy.max_action_dim=24 \
--policy.optimizer_lr=1e-5 \
--batch_size=4 \
--steps=80000 \
--output_dir=./outputs/evo1_stage2
```
By default, `policy.training_stage` reapplies the finetuning defaults for that stage. This is important when
starting Stage 2 from a Stage 1 checkpoint, because the Stage 1 checkpoint config stores the VLM finetuning
flags as disabled. These stage defaults take precedence over saved or manually supplied `policy.finetune_*`
flags unless `policy.apply_training_stage_defaults=false`, so set that flag only when manually controlling
every finetuning flag.
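The precedence rule can be illustrated with a small sketch. It is not the EVO1 config code: the flag names `finetune_vlm` and `finetune_action_head` are hypothetical stand-ins for the real `policy.finetune_*` flags, and `resolve_finetune_flags` is an assumed helper.

```python
from dataclasses import dataclass, replace

# Hypothetical per-stage defaults, following the stage descriptions above.
STAGE_DEFAULTS = {
    "stage1": {"finetune_vlm": False, "finetune_action_head": True},
    "stage2": {"finetune_vlm": True, "finetune_action_head": True},
}


@dataclass(frozen=True)
class StageConfig:
    training_stage: str = "stage1"
    apply_training_stage_defaults: bool = True
    finetune_vlm: bool = False          # hypothetical flag name
    finetune_action_head: bool = True   # hypothetical flag name


def resolve_finetune_flags(cfg: StageConfig) -> StageConfig:
    """Stage defaults win unless apply_training_stage_defaults is False."""
    if not cfg.apply_training_stage_defaults:
        return cfg
    return replace(cfg, **STAGE_DEFAULTS[cfg.training_stage])


# A config loaded from a Stage 1 checkpoint, with only the stage overridden.
from_stage1 = StageConfig(training_stage="stage2", finetune_vlm=False)
resolved = resolve_finetune_flags(from_stage1)
print(resolved.finetune_vlm)

# Opting out keeps whatever flags were saved or supplied.
manual = resolve_finetune_flags(replace(from_stage1, apply_training_stage_defaults=False))
print(manual.finetune_vlm)
```

The first case shows why the default matters: without reapplying stage defaults, a Stage 2 run started from a Stage 1 checkpoint would silently keep the VLM frozen.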
### Key Training Parameters
| Parameter | Default | Description |
| --------------------------------------------- | ------------------------ | ----------------------------------------------------------------- |
| `policy.vlm_model_name` | `OpenGVLab/InternVL3-1B` | InternVL3 checkpoint or local model directory |
| `policy.training_stage` | `stage1` | `stage1` trains the action head; `stage2` finetunes VLM branches |
| `policy.apply_training_stage_defaults` | `true` | Reapplies stage finetuning defaults after loading a checkpoint |
| `policy.vlm_num_layers` | `14` | Number of InternVL3 language layers kept for the policy |
| `policy.vlm_dtype` | `bfloat16` | Requested VLM dtype |
| `policy.use_flash_attn` | `true` | Requests FlashAttention when installed; otherwise falls back |
| `policy.enable_gradient_checkpointing` | `true` | Enables checkpointing on supported InternVL3 modules |
| `policy.gradient_checkpointing_use_reentrant` | `false` | Reentrant setting passed to gradient checkpointing when supported |
| `policy.chunk_size` | `50` | Number of future actions predicted per chunk |
| `policy.n_action_steps` | `50` | Number of actions consumed from a sampled chunk |
| `policy.max_state_dim` | `24` | State padding dimension |
| `policy.max_action_dim` | `24` | Action padding dimension |
| `policy.task_field` | `task` | Batch field used as the language prompt |
## Results
### LIBERO Object Checkpoint Conversion
The checkpoint [javadcc/evo1-libero-lerobot](https://huggingface.co/javadcc/evo1-libero-lerobot)
is the LeRobot-format conversion of the official EVO1 LIBERO checkpoint. The conversion was checked against
the official EVO1 checkpoint with the same LIBERO Object initial states and action postprocessing.
| Checkpoint | Suite | Episodes | Success Rate |
| ---------------------------- | --------------- | ---------------- | ------------ |
| Official EVO1 checkpoint | `libero_object` | 10, one per task | 100% |
| LeRobot converted checkpoint | `libero_object` | 10, one per task | 100% |
For a fixed `libero_object` rollout, the official checkpoint and LeRobot checkpoint produced identical
pixel embeddings, VLM fused tokens, normalized actions, and denormalized actions for the checked action step
(`max_abs_diff=0.0`).
The published checkpoint expects the raw LIBERO camera feature names
`observation.images.agentview_image` and `observation.images.robot0_eye_in_hand_image`. To run the converted
checkpoint with LeRobot LIBERO evaluation for the same one-episode-per-task setting, keep those camera names
instead of the default `image`/`image2` mapping:
```bash
lerobot-eval \
--policy.path=javadcc/evo1-libero-lerobot \
--policy.device=cuda \
--env.type=libero \
--env.task=libero_object \
--env.camera_name_mapping="{agentview_image: agentview_image, robot0_eye_in_hand_image: robot0_eye_in_hand_image}" \
--env.observation_height=448 \
--env.observation_width=448 \
--eval.batch_size=1 \
--eval.n_episodes=1
```
## References
- [EVO1 repository](https://github.com/MINT-SJTU/Evo-1)
- [InternVL3-1B](https://huggingface.co/OpenGVLab/InternVL3-1B)
## License
This LeRobot integration follows the Apache 2.0 License used by LeRobot. Check the upstream EVO1 and InternVL3 model pages for the licenses of released checkpoints and data.
@@ -0,0 +1,98 @@
# Compute HW Guide for LeRobot Training
Rough sizing for training a LeRobot policy: how much VRAM each policy needs, what training time looks like, and where to run when local hardware isn't enough.
The numbers below are **indicative** — order-of-magnitude figures for picking hardware, not exact predictions. Throughput depends heavily on dataset I/O, image resolution, batch size, and number of GPUs.
## Memory by policy group
Policies cluster by backbone size; the groupings below give a single VRAM envelope per group instead of repeating numbers per policy. Memory scales roughly linearly with batch size; AdamW (the LeRobot default) carries optimizer state that adds ~30–100% over a forward+backward pass alone.
| Group | Policies | Peak VRAM (BS 8, AdamW) | Suitable starter GPUs |
| ---------- | ------------------------------------------- | ----------------------: | --------------------------------- |
| Light BC | `act`, `vqbet`, `tdmpc` | ~2–6 GB | Laptop GPU (RTX 3060), L4, A10G |
| Diffusion | `diffusion`, `multi_task_dit` | ~8–14 GB | RTX 4070+ / L4 / A10G |
| Small VLA | `smolvla` | ~10–16 GB | RTX 4080+ / L4 / A10G |
| Large VLA | `pi0`, `pi0_fast`, `pi05`, `xvla`, `wall_x` | ~24–40 GB | A100 40 GB+ (24 GB tight at BS 1) |
| Multimodal | `groot`, `eo1` | ~24–40 GB | A100 40 GB+ |
| RL | `sac` | config-dep. | See [HIL-SERL guide](./hilserl) |
Memory-bound? Drop the batch size (~linear), use gradient accumulation to recover effective batch, or for SmolVLA leave `freeze_vision_encoder=True`.
## Training time
Robotics imitation learning typically converges in **5–10 epochs over the dataset**, not hundreds of thousands of raw steps. Once you know your epoch count, wall-clock is essentially:
```text
total_frames = sum of frames over all episodes # 50 ep × 30 fps × 30 s ≈ 45,000
steps_per_epoch = ceil(total_frames / (num_gpus × batch_size))
total_steps = epochs × steps_per_epoch
wall_clock ≈ total_steps × per_step_time
```
Per-step time depends on the policy and the GPU. The numbers in the table below are anchors — pick the row closest to your setup and scale linearly with `total_steps` if you train longer or shorter.
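The formula above translates directly into a few lines of Python. This is a sketch for back-of-the-envelope sizing: `estimate_wall_clock_s` is a hypothetical helper, and the `per_step_time_s=0.1` value is a placeholder assumption, not a measured figure; substitute the `update_s` you observe in your own training logs.

```python
import math
from typing import Tuple


def estimate_wall_clock_s(
    total_frames: int,
    epochs: int,
    batch_size: int,
    per_step_time_s: float,
    num_gpus: int = 1,
) -> Tuple[int, float]:
    """Return (total_steps, estimated seconds) for a training run."""
    steps_per_epoch = math.ceil(total_frames / (num_gpus * batch_size))
    total_steps = epochs * steps_per_epoch
    return total_steps, total_steps * per_step_time_s


# 50 episodes x 30 fps x 30 s = 45,000 frames, 5 epochs, batch 8, one GPU.
total_steps, seconds = estimate_wall_clock_s(
    total_frames=45_000, epochs=5, batch_size=8, per_step_time_s=0.1
)
print(f"{total_steps} steps, about {seconds / 60:.0f} min")
```

Because `total_steps` is what you pass to `--steps`, the same calculation also tells you what step count corresponds to a target number of epochs.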
### Common scenarios
Indicative wall-clock for **5 epochs on a ~50-episode dataset (~45k frames at 30 fps × 30 s)**, default optimizer (AdamW), 640×480 images:
| Setup | Policy | Batch | Wall-clock |
| ------------------------------------ | -------------- | ----- | ---------: |
| Single RTX 4090 / RTX 3090 (24 GB) | `act` | 8 | ~30–60 min |
| Single RTX 4090 / RTX 3090 (24 GB) | `diffusion` | 8 | ~2–4 h |
| Single L4 / A10G (24 GB) | `act` | 8 | ~1–2 h |
| Single L4 / A10G (24 GB) | `smolvla` | 4 | ~3–6 h |
| Single A100 40 GB | `smolvla` | 16 | ~1–2 h |
| Single A100 40 GB | `pi0` / `pi05` | 4 | ~4–8 h |
| 4× H100 80 GB cluster (`accelerate`) | `diffusion` | 32 | ~30–60 min |
| 4× H100 80 GB cluster (`accelerate`) | `smolvla` | 32 | ~1–2 h |
| Apple Silicon M1/M2/M3 Max (MPS) | `act` | 4 | ~6–14 h |
These are order-of-magnitude figures. Real runs deviate by ±50% depending on image resolution, dataset I/O, dataloader threading, and exact GPU SKU. They are useful as "is this run going to take an hour or a day?" intuition, not as SLAs.
### Multi-GPU matters a lot
`accelerate launch --num_processes=N` is the easiest way to cut training time. Each optimizer step processes `N × batch_size` samples in roughly the same wall-clock as a single-GPU step, so 4 GPUs ≈ 4× speedup for compute-bound runs. See the [Multi GPU training](./multi_gpu_training) guide for the full setup.
Reference data points on a 4×H100 80 GB cluster (`accelerate launch --num_processes=4`), 5000 steps, batch 32, AdamW, dataset [`imstevenpmwork/super_poulain_draft`](https://huggingface.co/datasets/imstevenpmwork/super_poulain_draft) (~50 episodes, ~640×480 images):
| Policy | Wall-clock | `update_s` | `dataloading_s` | GPU util | Notable flags |
| ----------- | ---------- | ---------: | --------------: | -------- | ------------------------------------------------------------------------------------------------------------------------------ |
| `diffusion` | 16m 17s | 0.167 | 0.015 | ~90% | defaults (training from scratch) |
| `smolvla` | 27m 49s | 0.312 | 0.011 | ~80% | `--policy.path=lerobot/smolvla_base`, `freeze_vision_encoder=false`, `train_expert_only=false` |
| `pi05` | 3h 41m | 2.548 | 0.014 | ~95% | `--policy.pretrained_path=lerobot/pi05_base`, `gradient_checkpointing=true`, `dtype=bfloat16`, vision encoder + expert trained |
The `dataloading_s` vs. `update_s` ratio is the diagnostic that matters: when `dataloading_s` approaches `update_s`, more GPUs stop helping — your dataloader is the bottleneck and you should look at `--num_workers`, image resolution, and disk speed before adding compute.
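As a quick check, the ratio can be computed from the reference runs in the table above. The sketch only reuses the `update_s` and `dataloading_s` values already listed; `dataloading_ratio` is a hypothetical helper name and no particular threshold is implied.

```python
def dataloading_ratio(dataloading_s: float, update_s: float) -> float:
    """Fraction of an update step's duration spent waiting on the dataloader."""
    return dataloading_s / update_s


# (dataloading_s, update_s) taken from the 4x H100 reference table.
reference_runs = {
    "diffusion": (0.015, 0.167),
    "smolvla": (0.011, 0.312),
    "pi05": (0.014, 2.548),
}

ratios = {name: dataloading_ratio(d, u) for name, (d, u) in reference_runs.items()}
for name, ratio in ratios.items():
    print(f"{name:>9}: dataloading is {100 * ratio:.1f}% of update time")
```

All three reference runs stay below 10%, which is consistent with the high GPU utilization reported for them; a run where this ratio climbs toward 100% is the dataloader-bound case described above.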
### Schedule and checkpoints
If you shorten training (e.g. 5k–10k steps on a small dataset), also shorten the LR schedule with `--policy.scheduler_decay_steps≈--steps`; otherwise the LR stays near its peak and never decays. Scale `--save_freq` down the same way so that a short run still writes checkpoints.
## Where to run
VRAM is the first filter. Within a tier, pick by budget and availability. The `$`–`$$$$` columns are relative; check current pricing on the provider you actually use.
| Class | VRAM | Tier | Comfortable for |
| -------------------------- | ----- | ------ | ----------------------------------------------------------- |
| RTX 3090 / 4090 (consumer) | 24 GB | `$` | Light BC, Diffusion, SmolVLA. Tight for VLAs at batch 1. |
| L4 / A10G (cloud) | 24 GB | `$$$` | Same envelope; common on Google Cloud, RunPod, AWS `g5/g6`. |
| A100 40 GB | 40 GB | `$$$` | Any policy at reasonable batch sizes. |
| A100 80 GB / H100 80 GB | 80 GB | `$$$$` | Multi-GPU clusters; large batches for VLAs. |
| **CPU only** | — | — | Don't train. Use Colab or rent a GPU. |
### Hugging Face Jobs
[Hugging Face Jobs](https://huggingface.co/docs/hub/jobs) lets you run training on managed HF infrastructure, billed by the second. The repo publishes a ready-to-use image: **`huggingface/lerobot-gpu:latest`**, rebuilt **every night at 02:00 UTC from `main`** ([`docker_publish.yml`](https://github.com/huggingface/lerobot/blob/main/.github/workflows/docker_publish.yml)) — so it tracks the current state of the repo, not a tagged release.
```bash
hf jobs run --flavor a10g-large huggingface/lerobot-gpu:latest \
bash -c "nvidia-smi && lerobot-train \
--policy.type=act --dataset.repo_id=<USER>/<DATASET> \
--policy.repo_id=<USER>/act_<task> --batch_size=8 --steps=50000"
```
Notes:
- The leading `nvidia-smi` is a quick sanity check that CUDA is visible inside the container — useful to fail fast if the flavor or driver mismatched.
- The default Job timeout is 30 minutes; pass `--timeout 4h` (or longer) for real training.
- `--flavor` maps onto the table above: `t4-small`/`t4-medium` (T4, ACT only), `l4x1`/`l4x4` (L4 24 GB), `a10g-small/large/largex2/largex4` (A10G 24 GB scaled out), `a100-large` (A100). For the current full catalogue + pricing see [https://huggingface.co/docs/hub/jobs](https://huggingface.co/docs/hub/jobs).
@@ -207,6 +207,56 @@ pip install 'lerobot[feetech]' # Feetech motor support
_Multiple extras can be combined (e.g., `.[core_scripts,pi,pusht]`). For a full list of available extras, refer to `pyproject.toml`._
### PyTorch CUDA variant (Linux only)
On Linux, the install path determines which CUDA wheel you get. macOS and Windows installs use the PyPI default (MPS / CPU / CUDA-Windows wheel respectively) and can skip this section.
<!-- prettier-ignore-start -->
<hfoptions id="cuda_variant">
<hfoption id="uv-source">
**Source install via `uv` (`uv sync` or `uv pip install -e .`)**
`torch` and `torchvision` are pinned by the project to the **CUDA 12.8** PyTorch index (`https://download.pytorch.org/whl/cu128`, driver floor **570.86**) — covers Ampere/Ada/Hopper/Blackwell GPUs. No action needed for typical NVIDIA setups.
To override for a different CUDA variant:
```bash
uv pip install --force-reinstall torch torchvision \
--index-url https://download.pytorch.org/whl/cu126 # older drivers; or cu130 for Blackwell on driver ≥ 580
```
</hfoption>
<hfoption id="pip-conda">
**Source install via `pip`/`conda`, or `pip install lerobot` from PyPI**
The default PyPI torch wheel on Linux is currently a cu130-bundled wheel, driver floor **580.65**.
To pick a specific CUDA variant:
**Using `pip` or `conda`** — install torch first with an explicit index, then lerobot:
```bash
pip install --index-url https://download.pytorch.org/whl/cu128 torch torchvision
pip install -e ".[all]" # source
# — or —
pip install lerobot # from PyPI
```
**Using `uv` to install from PyPI** — one-liner via `--torch-backend` (uv ≥ 0.6):
```bash
uv pip install --torch-backend cu128 lerobot
```
Supported values include `auto`, `cpu`, `cu126`, `cu128`, `cu129`, `cu130`, plus various `rocm*` and `xpu`. Swap as needed for your driver.
</hfoption>
</hfoptions>
<!-- prettier-ignore-end -->
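To confirm which variant ended up installed, one option is to inspect the version string. The sketch below assumes that wheels from the PyTorch index carry a local version suffix such as `+cu128` in `torch.__version__`, while the default PyPI wheel may have no suffix; `wheel_variant` is a hypothetical helper and the version strings are illustrative inputs, not outputs of a real install.

```python
from typing import Optional


def wheel_variant(version: str) -> Optional[str]:
    """Return the local-version suffix of a version string (e.g. 'cu128'), if any."""
    _, sep, local = version.partition("+")
    return local if sep else None


# Illustrative version strings only.
print(wheel_variant("2.8.0+cu128"))
print(wheel_variant("2.8.0+cpu"))
print(wheel_variant("2.8.0"))
```

In a real environment you would pass `torch.__version__` to the helper; `torch.version.cuda` and `torch.cuda.is_available()` give complementary information about the CUDA version the build targets and whether a usable GPU is visible.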
### Troubleshooting
If you encounter build errors, you may need to install additional system dependencies: `cmake`, `build-essential`, and `ffmpeg libs`.
@@ -0,0 +1,18 @@
# EVO1
EVO1 is a Vision-Language-Action policy for robot control. The LeRobot
integration uses an InternVL3 vision-language backbone with a flow-matching
action head, and supports staged training through the standard LeRobot policy
APIs.
The upstream EVO1 project is available at
[MINT-SJTU/Evo-1](https://github.com/MINT-SJTU/Evo-1).
```bibtex
@misc{evo1,
title = {EVO1},
author = {{MINT-SJTU}},
year = {2026},
howpublished = {\url{https://github.com/MINT-SJTU/Evo-1}},
}
```
@@ -0,0 +1,136 @@
# OMX Follower — Cube Pick And Place Example
This example shows what LeRobot can do on a physical setup.
It is a work in progress, used internally at LeRobot and specific to our setup, but we hope it is a useful reference for how to use the LeRobot APIs and CLIs.
It covers an end-to-end workflow for the **OMX Follower** robot arm: collect a cube pick-and-place dataset, train a policy, and deploy it autonomously.
## Hardware
| Component | Value |
| --------- | ------------------------------------ |
| Robot | OMX Follower |
| Cameras | 2× OpenCV cameras (wrist + top-down) |
## Scripts
| Script | Purpose |
| ---------------------- | --------------------------------------------------------------- |
| `reset_environment.py` | Standalone utility: sweep workspace, grab cube, place cube |
| `record_grab.py` | Automated data collection: reset → place → record grab episodes |
## Setup
Make sure you have LeRobot installed in your env. (See [the installation guide](https://huggingface.co/docs/lerobot/installation))
Next, we will declare some environment variables for convenience. Adjust the camera indices and robot port to match your system configuration.
```bash
export ROBOT_PORT=/dev/ttyACM0
export TELEOP_PORT=/dev/ttyACM1
export HF_USERNAME=<your_hf_username>
export ROBOT_CAMERAS="{ wrist: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30, fourcc: MJPG}, top: {type: opencv, index_or_path: 2, width: 640, height: 480, fps: 30, fourcc: MJPG} }"
```
## Step 1 — Collect Data
```bash
lerobot-record \
--robot.type=omx_follower \
--robot.port=$ROBOT_PORT \
--robot.id=omx_follower \
--robot.cameras="$ROBOT_CAMERAS" \
--teleop.type=omx_leader \
--teleop.port=$TELEOP_PORT \
--teleop.id=omx_leader \
--dataset.repo_id=$HF_USERNAME/omx_pickandplace \
--dataset.root=data/omx_pickandplace \
--dataset.num_episodes=50 \
--dataset.single_task="Pick the cube and place it in the blue square" \
--dataset.streaming_encoding=true \
--dataset.push_to_hub=true
```
### Bonus Auto-Collect script
/!\ This is specific to our setup and the task of picking and placing a cube. It is not a general-purpose data collection script. As you may notice, it doesn't require a teleop.
```bash
python -m examples.omx.record_grab \
--robot.type=omx_follower \
--robot.port=$ROBOT_PORT \
--robot.id=omx_follower \
--robot.cameras="$ROBOT_CAMERAS" \
--dataset.repo_id=$HF_USERNAME/omx_pickandplace \
--dataset.root=data/omx_pickandplace \
--dataset.num_episodes=50 \
--dataset.single_task="Pick the cube and place it in the blue square" \
--dataset.streaming_encoding=true \
--dataset.push_to_hub=true
```
Each episode:
1. The arm grabs the cube from the center of the workspace and places it at a random position.
2. The arm returns to HOME.
3. A targeted grab is recorded: HOME → approach raised → lower onto cube → grasp → lift → carry → drop → HOME.
A dataset is already available here [`maximellerbach/omx_pickandplace`](https://huggingface.co/datasets/maximellerbach/omx_pickandplace), so you can skip directly to training if you want.
## Step 2 — Train
To train a simple `ACT` policy on the collected dataset, you can use the `lerobot-train` CLI:
```bash
lerobot-train \
--dataset.repo_id=$HF_USERNAME/omx_pickandplace \
--policy.type=act \
--output_dir=outputs/train/omx_pickandplace_act \
--policy.device=cuda \
--policy.repo_id=$HF_USERNAME/omx_pickandplace_act \
--steps=20000 \
--wandb.enable=true
```
A pretrained `ACT` policy is already available here [`maximellerbach/omx_pickandplace_act`](https://huggingface.co/maximellerbach/omx_pickandplace_act).
## Step 3 — Rollout
Use the `lerobot-rollout` CLI with the `base` strategy:
```bash
lerobot-rollout \
--strategy.type=base \
--robot.type=omx_follower \
--robot.port=$ROBOT_PORT \
--robot.id=omx_follower \
--robot.cameras="$ROBOT_CAMERAS" \
  --policy.path=$HF_USERNAME/omx_pickandplace_act
```
For continuous recording with automatic upload (sentry mode):
```bash
lerobot-rollout \
--strategy.type=sentry \
--strategy.upload_every_n_episodes=10 \
--robot.type=omx_follower \
--robot.port=$ROBOT_PORT \
--robot.id=omx_follower \
--robot.cameras="$ROBOT_CAMERAS" \
--policy.path=$HF_USERNAME/omx_pickandplace_act \
  --dataset.repo_id=$HF_USERNAME/rollout_omx_pickandplace_act
```
## Environment Reset Utility
These scripts are specific to this particular physical setup: they execute hardcoded sequences of actions on the robot to reset the environment, which is useful for data collection and evaluation. They are not general-purpose scripts.
`reset_environment.py` can be run standalone to prepare the workspace:
```bash
# Grab cube + place it at a random position on the left side
python -m examples.omx.reset_environment --port $ROBOT_PORT --mode grab_and_place
```
It also exposes `grab_cube(robot)` and `place_cube(robot)` for use in custom scripts.
@@ -0,0 +1,422 @@
#!/usr/bin/env python3
"""
Auto-record grab episodes for the OMX robot arm.
Each episode cycle:
1. grab_and_place — grab cube from workspace center and place at a random (pan, reach) position
2. HOME — return arm to home with gripper open
3. record_grab — execute a targeted grab to the stored position while recording
observations + actions to a LeRobotDataset
Usage (run from repo root):
python -m examples.omx.record_grab \\
--robot.type=omx_follower \\
--robot.port=/dev/ttyACM0 \\
--robot.id=omx_follower \\
--robot.cameras="{ wrist: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30, fourcc: MJPG}, top: {type: opencv, index_or_path: 4, width: 640, height: 480, fps: 30, fourcc: MJPG} }" \\
--dataset.repo_id=<hf_username>/<dataset_name> \\
--dataset.root=data/omx_grab \\
--dataset.num_episodes=50 \\
--dataset.single_task="Grab the cube" \\
--dataset.streaming_encoding=true
"""
import logging
from dataclasses import dataclass
from pprint import pformat
import numpy as np
from lerobot.cameras import CameraConfig # noqa: F401
from lerobot.cameras.opencv import OpenCVCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.configs.dataset import DatasetRecordConfig
from lerobot.datasets import (
    LeRobotDataset,
    VideoEncodingManager,
    aggregate_pipeline_dataset_features,
    create_initial_features,
)
from lerobot.processor import make_default_processors
from lerobot.robots import RobotConfig, make_robot_from_config
from lerobot.robots.omx_follower import OmxFollower
from lerobot.utils.constants import ACTION, OBS_STR
from lerobot.utils.feature_utils import build_dataset_frame, combine_feature_dicts
from lerobot.utils.robot_utils import precise_sleep
from .reset_environment import (
    APPROACH_SPEED,
    GRIPPER_CLOSE_POS,
    HOME_POSE,
    PUSH_END_ELBOW_FLEX,
    PUSH_END_SHOULDER_LIFT,
    PUSH_START_ELBOW_FLEX,
    PUSH_START_SHOULDER_LIFT,
    array_to_pose,
    grab_cube,
    horizontal_wrist_flex,
    move_to_pose,
    place_cube,
    pose_to_array,
)
# ── Grab-episode motion parameters ────────────────────────────────────────────
# Shoulder-lift offset for the raised approach phase (subtracted from the target sl, arm is higher).
GRAB_RAISE_SL_OFFSET = 20.0
GRAB_LOWER_SPEED = 20.0
RECORD_SPEED = 30.0
# Pose the arm travels to after closing the gripper (cube held).
GRAB_CARRY_POSE = {
    "shoulder_pan.pos": -23.0,
    "shoulder_lift.pos": 5.0,
    "elbow_flex.pos": 18.0,
    "wrist_flex.pos": -14.0,
    "wrist_roll.pos": 0.0,
    "gripper.pos": GRIPPER_CLOSE_POS,
}
# Per-joint jitter limits (degrees) applied to transit waypoints for human-like variation.
# Cube-approach and carry poses are never jittered to preserve precision.
_JITTER_LIMITS: dict[str, float] = {
    "shoulder_pan.pos": 5.0,
    "shoulder_lift.pos": 4.0,
    "elbow_flex.pos": 4.0,
    "wrist_flex.pos": 3.0,
    "wrist_roll.pos": 2.0,
    "gripper.pos": 0.0,
}
def _jitter_pose(pose: dict, rng: np.random.Generator) -> dict:
    """Return a copy of pose with independent per-joint random perturbations."""
    return {
        k: v + rng.uniform(-_JITTER_LIMITS.get(k, 0.0), _JITTER_LIMITS.get(k, 0.0)) for k, v in pose.items()
    }
def _random_stuck_pose(rng: np.random.Generator) -> dict:
    """Return a physically plausible stuck pose (failed grasp), gripper closed.
    ef bounds are piecewise-linear in sl so the arm stays in a reachable,
    table-safe envelope across the full sl range:
      sl=-50 → ef ∈ [  0,  50]  (arm raised, can be bent forward)
      sl=  0 → ef ∈ [-25,  25]  (mid reach)
      sl= 30 → ef ∈ [-20,   0]  (arm extended, little room to flex)
    wrist_flex is randomly offset from the horizontal value.
    """
    pan = float(rng.uniform(-5.0, 35.0))
    sl = float(rng.uniform(-50.0, 30.0))
    if sl <= 0.0:
        alpha = (sl + 50.0) / 50.0  # 0 at sl=-50, 1 at sl=0
        ef_lo = alpha * -25.0  # 0 → -25
        ef_hi = 50.0 + alpha * -25.0  # 50 → 25
    else:
        alpha = sl / 30.0  # 0 at sl=0, 1 at sl=30
        ef_lo = -25.0 + alpha * 5.0  # -25 → -20
        ef_hi = 25.0 + alpha * -25.0  # 25 → 0
    ef = float(rng.uniform(ef_lo, ef_hi))
    wf = horizontal_wrist_flex(sl, ef) + float(rng.uniform(-15.0, 15.0))
    return {
        "shoulder_pan.pos": pan,
        "shoulder_lift.pos": sl,
        "elbow_flex.pos": ef,
        "wrist_flex.pos": wf,
        "wrist_roll.pos": float(rng.uniform(-15.0, 15.0)),
        "gripper.pos": GRIPPER_CLOSE_POS,
    }
logger = logging.getLogger(__name__)
@dataclass
class OmxRecordGrabConfig:
    robot: RobotConfig
    dataset: DatasetRecordConfig
    # Resume recording on an existing dataset.
    resume: bool = False
    # Fraction of episodes that start from a random stuck pose (gripper closed) to
    # generate recovery data. 0.0 = disabled, 1.0 = all episodes are recovery starts.
    recovery_prob: float = 0.5
def record_episode_spline(
    robot: OmxFollower,
    waypoints: list[dict],
    speeds: list[float],
    dataset: LeRobotDataset,
    task: str,
) -> None:
    """Execute a Catmull-Rom-style spline through waypoints, recording each frame.
    Segment durations are parameterized from the maximum absolute joint delta
    between consecutive waypoints divided by the requested segment speed,
    producing non-uniform timing in joint space. Interior tangents are derived
    from the adjacent per-segment velocities, with clamped (zero-velocity)
    endpoints so the arm starts and stops smoothly. Each segment is cubic
    Hermite, giving C1 continuity at every waypoint.
    """
    pts = [pose_to_array(w) for w in waypoints]
    n = len(pts)
    # Steps and duration per segment
    n_steps_list = []
    timestamps = []
    for i in range(n - 1):
        max_dist = float(np.max(np.abs(pts[i + 1] - pts[i])))
        ns = max(1, int(max_dist / speeds[i] * dataset.fps)) if max_dist >= 0.5 else 0
        n_steps_list.append(ns)
        timestamps.append(ns / dataset.fps)
    # Velocity tangents (deg/sec): clamped at endpoints, Catmull-Rom for interior
    vels = [np.zeros_like(pts[0])]
    for i in range(1, n - 1):
        v_prev = (pts[i] - pts[i - 1]) / timestamps[i - 1] if timestamps[i - 1] > 0 else np.zeros_like(pts[0])
        v_next = (pts[i + 1] - pts[i]) / timestamps[i] if timestamps[i] > 0 else np.zeros_like(pts[0])
        vels.append(0.5 * (v_prev + v_next))
    vels.append(np.zeros_like(pts[0]))
    dt = 1.0 / dataset.fps
    for seg in range(n - 1):
        ns = n_steps_list[seg]
        if ns == 0:
            continue
        p0, p1 = pts[seg], pts[seg + 1]
        # Scale velocity (deg/sec) to t-space tangent (deg/t-unit, where t: 0→1 over ns steps)
        m0 = vels[seg] * timestamps[seg]
        m1 = vels[seg + 1] * timestamps[seg]
        for step in range(1, ns + 1):
            t = step / ns
            h00 = 2 * t**3 - 3 * t**2 + 1
            h10 = t**3 - 2 * t**2 + t
            h01 = -2 * t**3 + 3 * t**2
            h11 = t**3 - t**2
            commanded = h00 * p0 + h10 * m0 + h01 * p1 + h11 * m1
            action = array_to_pose(commanded)
            robot.send_action(action)
            obs = robot.get_observation()
            obs_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
            action_frame = build_dataset_frame(dataset.features, action, prefix=ACTION)
            dataset.add_frame({**obs_frame, **action_frame, "task": task})
            precise_sleep(dt)
def record_grab_episode(
robot: OmxFollower,
dataset: LeRobotDataset,
pan: float,
t: float,
task: str,
recovery_start: bool = False,
) -> None:
"""Execute a targeted grab to the stored (pan, t) position, recording every frame.
Normal sequence (initial HOME move is NOT recorded):
HOME → raised approach above cube → lower → close gripper
→ raise [jittered] → retract [jittered] → GRAB_CARRY_POSE → drop → HOME
Recovery sequence (recovery_start=True): arm is moved to a random stuck pose
(gripper closed) without recording, then recording begins from there:
stuck_pose → raised approach above cube → [normal grab sequence from there]
All segments are joined by a Catmull-Rom spline (C1-continuous velocities).
"""
sl = PUSH_START_SHOULDER_LIFT + t * (PUSH_END_SHOULDER_LIFT - PUSH_START_SHOULDER_LIFT)
ef = PUSH_START_ELBOW_FLEX + t * (PUSH_END_ELBOW_FLEX - PUSH_START_ELBOW_FLEX)
sl_raised = sl - GRAB_RAISE_SL_OFFSET
wf_horizontal = horizontal_wrist_flex(sl, ef)
rng = np.random.default_rng()
if recovery_start:
stuck_pose = _random_stuck_pose(rng)
logger.info(f"Recovery start: {stuck_pose}")
move_to_pose(robot, stuck_pose, APPROACH_SPEED)
first_waypoints = [stuck_pose]
first_speeds = []
else:
jittery_start = _jitter_pose(HOME_POSE, rng)
move_to_pose(robot, jittery_start, APPROACH_SPEED)
first_waypoints = [jittery_start]
first_speeds = []
waypoints = first_waypoints + [
{ # raised approach: arm above cube
"shoulder_pan.pos": pan,
"shoulder_lift.pos": sl_raised,
"elbow_flex.pos": ef,
"wrist_flex.pos": horizontal_wrist_flex(sl_raised, ef),
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
},
{ # lower onto cube — no jitter: precision needed
"shoulder_pan.pos": pan,
"shoulder_lift.pos": sl,
"elbow_flex.pos": ef,
"wrist_flex.pos": wf_horizontal,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
},
{ # close gripper — no jitter: precision needed
"shoulder_pan.pos": pan,
"shoulder_lift.pos": sl,
"elbow_flex.pos": ef,
"wrist_flex.pos": wf_horizontal,
"wrist_roll.pos": 0.0,
"gripper.pos": GRIPPER_CLOSE_POS,
},
_jitter_pose(
{ # raise with cube
"shoulder_pan.pos": pan,
"shoulder_lift.pos": sl_raised,
"elbow_flex.pos": ef,
"wrist_flex.pos": horizontal_wrist_flex(sl_raised, ef),
"wrist_roll.pos": 0.0,
"gripper.pos": GRIPPER_CLOSE_POS,
},
rng,
),
_jitter_pose(
{ # retract: fold arm toward HOME before sweeping to carry zone
"shoulder_pan.pos": pan * 0.25,
"shoulder_lift.pos": HOME_POSE["shoulder_lift.pos"] + 5.0,
"elbow_flex.pos": HOME_POSE["elbow_flex.pos"] - 5.0,
"wrist_flex.pos": 0.0,
"wrist_roll.pos": 0.0,
"gripper.pos": GRIPPER_CLOSE_POS,
},
rng,
),
GRAB_CARRY_POSE, # no jitter: target drop zone
{**GRAB_CARRY_POSE, "gripper.pos": 60.0}, # drop cube
HOME_POSE,
]
speeds = first_speeds + [
RECORD_SPEED, # (HOME →) raised approach
GRAB_LOWER_SPEED, # raised approach → lower
GRAB_LOWER_SPEED, # lower → close gripper
RECORD_SPEED, # close gripper → raise
RECORD_SPEED, # raise → retract
RECORD_SPEED, # retract → carry pose
RECORD_SPEED, # carry pose → drop
RECORD_SPEED, # drop → HOME
]
record_episode_spline(robot, waypoints, speeds, dataset, task)
# Dwell at HOME for ~0.5 s before next episode
home_action = build_dataset_frame(dataset.features, HOME_POSE, prefix=ACTION)
dt = 1.0 / dataset.fps
for _ in range(int(dataset.fps * 0.5)):
robot.send_action(HOME_POSE)
obs = robot.get_observation()
obs_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
dataset.add_frame({**obs_frame, **home_action, "task": task})
precise_sleep(dt)
@parser.wrap()
def record_grab(cfg: OmxRecordGrabConfig) -> LeRobotDataset:
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger.info(pformat(cfg))
robot = make_robot_from_config(cfg.robot)
use_videos = cfg.dataset.video
teleop_action_processor, _, robot_obs_processor = make_default_processors()
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(action=robot.action_features),
use_videos=use_videos,
),
aggregate_pipeline_dataset_features(
pipeline=robot_obs_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=use_videos,
),
)
num_cameras = len(robot.cameras) if hasattr(robot, "cameras") else 0
dataset = None
try:
if cfg.resume:
dataset = LeRobotDataset.resume(
cfg.dataset.repo_id,
root=cfg.dataset.root,
streaming_encoding=cfg.dataset.streaming_encoding,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
encoder_threads=cfg.dataset.encoder_threads,
image_writer_processes=cfg.dataset.num_image_writer_processes if num_cameras > 0 else 0,
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * num_cameras
if num_cameras > 0
else 0,
)
else:
cfg.dataset.stamp_repo_id()
dataset = LeRobotDataset.create(
cfg.dataset.repo_id,
cfg.dataset.fps,
root=cfg.dataset.root,
robot_type=robot.name,
features=dataset_features,
use_videos=use_videos,
streaming_encoding=cfg.dataset.streaming_encoding,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
encoder_threads=cfg.dataset.encoder_threads,
image_writer_processes=cfg.dataset.num_image_writer_processes if num_cameras > 0 else 0,
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * num_cameras
if num_cameras > 0
else 0,
)
robot.connect(calibrate=True)
rng = np.random.default_rng()
with VideoEncodingManager(dataset):
for episode_idx in range(cfg.dataset.num_episodes):
logger.info(f"=== Episode {episode_idx + 1}/{cfg.dataset.num_episodes} ===")
logger.info("Step 1: grabbing and placing cube...")
grab_cube(robot)
pan, t = place_cube(robot)
logger.info(f"Cube placed at pan={pan:.1f}, reach={t:.2f}")
recovery_start = cfg.recovery_prob > 0 and float(rng.random()) < cfg.recovery_prob
logger.info(f"Step 2: recording {'recovery ' if recovery_start else ''}grab episode...")
record_grab_episode(
robot,
dataset,
pan,
t,
cfg.dataset.single_task,
recovery_start=recovery_start,
)
dataset.save_episode()
logger.info(f"Episode {episode_idx + 1} saved.")
finally:
if dataset:
dataset.finalize()
if robot.is_connected:
robot.disconnect()
if cfg.dataset.push_to_hub and dataset and dataset.num_episodes > 0:
dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private)
return dataset
if __name__ == "__main__":
record_grab()
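
The segment timing and Hermite blending used by `record_episode_spline` can be tried without a robot or a dataset. The following is a minimal sketch under stated assumptions: `hermite_trajectory` and its `min_move` parameter are hypothetical names introduced here for illustration, poses are plain lists of joint values instead of NumPy arrays, and nothing is sent to hardware or recorded.

```python
def hermite_trajectory(points, speeds, fps, min_move=0.5):
    """Sample a cubic Hermite path through `points` (lists of joint values).

    Hypothetical stand-alone helper mirroring the math in record_episode_spline.
    """
    n = len(points)
    dim = len(points[0])
    zero = [0.0] * dim

    # Steps and duration per segment, driven by the largest joint delta.
    steps, durations = [], []
    for i in range(n - 1):
        max_delta = max(abs(b - a) for a, b in zip(points[i], points[i + 1]))
        ns = max(1, int(max_delta / speeds[i] * fps)) if max_delta >= min_move else 0
        steps.append(ns)
        durations.append(ns / fps)

    def segment_velocity(i):
        # Average velocity over segment i; zero when the segment is skipped.
        if durations[i] == 0:
            return zero
        return [(b - a) / durations[i] for a, b in zip(points[i], points[i + 1])]

    # Zero velocity at both ends, mean of neighbouring segment velocities inside.
    vels = [zero]
    for i in range(1, n - 1):
        v_prev, v_next = segment_velocity(i - 1), segment_velocity(i)
        vels.append([0.5 * (a + b) for a, b in zip(v_prev, v_next)])
    vels.append(zero)

    samples = []
    for seg in range(n - 1):
        ns = steps[seg]
        if ns == 0:
            continue
        p0, p1 = points[seg], points[seg + 1]
        # Convert velocities to tangents in the segment's normalized time t in [0, 1].
        m0 = [v * durations[seg] for v in vels[seg]]
        m1 = [v * durations[seg] for v in vels[seg + 1]]
        for step in range(1, ns + 1):
            t = step / ns
            h00 = 2 * t**3 - 3 * t**2 + 1
            h10 = t**3 - 2 * t**2 + t
            h01 = -2 * t**3 + 3 * t**2
            h11 = t**3 - t**2
            samples.append(
                [h00 * a + h10 * ma + h01 * b + h11 * mb for a, ma, b, mb in zip(p0, m0, p1, m1)]
            )
    return samples


if __name__ == "__main__":
    demo = hermite_trajectory([[0.0], [30.0], [60.0]], speeds=[30.0, 30.0], fps=30)
    print(len(demo), demo[0], demo[-1])
```

Because the endpoint tangents are zero, the first sample moves much less than a constant-speed step would, which is the smooth start the docstring describes.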
+267
@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
Auto-reset and cube-grab utility for the OMX robot arm.

Provides:
  - grab_cube(robot): sweep workspace, center cube, close gripper
  - place_cube(robot): carry cube to a random position, release

Standalone usage (run from repo root):
    python -m examples.omx.reset_environment --port /dev/ttyACM1 --mode grab
    python -m examples.omx.reset_environment --port /dev/ttyACM1 --mode grab_and_place

Joint range: -100 to 100 for arm joints; gripper: 50 = closed, 80 = open.

To read current joint values for calibration, add after robot.connect():
    obs = robot.get_observation()
    print({k: round(obs[k], 1) for k in JOINT_NAMES})
    robot.disconnect(); raise SystemExit

Parallel-to-ground IK: wrist_flex = WRIST_HORIZONTAL_OFFSET - shoulder_lift - elbow_flex.
Linear interpolation preserves this constraint between any two poses that satisfy it.
"""
import argparse
import logging
import numpy as np
from lerobot.robots.omx_follower import OmxFollower, OmxFollowerConfig
from lerobot.robots.robot import Robot
from lerobot.utils.robot_utils import precise_sleep
logger = logging.getLogger(__name__)
# ── Poses ─────────────────────────────────────────────────────────────────────
HOME_POSE = {
"shoulder_pan.pos": 0.0,
"shoulder_lift.pos": -50.0,
"elbow_flex.pos": 50.0,
"wrist_flex.pos": 0.0,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
}
SWEEP_WAYPOINTS = [
{
"shoulder_pan.pos": -60.0,
"shoulder_lift.pos": 50.0,
"elbow_flex.pos": -60.0,
"wrist_flex.pos": -20.0,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
},
{
"shoulder_pan.pos": -30.0,
"shoulder_lift.pos": 50.0,
"elbow_flex.pos": -60.0,
"wrist_flex.pos": -5.0,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
},
{
"shoulder_pan.pos": 20.0,
"shoulder_lift.pos": 50.0,
"elbow_flex.pos": -55.0,
"wrist_flex.pos": -5.0,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
},
]
# ── Motion parameters ─────────────────────────────────────────────────────────
CONTROL_HZ = 30
APPROACH_SPEED = 50.0
SWEEP_SPEED = 40.0
# ── Grab-sequence parameters ──────────────────────────────────────────────────
GRAB_PAN = 0.0
SWEEP_LEFT_PAN = -60.0
SWEEP_RIGHT_PAN = 60.0
SWEEP_END_OFFSET = 5.0 # stop before center so the cube isn't pushed past GRAB_PAN
SWEEP_END_PAN_RANGE = (15.0, 20.0)
SWEEP_LOW_SHOULDER_LIFT = 50.0
SWEEP_LOW_ELBOW_FLEX_START = -60.0
SWEEP_LOW_ELBOW_FLEX_END = -55.0
SWEEP_HIGH_WRIST_FLEX = -20.0 # wrist tilted up during high approach to clear obstacles
PUSH_START_SHOULDER_LIFT = 0.0
PUSH_START_ELBOW_FLEX = 45.0
PUSH_END_SHOULDER_LIFT = 50.0
PUSH_END_ELBOW_FLEX = -50.0
# Subtracted from shoulder_lift during the push sweep to clear the platform surface.
# Does not affect the grab-target interpolation in record_grab.py.
PUSH_RAISE_OFFSET = 5.0
WRIST_HORIZONTAL_OFFSET = 0.0 # tune if gripper tilts during push: + tilts nose up, - down
GRIPPER_CLOSE_POS = 50.0
PLACE_LEFT_PAN_RANGE = (5.0, 30.0) # random pan range for cube placement on the left side
PLACE_REACH_RANGE = (0.1, 0.7) # 0 = arm retracted (PUSH_START), 1 = fully extended (PUSH_END)
JOINT_NAMES = [
"shoulder_pan.pos",
"shoulder_lift.pos",
"elbow_flex.pos",
"wrist_flex.pos",
"wrist_roll.pos",
"gripper.pos",
]
# ── Helpers ───────────────────────────────────────────────────────────────────
def pose_to_array(pose: dict) -> np.ndarray:
return np.array([pose[k] for k in JOINT_NAMES])
def array_to_pose(arr: np.ndarray) -> dict:
return {k: float(arr[i]) for i, k in enumerate(JOINT_NAMES)}
def horizontal_wrist_flex(shoulder_lift: float, elbow_flex: float) -> float:
return WRIST_HORIZONTAL_OFFSET - shoulder_lift - elbow_flex
def _low_sweep_pose(pan: float, elbow_flex: float, wrist_flex: float | None = None) -> dict:
sl = SWEEP_LOW_SHOULDER_LIFT
return {
"shoulder_pan.pos": pan,
"shoulder_lift.pos": sl,
"elbow_flex.pos": elbow_flex,
"wrist_flex.pos": horizontal_wrist_flex(sl, elbow_flex) if wrist_flex is None else wrist_flex,
"wrist_roll.pos": 0.0,
"gripper.pos": 60.0,
}
def _high_sweep_pose(pan: float) -> dict:
return {**HOME_POSE, "shoulder_pan.pos": pan, "wrist_flex.pos": SWEEP_HIGH_WRIST_FLEX}
def _push_pose(shoulder_lift: float, elbow_flex: float, pan: float = GRAB_PAN, gripper: float = 70.0) -> dict:
return {
"shoulder_pan.pos": pan,
"shoulder_lift.pos": shoulder_lift,
"elbow_flex.pos": elbow_flex,
"wrist_flex.pos": horizontal_wrist_flex(shoulder_lift, elbow_flex),
"wrist_roll.pos": 0.0,
"gripper.pos": gripper,
}
def move_to_pose(robot: Robot, target: dict, speed: float) -> None:
    """Interpolate from current position to target at the given speed (units/s)."""
    obs = robot.get_observation()
    current = np.array([obs[k] for k in JOINT_NAMES])
    goal = pose_to_array(target)
    max_distance = float(np.max(np.abs(goal - current)))
    if max_distance < 0.5:
        return
    n_steps = max(1, int(max_distance / speed * CONTROL_HZ))
    dt = 1.0 / CONTROL_HZ
    for step in range(1, n_steps + 1):
        t = step / n_steps
        robot.send_action(array_to_pose(current + t * (goal - current)))
        precise_sleep(dt)


# ── Sequences ─────────────────────────────────────────────────────────────────
def grab_cube(robot: Robot) -> None:
"""Left sweep → right sweep → extend arm parallel to ground → close gripper."""
move_to_pose(robot, HOME_POSE, APPROACH_SPEED)
for pan, end_pan in [
(SWEEP_LEFT_PAN, GRAB_PAN - SWEEP_END_OFFSET),
(SWEEP_RIGHT_PAN, GRAB_PAN + SWEEP_END_OFFSET),
]:
logger.info(f"Sweeping {'left' if pan < 0 else 'right'} → center...")
move_to_pose(robot, _high_sweep_pose(pan), APPROACH_SPEED)
move_to_pose(
robot, _low_sweep_pose(pan, SWEEP_LOW_ELBOW_FLEX_START, wrist_flex=-20.0), APPROACH_SPEED
)
move_to_pose(robot, _low_sweep_pose(end_pan, SWEEP_LOW_ELBOW_FLEX_END, wrist_flex=0.0), SWEEP_SPEED)
move_to_pose(robot, HOME_POSE, APPROACH_SPEED)
logger.info("Extending to push cube into gripper...")
move_to_pose(
robot,
_push_pose(PUSH_START_SHOULDER_LIFT - PUSH_RAISE_OFFSET, PUSH_START_ELBOW_FLEX),
APPROACH_SPEED,
)
move_to_pose(
robot,
_push_pose(PUSH_END_SHOULDER_LIFT - PUSH_RAISE_OFFSET, PUSH_END_ELBOW_FLEX),
SWEEP_SPEED,
)
logger.info("Closing gripper...")
move_to_pose(
robot,
_push_pose(PUSH_END_SHOULDER_LIFT, PUSH_END_ELBOW_FLEX, gripper=GRIPPER_CLOSE_POS),
APPROACH_SPEED,
)
logger.info("Grab complete.")
def place_cube(robot: Robot) -> tuple[float, float]:
    """Carry the cube (gripper closed) to a random position on the left side, then release.

    Returns:
        (pan, t): pan angle and reach scalar [0, 1] of the placement position.
    """
    pan = float(np.random.uniform(*PLACE_LEFT_PAN_RANGE))
    t = float(np.random.uniform(*PLACE_REACH_RANGE))
    sl = PUSH_START_SHOULDER_LIFT + t * (PUSH_END_SHOULDER_LIFT - PUSH_START_SHOULDER_LIFT)
    ef = PUSH_START_ELBOW_FLEX + t * (PUSH_END_ELBOW_FLEX - PUSH_START_ELBOW_FLEX)
    logger.info(f"Placing cube at pan={pan:.1f}, reach={t:.2f}...")
    move_to_pose(robot, {**HOME_POSE, "gripper.pos": GRIPPER_CLOSE_POS}, APPROACH_SPEED)
    move_to_pose(
        robot, {**HOME_POSE, "shoulder_pan.pos": pan, "gripper.pos": GRIPPER_CLOSE_POS}, APPROACH_SPEED
    )
    move_to_pose(robot, _push_pose(sl, ef, pan=pan, gripper=GRIPPER_CLOSE_POS), APPROACH_SPEED)
    move_to_pose(robot, _push_pose(sl, ef, pan=pan, gripper=80.0), APPROACH_SPEED)
    move_to_pose(robot, HOME_POSE, APPROACH_SPEED)
    logger.info("Place complete.")
    return pan, t


# ── Entry point ───────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="OMX arm reset / grab script")
parser.add_argument("--port", default="/dev/ttyACM1")
parser.add_argument("--robot_id", default="omx_follower")
parser.add_argument("--mode", choices=["grab", "grab_and_place"], default="grab_and_place")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
robot = OmxFollower(OmxFollowerConfig(port=args.port, id=args.robot_id))
robot.connect(calibrate=True)
try:
if args.mode == "grab":
grab_cube(robot)
elif args.mode == "grab_and_place":
grab_cube(robot)
place_cube(robot)
finally:
robot.disconnect()
if __name__ == "__main__":
main()
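
The module docstring states that linear interpolation preserves the parallel-to-ground constraint between any two poses that satisfy it. This follows because the constraint is linear in the joint values. The sketch below checks it numerically with the push start and end values from this file; `lerp_pose` and `constraint_residual` are hypothetical helper names introduced only for this illustration.

```python
WRIST_HORIZONTAL_OFFSET = 0.0  # same default as in reset_environment.py


def horizontal_wrist_flex(shoulder_lift, elbow_flex):
    return WRIST_HORIZONTAL_OFFSET - shoulder_lift - elbow_flex


def make_push_pose(shoulder_lift, elbow_flex):
    # Reduced pose with only the three joints involved in the constraint.
    return {
        "shoulder_lift.pos": shoulder_lift,
        "elbow_flex.pos": elbow_flex,
        "wrist_flex.pos": horizontal_wrist_flex(shoulder_lift, elbow_flex),
    }


def lerp_pose(a, b, t):
    """Joint-wise linear interpolation between two poses (hypothetical helper)."""
    return {k: a[k] + t * (b[k] - a[k]) for k in a}


def constraint_residual(pose):
    """Zero when the wrist is parallel to the ground under the stated IK rule."""
    expected = horizontal_wrist_flex(pose["shoulder_lift.pos"], pose["elbow_flex.pos"])
    return pose["wrist_flex.pos"] - expected


start = make_push_pose(0.0, 45.0)   # PUSH_START_* values
end = make_push_pose(50.0, -50.0)   # PUSH_END_* values

if __name__ == "__main__":
    for t in (0.0, 0.25, 0.5, 0.75, 1.0):
        print(t, constraint_residual(lerp_pose(start, end, t)))
```

Poses that set `wrist_flex` by hand, such as the sweep waypoints, do not satisfy the constraint, so the guarantee does not extend to moves that start or end at them.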
+20 -3
@@ -59,8 +59,8 @@ keywords = ["lerobot", "huggingface", "robotics", "machine learning", "artifici
dependencies = [
# Core ML
"torch>=2.7,<2.11.0",
"torchvision>=0.22.0,<0.26.0",
"torch>=2.7,<2.12.0",
"torchvision>=0.22.0,<0.27.0",
"numpy>=2.0.0,<2.3.0", # NOTE: Explicitly listing numpy helps the resolver converge faster. Upper bound imposed by opencv-python-headless.
"opencv-python-headless>=4.9.0,<4.14.0",
"Pillow>=10.0.0,<13.0.0",
@@ -99,7 +99,7 @@ dataset = [
"pandas>=2.0.0,<3.0.0", # NOTE: Transitive dependency of datasets
"pyarrow>=21.0.0,<30.0.0", # NOTE: Transitive dependency of datasets
"lerobot[av-dep]",
"torchcodec>=0.3.0,<0.11.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # NOTE: Windows support starts at version 0.7 (needs torch==2.8), ffmpeg>=8 support starts at version 0.8.1 (needs torch==2.9), system-wide ffmpeg support starts at version 0.10 (needs torch==2.10).
"torchcodec>=0.3.0,<0.12.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # NOTE: Windows support starts at version 0.7 (needs torch==2.8), ffmpeg>=8 support starts at version 0.8.1 (needs torch==2.9), system-wide ffmpeg support starts at version 0.10 (needs torch==2.10), 0.11 needs torch==2.11, 0.12 needs torch==2.12.
"jsonlines>=4.0.0,<5.0.0",
]
training = [
@@ -195,6 +195,7 @@ groot = [
sarm = ["lerobot[transformers-dep]", "pydantic>=2.0.0,<3.0.0", "faker>=33.0.0,<35.0.0", "lerobot[matplotlib-dep]", "lerobot[qwen-vl-utils-dep]"]
xvla = ["lerobot[transformers-dep]"]
eo1 = ["lerobot[transformers-dep]", "lerobot[qwen-vl-utils-dep]"]
evo1 = ["lerobot[transformers-dep]", "timm>=1.0.0,<1.1.0"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
# Features
@@ -258,6 +259,7 @@ all = [
"lerobot[smolvla]",
# "lerobot[groot]", TODO(Steven): Gr00t requires specific installation instructions for flash-attn
"lerobot[xvla]",
"lerobot[evo1]",
"lerobot[hilserl]",
"lerobot[async]",
"lerobot[dev]",
@@ -293,6 +295,20 @@ lerobot-setup-can="lerobot.scripts.lerobot_setup_can:main"
lerobot-rollout="lerobot.scripts.lerobot_rollout:main"
# ---------------- Tool Configurations ----------------
# cu128 wheels keep broad hardware reach; the driver floor is 570.86.
# To use a different CUDA variant, reinstall torch with an explicit index, e.g.:
# uv pip install --force-reinstall torch torchvision \
# --index-url https://download.pytorch.org/whl/cu130
[[tool.uv.index]]
name = "pytorch-cu128"
url = "https://download.pytorch.org/whl/cu128"
explicit = true
[tool.uv.sources]
torch = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
torchvision = [{ index = "pytorch-cu128", marker = "sys_platform == 'linux'" }]
[tool.setuptools.package-data]
lerobot = ["envs/*.json"]
@@ -334,6 +350,7 @@ ignore = [
# E402: conditional-import guards (TYPE_CHECKING / is_package_available) must precede the imports they protect
"src/lerobot/scripts/convert_dataset_v21_to_v30.py" = ["E402"]
"src/lerobot/policies/wall_x/**" = ["N801", "N812", "SIM102", "SIM108", "SIM210", "SIM211", "B006", "B007", "SIM118"] # Suppress these as they come from the original Qwen2_5_vl code. TODO(pepijn): refactor original
"src/lerobot/policies/evo1/**" = ["N801", "N812"]
[tool.ruff.lint.isort]
combine-as-imports = true
+3 -1
@@ -256,7 +256,9 @@ class TrainPipelineConfig(HubMixin):
) from e
cli_args = kwargs.pop("cli_args", [])
if config_file is not None:
# Legacy RA-BC migration only applies to framework-saved checkpoints (always JSON).
# Hand-written YAML/TOML configs are expected to use the current sample_weighting schema.
if config_file is not None and config_file.endswith(".json"):
with open(config_file) as f:
config = json.load(f)
migrated_config = _migrate_legacy_rabc_fields(config)
+5 -1
@@ -282,7 +282,11 @@ class VideoDecoderCache:
with self._lock:
if video_path not in self._cache:
file_handle = fsspec.open(video_path).__enter__()
decoder = VideoDecoder(file_handle, seek_mode="approximate")
try:
decoder = VideoDecoder(file_handle, seek_mode="approximate")
except Exception:
file_handle.close()
raise
self._cache[video_path] = (decoder, file_handle)
return self._cache[video_path][0]
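
The hunk above closes the opened file handle when constructing the decoder fails, so a bad video does not leak an open handle. The pattern can be sketched in isolation as follows; `FakeHandle` and `build_decoder` are hypothetical stand-ins and do not use the real `fsspec` or `VideoDecoder` APIs.

```python
class FakeHandle:
    """Hypothetical stand-in for an opened file object."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def build_decoder(handle, factory):
    """Build a decoder from `handle`; release the handle if construction fails."""
    try:
        return factory(handle)
    except Exception:
        # Nothing else owns the handle yet, so close it before propagating.
        handle.close()
        raise


def failing_factory(_handle):
    raise ValueError("corrupt video")


if __name__ == "__main__":
    handle = FakeHandle()
    try:
        build_decoder(handle, failing_factory)
    except ValueError as err:
        print("failed:", err, "| handle closed:", handle.closed)
```

On success the handle stays open on purpose, since the cache keeps it alongside the decoder for later reads.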
+14 -6
@@ -24,7 +24,12 @@ import gymnasium as gym
from gymnasium.envs.registration import registry as gym_registry
from lerobot.configs import FeatureType, PolicyFeature
from lerobot.processor import IsaaclabArenaProcessorStep, LiberoProcessorStep, PolicyProcessorPipeline
from lerobot.processor import (
IsaaclabArenaProcessorStep,
LiberoActionProcessorStep,
LiberoProcessorStep,
PolicyProcessorPipeline,
)
from lerobot.robots import RobotConfig
from lerobot.teleoperators.config import TeleoperatorConfig
from lerobot.utils.constants import (
@@ -123,7 +128,7 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
vec = env_cls([_make_one for _ in range(n_envs)], **extra_kwargs)
return {self.type: {0: vec}}
def get_env_processors(self):
def get_env_processors(self, policy_cfg: Any | None = None):
"""Return (preprocessor, postprocessor) for this env. Default: identity."""
return PolicyProcessorPipeline(steps=[]), PolicyProcessorPipeline(steps=[])
@@ -436,10 +441,13 @@ class LiberoEnv(EnvConfig):
is_libero_plus=self.is_libero_plus,
)
def get_env_processors(self):
def get_env_processors(self, policy_cfg: Any | None = None):
max_state_dim = getattr(policy_cfg, "max_state_dim", None) if getattr(policy_cfg, "type", None) == "evo1" else None
action_feature = self.features.get(ACTION)
action_dim = int(action_feature.shape[0]) if action_feature is not None else 7
return (
PolicyProcessorPipeline(steps=[LiberoProcessorStep()]),
PolicyProcessorPipeline(steps=[]),
PolicyProcessorPipeline(steps=[LiberoProcessorStep(max_state_dim=max_state_dim)]),
PolicyProcessorPipeline(steps=[LiberoActionProcessorStep(action_dim=action_dim)]),
)
@@ -705,7 +713,7 @@ class IsaaclabArenaEnv(HubEnvConfig):
def gym_kwargs(self) -> dict:
return {}
def get_env_processors(self):
def get_env_processors(self, policy_cfg: Any | None = None):
state_keys = tuple(k.strip() for k in (self.state_keys or "").split(",") if k.strip())
camera_keys = tuple(k.strip() for k in (self.camera_keys or "").split(",") if k.strip())
if not state_keys and not camera_keys:
+9 -1
@@ -15,6 +15,7 @@
# limitations under the License.
from __future__ import annotations
import inspect
from typing import Any
import gymnasium as gym
@@ -52,7 +53,14 @@ def make_env_pre_post_processors(
return make_xvla_libero_pre_post_processors()
return env_cfg.get_env_processors()
get_processors = env_cfg.get_env_processors
signature = inspect.signature(get_processors)
supports_policy_cfg = "policy_cfg" in signature.parameters or any(
param.kind is inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()
)
if supports_policy_cfg:
return get_processors(policy_cfg=policy_cfg)
return get_processors()
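
The change above passes `policy_cfg` only when the environment config's `get_env_processors` can accept it, which keeps older overrides without the parameter working. A minimal, self-contained sketch of that signature check follows; `call_with_optional_policy_cfg` and the three example callables are hypothetical names used only for illustration.

```python
import inspect


def call_with_optional_policy_cfg(get_processors, policy_cfg):
    """Call `get_processors`, forwarding policy_cfg only if its signature allows it."""
    params = inspect.signature(get_processors).parameters
    supports_policy_cfg = "policy_cfg" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if supports_policy_cfg:
        return get_processors(policy_cfg=policy_cfg)
    return get_processors()


def legacy_get_processors():
    # Older override: no policy_cfg parameter.
    return "legacy"


def new_get_processors(policy_cfg=None):
    return ("new", policy_cfg)


def kwargs_get_processors(**kwargs):
    return ("kwargs", kwargs.get("policy_cfg"))


if __name__ == "__main__":
    for fn in (legacy_get_processors, new_get_processors, kwargs_get_processors):
        print(fn.__name__, call_with_optional_policy_cfg(fn, "evo1-config"))
```

For bound methods, `inspect.signature` already omits `self`, so the same check applies to `env_cfg.get_env_processors`.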
def make_env(
+2
@@ -17,6 +17,7 @@ from lerobot.utils.action_interpolator import ActionInterpolator as ActionInterp
from .act.configuration_act import ACTConfig as ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig as DiffusionConfig
from .eo1.configuration_eo1 import EO1Config as EO1Config
from .evo1.configuration_evo1 import Evo1Config as Evo1Config
from .factory import get_policy_class, make_policy, make_policy_config, make_pre_post_processors
from .groot.configuration_groot import GrootConfig as GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig as MultiTaskDiTConfig
@@ -40,6 +41,7 @@ __all__ = [
# Configuration classes
"ACTConfig",
"DiffusionConfig",
"Evo1Config",
"GrootConfig",
"MultiTaskDiTConfig",
"EO1Config",
@@ -100,8 +100,8 @@ class DiffusionConfig(PreTrainedConfig):
# Inputs / output structure.
n_obs_steps: int = 2
horizon: int = 16
n_action_steps: int = 8
horizon: int = 64
n_action_steps: int = 32
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
@@ -122,10 +122,10 @@ class DiffusionConfig(PreTrainedConfig):
crop_ratio: float = 1.0
crop_shape: tuple[int, int] | None = None
crop_is_random: bool = True
pretrained_backbone_weights: str | None = None
use_group_norm: bool = True
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1"
use_group_norm: bool = False
spatial_softmax_num_keypoints: int = 32
use_separate_rgb_encoder_per_camera: bool = False
use_separate_rgb_encoder_per_camera: bool = True
# Unet.
down_dims: tuple[int, ...] = (512, 1024, 2048)
kernel_size: int = 5
+1
@@ -0,0 +1 @@
../../../../docs/source/policy_evo1_README.md
+19
@@ -0,0 +1,19 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_evo1 import Evo1Config
from .modeling_evo1 import EVO1Policy
from .processor_evo1 import make_evo1_pre_post_processors
__all__ = ["Evo1Config", "EVO1Policy", "make_evo1_pre_post_processors"]
@@ -0,0 +1,225 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import math
from dataclasses import dataclass, field
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import LRSchedulerConfig
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE
@LRSchedulerConfig.register_subclass("evo1_exact")
@dataclass
class Evo1SchedulerConfig(LRSchedulerConfig):
    num_warmup_steps: int

    def build(self, optimizer: Optimizer, num_training_steps: int) -> LambdaLR:
        def lr_lambda(current_step: int) -> float:
            if current_step < self.num_warmup_steps:
                return current_step / max(1, self.num_warmup_steps)
            progress = (current_step - self.num_warmup_steps) / max(
                1, num_training_steps - self.num_warmup_steps
            )
            return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))

        return LambdaLR(optimizer, lr_lambda, -1)


@PreTrainedConfig.register_subclass("evo1")
@dataclass
class Evo1Config(PreTrainedConfig):
training_stage: str = "stage1"
use_amp: bool = True
n_obs_steps: int = 1
chunk_size: int = 50
n_action_steps: int = 50
max_state_dim: int = 24
max_action_dim: int = 24
max_views: int = 3
image_resolution: tuple[int, int] = (448, 448)
empty_cameras: int = 0
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.MIN_MAX,
"ACTION": NormalizationMode.MIN_MAX,
}
)
vlm_model_name: str = "OpenGVLab/InternVL3-1B"
vlm_num_layers: int | None = 14
vlm_dtype: str = "bfloat16"
use_flash_attn: bool = True
action_head: str = "flowmatching"
embed_dim: int = 896
hidden_dim: int = 1024
state_hidden_dim: int = 1024
num_heads: int = 8
num_layers: int = 8
dropout: float = 0.0
num_inference_timesteps: int = 32
num_categories: int = 1
return_cls_only: bool = False
enable_gradient_checkpointing: bool = True
gradient_checkpointing_use_reentrant: bool = False
finetune_vlm: bool | None = None
finetune_language_model: bool | None = None
finetune_vision_model: bool | None = None
finetune_action_head: bool | None = None
# Reapply stage defaults after loading checkpoint configs so stage2 cannot
# accidentally inherit the frozen VLM flags stored by a stage1 checkpoint.
apply_training_stage_defaults: bool = True
task_field: str = "task"
embodiment_id_field: str | None = None
default_embodiment_id: int = 0
optimizer_lr: float = 1e-5
optimizer_betas: tuple[float, float] = (0.9, 0.999)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-5
optimizer_grad_clip_norm: float = 1.0
scheduler_warmup_steps: int = 300
drop_last: bool = True
def __post_init__(self):
super().__post_init__()
if self.training_stage not in {"stage1", "stage2"}:
raise ValueError(
f"Unsupported EVO1 training_stage '{self.training_stage}', expected 'stage1' or 'stage2'"
)
if self.apply_training_stage_defaults:
if self.training_stage == "stage1":
self.finetune_vlm = False
self.finetune_language_model = False
self.finetune_vision_model = False
self.finetune_action_head = True
elif self.training_stage == "stage2":
self.finetune_vlm = True
self.finetune_language_model = True
self.finetune_vision_model = True
self.finetune_action_head = True
elif self.training_stage == "stage1":
if self.finetune_vlm is None:
self.finetune_vlm = False
if self.finetune_language_model is None:
self.finetune_language_model = False
if self.finetune_vision_model is None:
self.finetune_vision_model = False
if self.finetune_action_head is None:
self.finetune_action_head = True
elif self.training_stage == "stage2":
has_explicit_branch_flags = any(
flag is not None for flag in (self.finetune_language_model, self.finetune_vision_model)
)
if not has_explicit_branch_flags:
if self.finetune_vlm is None:
self.finetune_vlm = True
if self.finetune_language_model is None:
self.finetune_language_model = True
if self.finetune_vision_model is None:
self.finetune_vision_model = True
elif self.finetune_vlm is None:
self.finetune_vlm = bool(self.finetune_language_model or self.finetune_vision_model)
if self.finetune_action_head is None:
self.finetune_action_head = True
if self.finetune_vlm is None:
self.finetune_vlm = False
if self.finetune_language_model is None:
self.finetune_language_model = False
if self.finetune_vision_model is None:
self.finetune_vision_model = False
if self.finetune_action_head is None:
self.finetune_action_head = False
branch_vlm = self.finetune_language_model or self.finetune_vision_model
if self.finetune_vlm != branch_vlm:
raise ValueError(
"Inconsistent EVO1 finetune config: "
f"finetune_vlm={self.finetune_vlm} but "
f"(finetune_language_model or finetune_vision_model)={branch_vlm}. "
"When branch-level flags are used, finetune_vlm must match their effective union."
)
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"n_action_steps ({self.n_action_steps}) must be <= chunk_size ({self.chunk_size})"
)
def validate_features(self) -> None:
if self.input_features is None:
self.input_features = {}
if self.output_features is None:
self.output_features = {}
for i in range(self.empty_cameras):
key = OBS_IMAGES + f".empty_camera_{i}"
if key not in self.input_features:
self.input_features[key] = PolicyFeature(
type=FeatureType.VISUAL,
shape=(3, *self.image_resolution),
)
if OBS_STATE not in self.input_features:
self.input_features[OBS_STATE] = PolicyFeature(
type=FeatureType.STATE,
shape=(self.max_state_dim,),
)
if ACTION not in self.output_features:
self.output_features[ACTION] = PolicyFeature(
type=FeatureType.ACTION,
shape=(self.max_action_dim,),
)
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
)
def get_scheduler_preset(self):
return Evo1SchedulerConfig(
num_warmup_steps=self.scheduler_warmup_steps,
)
@property
def observation_delta_indices(self) -> list[int]:
return [0]
@property
def action_delta_indices(self) -> list[int]:
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
return None
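
The `lr_lambda` inside `Evo1SchedulerConfig.build` combines a linear warmup with a cosine decay to zero. The multiplier can be inspected without PyTorch, as in this minimal sketch; `warmup_cosine_factor` is a hypothetical free-function version of that lambda, and the step counts in the demo are illustrative values (300 matches the `scheduler_warmup_steps` default, 1000 is an assumed total).

```python
import math


def warmup_cosine_factor(step, num_warmup_steps, num_training_steps):
    """Learning-rate multiplier: linear warmup, then cosine decay to zero."""
    if step < num_warmup_steps:
        return step / max(1, num_warmup_steps)
    progress = (step - num_warmup_steps) / max(1, num_training_steps - num_warmup_steps)
    return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))


if __name__ == "__main__":
    for step in (0, 150, 300, 650, 1000):
        print(step, round(warmup_cosine_factor(step, 300, 1000), 4))
```

The effective learning rate at a step is the optimizer's base rate (`optimizer_lr`) times this factor. The `max(0.0, ...)` clamp keeps the multiplier from going negative through rounding; if training runs past `num_training_steps`, the cosine term rises again instead of staying at zero.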
+234
@@ -0,0 +1,234 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
import torch
import torch.nn as nn
from PIL import Image
from lerobot.policies.evo1.flow_matching import FlowmatchingActionHead
from lerobot.policies.evo1.internvl3_embedder import InternVL3Embedder
def _cfgget(config: Any, key: str, default=None):
if isinstance(config, dict):
return config.get(key, default)
return getattr(config, key, default)
class EVO1(nn.Module):
def __init__(self, config: dict):
super().__init__()
self.config = config
self._device = _cfgget(config, "device", "cuda")
self.return_cls_only = _cfgget(config, "return_cls_only", False)
vlm_name = _cfgget(config, "vlm_name", "OpenGVLab/InternVL3-1B")
image_size = _cfgget(config, "image_size", 448)
if image_size is None:
image_resolution = _cfgget(config, "image_resolution", (448, 448))
image_size = int(image_resolution[0])
self.embedder = InternVL3Embedder(
model_name=vlm_name,
image_size=image_size,
device=self._device,
num_language_layers=_cfgget(config, "vlm_num_layers", 14),
model_dtype=_cfgget(config, "vlm_dtype", "bfloat16"),
use_flash_attn=_cfgget(config, "use_flash_attn", True),
enable_gradient_checkpointing=_cfgget(config, "enable_gradient_checkpointing", True),
gradient_checkpointing_use_reentrant=_cfgget(
config, "gradient_checkpointing_use_reentrant", False
),
)
action_head_type = _cfgget(config, "action_head", "flowmatching").lower()
if action_head_type != "flowmatching":
raise NotImplementedError(f"Unknown action_head: {action_head_type}")
horizon = _cfgget(config, "action_horizon", _cfgget(config, "horizon", 16))
per_action_dim = _cfgget(config, "per_action_dim", 7)
action_dim = horizon * per_action_dim
if isinstance(config, dict):
config["horizon"] = horizon
config["per_action_dim"] = per_action_dim
config["action_dim"] = action_dim
self.horizon = horizon
self.per_action_dim = per_action_dim
self.action_head = FlowmatchingActionHead(config=config).to(self._device)
def _normalize_image_batches(
self,
images: Sequence[Image.Image | torch.Tensor] | Sequence[Sequence[Image.Image | torch.Tensor]],
prompt: str | list[str] | None,
image_mask: torch.Tensor,
) -> tuple[list[list[Image.Image | torch.Tensor]], list[str], torch.Tensor]:
if not images:
raise ValueError("EVO1 expects at least one image per sample.")
first = images[0]
if isinstance(first, (Image.Image, torch.Tensor)):
image_batches = [list(images)] # type: ignore[arg-type]
else:
image_batches = [list(sample) for sample in images] # type: ignore[arg-type]
batch_size = len(image_batches)
if prompt is None:
prompts = [""] * batch_size
elif isinstance(prompt, str):
prompts = [prompt] * batch_size
else:
prompts = [str(p) for p in prompt]
if len(prompts) != batch_size:
raise ValueError(
f"Prompt batch size {len(prompts)} does not match image batch size {batch_size}"
)
if image_mask.dim() == 1:
image_mask = image_mask.unsqueeze(0)
if image_mask.shape[0] != batch_size:
raise ValueError(
f"image_mask batch size {image_mask.shape[0]} does not match image batch size {batch_size}"
)
return image_batches, prompts, image_mask
def get_vl_embeddings(
self,
images: list[Image.Image | torch.Tensor] | list[list[Image.Image | torch.Tensor]],
image_mask: torch.Tensor,
prompt: str | list[str] | None = None,
return_cls_only: bool | None = None,
) -> torch.Tensor:
if return_cls_only is None:
return_cls_only = self.return_cls_only
image_batches, prompts, image_mask = self._normalize_image_batches(images, prompt, image_mask)
return self.embedder.get_fused_image_text_embedding_from_tensor_images(
image_tensors_batch=image_batches,
image_masks=image_mask,
text_prompts=prompts,
return_cls_only=return_cls_only,
)
def prepare_state(self, state_input: list | torch.Tensor) -> torch.Tensor:
if isinstance(state_input, list):
state_tensor = torch.tensor(state_input)
elif isinstance(state_input, torch.Tensor):
state_tensor = state_input
else:
raise TypeError(f"Unsupported state input type: {type(state_input)}")
if state_tensor.ndim == 1:
state_tensor = state_tensor.unsqueeze(0)
return state_tensor.to(self._device)
def predict_action(
self,
fused_tokens: torch.Tensor,
state: torch.Tensor,
actions_gt: torch.Tensor | None = None,
action_mask: torch.Tensor | None = None,
embodiment_ids: torch.Tensor | None = None,
):
if actions_gt is None:
return self.action_head.get_action(
fused_tokens,
state=state,
action_mask=action_mask,
embodiment_id=embodiment_ids,
)
return self.action_head(
fused_tokens,
state=state,
actions_gt=actions_gt,
action_mask=action_mask,
embodiment_id=embodiment_ids,
)
@torch.no_grad()
def run_inference(
self,
images: list[Image.Image | torch.Tensor],
image_mask: torch.Tensor,
prompt: str,
state_input: list | torch.Tensor,
return_cls_only: bool | None = None,
action_mask: torch.Tensor | None = None,
embodiment_ids: torch.Tensor | None = None,
) -> torch.Tensor:
if image_mask.dim() == 1:
image_mask = image_mask.unsqueeze(0)
fused_tokens = self.get_vl_embeddings(
images=[images],
image_mask=image_mask,
prompt=[prompt],
return_cls_only=return_cls_only,
)
state_tensor = self.prepare_state(state_input)
action = self.predict_action(
fused_tokens,
state_tensor,
action_mask=action_mask,
embodiment_ids=embodiment_ids,
)
if isinstance(action, torch.Tensor) and action.dtype == torch.bfloat16:
action = action.to(torch.float32)
return action
def forward(
self,
fused_tokens: torch.Tensor,
state: torch.Tensor | None = None,
actions_gt: torch.Tensor | None = None,
action_mask: torch.Tensor | None = None,
embodiment_ids: torch.Tensor | None = None,
):
return self.predict_action(fused_tokens, state, actions_gt, action_mask, embodiment_ids)
def _set_module_trainable(self, module: nn.Module, trainable: bool):
for param in module.parameters():
param.requires_grad = trainable
def set_finetune_flags(self):
finetune_vlm = _cfgget(self.config, "finetune_vlm", False)
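# Default to None so that only branch flags actually set in the config count as explicit;
# a False default would make has_explicit_branch_flags always true and ignore finetune_vlm.
finetune_language_model = _cfgget(self.config, "finetune_language_model", None)
finetune_vision_model = _cfgget(self.config, "finetune_vision_model", None)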
has_explicit_branch_flags = any(
flag is not None for flag in (finetune_language_model, finetune_vision_model)
)
finetune_language_model = bool(finetune_language_model)
finetune_vision_model = bool(finetune_vision_model)
finetune_vlm = bool(finetune_vlm)
if has_explicit_branch_flags:
self._set_module_trainable(self.embedder, False)
if hasattr(self.embedder.model, "language_model"):
self._set_module_trainable(self.embedder.model.language_model, finetune_language_model)
if hasattr(self.embedder.model, "vision_model"):
self._set_module_trainable(self.embedder.model.vision_model, finetune_vision_model)
if hasattr(self.embedder.model, "mlp1"):
self._set_module_trainable(self.embedder.model.mlp1, finetune_vision_model)
elif not finetune_vlm:
self._set_module_trainable(self.embedder, False)
if not _cfgget(self.config, "finetune_action_head", False):
self._set_module_trainable(self.action_head, False)
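The precedence that `set_finetune_flags` appears to aim for can be written as a pure function. This is a sketch under the assumption that a branch flag left unset is represented as `None`; `resolve_trainable` is a hypothetical name, and `True` here means the parameters keep their existing `requires_grad` rather than being forced on:

```python
from __future__ import annotations


def resolve_trainable(
    finetune_vlm: bool = False,
    finetune_language_model: bool | None = None,
    finetune_vision_model: bool | None = None,
    finetune_action_head: bool = False,
) -> dict[str, bool]:
    """Hypothetical summary of which sub-modules stay trainable."""
    # A branch flag is "explicit" as soon as it is not None (assumption stated above).
    explicit = finetune_language_model is not None or finetune_vision_model is not None
    if explicit:
        # Explicit branch flags win over the coarse finetune_vlm switch.
        language = bool(finetune_language_model)
        vision = bool(finetune_vision_model)
    else:
        # No branch flags: finetune_vlm decides for the whole embedder.
        language = vision = bool(finetune_vlm)
    return {
        "language_model": language,
        "vision_model": vision,
        # The mlp1 projector follows the vision flag, as in the method above.
        "mlp1": vision,
        "action_head": bool(finetune_action_head),
    }
```

For example, `resolve_trainable(finetune_vlm=True, finetune_vision_model=True)` leaves the language model frozen, because setting either branch flag switches the method into explicit mode.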
+456
@@ -0,0 +1,456 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import math
from types import SimpleNamespace
import torch
import torch.nn as nn
logger = logging.getLogger(__name__)
def _cfgget(config, key: str, default=None):
if isinstance(config, dict):
return config.get(key, default)
return getattr(config, key, default)
class SinusoidalPositionalEncoding(nn.Module):
def __init__(self, dim: int, max_len: int = 1000):
super().__init__()
pe = torch.zeros(max_len, dim)
position = torch.arange(0, max_len).unsqueeze(1)
div_term = torch.exp(torch.arange(0, dim, 2) * -(math.log(10000.0) / dim))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer("pe", pe)
def forward(self, seq_len: int):
if seq_len > self.pe.size(1):
self._extend_pe(seq_len)
return self.pe[:, :seq_len, :]
def _extend_pe(self, new_max_len):
old_max_len, dim = self.pe.size(1), self.pe.size(2)
if new_max_len <= old_max_len:
return
extra_positions = torch.arange(old_max_len, new_max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, dim, 2, dtype=torch.float) * -(math.log(10000.0) / dim))
extra_pe = torch.zeros(new_max_len - old_max_len, dim)
extra_pe[:, 0::2] = torch.sin(extra_positions * div_term)
extra_pe[:, 1::2] = torch.cos(extra_positions * div_term)
extra_pe = extra_pe.unsqueeze(0)
new_pe = torch.cat([self.pe, extra_pe.to(self.pe.device)], dim=1)
self.pe = new_pe
class CategorySpecificLinear(nn.Module):
"""Linear layer with one weight and bias per category; a plain nn.Linear when num_categories <= 1."""
def __init__(self, in_dim: int, out_dim: int, num_categories: int = 1):
super().__init__()
self.num_categories = num_categories
if num_categories <= 1:
self.linear = nn.Linear(in_dim, out_dim)
else:
self.weight = nn.Parameter(torch.empty(num_categories, in_dim, out_dim))
self.bias = nn.Parameter(torch.zeros(num_categories, out_dim))
nn.init.xavier_uniform_(self.weight)
def forward(self, x: torch.Tensor, category_id: torch.LongTensor):
if self.num_categories <= 1:
if x.dtype != self.linear.weight.dtype:
x = x.to(dtype=self.linear.weight.dtype)
return self.linear(x)
if x.dtype != self.weight.dtype:
x = x.to(dtype=self.weight.dtype)
orig_shape = x.shape
x_flat = x.reshape(-1, orig_shape[-1])
if category_id.dim() == 0:
cid = category_id.item()
out = x_flat @ self.weight[cid] + self.bias[cid]
else:
category_id = category_id.reshape(-1)
if category_id.numel() != x_flat.size(0):
raise ValueError(
f"category_id length {category_id.numel()} does not match flattened batch {x_flat.size(0)}"
)
weight_selected = self.weight[category_id]
bias_selected = self.bias[category_id]
out = torch.bmm(x_flat.unsqueeze(1), weight_selected).squeeze(1) + bias_selected
out_shape = orig_shape[:-1] + (out.shape[-1],)
return out.view(out_shape)
class CategorySpecificMLP(nn.Module):
def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, num_categories: int = 1):
super().__init__()
self.fc1 = CategorySpecificLinear(input_dim, hidden_dim, num_categories)
self.fc2 = CategorySpecificLinear(hidden_dim, output_dim, num_categories)
self.activation = nn.ReLU(inplace=True)
def forward(self, x: torch.Tensor, category_id: torch.LongTensor):
out = self.activation(self.fc1(x, category_id))
out = self.fc2(out, category_id)
return out
class MultiEmbodimentActionEncoder(nn.Module):
def __init__(
self, action_dim: int, embed_dim: int, hidden_dim: int, horizon: int, num_categories: int = 1
):
super().__init__()
self.horizon = horizon
self.embed_dim = embed_dim
self.num_categories = num_categories
self.W1 = CategorySpecificLinear(action_dim, hidden_dim, num_categories)
self.W2 = CategorySpecificLinear(hidden_dim, hidden_dim, num_categories)
self.W3 = CategorySpecificLinear(hidden_dim, embed_dim, num_categories)
self.pos_encoding = SinusoidalPositionalEncoding(hidden_dim, max_len=horizon)
self.activation = nn.ReLU(inplace=True)
def forward(self, action_seq: torch.Tensor, category_id: torch.LongTensor):
batch_size, horizon, action_dim = action_seq.shape
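# Raise instead of assert so the check survives `python -O`.
if self.horizon != horizon:
raise ValueError(f"Action sequence length {horizon} must match horizon {self.horizon}")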
x = action_seq.reshape(batch_size * horizon, action_dim)
if category_id.dim() == 0:
cat_ids = category_id.expand(horizon * batch_size)
else:
cat_ids = category_id.unsqueeze(1).expand(batch_size, horizon).reshape(batch_size * horizon)
out = self.activation(self.W1(x, cat_ids))
pos_enc = self.pos_encoding(horizon).to(device=out.device, dtype=out.dtype)
out = out.view(batch_size, horizon, -1) + pos_enc
out = out.view(batch_size * horizon, -1)
out = self.activation(self.W2(out, cat_ids))
out = self.W3(out, cat_ids)
return out.view(batch_size, horizon, self.embed_dim)
class BasicTransformerBlock(nn.Module):
def __init__(self, embed_dim: int, num_heads: int, hidden_dim: int, dropout: float = 0.0):
super().__init__()
self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout, batch_first=True)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
self.ff = nn.Sequential(nn.Linear(embed_dim, hidden_dim), nn.GELU(), nn.Linear(hidden_dim, embed_dim))
def forward(self, action_tokens: torch.Tensor, context_tokens: torch.Tensor, time_emb: torch.Tensor):
x = self.norm1(action_tokens)
attn_out, _ = self.attn(x, context_tokens, context_tokens)
x = action_tokens + attn_out
x2 = self.norm2(x)
if time_emb is not None:
x2 = x2 + time_emb.unsqueeze(1)
ff_out = self.ff(x2)
return x + ff_out
class FlowmatchingActionHead(nn.Module):
def __init__(
self,
config=None,
embed_dim: int = 896,
hidden_dim: int = 1024,
action_dim: int = 16 * 7,
horizon: int = 16,
per_action_dim: int = 7,
num_heads: int = 8,
num_layers: int = 8,
dropout: float = 0.0,
num_inference_timesteps: int = 20,
num_categories: int = 1,
):
super().__init__()
if config is not None:
embed_dim = _cfgget(config, "embed_dim", embed_dim)
hidden_dim = _cfgget(config, "hidden_dim", hidden_dim)
action_dim = _cfgget(config, "action_dim", action_dim)
horizon = _cfgget(config, "horizon", horizon)
per_action_dim = _cfgget(config, "per_action_dim", per_action_dim)
num_heads = _cfgget(config, "num_heads", num_heads)
num_layers = _cfgget(config, "num_layers", num_layers)
dropout = _cfgget(config, "dropout", dropout)
num_inference_timesteps = _cfgget(config, "num_inference_timesteps", num_inference_timesteps)
num_categories = _cfgget(config, "num_categories", num_categories)
self.config = config
else:
self.config = SimpleNamespace(
embed_dim=embed_dim,
hidden_dim=hidden_dim,
action_dim=action_dim,
horizon=horizon,
per_action_dim=per_action_dim,
num_heads=num_heads,
num_layers=num_layers,
dropout=dropout,
num_inference_timesteps=num_inference_timesteps,
num_categories=num_categories,
)
logger.info("FlowmatchingActionHead num_inference_timesteps=%s", num_inference_timesteps)
self.embed_dim = embed_dim
self.horizon = horizon
self.per_action_dim = _cfgget(self.config, "per_action_dim", per_action_dim)
self.action_dim = _cfgget(self.config, "action_dim", action_dim)
self.time_pos_enc = SinusoidalPositionalEncoding(embed_dim, max_len=1000)
self.transformer_blocks = nn.ModuleList(
[
BasicTransformerBlock(
embed_dim=embed_dim,
num_heads=num_heads,
hidden_dim=embed_dim * 4,
dropout=dropout,
)
for _ in range(num_layers)
]
)
self.norm_out = nn.LayerNorm(embed_dim)
self.seq_pool_proj = nn.Linear(self.horizon * self.embed_dim, self.embed_dim)
self.mlp_head = CategorySpecificMLP(
input_dim=embed_dim,
hidden_dim=hidden_dim,
output_dim=action_dim,
num_categories=num_categories,
)
self.state_encoder = None
state_dim = _cfgget(self.config, "state_dim")
if state_dim is not None:
state_hidden = _cfgget(self.config, "state_hidden_dim", embed_dim)
self.state_encoder = CategorySpecificMLP(
input_dim=state_dim,
hidden_dim=state_hidden,
output_dim=embed_dim,
num_categories=num_categories,
)
if horizon > 1:
self.action_encoder = MultiEmbodimentActionEncoder(
action_dim=self.per_action_dim,
embed_dim=embed_dim,
hidden_dim=embed_dim,
horizon=horizon,
num_categories=num_categories,
)
self.single_action_proj = None
else:
self.action_encoder = None
self.single_action_proj = nn.Linear(self.per_action_dim, self.embed_dim)
def _project_actions(self, action_seq: torch.Tensor, embodiment_id: torch.LongTensor) -> torch.Tensor:
if self.horizon > 1 and self.action_encoder is not None:
return self.action_encoder(action_seq, embodiment_id)
if self.single_action_proj is None:
raise RuntimeError("single_action_proj is not initialized for horizon <= 1.")
return self.single_action_proj(action_seq)
def _expand_action_mask(
self,
action_mask: torch.Tensor | None,
batch_size: int,
per_action_dim: int,
device: torch.device,
dtype: torch.dtype,
) -> torch.Tensor:
if action_mask is None:
raise ValueError("action_mask must be provided for flow matching inference.")
if action_mask.dim() == 2:
expected_last_dim = self.horizon * per_action_dim
if action_mask.shape == (batch_size, expected_last_dim):
expanded_mask = action_mask.reshape(batch_size, self.horizon, per_action_dim)
elif action_mask.shape == (batch_size, per_action_dim):
expanded_mask = action_mask.unsqueeze(1).expand(batch_size, self.horizon, per_action_dim)
else:
raise ValueError(
f"Expected action_mask shape {(batch_size, expected_last_dim)} or "
f"{(batch_size, per_action_dim)}, got {tuple(action_mask.shape)}"
)
elif action_mask.dim() == 3:
expected_shape = (batch_size, self.horizon, per_action_dim)
if tuple(action_mask.shape) != expected_shape:
raise ValueError(
f"Expected action_mask shape {expected_shape}, got {tuple(action_mask.shape)}"
)
expanded_mask = action_mask
else:
raise ValueError(f"Unsupported action_mask rank: {action_mask.dim()}")
return expanded_mask.to(device=device, dtype=dtype)
def forward(
self,
fused_tokens: torch.Tensor,
state: torch.Tensor | None = None,
actions_gt: torch.Tensor | None = None,
embodiment_id: torch.LongTensor | None = None,
state_mask: torch.Tensor | None = None,
action_mask: torch.Tensor | None = None,
):
"""Training pass: return (pred_velocity, noise). Delegates to get_action when actions_gt is None."""
if actions_gt is None:
return self.get_action(
fused_tokens, state=state, embodiment_id=embodiment_id, action_mask=action_mask
)
batch_size = fused_tokens.size(0)
device = fused_tokens.device
if embodiment_id is None:
embodiment_id = torch.zeros(batch_size, dtype=torch.long, device=device)
context_tokens = fused_tokens
if state is not None and self.state_encoder is not None:
state_emb = self.state_encoder(state, embodiment_id).unsqueeze(1)
context_tokens = torch.cat([context_tokens, state_emb], dim=1)
t = (
torch.distributions.Beta(2, 2)
.sample((batch_size,))
.clamp(0.02, 0.98)
.to(device)
.to(dtype=self.dtype)
)
time_index = (t * 999).long().clamp_(0, 999)
time_emb = self.time_pos_enc(1000)[:, time_index, :].squeeze(0).to(dtype=context_tokens.dtype)
actions_gt_seq = actions_gt
noise = torch.rand_like(actions_gt) * 2 - 1
if action_mask is not None:
action_mask = action_mask.to(dtype=noise.dtype, device=noise.device)
if action_mask.shape != noise.shape:
raise ValueError(f"action_mask shape {action_mask.shape} != noise shape {noise.shape}")
actions_gt_seq = actions_gt_seq * action_mask
noise = noise * action_mask
if self.horizon > 1:
noise_seq = noise.view(batch_size, self.horizon, self.per_action_dim)
else:
noise_seq = noise if noise.dim() == 3 else noise.unsqueeze(1)
t_broadcast = t.view(batch_size, 1, 1)
action_intermediate_seq = (1 - t_broadcast) * noise_seq + t_broadcast * actions_gt_seq
action_tokens = self._project_actions(action_intermediate_seq, embodiment_id)
target_dtype = self.dtype
action_tokens = action_tokens.to(dtype=target_dtype)
context_tokens = context_tokens.to(dtype=target_dtype)
time_emb = time_emb.to(dtype=target_dtype)
x = action_tokens
for block in self.transformer_blocks:
x = block(x, context_tokens, time_emb)
x = self.norm_out(x)
if self.horizon > 1:
x_flat = x.reshape(batch_size, -1)
x_pooled = self.seq_pool_proj(x_flat)
else:
x_pooled = x.squeeze(1)
pred_velocity = self.mlp_head(x_pooled, embodiment_id)
return pred_velocity, noise
def get_action(
self,
fused_tokens: torch.Tensor,
state: torch.Tensor | None = None,
embodiment_id: torch.LongTensor | None = None,
action_mask: torch.Tensor | None = None,
):
"""Sample actions by Euler-integrating the predicted velocity from uniform noise in [-1, 1]."""
batch_size = fused_tokens.size(0)
device = fused_tokens.device
if embodiment_id is None:
embodiment_id = torch.zeros(batch_size, dtype=torch.long, device=device)
context_tokens = fused_tokens
if state is not None and self.state_encoder is not None:
state_emb = self.state_encoder(state, embodiment_id).unsqueeze(1)
context_tokens = torch.cat([context_tokens, state_emb], dim=1)
action_dim_total = _cfgget(self.config, "action_dim", self.action_dim)
per_action_dim = _cfgget(self.config, "per_action_dim", action_dim_total // max(self.horizon, 1))
action = torch.rand(batch_size, action_dim_total, device=device, dtype=context_tokens.dtype) * 2 - 1
action_seq = (
action.view(batch_size, self.horizon, per_action_dim)
if self.horizon > 1
else action.view(batch_size, 1, per_action_dim)
)
action_mask = self._expand_action_mask(
action_mask,
batch_size=batch_size,
per_action_dim=per_action_dim,
device=action_seq.device,
dtype=action_seq.dtype,
)
action_seq = action_seq * action_mask
target_dtype = self.dtype
context_tokens = context_tokens.to(dtype=target_dtype)
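# Fall back to 20, the constructor default, so an omitted key matches the value logged in __init__.
num_steps = int(_cfgget(self.config, "num_inference_timesteps", 20))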
if num_steps <= 0:
raise ValueError(f"num_inference_timesteps must be positive, got {num_steps}")
dt = 1.0 / num_steps
for i in range(num_steps):
t = i / num_steps
time_index = min(int(t * 999), 999)
time_emb = (
self.time_pos_enc(1000)[:, time_index, :].to(device).squeeze(0).to(dtype=context_tokens.dtype)
)
time_emb = time_emb.unsqueeze(0).repeat(batch_size, 1)
action_seq = action_seq * action_mask
action_tokens = self._project_actions(action_seq, embodiment_id).to(dtype=target_dtype)
time_emb = time_emb.to(dtype=target_dtype)
x = action_tokens
for block in self.transformer_blocks:
x = block(x, context_tokens, time_emb)
x = self.norm_out(x)
if self.horizon > 1:
x_flat = x.reshape(batch_size, -1)
x_pooled = self.seq_pool_proj(x_flat)
else:
x_pooled = x.squeeze(1)
pred = self.mlp_head(x_pooled, embodiment_id)
action = action + dt * pred
action_seq = (
action.view(batch_size, self.horizon, per_action_dim)
if self.horizon > 1
else action.view(batch_size, 1, per_action_dim)
)
action_seq = action_seq * action_mask
return action_seq.reshape(batch_size, -1)
@property
def device(self):
return next(self.parameters()).device
@property
def dtype(self):
return next(self.parameters()).dtype
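The training interpolation in `forward` and the sampling loop in `get_action` can be illustrated without any neural network. The sketch below uses plain Python lists and an oracle velocity in place of the transformer; the regression target `actions_gt - noise` is an assumption (the usual choice for this linear interpolant), since the loss itself is computed outside the code shown here. The names `interpolate`, `time_index`, and `euler_sample` are hypothetical.

```python
from typing import Callable, List, Sequence

Vector = List[float]


def interpolate(noise: Sequence[float], target: Sequence[float], t: float) -> Vector:
    # Same linear path as in forward(): x_t = (1 - t) * noise + t * target.
    return [(1.0 - t) * n + t * a for n, a in zip(noise, target)]


def time_index(t: float, table_size: int = 1000) -> int:
    # Map t in [0, 1] to a row of the sinusoidal time table, as in get_action().
    return min(int(t * (table_size - 1)), table_size - 1)


def euler_sample(
    velocity_fn: Callable[[Vector, float], Vector],
    start: Sequence[float],
    num_steps: int,
) -> Vector:
    if num_steps <= 0:
        raise ValueError(f"num_steps must be positive, got {num_steps}")
    dt = 1.0 / num_steps
    x = list(start)
    for i in range(num_steps):
        t = i / num_steps  # integrate from t = 0 (noise) towards t = 1 (action)
        v = velocity_fn(x, t)
        x = [xi + dt * vi for xi, vi in zip(x, v)]
    return x


noise = [-0.5, 0.25]
target = [1.0, -1.0]


def oracle(x: Vector, t: float) -> Vector:
    # Assumed ideal velocity along the linear path: target - noise (constant in t).
    return [a - n for n, a in zip(noise, target)]


sample = euler_sample(oracle, noise, num_steps=20)
```

With an exact, constant velocity the Euler steps land on `target` up to floating-point error regardless of `num_steps`; a learned velocity is only approximate, which is why the number of integration steps matters in practice.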
@@ -0,0 +1,435 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import functools
import logging
import types
from collections.abc import Sequence
from contextlib import contextmanager
from typing import TYPE_CHECKING
import torch
import torch.nn as nn
import torch.utils.checkpoint
import torchvision.transforms.functional as TF
from PIL import Image
from torchvision.transforms.functional import to_pil_image
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING or _transformers_available:
from transformers import AutoModel, AutoTokenizer
else:
AutoModel = None
AutoTokenizer = None
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)
IMG_CONTEXT_TOKEN = "<IMG_CONTEXT>" # nosec B105
IMG_START_TOKEN = "<img>" # nosec B105
IMG_END_TOKEN = "</img>" # nosec B105
logger = logging.getLogger(__name__)
def _patch_vision_encoder_checkpointing(encoder: nn.Module, use_reentrant: bool) -> None:
"""Wrap encoder.forward so checkpoint calls made inside it default to the configured use_reentrant value."""
if getattr(encoder, "_evo1_checkpoint_patch_applied", False):
encoder.gradient_checkpointing_use_reentrant = use_reentrant
return
original_forward = encoder.forward
def forward_with_checkpoint_kwargs(self, *args, **kwargs):
original_checkpoint = torch.utils.checkpoint.checkpoint
def checkpoint(function, *checkpoint_args, **checkpoint_kwargs):
checkpoint_kwargs.setdefault("use_reentrant", self.gradient_checkpointing_use_reentrant)
return original_checkpoint(function, *checkpoint_args, **checkpoint_kwargs)
torch.utils.checkpoint.checkpoint = checkpoint
try:
return original_forward(*args, **kwargs)
finally:
torch.utils.checkpoint.checkpoint = original_checkpoint
encoder.gradient_checkpointing_use_reentrant = use_reentrant
encoder.forward = types.MethodType(forward_with_checkpoint_kwargs, encoder)
encoder._evo1_checkpoint_patch_applied = True
def flash_attn_is_available() -> bool:
try:
import flash_attn # noqa: F401
except ModuleNotFoundError:
return False
return True
@contextmanager
def _internvl_transformers5_load_compatibility():
"""Temporarily patch torch.linspace and PreTrainedModel.mark_tied_weights_as_initialized while loading."""
from transformers.modeling_utils import PreTrainedModel
original_linspace = torch.linspace
original_mark_tied = PreTrainedModel.mark_tied_weights_as_initialized
def linspace(*args, **kwargs):
if kwargs.get("device") is None:
kwargs["device"] = torch.device("cpu")
return original_linspace(*args, **kwargs)
def mark_tied_weights_as_initialized(self, loading_info):
if not hasattr(self, "all_tied_weights_keys"):
self.all_tied_weights_keys = {}
return original_mark_tied(self, loading_info)
torch.linspace = linspace
PreTrainedModel.mark_tied_weights_as_initialized = mark_tied_weights_as_initialized
try:
yield
finally:
torch.linspace = original_linspace
PreTrainedModel.mark_tied_weights_as_initialized = original_mark_tied
@functools.lru_cache(maxsize=10000)
def get_target_aspect_ratio(orig_width: int, orig_height: int, image_size: int, min_num: int, max_num: int):
"""Return the (columns, rows) tile grid, with min_num <= columns * rows <= max_num, closest to the image's aspect ratio."""
aspect_ratio = orig_width / orig_height
target_ratios = {
(i, j)
for n in range(min_num, max_num + 1)
for i in range(1, n + 1)
for j in range(1, n + 1)
if min_num <= i * j <= max_num
}
target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])
best_ratio_diff = float("inf")
best_ratio = (1, 1)
area = orig_width * orig_height
for ratio in target_ratios:
target_ar = ratio[0] / ratio[1]
diff = abs(aspect_ratio - target_ar)
if diff < best_ratio_diff:
best_ratio_diff = diff
best_ratio = ratio
elif diff == best_ratio_diff and area > 0.5 * image_size**2 * ratio[0] * ratio[1]:
best_ratio = ratio
return best_ratio
def dynamic_preprocess(image, min_num=1, max_num=1, image_size=448, use_thumbnail=False):
orig_width, orig_height = image.size
ratio_w, ratio_h = get_target_aspect_ratio(orig_width, orig_height, image_size, min_num, max_num)
target_width = image_size * ratio_w
target_height = image_size * ratio_h
blocks = ratio_w * ratio_h
resized_img = image.resize((target_width, target_height))
processed_images = []
for i in range(blocks):
box = (
(i % (target_width // image_size)) * image_size,
(i // (target_width // image_size)) * image_size,
((i % (target_width // image_size)) + 1) * image_size,
((i // (target_width // image_size)) + 1) * image_size,
)
processed_images.append(resized_img.crop(box))
if use_thumbnail and len(processed_images) != 1:
processed_images.append(image.resize((image_size, image_size)))
return processed_images
class InternVL3Embedder(nn.Module):
def __init__(
self,
model_name="OpenGVLab/InternVL3-1B",
image_size=448,
device="cuda",
num_language_layers: int | None = 14,
model_dtype: str | torch.dtype = "bfloat16",
use_flash_attn: bool = True,
enable_gradient_checkpointing: bool = True,
gradient_checkpointing_use_reentrant: bool = False,
):
super().__init__()
self._requested_device = device
self.image_size = image_size
self.num_language_layers = num_language_layers
self.max_text_length = 1024
self.enable_gradient_checkpointing = bool(enable_gradient_checkpointing)
self.gradient_checkpointing_use_reentrant = bool(gradient_checkpointing_use_reentrant)
require_package("transformers", extra="evo1")
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=False)
if isinstance(model_dtype, str):
try:
model_dtype = getattr(torch, model_dtype)
except AttributeError as exc:
raise ValueError(f"Unsupported EVO1 vlm_dtype '{model_dtype}'") from exc
resolved_use_flash_attn = bool(use_flash_attn and flash_attn_is_available())
if use_flash_attn and not resolved_use_flash_attn:
logger.warning("flash_attn is not installed. Falling back to standard attention.")
# InternVL3 remote code predates Transformers 5 post-init conventions:
# it computes stochastic-depth scalars via torch.linspace(...).item()
# while Transformers initializes under torch.device("meta"), and it
# does not populate all_tied_weights_keys before loading finalization.
with _internvl_transformers5_load_compatibility():
self.model = AutoModel.from_pretrained(
model_name,
torch_dtype=model_dtype,
trust_remote_code=True,
use_flash_attn=resolved_use_flash_attn,
low_cpu_mem_usage=True,
_fast_init=False,
).to(self._requested_device)
if hasattr(self.model.language_model, "model"):
layers = self.model.language_model.model.layers
else:
layers = self.model.language_model.layers
if self.num_language_layers is not None:
layers = layers[: self.num_language_layers]
if hasattr(self.model.language_model, "model"):
self.model.language_model.model.layers = torch.nn.ModuleList(layers)
else:
self.model.language_model.layers = torch.nn.ModuleList(layers)
self.model.language_model.lm_head = torch.nn.Identity()
self._configure_memory_features()
self.img_context_token_id = self.tokenizer.convert_tokens_to_ids(IMG_CONTEXT_TOKEN)
def _configure_memory_features(self) -> None:
checkpoint_kwargs = {"use_reentrant": self.gradient_checkpointing_use_reentrant}
if not self.enable_gradient_checkpointing:
if hasattr(self.model, "vision_model") and hasattr(self.model.vision_model, "encoder"):
self.model.vision_model.encoder.gradient_checkpointing = False
language_model = getattr(self.model, "language_model", None)
if language_model is not None:
if hasattr(language_model, "gradient_checkpointing_disable"):
language_model.gradient_checkpointing_disable()
elif hasattr(language_model, "gradient_checkpointing"):
language_model.gradient_checkpointing = False
if hasattr(language_model, "model"):
inner = language_model.model
if hasattr(inner, "gradient_checkpointing_disable"):
inner.gradient_checkpointing_disable()
elif hasattr(inner, "gradient_checkpointing"):
inner.gradient_checkpointing = False
return
def _enable_ckpt(module: nn.Module | None) -> bool:
if module is None:
return False
if hasattr(module, "gradient_checkpointing_enable"):
try:
module.gradient_checkpointing_enable(gradient_checkpointing_kwargs=checkpoint_kwargs)
except TypeError:
module.gradient_checkpointing_enable()
return True
if hasattr(module, "gradient_checkpointing"):
module.gradient_checkpointing = True
return True
return False
enabled_any = _enable_ckpt(self.model)
if hasattr(self.model, "vision_model") and hasattr(self.model.vision_model, "encoder"):
encoder = self.model.vision_model.encoder
encoder.gradient_checkpointing = True
_patch_vision_encoder_checkpointing(
encoder, use_reentrant=self.gradient_checkpointing_use_reentrant
)
enabled_any = True
language_model = getattr(self.model, "language_model", None)
if language_model is not None:
enabled_any = _enable_ckpt(language_model) or enabled_any
if hasattr(language_model, "model"):
enabled_any = _enable_ckpt(language_model.model) or enabled_any
if hasattr(language_model, "config"):
language_model.config.use_cache = False
if hasattr(self.model, "config"):
self.model.config.use_cache = False
if hasattr(self.model, "enable_input_require_grads"):
self.model.enable_input_require_grads()
if enabled_any:
logger.info("Gradient checkpointing enabled for InternVL3 embedder.")
else:
logger.warning(
"Requested gradient checkpointing, but model does not expose checkpointing controls."
)
def _preprocess_single_image(self, image: Image.Image | torch.Tensor) -> torch.Tensor:
if isinstance(image, torch.Tensor):
pil_image = to_pil_image(image.detach().cpu())
else:
pil_image = image.convert("RGB")
tiles = dynamic_preprocess(pil_image, image_size=self.image_size)
tile_tensors = torch.stack([TF.to_tensor(tile) for tile in tiles]).to(
device=self.device, dtype=torch.bfloat16
)
mean = torch.tensor(IMAGENET_MEAN, device=self.device, dtype=torch.bfloat16).view(1, 3, 1, 1)
std = torch.tensor(IMAGENET_STD, device=self.device, dtype=torch.bfloat16).view(1, 3, 1, 1)
return (tile_tensors - mean) / std
def _preprocess_images(
self,
image_tensors_batch: Sequence[Sequence[Image.Image | torch.Tensor]],
) -> tuple[torch.Tensor, list[list[int]]]:
pixel_values_list = []
batch_num_tiles_list: list[list[int]] = []
for image_tensors in image_tensors_batch:
num_tiles_list: list[int] = []
for image in image_tensors:
tiles = self._preprocess_single_image(image)
pixel_values_list.append(tiles)
num_tiles_list.append(int(tiles.shape[0]))
batch_num_tiles_list.append(num_tiles_list)
if pixel_values_list:
pixel_values = torch.cat(pixel_values_list, dim=0)
else:
pixel_values = torch.empty(
0, 3, self.image_size, self.image_size, dtype=torch.bfloat16, device=self.device
)
return pixel_values, batch_num_tiles_list
    def _build_multimodal_prompts(
        self,
        batch_num_tiles_list: list[list[int]],
        text_prompts: Sequence[str],
    ) -> list[str]:
        prompts = []
        for num_tiles_list, text_prompt in zip(batch_num_tiles_list, text_prompts, strict=True):
            prompt_segments = []
            for i, tile_count in enumerate(num_tiles_list):
                token_count = self.model.num_image_token * tile_count
                image_tokens = IMG_START_TOKEN + IMG_CONTEXT_TOKEN * token_count + IMG_END_TOKEN
                prompt_segments.append(f"Image-{i + 1}: {image_tokens}\n")
            prompts.append("".join(prompt_segments) + text_prompt.strip())
        return prompts
def _prepare_and_fuse_embeddings(
self,
prompts: Sequence[str],
vit_embeds: torch.Tensor,
image_masks: torch.Tensor,
batch_num_tiles_list: list[list[int]],
) -> tuple[torch.Tensor, torch.Tensor]:
untruncated_ids = self.tokenizer(list(prompts), padding=False, truncation=False)["input_ids"]
true_sequence_length = max((len(ids) for ids in untruncated_ids), default=0)
if true_sequence_length > self.max_text_length:
logger.warning(
"InternVL3 prompt truncated in batch: max_length=%s actual_max_length=%s",
self.max_text_length,
true_sequence_length,
)
model_inputs = self.tokenizer(
list(prompts),
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=self.max_text_length,
).to(self.device)
input_ids = model_inputs["input_ids"]
attention_mask = model_inputs["attention_mask"]
img_token_mask = input_ids == self.img_context_token_id
input_embeds = self.model.language_model.get_input_embeddings()(input_ids).clone()
batch_size, _, channels = input_embeds.shape
vit_embeds = vit_embeds.reshape(-1, channels).to(dtype=input_embeds.dtype, device=input_embeds.device)
tokens_per_tile = self.model.num_image_token
actual_vis_tokens_list = img_token_mask.sum(dim=1).tolist()
vit_idx = 0
for batch_index in range(batch_size):
expected_vis_tokens = sum(batch_num_tiles_list[batch_index]) * tokens_per_tile
mask_b = img_token_mask[batch_index]
actual_vis_tokens = actual_vis_tokens_list[batch_index]
item_vit_embeds = vit_embeds[vit_idx : vit_idx + expected_vis_tokens]
vit_idx += expected_vis_tokens
if actual_vis_tokens > 0:
if item_vit_embeds.shape[0] < actual_vis_tokens:
raise ValueError(
f"InternVL3 produced fewer image tokens than expected for sample {batch_index}: "
f"got {item_vit_embeds.shape[0]}, need {actual_vis_tokens}"
)
input_embeds[batch_index, mask_b] = item_vit_embeds[:actual_vis_tokens]
current_token_idx = 0
img_token_locations = torch.where(mask_b)[0]
for image_index, num_tiles in enumerate(batch_num_tiles_list[batch_index]):
num_tokens_for_image = num_tiles * tokens_per_tile
if not bool(image_masks[batch_index, image_index].item()):
start_offset = current_token_idx
end_offset = min(current_token_idx + num_tokens_for_image, len(img_token_locations))
if start_offset < end_offset:
idxs = img_token_locations[start_offset:end_offset]
attention_mask[batch_index, idxs] = 0
current_token_idx += num_tokens_for_image
return input_embeds, attention_mask
def get_fused_image_text_embedding_from_tensor_images(
self,
image_tensors_batch: Sequence[Sequence[Image.Image | torch.Tensor]],
image_masks: torch.Tensor,
text_prompts: Sequence[str],
return_cls_only: bool = True,
):
pixel_values, batch_num_tiles_list = self._preprocess_images(image_tensors_batch)
if pixel_values.shape[0] == 0:
logger.warning("InternVL3 received an empty image batch after preprocessing.")
hidden_size = getattr(self.model.config, "hidden_size", None)
if hidden_size is None and hasattr(self.model.language_model, "config"):
hidden_size = getattr(self.model.language_model.config, "hidden_size", None)
if hidden_size is None:
raise RuntimeError("Unable to infer hidden size for empty InternVL3 batch.")
empty = torch.empty(0, hidden_size, device=self.device, dtype=torch.float32)
return empty
prompts = self._build_multimodal_prompts(batch_num_tiles_list, text_prompts)
vit_embeds = self.model.extract_feature(pixel_values)
inputs_embeds, attention_mask = self._prepare_and_fuse_embeddings(
prompts,
vit_embeds,
image_masks.to(device=self.device),
batch_num_tiles_list,
)
outputs = self.model.language_model(
inputs_embeds=inputs_embeds,
attention_mask=attention_mask,
output_hidden_states=True,
use_cache=False,
return_dict=True,
)
fused_hidden = outputs.hidden_states[-1].to(torch.float32)
return fused_hidden[:, 0, :] if return_cls_only else fused_hidden
@property
def device(self) -> torch.device:
return next(self.model.parameters()).device
+450
@@ -0,0 +1,450 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import builtins
from collections import deque
from contextlib import nullcontext
from pathlib import Path
import torch
from torch import Tensor
from lerobot.configs.policies import PreTrainedConfig
from lerobot.policies.evo1.configuration_evo1 import Evo1Config
from lerobot.policies.evo1.evo1_model import EVO1
from lerobot.policies.pretrained import PreTrainedPolicy, T
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE
class EVO1Policy(PreTrainedPolicy):
config_class = Evo1Config
name = "evo1"
def __init__(self, config: Evo1Config, **kwargs):
super().__init__(config)
config.validate_features()
if len(config.image_features) > config.max_views:
raise ValueError(
f"EVO1 supports at most {config.max_views} camera streams, got {len(config.image_features)}"
)
self.config = config
self.model = EVO1(self._build_model_config(config))
self.model.set_finetune_flags()
self.reset()
@classmethod
def from_pretrained(
cls: builtins.type[T],
pretrained_name_or_path: str | Path,
*,
config: PreTrainedConfig | None = None,
force_download: bool = False,
resume_download: bool | None = None,
proxies: dict | None = None,
token: str | bool | None = None,
cache_dir: str | Path | None = None,
local_files_only: bool = False,
revision: str | None = None,
strict: bool | None = None,
**kwargs,
) -> T:
if strict is None:
strict = not (config is not None and getattr(config, "training_stage", None) == "stage2")
return super().from_pretrained(
pretrained_name_or_path=pretrained_name_or_path,
config=config,
force_download=force_download,
resume_download=resume_download,
proxies=proxies,
token=token,
cache_dir=cache_dir,
local_files_only=local_files_only,
revision=revision,
strict=strict,
**kwargs,
)
@staticmethod
def _build_model_config(config: Evo1Config) -> dict:
return {
"device": config.device,
"return_cls_only": config.return_cls_only,
"vlm_name": config.vlm_model_name,
"vlm_num_layers": config.vlm_num_layers,
"vlm_dtype": config.vlm_dtype,
"use_flash_attn": config.use_flash_attn,
"action_head": config.action_head,
"action_horizon": config.chunk_size,
"per_action_dim": config.max_action_dim,
"state_dim": config.max_state_dim,
"embed_dim": config.embed_dim,
"hidden_dim": config.hidden_dim,
"state_hidden_dim": config.state_hidden_dim,
"num_heads": config.num_heads,
"num_layers": config.num_layers,
"dropout": config.dropout,
"num_inference_timesteps": config.num_inference_timesteps,
"num_categories": config.num_categories,
"enable_gradient_checkpointing": config.enable_gradient_checkpointing,
"gradient_checkpointing_use_reentrant": config.gradient_checkpointing_use_reentrant,
"finetune_vlm": config.finetune_vlm,
"finetune_language_model": config.finetune_language_model,
"finetune_vision_model": config.finetune_vision_model,
"finetune_action_head": config.finetune_action_head,
}
@property
def _camera_keys(self) -> list[str]:
return list(self.config.image_features)
@property
def _env_action_dim(self) -> int:
action_feature = self.config.action_feature
if action_feature is None:
return self.config.max_action_dim
return int(action_feature.shape[0])
@property
def _compute_dtype(self) -> torch.dtype:
return next(self.model.action_head.parameters()).dtype
@property
def _training_compute_dtype(self) -> torch.dtype:
if str(self.config.device).startswith("cuda"):
return torch.bfloat16
return self._compute_dtype
@property
def _inference_compute_dtype(self) -> torch.dtype:
if str(self.config.device).startswith("cuda") and self.config.use_amp:
return torch.bfloat16
return self._compute_dtype
    def get_optim_params(self) -> list[dict]:
        decay, no_decay = [], []
        for name, param in self.named_parameters():
            if not param.requires_grad:
                continue
            is_bias = name.endswith("bias") or ".bias" in name
            is_norm = param.dim() == 1 or "norm" in name.lower()
            if is_bias or is_norm:
                no_decay.append(param)
            else:
                decay.append(param)
        return [
            {"params": decay, "weight_decay": self.config.optimizer_weight_decay},
            {"params": no_decay, "weight_decay": 0.0},
        ]
def reset(self):
self._action_queue = deque([], maxlen=self.config.n_action_steps)
    def _normalize_task_batch(self, batch: dict[str, Tensor | list[str] | str]) -> list[str]:
        prompts = batch.get(self.config.task_field)
        if prompts is None and self.config.task_field != "task":
            prompts = batch.get("task")
        if prompts is None:
            raise ValueError(f"EVO1 expects a '{self.config.task_field}' text field in the batch.")
        if isinstance(prompts, str):
            return [prompts]
        if isinstance(prompts, (list, tuple)):
            return [str(prompt) for prompt in prompts]
        raise TypeError(f"Unsupported prompt batch type: {type(prompts)}")
def _prepare_state(self, batch: dict[str, Tensor]) -> tuple[Tensor, Tensor]:
if OBS_STATE not in batch:
raise ValueError(f"EVO1 requires '{OBS_STATE}' in the batch.")
state = batch[OBS_STATE]
if state.dim() == 1:
state = state.unsqueeze(0)
elif state.dim() == 3:
state = state[:, -1]
elif state.dim() != 2:
raise ValueError(f"Unsupported state tensor shape for EVO1: {tuple(state.shape)}")
batch_size, state_dim = state.shape
if state_dim > self.config.max_state_dim:
raise ValueError(
f"State dim {state_dim} exceeds configured max_state_dim {self.config.max_state_dim}"
)
explicit_mask = batch.get("state_mask")
if explicit_mask is not None:
if explicit_mask.dim() == 1:
explicit_mask = explicit_mask.unsqueeze(0)
elif explicit_mask.dim() == 3:
explicit_mask = explicit_mask[:, -1]
elif explicit_mask.dim() != 2:
raise ValueError(
f"Unsupported state_mask tensor shape for EVO1: {tuple(explicit_mask.shape)}"
)
if explicit_mask.shape != (batch_size, state_dim):
raise ValueError(
f"state_mask shape {tuple(explicit_mask.shape)} does not match state shape {(batch_size, state_dim)}"
)
padded = torch.zeros(
batch_size,
self.config.max_state_dim,
dtype=state.dtype,
device=self.config.device,
)
padded[:, :state_dim] = state.to(device=self.config.device)
mask = torch.zeros(
batch_size,
self.config.max_state_dim,
dtype=torch.bool,
device=self.config.device,
)
if explicit_mask is None:
mask[:, :state_dim] = True
else:
mask[:, :state_dim] = explicit_mask.to(device=self.config.device, dtype=torch.bool)
return padded.to(dtype=self._compute_dtype), mask
def _prepare_actions(self, batch: dict[str, Tensor]) -> tuple[Tensor, Tensor]:
if ACTION not in batch:
raise ValueError(f"EVO1 requires '{ACTION}' in the batch for training.")
action = batch[ACTION]
if action.dim() == 2:
action = action.unsqueeze(1)
batch_size, horizon, action_dim = action.shape
if horizon != self.config.chunk_size:
raise ValueError(
f"EVO1 expects chunk_size={self.config.chunk_size}, got action horizon {horizon}"
)
if action_dim > self.config.max_action_dim:
raise ValueError(
f"Action dim {action_dim} exceeds configured max_action_dim {self.config.max_action_dim}"
)
explicit_mask = batch.get("action_mask")
if explicit_mask is not None:
if explicit_mask.dim() == 2:
if horizon == 1:
explicit_mask = explicit_mask.unsqueeze(1)
else:
raise ValueError(
f"2D action_mask is only supported when chunk_size=1, got action horizon {horizon}"
)
elif explicit_mask.dim() != 3:
raise ValueError(
f"Unsupported action_mask tensor shape for EVO1: {tuple(explicit_mask.shape)}"
)
if explicit_mask.shape != (batch_size, horizon, action_dim):
raise ValueError(
"action_mask shape "
f"{tuple(explicit_mask.shape)} does not match action shape {(batch_size, horizon, action_dim)}"
)
padded = torch.zeros(
batch_size,
horizon,
self.config.max_action_dim,
dtype=action.dtype,
device=self.config.device,
)
padded[:, :, :action_dim] = action.to(device=self.config.device)
mask = torch.zeros(
batch_size,
horizon,
self.config.max_action_dim,
dtype=torch.bool,
device=self.config.device,
)
if explicit_mask is None:
mask[:, :, :action_dim] = True
else:
mask[:, :, :action_dim] = explicit_mask.to(device=self.config.device, dtype=torch.bool)
return padded.to(dtype=self._compute_dtype), mask
def _prepare_inference_action_mask(self, batch_size: int) -> Tensor:
mask = torch.zeros(
batch_size,
self.config.max_action_dim,
dtype=torch.bool,
device=self.config.device,
)
mask[:, : self._env_action_dim] = True
return mask
def _get_embodiment_ids(self, batch: dict[str, Tensor], batch_size: int) -> Tensor:
embodiment_ids = batch.get("embodiment_id")
if embodiment_ids is None and self.config.embodiment_id_field:
embodiment_ids = batch.get(self.config.embodiment_id_field)
if embodiment_ids is None:
return torch.full(
(batch_size,),
self.config.default_embodiment_id,
dtype=torch.long,
device=self.config.device,
)
if embodiment_ids.dim() == 0:
embodiment_ids = embodiment_ids.unsqueeze(0)
elif embodiment_ids.dim() > 1:
embodiment_ids = embodiment_ids[:, -1]
return embodiment_ids.to(device=self.config.device, dtype=torch.long)
@property
def _tracks_vlm_gradients(self) -> bool:
return bool(
self.config.finetune_vlm
or self.config.finetune_language_model
or self.config.finetune_vision_model
)
    def _collect_image_batches(self, batch: dict[str, Tensor]) -> tuple[list[list[Tensor]], Tensor]:
        camera_keys = self._camera_keys or sorted(key for key in batch if key.startswith(f"{OBS_IMAGES}."))
        if not camera_keys:
            raise ValueError("EVO1 requires at least one visual observation feature.")
        # Normalize each camera tensor to (B, C, H, W) up-front so that batch_size is read
        # from a real batch dim and not from C in the unbatched (C, H, W) case.
        normalized: dict[str, Tensor] = {}
        for camera_key in camera_keys[: self.config.max_views]:
            image = batch[camera_key]
            if image.dim() == 3:
                image = image.unsqueeze(0)
            elif image.dim() == 5:
                image = image[:, -1]
            elif image.dim() != 4:
                raise ValueError(
                    f"Unsupported image tensor shape for EVO1: key={camera_key} shape={tuple(image.shape)}"
                )
            normalized[camera_key] = image
        batch_size = normalized[camera_keys[0]].shape[0]
        image_batches: list[list[Tensor]] = []
        image_masks = torch.zeros(batch_size, self.config.max_views, dtype=torch.bool)
        for batch_index in range(batch_size):
            sample_images: list[Tensor] = []
            for camera_key in camera_keys[: self.config.max_views]:
                sample_images.append(normalized[camera_key][batch_index].detach().cpu())
            if not sample_images:
                raise ValueError("EVO1 received a batch without any image tensor.")
            while len(sample_images) < self.config.max_views:
                sample_images.append(torch.zeros_like(sample_images[0]))
            image_batches.append(sample_images[: self.config.max_views])
            image_masks[batch_index, : min(len(camera_keys), self.config.max_views)] = True
        return image_batches, image_masks
def _compute_fused_tokens(
self,
prompts: list[str],
image_batches: list[list[Tensor]],
image_masks: Tensor,
) -> Tensor:
track_vlm_gradients = self._tracks_vlm_gradients
grad_context = nullcontext() if track_vlm_gradients else torch.no_grad()
embedder = getattr(self.model, "embedder", None)
embedder_was_training = embedder.training if embedder is not None else None
if not track_vlm_gradients and embedder is not None:
embedder.eval()
try:
with grad_context:
fused_tokens = self.model.get_vl_embeddings(
images=image_batches,
image_mask=image_masks,
prompt=prompts,
return_cls_only=self.config.return_cls_only,
)
finally:
if not track_vlm_gradients and embedder is not None and embedder_was_training is not None:
embedder.train(embedder_was_training)
if not track_vlm_gradients:
fused_tokens = fused_tokens.detach()
return fused_tokens.to(device=self.config.device, dtype=self._compute_dtype)
    def _compute_masked_loss(
        self,
        pred_velocity: Tensor,
        target_velocity: Tensor,
        action_mask: Tensor,
        reduction: str,
    ) -> Tensor:
        flat_mask = action_mask.view(action_mask.shape[0], -1).to(dtype=pred_velocity.dtype)
        sq_error = ((pred_velocity - target_velocity) * flat_mask).pow(2)
        active = flat_mask.sum(dim=1).clamp_min(1.0)
        per_sample_loss = sq_error.sum(dim=1) / active
        if reduction == "none":
            return per_sample_loss
        if reduction != "mean":
            raise ValueError(f"Unsupported reduction '{reduction}'")
        return sq_error.sum() / active.sum()
def forward(self, batch: dict[str, Tensor], reduction: str = "mean") -> tuple[Tensor, dict]:
prompts = self._normalize_task_batch(batch)
image_batches, image_masks = self._collect_image_batches(batch)
states, _state_mask = self._prepare_state(batch)
actions_gt, action_mask = self._prepare_actions(batch)
fused_tokens = self._compute_fused_tokens(prompts, image_batches, image_masks)
states = states.to(dtype=self._training_compute_dtype)
actions_gt = actions_gt.to(dtype=self._training_compute_dtype)
fused_tokens = fused_tokens.to(dtype=self._training_compute_dtype)
embodiment_ids = self._get_embodiment_ids(batch, states.shape[0])
pred_velocity, noise = self.model(
fused_tokens,
state=states,
actions_gt=actions_gt,
action_mask=action_mask.to(device=self.config.device, dtype=self._compute_dtype),
embodiment_ids=embodiment_ids,
)
flat_action_mask = action_mask.view(action_mask.shape[0], -1).to(dtype=actions_gt.dtype)
target_velocity = (actions_gt - noise).view(actions_gt.shape[0], -1) * flat_action_mask
loss = self._compute_masked_loss(pred_velocity, target_velocity, action_mask, reduction)
loss_mean = loss.mean().item() if loss.ndim > 0 else loss.item()
return loss, {
"loss": loss_mean,
"active_action_dims": float(action_mask.sum(dim=(1, 2)).float().mean().item()),
}
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor:
self.eval()
prompts = self._normalize_task_batch(batch)
image_batches, image_masks = self._collect_image_batches(batch)
states, _state_mask = self._prepare_state(batch)
fused_tokens = self._compute_fused_tokens(prompts, image_batches, image_masks)
states = states.to(dtype=self._inference_compute_dtype)
fused_tokens = fused_tokens.to(dtype=self._inference_compute_dtype)
embodiment_ids = self._get_embodiment_ids(batch, states.shape[0])
action_mask = self._prepare_inference_action_mask(states.shape[0])
with (
torch.autocast(device_type="cuda", dtype=torch.bfloat16)
if self.config.use_amp and str(self.config.device).startswith("cuda")
else nullcontext()
):
actions = self.model(
fused_tokens,
state=states,
action_mask=action_mask,
embodiment_ids=embodiment_ids,
)
actions = actions.view(states.shape[0], self.config.chunk_size, self.config.max_action_dim)
return actions[:, :, : self._env_action_dim]
    @torch.no_grad()
    def select_action(self, batch: dict[str, Tensor], **kwargs) -> Tensor:
        self.eval()
        if len(self._action_queue) == 0:
            action_chunk = self.predict_action_chunk(batch)[:, : self.config.n_action_steps]
            self._action_queue.extend(action_chunk.transpose(0, 1))
        return self._action_queue.popleft()
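One detail of `_compute_masked_loss` in the file above is easy to miss: with `reduction="mean"` the squared error is pooled over all active action dimensions in the batch, which is not the same as averaging the per-sample losses when samples have different numbers of active dimensions. A minimal pure-Python sketch of the same arithmetic, assuming plain lists in place of tensors and a hypothetical helper name `masked_mse`:

```python
def masked_mse(pred, target, mask, reduction="mean"):
    """pred, target, mask: lists of equal-length rows; mask entries are 0.0 or 1.0."""
    sq_sums, actives = [], []
    for p_row, t_row, m_row in zip(pred, target, mask):
        # Masked-out dimensions contribute zero error.
        sq_sums.append(sum(((p - t) * m) ** 2 for p, t, m in zip(p_row, t_row, m_row)))
        # Same role as clamp_min(1.0): avoid dividing by zero for fully masked rows.
        actives.append(max(sum(m_row), 1.0))
    if reduction == "none":
        return [s / a for s, a in zip(sq_sums, actives)]
    if reduction != "mean":
        raise ValueError(f"Unsupported reduction '{reduction}'")
    # Pooled mean: total squared error over total active dimensions.
    return sum(sq_sums) / sum(actives)


pred = [[1.0, 2.0, 9.0], [0.0, 0.0, 0.0]]
target = [[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]]
mask = [[1.0, 1.0, 0.0], [1.0, 0.0, 0.0]]

per_sample = masked_mse(pred, target, mask, reduction="none")
pooled = masked_mse(pred, target, mask, reduction="mean")
```

Here the per-sample losses average to 3.25 while the pooled value is 3.0, so the `"loss"` entry logged from `forward` under `reduction="none"` (a mean of per-sample losses) need not match the scalar returned under `reduction="mean"`.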
+106
@@ -0,0 +1,106 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Any
import torch
from lerobot.policies.evo1.configuration_evo1 import Evo1Config
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import (
batch_to_transition,
create_transition,
policy_action_to_transition,
transition_to_policy_action,
)
from lerobot.utils.constants import (
ACTION,
DONE,
INFO,
OBS_PREFIX,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
REWARD,
TRUNCATED,
)
def evo1_batch_to_transition(batch: dict[str, Any]):
    transition = batch_to_transition(batch)
    complementary_data = dict(transition.get("complementary_data") or {})
    reserved = {ACTION, REWARD, DONE, TRUNCATED, INFO}
    for key, value in batch.items():
        if key in reserved or key.startswith(OBS_PREFIX):
            continue
        complementary_data.setdefault(key, value)
    return create_transition(
        observation=transition.get("observation"),
        action=transition.get("action"),
        reward=transition.get("reward", 0.0),
        done=transition.get("done", False),
        truncated=transition.get("truncated", False),
        info=transition.get("info", {}),
        complementary_data=complementary_data,
    )
def make_evo1_pre_post_processors(
config: Evo1Config,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
input_steps = [
RenameObservationsProcessorStep(rename_map={}),
AddBatchDimensionProcessorStep(),
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
DeviceProcessorStep(device=config.device),
]
output_steps = [
UnnormalizerProcessorStep(
features=config.output_features,
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
to_transition=evo1_batch_to_transition,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
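The key routing in `evo1_batch_to_transition` can be sketched in isolation: every batch key that is neither reserved nor an observation is copied into the complementary data, without overwriting entries that are already there. This is a sketch under stated assumptions: the string values of `OBS_PREFIX` and the reserved keys below are guesses standing in for the constants in `lerobot.utils.constants`, and `collect_extra_keys` is a hypothetical helper name.

```python
# Assumed values; the real constants live in lerobot.utils.constants.
OBS_PREFIX = "observation."
RESERVED = {"action", "next.reward", "next.done", "next.truncated", "info"}


def collect_extra_keys(batch, complementary_data=None):
    """Copy non-observation, non-reserved keys without overwriting existing entries."""
    extra = dict(complementary_data or {})
    for key, value in batch.items():
        if key in RESERVED or key.startswith(OBS_PREFIX):
            continue
        # setdefault keeps whatever the default converter already placed there.
        extra.setdefault(key, value)
    return extra


batch = {
    "observation.state": [0.1],
    "action": [0.0],
    "task": "open drawer",
    "embodiment_id": 3,
    "action_mask": [True],
}
extra = collect_extra_keys(batch, {"task": "already set"})
```

This appears to be how fields such as `embodiment_id`, `state_mask`, and `action_mask`, which `EVO1Policy` reads from the batch, can survive the preprocessing pipeline.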
+16 -2
@@ -47,6 +47,7 @@ from lerobot.utils.feature_utils import dataset_to_policy_features
from .act.configuration_act import ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig
from .eo1.configuration_eo1 import EO1Config
from .evo1.configuration_evo1 import Evo1Config
from .groot.configuration_groot import GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config
@@ -88,7 +89,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
Args:
name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
"multi_task_dit", "vqbet", "pi0", "pi05", "sac", "smolvla", "wall_x".
"multi_task_dit", "vqbet", "pi0", "pi05", "sac", "smolvla", "wall_x", "eo1", "evo1".
Returns:
The policy class corresponding to the given name.
@@ -151,6 +152,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from .eo1.modeling_eo1 import EO1Policy
return EO1Policy
elif name == "evo1":
from .evo1.modeling_evo1 import EVO1Policy
return EVO1Policy
else:
try:
return _get_policy_cls_from_policy_name(name=name)
@@ -168,7 +173,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
Args:
policy_type: The type of the policy. Supported types include "tdmpc",
"multi_task_dit", "diffusion", "act", "vqbet", "pi0", "pi05", "sac",
"smolvla", "wall_x".
"smolvla", "wall_x", "eo1", "evo1".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
Returns:
@@ -203,6 +208,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return WallXConfig(**kwargs)
elif policy_type == "eo1":
return EO1Config(**kwargs)
elif policy_type == "evo1":
return Evo1Config(**kwargs)
else:
try:
config_cls = PreTrainedConfig.get_choice_class(policy_type)
@@ -413,6 +420,13 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, Evo1Config):
from .evo1.processor_evo1 import make_evo1_pre_post_processors
processors = make_evo1_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
try:
@@ -97,8 +97,8 @@ class VQBeTConfig(PreTrainedConfig):
vision_backbone: str = "resnet18"
crop_shape: tuple[int, int] | None = (84, 84)
crop_is_random: bool = True
pretrained_backbone_weights: str | None = None
use_group_norm: bool = True
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1"
use_group_norm: bool = False
spatial_softmax_num_keypoints: int = 32
# VQ-VAE
n_vqvae_training_steps: int = 20000
@@ -939,7 +939,7 @@ class Qwen2_5_VLFlashAttention2(Qwen2_5_VLAttention):
input_dtype = query_states.dtype
if input_dtype == torch.float32:
if torch.is_autocast_enabled():
target_dtype = torch.get_autocast_gpu_dtype()
target_dtype = torch.get_autocast_dtype(query_states.device.type)
# Handle the case where the model is quantized
elif hasattr(self.config, "_pre_quantization_dtype"):
target_dtype = self.config._pre_quantization_dtype
@@ -985,7 +985,7 @@ class Florence2FlashAttention2(Florence2Attention):
input_dtype = query_states.dtype
if input_dtype == torch.float32:
if torch.is_autocast_enabled():
target_dtype = torch.get_autocast_gpu_dtype()
target_dtype = torch.get_autocast_dtype(query_states.device.type)
# Handle the case where the model is quantized
elif hasattr(self.config, "_pre_quantization_dtype"):
target_dtype = self.config._pre_quantization_dtype
+2 -1
@@ -40,7 +40,7 @@ from .converters import (
)
from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
from .device_processor import DeviceProcessorStep
from .env_processor import IsaaclabArenaProcessorStep, LiberoProcessorStep
from .env_processor import IsaaclabArenaProcessorStep, LiberoActionProcessorStep, LiberoProcessorStep
from .factory import (
make_default_processors,
make_default_robot_action_processor,
@@ -149,6 +149,7 @@ __all__ = [
"RewardProcessorStep",
"DataProcessorPipeline",
"IsaaclabArenaProcessorStep",
"LiberoActionProcessorStep",
"LiberoProcessorStep",
"TimeLimitProcessorStep",
"AddBatchDimensionProcessorStep",
+43 -3
@@ -18,9 +18,9 @@ from dataclasses import dataclass
import torch
from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.utils.constants import OBS_IMAGES, OBS_PREFIX, OBS_STATE, OBS_STR
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_PREFIX, OBS_STATE, OBS_STR
from .pipeline import ObservationProcessorStep, ProcessorStepRegistry
from .pipeline import ActionProcessorStep, ObservationProcessorStep, ProcessorStepRegistry
@dataclass
@@ -46,6 +46,8 @@ class LiberoProcessorStep(ObservationProcessorStep):
- This accounts for the HuggingFaceVLA/libero camera orientation convention.
"""
max_state_dim: int | None = None
def _process_observation(self, observation):
"""
Processes both image and robot_state observations from LIBERO.
@@ -78,6 +80,15 @@ class LiberoProcessorStep(ObservationProcessorStep):
state = state.float()
if state.dim() == 1:
state = state.unsqueeze(0)
if self.max_state_dim is not None:
if state.shape[-1] > self.max_state_dim:
raise ValueError(
f"LIBERO state has {state.shape[-1]} dims, which is larger than "
f"configured max_state_dim={self.max_state_dim}."
)
if state.shape[-1] < self.max_state_dim:
pad_width = self.max_state_dim - state.shape[-1]
state = torch.nn.functional.pad(state, (0, pad_width))
processed_obs[OBS_STATE] = state
return processed_obs
@@ -101,7 +112,7 @@ class LiberoProcessorStep(ObservationProcessorStep):
# add our new flattened state
state_feats[OBS_STATE] = PolicyFeature(
type=FeatureType.STATE,
shape=(8,), # [eef_pos(3), axis_angle(3), gripper(2)]
shape=(self.max_state_dim or 8,), # [eef_pos(3), axis_angle(3), gripper(2)] plus padding
)
new_features[FeatureType.STATE] = state_feats
@@ -111,6 +122,9 @@ class LiberoProcessorStep(ObservationProcessorStep):
def observation(self, observation):
return self._process_observation(observation)
def get_config(self) -> dict:
return {"max_state_dim": self.max_state_dim}
def _quat2axisangle(self, quat: torch.Tensor) -> torch.Tensor:
"""
Convert batched quaternions to axis-angle format.
@@ -153,6 +167,32 @@ class LiberoProcessorStep(ObservationProcessorStep):
return result
@dataclass
@ProcessorStepRegistry.register(name="libero_action_processor")
class LiberoActionProcessorStep(ActionProcessorStep):
    """Slices padded policy actions back to the executable LIBERO action space."""

    action_dim: int = 7

    def action(self, action):
        if action.shape[-1] < self.action_dim:
            raise ValueError(
                f"LIBERO action has {action.shape[-1]} dims, which is smaller than action_dim={self.action_dim}."
            )
        return action[..., : self.action_dim]

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        new_features = {ft: feats.copy() for ft, feats in features.items()}
        action_feats = new_features.setdefault(FeatureType.ACTION, {})
        action_feats[ACTION] = PolicyFeature(type=FeatureType.ACTION, shape=(self.action_dim,))
        return new_features

    def get_config(self) -> dict:
        return {"action_dim": self.action_dim}
@dataclass
@ProcessorStepRegistry.register(name="isaaclab_arena_processor")
class IsaaclabArenaProcessorStep(ObservationProcessorStep):
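The two LIBERO changes in this file are mirror images: the observation step right-pads the 8-dim state with zeros up to `max_state_dim`, and the new action step slices a padded policy action back to `action_dim`. A minimal sketch with plain lists instead of tensors; the helper names and the width of 24 are hypothetical and chosen only for illustration.

```python
def pad_state(state, max_state_dim=None):
    """Right-pad a state vector with zeros, as the observation step does when max_state_dim is set."""
    if max_state_dim is None:
        return list(state)
    if len(state) > max_state_dim:
        raise ValueError(
            f"state has {len(state)} dims, which is larger than max_state_dim={max_state_dim}"
        )
    return list(state) + [0.0] * (max_state_dim - len(state))


def slice_action(action, action_dim=7):
    """Keep only the leading executable dims of a padded policy action."""
    if len(action) < action_dim:
        raise ValueError(
            f"action has {len(action)} dims, which is smaller than action_dim={action_dim}"
        )
    return list(action[:action_dim])


# 8 LIBERO state dims padded to a hypothetical policy width of 24.
padded = pad_state([0.1] * 8, max_state_dim=24)
# A hypothetical 24-dim policy output cut back to the 7 LIBERO action dims.
executable = slice_action([0.5] * 24, action_dim=7)
```

Both helpers raise rather than silently truncating or padding in the wrong direction, matching the `ValueError` checks in the diff.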
+1 -1
@@ -46,7 +46,7 @@ class LeKiwiConfig(RobotConfig):
cameras: dict[str, CameraConfig] = field(default_factory=lekiwi_cameras_config)
# Set to `True` for backward compatibility with previous policies/dataset
use_degrees: bool = False
use_degrees: bool = True
@dataclass
+3 -4
@@ -23,7 +23,6 @@ from lerobot.utils.robot_utils import precise_sleep
from ..context import RolloutContext
from .core import RolloutStrategy, send_next_action
from .display import BaseDisplay
logger = logging.getLogger(__name__)
@@ -39,8 +38,6 @@ class BaseStrategy(RolloutStrategy):
"""Initialise the inference engine."""
self._init_engine(ctx)
logger.info("Base strategy ready")
self._display = BaseDisplay(duration=ctx.runtime.cfg.duration)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run the autonomous control loop until shutdown or duration expires."""
@@ -75,7 +72,9 @@ class BaseStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
self._warn_slow_loop(dt, control_interval, cfg.fps)
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
def teardown(self, ctx: RolloutContext) -> None:
"""Disconnect hardware and stop inference."""
-12
@@ -33,7 +33,6 @@ from ..inference import InferenceEngine
if TYPE_CHECKING:
from ..configs import RolloutStrategyConfig
from ..context import HardwareContext, ProcessorContext, RolloutContext, RuntimeContext
from .display import RolloutStatusDisplay
logger = logging.getLogger(__name__)
@@ -52,17 +51,6 @@ class RolloutStrategy(abc.ABC):
self._interpolator: ActionInterpolator | None = None
self._warmup_flushed: bool = False
self._cached_obs_processed: dict | None = None
self._display: RolloutStatusDisplay | None = None
def _warn_slow_loop(self, dt: float, control_interval: float, fps: float) -> None:
"""Warn when the control loop runs slower than the target FPS."""
if dt > control_interval:
logger.warning(
"Control loop running slower (%.1f Hz) than target (%.0f Hz). "
"Possible causes: camera FPS not keeping up, slow policy inference, CPU starvation.",
1 / dt,
fps,
)
def _init_engine(self, ctx: RolloutContext) -> None:
"""Attach the inference engine and action interpolator, then start the backend.
+7 -30
@@ -71,7 +71,6 @@ from ..configs import DAggerKeyboardConfig, DAggerPedalConfig, DAggerStrategyCon
from ..context import RolloutContext
from ..robot_wrapper import ThreadSafeRobot
from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action
from .display import DAggerDisplay
PYNPUT_AVAILABLE = _pynput_available
keyboard = None
@@ -287,7 +286,7 @@ def _init_dagger_keyboard(events: DAggerEvents, cfg: DAggerKeyboardConfig):
listener = keyboard.Listener(on_press=on_press)
listener.start()
logger.debug(
logger.info(
"DAgger keyboard listener started (pause_resume='%s', correction='%s', upload='%s', ESC=stop)",
cfg.pause_resume,
cfg.correction,
@@ -371,28 +370,6 @@ class DAggerStrategy(RolloutStrategy):
self._episode_duration_s,
)
if self.config.input_device == "keyboard":
kb = self.config.keyboard
pause_key, correction_key, upload_key = (
kb.pause_resume.upper(),
kb.correction.upper(),
kb.upload.upper(),
)
else:
pb = self.config.pedal
pause_key, correction_key, upload_key = pb.pause_resume, pb.correction, pb.upload
self._display = DAggerDisplay(
record_autonomous=self.config.record_autonomous,
num_episodes=self.config.num_episodes,
episode_duration_s=self._episode_duration_s,
input_device=self.config.input_device,
pause_key=pause_key,
correction_key=correction_key,
upload_key=upload_key,
)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run DAgger episodes with human-in-the-loop intervention."""
if self.config.record_autonomous:
@@ -465,7 +442,6 @@ class DAggerStrategy(RolloutStrategy):
interpolator.reset()
events.reset()
engine.resume()
self._display.show_state(DAggerPhase.AUTONOMOUS)
last_action: dict[str, Any] | None = None
record_tick = 0
@@ -496,7 +472,6 @@ class DAggerStrategy(RolloutStrategy):
ctx,
last_action,
)
self._display.show_state(new_phase)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
@@ -581,7 +556,9 @@ class DAggerStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
self._warn_slow_loop(dt, control_interval, cfg.fps)
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
finally:
logger.info("DAgger continuous control loop ended — pausing engine")
@@ -622,7 +599,6 @@ class DAggerStrategy(RolloutStrategy):
interpolator.reset()
events.reset()
engine.resume()
self._display.show_state(DAggerPhase.AUTONOMOUS)
last_action: dict[str, Any] | None = None
start_time = time.perf_counter()
@@ -657,7 +633,6 @@ class DAggerStrategy(RolloutStrategy):
ctx,
last_action,
)
self._display.show_state(new_phase)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
@@ -730,7 +705,9 @@ class DAggerStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
self._warn_slow_loop(dt, control_interval, cfg.fps)
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
finally:
logger.info("DAgger corrections-only loop ended — pausing engine")
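Reviewer note: the hunks above drop the shared `_warn_slow_loop` helper and inline the same warning in each strategy loop. The pattern they all repeat can be sketched as below. This is a minimal illustration, not the project's code: `run_fixed_rate_loop`, its `step` callback, and the injectable `sleep` parameter are hypothetical names, and `time.sleep` stands in for `precise_sleep`.

```python
import logging
import time

logger = logging.getLogger(__name__)


def run_fixed_rate_loop(step, fps: float, num_ticks: int, sleep=time.sleep) -> int:
    """Call `step()` at roughly `fps` Hz; return how many ticks overran their budget.

    Hypothetical helper for illustration only; `sleep` stands in for `precise_sleep`.
    """
    control_interval = 1.0 / fps
    slow_ticks = 0
    for _ in range(num_ticks):
        start = time.perf_counter()
        step()
        dt = time.perf_counter() - start
        if (sleep_t := control_interval - dt) > 0:
            # The tick finished early: wait out the rest of the interval.
            sleep(sleep_t)
        else:
            # The tick overran: dt >= control_interval > 0, so 1 / dt is safe.
            slow_ticks += 1
            logger.warning(
                "Loop running slower (%.1f Hz) than the target FPS (%.0f Hz).",
                1 / dt,
                fps,
            )
    return slow_ticks
```

Passing lazy `%`-style arguments, as the removed helper did, avoids building the message when the warning level is filtered out; the inlined f-strings format it on every slow tick.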
-263
@@ -1,263 +0,0 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Console status display for rollout strategies.
One subclass per strategy: static states/controls are declared as class
constants; runtime-dependent values are passed to ``__init__``.
In each strategy's ``setup()``:
self._display = DAggerDisplay(
record_autonomous=self.config.record_autonomous,
num_episodes=self.config.num_episodes,
episode_duration_s=self._episode_duration_s,
input_device=self.config.input_device,
pause_key="SPACE",
correction_key="TAB",
upload_key="ENTER",
)
self._display.show_banner()
On each state transition:
self._display.show_state("correcting")
"""
from __future__ import annotations
import enum
import sys
from dataclasses import dataclass
def _supports_color() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
class _C:
"""ANSI escape codes."""
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[1;92m"
YELLOW = "\033[1;93m"
RED = "\033[1;91m"
CYAN = "\033[1;96m"
WHITE = "\033[1;97m"
GRAY = "\033[2;37m"
@dataclass
class StateConfig:
"""One named rollout state.
``key`` must match the string passed to ``RolloutStatusDisplay.show_state()``.
"""
key: str
emoji: str
label: str
description: str
color: str = _C.WHITE
@dataclass
class ControlConfig:
"""One keyboard/pedal binding shown in the startup banner."""
key: str
description: str
# ---------------------------------------------------------------------------
# Base display class
# ---------------------------------------------------------------------------
class RolloutStatusDisplay:
"""Unified console status display. Subclass once per strategy."""
def __init__(
self,
strategy: str,
states: list[StateConfig],
controls: list[ControlConfig],
info: list[str] | None = None,
) -> None:
self.strategy = strategy
self._states = {s.key: s for s in states}
self._controls = controls
self._info = info or []
self._use_color = _supports_color()
def _c(self, code: str, text: str) -> str:
if not self._use_color:
return text
return f"{code}{text}{_C.RESET}"
def show_banner(self) -> None:
"""Print startup banner: strategy name, states, controls, config info."""
width = 62
sep = self._c(_C.BOLD, "─" * width)
print(f"\n{sep}")
print(self._c(_C.BOLD, f" lerobot-rollout │ {self.strategy}"))
if self._states:
print()
for state in self._states.values():
label = self._c(state.color, f"{state.label:<14}")
desc = self._c(_C.GRAY, state.description)
print(f" {state.emoji} {label} {desc}")
if self._controls:
print()
key_width = max(len(c.key) for c in self._controls)
for ctrl in self._controls:
key_str = self._c(_C.CYAN, f"[{ctrl.key:<{key_width}}]")
print(f" {key_str} {ctrl.description}")
if self._info:
print()
for item in self._info:
print(f" {item}")
print(f"{sep}\n")
def show_state(self, state_key: str | enum.Enum) -> None:
"""Print the current state and available controls - call this on every transition."""
key = state_key.value if isinstance(state_key, enum.Enum) else state_key
state = self._states.get(key)
if state is None:
return
label = self._c(state.color, f"{state.label:<14}")
desc = self._c(_C.GRAY, state.description)
print(f"\n {state.emoji} {label} {desc}\n")
if self._controls:
key_width = max(len(c.key) for c in self._controls)
for ctrl in self._controls:
key_str = self._c(_C.CYAN, f"[{ctrl.key:<{key_width}}]")
print(f" {key_str} {ctrl.description}")
print()
# ---------------------------------------------------------------------------
# One display subclass per strategy
# ---------------------------------------------------------------------------
class BaseDisplay(RolloutStatusDisplay):
"""Status display for the base (eval-only, no recording) strategy."""
_STATES = [StateConfig("running", "🟢", "RUNNING", "autonomous rollout — no recording", _C.GREEN)]
_CONTROLS = [ControlConfig("Ctrl+C", "stop session")]
def __init__(self, duration: float = 0) -> None:
info = ["No recording — evaluation only."]
if duration > 0:
info.append(f"Duration: {duration:.0f}s")
super().__init__("base", self._STATES, self._CONTROLS, info)
class SentryDisplay(RolloutStatusDisplay):
"""Status display for the sentry (continuous autonomous recording) strategy."""
_STATES = [StateConfig("recording", "🟢", "RECORDING", "continuous autonomous recording", _C.GREEN)]
_CONTROLS = [ControlConfig("Ctrl+C", "stop session")]
def __init__(self, episode_duration_s: float, upload_every_n_episodes: int) -> None:
info = [
f"Episode rotation: ~{episode_duration_s:.0f}s | "
f"Upload every {upload_every_n_episodes} episodes",
]
super().__init__("sentry", self._STATES, self._CONTROLS, info)
class HighlightDisplay(RolloutStatusDisplay):
"""Status display for the highlight (ring-buffer on-demand save) strategy."""
def __init__(self, ring_buffer_seconds: float, save_key: str, push_key: str) -> None:
states = [
StateConfig(
"buffering",
"",
"BUFFERING",
f"ring buffer active — last {ring_buffer_seconds:.0f}s captured",
_C.WHITE,
),
StateConfig("recording", "🔴", "RECORDING", "live recording — press [s] to save episode", _C.RED),
]
controls = [
ControlConfig(save_key, "BUFFERING ↔ RECORDING start recording / save episode"),
ControlConfig(push_key, "push dataset to Hub (background)"),
ControlConfig("ESC", "stop session"),
]
super().__init__("highlight", states, controls)
class DAggerDisplay(RolloutStatusDisplay):
"""Status display for the dagger (human-in-the-loop) strategy."""
_PAUSED_STATE = StateConfig("paused", "🟡", "PAUSED", "holding last position — awaiting input", _C.YELLOW)
_CORRECTING_STATE = StateConfig(
"correcting", "🔴", "CORRECTING", "human teleop active — recording correction", _C.RED
)
def __init__(
self,
record_autonomous: bool,
num_episodes: int,
episode_duration_s: float,
input_device: str,
pause_key: str,
correction_key: str,
upload_key: str,
) -> None:
mode = "continuous recording" if record_autonomous else "corrections only"
auto_desc = "policy running — recording" if record_autonomous else "policy running — no recording"
states = [
StateConfig("autonomous", "🟢", "AUTONOMOUS", auto_desc, _C.GREEN),
self._PAUSED_STATE,
self._CORRECTING_STATE,
]
controls = [
ControlConfig(pause_key, "AUTONOMOUS ↔ PAUSED pause / resume policy"),
ControlConfig(correction_key, "PAUSED ↔ CORRECTING start / stop correction"),
ControlConfig(upload_key, "push dataset to Hub"),
ControlConfig("ESC", "stop session"),
]
info = [f"Target: {num_episodes} episodes | Input: {input_device}"]
if record_autonomous:
info.append(f"Episode rotation: ~{episode_duration_s:.0f}s")
super().__init__(f"dagger [{mode}]", states, controls, info)
if __name__ == "__main__":
dagger_display = DAggerDisplay(
record_autonomous=False,
num_episodes=20,
episode_duration_s=30,
input_device="keyboard",
pause_key="SPACE",
correction_key="TAB",
upload_key="ENTER",
)
dagger_display.show_banner()
dagger_display.show_state("paused")
dagger_display.show_state("correcting")
dagger_display.show_state("paused")
dagger_display.show_state("autonomous")
+4 -20
@@ -17,7 +17,6 @@
from __future__ import annotations
import contextlib
import enum
import logging
import os
import sys
@@ -37,7 +36,6 @@ from ..configs import HighlightStrategyConfig
from ..context import RolloutContext
from ..ring_buffer import RolloutRingBuffer
from .core import RolloutStrategy, safe_push_to_hub, send_next_action
from .display import HighlightDisplay
PYNPUT_AVAILABLE = _pynput_available
keyboard = None
@@ -55,13 +53,6 @@ if PYNPUT_AVAILABLE:
logger = logging.getLogger(__name__)
class HighlightPhase(enum.Enum):
"""Observable phases of a Highlight session."""
BUFFERING = "buffering" # Ring buffer accumulating frames, not recording
RECORDING = "recording" # Live recording active
class HighlightStrategy(RolloutStrategy):
"""Autonomous rollout with on-demand recording via ring buffer.
@@ -114,13 +105,6 @@ class HighlightStrategy(RolloutStrategy):
self.config.save_key,
self.config.push_key,
)
self._display = HighlightDisplay(
ring_buffer_seconds=self.config.ring_buffer_seconds,
save_key=self.config.save_key,
push_key=self.config.push_key,
)
self._display.show_banner()
self._display.show_state(HighlightPhase.BUFFERING)
def run(self, ctx: RolloutContext) -> None:
"""Run the autonomous loop, buffering frames and recording on demand."""
@@ -178,7 +162,6 @@ class HighlightStrategy(RolloutStrategy):
for buffered_frame in ring.drain():
dataset.add_frame(buffered_frame)
self._recording_live.set()
self._display.show_state(HighlightPhase.RECORDING)
else:
dataset.add_frame(frame)
with self._episode_lock:
@@ -189,7 +172,6 @@ class HighlightStrategy(RolloutStrategy):
play_sounds,
)
self._recording_live.clear()
self._display.show_state(HighlightPhase.BUFFERING)
continue # frame already consumed — skip ring.append
if self._push_requested.is_set():
@@ -206,7 +188,9 @@ class HighlightStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
self._warn_slow_loop(dt, control_interval, cfg.fps)
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
finally:
logger.info("Highlight control loop ended")
@@ -271,7 +255,7 @@ class HighlightStrategy(RolloutStrategy):
self._listener = keyboard.Listener(on_press=on_press)
self._listener.start()
logger.debug("Keyboard listener started (save='%s', push='%s', ESC=stop)", save_key, push_key)
logger.info("Keyboard listener started (save='%s', push='%s', ESC=stop)", save_key, push_key)
except ImportError:
logger.warning("pynput not available — keyboard listener disabled")
+3 -7
@@ -32,7 +32,6 @@ from lerobot.utils.utils import log_say
from ..configs import SentryStrategyConfig
from ..context import RolloutContext
from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action
from .display import SentryDisplay
logger = logging.getLogger(__name__)
@@ -80,11 +79,6 @@ class SentryStrategy(RolloutStrategy):
self._episode_duration_s,
self.config.upload_every_n_episodes,
)
self._display = SentryDisplay(
episode_duration_s=self._episode_duration_s,
upload_every_n_episodes=self.config.upload_every_n_episodes,
)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run the continuous recording loop with automatic episode rotation."""
@@ -166,7 +160,9 @@ class SentryStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
self._warn_slow_loop(dt, control_interval, cfg.fps)
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
finally:
logger.info("Sentry control loop ended — saving final episode")
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:54aecbc1af72a4cd5e9261492f5e7601890517516257aacdf2a0ffb3ce281f1b
oid sha256:51effd76b73e972f10d31f5084ab906386134b600c87b2668767d30232a902bd
size 992
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:88a9c3775a2aa1e90a08850521970070a4fcf0f6b82aab43cd8ccc5cf77e0013
size 47424
oid sha256:d4d7a16ca67f9adefac0e0620a7b2e9c822f2db42faaaced7a89fbad60e5ead4
size 47680
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:91a2635e05a75fe187a5081504c5f35ce3417378813fa2deaf9ca4e8200e1819
oid sha256:796c439ee8a64bf9901ff8325e7419bda8bd316360ee95e6304e8e1ae0f4c36c
size 68
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:645bff922ac7bea63ad018ebf77c303c0e4cd2c1c0dc5ef3192865281bef3dc6
size 47424
oid sha256:ad33a8b47c39c2e1374567ff9da43cdb95e2dbe904c1b02a35051346d3043095
size 47680
+79 -2
@@ -7,11 +7,14 @@ from dataclasses import dataclass, field
import gymnasium as gym
import pytest
import torch
from gymnasium.envs.registration import register, registry as gym_registry
from lerobot.configs.types import PolicyFeature
from lerobot.envs.configs import EnvConfig
from lerobot.envs.configs import EnvConfig, LiberoEnv
from lerobot.envs.factory import make_env, make_env_config, make_env_pre_post_processors
from lerobot.processor import LiberoActionProcessorStep, LiberoProcessorStep
from lerobot.utils.constants import OBS_PREFIX, OBS_STATE
logger = logging.getLogger(__name__)
@@ -61,6 +64,80 @@ def test_processors_delegation():
assert len(pre.steps) == 0
def test_processors_delegation_supports_legacy_override_signature():
"""External EnvConfig subclasses with the old get_env_processors() signature keep working."""
from lerobot.processor.pipeline import DataProcessorPipeline
@EnvConfig.register_subclass("_dispatch_legacy_proc_test")
@dataclass
class _Env(EnvConfig):
task: str = "x"
features: dict[str, PolicyFeature] = field(default_factory=dict)
@property
def gym_kwargs(self):
return {}
def get_env_processors(self):
return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[])
pre, post = make_env_pre_post_processors(_Env(), policy_cfg=object())
assert isinstance(pre, DataProcessorPipeline)
assert isinstance(post, DataProcessorPipeline)
def test_libero_evo1_processors_use_padded_state_and_env_action_dim():
"""EVO1 uses padded LIBERO state features while env actions stay executable."""
class _Evo1Config:
type = "evo1"
max_state_dim = 24
cfg = LiberoEnv()
pre, post = make_env_pre_post_processors(cfg, policy_cfg=_Evo1Config())
assert isinstance(pre.steps[0], LiberoProcessorStep)
assert pre.steps[0].max_state_dim == 24
assert isinstance(post.steps[0], LiberoActionProcessorStep)
assert post.steps[0].action_dim == cfg.features["action"].shape[0] == 7
class _OtherConfig:
type = "other"
pre_other, _ = make_env_pre_post_processors(cfg, policy_cfg=_OtherConfig())
assert pre_other.steps[0].max_state_dim is None
def test_libero_processor_pads_state_to_max_dim():
step = LiberoProcessorStep(max_state_dim=24)
observation = {
OBS_PREFIX
+ "robot_state": {
"eef": {
"pos": torch.tensor([[1.0, 2.0, 3.0]]),
"quat": torch.tensor([[0.0, 0.0, 0.0, 1.0]]),
},
"gripper": {"qpos": torch.tensor([[4.0, 5.0]])},
}
}
state = step.observation(observation)[OBS_STATE]
assert state.shape == (1, 24)
assert torch.allclose(state[:, :8], torch.tensor([[1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 4.0, 5.0]]))
assert torch.count_nonzero(state[:, 8:]).item() == 0
def test_libero_action_processor_slices_padded_action():
step = LiberoActionProcessorStep(action_dim=7)
action = torch.arange(2 * 3 * 24, dtype=torch.float32).reshape(2, 3, 24)
sliced = step.action(action)
assert sliced.shape == (2, 3, 7)
assert torch.equal(sliced, action[..., :7])
with pytest.raises(ValueError, match="smaller than action_dim=7"):
step.action(torch.zeros(2, 6))
def test_base_create_envs():
"""Base class create_envs() should build a single-task VectorEnv via gym.make()."""
gym_id = "_dispatch_test/CartPole-v99"
@@ -136,7 +213,7 @@ def test_custom_get_env_processors_override():
def gym_kwargs(self):
return {}
def get_env_processors(self):
def get_env_processors(self, policy_cfg=None):
return DataProcessorPipeline(steps=[]), DataProcessorPipeline(steps=[])
pre, post = _Env().get_env_processors()
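Reviewer note: the new LIBERO tests above check two tensor operations: the observation step zero-pads the 8-dim state to `max_state_dim`, and the action step keeps only the first `action_dim` entries of a padded action. A minimal sketch of both, assuming plain PyTorch; `pad_state` and `slice_action` are hypothetical helper names, not the methods of `LiberoProcessorStep` or `LiberoActionProcessorStep`, and the error wording is only modelled on the message the test matches.

```python
import torch
import torch.nn.functional as F


def pad_state(state: torch.Tensor, max_state_dim: int) -> torch.Tensor:
    """Zero-pad the last dimension of `state` up to `max_state_dim` (hypothetical helper)."""
    missing = max_state_dim - state.shape[-1]
    if missing < 0:
        raise ValueError(
            f"State dim {state.shape[-1]} is larger than max_state_dim={max_state_dim}"
        )
    # (0, missing) pads nothing on the left and `missing` zeros on the right.
    return F.pad(state, (0, missing))


def slice_action(action: torch.Tensor, action_dim: int) -> torch.Tensor:
    """Keep the first `action_dim` entries of the last dimension (hypothetical helper)."""
    if action.shape[-1] < action_dim:
        raise ValueError(
            f"Action dim {action.shape[-1]} is smaller than action_dim={action_dim}"
        )
    # The ellipsis preserves any leading batch/chunk dimensions.
    return action[..., :action_dim]
```

Slicing on the last axis with `...` is what lets the same step accept both `(batch, action)` and `(batch, chunk, action)` inputs, which is the case the `(2, 3, 24)` test tensor exercises.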
+298
@@ -0,0 +1,298 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import torch
from torch import nn
import lerobot.policies.evo1.modeling_evo1 as modeling_evo1
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.evo1.configuration_evo1 import Evo1Config
from lerobot.policies.evo1.flow_matching import FlowmatchingActionHead
from lerobot.policies.factory import get_policy_class, make_policy_config
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE
STATE_DIM = 4
ACTION_DIM = 3
MAX_STATE_DIM = 6
MAX_ACTION_DIM = 5
CHUNK_SIZE = 2
EMBED_DIM = 8
class DummyEVO1(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.embedder = nn.Dropout(p=0.0)
self.action_head = nn.Linear(1, 1)
self.get_vl_embeddings_calls = 0
self.grad_enabled_calls = []
self.embedder_training_calls = []
def set_finetune_flags(self):
return None
def get_vl_embeddings(self, images, image_mask, prompt=None, return_cls_only=False):
self.get_vl_embeddings_calls += 1
self.grad_enabled_calls.append(torch.is_grad_enabled())
self.embedder_training_calls.append(self.embedder.training)
return torch.ones(len(images), 4, EMBED_DIM, requires_grad=torch.is_grad_enabled())
def forward(
self,
fused_tokens,
state=None,
actions_gt=None,
action_mask=None,
embodiment_ids=None,
):
batch_size = fused_tokens.shape[0]
if actions_gt is None:
return torch.ones(batch_size, CHUNK_SIZE * MAX_ACTION_DIM)
pred_velocity = torch.zeros(batch_size, CHUNK_SIZE * MAX_ACTION_DIM)
noise = torch.zeros_like(actions_gt)
return pred_velocity, noise
def make_config(training_stage="stage1", **kwargs):
config_kwargs = {
"device": "cpu",
"vlm_model_name": "dummy-internvl3",
"training_stage": training_stage,
"chunk_size": CHUNK_SIZE,
"n_action_steps": 1,
"max_state_dim": MAX_STATE_DIM,
"max_action_dim": MAX_ACTION_DIM,
"max_views": 2,
"embed_dim": EMBED_DIM,
"hidden_dim": 16,
"state_hidden_dim": 16,
"num_heads": 2,
"num_layers": 1,
"num_inference_timesteps": 2,
"input_features": {
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(STATE_DIM,)),
f"{OBS_IMAGES}.front": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 16, 16)),
},
"output_features": {
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(ACTION_DIM,)),
},
}
config_kwargs.update(kwargs)
return Evo1Config(**config_kwargs)
def make_batch(include_action=True):
batch = {
"task": ["pick the block", "place the block"],
OBS_STATE: torch.randn(2, STATE_DIM),
f"{OBS_IMAGES}.front": torch.rand(2, 3, 16, 16),
}
if include_action:
batch[ACTION] = torch.randn(2, CHUNK_SIZE, ACTION_DIM)
return batch
def test_evo1_factory_registration():
cfg = make_policy_config(
"evo1",
device="cpu",
vlm_model_name="dummy-internvl3",
input_features={
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(STATE_DIM,)),
f"{OBS_IMAGES}.front": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 16, 16)),
},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(ACTION_DIM,))},
)
assert isinstance(cfg, Evo1Config)
assert get_policy_class("evo1") is modeling_evo1.EVO1Policy
def test_evo1_stage_defaults_and_consistency():
stage1 = make_config(training_stage="stage1")
assert (stage1.finetune_vlm, stage1.finetune_language_model, stage1.finetune_vision_model) == (
False,
False,
False,
)
assert stage1.finetune_action_head is True
stage2 = make_config(training_stage="stage2")
assert (stage2.finetune_vlm, stage2.finetune_language_model, stage2.finetune_vision_model) == (
True,
True,
True,
)
assert stage2.finetune_action_head is True
stage2_from_stage1_checkpoint_flags = make_config(
training_stage="stage2",
finetune_vlm=False,
finetune_language_model=False,
finetune_vision_model=False,
finetune_action_head=False,
)
assert (
stage2_from_stage1_checkpoint_flags.finetune_vlm,
stage2_from_stage1_checkpoint_flags.finetune_language_model,
stage2_from_stage1_checkpoint_flags.finetune_vision_model,
) == (
True,
True,
True,
)
assert stage2_from_stage1_checkpoint_flags.finetune_action_head is True
explicit_off = make_config(
training_stage="stage2",
apply_training_stage_defaults=False,
finetune_vlm=False,
finetune_language_model=False,
finetune_vision_model=False,
finetune_action_head=False,
)
assert (
explicit_off.finetune_vlm,
explicit_off.finetune_language_model,
explicit_off.finetune_vision_model,
) == (
False,
False,
False,
)
assert explicit_off.finetune_action_head is False
try:
make_config(
training_stage="stage2",
apply_training_stage_defaults=False,
finetune_vlm=True,
finetune_language_model=False,
)
except ValueError as exc:
assert "Inconsistent EVO1 finetune config" in str(exc)
else:
raise AssertionError("Expected inconsistent finetune config to raise ValueError")
def test_evo1_policy_forward_and_inference_use_batched_embedding(monkeypatch):
monkeypatch.setattr(modeling_evo1, "EVO1", DummyEVO1)
policy = modeling_evo1.EVO1Policy(make_config())
loss, metrics = policy.forward(make_batch(include_action=True))
assert loss.ndim == 0
assert torch.isfinite(loss)
assert metrics["active_action_dims"] == ACTION_DIM * CHUNK_SIZE
assert policy.model.get_vl_embeddings_calls == 1
action_chunk = policy.predict_action_chunk(make_batch(include_action=False))
assert action_chunk.shape == (2, CHUNK_SIZE, ACTION_DIM)
policy.reset()
selected = policy.select_action(make_batch(include_action=False))
assert selected.shape == (2, ACTION_DIM)
def test_stage1_frozen_vlm_embeddings_do_not_track_gradients(monkeypatch):
monkeypatch.setattr(modeling_evo1, "EVO1", DummyEVO1)
policy = modeling_evo1.EVO1Policy(make_config(training_stage="stage1"))
policy.train()
image_batches, image_masks = policy._collect_image_batches(make_batch(include_action=False))
fused_tokens = policy._compute_fused_tokens(["pick", "place"], image_batches, image_masks)
assert policy.model.grad_enabled_calls == [False]
assert policy.model.embedder_training_calls == [False]
assert not fused_tokens.requires_grad
assert policy.model.embedder.training is True
def test_stage2_vlm_embeddings_track_gradients(monkeypatch):
monkeypatch.setattr(modeling_evo1, "EVO1", DummyEVO1)
policy = modeling_evo1.EVO1Policy(make_config(training_stage="stage2"))
policy.train()
image_batches, image_masks = policy._collect_image_batches(make_batch(include_action=False))
fused_tokens = policy._compute_fused_tokens(["pick", "place"], image_batches, image_masks)
assert policy.model.grad_enabled_calls == [True]
assert policy.model.embedder_training_calls == [True]
assert fused_tokens.requires_grad
def test_collect_image_batches_handles_unbatched_chw(monkeypatch):
# Regression for an issue where batch_size was read from shape[0] before normalizing
# per-camera tensor dims, so an unbatched (C, H, W) input was treated as batch_size=C.
monkeypatch.setattr(modeling_evo1, "EVO1", DummyEVO1)
policy = modeling_evo1.EVO1Policy(make_config())
batch = {
OBS_STATE: torch.randn(1, STATE_DIM),
f"{OBS_IMAGES}.front": torch.rand(3, 16, 16),
}
image_batches, image_masks = policy._collect_image_batches(batch)
assert len(image_batches) == 1
assert len(image_batches[0]) == policy.config.max_views
assert image_masks.tolist() == [[True, False]]
def test_evo1_action_mask_accepts_chunk_size_one(monkeypatch):
monkeypatch.setattr(modeling_evo1, "EVO1", DummyEVO1)
config = make_config(chunk_size=1, n_action_steps=1)
policy = modeling_evo1.EVO1Policy(config)
batch = make_batch(include_action=True)
batch[ACTION] = torch.randn(2, ACTION_DIM)
batch["action_mask"] = torch.ones(2, ACTION_DIM, dtype=torch.bool)
actions, action_mask = policy._prepare_actions(batch)
assert actions.shape == (2, 1, MAX_ACTION_DIM)
assert action_mask.shape == (2, 1, MAX_ACTION_DIM)
assert action_mask[:, :, :ACTION_DIM].all()
assert not action_mask[:, :, ACTION_DIM:].any()
def test_flowmatching_dict_config_enables_state_encoder_for_horizon_one():
head = FlowmatchingActionHead(
config={
"embed_dim": EMBED_DIM,
"hidden_dim": 16,
"action_dim": ACTION_DIM,
"horizon": 1,
"per_action_dim": ACTION_DIM,
"num_heads": 2,
"num_layers": 1,
"num_inference_timesteps": 2,
"state_dim": STATE_DIM,
"state_hidden_dim": 16,
"num_categories": 1,
}
)
assert head.state_encoder is not None
pred_velocity, noise = head(
torch.randn(2, 4, EMBED_DIM),
state=torch.randn(2, STATE_DIM),
actions_gt=torch.randn(2, 1, ACTION_DIM),
action_mask=torch.ones(2, 1, ACTION_DIM, dtype=torch.bool),
)
assert pred_velocity.shape == (2, ACTION_DIM)
assert noise.shape == (2, 1, ACTION_DIM)
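Reviewer note: `test_evo1_stage_defaults_and_consistency` above pins down how `training_stage` interacts with the explicit finetune flags. The behaviour it asserts could be modelled roughly as follows. This is a sketch under stated assumptions, not `Evo1Config` itself: `FinetuneFlags` is a hypothetical stand-in, the field defaults are guesses, and the consistency rule (`finetune_vlm` must agree with its two sub-flags) is inferred from the single failing case in the test.

```python
from dataclasses import dataclass


@dataclass
class FinetuneFlags:
    """Hypothetical stand-in for the finetune fields the test exercises."""

    training_stage: str = "stage1"
    apply_training_stage_defaults: bool = True
    finetune_vlm: bool = False
    finetune_language_model: bool = False
    finetune_vision_model: bool = False
    finetune_action_head: bool = True

    def __post_init__(self) -> None:
        if self.apply_training_stage_defaults:
            # Stage defaults override whatever flags were passed in,
            # e.g. stale flags carried over from a stage1 checkpoint.
            train_vlm = self.training_stage == "stage2"
            self.finetune_vlm = train_vlm
            self.finetune_language_model = train_vlm
            self.finetune_vision_model = train_vlm
            self.finetune_action_head = True
        # Assumed rule: the umbrella flag must match its sub-flags.
        if self.finetune_vlm != (self.finetune_language_model or self.finetune_vision_model):
            raise ValueError("Inconsistent EVO1 finetune config")
```

Under these assumptions, `FinetuneFlags("stage2", finetune_vlm=False)` still ends up with every flag enabled, while `apply_training_stage_defaults=False` leaves explicit values untouched and only runs the consistency check.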
Generated
+747 -519
File diff suppressed because it is too large.