Compare commits

..

25 Commits

Author SHA1 Message Date
Maximellerbach 6021554770 chore(rollout): nice colored cli 2026-05-07 11:12:02 +02:00
Haoming Song e99c55af4b feat(policies): add EO-1 model (#3403)
* feat(policies): add EO-1 model

* chore(eo1): adjust policy_eo1_README.md to avoid duplication with eo1.mdx

* chore(eo1): remove policy_eo1_README.md, link eo1.mdx in policy folder

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-05-06 18:01:16 +02:00
Steven Palma 408e0ca763 fix(robots): openarm features with openarmmini (#3524) 2026-05-06 17:03:09 +02:00
Maxime Ellerbach ce24063efd feat(dagger): adding smooth handover (#3506)
* feat(dagger): adding smooth handover


* update docstring


* small phase fix and documenting potential issues


* cleaning up
2026-05-05 14:44:32 +02:00
Steven Palma 82934719db chore(dep): bump transformers to 5.4.0 (#3374)
* fix(deps): breaking change from transformers 5.4.0

* Update src/lerobot/policies/xvla/modeling_florence2.py

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* Update src/lerobot/policies/wall_x/qwen_model/qwen2_5_vl_moe.py

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* removing dataclass

* bumping transformers 5.4.0

* weird i can't even pass the test on main

* oops, typo

* chore(style): fix pre-commit run

* chore: update uv.lock

* seems like a weird numerical precision issue, lets check in runners

* chore: update uv.lock

* chore(dependencies): adjust transformers version

* chore: update uv.lock

---------

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>
Co-authored-by: Maximellerbach <maxime.ellerbach@huggingface.co>
Co-authored-by: raushan <raushan@huggingface.co>
2026-05-05 14:19:09 +02:00
Steven Palma 401a217597 chore(ci): increase time stale (#3507) 2026-05-04 22:35:16 +02:00
Steven Palma 40094b0464 chore(ci): upgrade docker internal (#3505) 2026-05-04 21:28:52 +02:00
Jash Shah fdbfc015a2 fix(peft): fix LoRA resume from Hub (PosixPath + double wrap) (#3485) 2026-05-04 10:52:37 +02:00
Haoming Song d656da8ccc fix(pi): keep training sampling outside compiled forwards (#3487)
Move PI0 and PI0.5 noise/time sampling into the policy wrappers so the compiled PyTorch cores receive them as tensor inputs.

This keeps Beta sampling out of torch.compile on MPS, avoiding aten::_sample_dirichlet compilation errors while preserving the CUDA training path.
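The pattern, as a minimal sketch (illustrative only, not the actual PI0 code; names assumed):

```python
import torch

def core_forward(x: torch.Tensor, noise: torch.Tensor, time: torch.Tensor) -> torch.Tensor:
    # Purely tensor-in/tensor-out math: safe to compile on any backend.
    return x + noise * time

compiled_core = torch.compile(core_forward)

def training_step(x: torch.Tensor) -> torch.Tensor:
    # Beta sampling stays eager in the wrapper, so torch.compile never
    # traces aten::_sample_dirichlet (which fails to compile on MPS).
    time = torch.distributions.Beta(1.5, 1.0).sample((x.shape[0],)).to(x.device)
    noise = torch.randn_like(x)
    return compiled_core(x, noise, time.view(-1, *([1] * (x.ndim - 1))))
```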

Validation: .venv/bin/python -m pre_commit run --files src/lerobot/policies/pi0/modeling_pi0.py src/lerobot/policies/pi05/modeling_pi05.py; .venv/bin/python -m pytest -sv -rs tests/policies/pi0_pi05/test_pi0.py tests/policies/pi0_pi05/test_pi05.py tests/policies/pi0_pi05/test_pi0_rtc.py tests/policies/pi0_pi05/test_pi05_rtc.py

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-04-30 13:21:17 +02:00
Khalil Meftah b5f65e5332 Expose sarm package API and ship reward model card template (#3477)
* chore: List lerobot_rewardmodel_modelcard_template.md in MANIFEST.in

* chore: export SARMConfig, SARMRewardModel, and make_sarm_pre_post_processors from rewards.sarm.
2026-04-29 16:17:16 +02:00
Khalil Meftah cd6b43ea7a fix(train): migrate legacy RA-BC fields in train config loading (#3480) 2026-04-29 16:17:00 +02:00
Steven Palma 2236bbe7a3 fix(rollout): propagate policy-specific CLI config parameters (#3483)
Co-authored-by: Maxime Ellerbach <maxime.ellerbach@huggingface.co>
2026-04-29 16:13:10 +02:00
Maxime Ellerbach cb0a944941 refactor(datasets): replace untyped dict with typed DatasetInfo dataclass (#3472)
* refactor(datasets): replace untyped dict with typed DatasetInfo dataclass

Introduce typed DatasetInfo dataclass to replace untyped dict representation of info.json.

Changes:
- Add DatasetInfo dataclass with explicit fields and validation
- Implement __post_init__ for shape conversion (list ↔ tuple)
- Add dict-style compatibility layer (__getitem__, __setitem__, .get())
- Add from_dict() and to_dict() for JSON serialization
- Update io_utils to use load_info/write_info with DatasetInfo
- Update dataset utilities and metadata to use attribute access
- Remove aggregate.py dict-style field access
- Add tests fixture support for DatasetInfo

Benefits:
- Type safety with IDE auto-completion
- Validation at construction time
- Explicit schema documentation
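A minimal sketch of the pattern described above (field set abridged; names illustrative, not the full implementation):

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class DatasetInfo:
    codebase_version: str
    total_episodes: int = 0
    total_frames: int = 0
    features: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Validate at construction time instead of at first use.
        if self.total_frames < 0:
            raise ValueError(f"total_frames must be >= 0, got {self.total_frames}")

    # Dict-style compatibility layer for call sites not yet migrated.
    def __getitem__(self, key: str) -> Any:
        return getattr(self, key)

    def __setitem__(self, key: str, value: Any) -> None:
        setattr(self, key, value)

    def get(self, key: str, default: Any = None) -> Any:
        return getattr(self, key, default)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "DatasetInfo":
        return cls(**d)
```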

* fix pre-commit

* update docstring inside DatasetInfo.from_dict()

* sorts the unknown to have deterministic output

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* refactoring the last few old fields


* fix crop dataset roi type mismatch


* consistently use int for data_files_size_in_mb and video_files_size_in_mb

---------

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>
Co-authored-by: jjolla93 <jjolla93@gmail.com>
2026-04-28 18:40:30 +02:00
Khalil Meftah 8a3d64033f Reward models refactor (#3142)
* feat(rewards): add RewardModelConfig and PreTrainedRewardModel base classes

* refactor(rewards): migrate Classifier from policies/sac/reward_model/ to rewards/classifier/

* refactor(rewards): migrate SARM from policies/sarm/ to rewards/sarm/

* refactor(rewards): add rewards/factory.py and remove reward model code from policies/factory.py

* refactor(rewards): update imports and delete old reward model locations

* test(rewards): add reward model tests and update existing test imports

* fix(rewards): restore full Classifier and SARM implementations

* test(rewards): restore missing CUDA and mixed precision classifier processor tests

* refactor(lerobot_train.py): remove RA-BC-specific configuration and replace it with a generic sample-weighting class in lerobot_train

* refactor(lerobot_train.py): add missing sampling weight script

* linter + missing files

* add testing for sample weighter

* revert some useless changes, improve typing

* update docs

* add automatic detection of the progress path

* remove type exp

* improve comment

* fix: move rabc.py to rewards/sarm/ and update import paths

* refactor(imports): update reward model imports to new module structure

* refactor(imports): update reward model imports to reflect new module structure

* refactor(imports): conditionally import pandas based on availability

* feat(configs): add reward_model field to TrainPipelineConfig and Hub fields to RewardModelConfig

* refactor(policies): remove reward model branches from policy factory and __init__

* refactor(rewards): expand __init__ facade and fix SARMConfig __post_init__ crash

* feat(train): route reward model training through rewards/factory instead of policies/factory

* refactor(train): streamline reward model training logic

* fix(rewards): ensure FileNotFoundError is raised for missing config_file

* refactor(train): update __get_path_fields__ to include reward_model for config loading

* refactor(classifier): remove redundant input normalization in predict_reward method

* fix(train): raise ValueError for non-trainable reward models in train function

* refactor(pretrained_rm): add model card template

* refactor(tests): reward models

* refactor(sarm): update reset method and remove unused action prediction methods

* refactor(wandb): differentiate tags for reward model and policy training in cfg_to_group function

* fix(train): raise ValueError for PEFT usage in reward model training

* refactor(rewards): enhance RewardModelConfig with device handling and delta indices properties

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
2026-04-28 17:56:24 +02:00
Steven Palma 03ee50e08f chore(ci): bump docs workflows (#3476) 2026-04-28 15:06:44 +02:00
Steven Palma ca87ccd941 feat(rollout): decouple policy deployment from data recording with new lerobot-rollout CLI (#3413)
* feat(scripts): lerobot-rollout

* fix(rollout) require dataset in dagger + use duration too

* fix(docs): dagger num_episodes

* test(rollout): fix expectations

* fix(rollout): features check

* fix(rollout): device and task propagation + feature pos + warn fps + move rename_map config

* docs(rollout): edit rename_map instructions

* chore(rollout): multiple minor improvements

* chore(rollout): address comments + minor improvements

* fix(rollout): enable default

* fix(tests): default value RTCConfig

* fix(rollout): robot_observation_processor and notify_observation at policy frequency instead of interpolator rate

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): prevent relative actions with sync inference engine

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): rtc reanchor to non normalized state

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): fixing the episode length to use hwc (#3469)

also reducing default length to 5 minutes

* feat(rollout): go back to initial position is now a config

* fix(rollout): properly propagating video_files_size_in_mb to lerobot_dataset (#3470)

* chore(rollout): note about dagger correction stage

* chore(docs): update comments and docstring

* fix(test): move rtc relative out of rollout module

* fix(rollout): address the review comments

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Maxime Ellerbach <maxime.ellerbach@huggingface.co>
2026-04-28 00:57:35 +02:00
Steven Palma 77352c495c chore(dependencies): update uv.lock (#3437)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2026-04-27 23:15:46 +02:00
Steven Palma 05a5223885 fix(pi): avoid peak RAM in PiGemma construction by freeing replaced submodules (#3454)
Co-Authored-By: Daiki Kamata <daiki.kamata@access-company.com>
Co-Authored-By: Jack Vial <jackvial@users.noreply.github.com>
Co-Authored-By: Ajay Anubolu <AjAnubolu@users.noreply.github.com>
Co-Authored-By: Finn F. <F-Fer@users.noreply.github.com>
2026-04-24 17:50:12 +02:00
Steven Palma 580d818aa9 fix(dataset): no default overwrite in lerobot tool recompute stats (#3452) 2026-04-24 15:07:19 +02:00
Steven Palma 587aa82021 fix(imports): realsense import name is platform dependent (#3451) 2026-04-24 12:55:38 +02:00
Chuyao Shen 12b88fce02 not use dataclass (#3414)
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-04-24 11:26:59 +02:00
masato-ka fc6c94c82a fix(sarm): handle BaseModelOutputWithPooling from transformers 5.x in… (#3419)
* fix(sarm): handle BaseModelOutputWithPooling from transformers 5.x in CLIP encoding

In transformers 5.x, CLIPModel.get_image_features() and get_text_features()
return BaseModelOutputWithPooling instead of a plain torch.FloatTensor.
Added isinstance check to extract pooler_output when the return value is not
a tensor, maintaining backward compatibility with transformers 4.x.

Fixes AttributeError: 'BaseModelOutputWithPooling' object has no attribute 'detach'
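The shim, as a sketch (helper name hypothetical; the real change lives in the SARM CLIP encoding path):

```python
import torch
from transformers.modeling_outputs import BaseModelOutputWithPooling

def to_feature_tensor(out) -> torch.Tensor:
    # transformers 4.x returns a plain tensor; 5.x may return a
    # BaseModelOutputWithPooling whose pooler_output holds the features.
    if isinstance(out, BaseModelOutputWithPooling):
        if out.pooler_output is None:
            raise ValueError("CLIP output has no pooler_output")
        return out.pooler_output
    return out
```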

* Add assertion check for pooler_output of CLIP, in response to the comment below.
https://github.com/huggingface/lerobot/pull/3419#discussion_r3112594387

* Add assertion check for pooler_output of CLIP, in response to the comment below; changed to a simple check and raise.
https://github.com/huggingface/lerobot/pull/3419#discussion_r3126953776

---------
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-04-23 16:26:58 +02:00
Steven Palma 1add460678 fix(policy): loss normalization for padded actions in ACT, Diffusion, and MultiTaskDiT (#3442)
* Fix loss normalization for padded actions in ACT, Diffusion, and MultiTaskDiT

When action_is_pad masks out padded timesteps, the subsequent .mean()
still divides by the total element count (including zeroed-out padding),
underestimating the loss. With 60-70% padding this can cut the effective
gradient signal by 2-3x.

Replace mask-then-mean with mask-then-sum / valid-count for all three
affected policies. TDMPC is not affected because it sums over time
before averaging over batch.

Fixes #3353
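The fix, as an illustrative sketch (tensor names assumed; losses shaped (batch, time, action_dim), action_is_pad shaped (batch, time)):

```python
import torch

def masked_loss(losses: torch.Tensor, action_is_pad: torch.Tensor) -> torch.Tensor:
    valid = ~action_is_pad                      # (batch, time) bool
    losses = losses * valid.unsqueeze(-1)       # zero out padded timesteps
    # Before: losses.mean() divided by batch*time*action_dim, padding included.
    # After: divide only by the number of valid scalar entries.
    num_valid = (valid.sum() * losses.shape[-1]).clamp_min(1)
    return losses.sum() / num_valid
```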

* linting

Co-authored-by: whats2000 <60466660+whats2000@users.noreply.github.com>
Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>

* Update src/lerobot/policies/diffusion/modeling_diffusion.py

Co-authored-by: whats2000 <60466660+whats2000@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* Update src/lerobot/policies/multi_task_dit/modeling_multi_task_dit.py

Co-authored-by: whats2000 <60466660+whats2000@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* Update src/lerobot/policies/multi_task_dit/modeling_multi_task_dit.py

Co-authored-by: whats2000 <60466660+whats2000@users.noreply.github.com>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

* apply ACT loss normalization suggestion from review

Divide by num_valid (timesteps * action_dim) instead of just timesteps,
matching the diffusion/multi_task_dit fix. Addresses review from
@whats2000 (https://github.com/huggingface/lerobot/pull/3377#discussion_r3106845791).

* fix(test): update safetensor act

---------

Signed-off-by: Maxime Ellerbach <maxime@ellerbach.net>
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Yufeng He <40085740+he-yufeng@users.noreply.github.com>
Co-authored-by: Maxime Ellerbach <maxime@ellerbach.net>
Co-authored-by: whats2000 <60466660+whats2000@users.noreply.github.com>
2026-04-23 15:23:54 +02:00
Qi Jia 4587c2b648 fix xvla docs (#3291)
Co-authored-by: Qi Jia <kaufou@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-04-23 14:50:32 +02:00
whats2000 2236cdb302 fix(smolvla): correct loss normalization for padded actions (#3434)
Apply the same per-scalar-mean fix to SmolVLA that #3377 landed for
ACT / Diffusion / MultiTaskDiT. The pre-patch form applies the
`action_is_pad` mask to zero out padded timesteps, then calls `.mean()`
(or `.mean(dim=(1, 2))`). Because `.mean()` divides by the total number
of elements including the zeroed padding, the loss is diluted by the
padding fraction.

Fixed by normalizing only over valid (non-padded) scalar entries:

    num_valid = ((~actions_is_pad).sum(...) * losses.shape[-1]).clamp_min(1)
    loss = losses.sum(...) / num_valid

`clamp_min(1)` preserves the all-padded-batch edge case (0/1 = 0). Both
reduction paths are updated. Behavior when `action_is_pad` is missing is
unchanged (`losses.mean()`).

Empirical A/B on aloha_sim_transfer_cube_human (chunk_size=40, batch=2,
30 steps, fixed seed, GB200) shows `loss_A / loss_B = 0.9672 (±0.088)` —
same direction and magnitude as PR #3377's `loss_A / loss_C ≈ 0.96` for
ACT. Heavier-padding recipes will see a larger gap.

Refs: #3353 (original report for ACT), #3377 (fix for the other three
policies).
2026-04-23 10:34:11 +02:00
115 changed files with 5328 additions and 1311 deletions
@@ -33,7 +33,7 @@ jobs:
github.event.workflow_run.event == 'pull_request' &&
github.event.workflow_run.conclusion == 'success' &&
github.repository == 'huggingface/lerobot'
-uses: huggingface/doc-builder/.github/workflows/upload_pr_documentation.yml@9ad2de8582b56c017cb530c1165116d40433f1c6 # main
+uses: huggingface/doc-builder/.github/workflows/upload_pr_documentation.yml@2430c1ec91d04667414e2fa31ecfc36c153ea391 # main
with:
package_name: lerobot
secrets:
+2 -2
@@ -55,7 +55,7 @@ jobs:
github.repository == 'huggingface/lerobot'
permissions:
contents: read
-uses: huggingface/doc-builder/.github/workflows/build_main_documentation.yml@90b4ee2c10b81b5c1a6367c4e6fc9e2fb510a7e3 # main
+uses: huggingface/doc-builder/.github/workflows/build_main_documentation.yml@2430c1ec91d04667414e2fa31ecfc36c153ea391 # main
with:
commit_sha: ${{ github.sha }}
package: lerobot
@@ -78,7 +78,7 @@ jobs:
permissions:
contents: read
pull-requests: write
-uses: huggingface/doc-builder/.github/workflows/build_pr_documentation.yml@90b4ee2c10b81b5c1a6367c4e6fc9e2fb510a7e3 # main
+uses: huggingface/doc-builder/.github/workflows/build_pr_documentation.yml@2430c1ec91d04667414e2fa31ecfc36c153ea391 # main
with:
commit_sha: ${{ github.event.pull_request.head.sha }}
pr_number: ${{ github.event.number }}
+6 -6
View File
@@ -24,14 +24,14 @@ on:
env:
CLOSE_ISSUE_MESSAGE: >
-This issue was closed because it has been stalled for 14 days with no activity.
+This issue was closed because it has been stalled for 30 days with no activity.
Feel free to reopen if is still relevant, or to ping a collaborator if you have any questions.
CLOSE_PR_MESSAGE: >
-This PR was closed because it has been stalled for 21 days with no activity.
+This PR was closed because it has been stalled for 30 days with no activity.
Feel free to reopen if is still relevant, or to ping a collaborator if you have any questions.
WARN_ISSUE_MESSAGE: >
This issue has been automatically marked as stale because it has not had
-recent activity (6 months). It will be closed if no further activity occurs.
+recent activity (1 year). It will be closed if no further activity occurs.
Any change, comment or update to this issue will reset this count.
Thank you for your contributions.
WARN_PR_MESSAGE: >
@@ -59,10 +59,10 @@ jobs:
stale-pr-label: stale
exempt-issue-labels: never-stale
exempt-pr-labels: never-stale
-days-before-issue-stale: 180
-days-before-issue-close: 14
+days-before-issue-stale: 365
+days-before-issue-close: 30
days-before-pr-stale: 365
-days-before-pr-close: 21
+days-before-pr-close: 30
delete-branch: true
close-issue-message: ${{ env.CLOSE_ISSUE_MESSAGE }}
close-pr-message: ${{ env.CLOSE_PR_MESSAGE }}
+1
@@ -1,3 +1,4 @@
include src/lerobot/templates/lerobot_modelcard_template.md
+include src/lerobot/templates/lerobot_rewardmodel_modelcard_template.md
include src/lerobot/datasets/card_template.md
include src/lerobot/envs/metaworld_config.json
+7 -11
@@ -18,9 +18,8 @@
# docker build -f docker/Dockerfile.internal -t lerobot-internal .
# Configure the base image for CI with GPU access
-# TODO(Steven): Bump these versions
-ARG CUDA_VERSION=12.4.1
-ARG OS_VERSION=22.04
+ARG CUDA_VERSION=12.6.3
+ARG OS_VERSION=24.04
FROM nvidia/cuda:${CUDA_VERSION}-base-ubuntu${OS_VERSION}
# Define Python version argument
@@ -36,16 +35,13 @@ ENV DEBIAN_FRONTEND=noninteractive \
# Install Python, system dependencies, and uv (as root)
RUN apt-get update && apt-get install -y --no-install-recommends \
-software-properties-common build-essential git curl \
-libglib2.0-0 libgl1-mesa-glx libegl1-mesa ffmpeg \
+build-essential git curl \
+libglib2.0-0 libgl1 libegl1 ffmpeg \
libusb-1.0-0-dev speech-dispatcher libgeos-dev portaudio19-dev \
cmake pkg-config ninja-build \
-&& add-apt-repository -y ppa:deadsnakes/ppa \
-&& apt-get update \
-&& apt-get install -y --no-install-recommends \
-python${PYTHON_VERSION} \
-python${PYTHON_VERSION}-venv \
-python${PYTHON_VERSION}-dev \
+python${PYTHON_VERSION} \
+python${PYTHON_VERSION}-venv \
+python${PYTHON_VERSION}-dev \
&& curl -LsSf https://astral.sh/uv/install.sh | sh \
&& mv /root/.local/bin/uv /usr/local/bin/uv \
&& useradd --create-home --shell /bin/bash user_lerobot \
+2
@@ -47,6 +47,8 @@
title: π₀-FAST (Pi0Fast)
- local: pi05
title: π₀.₅ (Pi05)
+- local: eo1
+title: EO-1
- local: groot
title: NVIDIA GR00T N1.5
- local: xvla
+168
@@ -0,0 +1,168 @@
# EO-1
EO-1 is a **Vision-Language-Action policy for robot control**. The LeRobot implementation integrates EO-1 with the standard LeRobot training, evaluation, and processor interfaces.
## Model Overview
EO-1 uses a Qwen2.5-VL backbone for vision-language understanding and adds a continuous flow-matching action head for robot control. The policy formats each robot-control sample as a multimodal conversation: camera images are passed to Qwen2.5-VL, the robot state is represented with EO-1 state tokens, and the future action chunk is represented with EO-1 action tokens.
<img
src="https://huggingface.co/datasets/HaomingSong/lerobot-documentation-images/resolve/main/lerobot/eo_pipeline.png"
alt="An overview of EO-1"
width="85%"
/>
During training, EO-1 learns to denoise continuous action chunks at the action-token positions. During inference, it samples an action chunk, returns continuous actions, and executes `n_action_steps` from the chunk before sampling again.
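As a sketch, the receding-horizon loop looks like this (method and attribute names are hypothetical, not the LeRobot API):

```python
def control_loop(policy, env, n_action_steps: int = 8):
    obs = env.reset()
    while not env.done:
        chunk = policy.sample_action_chunk(obs)  # denoise a full action chunk
        for action in chunk[:n_action_steps]:    # execute only the first n steps,
            obs = env.step(action)               # then re-observe and re-sample
```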
### What the LeRobot Integration Covers
- Standard `policy.type=eo1` configuration through LeRobot
- Qwen2.5-VL image and text preprocessing through policy processors
- Continuous flow-matching action prediction
- Checkpoint save/load through LeRobot policy APIs
- Training with `lerobot-train` and evaluation with `lerobot-eval`
The broader EO-1 project also includes interleaved vision-text-action pretraining and multimodal reasoning workflows. This page focuses on the LeRobot robot-control policy path.
## Installation Requirements
1. Install LeRobot by following the [Installation Guide](./installation).
2. Install EO-1 dependencies by running:
```bash
pip install -e ".[eo1]"
```
3. If you want to train or evaluate on LIBERO, install the LIBERO dependencies too:
```bash
pip install -e ".[eo1,libero]"
```
EO-1 can use the standard PyTorch scaled-dot-product attention backend through `policy.attn_implementation=sdpa`. If your environment has a compatible `flash_attn` installation, you can request `policy.attn_implementation=flash_attention_2`.
## Data Requirements
EO-1 expects a LeRobot dataset with:
- At least one visual observation, for example `observation.images.image`
- `observation.state`
- `action`
- A language task instruction through the dataset `task` field
If your dataset uses different observation names, use `rename_map` to align them with the names expected by your training or evaluation setup.
## Usage
To use EO-1 in a LeRobot configuration, specify the policy type as:
```python
policy.type=eo1
```
By default, a new EO-1 policy initializes its backbone from:
```python
policy.vlm_base=Qwen/Qwen2.5-VL-3B-Instruct
```
Once a LeRobot-format EO-1 checkpoint is available, load it with:
```python
policy.path=your-org/your-eo1-checkpoint
```
## Training
### Training Command Example
```bash
lerobot-train \
--dataset.repo_id=your_org/your_dataset \
--policy.type=eo1 \
--policy.vlm_base=Qwen/Qwen2.5-VL-3B-Instruct \
--policy.dtype=bfloat16 \
--policy.attn_implementation=sdpa \
--policy.gradient_checkpointing=false \
--output_dir=./outputs/eo1_training \
--job_name=eo1_training \
--steps=300000 \
--batch_size=16 \
--policy.device=cuda
```
### Key Training Parameters
| Parameter | Default | Description |
| -------------------------------------- | ----------------------------- | ----------------------------------------------------------------------- |
| `policy.vlm_base` | `Qwen/Qwen2.5-VL-3B-Instruct` | Qwen2.5-VL checkpoint used to initialize a new policy |
| `policy.dtype` | `auto` | Backbone dtype request: `auto`, `bfloat16`, or `float32` |
| `policy.attn_implementation` | `None` | Optional Qwen attention backend, such as `sdpa` |
| `policy.gradient_checkpointing` | `false` | Reduces memory usage during training |
| `policy.chunk_size` | `8` | Number of future actions predicted per chunk |
| `policy.n_action_steps` | `8` | Number of actions consumed from a sampled chunk |
| `policy.num_denoise_steps` | `10` | Number of flow-matching denoising steps used during sampling |
| `policy.max_state_dim` | `32` | State padding dimension |
| `policy.max_action_dim` | `32` | Action padding dimension |
| `policy.force_fp32_autocast` | `true` | Keeps the flow head in fp32 even when the backbone uses mixed precision |
| `policy.supervise_padding_action_dims` | `true` | Controls whether padded action dimensions are supervised |
| `policy.supervise_padding_actions` | `true` | Controls whether padded future action rows are supervised |
## Evaluation
EO-1 can be evaluated through `lerobot-eval` once you have a LeRobot-format checkpoint:
```bash
lerobot-eval \
--policy.path=your-org/your-eo1-checkpoint \
--env.type=libero \
--env.task=libero_object \
--eval.batch_size=1 \
--eval.n_episodes=20
```
For datasets or environments whose camera names differ from the checkpoint configuration, pass a `rename_map`:
```bash
lerobot-eval \
--policy.path=your-org/your-eo1-checkpoint \
--env.type=libero \
--env.task=libero_object \
--rename_map='{"observation.images.image2":"observation.images.wrist_image"}'
```
## Configuration Notes
### Image Processing
EO-1 uses the Qwen2.5-VL processor. The `policy.image_min_pixels` and `policy.image_max_pixels` settings control the image resizing bounds before the visual tokens are passed into the backbone.
### State and Action Dimensions
The policy pads state and action vectors to `policy.max_state_dim` and `policy.max_action_dim` before the EO-1 flow head. Predictions are cropped back to the original action dimension before being returned by the policy.
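A sketch of this behavior (shapes assumed for illustration):

```python
import torch
import torch.nn.functional as F

state = torch.randn(1, 7)                          # e.g. a 7-DoF robot state
padded = F.pad(state, (0, 32 - state.shape[-1]))   # zero-pad to max_state_dim=32
actions = torch.randn(1, 8, 32)                    # predicted chunk in padded space
cropped = actions[..., :7]                         # cropped back to the true action dim
```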
### Attention Backend
Use `policy.attn_implementation=sdpa` for a portable setup. Use `flash_attention_2` only when `flash_attn` is installed and compatible with your environment.
## References
- [EO-1 project](https://github.com/EO-Robotics/EO1)
- [EO-1 paper](https://arxiv.org/abs/2508.21112)
- [Qwen2.5-VL-3B-Instruct](https://huggingface.co/Qwen/Qwen2.5-VL-3B-Instruct)
## Citation
```bibtex
@article{eo1,
title={EO-1: Interleaved Vision-Text-Action Pretraining for General Robot Control},
author={Delin Qu and Haoming Song and Qizhi Chen and Zhaoqing Chen and Xianqiang Gao and Xinyi Ye and Qi Lv and Modi Shi and Guanghui Ren and Cheng Ruan and Maoqing Yao and Haoran Yang and Jiacheng Bao and Bin Zhao and Dong Wang},
journal={arXiv preprint},
year={2025},
url={https://arxiv.org/abs/2508.21112}
}
```
## License
This LeRobot integration follows the **Apache 2.0 License** used by LeRobot. Check the upstream EO-1 model and dataset pages for the licenses of released EO-1 checkpoints and data.
+29 -28
@@ -46,7 +46,7 @@ This ensures identical task states map to consistent progress values, even acros
## Inputs and Targets (What the new code expects)
-SARM is trained through its processor (`src/lerobot/policies/sarm/processor_sarm.py`), which:
+SARM is trained through its processor (`src/lerobot/rewards/sarm/processor_sarm.py`), which:
- **Encodes** images and task text with CLIP (ViT-B/32) into `video_features` and `text_features`
- **Pads/truncates** robot state into `state_features` (up to `max_state_dim`)
@@ -347,7 +347,7 @@ Use `compute_rabc_weights.py` with `--visualize-only` to visualize model predict
<hfoption id="single_stage">
```bash
-python src/lerobot/policies/sarm/compute_rabc_weights.py \
+python -m lerobot.rewards.sarm.compute_rabc_weights \
--dataset-repo-id your-username/your-dataset \
--reward-model-path your-username/sarm-model \
--visualize-only \
@@ -360,7 +360,7 @@ python src/lerobot/policies/sarm/compute_rabc_weights.py \
<hfoption id="dense_only">
```bash
-python src/lerobot/policies/sarm/compute_rabc_weights.py \
+python -m lerobot.rewards.sarm.compute_rabc_weights \
--dataset-repo-id your-username/your-dataset \
--reward-model-path your-username/sarm-model \
--visualize-only \
@@ -373,7 +373,7 @@ python src/lerobot/policies/sarm/compute_rabc_weights.py \
<hfoption id="dual">
```bash
-python src/lerobot/policies/sarm/compute_rabc_weights.py \
+python -m lerobot.rewards.sarm.compute_rabc_weights \
--dataset-repo-id your-username/your-dataset \
--reward-model-path your-username/sarm-model \
--visualize-only \
@@ -429,7 +429,7 @@ The weighting follows **Equations 8-9** from the paper:
First, run the SARM model on all frames in your dataset to compute progress values:
```bash
-python src/lerobot/policies/sarm/compute_rabc_weights.py \
+python -m lerobot.rewards.sarm.compute_rabc_weights \
--dataset-repo-id your-username/your-dataset \
--reward-model-path your-username/sarm-model \
--head-mode sparse \
@@ -465,15 +465,15 @@ This script:
### Step 5b: Train Policy with RA-BC
-Once you have the progress file, train your policy with RA-BC weighting. The progress file is auto-detected from the dataset path (`sarm_progress.parquet`). Currently PI0, PI0.5 and SmolVLA are supported with RA-BC:
+Once you have the progress file, train your policy with RA-BC weighting. The progress file is auto-detected from the dataset path (`sarm_progress.parquet`) if not explicitly provided. Currently PI0, PI0.5 and SmolVLA are supported with RA-BC:
```bash
lerobot-train \
--dataset.repo_id=your-username/your-dataset \
--policy.type=pi0 \
---use_rabc=true \
---rabc_head_mode=sparse \
---rabc_kappa=0.01 \
+--sample_weighting.type=rabc \
+--sample_weighting.head_mode=sparse \
+--sample_weighting.kappa=0.01 \
--output_dir=outputs/train/policy_rabc \
--batch_size=32 \
--steps=40000
@@ -488,12 +488,13 @@ The training script automatically:
**RA-BC Arguments:**
-| Argument | Description | Default |
-| ---------------------- | ---------------------------------------------------------- | ---------------------------------- |
-| `--use_rabc` | Enable RA-BC sample weighting | `false` |
-| `--rabc_progress_path` | Path to progress parquet file (auto-detected from dataset) | `sarm_progress.parquet` in dataset |
-| `--rabc_head_mode` | Which SARM head's progress to use: `sparse` or `dense` | `sparse` |
-| `--rabc_kappa` | Threshold κ for high-quality samples | `0.01` |
+| Argument | Description | Default |
+| ---------------------------------- | ------------------------------------------------------ | ----------------------- |
+| `--sample_weighting.type` | Weighting strategy type (`rabc` or `uniform`) | `rabc` |
+| `--sample_weighting.progress_path` | Path to progress parquet file | `sarm_progress.parquet` |
+| `--sample_weighting.head_mode` | Which SARM head's progress to use: `sparse` or `dense` | `sparse` |
+| `--sample_weighting.kappa` | Threshold κ for high-quality samples | `0.01` |
+| `--sample_weighting.epsilon` | Small constant for numerical stability | `1e-6` |
### Tuning RA-BC Kappa
@@ -511,30 +512,30 @@ The `kappa` parameter is the threshold that determines which samples get full we
Monitor these WandB metrics during training:
-| Metric | Healthy Range | Problem Indicator |
-| ------------------ | ------------- | ------------------------- |
-| `rabc_mean_weight` | 0.3 - 0.8 | ≈ 1.0 means kappa too low |
-| `rabc_delta_mean` | > 0 | Should be positive |
-| `rabc_delta_std` | > 0 | Variance in data quality |
+| Metric | Healthy Range | Problem Indicator |
+| ----------------------------- | ------------- | ------------------------- |
+| `sample_weight_mean_weight` | 0.3 - 0.8 | ≈ 1.0 means kappa too low |
+| `sample_weighting/delta_mean` | > 0 | Should be positive |
+| `sample_weighting/delta_std` | > 0 | Variance in data quality |
-**If `rabc_mean_weight ≈ 1.0`:** Your kappa is too low. Most samples have `delta > kappa` and bypass the soft-weighting entirely. RA-BC becomes equivalent to vanilla BC.
+**If `sample_weight_mean_weight ≈ 1.0`:** Your kappa is too low. Most samples have `delta > kappa` and bypass the soft-weighting entirely. RA-BC becomes equivalent to vanilla BC.
**Setting kappa based on your data:**
-The default `kappa=0.01` was tuned for the paper's T-shirt folding task (~90s episodes at 30fps). For your dataset, check the logged `rabc_delta_mean` and `rabc_delta_std`:
+The default `kappa=0.01` was tuned for the paper's T-shirt folding task (~90s episodes at 30fps). For your dataset, check the logged `sample_weighting/delta_mean` and `sample_weighting/delta_std`:
```
# If delta_mean ≈ 0.03 and delta_std ≈ 0.02:
# Most deltas fall in range [0.01, 0.05]
# Option 1: Set kappa = delta_mean (medium selectivity)
---rabc_kappa=0.03
+--sample_weighting.kappa=0.03
# Option 2: Set kappa = delta_mean + delta_std (high selectivity)
---rabc_kappa=0.05
+--sample_weighting.kappa=0.05
# Option 3: Set kappa = delta_mean + 2*delta_std (very selective)
---rabc_kappa=0.07
+--sample_weighting.kappa=0.07
```
**When RA-BC may not help:**
@@ -550,8 +551,8 @@ accelerate launch \
src/lerobot/scripts/lerobot_train.py \
--dataset.repo_id=your-username/your-dataset \
--policy.type=pi0 \
---use_rabc=true \
---rabc_kappa=0.01 \
+--sample_weighting.type=rabc \
+--sample_weighting.kappa=0.01 \
--output_dir=outputs/train/policy_rabc \
--batch_size=32 \
--steps=40000
@@ -576,7 +577,7 @@ accelerate launch \
### RA-BC
1. **Train SARM first**: RA-BC quality depends entirely on SARM quality
-2. **Monitor `rabc_mean_weight`**: If it's ≈ 1.0, increase kappa (see [Tuning RA-BC Kappa](#tuning-ra-bc-kappa))
+2. **Monitor `sample_weight_mean_weight`**: If it's ≈ 1.0, increase kappa (see [Tuning RA-BC Kappa](#tuning-ra-bc-kappa))
---
+4 -4
@@ -220,7 +220,7 @@ REAL_DIM = 12
# Postprocessing: Trim 20D predictions to 12D for deployment
```
-See the [action_hub.py](/home/jade_choghari/robot/lerobot/src/lerobot/policies/xvla/action_hub.py) implementation for details.
+See the [action_hub.py](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/xvla/action_hub.py) implementation for details.
#### Auto Action Mode (Recommended)
@@ -519,9 +519,9 @@ If you use X-VLA in your research, please cite:
- [X-VLA Paper](https://arxiv.org/pdf/2510.10274)
- [LeRobot Documentation](https://github.com/huggingface/lerobot)
-- [Action Registry Implementation](https://github.com/huggingface/lerobot/src/lerobot/policies/xvla/action_hub.py)
-- [Processor Implementation](https://github.com/huggingface/lerobot/src/lerobot/policies/xvla/processor_xvla.py)
-- [Model Configuration](https://github.com/huggingface/lerobot/src/lerobot/policies/xvla/configuration_xvla.py)
+- [Action Registry Implementation](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/xvla/action_hub.py)
+- [Processor Implementation](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/xvla/processor_xvla.py)
+- [Model Configuration](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/xvla/configuration_xvla.py)
## Contributing
+1 -1
@@ -69,7 +69,7 @@ class ComputeProgressShards(PipelineStep):
import torch
from tqdm import tqdm
-from lerobot.policies.sarm.compute_rabc_weights import (
+from lerobot.rewards.sarm.compute_rabc_weights import (
generate_all_frame_indices,
interpolate_progress,
load_sarm_resources,
+1 -1
@@ -10,7 +10,7 @@ from lerobot.datasets import LeRobotDataset
from lerobot.envs.configs import HILSerlProcessorConfig, HILSerlRobotEnvConfig
from lerobot.policies import SACConfig
from lerobot.policies.sac.modeling_sac import SACPolicy
-from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
+from lerobot.rewards.classifier.modeling_classifier import Classifier
from lerobot.rl.buffer import ReplayBuffer
from lerobot.rl.gym_manipulator import make_robot_env
from lerobot.robots.so_follower import SO100FollowerConfig
@@ -1,7 +1,7 @@
import torch
from lerobot.datasets import LeRobotDataset
-from lerobot.policies import RewardClassifierConfig, make_policy, make_pre_post_processors
+from lerobot.rewards import RewardClassifierConfig, make_reward_model, make_reward_pre_post_processors
def main():
@@ -22,10 +22,10 @@ def main():
model_name="microsoft/resnet-18",
)
-# Make policy, preprocessor, and optimizer
-policy = make_policy(config, ds_meta=dataset.meta)
-optimizer = config.get_optimizer_preset().build(policy.parameters())
-preprocessor, _ = make_pre_post_processors(policy_cfg=config, dataset_stats=dataset.meta.stats)
+# Make reward model, preprocessor, and optimizer
+reward_model = make_reward_model(config, dataset_stats=dataset.meta.stats)
+optimizer = config.get_optimizer_preset().build(reward_model.parameters())
+preprocessor, _ = make_reward_pre_post_processors(config, dataset_stats=dataset.meta.stats)
classifier_id = "<user>/reward_classifier_hil_serl_example"
@@ -42,7 +42,7 @@ def main():
batch = preprocessor(batch)
# Forward pass
-loss, output_dict = policy.forward(batch)
+loss, output_dict = reward_model.forward(batch)
# Backward pass and optimization
optimizer.zero_grad()
@@ -58,8 +58,8 @@ def main():
print("Training finished!")
-# You can now save the trained policy.
-policy.push_to_hub(classifier_id)
+# You can now save the trained reward model.
+reward_model.push_to_hub(classifier_id)
if __name__ == "__main__":
+2 -1
@@ -128,7 +128,7 @@ dataset_viz = ["lerobot[dataset]", "lerobot[viz]"]
av-dep = ["av>=15.0.0,<16.0.0"]
pygame-dep = ["pygame>=2.5.1,<2.7.0"]
placo-dep = ["placo>=0.9.6,<0.9.17"]
transformers-dep = ["transformers==5.3.0"] # TODO(Steven): https://github.com/huggingface/lerobot/pull/3249
transformers-dep = ["transformers>=5.4.0,<5.6.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf>=6.31.1,<6.32.0"]
can-dep = ["python-can>=4.2.0,<5.0.0"]
peft-dep = ["peft>=0.18.0,<1.0.0"]
@@ -194,6 +194,7 @@ groot = [
]
sarm = ["lerobot[transformers-dep]", "pydantic>=2.0.0,<3.0.0", "faker>=33.0.0,<35.0.0", "lerobot[matplotlib-dep]", "lerobot[qwen-vl-utils-dep]"]
xvla = ["lerobot[transformers-dep]"]
+eo1 = ["lerobot[transformers-dep]", "lerobot[qwen-vl-utils-dep]"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.13,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
# Features
@@ -17,6 +17,7 @@ Provides the RealSenseCamera class for capturing frames from Intel RealSense cam
"""
import logging
+import sys
import time
from threading import Event, Lock, Thread
from typing import TYPE_CHECKING, Any
@@ -41,6 +42,7 @@ from ..utils import get_cv2_rotation
from .configuration_realsense import RealSenseCameraConfig
logger = logging.getLogger(__name__)
+pkg_name = "pyrealsense2-macosx" if sys.platform == "darwin" else "pyrealsense2"
class RealSenseCamera(Camera):
@@ -114,7 +116,7 @@ class RealSenseCamera(Camera):
Args:
config: The configuration settings for the camera.
"""
require_package("pyrealsense2", extra="intelrealsense")
require_package(pkg_name, extra="intelrealsense", import_name="pyrealsense2")
super().__init__(config)
self.config = config
+5 -1
@@ -41,8 +41,12 @@ def cfg_to_group(
return tag
return tag[:max_tag_length]
+if cfg.is_reward_model_training:
+trainable_tag = f"reward_model:{cfg.reward_model.type}"
+else:
+trainable_tag = f"policy:{cfg.policy.type}"
lst = [
f"policy:{cfg.policy.type}",
trainable_tag,
f"seed:{cfg.seed}",
]
if cfg.dataset is not None:
+163
@@ -0,0 +1,163 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import builtins
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, TypeVar
import draccus
from huggingface_hub import hf_hub_download
from huggingface_hub.constants import CONFIG_NAME
from huggingface_hub.errors import HfHubHTTPError
from lerobot.configs.types import PolicyFeature
from lerobot.optim.optimizers import OptimizerConfig
from lerobot.optim.schedulers import LRSchedulerConfig
from lerobot.utils.device_utils import auto_select_torch_device, is_torch_device_available
from lerobot.utils.hub import HubMixin
T = TypeVar("T", bound="RewardModelConfig")
logger = logging.getLogger(__name__)
@dataclass
class RewardModelConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC):
"""Base configuration for reward models.
Args:
input_features: A dictionary defining the PolicyFeature of the input data for the reward. The key represents
the input data name, and the value is PolicyFeature, which consists of FeatureType and shape attributes.
output_features: A dictionary defining the PolicyFeature of the output data for the reward. The key represents
the output data name, and the value is PolicyFeature, which consists of FeatureType and shape attributes.
"""
# Reuses PolicyFeature
input_features: dict[str, PolicyFeature] = field(default_factory=dict)
output_features: dict[str, PolicyFeature] = field(default_factory=dict)
device: str | None = None
pretrained_path: str | None = None
push_to_hub: bool = False
repo_id: str | None = None
# Hub metadata
license: str | None = None
tags: list[str] | None = None
private: bool | None = None
def __post_init__(self) -> None:
if not self.device or not is_torch_device_available(self.device):
auto_device = auto_select_torch_device()
logger.warning(f"Device '{self.device}' is not available. Switching to '{auto_device}'.")
self.device = auto_device.type
@property
def type(self) -> str:
choice_name = self.get_choice_name(self.__class__)
if not isinstance(choice_name, str):
raise TypeError(f"Expected string from get_choice_name, got {type(choice_name)}")
return choice_name
@property
def observation_delta_indices(self) -> list | None: # type: ignore[type-arg]
return None
@property
def action_delta_indices(self) -> list | None: # type: ignore[type-arg]
return None
@property
def reward_delta_indices(self) -> list | None: # type: ignore[type-arg]
return None
@abc.abstractmethod
def get_optimizer_preset(self) -> OptimizerConfig:
raise NotImplementedError
def get_scheduler_preset(self) -> LRSchedulerConfig | None:
return None
def validate_features(self) -> None:
pass
def _save_pretrained(self, save_directory: Path) -> None:
with open(save_directory / CONFIG_NAME, "w") as f, draccus.config_type("json"):
draccus.dump(self, f, indent=4)
@classmethod
def from_pretrained(
cls: builtins.type[T],
pretrained_name_or_path: str | Path,
*,
force_download: bool = False,
resume_download: bool | None = None,
proxies: dict[Any, Any] | None = None,
token: str | bool | None = None,
cache_dir: str | Path | None = None,
local_files_only: bool = False,
revision: str | None = None,
**reward_kwargs: Any,
) -> T:
model_id = str(pretrained_name_or_path)
config_file: str | None = None
if Path(model_id).is_dir():
if CONFIG_NAME in os.listdir(model_id):
config_file = os.path.join(model_id, CONFIG_NAME)
else:
logger.error(f"{CONFIG_NAME} not found in {Path(model_id).resolve()}")
else:
try:
config_file = hf_hub_download(
repo_id=model_id,
filename=CONFIG_NAME,
revision=revision,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
resume_download=resume_download,
token=token,
local_files_only=local_files_only,
)
except HfHubHTTPError as e:
raise FileNotFoundError(
f"{CONFIG_NAME} not found on the HuggingFace Hub in {model_id}"
) from e
if config_file is None:
raise FileNotFoundError(f"{CONFIG_NAME} not found in {model_id}")
# HACK: Parse the original config to get the config subclass, so that we can
# apply cli overrides.
with draccus.config_type("json"):
orig_config = draccus.parse(cls, config_file, args=[])
with open(config_file) as f:
config = json.load(f)
config.pop("type", None)
with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".json") as f:
json.dump(config, f)
config_file = f.name
cli_overrides = reward_kwargs.pop("cli_overrides", [])
with draccus.config_type("json"):
return draccus.parse(orig_config.__class__, config_file, args=cli_overrides)
+87 -29
@@ -13,7 +13,9 @@
# limitations under the License.
import builtins
import datetime as dt
+import json
import os
+import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@@ -26,18 +28,57 @@ from lerobot import envs
from lerobot.configs import parser
from lerobot.optim import LRSchedulerConfig, OptimizerConfig
from lerobot.utils.hub import HubMixin
+from lerobot.utils.sample_weighting import SampleWeightingConfig
from .default import DatasetConfig, EvalConfig, PeftConfig, WandBConfig
from .policies import PreTrainedConfig
+from .rewards import RewardModelConfig
TRAIN_CONFIG_NAME = "train_config.json"
+def _migrate_legacy_rabc_fields(config: dict[str, Any]) -> dict[str, Any] | None:
+"""Return migrated payload for legacy RA-BC fields, or None when no migration is needed."""
+legacy_fields = (
+"use_rabc",
+"rabc_progress_path",
+"rabc_kappa",
+"rabc_epsilon",
+"rabc_head_mode",
+)
+if not any(key in config for key in legacy_fields):
+return None
+migrated_config = dict(config)
+use_rabc = bool(migrated_config.pop("use_rabc", False))
+rabc_progress_path = migrated_config.pop("rabc_progress_path", None)
+rabc_kappa = migrated_config.pop("rabc_kappa", None)
+rabc_epsilon = migrated_config.pop("rabc_epsilon", None)
+rabc_head_mode = migrated_config.pop("rabc_head_mode", None)
+# New configs may already define sample_weighting explicitly. In that case,
+# legacy fields are ignored after being stripped from the payload.
+if migrated_config.get("sample_weighting") is None and use_rabc:
+sample_weighting: dict[str, Any] = {"type": "rabc"}
+if rabc_progress_path is not None:
+sample_weighting["progress_path"] = rabc_progress_path
+if rabc_kappa is not None:
+sample_weighting["kappa"] = rabc_kappa
+if rabc_epsilon is not None:
+sample_weighting["epsilon"] = rabc_epsilon
+if rabc_head_mode is not None:
+sample_weighting["head_mode"] = rabc_head_mode
+migrated_config["sample_weighting"] = sample_weighting
+return migrated_config
@dataclass
class TrainPipelineConfig(HubMixin):
dataset: DatasetConfig
env: envs.EnvConfig | None = None
policy: PreTrainedConfig | None = None
+reward_model: RewardModelConfig | None = None
# Set `dir` to where you would like to save all of the run outputs. If you run another training session
# with the same value for `dir` its contents will be overwritten unless you set `resume` to true.
output_dir: Path | None = None
@@ -72,27 +113,41 @@ class TrainPipelineConfig(HubMixin):
wandb: WandBConfig = field(default_factory=WandBConfig)
peft: PeftConfig | None = None
-# RA-BC (Reward-Aligned Behavior Cloning) parameters
-use_rabc: bool = False # Enable reward-weighted training
-rabc_progress_path: str | None = None # Path to precomputed SARM progress parquet file
-rabc_kappa: float = 0.01 # Hard threshold for high-quality samples
-rabc_epsilon: float = 1e-6 # Small constant for numerical stability
-rabc_head_mode: str | None = "sparse" # For dual-head models: "sparse" or "dense"
+# Sample weighting configuration (e.g., for RA-BC training)
+sample_weighting: SampleWeightingConfig | None = None
# Rename map for the observation to override the image and state keys
rename_map: dict[str, str] = field(default_factory=dict)
checkpoint_path: Path | None = field(init=False, default=None)
+@property
+def is_reward_model_training(self) -> bool:
+"""True when the config targets a reward model rather than a policy."""
+return self.reward_model is not None
+@property
+def trainable_config(self) -> PreTrainedConfig | RewardModelConfig:
+"""Return whichever config (policy or reward_model) is active."""
+if self.is_reward_model_training:
+return self.reward_model # type: ignore[return-value]
+return self.policy # type: ignore[return-value]
def validate(self) -> None:
# HACK: We parse again the cli args here to get the pretrained paths if there was some.
policy_path = parser.get_path_arg("policy")
-if policy_path:
-# Only load the policy config
+reward_model_path = parser.get_path_arg("reward_model")
+if reward_model_path:
+cli_overrides = parser.get_cli_overrides("reward_model")
+self.reward_model = RewardModelConfig.from_pretrained(
+reward_model_path, cli_overrides=cli_overrides
+)
+self.reward_model.pretrained_path = str(Path(reward_model_path))
+elif policy_path:
cli_overrides = parser.get_cli_overrides("policy")
self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides)
self.policy.pretrained_path = Path(policy_path)
elif self.resume:
# The entire train config is already loaded, we just need to get the checkpoint dir
config_path = parser.parse_arg("config_path")
if not config_path:
raise ValueError(
@@ -108,18 +163,22 @@ class TrainPipelineConfig(HubMixin):
policy_dir = Path(config_path).parent
+if self.policy is not None:
self.policy.pretrained_path = policy_dir
+if self.reward_model is not None:
+self.reward_model.pretrained_path = str(policy_dir)
self.checkpoint_path = policy_dir.parent
-if self.policy is None:
+if self.policy is None and self.reward_model is None:
raise ValueError(
"Policy is not configured. Please specify a pretrained policy with `--policy.path`."
"Neither policy nor reward_model is configured. "
"Please specify one with `--policy.path` or `--reward_model.path`."
)
+active_cfg = self.trainable_config
if not self.job_name:
if self.env is None:
self.job_name = f"{self.policy.type}"
self.job_name = f"{active_cfg.type}"
else:
self.job_name = f"{self.env.type}_{self.policy.type}"
self.job_name = f"{self.env.type}_{active_cfg.type}"
if not self.resume and isinstance(self.output_dir, Path) and self.output_dir.is_dir():
raise FileExistsError(
@@ -137,26 +196,16 @@ class TrainPipelineConfig(HubMixin):
if not self.use_policy_training_preset and (self.optimizer is None or self.scheduler is None):
raise ValueError("Optimizer and Scheduler must be set when the policy presets are not used.")
elif self.use_policy_training_preset and not self.resume:
-self.optimizer = self.policy.get_optimizer_preset()
-self.scheduler = self.policy.get_scheduler_preset()
+self.optimizer = active_cfg.get_optimizer_preset()
+self.scheduler = active_cfg.get_scheduler_preset()
-if self.policy.push_to_hub and not self.policy.repo_id:
-raise ValueError(
-"'policy.repo_id' argument missing. Please specify it to push the model to the hub."
-)
-if self.use_rabc and not self.rabc_progress_path:
-# Auto-detect from dataset path
-repo_id = self.dataset.repo_id
-if self.dataset.root:
-self.rabc_progress_path = str(Path(self.dataset.root) / "sarm_progress.parquet")
-else:
-self.rabc_progress_path = f"hf://datasets/{repo_id}/sarm_progress.parquet"
+if hasattr(active_cfg, "push_to_hub") and active_cfg.push_to_hub and not active_cfg.repo_id:
+raise ValueError("'repo_id' argument missing. Please specify it to push the model to the hub.")
@classmethod
def __get_path_fields__(cls) -> list[str]:
"""This enables the parser to load config from the policy using `--policy.path=local/dir`"""
return ["policy"]
"""Keys for draccus pretrained-path loading."""
return ["policy", "reward_model"]
def to_dict(self) -> dict[str, Any]:
return draccus.encode(self) # type: ignore[no-any-return] # because of the third-party library draccus uses Any as the return type
@@ -207,6 +256,15 @@ class TrainPipelineConfig(HubMixin):
) from e
cli_args = kwargs.pop("cli_args", [])
if config_file is not None:
+with open(config_file) as f:
+config = json.load(f)
+migrated_config = _migrate_legacy_rabc_fields(config)
+if migrated_config is not None:
+with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".json") as f:
+json.dump(migrated_config, f)
+config_file = f.name
with draccus.config_type("json"):
return draccus.parse(cls, config_file, args=cli_args)
+13 -17
@@ -97,8 +97,8 @@ def update_data_df(df, src_meta, dst_meta):
pd.DataFrame: Updated DataFrame with adjusted indices.
"""
df["episode_index"] = df["episode_index"] + dst_meta.info["total_episodes"]
df["index"] = df["index"] + dst_meta.info["total_frames"]
df["episode_index"] = df["episode_index"] + dst_meta.info.total_episodes
df["index"] = df["index"] + dst_meta.info.total_frames
src_task_names = src_meta.tasks.index.take(df["task_index"].to_numpy())
df["task_index"] = dst_meta.tasks.loc[src_task_names, "task_index"].to_numpy()
@@ -225,9 +225,9 @@ def update_meta_data(
# Clean up temporary columns
df = df.drop(columns=["_orig_chunk", "_orig_file"])
df["dataset_from_index"] = df["dataset_from_index"] + dst_meta.info["total_frames"]
df["dataset_to_index"] = df["dataset_to_index"] + dst_meta.info["total_frames"]
df["episode_index"] = df["episode_index"] + dst_meta.info["total_episodes"]
df["dataset_from_index"] = df["dataset_from_index"] + dst_meta.info.total_frames
df["dataset_to_index"] = df["dataset_to_index"] + dst_meta.info.total_frames
df["episode_index"] = df["episode_index"] + dst_meta.info.total_episodes
return df
@@ -237,8 +237,8 @@ def aggregate_datasets(
aggr_repo_id: str,
roots: list[Path] | None = None,
aggr_root: Path | None = None,
-data_files_size_in_mb: float | None = None,
-video_files_size_in_mb: float | None = None,
+data_files_size_in_mb: int | None = None,
+video_files_size_in_mb: int | None = None,
chunk_size: int | None = None,
):
"""Aggregates multiple LeRobot datasets into a single unified dataset.
@@ -313,8 +313,8 @@ def aggregate_datasets(
# to avoid interference between different source datasets
data_idx.pop("src_to_dst", None)
dst_meta.info["total_episodes"] += src_meta.total_episodes
dst_meta.info["total_frames"] += src_meta.total_frames
dst_meta.info.total_episodes += src_meta.total_episodes
dst_meta.info.total_frames += src_meta.total_frames
finalize_aggregation(dst_meta, all_metadata)
logging.info("Aggregation complete.")
@@ -640,14 +640,10 @@ def finalize_aggregation(aggr_meta, all_metadata):
write_tasks(aggr_meta.tasks, aggr_meta.root)
logging.info("write info")
-aggr_meta.info.update(
-{
-"total_tasks": len(aggr_meta.tasks),
-"total_episodes": sum(m.total_episodes for m in all_metadata),
-"total_frames": sum(m.total_frames for m in all_metadata),
-"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
-}
-)
+aggr_meta.info.total_tasks = len(aggr_meta.tasks)
+aggr_meta.info.total_episodes = sum(m.total_episodes for m in all_metadata)
+aggr_meta.info.total_frames = sum(m.total_frames for m in all_metadata)
+aggr_meta.info.splits = {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"}
write_info(aggr_meta.info, aggr_meta.root)
logging.info("write stats")
+21 -23
@@ -37,13 +37,11 @@ from .io_utils import (
load_subtasks,
load_tasks,
write_info,
-write_json,
write_stats,
write_tasks,
)
from .utils import (
DEFAULT_EPISODES_PATH,
-INFO_PATH,
check_version_compatibility,
get_safe_version,
has_legacy_hub_download_metadata,
@@ -228,7 +226,7 @@ class LeRobotDatasetMetadata:
@property
def _version(self) -> packaging.version.Version:
"""Codebase version used to create this dataset."""
return packaging.version.parse(self.info["codebase_version"])
return packaging.version.parse(self.info.codebase_version)
def get_data_file_path(self, ep_index: int) -> Path:
"""Return the relative parquet file path for the given episode index.
@@ -283,27 +281,27 @@ class LeRobotDatasetMetadata:
@property
def data_path(self) -> str:
"""Formattable string for the parquet files."""
return self.info["data_path"]
return self.info.data_path
@property
def video_path(self) -> str | None:
"""Formattable string for the video files."""
return self.info["video_path"]
return self.info.video_path
@property
def robot_type(self) -> str | None:
"""Robot type used in recording this dataset."""
return self.info["robot_type"]
return self.info.robot_type
@property
def fps(self) -> int:
"""Frames per second used during data collection."""
return self.info["fps"]
return self.info.fps
@property
def features(self) -> dict[str, dict]:
"""All features contained in the dataset."""
return self.info["features"]
return self.info.features
@property
def image_keys(self) -> list[str]:
@@ -333,32 +331,32 @@ class LeRobotDatasetMetadata:
@property
def total_episodes(self) -> int:
"""Total number of episodes available."""
return self.info["total_episodes"]
return self.info.total_episodes
@property
def total_frames(self) -> int:
"""Total number of frames saved in this dataset."""
return self.info["total_frames"]
return self.info.total_frames
@property
def total_tasks(self) -> int:
"""Total number of different tasks performed in this dataset."""
return self.info["total_tasks"]
return self.info.total_tasks
@property
def chunks_size(self) -> int:
"""Max number of files per chunk."""
return self.info["chunks_size"]
return self.info.chunks_size
@property
def data_files_size_in_mb(self) -> int:
"""Max size of data file in mega bytes."""
return self.info["data_files_size_in_mb"]
return self.info.data_files_size_in_mb
@property
def video_files_size_in_mb(self) -> int:
"""Max size of video file in mega bytes."""
return self.info["video_files_size_in_mb"]
return self.info.video_files_size_in_mb
def get_task_index(self, task: str) -> int | None:
"""
@@ -502,10 +500,10 @@ class LeRobotDatasetMetadata:
self._save_episode_metadata(episode_dict)
# Update info
self.info["total_episodes"] += 1
self.info["total_frames"] += episode_length
self.info["total_tasks"] = len(self.tasks)
self.info["splits"] = {"train": f"0:{self.info['total_episodes']}"}
self.info.total_episodes += 1
self.info.total_frames += episode_length
self.info.total_tasks = len(self.tasks)
self.info.splits = {"train": f"0:{self.info.total_episodes}"}
write_info(self.info, self.root)
@@ -524,7 +522,7 @@ class LeRobotDatasetMetadata:
for key in video_keys:
if not self.features[key].get("info", None):
video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0)
self.info["features"][key]["info"] = get_video_info(video_path)
self.info.features[key]["info"] = get_video_info(video_path)
def update_chunk_settings(
self,
@@ -546,17 +544,17 @@ class LeRobotDatasetMetadata:
if chunks_size is not None:
if chunks_size <= 0:
raise ValueError(f"chunks_size must be positive, got {chunks_size}")
self.info["chunks_size"] = chunks_size
self.info.chunks_size = chunks_size
if data_files_size_in_mb is not None:
if data_files_size_in_mb <= 0:
raise ValueError(f"data_files_size_in_mb must be positive, got {data_files_size_in_mb}")
self.info["data_files_size_in_mb"] = data_files_size_in_mb
self.info.data_files_size_in_mb = data_files_size_in_mb
if video_files_size_in_mb is not None:
if video_files_size_in_mb <= 0:
raise ValueError(f"video_files_size_in_mb must be positive, got {video_files_size_in_mb}")
self.info["video_files_size_in_mb"] = video_files_size_in_mb
self.info.video_files_size_in_mb = video_files_size_in_mb
# Update the info file on disk
write_info(self.info, self.root)
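# A minimal sketch of the new typed-settings path; `meta` is assumed to be an
# already-constructed LeRobotDatasetMetadata (not built here):
def shrink_data_files(meta) -> None:
    meta.update_chunk_settings(data_files_size_in_mb=50)  # persisted via write_info()
    try:
        meta.update_chunk_settings(chunks_size=0)  # rejected before touching disk
    except ValueError as err:
        print(f"rejected: {err}")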
@@ -653,7 +651,7 @@ class LeRobotDatasetMetadata:
f"Features contain video keys {obj.video_keys}, but 'use_videos' is set to False. "
"Either remove video features from the features dict, or set 'use_videos=True'."
)
write_json(obj.info, obj.root / INFO_PATH)
write_info(obj.info, obj.root)
obj.revision = None
obj._pq_writer = None
obj.latest_episode = None
+19 -24
@@ -897,14 +897,10 @@ def _copy_and_reindex_episodes_metadata(
dst_meta.finalize()
dst_meta.info.update(
{
"total_episodes": len(episode_mapping),
"total_frames": total_frames,
"total_tasks": len(dst_meta.tasks) if dst_meta.tasks is not None else 0,
"splits": {"train": f"0:{len(episode_mapping)}"},
}
)
dst_meta.info.total_episodes = len(episode_mapping)
dst_meta.info.total_frames = total_frames
dst_meta.info.total_tasks = len(dst_meta.tasks) if dst_meta.tasks is not None else 0
dst_meta.info.splits = {"train": f"0:{len(episode_mapping)}"}
write_info(dst_meta.info, dst_meta.root)
if not all_stats:
@@ -1069,21 +1065,20 @@ def _copy_episodes_metadata_and_stats(
if episodes_dir.exists():
shutil.copytree(episodes_dir, dst_episodes_dir, dirs_exist_ok=True)
dst_meta.info.update(
{
"total_episodes": src_dataset.meta.total_episodes,
"total_frames": src_dataset.meta.total_frames,
"total_tasks": src_dataset.meta.total_tasks,
"splits": src_dataset.meta.info.get("splits", {"train": f"0:{src_dataset.meta.total_episodes}"}),
}
)
dst_meta.info.total_episodes = src_dataset.meta.total_episodes
dst_meta.info.total_frames = src_dataset.meta.total_frames
dst_meta.info.total_tasks = src_dataset.meta.total_tasks
# Preserve original splits if available, otherwise create default
dst_meta.info.splits = (
src_dataset.meta.info.splits
if src_dataset.meta.info.splits
else {"train": f"0:{src_dataset.meta.total_episodes}"}
)
if dst_meta.video_keys and src_dataset.meta.video_keys:
for key in dst_meta.video_keys:
if key in src_dataset.meta.features:
dst_meta.info["features"][key]["info"] = src_dataset.meta.info["features"][key].get(
"info", {}
)
dst_meta.info.features[key]["info"] = src_dataset.meta.info.features[key].get("info", {})
write_info(dst_meta.info, dst_meta.root)
@@ -1525,7 +1520,7 @@ def modify_tasks(
write_tasks(new_task_df, root)
# Update info.json
dataset.meta.info["total_tasks"] = len(unique_tasks)
dataset.meta.info.total_tasks = len(unique_tasks)
write_info(dataset.meta.info, root)
# Reload metadata to reflect changes
@@ -1858,10 +1853,10 @@ def convert_image_to_video_dataset(
episodes_df.to_parquet(episodes_path, index=False)
# Update metadata info
new_meta.info["total_episodes"] = len(episode_indices)
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata.values())
new_meta.info["total_tasks"] = dataset.meta.total_tasks
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
new_meta.info.total_episodes = len(episode_indices)
new_meta.info.total_frames = sum(ep["length"] for ep in all_episode_metadata.values())
new_meta.info.total_tasks = dataset.meta.total_tasks
new_meta.info.splits = {"train": f"0:{len(episode_indices)}"}
# Update video info for all image keys (now videos)
# We need to manually set video info since update_video_info() checks video_keys first
@@ -1870,7 +1865,7 @@ def convert_image_to_video_dataset(
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=0, file_index=0
)
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
new_meta.info.features[img_key]["info"] = get_video_info(video_path)
write_info(new_meta.info, new_meta.root)
+7 -4
@@ -19,6 +19,7 @@ from pprint import pformat
import torch
from lerobot.configs import PreTrainedConfig
from lerobot.configs.rewards import RewardModelConfig
from lerobot.configs.train import TrainPipelineConfig
from lerobot.transforms import ImageTransforms
from lerobot.utils.constants import ACTION, IMAGENET_STATS, OBS_PREFIX, REWARD
@@ -30,12 +31,14 @@ from .streaming_dataset import StreamingLeRobotDataset
def resolve_delta_timestamps(
cfg: PreTrainedConfig, ds_meta: LeRobotDatasetMetadata
cfg: PreTrainedConfig | RewardModelConfig, ds_meta: LeRobotDatasetMetadata
) -> dict[str, list] | None:
"""Resolves delta_timestamps by reading from the 'delta_indices' properties of the PreTrainedConfig.
"""Resolves delta_timestamps by reading from the 'delta_indices' properties of the config.
Args:
cfg (PreTrainedConfig): The PreTrainedConfig to read delta_indices from.
cfg (PreTrainedConfig | RewardModelConfig): The config to read delta_indices from. Both
``PreTrainedConfig`` and concrete ``RewardModelConfig`` subclasses expose the
``{observation,action,reward}_delta_indices`` properties used below.
ds_meta (LeRobotDatasetMetadata): The dataset from which features and fps are used to build
delta_timestamps against.
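# A sketch of what this resolution amounts to, assuming a config whose
# action_delta_indices are range(8) and a dataset recorded at 30 fps
# (illustrative values, not a verbatim excerpt of the implementation):
fps = 30
action_delta_indices = list(range(8))
delta_timestamps = {"action": [i / fps for i in action_delta_indices]}
# delta_timestamps["action"] == [0.0, 1/30, 2/30, ..., 7/30] seconds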
@@ -82,7 +85,7 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
ds_meta = LeRobotDatasetMetadata(
cfg.dataset.repo_id, root=cfg.dataset.root, revision=cfg.dataset.revision
)
delta_timestamps = resolve_delta_timestamps(cfg.policy, ds_meta)
delta_timestamps = resolve_delta_timestamps(cfg.trainable_config, ds_meta)
if not cfg.dataset.streaming:
dataset = LeRobotDataset(
cfg.dataset.repo_id,
+18 -18
@@ -28,6 +28,7 @@ from .utils import (
DEFAULT_DATA_PATH,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH,
DatasetInfo,
)
@@ -78,8 +79,8 @@ def create_empty_dataset_info(
chunks_size: int | None = None,
data_files_size_in_mb: int | None = None,
video_files_size_in_mb: int | None = None,
) -> dict:
"""Create a template dictionary for a new dataset's `info.json`.
) -> DatasetInfo:
"""Create a template ``DatasetInfo`` object for a new dataset's ``meta/info.json``.
Args:
codebase_version (str): The version of the LeRobot codebase.
@@ -87,25 +88,24 @@ def create_empty_dataset_info(
features (dict): The LeRobot features dictionary for the dataset.
use_videos (bool): Whether the dataset will store videos.
robot_type (str | None): The type of robot used, if any.
chunks_size (int | None): Max files per chunk directory. Defaults to ``DEFAULT_CHUNK_SIZE``.
data_files_size_in_mb (int | None): Max parquet file size in MB. Defaults to ``DEFAULT_DATA_FILE_SIZE_IN_MB``.
video_files_size_in_mb (int | None): Max video file size in MB. Defaults to ``DEFAULT_VIDEO_FILE_SIZE_IN_MB``.
Returns:
dict: A dictionary with the initial dataset metadata.
DatasetInfo: A typed dataset information object with initial metadata.
"""
return {
"codebase_version": codebase_version,
"robot_type": robot_type,
"total_episodes": 0,
"total_frames": 0,
"total_tasks": 0,
"chunks_size": chunks_size or DEFAULT_CHUNK_SIZE,
"data_files_size_in_mb": data_files_size_in_mb or DEFAULT_DATA_FILE_SIZE_IN_MB,
"video_files_size_in_mb": video_files_size_in_mb or DEFAULT_VIDEO_FILE_SIZE_IN_MB,
"fps": fps,
"splits": {},
"data_path": DEFAULT_DATA_PATH,
"video_path": DEFAULT_VIDEO_PATH if use_videos else None,
"features": features,
}
return DatasetInfo(
codebase_version=codebase_version,
fps=fps,
features=features,
robot_type=robot_type,
chunks_size=chunks_size or DEFAULT_CHUNK_SIZE,
data_files_size_in_mb=data_files_size_in_mb or DEFAULT_DATA_FILE_SIZE_IN_MB,
video_files_size_in_mb=video_files_size_in_mb or DEFAULT_VIDEO_FILE_SIZE_IN_MB,
data_path=DEFAULT_DATA_PATH,
video_path=DEFAULT_VIDEO_PATH if use_videos else None,
)
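# Illustrative construction; the single-feature schema below is a stand-in,
# not a real dataset spec:
info = create_empty_dataset_info(
    codebase_version="v3.0",  # assumed version string
    fps=30,
    features={"action": {"dtype": "float32", "shape": (6,), "names": None}},
    use_videos=True,
)
assert info.total_episodes == 0 and info.video_path == DEFAULT_VIDEO_PATH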
def check_delta_timestamps(
+7 -10
@@ -39,6 +39,7 @@ from .utils import (
EPISODES_DIR,
INFO_PATH,
STATS_PATH,
DatasetInfo,
serialize_dict,
)
@@ -115,25 +116,21 @@ def embed_images(dataset: datasets.Dataset) -> datasets.Dataset:
return dataset
def write_info(info: dict, local_dir: Path) -> None:
write_json(info, local_dir / INFO_PATH)
def write_info(info: DatasetInfo, local_dir: Path) -> None:
write_json(info.to_dict(), local_dir / INFO_PATH)
def load_info(local_dir: Path) -> dict:
def load_info(local_dir: Path) -> DatasetInfo:
"""Load dataset info metadata from its standard file path.
Also converts shape lists to tuples for consistency.
Args:
local_dir (Path): The root directory of the dataset.
Returns:
dict: The dataset information dictionary.
DatasetInfo: The typed dataset information object.
"""
info = load_json(local_dir / INFO_PATH)
for ft in info["features"].values():
ft["shape"] = tuple(ft["shape"])
return info
raw = load_json(local_dir / INFO_PATH)
return DatasetInfo.from_dict(raw)
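# Round-trip sketch, assuming `info` is a DatasetInfo and `root` already
# contains a meta/ directory (the path is illustrative):
from pathlib import Path

root = Path("/tmp/my_dataset")
write_info(info, root)      # serializes via info.to_dict()
reloaded = load_info(root)  # returns a DatasetInfo, not a dict
assert reloaded.codebase_version == info.codebase_version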
def write_stats(stats: dict, local_dir: Path) -> None:
+4
@@ -630,6 +630,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
encoder_threads: int | None = None,
video_files_size_in_mb: int | None = None,
data_files_size_in_mb: int | None = None,
) -> "LeRobotDataset":
"""Create a new LeRobotDataset from scratch for recording data.
@@ -677,6 +679,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
root=root,
use_videos=use_videos,
metadata_buffer_size=metadata_buffer_size,
video_files_size_in_mb=video_files_size_in_mb,
data_files_size_in_mb=data_files_size_in_mb,
)
obj.repo_id = obj.meta.repo_id
obj._requested_root = obj.meta.root
+2 -2
@@ -123,7 +123,7 @@ class MultiLeRobotDataset(torch.utils.data.Dataset):
NOTE: For now, this relies on a check in __init__ to make sure all sub-datasets have the same info.
"""
return self._datasets[0].meta.info["fps"]
return self._datasets[0].meta.info.fps
@property
def video(self) -> bool:
@@ -133,7 +133,7 @@ class MultiLeRobotDataset(torch.utils.data.Dataset):
NOTE: For now, this relies on a check in __init__ to make sure all sub-datasets have the same info.
"""
return self._datasets[0].meta.info.get("video", False)
return len(self._datasets[0].meta.video_keys) > 0
@property
def features(self) -> datasets.Features:
+1 -1
@@ -434,7 +434,7 @@ class StreamingLeRobotDataset(torch.utils.data.IterableDataset):
def _make_padding_camera_frame(self, camera_key: str):
"""Variable-shape padding frame for given camera keys, given in (H, W, C)"""
return torch.zeros(self.meta.info["features"][camera_key]["shape"]).permute(-1, 0, 1)
return torch.zeros(self.meta.info.features[camera_key]["shape"]).permute(-1, 0, 1)
def _get_video_frame_padding_mask(
self,
+127 -5
@@ -14,9 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import dataclasses
import importlib.resources
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
import datasets
@@ -70,9 +72,12 @@ class ForwardCompatibilityError(CompatibilityError):
super().__init__(message)
logger = logging.getLogger(__name__)
DEFAULT_CHUNK_SIZE = 1000 # Max number of files per chunk
DEFAULT_DATA_FILE_SIZE_IN_MB = 50 # Max size per file
DEFAULT_VIDEO_FILE_SIZE_IN_MB = 100 # Max size per file
DEFAULT_DATA_FILE_SIZE_IN_MB = 100 # Max size per file
DEFAULT_VIDEO_FILE_SIZE_IN_MB = 200 # Max size per file
INFO_PATH = "meta/info.json"
STATS_PATH = "meta/stats.json"
@@ -94,6 +99,123 @@ LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
@dataclass
class DatasetInfo:
"""Typed representation of the ``meta/info.json`` file for a LeRobot dataset.
Replaces the previously untyped ``dict`` returned by ``load_info()`` and
created by ``create_empty_dataset_info()``. Using a dataclass provides
explicit field definitions, IDE auto-completion, and validation at
construction time.
"""
codebase_version: str
fps: int
features: dict[str, dict]
# Episode / frame counters — start at zero for new datasets
total_episodes: int = 0
total_frames: int = 0
total_tasks: int = 0
# Storage settings
chunks_size: int = field(default=DEFAULT_CHUNK_SIZE)
data_files_size_in_mb: int = field(default=DEFAULT_DATA_FILE_SIZE_IN_MB)
video_files_size_in_mb: int = field(default=DEFAULT_VIDEO_FILE_SIZE_IN_MB)
# File path templates
data_path: str = field(default=DEFAULT_DATA_PATH)
video_path: str | None = field(default=DEFAULT_VIDEO_PATH)
# Optional metadata
robot_type: str | None = None
splits: dict[str, str] = field(default_factory=dict)
def __post_init__(self) -> None:
# Coerce feature shapes from list to tuple — JSON deserialisation
# returns lists, but the rest of the codebase expects tuples.
for ft in self.features.values():
if isinstance(ft.get("shape"), list):
ft["shape"] = tuple(ft["shape"])
if self.fps <= 0:
raise ValueError(f"fps must be positive, got {self.fps}")
if self.chunks_size <= 0:
raise ValueError(f"chunks_size must be positive, got {self.chunks_size}")
if self.data_files_size_in_mb <= 0:
raise ValueError(f"data_files_size_in_mb must be positive, got {self.data_files_size_in_mb}")
if self.video_files_size_in_mb <= 0:
raise ValueError(f"video_files_size_in_mb must be positive, got {self.video_files_size_in_mb}")
def to_dict(self) -> dict:
"""Return a JSON-serialisable dict.
Converts tuple shapes back to lists so ``json.dump`` can handle them.
"""
d = dataclasses.asdict(self)
for ft in d["features"].values():
if isinstance(ft.get("shape"), tuple):
ft["shape"] = list(ft["shape"])
return d
@classmethod
def from_dict(cls, data: dict) -> "DatasetInfo":
"""Construct from a raw dict (e.g. loaded directly from JSON).
Unknown keys are ignored for forward compatibility with datasets that
carry additional fields (e.g. ``total_videos`` from v2.x). A warning is
logged when such fields are present.
"""
known = {f.name for f in dataclasses.fields(cls)}
unknown = sorted(k for k in data if k not in known)
if unknown:
logger.warning(f"Unknown fields in DatasetInfo: {unknown}. These will be ignored.")
return cls(**{k: v for k, v in data.items() if k in known})
# ---------------------------------------------------------------------------
# Temporary dict-style compatibility layer
# Allows existing ``info["key"]`` call-sites to keep working without changes.
# Once all callers have been migrated to attribute access, remove these.
# ---------------------------------------------------------------------------
def __getitem__(self, key: str):
import warnings
warnings.warn(
f"Accessing DatasetInfo with dict-style syntax info['{key}'] is deprecated. "
f"Use attribute access info.{key} instead.",
DeprecationWarning,
stacklevel=2,
)
try:
return getattr(self, key)
except AttributeError as err:
raise KeyError(key) from err
def __setitem__(self, key: str, value) -> None:
import warnings
warnings.warn(
f"Setting DatasetInfo with dict-style syntax info['{key}'] = ... is deprecated. "
f"Use attribute assignment info.{key} = ... instead.",
DeprecationWarning,
stacklevel=2,
)
if not hasattr(self, key):
raise KeyError(f"DatasetInfo has no field '{key}'")
setattr(self, key, value)
def __contains__(self, key: str) -> bool:
"""Check if a field exists (dict-like interface)."""
return hasattr(self, key)
def get(self, key: str, default=None):
"""Get attribute value with default fallback (dict-like interface)."""
try:
return getattr(self, key)
except AttributeError:
return default
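# Demonstration of the compatibility layer: dict-style access still works but
# emits a DeprecationWarning, while attribute access stays silent:
import warnings

def check_compat(info: "DatasetInfo") -> None:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        assert info["fps"] == info.fps  # old route warns, new route does not
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)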
def has_legacy_hub_download_metadata(root: Path) -> bool:
"""Return ``True`` when *root* looks like a legacy Hub ``local_dir`` mirror.
@@ -294,7 +416,7 @@ def create_branch(repo_id: str, *, branch: str, repo_type: str | None = None) ->
def create_lerobot_dataset_card(
tags: list | None = None,
dataset_info: dict | None = None,
dataset_info: DatasetInfo | None = None,
**kwargs,
) -> DatasetCard:
"""Create a `DatasetCard` for a LeRobot dataset.
@@ -305,7 +427,7 @@ def create_lerobot_dataset_card(
Args:
tags (list | None): A list of tags to add to the dataset card.
dataset_info (dict | None): The dataset's info dictionary, which will
dataset_info (DatasetInfo | None): The dataset's info object, which will
be displayed on the card.
**kwargs: Additional keyword arguments to populate the card template.
@@ -318,7 +440,7 @@ def create_lerobot_dataset_card(
card_tags += tags
if dataset_info:
dataset_structure = "[meta/info.json](meta/info.json):\n"
dataset_structure += f"```json\n{json.dumps(dataset_info, indent=4)}\n```\n"
dataset_structure += f"```json\n{json.dumps(dataset_info.to_dict(), indent=4)}\n```\n"
kwargs = {**kwargs, "dataset_structure": dataset_structure}
card_data = DatasetCardData(
license=kwargs.get("license"),
+2 -4
@@ -16,6 +16,7 @@ from lerobot.utils.action_interpolator import ActionInterpolator as ActionInterp
from .act.configuration_act import ACTConfig as ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig as DiffusionConfig
from .eo1.configuration_eo1 import EO1Config as EO1Config
from .factory import get_policy_class, make_policy, make_policy_config, make_pre_post_processors
from .groot.configuration_groot import GrootConfig as GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig as MultiTaskDiTConfig
@@ -24,8 +25,6 @@ from .pi0_fast.configuration_pi0_fast import PI0FastConfig as PI0FastConfig
from .pi05.configuration_pi05 import PI05Config as PI05Config
from .pretrained import PreTrainedPolicy as PreTrainedPolicy
from .sac.configuration_sac import SACConfig as SACConfig
from .sac.reward_model.configuration_classifier import RewardClassifierConfig as RewardClassifierConfig
from .sarm.configuration_sarm import SARMConfig as SARMConfig
from .smolvla.configuration_smolvla import SmolVLAConfig as SmolVLAConfig
from .tdmpc.configuration_tdmpc import TDMPCConfig as TDMPCConfig
from .utils import make_robot_action, prepare_observation_for_inference
@@ -43,12 +42,11 @@ __all__ = [
"DiffusionConfig",
"GrootConfig",
"MultiTaskDiTConfig",
"EO1Config",
"PI0Config",
"PI0FastConfig",
"PI05Config",
"RewardClassifierConfig",
"SACConfig",
"SARMConfig",
"SmolVLAConfig",
"TDMPCConfig",
"VQBeTConfig",
+4 -3
@@ -142,9 +142,10 @@ class ACTPolicy(PreTrainedPolicy):
actions_hat, (mu_hat, log_sigma_x2_hat) = self.model(batch)
l1_loss = (
F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)
).mean()
abs_err = F.l1_loss(batch[ACTION], actions_hat, reduction="none")
valid_mask = ~batch["action_is_pad"].unsqueeze(-1)
num_valid = valid_mask.sum() * abs_err.shape[-1]
l1_loss = (abs_err * valid_mask).sum() / num_valid.clamp_min(1)
loss_dict = {"l1_loss": l1_loss.item()}
if self.config.use_vae:
@@ -380,7 +380,9 @@ class DiffusionModel(nn.Module):
f"{self.config.do_mask_loss_for_padding=}."
)
in_episode_bound = ~batch["action_is_pad"]
loss = loss * in_episode_bound.unsqueeze(-1)
mask = in_episode_bound.unsqueeze(-1)
num_valid = mask.sum() * loss.shape[-1]
return (loss * mask).sum() / num_valid.clamp_min(1)
return loss.mean()
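# Why both fixes above divide by the valid count instead of calling .mean():
# the old form averaged over padded entries too, deflating the loss whenever
# episodes were short. Toy check:
import torch

err = torch.ones(4)
mask = torch.tensor([True, True, False, False])
biased = (err * mask).mean()                             # 0.5 (averages padding)
unbiased = (err * mask).sum() / mask.sum().clamp_min(1)  # 1.0 (valid steps only)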
+1
@@ -0,0 +1 @@
../../../../docs/source/eo1.mdx
+7
@@ -0,0 +1,7 @@
#!/usr/bin/env python
from .configuration_eo1 import EO1Config
from .modeling_eo1 import EO1Policy
from .processor_eo1 import make_eo1_pre_post_processors
__all__ = ["EO1Config", "EO1Policy", "make_eo1_pre_post_processors"]
@@ -0,0 +1,193 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.utils.constants import ACTION, OBS_STATE
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING or _transformers_available:
from transformers.models.qwen2_5_vl.configuration_qwen2_5_vl import (
Qwen2_5_VLConfig,
Qwen2_5_VLTextConfig,
Qwen2_5_VLVisionConfig,
)
else:
Qwen2_5_VLConfig = None
Qwen2_5_VLTextConfig = None
Qwen2_5_VLVisionConfig = None
@PreTrainedConfig.register_subclass("eo1")
@dataclass
class EO1Config(PreTrainedConfig):
"""Configuration for native EO1 policy integration in LeRobot."""
vlm_base: str = "Qwen/Qwen2.5-VL-3B-Instruct"
vlm_config: dict | None = None
# Vision processor settings.
image_min_pixels: int | None = 64 * 28 * 28
image_max_pixels: int | None = 128 * 28 * 28
use_fast_processor: bool = False
# Execution and action horizon.
n_obs_steps: int = 1
chunk_size: int = 8
n_action_steps: int = 8
# State/action padding to match EO1 flow head dimensionality.
max_state_dim: int = 32
max_action_dim: int = 32
# Flow matching sampling.
num_denoise_steps: int = 10
num_action_layers: int = 2
action_act: str = "linear"
time_sampling_beta_alpha: float = 1.5
time_sampling_beta_beta: float = 1.0
time_sampling_scale: float = 0.999
time_sampling_offset: float = 0.001
min_period: float = 4e-3
max_period: float = 4.0
supervise_padding_action_dims: bool = True
supervise_padding_actions: bool = True
# Policy-level dtype request for the Qwen backbone.
# - "auto": follow the backbone config/checkpoint default dtype. For Qwen2.5-VL this resolves to bf16.
# The EO1 flow-matching head still keeps its own parameters in fp32.
# - "bfloat16": force the backbone to initialize/load in bf16 regardless of the saved config default.
# - "float32": force the backbone to initialize/load in fp32 for maximum numerical conservatism.
dtype: str = "auto" # Options: "auto", "bfloat16", "float32"
force_fp32_autocast: bool = True
# Optional attention backend request passed through to the Qwen backbone.
# Common values: None, "eager", "sdpa", "flash_attention_2".
attn_implementation: str | None = None
# Training settings.
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.MEAN_STD,
"ACTION": NormalizationMode.MEAN_STD,
}
)
# Optimizer settings aligned with EO1/experiments/2_libero/train.sh and EO1 TrainPipelineConfig defaults.
optimizer_lr: float = 1e-4
optimizer_betas: tuple[float, float] = (0.9, 0.999)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 0.1
optimizer_grad_clip_norm: float = 1.0
# Scheduler settings aligned with EO1 train.sh: cosine schedule with warmup_ratio=0.03.
# Note: These will auto-scale if --steps < scheduler_decay_steps
# For example, --steps=3000 will scale warmup to 100 and decay to 3000
scheduler_warmup_steps: int = 900 # 0.03 * 30_000 long-run steps
scheduler_decay_steps: int = 30_000
scheduler_decay_lr: float = 0.0
def __post_init__(self):
super().__post_init__()
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"n_action_steps ({self.n_action_steps}) cannot be greater than chunk_size ({self.chunk_size})"
)
# Populate the serialized backbone config only when the caller did not provide one.
if self.vlm_config is None:
require_package("transformers", extra="eo1")
self.vlm_config = Qwen2_5_VLConfig.from_pretrained(self.vlm_base).to_dict()
@property
def vlm_backbone_config(self) -> Qwen2_5_VLConfig:
require_package("transformers", extra="eo1")
config_dict = deepcopy(self.vlm_config)
if self.attn_implementation is not None:
config_dict["attn_implementation"] = self.attn_implementation
return Qwen2_5_VLConfig(**config_dict)
@property
def text_config(self) -> Qwen2_5_VLTextConfig:
return self.vlm_backbone_config.text_config
@property
def vision_config(self) -> Qwen2_5_VLVisionConfig:
return self.vlm_backbone_config.vision_config
def validate_features(self) -> None:
"""Validate and set up EO1 input and output features."""
image_features = [key for key, feat in self.input_features.items() if feat.type == FeatureType.VISUAL]
if not image_features:
raise ValueError(
"EO1 policy requires at least one visual input feature. "
"No features of type FeatureType.VISUAL found in input_features."
)
if OBS_STATE not in self.input_features:
state_feature = PolicyFeature(
type=FeatureType.STATE,
shape=(self.max_state_dim,),
)
self.input_features[OBS_STATE] = state_feature
if ACTION not in self.output_features:
action_feature = PolicyFeature(
type=FeatureType.ACTION,
shape=(self.max_action_dim,),
)
self.output_features[ACTION] = action_feature
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
)
def get_scheduler_preset(self):
return CosineDecayWithWarmupSchedulerConfig(
peak_lr=self.optimizer_lr,
decay_lr=self.scheduler_decay_lr,
num_warmup_steps=self.scheduler_warmup_steps,
num_decay_steps=self.scheduler_decay_steps,
)
@property
def observation_delta_indices(self) -> None:
return None
@property
def action_delta_indices(self) -> list[int]:
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
return None
+620
@@ -0,0 +1,620 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import contextlib
import logging
import math
from collections import deque
from typing import TYPE_CHECKING, Any
import torch
import torch.nn as nn
import torch.nn.functional as F # noqa: N812
import torch.utils.checkpoint
from torch import Tensor
from lerobot.policies.eo1.configuration_eo1 import EO1Config
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.utils.constants import ACTION, OBS_STATE
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING or _transformers_available:
from transformers.activations import ACT2FN
from transformers.models.qwen2_5_vl import Qwen2_5_VLForConditionalGeneration
from transformers.utils import torch_compilable_check
else:
ACT2FN = None
Qwen2_5_VLForConditionalGeneration = None
torch_compilable_check = None
logger = logging.getLogger(__name__)
def pad_vector(vector, new_dim):
"""Pad the last dimension of a vector to new_dim with zeros.
Can be (batch_size x sequence_length x features_dimension)
or (batch_size x features_dimension)
"""
if vector.shape[-1] >= new_dim:
return vector
return F.pad(vector, (0, new_dim - vector.shape[-1]))
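# Stand-alone example of the padding contract:
import torch

state = torch.randn(4, 14)      # e.g. a 14-dim robot state
padded = pad_vector(state, 32)  # zeros appended on the last dim
assert padded.shape == (4, 32) and torch.equal(padded[..., :14], state)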
class EO1Policy(PreTrainedPolicy):
"""EO1 policy wrapper for LeRobot robot-only training/evaluation."""
config_class = EO1Config
name = "eo1"
def __init__(self, config: EO1Config, **kwargs):
require_package("transformers", extra="eo1")
super().__init__(config)
config.validate_features()
self.config = config
if config.pretrained_path is None:
# Initialize from pretrained VLM
vlm_backbone = Qwen2_5_VLForConditionalGeneration.from_pretrained(
config.vlm_base,
dtype=config.dtype,
attn_implementation=config.attn_implementation,
)
else:
vlm_backbone = Qwen2_5_VLForConditionalGeneration._from_config(
config.vlm_backbone_config,
dtype=config.vlm_backbone_config.dtype if config.dtype == "auto" else config.dtype,
)
self.model = EO1VisionFlowMatchingModel(config, vlm_backbone)
if config.gradient_checkpointing:
self.model.gradient_checkpointing_enable()
self.model.to(config.device)
self.reset()
def reset(self):
self._action_queue = deque(maxlen=self.config.n_action_steps)
@staticmethod
def _get_model_inputs(batch: dict[str, Tensor], excluded_keys: set[str]) -> dict[str, Tensor]:
return {key: value for key, value in batch.items() if key not in excluded_keys}
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
state = self.prepare_state(batch[OBS_STATE])
actions = self.prepare_action(batch[ACTION])
model_inputs = self._get_model_inputs(batch, {OBS_STATE, ACTION})
loss = self.model(states=state, action=actions, **model_inputs)
loss_dict = {"loss": loss.item()}
return loss, loss_dict
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor:
self.eval()
states = self.prepare_state(batch[OBS_STATE])
model_inputs = self._get_model_inputs(batch, {OBS_STATE})
actions = self.model.sample_actions(states=states, **model_inputs).to(torch.float32)
original_action_dim = self.config.output_features[ACTION].shape[0]
return actions[:, :, :original_action_dim]
def prepare_state(self, state: Tensor) -> Tensor:
return pad_vector(state, self.config.max_state_dim)
def prepare_action(self, action: Tensor) -> Tensor:
return pad_vector(action, self.config.max_action_dim)
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
self.eval()
if len(self._action_queue) == 0:
actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps]
self._action_queue.extend(actions.transpose(0, 1))
return self._action_queue.popleft()
def get_optim_params(self) -> dict:
return self.parameters()
def get_safe_dtype(target_dtype, device_type):
"""Get a safe dtype for the given device type."""
if device_type == "mps" and target_dtype == torch.float64:
return torch.float32
if device_type == "cpu":
# CPU doesn't support bfloat16, use float32 instead
if target_dtype == torch.bfloat16:
return torch.float32
if target_dtype == torch.float64:
return torch.float64
return target_dtype
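# Quick checks of the fallbacks above:
import torch

assert get_safe_dtype(torch.float64, "mps") is torch.float32
assert get_safe_dtype(torch.bfloat16, "cpu") is torch.float32
assert get_safe_dtype(torch.bfloat16, "cuda") is torch.bfloat16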
def create_sinusoidal_pos_embedding( # see openpi `create_sinusoidal_pos_embedding` (exact copy)
time: torch.Tensor, dimension: int, min_period: float, max_period: float, device="cpu"
) -> Tensor:
"""Computes sine-cosine positional embedding vectors for scalar positions."""
if dimension % 2 != 0:
raise ValueError(f"dimension ({dimension}) must be divisible by 2")
if time.ndim != 1:
raise ValueError("The time tensor is expected to be of shape `(batch_size, )`.")
dtype = get_safe_dtype(torch.float64, device.type)
fraction = torch.linspace(0.0, 1.0, dimension // 2, dtype=dtype, device=device)
period = min_period * (max_period / min_period) ** fraction
# Compute the outer product
scaling_factor = 1.0 / period * 2 * math.pi
sin_input = scaling_factor[None, :] * time[:, None]
return torch.cat([torch.sin(sin_input), torch.cos(sin_input)], dim=1)
def sample_beta(alpha, beta, bsize, device): # see openpi `sample_beta` (exact copy)
# Beta sampling uses _sample_dirichlet which isn't implemented for MPS, so sample on CPU
alpha_t = torch.tensor(alpha, dtype=torch.float32)
beta_t = torch.tensor(beta, dtype=torch.float32)
dist = torch.distributions.Beta(alpha_t, beta_t)
return dist.sample((bsize,)).to(device)
class EO1VisionActionProjector(torch.nn.Sequential):
"""This block implements the multi-layer perceptron (MLP) module."""
def __init__(
self,
in_channels: int,
out_channels: int,
num_layers: int = 2,
activation_layer: str = "linear",
bias: bool = True,
device: Any = None,
dtype: torch.dtype = torch.float32,
):
layers = []
in_dim = in_channels
hidden_channels = [in_dim] * (num_layers - 1) + [out_channels]
for hidden_dim in hidden_channels[:-1]:
layers.append(torch.nn.Linear(in_dim, hidden_dim, bias=bias, dtype=dtype, device=device))
layers.append(ACT2FN[activation_layer])
in_dim = hidden_dim
layers.append(torch.nn.Linear(in_dim, hidden_channels[-1], bias=bias, dtype=dtype, device=device))
super().__init__(*layers)
@property
def dtype(self):
return self[0].weight.dtype
class EO1VisionFlowMatchingModel(nn.Module):
def __init__(
self,
config: EO1Config,
vlm_backbone: Qwen2_5_VLForConditionalGeneration | None = None,
):
require_package("transformers", extra="eo1")
super().__init__()
self.config = config
# Preserve the backbone dtype selected at construction time so Qwen's fp32 rotary buffers stay intact.
self.vlm_backbone = vlm_backbone
self.hidden_size = self.vlm_backbone.config.text_config.hidden_size
max_state_dim = config.max_state_dim
max_action_dim = config.max_action_dim
self.state_proj = nn.Linear(max_state_dim, self.hidden_size, dtype=torch.float32)
self.action_in_proj = nn.Linear(max_action_dim, self.hidden_size, dtype=torch.float32)
self.action_out_proj = EO1VisionActionProjector(
self.hidden_size,
max_action_dim,
config.num_action_layers,
config.action_act,
dtype=torch.float32,
)
self.action_time_mlp_in = nn.Linear(self.hidden_size * 2, self.hidden_size, dtype=torch.float32)
self.action_time_mlp_out = nn.Linear(self.hidden_size, self.hidden_size, dtype=torch.float32)
self.gradient_checkpointing_enabled = False
def get_input_embeddings(self):
return self.vlm_backbone.get_input_embeddings()
def flow_head_autocast_context(self):
if self.config.force_fp32_autocast:
return torch.autocast(
device_type=self.state_proj.weight.device.type,
enabled=False,
)
return contextlib.nullcontext()
def gradient_checkpointing_enable(self):
"""Enable gradient checkpointing for the Qwen2.5-VL backbone."""
self.gradient_checkpointing_enabled = True
self.vlm_backbone.gradient_checkpointing_enable(
gradient_checkpointing_kwargs={"use_reentrant": False}
)
logger.info("Enabled gradient checkpointing for EO1VisionFlowMatchingModel")
def gradient_checkpointing_disable(self):
"""Disable gradient checkpointing for the Qwen2.5-VL backbone."""
self.gradient_checkpointing_enabled = False
self.vlm_backbone.gradient_checkpointing_disable()
logger.info("Disabled gradient checkpointing for EO1VisionFlowMatchingModel")
def _apply_checkpoint(self, func, *args, **kwargs):
"""Apply manual gradient checkpointing to EO1 flow-head computations when training."""
if self.gradient_checkpointing_enabled and self.training and torch.is_grad_enabled():
return torch.utils.checkpoint.checkpoint(
func, *args, use_reentrant=False, preserve_rng_state=False, **kwargs
)
return func(*args, **kwargs)
def sample_noise(self, shape, device):
noise = torch.normal(
mean=0.0,
std=1.0,
size=shape,
dtype=torch.float32,
device=device,
)
return noise
def sample_time(self, bsize, device):
time_beta = sample_beta(
self.config.time_sampling_beta_alpha, self.config.time_sampling_beta_beta, bsize, device
)
time = time_beta * self.config.time_sampling_scale + self.config.time_sampling_offset
return time.to(dtype=torch.float32, device=device)
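# Effective time distribution: Beta(1.5, 1.0) samples in (0, 1) are squeezed
# into (offset, scale + offset) ~= (0.001, 1.0), so the flow is never
# conditioned on exactly t = 0 or t = 1. Stand-alone check with the defaults:
import torch

t = sample_beta(1.5, 1.0, bsize=1024, device=torch.device("cpu"))
t = t * 0.999 + 0.001
assert torch.all((t > 0.0) & (t < 1.0))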
def get_placeholder_mask(
self,
input_ids: torch.LongTensor | None,
inputs_embeds: torch.FloatTensor | None,
state_features: torch.FloatTensor | None = None,
action_features: torch.FloatTensor | None = None,
*,
state_token_id: int,
action_token_id: int,
) -> tuple[torch.BoolTensor, torch.BoolTensor]:
"""Return EO1 state/action placeholder masks, following Qwen's multimodal mask style."""
if input_ids is None:
special_state_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(state_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_state_mask = special_state_mask.all(-1)
special_action_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(action_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_action_mask = special_action_mask.all(-1)
else:
special_state_mask = input_ids == state_token_id
special_action_mask = input_ids == action_token_id
n_state_tokens = special_state_mask.sum()
special_state_mask = (
special_state_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
)
if state_features is not None:
torch_compilable_check(
inputs_embeds[special_state_mask].numel() == state_features.numel(),
f"State features and state tokens do not match, tokens: {n_state_tokens}, features: {state_features.shape[0]}",
)
n_action_tokens = special_action_mask.sum()
special_action_mask = (
special_action_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
)
if action_features is not None:
torch_compilable_check(
inputs_embeds[special_action_mask].numel() == action_features.numel(),
f"Action features and action tokens do not match, tokens: {n_action_tokens}, features: {action_features.shape[0]}",
)
return special_state_mask, special_action_mask
def embed_prefix(
self,
input_ids: torch.LongTensor,
states: torch.Tensor,
*,
state_token_id: int,
action_token_id: int,
) -> torch.FloatTensor:
"""Embed the EO1 prefix tokens before native Qwen injects multimodal features."""
# Get the input embeddings for the input IDs
def input_embed_func(input_ids: torch.LongTensor) -> torch.FloatTensor:
return self.get_input_embeddings()(input_ids)
inputs_embeds = self._apply_checkpoint(input_embed_func, input_ids)
# Project the states to the hidden size
def state_proj_func(states: torch.Tensor) -> torch.FloatTensor:
with self.flow_head_autocast_context():
states = states.to(dtype=self.state_proj.weight.dtype)
return self.state_proj(states)
state_embs = self._apply_checkpoint(state_proj_func, states)
state_mask, _ = self.get_placeholder_mask(
input_ids,
inputs_embeds,
state_features=state_embs,
state_token_id=state_token_id,
action_token_id=action_token_id,
)
state_embs = state_embs.to(inputs_embeds.device, inputs_embeds.dtype)
inputs_embeds = inputs_embeds.masked_scatter(state_mask, state_embs)
return inputs_embeds
def embed_suffix(
self,
timestep: torch.Tensor,
noisy_actions: torch.Tensor,
) -> torch.FloatTensor:
"""Embed the suffix"""
def action_proj_func(noisy_actions: torch.Tensor) -> torch.FloatTensor:
with self.flow_head_autocast_context():
noisy_actions = noisy_actions.to(dtype=self.action_in_proj.weight.dtype)
return self.action_in_proj(noisy_actions)
action_embs = self._apply_checkpoint(action_proj_func, noisy_actions)
time_embs = create_sinusoidal_pos_embedding(
timestep,
self.hidden_size,
min_period=self.config.min_period,
max_period=self.config.max_period,
device=action_embs.device,
)
time_embs = time_embs.to(dtype=action_embs.dtype)
time_embs = time_embs[:, None, :].expand_as(action_embs)
action_time_embs = torch.cat([action_embs, time_embs], dim=2)
def mlp_func(action_time_embs: torch.Tensor) -> torch.FloatTensor:
with self.flow_head_autocast_context():
action_time_embs = action_time_embs.to(dtype=self.action_time_mlp_in.weight.dtype)
action_time_embs = self.action_time_mlp_in(action_time_embs)
action_time_embs = F.silu(action_time_embs)
return self.action_time_mlp_out(action_time_embs)
action_time_embs = self._apply_checkpoint(mlp_func, action_time_embs)
return action_time_embs
def forward(
self,
input_ids: torch.LongTensor | None = None,
attention_mask: torch.LongTensor | None = None,
pixel_values: torch.FloatTensor | None = None,
image_grid_thw: torch.LongTensor | None = None,
mm_token_type_ids: torch.IntTensor | None = None,
states: torch.FloatTensor | None = None,
action: torch.FloatTensor | None = None,
action_is_pad: torch.BoolTensor | None = None,
*,
state_token_id: int,
action_token_id: int,
**kwargs,
) -> Tensor:
"""Run the EO1 training forward pass and compute the flow-matching loss."""
# 1. Build the EO1 prefix with state placeholders resolved.
inputs_embeds = self.embed_prefix(
input_ids,
states=states,
state_token_id=state_token_id,
action_token_id=action_token_id,
)
# 2. Sample the diffusion target and replace the action placeholders.
time = self.sample_time(action.shape[0], inputs_embeds.device)
noise = self.sample_noise(action.shape, inputs_embeds.device)
time_expanded = time[:, None, None]
x_t = time_expanded * noise + (1 - time_expanded) * action
u_t = noise - action
action_time_embs = self.embed_suffix(time, x_t)
_, action_mask = self.get_placeholder_mask(
input_ids,
inputs_embeds,
action_features=action_time_embs,
state_token_id=state_token_id,
action_token_id=action_token_id,
)
action_time_embs = action_time_embs.to(inputs_embeds.device, inputs_embeds.dtype)
inputs_embeds = inputs_embeds.masked_scatter(action_mask, action_time_embs)
# 3. Optionally drop padded action tokens from backbone attention.
if attention_mask is not None:
attention_mask = attention_mask.to(inputs_embeds.device)
if not self.config.supervise_padding_actions:
action_is_pad = action_is_pad.to(device=inputs_embeds.device, dtype=torch.bool)
action_token_mask = action_mask[..., 0]
action_padding_mask = torch.zeros_like(action_token_mask)
action_padding_mask = action_padding_mask.masked_scatter(
action_token_mask,
action_is_pad.reshape(-1),
)
attention_mask = attention_mask.masked_fill(action_padding_mask, 0)
# 4. Run the Qwen backbone on the fused EO1 sequence.
def vlm_forward_func(
input_ids: torch.LongTensor,
attention_mask: torch.Tensor | None,
inputs_embeds: torch.FloatTensor,
pixel_values: torch.Tensor | None,
image_grid_thw: torch.LongTensor | None,
mm_token_type_ids: torch.IntTensor | None,
) -> torch.FloatTensor:
outputs = self.vlm_backbone.model(
input_ids=input_ids,
attention_mask=attention_mask,
inputs_embeds=inputs_embeds,
pixel_values=pixel_values,
image_grid_thw=image_grid_thw,
mm_token_type_ids=mm_token_type_ids,
use_cache=False,
output_hidden_states=False,
return_dict=True,
)
return outputs.last_hidden_state
hidden_states = self._apply_checkpoint(
vlm_forward_func,
input_ids,
attention_mask,
inputs_embeds,
pixel_values,
image_grid_thw,
mm_token_type_ids,
)
action_hidden_states = hidden_states[action_mask[..., 0]]
# 5. Project the action-token hidden states back to the flow target space.
def action_out_proj_func(action_hidden_states: torch.FloatTensor) -> torch.FloatTensor:
with self.flow_head_autocast_context():
action_hidden_states = action_hidden_states.to(dtype=self.action_out_proj.dtype)
return self.action_out_proj(action_hidden_states)
v_t = self._apply_checkpoint(action_out_proj_func, action_hidden_states)
v_t = v_t.reshape(u_t.shape).to(dtype=u_t.dtype)
losses = F.mse_loss(u_t, v_t, reduction="none")
# 6. Apply the configured supervision mask and reduce the loss.
if not self.config.supervise_padding_action_dims:
original_action_dim = self.config.output_features[ACTION].shape[0]
losses = losses[..., :original_action_dim]
if not self.config.supervise_padding_actions:
losses = losses[~action_is_pad]
return losses.mean()
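# Step 2 in isolation: x_t linearly interpolates from the clean action
# (t = 0) to pure noise (t = 1), and the regression target u_t = noise - action
# is exactly dx_t/dt, the constant velocity that sample_actions later
# integrates backwards from t = 1 to t = 0:
import torch

action = torch.zeros(2, 8, 32)
noise = torch.randn(2, 8, 32)
t = torch.tensor([0.3, 0.7])[:, None, None]
x_t = t * noise + (1 - t) * action
u_t = noise - action
assert torch.allclose(x_t, action + t * u_t)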
@torch.no_grad()
def sample_actions(
self,
input_ids: torch.LongTensor | None = None,
attention_mask: torch.Tensor | None = None,
pixel_values: torch.Tensor | None = None,
image_grid_thw: torch.LongTensor | None = None,
mm_token_type_ids: torch.IntTensor | None = None,
states: torch.Tensor | None = None,
*,
state_token_id: int,
action_token_id: int,
**kwargs,
) -> Tensor:
"""Sample actions from the model."""
if states is None:
raise ValueError("states are required for EO1 action sampling.")
if mm_token_type_ids is None:
raise ValueError("mm_token_type_ids are required for EO1 action sampling.")
# 1. Resolve the left-padded rollout prompt and locate the action span.
chunk_size = self.config.chunk_size
inputs_embeds = self.embed_prefix(
input_ids,
states=states,
state_token_id=state_token_id,
action_token_id=action_token_id,
).clone()
_, action_placeholder_mask = self.get_placeholder_mask(
input_ids,
inputs_embeds,
state_token_id=state_token_id,
action_token_id=action_token_id,
)
action_mask = action_placeholder_mask[..., 0]
token_counts = action_mask.sum(dim=1)
if not torch.all(token_counts == chunk_size):
raise ValueError(
f"Each sample must contain exactly {chunk_size} action tokens, got {token_counts.tolist()}."
)
if action_mask.ne(action_mask[:1]).any():
raise ValueError(
"Batch inference expects all samples to share the same action token mask after left padding."
)
act_start = int(action_mask[0].to(torch.int64).argmax().item())
act_end = act_start + self.config.chunk_size
if not torch.all(action_mask[:, act_start:act_end]):
raise ValueError("Action tokens must form a contiguous chunk of length chunk_size.")
act_slice = slice(act_start, act_end)
# 2. Encode the fixed prefix once and cache its KV state.
batch_size = input_ids.shape[0]
device = inputs_embeds.device
attention_mask = attention_mask.to(device)
mm_token_type_ids = mm_token_type_ids.to(device)
position_ids, _ = self.vlm_backbone.model.get_rope_index(
input_ids,
image_grid_thw=image_grid_thw,
attention_mask=attention_mask,
mm_token_type_ids=mm_token_type_ids,
)
position_ids = position_ids.to(device)
outputs = self.vlm_backbone.model(
input_ids=input_ids[:, :act_start],
attention_mask=attention_mask[:, :act_start],
position_ids=position_ids[..., :act_start],
inputs_embeds=inputs_embeds[:, :act_start],
pixel_values=pixel_values,
image_grid_thw=image_grid_thw,
mm_token_type_ids=mm_token_type_ids[:, :act_start],
use_cache=True,
return_dict=True,
)
x_t = self.sample_noise(
(batch_size, chunk_size, self.config.max_action_dim),
device,
).to(dtype=self.action_in_proj.weight.dtype)
dt = -1.0 / self.config.num_denoise_steps
past_key_values = outputs.past_key_values
# 3. Denoise only the action chunk while keeping the prefix cache invariant.
for step in range(self.config.num_denoise_steps):
time = torch.full(
(batch_size,),
1.0 + step * dt,
device=device,
dtype=torch.float32,
)
action_time_embs = self.embed_suffix(time, x_t)
inputs_embeds[:, act_slice] = action_time_embs.to(inputs_embeds.dtype)
# Keep the prefix KV cache invariant across denoising steps.
past_key_values.crop(act_start)
outputs = self.vlm_backbone.model(
attention_mask=attention_mask[:, :act_end],
past_key_values=past_key_values,
inputs_embeds=inputs_embeds[:, act_slice],
position_ids=position_ids[..., act_slice],
use_cache=True,
return_dict=True,
)
with self.flow_head_autocast_context():
hidden_states = outputs.last_hidden_state[:, :chunk_size]
hidden_states = hidden_states.to(dtype=self.action_out_proj.dtype)
v_t = self.action_out_proj(hidden_states)
x_t += dt * v_t.reshape(x_t.shape)
return x_t
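# The loop in step 3 is plain Euler integration of dx/dt = v(x, t) from
# t = 1 (noise) down to t = 0 (actions), with dt = -1/num_denoise_steps.
# Stand-alone sketch with a hypothetical velocity function `v`:
import torch

def euler_denoise(x: torch.Tensor, v, num_steps: int = 10) -> torch.Tensor:
    dt = -1.0 / num_steps
    for step in range(num_steps):
        t = 1.0 + step * dt
        x = x + dt * v(x, t)
    return x

# With the exact target v(x, t) = noise - action, this recovers `action`
# regardless of num_steps, since the interpolation path is linear.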
+282
@@ -0,0 +1,282 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import torch
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.policies.eo1.configuration_eo1 import EO1Config
from lerobot.processor import (
AddBatchDimensionProcessorStep,
ComplementaryDataProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.types import TransitionKey
from lerobot.utils.constants import (
OBS_STATE,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
from lerobot.utils.import_utils import _transformers_available, require_package
if TYPE_CHECKING or _transformers_available:
from transformers.models.qwen2_5_vl import Qwen2_5_VLProcessor
else:
Qwen2_5_VLProcessor = None
SYSTEM_MESSAGE = "You are a helpful physical assistant."
# EO-1 special tokens
ACTION_START_TOKEN = "<|action_start|>" # nosec B105
DEFAULT_ACTION_TOKEN = "<|action_pad|>" # nosec B105
ACTION_END_TOKEN = "<|action_end|>" # nosec B105
STATE_START_TOKEN = "<|state_start|>" # nosec B105
DEFAULT_STATE_TOKEN = "<|state_pad|>" # nosec B105
STATE_END_TOKEN = "<|state_end|>" # nosec B105
TASK_VLA_TOKEN = "<|vla|>" # nosec B105
EO1_SPECIAL_TOKENS = [
ACTION_START_TOKEN,
DEFAULT_ACTION_TOKEN,
ACTION_END_TOKEN,
STATE_START_TOKEN,
DEFAULT_STATE_TOKEN,
STATE_END_TOKEN,
TASK_VLA_TOKEN,
]
@dataclass
@ProcessorStepRegistry.register(name="eo1_conversation_template_processor")
class EO1ConversationTemplateStep(ComplementaryDataProcessorStep):
input_features: dict[str, PolicyFeature] | dict[str, dict[str, Any]]
chunk_size: int
_image_keys: list[str] = field(default_factory=list, init=False, repr=False)
def __post_init__(self):
# Robust JSON deserialization handling (guard empty maps).
if self.input_features:
first_val = next(iter(self.input_features.values()))
if isinstance(first_val, dict):
reconstructed = {}
for key, ft_dict in self.input_features.items():
reconstructed[key] = PolicyFeature(
type=FeatureType(ft_dict["type"]), shape=tuple(ft_dict["shape"])
)
self.input_features = reconstructed
self._image_keys = [
key for key, value in self.input_features.items() if value.type == FeatureType.VISUAL
]
def complementary_data(self, complementary_data):
tasks = complementary_data.get("task")
if tasks is None:
raise ValueError("Task is required for EO1ConversationTemplateStep.")
observation = self.transition.get(TransitionKey.OBSERVATION)
if observation is None:
raise ValueError("Observation is required for EO1ConversationTemplateStep.")
if OBS_STATE in observation and observation[OBS_STATE].shape[0] != len(tasks):
raise ValueError("Batch size mismatch between observation.state and task list.")
# LeRobot visual observations reach this processor as float32 tensors in [0, 1].
# Convert to uint8 in [0, 255] to meet the input requirement of Qwen2.5-VL-3B-Instruct.
images = {
key: observation[key].clamp(0, 1).mul(255.0).round().to(torch.uint8) for key in self._image_keys
}
messages = []
for i in range(len(tasks)):
content = [
*[{"type": "image", "image": images[key][i]} for key in self._image_keys],
{
"type": "text",
"text": (
f"{STATE_START_TOKEN}{DEFAULT_STATE_TOKEN}{STATE_END_TOKEN}{tasks[i]}{TASK_VLA_TOKEN}"
),
},
]
messages.append(
[
{"role": "system", "content": [{"type": "text", "text": SYSTEM_MESSAGE}]},
{"role": "user", "content": content},
{
"role": "assistant",
"content": [
{
"type": "text",
"text": f"{ACTION_START_TOKEN}{DEFAULT_ACTION_TOKEN * self.chunk_size}{ACTION_END_TOKEN}",
}
],
},
]
)
complementary_data["messages"] = messages
return complementary_data
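# Stand-alone check of the uint8 conversion above: float images in [0, 1]
# map to [0, 255] with round-to-nearest before reaching Qwen's image processor:
import torch

img = torch.tensor([0.0, 0.25, 1.0])
out = img.clamp(0, 1).mul(255.0).round().to(torch.uint8)
assert out.tolist() == [0, 64, 255]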
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
"""
This step only materializes EO1-specific message objects in complementary_data.
PipelineFeatureType tracks only ACTION and OBSERVATION, so there is no static
feature contract change to record here.
"""
return features
def get_config(self) -> dict[str, Any]:
return {
"input_features": {
key: {"type": ft.type.value, "shape": ft.shape} for key, ft in self.input_features.items()
},
"chunk_size": self.chunk_size,
}
@dataclass
@ProcessorStepRegistry.register(name="eo1_qwen_processor")
class EO1QwenProcessorStep(ComplementaryDataProcessorStep):
processor_name: str = "Qwen/Qwen2.5-VL-3B-Instruct"
image_min_pixels: int | None = 64 * 28 * 28
image_max_pixels: int | None = 128 * 28 * 28
use_fast_processor: bool = False
_processor: Qwen2_5_VLProcessor | None = field(default=None, init=False, repr=False)
_state_token_id: int | None = field(default=None, init=False, repr=False)
_action_token_id: int | None = field(default=None, init=False, repr=False)
def __post_init__(self):
require_package("transformers", extra="eo1")
self._processor = Qwen2_5_VLProcessor.from_pretrained(
self.processor_name,
use_fast=self.use_fast_processor,
)
self._processor.tokenizer.add_tokens(EO1_SPECIAL_TOKENS, special_tokens=True)
self._state_token_id = self._processor.tokenizer.convert_tokens_to_ids(DEFAULT_STATE_TOKEN)
self._action_token_id = self._processor.tokenizer.convert_tokens_to_ids(DEFAULT_ACTION_TOKEN)
def complementary_data(self, complementary_data):
messages = complementary_data.pop("messages", None)
if messages is None:
raise ValueError("Messages are required for EO1QwenProcessorStep.")
# Rollout batches use left padding so action spans stay aligned across samples.
# Supervised batches use right padding to match standard training collation.
padding_side = "right" if self.transition.get(TransitionKey.ACTION) is not None else "left"
inputs = self._processor.apply_chat_template(
messages,
tokenize=True,
padding=True,
padding_side=padding_side,
min_pixels=self.image_min_pixels,
max_pixels=self.image_max_pixels,
add_generation_prompt=False,
return_dict=True,
return_tensors="pt",
)
complementary_data["input_ids"] = inputs["input_ids"]
complementary_data["pixel_values"] = inputs["pixel_values"]
complementary_data["image_grid_thw"] = inputs["image_grid_thw"]
complementary_data["attention_mask"] = inputs["attention_mask"]
complementary_data["mm_token_type_ids"] = inputs["mm_token_type_ids"]
complementary_data["state_token_id"] = self._state_token_id
complementary_data["action_token_id"] = self._action_token_id
return complementary_data
def get_config(self) -> dict[str, Any]:
return {
"processor_name": self.processor_name,
"image_min_pixels": self.image_min_pixels,
"image_max_pixels": self.image_max_pixels,
"use_fast_processor": self.use_fast_processor,
}
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
"""
This step only converts the messages to the model input format.
"""
return features
def make_eo1_pre_post_processors(
config: EO1Config,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""Build pre/post processor pipelines for EO1."""
input_steps: list[ProcessorStep] = [
RenameObservationsProcessorStep(rename_map={}),
AddBatchDimensionProcessorStep(),
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
EO1ConversationTemplateStep(input_features=config.input_features, chunk_size=config.chunk_size),
EO1QwenProcessorStep(
processor_name=config.vlm_base,
image_min_pixels=config.image_min_pixels,
image_max_pixels=config.image_max_pixels,
use_fast_processor=config.use_fast_processor,
),
DeviceProcessorStep(device=config.device),
]
output_steps: list[ProcessorStep] = [
UnnormalizerProcessorStep(
features=config.output_features,
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
+20 -32
@@ -46,14 +46,13 @@ from lerobot.utils.feature_utils import dataset_to_policy_features
from .act.configuration_act import ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig
from .eo1.configuration_eo1 import EO1Config
from .groot.configuration_groot import GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config
from .pi05.configuration_pi05 import PI05Config
from .pretrained import PreTrainedPolicy
from .sac.configuration_sac import SACConfig
from .sac.reward_model.configuration_classifier import RewardClassifierConfig
from .sarm.configuration_sarm import SARMConfig
from .smolvla.configuration_smolvla import SmolVLAConfig
from .tdmpc.configuration_tdmpc import TDMPCConfig
from .utils import validate_visual_features_consistency
@@ -89,7 +88,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
Args:
name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
"multi_task_dit", "vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla", "wall_x".
"multi_task_dit", "vqbet", "pi0", "pi05", "sac", "smolvla", "wall_x".
Returns:
The policy class corresponding to the given name.
@@ -132,18 +131,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from .sac.modeling_sac import SACPolicy
return SACPolicy
elif name == "reward_classifier":
from .sac.reward_model.modeling_classifier import Classifier
return Classifier
elif name == "smolvla":
from .smolvla.modeling_smolvla import SmolVLAPolicy
return SmolVLAPolicy
elif name == "sarm":
from .sarm.modeling_sarm import SARMRewardModel
return SARMRewardModel
elif name == "groot":
from .groot.modeling_groot import GrootPolicy
@@ -156,6 +147,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from .wall_x.modeling_wall_x import WallXPolicy
return WallXPolicy
elif name == "eo1":
from .eo1.modeling_eo1 import EO1Policy
return EO1Policy
else:
try:
return _get_policy_cls_from_policy_name(name=name)
@@ -173,7 +168,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
Args:
policy_type: The type of the policy. Supported types include "tdmpc",
"multi_task_dit", "diffusion", "act", "vqbet", "pi0", "pi05", "sac",
"smolvla", "reward_classifier", "wall_x".
"smolvla", "wall_x".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
Returns:
@@ -200,14 +195,14 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return SACConfig(**kwargs)
elif policy_type == "smolvla":
return SmolVLAConfig(**kwargs)
elif policy_type == "reward_classifier":
return RewardClassifierConfig(**kwargs)
elif policy_type == "groot":
return GrootConfig(**kwargs)
elif policy_type == "xvla":
return XVLAConfig(**kwargs)
elif policy_type == "wall_x":
return WallXConfig(**kwargs)
elif policy_type == "eo1":
return EO1Config(**kwargs)
else:
try:
config_cls = PreTrainedConfig.get_choice_class(policy_type)
@@ -378,14 +373,6 @@ def make_pre_post_processors(
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, RewardClassifierConfig):
from .sac.reward_model.processor_classifier import make_classifier_processor
processors = make_classifier_processor(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, SmolVLAConfig):
from .smolvla.processor_smolvla import make_smolvla_pre_post_processors
@@ -394,14 +381,6 @@ def make_pre_post_processors(
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, SARMConfig):
from .sarm.processor_sarm import make_sarm_pre_post_processors
processors = make_sarm_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
dataset_meta=kwargs.get("dataset_meta"),
)
elif isinstance(policy_cfg, GrootConfig):
from .groot.processor_groot import make_groot_pre_post_processors
@@ -427,6 +406,13 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, EO1Config):
from .eo1.processor_eo1 import make_eo1_pre_post_processors
processors = make_eo1_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
try:
@@ -542,7 +528,7 @@ def make_policy(
logging.info("Loading policy's PEFT adapter.")
peft_pretrained_path = cfg.pretrained_path
peft_pretrained_path = str(cfg.pretrained_path)
peft_config = PeftConfig.from_pretrained(peft_pretrained_path)
kwargs["pretrained_name_or_path"] = peft_config.base_model_name_or_path
@@ -555,7 +541,9 @@ def make_policy(
)
policy = policy_cls.from_pretrained(**kwargs)
policy = PeftModel.from_pretrained(policy, peft_pretrained_path, config=peft_config)
policy = PeftModel.from_pretrained(
policy, peft_pretrained_path, config=peft_config, is_trainable=True
)
else:
# Make a fresh policy.
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from dataclasses import field
from typing import TYPE_CHECKING
import torch
@@ -109,7 +109,6 @@ class MultiEmbodimentActionEncoder(nn.Module):
return x
@dataclass
class FlowmatchingActionHeadConfig(PretrainedConfig):
"""NOTE: N1.5 uses XEmbFlowmatchingPolicyHeadConfig as action head"""
+5 -9
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING
@@ -174,17 +173,14 @@ N_COLOR_CHANNELS = 3
# config
@dataclass
class GR00TN15Config(PretrainedConfig):
model_type = "gr00t_n1_5"
backbone_cfg: dict = field(init=False, metadata={"help": "Backbone configuration."})
action_head_cfg: dict = field(init=False, metadata={"help": "Action head configuration."})
action_horizon: int = field(init=False, metadata={"help": "Action horizon."})
action_dim: int = field(init=False, metadata={"help": "Action dimension."})
compute_dtype: str = field(default="float32", metadata={"help": "Compute dtype."})
backbone_cfg: dict
action_head_cfg: dict
action_horizon: int
action_dim: int
compute_dtype: str = "float32"
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -688,8 +688,9 @@ class DiffusionObjective(nn.Module):
loss = F.mse_loss(predicted, target, reduction="none")
if self.do_mask_loss_for_padding and "action_is_pad" in batch:
valid_actions = ~batch["action_is_pad"]
loss = loss * valid_actions.unsqueeze(-1)
mask = ~batch["action_is_pad"].unsqueeze(-1)
num_valid = mask.sum() * loss.shape[-1]
return (loss * mask).sum() / num_valid.clamp_min(1)
return loss.mean()
@@ -752,8 +753,9 @@ class FlowMatchingObjective(nn.Module):
loss = F.mse_loss(predicted_velocity, target_velocity, reduction="none")
if self.do_mask_loss_for_padding and "action_is_pad" in batch:
valid_mask = ~batch["action_is_pad"]
loss = loss * valid_mask.unsqueeze(-1)
mask = ~batch["action_is_pad"].unsqueeze(-1)
num_valid = mask.sum() * loss.shape[-1]
return (loss * mask).sum() / num_valid.clamp_min(1)
return loss.mean()
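The masked-mean pattern used in both objectives, in isolation (a self-contained sketch with illustrative shapes):

import torch

loss = torch.ones(2, 4, 3)  # (batch, time, action_dim) per-element loss
action_is_pad = torch.tensor([[False, False, True, True],
                              [False, False, False, True]])

mask = ~action_is_pad.unsqueeze(-1)      # (2, 4, 1), broadcasts over action dims
num_valid = mask.sum() * loss.shape[-1]  # 5 valid steps * 3 dims = 15 elements
masked_mean = (loss * mask).sum() / num_valid.clamp_min(1)
print(masked_mean)  # tensor(1.); the previous (loss * mask).mean() returned 0.625, diluted by zeroed padding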
+8 -14
@@ -444,13 +444,13 @@ class PaliGemmaWithExpertModel(
if image.dtype != torch.float32:
image = image.to(torch.float32)
image_outputs = self.paligemma.model.get_image_features(image)
features = image_outputs.pooler_output * self.paligemma.config.text_config.hidden_size**0.5
features = image_outputs.pooler_output
if features.dtype != out_dtype:
features = features.to(out_dtype)
return features
def embed_language_tokens(self, tokens: torch.Tensor):
return self.paligemma.model.language_model.embed_tokens(tokens)
return self.paligemma.model.language_model.get_input_embeddings()(tokens)
def forward(
self,
@@ -666,8 +666,7 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
# Process language tokens
def lang_embed_func(lang_tokens):
lang_emb = self.paligemma_with_expert.embed_language_tokens(lang_tokens)
lang_emb_dim = lang_emb.shape[-1]
return lang_emb * math.sqrt(lang_emb_dim)
return lang_emb
lang_emb = self._apply_checkpoint(lang_embed_func, lang_tokens)
embs.append(lang_emb)
@@ -748,16 +747,8 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
return embs, pad_masks, att_masks, adarms_cond
def forward(
self, images, img_masks, lang_tokens, lang_masks, state, actions, noise=None, time=None
) -> Tensor:
def forward(self, images, img_masks, lang_tokens, lang_masks, state, actions, noise, time) -> Tensor:
"""Do a full training forward pass and compute the loss."""
if noise is None:
noise = self.sample_noise(actions.shape, actions.device)
if time is None:
time = self.sample_time(actions.shape[0], actions.device)
time_expanded = time[:, None, None]
x_t = time_expanded * noise + (1 - time_expanded) * actions
u_t = noise - actions
@@ -1292,8 +1283,11 @@ class PI0Policy(PreTrainedPolicy):
state = self.prepare_state(batch)
actions = self.prepare_action(batch)
noise = self.model.sample_noise(actions.shape, actions.device)
time = self.model.sample_time(actions.shape[0], actions.device)
# Compute loss
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions)
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time)
# Truncate losses to actual action dimensions
original_action_dim = self.config.output_features[ACTION].shape[0]
+5 -8
@@ -728,14 +728,8 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
return embs, pad_masks, att_masks, adarms_cond
def forward(self, images, img_masks, tokens, masks, actions, noise=None, time=None) -> Tensor:
def forward(self, images, img_masks, tokens, masks, actions, noise, time) -> Tensor:
"""Do a full training forward pass and compute the loss."""
if noise is None:
noise = self.sample_noise(actions.shape, actions.device)
if time is None:
time = self.sample_time(actions.shape[0], actions.device)
time_expanded = time[:, None, None]
x_t = time_expanded * noise + (1 - time_expanded) * actions
u_t = noise - actions
@@ -1262,8 +1256,11 @@ class PI05Policy(PreTrainedPolicy):
actions = self.prepare_action(batch)
noise = self.model.sample_noise(actions.shape, actions.device)
time = self.model.sample_time(actions.shape[0], actions.device)
# Compute loss (no separate state needed for PI05)
losses = self.model.forward(images, img_masks, tokens, masks, actions)
losses = self.model.forward(images, img_masks, tokens, masks, actions, noise, time)
# Truncate losses to actual action dimensions
original_action_dim = self.config.output_features[ACTION].shape[0]
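The sampling-outside-compile pattern in isolation (a hedged sketch; the module and loss are placeholders, not the PI0 core). Random sampling happens eagerly in the wrapper, so the compiled region only ever sees plain tensors:

import torch

class Core(torch.nn.Module):
    # Compiled region: pure tensor ops, no distribution sampling inside.
    def forward(self, actions, noise, time):
        time_expanded = time[:, None, None]
        x_t = time_expanded * noise + (1 - time_expanded) * actions
        u_t = noise - actions
        return ((x_t - u_t) ** 2).mean()  # placeholder loss

core = torch.compile(Core())
actions = torch.randn(8, 50, 32)
# Sampling stays eager, so ops like aten::_sample_dirichlet never reach the compile backend.
noise = torch.randn_like(actions)
time = torch.distributions.Beta(1.5, 1.0).sample((8,))
loss = core(actions, noise, time)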
@@ -16,7 +16,6 @@
import builtins
import logging
import math
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING, Literal, TypedDict, Unpack
@@ -227,6 +226,7 @@ class PI0FastPaliGemma(nn.Module):
# forward(..., adarms_cond=...) is supported (same as pi0/pi05).
if use_adarms[0]:
text_config = self.paligemma.config.text_config
del self.paligemma.model.language_model
self.paligemma.model.language_model = PiGemmaModel(text_config)
self.to_bfloat16_for_selected_params(precision)
@@ -260,13 +260,15 @@ class PI0FastPaliGemma(nn.Module):
if image.dtype != torch.float32:
image = image.to(torch.float32)
image_outputs = self.paligemma.model.get_image_features(image)
features = image_outputs.pooler_output * self.paligemma.config.text_config.hidden_size**0.5
features = image_outputs.pooler_output
norm = 2048**0.5
features = features / norm * norm
if features.dtype != out_dtype:
features = features.to(out_dtype)
return features
def embed_language_tokens(self, tokens: torch.Tensor):
return self.paligemma.model.language_model.embed_tokens(tokens)
return self.paligemma.model.language_model.get_input_embeddings()(tokens)
def forward(
self,
@@ -416,8 +418,7 @@ class PI0FastPytorch(nn.Module): # see openpi `PI0Pytorch`
# Process language instruction tokens
def lang_embed_func(tokens):
lang_emb = self.paligemma_with_expert.embed_language_tokens(tokens)
lang_emb_dim = lang_emb.shape[-1]
return lang_emb * math.sqrt(lang_emb_dim)
return lang_emb
lang_emb = self._apply_checkpoint(lang_embed_func, tokens)
embs.append(lang_emb)
@@ -431,8 +432,7 @@ class PI0FastPytorch(nn.Module): # see openpi `PI0Pytorch`
def fast_action_embed_func(fast_action_tokens):
fast_emb = self.paligemma_with_expert.embed_language_tokens(fast_action_tokens)
fast_emb_dim = fast_emb.shape[-1]
return fast_emb * math.sqrt(fast_emb_dim)
return fast_emb
fast_action_emb = self._apply_checkpoint(fast_action_embed_func, fast_action_tokens)
embs.append(fast_action_emb)
@@ -665,7 +665,6 @@ class PI0FastPytorch(nn.Module): # see openpi `PI0Pytorch`
if t < max_decoding_steps - 1:
# embed the newly generated token
next_token_emb = self.paligemma_with_expert.embed_language_tokens(next_token)
next_token_emb = next_token_emb * math.sqrt(next_token_emb.shape[-1])
if prefix_embs.dtype == torch.bfloat16:
next_token_emb = next_token_emb.to(dtype=torch.bfloat16)
@@ -770,7 +769,6 @@ class PI0FastPytorch(nn.Module): # see openpi `PI0Pytorch`
# Embed the single previous token
# We use embed_language_tokens directly to avoid overhead of full prefix embedding
next_token_emb = self.paligemma_with_expert.embed_language_tokens(next_token)
next_token_emb = next_token_emb * math.sqrt(next_token_emb.shape[-1])
if prefix_embs.dtype == torch.bfloat16:
next_token_emb = next_token_emb.to(dtype=torch.bfloat16)
+6
@@ -197,6 +197,9 @@ class PiGemmaModel(GemmaModel): # type: ignore[misc]
def __init__(self, config: GemmaConfig, **kwargs):
super().__init__(config, **kwargs)
# Free parent-allocated layers/norm before replacing to avoid ~2x peak memory.
del self.layers
del self.norm
# if not getattr(config, "use_adarms", False):
# return
cond_dim = getattr(config, "adarms_cond_dim", None)
@@ -328,6 +331,7 @@ class PiGemmaForCausalLM(GemmaForCausalLM): # type: ignore[misc]
def __init__(self, config: GemmaConfig, **kwargs):
super().__init__(config, **kwargs)
del self.model
self.model = PiGemmaModel(config)
@@ -336,6 +340,7 @@ class PaliGemmaModelWithPiGemma(PaliGemmaModel):
def __init__(self, config):
super().__init__(config)
del self.language_model
self.language_model = PiGemmaModel(config.text_config)
@@ -344,6 +349,7 @@ class PaliGemmaForConditionalGenerationWithPiGemma(PaliGemmaForConditionalGenera
def __init__(self, config):
super().__init__(config)
del self.model
self.model = PaliGemmaModelWithPiGemma(config)
# Make modules available through conditional class for BC
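The delete-before-replace idiom used in these constructors, as a runnable sketch (nn.Linear stands in for the large parent-allocated submodule):

import torch.nn as nn

class Parent(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(4096, 4096)  # allocated by the parent constructor

class Child(Parent):
    def __init__(self):
        super().__init__()
        # Free the parent's allocation before building the replacement, so the
        # old and new weights are never resident together (~2x peak otherwise).
        del self.layer
        self.layer = nn.Linear(4096, 4096)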
+2
@@ -19,6 +19,7 @@ from .action_queue import ActionQueue
from .configuration_rtc import RTCConfig
from .latency_tracker import LatencyTracker
from .modeling_rtc import RTCProcessor
from .relative import reanchor_relative_rtc_prefix
__all__ = [
"ActionInterpolator",
@@ -26,4 +27,5 @@ __all__ = [
"LatencyTracker",
"RTCConfig",
"RTCProcessor",
"reanchor_relative_rtc_prefix",
]
+58
@@ -0,0 +1,58 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Relative-action helpers for Real-Time Chunking (RTC)."""
from __future__ import annotations
import torch
from lerobot.processor import (
NormalizerProcessorStep,
RelativeActionsProcessorStep,
TransitionKey,
create_transition,
to_relative_actions,
)
def reanchor_relative_rtc_prefix(
prev_actions_absolute: torch.Tensor,
current_state: torch.Tensor,
relative_step: RelativeActionsProcessorStep,
normalizer_step: NormalizerProcessorStep | None,
policy_device: torch.device | str,
) -> torch.Tensor:
"""Convert absolute leftover actions into model-space for relative-action RTC policies.
When using relative actions, the RTC prefix (previous chunk's unexecuted tail)
is stored in absolute coordinates. Before feeding it back to the policy, this
helper re-expresses those actions relative to the robot's current joint state
and optionally normalizes them so the policy receives correctly scaled inputs.
"""
state = current_state.detach().cpu()
if state.dim() == 1:
state = state.unsqueeze(0)
action_cpu = prev_actions_absolute.detach().cpu()
mask = relative_step._build_mask(action_cpu.shape[-1])
relative_actions = to_relative_actions(action_cpu, state, mask)
transition = create_transition(action=relative_actions)
if normalizer_step is not None:
transition = normalizer_step(transition)
return transition[TransitionKey.ACTION].to(policy_device)
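A sketch of where this helper slots into a rollout loop (names such as leftover_actions, robot_state, and the two processor steps are illustrative; in practice they come from the policy's preprocessor):

prefix = reanchor_relative_rtc_prefix(
    prev_actions_absolute=leftover_actions,  # (T, action_dim) absolute tail of the last chunk
    current_state=robot_state,               # (state_dim,) current joint positions
    relative_step=relative_step,             # the preprocessor's RelativeActionsProcessorStep
    normalizer_step=normalizer_step,         # or None if the policy expects unnormalized actions
    policy_device="cuda",
)
# `prefix` is now in the policy's model space and can seed the next RTC inference.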
-1
@@ -1 +0,0 @@
../../../../docs/source/policy_sarm_README.md
@@ -394,13 +394,21 @@ class SmolVLAPolicy(PreTrainedPolicy):
loss_dict["losses_after_rm_padding"] = losses.clone().mean().item()
if reduction == "none":
# Return per-sample losses (B,) by averaging over time and action dims
per_sample_loss = losses.mean(dim=(1, 2))
# Return per-sample losses (B,) by averaging over valid (time, action) entries
if actions_is_pad is None:
per_sample_loss = losses.mean(dim=(1, 2))
else:
num_valid = ((~actions_is_pad).sum(dim=1) * losses.shape[-1]).clamp_min(1)
per_sample_loss = losses.sum(dim=(1, 2)) / num_valid
loss_dict["loss"] = per_sample_loss.mean().item()
return per_sample_loss, loss_dict
else:
# Default: return scalar mean loss
loss = losses.mean()
# Default: return scalar mean loss over valid (time, action) entries
if actions_is_pad is None:
loss = losses.mean()
else:
num_valid = ((~actions_is_pad).sum() * losses.shape[-1]).clamp_min(1)
loss = losses.sum() / num_valid
loss_dict["loss"] = loss.item()
return loss, loss_dict
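The per-sample variant of the masked mean, in isolation (a sketch assuming, as in the surrounding code, that padded entries are zeroed upstream before this reduction):

import torch

losses = torch.ones(2, 4, 3)  # (B, T, action_dim)
actions_is_pad = torch.tensor([[False, True, True, True],
                               [False, False, False, False]])

losses = losses * (~actions_is_pad).unsqueeze(-1)  # zero padded steps first
num_valid = ((~actions_is_pad).sum(dim=1) * losses.shape[-1]).clamp_min(1)  # per-sample: [3, 12]
per_sample = losses.sum(dim=(1, 2)) / num_valid  # tensor([1., 1.]); a plain mean would give [0.25, 1.]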
@@ -22,7 +22,7 @@ from transformers.utils import (
add_start_docstrings,
add_start_docstrings_to_model_forward,
is_flash_attn_2_available,
is_flash_attn_greater_or_equal_2_10,
is_flash_attn_greater_or_equal,
is_torchdynamo_compiling,
logging,
replace_return_docstrings,
@@ -890,7 +890,7 @@ class Qwen2_5_VLFlashAttention2(Qwen2_5_VLAttention):
# TODO: Should be removed once Flash Attention for RoCm is bumped to 2.1.
# flash_attn<2.1 generates top-left aligned causal mask, while what is needed here is bottom-right alignment, that was made default for flash_attn>=2.1. This attribute is used to handle this difference. Reference: https://github.com/Dao-AILab/flash-attention/releases/tag/v2.1.0.
# Beware that with flash_attn<2.1, using q_seqlen != k_seqlen (except for the case q_seqlen == 1) produces a wrong mask (top-left).
self._flash_attn_uses_top_left_mask = not is_flash_attn_greater_or_equal_2_10()
self._flash_attn_uses_top_left_mask = not is_flash_attn_greater_or_equal("2.1.0")
def forward(
self,
@@ -45,7 +45,7 @@ from transformers.utils import (
add_start_docstrings,
add_start_docstrings_to_model_forward,
is_flash_attn_2_available,
is_flash_attn_greater_or_equal_2_10,
is_flash_attn_greater_or_equal,
logging,
replace_return_docstrings,
)
@@ -909,7 +909,7 @@ class Florence2FlashAttention2(Florence2Attention):
# TODO: Should be removed once Flash Attention for RoCm is bumped to 2.1.
# flash_attn<2.1 generates top-left aligned causal mask, while what is needed here is bottom-right alignment, that was made default for flash_attn>=2.1. This attribute is used to handle this difference. Reference: https://github.com/Dao-AILab/flash-attention/releases/tag/v2.1.0.
# Beware that with flash_attn<2.1, using q_seqlen != k_seqlen (except for the case q_seqlen == 1) produces a wrong mask (top-left).
self._flash_attn_uses_top_left_mask = not is_flash_attn_greater_or_equal_2_10()
self._flash_attn_uses_top_left_mask = not is_flash_attn_greater_or_equal("2.1.0")
def _reshape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
return tensor.view(bsz, seq_len, self.num_heads, self.head_dim)
+1 -1
@@ -557,7 +557,7 @@ class RewardClassifierProcessorStep(ProcessorStep):
def __post_init__(self):
"""Initializes the reward classifier model after the dataclass is created."""
if self.pretrained_path is not None:
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.rewards.classifier.modeling_classifier import Classifier
self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
self.reward_classifier.to(self.device)
@@ -142,6 +142,10 @@ class RelativeActionsProcessorStep(ProcessorStep):
new_transition[TransitionKey.ACTION] = to_relative_actions(action, state, mask)
return new_transition
def get_cached_state(self) -> torch.Tensor | None:
"""Return the cached ``observation.state`` used as the reference point for relative/absolute action conversions."""
return self._last_state
def get_config(self) -> dict[str, Any]:
return {
"enabled": self.enabled,
@@ -182,7 +186,8 @@ class AbsoluteActionsProcessorStep(ProcessorStep):
"but relative_step is None. Ensure relative_step is set when constructing the postprocessor."
)
if self.relative_step._last_state is None:
cached_state = self.relative_step.get_cached_state()
if cached_state is None:
raise RuntimeError(
"AbsoluteActionsProcessorStep requires state from RelativeActionsProcessorStep "
"but no state has been cached. Ensure the preprocessor runs before the postprocessor."
@@ -194,9 +199,7 @@ class AbsoluteActionsProcessorStep(ProcessorStep):
return new_transition
mask = self.relative_step._build_mask(action.shape[-1])
new_transition[TransitionKey.ACTION] = to_absolute_actions(
action, self.relative_step._last_state, mask
)
new_transition[TransitionKey.ACTION] = to_absolute_actions(action, cached_state, mask)
return new_transition
def get_config(self) -> dict[str, Any]:
+36
@@ -0,0 +1,36 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .classifier.configuration_classifier import RewardClassifierConfig as RewardClassifierConfig
from .factory import (
get_reward_model_class as get_reward_model_class,
make_reward_model as make_reward_model,
make_reward_model_config as make_reward_model_config,
make_reward_pre_post_processors as make_reward_pre_post_processors,
)
from .pretrained import PreTrainedRewardModel as PreTrainedRewardModel
from .sarm.configuration_sarm import SARMConfig as SARMConfig
__all__ = [
# Configuration classes
"RewardClassifierConfig",
"SARMConfig",
# Base class
"PreTrainedRewardModel",
# Factory functions
"get_reward_model_class",
"make_reward_model",
"make_reward_model_config",
"make_reward_pre_post_processors",
]
@@ -1,5 +1,3 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,14 +13,15 @@
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs import NormalizationMode, PreTrainedConfig
from lerobot.configs import NormalizationMode
from lerobot.configs.rewards import RewardModelConfig
from lerobot.optim import AdamWConfig, LRSchedulerConfig, OptimizerConfig
from lerobot.utils.constants import OBS_IMAGE
@PreTrainedConfig.register_subclass(name="reward_classifier")
@RewardModelConfig.register_subclass(name="reward_classifier")
@dataclass
class RewardClassifierConfig(PreTrainedConfig):
class RewardClassifierConfig(RewardModelConfig):
"""Configuration for the Reward Classifier model."""
name: str = "reward_classifier"
@@ -1,5 +1,3 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,11 +17,10 @@ import logging
import torch
from torch import Tensor, nn
from lerobot.rewards.classifier.configuration_classifier import RewardClassifierConfig
from lerobot.rewards.pretrained import PreTrainedRewardModel
from lerobot.utils.constants import OBS_IMAGE, REWARD
from ...pretrained import PreTrainedPolicy
from .configuration_classifier import RewardClassifierConfig
class ClassifierOutput:
"""Wrapper for classifier outputs with additional metadata."""
@@ -99,7 +96,7 @@ class SpatialLearnedEmbeddings(nn.Module):
return output
class Classifier(PreTrainedPolicy):
class Classifier(PreTrainedRewardModel):
"""Image classifier built on top of a pre-trained encoder."""
name = "reward_classifier"
@@ -235,6 +232,16 @@ class Classifier(PreTrainedPolicy):
return ClassifierOutput(logits=logits, probabilities=probabilities, hidden_states=encoder_outputs)
def compute_reward(self, batch: dict[str, Tensor]) -> Tensor:
"""Returns 1.0 for success, 0.0 for failure based on image observations."""
images = [batch[key] for key in self.config.input_features if key.startswith(OBS_IMAGE)]
output = self.predict(images)
if self.config.num_classes == 2:
return (output.probabilities > 0.5).float()
else:
return torch.argmax(output.probabilities, dim=1).float()
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict[str, Tensor]]:
"""Standard forward pass for training compatible with train.py."""
# Extract images and labels
@@ -269,10 +276,6 @@ class Classifier(PreTrainedPolicy):
def predict_reward(self, batch, threshold=0.5):
"""Eval method. Returns predicted reward with the decision threshold as argument."""
# Check for both OBS_IMAGE and OBS_IMAGES prefixes
batch = self.normalize_inputs(batch)
batch = self.normalize_targets(batch)
# Extract images from batch dict
images = [batch[key] for key in self.config.input_features if key.startswith(OBS_IMAGE)]
@@ -282,28 +285,3 @@ class Classifier(PreTrainedPolicy):
return (probs > threshold).float()
else:
return torch.argmax(self.predict(images).probabilities, dim=1)
def get_optim_params(self):
"""Return optimizer parameters for the policy."""
return self.parameters()
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
"""
This method is required by PreTrainedPolicy but not used for reward classifiers.
The reward classifier is not an actor and does not select actions.
"""
raise NotImplementedError("Reward classifiers do not select actions")
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
"""
This method is required by PreTrainedPolicy but not used for reward classifiers.
The reward classifier is not an actor and does not produce action chunks.
"""
raise NotImplementedError("Reward classifiers do not predict action chunks")
def reset(self):
"""
This method is required by PreTrainedPolicy but not used for reward classifiers.
The reward classifier is not an actor and does not select actions.
"""
pass
@@ -1,5 +1,3 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,8 +25,7 @@ from lerobot.processor import (
policy_action_to_transition,
transition_to_policy_action,
)
from .configuration_classifier import RewardClassifierConfig
from lerobot.rewards.classifier.configuration_classifier import RewardClassifierConfig
def make_classifier_processor(
@@ -52,8 +49,6 @@ def make_classifier_processor(
Args:
config: The configuration object for the RewardClassifier.
dataset_stats: A dictionary of statistics for normalization.
preprocessor_kwargs: Additional arguments for the pre-processor pipeline.
postprocessor_kwargs: Additional arguments for the post-processor pipeline.
Returns:
A tuple containing the configured pre-processor and post-processor pipelines.
+238
@@ -0,0 +1,238 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import logging
from typing import Any
import torch
from lerobot.configs.rewards import RewardModelConfig
from lerobot.processor import PolicyAction, PolicyProcessorPipeline
from lerobot.rewards.classifier.configuration_classifier import RewardClassifierConfig
from lerobot.rewards.pretrained import PreTrainedRewardModel
from lerobot.rewards.sarm.configuration_sarm import SARMConfig
def get_reward_model_class(name: str) -> type[PreTrainedRewardModel]:
"""
Retrieves a reward model class by its registered name.
This function uses dynamic imports to avoid loading all reward model classes into
memory at once, improving startup time and reducing dependencies.
Args:
name: The name of the reward model. Supported names are "reward_classifier",
"sarm".
Returns:
The reward model class corresponding to the given name.
Raises:
ValueError: If the reward model name is not recognized.
"""
if name == "reward_classifier":
from lerobot.rewards.classifier.modeling_classifier import Classifier
return Classifier
elif name == "sarm":
from lerobot.rewards.sarm.modeling_sarm import SARMRewardModel
return SARMRewardModel
else:
try:
return _get_reward_model_cls_from_name(name=name)
except Exception as e:
raise ValueError(f"Reward model type '{name}' is not available.") from e
def make_reward_model_config(reward_type: str, **kwargs) -> RewardModelConfig:
"""
Instantiates a reward model configuration object based on the reward type.
This factory function simplifies the creation of reward model configuration objects
by mapping a string identifier to the corresponding config class.
Args:
reward_type: The type of the reward model. Supported types include
"reward_classifier", "sarm".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
Returns:
An instance of a `RewardModelConfig` subclass.
Raises:
ValueError: If the `reward_type` is not recognized.
"""
if reward_type == "reward_classifier":
return RewardClassifierConfig(**kwargs)
elif reward_type == "sarm":
return SARMConfig(**kwargs)
else:
try:
config_cls = RewardModelConfig.get_choice_class(reward_type)
return config_cls(**kwargs)
except Exception as e:
raise ValueError(f"Reward model type '{reward_type}' is not available.") from e
def make_reward_model(cfg: RewardModelConfig, **kwargs) -> PreTrainedRewardModel:
"""
Instantiate a reward model from its configuration.
Args:
cfg: The configuration for the reward model to be created. If
`cfg.pretrained_path` is set, the model will be loaded with weights
from that path.
**kwargs: Additional keyword arguments forwarded to the model constructor
(e.g., ``dataset_stats``, ``dataset_meta``).
Returns:
An instantiated and device-placed reward model.
"""
reward_cls = get_reward_model_class(cfg.type)
kwargs["config"] = cfg
if cfg.pretrained_path:
kwargs["pretrained_name_or_path"] = cfg.pretrained_path
reward_model = reward_cls.from_pretrained(**kwargs)
else:
reward_model = reward_cls(**kwargs)
reward_model.to(cfg.device)
assert isinstance(reward_model, torch.nn.Module)
return reward_model
def make_reward_pre_post_processors(
reward_cfg: RewardModelConfig,
**kwargs,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""
Create pre- and post-processor pipelines for a given reward model.
Each reward model type has a dedicated factory function for its processors.
Args:
reward_cfg: The configuration of the reward model for which to create processors.
**kwargs: Additional keyword arguments passed to the processor factory
(e.g., ``dataset_stats``, ``dataset_meta``).
Returns:
A tuple containing the input (pre-processor) and output (post-processor) pipelines.
Raises:
ValueError: If a processor factory is not implemented for the given reward
model configuration type.
"""
# Create a new processor based on reward model type
if isinstance(reward_cfg, RewardClassifierConfig):
from lerobot.rewards.classifier.processor_classifier import make_classifier_processor
return make_classifier_processor(
config=reward_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(reward_cfg, SARMConfig):
from lerobot.rewards.sarm.processor_sarm import make_sarm_pre_post_processors
return make_sarm_pre_post_processors(
config=reward_cfg,
dataset_stats=kwargs.get("dataset_stats"),
dataset_meta=kwargs.get("dataset_meta"),
)
else:
try:
processors = _make_processors_from_reward_model_config(
config=reward_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
except Exception as e:
raise ValueError(
f"Processor for reward model type '{reward_cfg.type}' is not implemented."
) from e
return processors
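A hedged end-to-end usage sketch of the three factories above (the kwargs and the raw batch are illustrative; device is assumed to be a RewardModelConfig field):

cfg = make_reward_model_config("reward_classifier", device="cpu")
reward_model = make_reward_model(cfg)
pre, post = make_reward_pre_post_processors(cfg, dataset_stats=None)

batch = pre(raw_batch)                        # raw_batch: observation dict (illustrative)
rewards = reward_model.compute_reward(batch)  # (batch_size,) tensor of rewards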
def _get_reward_model_cls_from_name(name: str) -> type[PreTrainedRewardModel]:
"""Get reward model class from its registered name using dynamic imports.
This is used as a helper function to import reward models from 3rd party lerobot
plugins.
Args:
name: The name of the reward model.
Returns:
The reward model class corresponding to the given name.
"""
if name not in RewardModelConfig.get_known_choices():
raise ValueError(
f"Unknown reward model name '{name}'. "
f"Available reward models: {RewardModelConfig.get_known_choices()}"
)
config_cls = RewardModelConfig.get_choice_class(name)
config_cls_name = config_cls.__name__
model_name = config_cls_name.removesuffix("Config")
if model_name == config_cls_name:
raise ValueError(
f"The config class name '{config_cls_name}' does not follow the expected naming convention. "
f"Make sure it ends with 'Config'!"
)
cls_name = model_name + "RewardModel"
module_path = config_cls.__module__.replace("configuration_", "modeling_")
module = importlib.import_module(module_path)
reward_cls = getattr(module, cls_name)
return reward_cls
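For example, under this convention a hypothetical third-party plugin resolves as follows (pure string manipulation; the plugin names are invented for illustration):

# config class MyCoolConfig lives in my_plugin.configuration_my_cool
config_cls_name = "MyCoolConfig"
model_name = config_cls_name.removesuffix("Config")  # "MyCool"
cls_name = model_name + "RewardModel"                # "MyCoolRewardModel"
module_path = "my_plugin.configuration_my_cool".replace("configuration_", "modeling_")
# -> importlib.import_module("my_plugin.modeling_my_cool").MyCoolRewardModel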
def _make_processors_from_reward_model_config(
config: RewardModelConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[Any, Any]:
"""Create pre- and post-processors from a reward model configuration using dynamic imports.
This is used as a helper function to import processor factories from 3rd party
lerobot reward model plugins.
Args:
config: The reward model configuration object.
dataset_stats: Dataset statistics for normalization.
Returns:
A tuple containing the input (pre-processor) and output (post-processor) pipelines.
"""
reward_type = config.type
function_name = f"make_{reward_type}_pre_post_processors"
module_path = config.__class__.__module__.replace("configuration_", "processor_")
logging.debug(
f"Instantiating reward pre/post processors using function '{function_name}' "
f"from module '{module_path}'"
)
module = importlib.import_module(module_path)
function = getattr(module, function_name)
return function(config, dataset_stats=dataset_stats)
+244
@@ -0,0 +1,244 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import builtins
import logging
import os
from importlib.resources import files
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, TypeVar
import packaging.version  # import the submodule explicitly; `packaging.version` is used below
import safetensors
from huggingface_hub import HfApi, ModelCard, ModelCardData, hf_hub_download
from huggingface_hub.constants import SAFETENSORS_SINGLE_FILE
from huggingface_hub.errors import HfHubHTTPError
from safetensors.torch import load_model as load_model_as_safetensor, save_model as save_model_as_safetensor
from torch import Tensor, nn
from lerobot.configs.rewards import RewardModelConfig
from lerobot.utils.hub import HubMixin
if TYPE_CHECKING:
from lerobot.configs.train import TrainPipelineConfig
T = TypeVar("T", bound="PreTrainedRewardModel")
class PreTrainedRewardModel(nn.Module, HubMixin, abc.ABC):
"""Base class for reward models."""
config_class: type[RewardModelConfig]
name: str
def __init__(self, config: RewardModelConfig, *inputs, **kwargs):
super().__init__()
if not isinstance(config, RewardModelConfig):
raise ValueError(
f"Parameter config in `{self.__class__.__name__}(config)` should be an instance of class "
"`RewardModelConfig`. To create a model from a pretrained model use "
f"`model = {self.__class__.__name__}.from_pretrained(PRETRAINED_MODEL_NAME)`"
)
self.config = config
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if not getattr(cls, "config_class", None):
raise TypeError(f"Class {cls.__name__} must define 'config_class'")
if not getattr(cls, "name", None):
raise TypeError(f"Class {cls.__name__} must define 'name'")
def _save_pretrained(self, save_directory: Path) -> None:
self.config._save_pretrained(save_directory)
model_to_save = self.module if hasattr(self, "module") else self
save_model_as_safetensor(model_to_save, str(save_directory / SAFETENSORS_SINGLE_FILE))
@classmethod
def from_pretrained(
cls: builtins.type[T],
pretrained_name_or_path: str | Path,
*,
config: RewardModelConfig | None = None,
force_download: bool = False,
resume_download: bool | None = None,
proxies: dict | None = None,
token: str | bool | None = None,
cache_dir: str | Path | None = None,
local_files_only: bool = False,
revision: str | None = None,
strict: bool = False,
**kwargs,
) -> T:
"""
The reward model is set in evaluation mode by default using `reward.eval()` (dropout modules are
deactivated). To train it, you should first set it back in training mode with `reward.train()`.
"""
if config is None:
config = RewardModelConfig.from_pretrained(
pretrained_name_or_path=pretrained_name_or_path,
force_download=force_download,
resume_download=resume_download,
proxies=proxies,
token=token,
cache_dir=cache_dir,
local_files_only=local_files_only,
revision=revision,
**kwargs,
)
model_id = str(pretrained_name_or_path)
instance = cls(config, **kwargs)
if os.path.isdir(model_id):
print("Loading weights from local directory")
model_file = os.path.join(model_id, SAFETENSORS_SINGLE_FILE)
reward = cls._load_as_safetensor(instance, model_file, config.device or "cpu", strict)
else:
try:
model_file = hf_hub_download(
repo_id=model_id,
filename=SAFETENSORS_SINGLE_FILE,
revision=revision,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
resume_download=resume_download,
token=token,
local_files_only=local_files_only,
)
reward = cls._load_as_safetensor(instance, model_file, config.device or "cpu", strict)
except HfHubHTTPError as e:
raise FileNotFoundError(
f"{SAFETENSORS_SINGLE_FILE} not found on the HuggingFace Hub in {model_id}"
) from e
reward.to(config.device)
reward.eval()
return reward
@classmethod
def _load_as_safetensor(cls, model: T, model_file: str, map_location: str, strict: bool) -> T:
# Create base kwargs
kwargs = {"strict": strict}
# Add device parameter for newer versions that support it
if packaging.version.parse(safetensors.__version__) >= packaging.version.parse("0.4.3"):
kwargs["device"] = map_location
# Load the model with appropriate kwargs
missing_keys, unexpected_keys = load_model_as_safetensor(model, model_file, **kwargs)
if missing_keys:
logging.warning(f"Missing key(s) when loading model: {missing_keys}")
if unexpected_keys:
logging.warning(f"Unexpected key(s) when loading model: {unexpected_keys}")
# For older versions, manually move to device if needed
if "device" not in kwargs and map_location != "cpu":
logging.warning(
"Loading model weights on other devices than 'cpu' is not supported natively in your version of safetensors."
" This means that the model is loaded on 'cpu' first and then copied to the device."
" This leads to a slower loading time."
" Please update safetensors to version 0.4.3 or above for improved performance."
)
model.to(map_location)
return model
def get_optim_params(self):
"""
Returns the reward-model-specific parameters dict to be passed on to the optimizer.
"""
return self.parameters()
def reset(self) -> None:
"""Reset any internal state."""
pass
@abc.abstractmethod
def compute_reward(self, batch: dict[str, Tensor]) -> Tensor:
"""Compute a scalar reward signal for a batch of observations.
Args:
batch: Dictionary containing at minimum observation tensors.
May also contain "action", "next_observation.*", etc.
Returns:
Tensor of shape ``(batch_size,)`` with reward values.
"""
...
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict[str, Any]]:
"""Training forward pass — override for trainable reward models."""
raise NotImplementedError(
f"{self.__class__.__name__} is not trainable. Only use compute_reward() for inference."
)
@property
def is_trainable(self) -> bool:
"""Whether this reward model can be trained via ``lerobot-train``.
Trainable reward models override :meth:`forward`; zero-shot models
inherit the base implementation that raises ``NotImplementedError``.
"""
return type(self).forward is not PreTrainedRewardModel.forward
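The override check behind is_trainable, as a minimal self-contained sketch of the same idiom:

class Base:
    def forward(self):
        raise NotImplementedError

class ZeroShot(Base):
    pass  # inherits Base.forward -> not trainable

class Trainable(Base):
    def forward(self):
        return 0.0  # overrides forward -> trainable

def is_trainable(obj) -> bool:
    return type(obj).forward is not Base.forward

assert not is_trainable(ZeroShot())
assert is_trainable(Trainable())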
def push_model_to_hub(self, cfg: "TrainPipelineConfig"):
api = HfApi()
repo_id = api.create_repo(
repo_id=self.config.repo_id, private=self.config.private, exist_ok=True
).repo_id
# Push the files to the repo in a single commit
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp:
saved_path = Path(tmp) / repo_id
self.save_pretrained(saved_path) # Calls _save_pretrained and stores model tensors
card = self.generate_model_card(
cfg.dataset.repo_id, self.config.type, self.config.license, self.config.tags
)
card.save(str(saved_path / "README.md"))
cfg.save_pretrained(saved_path) # Calls _save_pretrained and stores train config
commit_info = api.upload_folder(
repo_id=repo_id,
repo_type="model",
folder_path=saved_path,
commit_message="Upload reward model weights, train config and readme",
allow_patterns=["*.safetensors", "*.json", "*.yaml", "*.md"],
ignore_patterns=["*.tmp", "*.log"],
)
logging.info(f"Model pushed to {commit_info.repo_url.url}")
def generate_model_card(
self, dataset_repo_id: str, model_type: str, license: str | None, tags: list[str] | None
) -> ModelCard:
card_data = ModelCardData(
license=license or "apache-2.0",
library_name="lerobot",
pipeline_tag="robotics",
tags=list(set(tags or []).union({"robotics", "lerobot", "reward-model", model_type})),
model_name=model_type,
datasets=dataset_repo_id,
)
template_card = (
files("lerobot.templates")
.joinpath("lerobot_rewardmodel_modelcard_template.md")
.read_text(encoding="utf-8")
)
card = ModelCard.from_template(card_data, template_str=template_card)
card.validate()
return card
@@ -1,4 +1,4 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,5 +14,6 @@
from .configuration_sarm import SARMConfig
from .modeling_sarm import SARMRewardModel
from .processor_sarm import make_sarm_pre_post_processors
__all__ = ["SARMConfig", "SARMRewardModel"]
__all__ = ["SARMConfig", "SARMRewardModel", "make_sarm_pre_post_processors"]
@@ -25,18 +25,18 @@ need ~num_frames/30 queries instead of one per frame (~30x speedup).
Usage:
# Full RA-BC computation with visualizations
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
python src/lerobot/rewards/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path <USER>/sarm_single_uni4
# Faster computation with stride (compute every 5 frames, interpolate the rest)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
python src/lerobot/rewards/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path <USER>/sarm_single_uni4 \\
--stride 5
# Visualize predictions only (no RA-BC computation)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
python src/lerobot/rewards/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path <USER>/sarm_single_uni4 \\
--visualize-only \\
@@ -58,10 +58,9 @@ import torch
from tqdm import tqdm
from lerobot.datasets import LeRobotDataset
from .modeling_sarm import SARMRewardModel
from .processor_sarm import make_sarm_pre_post_processors
from .sarm_utils import normalize_stage_tau
from lerobot.rewards.sarm.modeling_sarm import SARMRewardModel
from lerobot.rewards.sarm.processor_sarm import make_sarm_pre_post_processors
from lerobot.rewards.sarm.sarm_utils import normalize_stage_tau
def get_reward_model_path_from_parquet(parquet_path: Path) -> str | None:
@@ -713,12 +712,12 @@ def main():
epilog="""
Examples:
# Full RA-BC computation with visualizations
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
python src/lerobot/rewards/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path <USER>/sarm_single_uni4
# Visualize predictions only (no RA-BC computation)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
python src/lerobot/rewards/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path <USER>/sarm_single_uni4 \\
--visualize-only \\
@@ -1,5 +1,3 @@
#!/usr/bin/env python
# Copyright 2025 Qianzhong Chen, Justin Yu, Mac Schwager, Pieter Abbeel, Yide Shentu, Philipp Wu
# and The HuggingFace Inc. team. All rights reserved.
#
@@ -22,14 +20,15 @@ Paper: https://arxiv.org/abs/2509.25358
from dataclasses import dataclass, field
from lerobot.configs import FeatureType, NormalizationMode, PolicyFeature, PreTrainedConfig
from lerobot.configs import FeatureType, NormalizationMode, PolicyFeature
from lerobot.configs.rewards import RewardModelConfig
from lerobot.optim import AdamWConfig, CosineDecayWithWarmupSchedulerConfig
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE
@PreTrainedConfig.register_subclass("sarm")
@RewardModelConfig.register_subclass("sarm")
@dataclass
class SARMConfig(PreTrainedConfig):
class SARMConfig(RewardModelConfig):
"""Configuration class for SARM (Stage-Aware Reward Modeling).
Supports three annotation modes:
@@ -110,7 +109,6 @@ class SARMConfig(PreTrainedConfig):
def __post_init__(self):
super().__post_init__()
if self.annotation_mode not in ["single_stage", "dense_only", "dual"]:
raise ValueError(
f"annotation_mode must be 'single_stage', 'dense_only', or 'dual', got {self.annotation_mode}"
@@ -1,5 +1,3 @@
#!/usr/bin/env python
# Copyright 2025 Qianzhong Chen, Justin Yu, Mac Schwager, Pieter Abbeel, Yide Shentu, Philipp Wu
# and The HuggingFace Inc. team. All rights reserved.
#
@@ -34,14 +32,13 @@ import torch.nn as nn
import torch.nn.functional as F # noqa: N812
from torch import Tensor
from lerobot.utils.constants import OBS_STR
from ..pretrained import PreTrainedPolicy
from .configuration_sarm import SARMConfig
from .sarm_utils import (
from lerobot.rewards.pretrained import PreTrainedRewardModel
from lerobot.rewards.sarm.configuration_sarm import SARMConfig
from lerobot.rewards.sarm.sarm_utils import (
normalize_stage_tau,
pad_state_to_max_dim,
)
from lerobot.utils.constants import OBS_STR
class StageTransformer(nn.Module):
@@ -353,7 +350,7 @@ def gen_stage_emb(num_classes: int, targets: torch.Tensor) -> torch.Tensor:
return stage_onehot
class SARMRewardModel(PreTrainedPolicy):
class SARMRewardModel(PreTrainedRewardModel):
"""
SARM Reward Model for stage-aware task completion rewards.
@@ -471,6 +468,23 @@ class SARMRewardModel(PreTrainedPolicy):
self.subtask_model.to(device)
return self
def compute_reward(self, batch: dict[str, Tensor]) -> Tensor:
"""Compute dense progress reward in [0, 1] from batch.
Expects batch to contain:
- "observation_features" or video embeddings: (B, T, 512)
- "language_embedding" or text embeddings: (B, 512)
- optionally "observation.state": (B, T, state_dim)
"""
text_emb = batch.get("language_embedding", batch.get("text_features"))
video_emb = batch.get("observation_features", batch.get("video_features"))
state = batch.get("observation.state", batch.get("state_features"))
rewards = self.calculate_rewards(text_emb, video_emb, state)
if isinstance(rewards, np.ndarray):
rewards = torch.from_numpy(rewards).float()
return rewards
@torch.no_grad()
def calculate_rewards(
self,
@@ -631,17 +645,9 @@ class SARMRewardModel(PreTrainedPolicy):
return self.parameters()
def reset(self):
"""Required by PreTrainedPolicy but not used for reward models."""
"""SARM has no episode-level state to reset."""
pass
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
"""Required by PreTrainedPolicy but not used for reward models."""
raise NotImplementedError("SARM model does not predict action chunks")
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
"""Required by PreTrainedPolicy but not used for SARM."""
raise NotImplementedError("SARM model does not select actions")
def _train_step(
self,
img_emb: torch.Tensor, # (B, N, T, D)
@@ -1,5 +1,3 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,16 +58,15 @@ from lerobot.processor import (
policy_action_to_transition,
transition_to_policy_action,
)
from lerobot.types import EnvTransition, PolicyAction, TransitionKey
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
from .configuration_sarm import SARMConfig
from .sarm_utils import (
from lerobot.rewards.sarm.configuration_sarm import SARMConfig
from lerobot.rewards.sarm.sarm_utils import (
apply_rewind_augmentation,
compute_absolute_indices,
find_stage_and_tau,
pad_state_to_max_dim,
)
from lerobot.types import EnvTransition, PolicyAction, TransitionKey
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
class SARMEncodingProcessorStep(ProcessorStep):
@@ -455,7 +452,13 @@ class SARMEncodingProcessorStep(ProcessorStep):
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Get image embeddings
embeddings = self.clip_model.get_image_features(**inputs).detach().cpu()
# transformers 5.x returns BaseModelOutputWithPooling instead of a plain tensor
output = self.clip_model.get_image_features(**inputs)
if not isinstance(output, torch.Tensor):
output = output.pooler_output
if output is None:
raise ValueError("pooler_output should not be None for CLIP models.")
embeddings = output.detach().cpu()
# Handle single frame case
if embeddings.dim() == 1:
@@ -482,7 +485,13 @@ class SARMEncodingProcessorStep(ProcessorStep):
inputs = self.clip_processor.tokenizer([text], return_tensors="pt", padding=True, truncation=True)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
text_embedding = self.clip_model.get_text_features(**inputs).detach().cpu()
# transformers 5.x returns BaseModelOutputWithPooling instead of a plain tensor
output = self.clip_model.get_text_features(**inputs)
if not isinstance(output, torch.Tensor):
output = output.pooler_output
if output is None:
raise ValueError("pooler_output should not be None for CLIP models.")
text_embedding = output.detach().cpu()
text_embedding = text_embedding.expand(batch_size, -1)
return text_embedding
@@ -1,5 +1,3 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,14 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RA-BC (Reward-Aligned Behavior Cloning) sample weighting implementation.
This module implements the SampleWeighter protocol for RA-BC training,
which weights training samples based on their task progress as measured
by the SARM reward model.
The weights are computed based on progress deltas:
delta = progress[t + chunk_size] - progress[t]
High-quality samples (positive progress) get higher weights, while
samples with negative progress (going backwards) get zero weight.
See: https://arxiv.org/abs/2509.25358 for the SARM paper.
"""
import logging
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
import torch
from huggingface_hub import hf_hub_download
from lerobot.utils.import_utils import _pandas_available
from lerobot.utils.sample_weighting import SampleWeighter
if TYPE_CHECKING or _pandas_available:
import pandas as pd
else:
pd = None # type: ignore[assignment]
def resolve_hf_path(path: str | Path) -> Path:
"""Resolve a path that may be a HuggingFace URL (hf://datasets/...) to a local path."""
@@ -34,23 +56,27 @@ def resolve_hf_path(path: str | Path) -> Path:
return Path(path)
class RABCWeights:
class RABCWeights(SampleWeighter):
"""
Load precomputed SARM progress values and compute RA-BC weights during training.
This class implements the SampleWeighter ABC for use with the generic
sample weighting infrastructure in lerobot.
Progress values are loaded from a parquet file (generated by compute_rabc_weights.py).
During training, computes:
- progress_delta = progress[t + chunk_size] - progress[t]
- rabc_weight based on the delta (paper Eq. 8-9)
Args:
progress_path: Path to parquet file with precomputed progress values
chunk_size: Number of frames ahead for computing progress delta
head_mode: Which SARM head to use ("sparse" or "dense")
kappa: Hard threshold for high-quality samples (default: 0.01)
epsilon: Small constant for numerical stability (default: 1e-6)
fallback_weight: Weight to use for frames without valid delta (default: 1.0)
device: Device to return tensors on
progress_path: Path to parquet file with precomputed progress values.
Supports HuggingFace URLs (hf://datasets/...).
chunk_size: Number of frames ahead for computing progress delta.
head_mode: Which SARM head to use ("sparse" or "dense").
kappa: Hard threshold for high-quality samples (default: 0.01).
epsilon: Small constant for numerical stability (default: 1e-6).
fallback_weight: Weight to use for frames without valid delta (default: 1.0).
device: Device to return tensors on.
"""
def __init__(
@@ -61,7 +87,7 @@ class RABCWeights:
kappa: float = 0.01,
epsilon: float = 1e-6,
fallback_weight: float = 1.0,
device: torch.device = None,
device: torch.device | None = None,
):
self.progress_path = resolve_hf_path(progress_path)
self.chunk_size = chunk_size
@@ -87,8 +113,8 @@ class RABCWeights:
logging.info(f"Using progress column: {self.progress_column}")
self.progress_lookup = {}
self.episode_lookup = {}
self.progress_lookup: dict[int, float] = {}
self.episode_lookup: dict[int, int] = {}
for _, row in self.df.iterrows():
global_idx = int(row["index"])
@@ -100,7 +126,7 @@ class RABCWeights:
self.episode_lookup[global_idx] = episode_idx
# Build episode boundaries for delta computation
self.episode_boundaries = {}
self.episode_boundaries: dict[int, dict[str, int]] = {}
for episode_idx in self.df["episode_index"].unique():
ep_df = self.df[self.df["episode_index"] == episode_idx]
self.episode_boundaries[int(episode_idx)] = {
@@ -114,7 +140,7 @@ class RABCWeights:
# Compute global statistics for weight computation
self._compute_global_stats()
def _compute_global_stats(self):
def _compute_global_stats(self) -> None:
"""Compute global mean and std of progress deltas for weight calculation."""
all_deltas = []
@@ -138,8 +164,8 @@ class RABCWeights:
all_deltas.append(delta)
if all_deltas:
self.delta_mean = max(np.mean(all_deltas), 0.0)
self.delta_std = max(np.std(all_deltas), self.epsilon)
self.delta_mean = max(float(np.mean(all_deltas)), 0.0)
self.delta_std = max(float(np.std(all_deltas)), self.epsilon)
logging.info(f"Progress delta stats: mean={self.delta_mean:.4f}, std={self.delta_std:.4f}")
else:
self.delta_mean = 0.0
@@ -157,18 +183,19 @@ class RABCWeights:
4. Compute weight using paper Eq. 8-9
Args:
batch: Training batch containing "index" key with global frame indices
batch: Training batch containing "index" key with global frame indices.
Returns:
Tuple of:
- Weights tensor (batch_size,) normalized to sum to batch_size
- Stats dict with raw_mean_weight, num_zero_weight, num_full_weight
- Weights tensor (batch_size,) normalized to sum to batch_size.
- Stats dict with weighting statistics for logging.
"""
indices = batch.get("index")
if indices is None:
logging.warning("RA-BC: Batch missing 'index' key, using uniform weights")
batch_size = self._get_batch_size(batch)
return torch.ones(batch_size, device=self.device), {"raw_mean_weight": 1.0}
stats = {"mean_weight": 1.0, "num_zero_weight": 0, "num_full_weight": batch_size}
return torch.ones(batch_size, device=self.device), stats
# Convert to list of ints
if isinstance(indices, torch.Tensor):
@@ -183,29 +210,29 @@ class RABCWeights:
delta = self._compute_delta(idx)
deltas.append(delta)
deltas = np.array(deltas, dtype=np.float32)
deltas_array = np.array(deltas, dtype=np.float32)
# Compute weights from deltas
weights = self._compute_weights(deltas)
weights = self._compute_weights(deltas_array)
# Compute stats before normalization for logging
raw_mean_weight = float(np.nanmean(weights))
num_zero_weight = int(np.sum(weights == 0))
num_full_weight = int(np.sum(weights == 1.0))
batch_stats = {
"raw_mean_weight": raw_mean_weight,
"mean_weight": raw_mean_weight,
"num_zero_weight": num_zero_weight,
"num_full_weight": num_full_weight,
}
weights = torch.tensor(weights, device=self.device, dtype=torch.float32)
weights_tensor = torch.tensor(weights, device=self.device, dtype=torch.float32)
# Normalize to sum to batch_size
batch_size = len(weights)
weight_sum = weights.sum() + self.epsilon
weights = weights * batch_size / weight_sum
batch_size = len(weights_tensor)
weight_sum = weights_tensor.sum() + self.epsilon
weights_tensor = weights_tensor * batch_size / weight_sum
return weights, batch_stats
return weights_tensor, batch_stats
def _compute_delta(self, global_idx: int) -> float:
"""Compute progress delta for a single frame."""
@@ -241,7 +268,7 @@ class RABCWeights:
- Final weight: w_i = 1{r_i > κ} + 1{0 ≤ r_i ≤ κ} · w̃_i
Returns:
Array of weights
Array of weights.
"""
valid_mask = ~np.isnan(deltas)
@@ -273,12 +300,13 @@ class RABCWeights:
if key in batch:
val = batch[key]
if isinstance(val, (torch.Tensor, np.ndarray)):
return val.shape[0]
return int(val.shape[0])
return 1
def get_stats(self) -> dict:
"""Get statistics."""
"""Get global statistics about the RA-BC weighting."""
return {
"type": "rabc",
"num_frames": len(self.progress_lookup),
"chunk_size": self.chunk_size,
"head_mode": self.head_mode,
@@ -1,5 +1,3 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
+3 -3
@@ -193,15 +193,15 @@ def convert_lerobot_dataset_to_cropped_lerobot_dataset(
fps=int(original_dataset.fps),
root=new_dataset_root,
robot_type=original_dataset.meta.robot_type,
features=original_dataset.meta.info["features"],
features=original_dataset.meta.info.features,
use_videos=len(original_dataset.meta.video_keys) > 0,
)
# Update the metadata for every image key that will be cropped:
# (Here we simply set the shape to be the final resize_size.)
for key in crop_params_dict:
if key in new_dataset.meta.info["features"]:
new_dataset.meta.info["features"][key]["shape"] = [3] + list(resize_size)
if key in new_dataset.meta.info.features:
new_dataset.meta.info.features[key]["shape"] = (3, *resize_size)
# TODO: Directly modify the mp4 video + meta info features, instead of recreating a dataset
prev_episode_index = 0
@@ -54,6 +54,7 @@ class BiOpenArmFollower(Robot):
calibration_dir=config.calibration_dir,
port=config.left_arm_config.port,
disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect,
use_velocity_and_torque=config.left_arm_config.use_velocity_and_torque,
max_relative_target=config.left_arm_config.max_relative_target,
cameras=left_cameras,
side=config.left_arm_config.side,
@@ -72,6 +73,7 @@ class BiOpenArmFollower(Robot):
calibration_dir=config.calibration_dir,
port=config.right_arm_config.port,
disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect,
use_velocity_and_torque=config.right_arm_config.use_velocity_and_torque,
max_relative_target=config.right_arm_config.max_relative_target,
cameras=right_cameras,
side=config.right_arm_config.side,
@@ -66,6 +66,10 @@ class OpenArmFollowerConfigBase:
# Whether to disable torque when disconnecting
disable_torque_on_disconnect: bool = True
# When True, expose `.vel` and `.torque` per motor in observation features.
# Default False for compatibility with the position-only openarm_mini teleoperator.
use_velocity_and_torque: bool = False
# Safety limit for relative target positions
# Set to a positive scalar for all motors, or a dict mapping motor names to limits
max_relative_target: float | dict[str, float] | None = None
@@ -93,8 +93,9 @@ class OpenArmFollower(Robot):
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float # Add this
features[f"{motor}.torque"] = float # Add this
if self.config.use_velocity_and_torque:
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
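For illustration, the new flag changes a two-motor arm's observation features as follows (motor names hypothetical):
# use_velocity_and_torque=False (default): position-only, matches the openarm_mini teleoperator
features_position_only = {"shoulder.pos": float, "elbow.pos": float}
# use_velocity_and_torque=True: velocity and torque channels exposed per motor
features_full = {
    "shoulder.pos": float, "shoulder.vel": float, "shoulder.torque": float,
    "elbow.pos": float, "elbow.vel": float, "elbow.torque": float,
}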
@@ -235,8 +236,9 @@ class OpenArmFollower(Robot):
for motor in self.bus.motors:
state = states.get(motor, {})
obs_dict[f"{motor}.pos"] = state.get("position", 0.0)
obs_dict[f"{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"{motor}.torque"] = state.get("torque", 0.0)
if self.config.use_velocity_and_torque:
obs_dict[f"{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"{motor}.torque"] = state.get("torque", 0.0)
# Capture images from cameras
for cam_key, cam in self.cameras.items():
+9 -3
@@ -75,7 +75,7 @@ class SentryStrategyConfig(RolloutStrategyConfig):
# Target video file size in MB for episode rotation. Episodes are
# saved once the estimated video duration would exceed this limit.
# Defaults to DEFAULT_VIDEO_FILE_SIZE_IN_MB when set to None.
target_video_file_size_mb: float | None = None
target_video_file_size_mb: int | None = None
@RolloutStrategyConfig.register_subclass("highlight")
@@ -90,7 +90,7 @@ class HighlightStrategyConfig(RolloutStrategyConfig):
"""
ring_buffer_seconds: float = 10.0
ring_buffer_max_memory_mb: float = 1024.0
ring_buffer_max_memory_mb: int = 1024
save_key: str = "s"
push_key: str = "h"
@@ -150,7 +150,7 @@ class DAggerStrategyConfig(RolloutStrategyConfig):
upload_every_n_episodes: int = 5
# Target video file size in MB for episode rotation (record_autonomous
# mode only). Defaults to DEFAULT_VIDEO_FILE_SIZE_IN_MB when None.
target_video_file_size_mb: float | None = None
target_video_file_size_mb: int | None = None
input_device: str = "keyboard"
keyboard: DAggerKeyboardConfig = field(default_factory=DAggerKeyboardConfig)
pedal: DAggerPedalConfig = field(default_factory=DAggerPedalConfig)
@@ -209,6 +209,12 @@ class RolloutConfig:
# Rename map for mapping robot/dataset observation keys to policy keys
rename_map: dict[str, str] = field(default_factory=dict)
# Hardware teardown
# When True (default), smoothly interpolate the robot back to the joint
# positions captured at startup before disconnecting. Set to False to
# leave the robot in its final achieved pose at shutdown.
return_to_initial_position: bool = True
# Torch compile
use_torch_compile: bool = False
torch_compile_backend: str = "inductor"
+36 -23
@@ -27,7 +27,7 @@ from threading import Event
import torch
from lerobot.configs import FeatureType, PreTrainedConfig
from lerobot.configs import FeatureType
from lerobot.datasets import (
LeRobotDataset,
aggregate_pipeline_dataset_features,
@@ -43,6 +43,7 @@ from lerobot.processor import (
make_default_processors,
rename_stats,
)
from lerobot.processor.relative_action_processor import RelativeActionsProcessorStep
from lerobot.robots import make_robot_from_config
from lerobot.teleoperators import Teleoperator, make_teleoperator_from_config
from lerobot.utils.feature_utils import combine_feature_dicts, hw_to_dataset_features
@@ -51,6 +52,7 @@ from .configs import BaseStrategyConfig, DAggerStrategyConfig, RolloutConfig
from .inference import (
InferenceEngine,
RTCInferenceConfig,
SyncInferenceConfig,
create_inference_engine,
)
from .robot_wrapper import ThreadSafeRobot
@@ -176,33 +178,26 @@ def build_rollout_context(
policy_config = cfg.policy
policy_class = get_policy_class(policy_config.type)
full_config = PreTrainedConfig.from_pretrained(cfg.policy.pretrained_path)
for attr in ("device", "use_amp"):
if hasattr(cfg.policy, attr) and hasattr(full_config, attr):
cli_val = getattr(cfg.policy, attr)
if cli_val is not None:
setattr(full_config, attr, cli_val)
if hasattr(policy_config, "compile_model"):
policy_config.compile_model = cfg.use_torch_compile
if hasattr(full_config, "compile_model"):
full_config.compile_model = cfg.use_torch_compile
if full_config.type == "vqbet" and cfg.device == "mps":
if policy_config.type == "vqbet" and cfg.device == "mps":
raise NotImplementedError(
"Current implementation of VQBeT does not support `mps` backend. "
"Please use `cpu` or `cuda` backend."
)
if full_config.use_peft:
if policy_config.use_peft:
from peft import PeftConfig, PeftModel
peft_path = cfg.policy.pretrained_path
peft_path = policy_config.pretrained_path
peft_config = PeftConfig.from_pretrained(peft_path)
policy = policy_class.from_pretrained(
pretrained_name_or_path=peft_config.base_model_name_or_path, config=full_config
pretrained_name_or_path=peft_config.base_model_name_or_path, config=policy_config
)
policy = PeftModel.from_pretrained(policy, peft_path, config=peft_config)
else:
policy = policy_class.from_pretrained(cfg.policy.pretrained_path, config=full_config)
policy = policy_class.from_pretrained(policy_config.pretrained_path, config=policy_config)
if is_rtc:
policy.config.rtc_config = cfg.inference.rtc
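The PEFT branch above follows peft's standard two-step resume. A minimal sketch reusing ``policy_class`` and ``policy_config`` from the hunk, with a hypothetical adapter id:
from peft import PeftConfig, PeftModel
adapter_path = "user/act_lora_adapter"  # hypothetical adapter repo or local dir
peft_config = PeftConfig.from_pretrained(adapter_path)
# 1) Load the base policy recorded in the adapter's config.
base_policy = policy_class.from_pretrained(peft_config.base_model_name_or_path, config=policy_config)
# 2) Wrap the base exactly once with the adapter weights.
policy = PeftModel.from_pretrained(base_policy, adapter_path, config=peft_config)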
@@ -257,10 +252,12 @@ def build_rollout_context(
teleop.connect()
logger.info("Teleoperator connected")
# DAgger requires teleop with motor control capabilities (enable_torque,
# disable_torque, write_goal_positions).
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# TODO(Steven): once Teleoperator motor-control methods are standardised
# (``enable_torque`` / ``disable_torque`` / ``write_goal_positions``), gate
# the DAgger strategy on their presence here and fail fast with a helpful
# message instead of relying on the operator to pre-align the leader by
# hand. See :func:`DAggerStrategy._apply_transition` for the matching
# disabled call sites.
# if isinstance(cfg.strategy, DAggerStrategyConfig) and teleop is not None:
# required_teleop_methods = ("enable_torque", "disable_torque", "write_goal_positions")
# missing = [m for m in required_teleop_methods if not callable(getattr(teleop, m, None))]
@@ -272,10 +269,13 @@ def build_rollout_context(
# )
# --- 4. Features + action-key reconciliation ---------------------
# TODO(Steven): Only `.pos` joint features are used for policy inference — velocity and
# torque channels are observation-only and must be excluded from the state
# and action tensors that the policy sees.
# TODO(Steven): Only ``.pos`` joint features are routed to the policy as state and as the
# action target; velocity and torque channels (when present) are kept in
# the raw observation but excluded from the policy-facing tensors.
all_obs_features = robot.observation_features
# ``observation_features`` values are either a tuple (camera shape) or the
# ``float`` type itself used as a sentinel for scalar motor features —
# see ``dict[str, type | tuple]`` annotation on ``Robot.observation_features``.
observation_features_hw = {
k: v
for k, v in all_obs_features.items()
@@ -308,7 +308,9 @@ def build_rollout_context(
# Validate visual features if no rename_map is active
rename_map = cfg.rename_map
if not rename_map:
expected_visuals = {k for k, v in full_config.input_features.items() if v.type == FeatureType.VISUAL}
expected_visuals = {
k for k, v in policy_config.input_features.items() if v.type == FeatureType.VISUAL
}
provided_visuals = {
f"observation.images.{k}" for k, v in robot.observation_features.items() if isinstance(v, tuple)
}
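As context for the filters above, ``Robot.observation_features`` mixes shape tuples and the ``float`` sentinel. A hypothetical value:
observation_features = {
    "wrist": (480, 640, 3),  # camera -> shape tuple
    "shoulder.pos": float,   # scalar motor feature -> the ``float`` type itself
    "shoulder.vel": float,
}
# Cameras are detected with isinstance(v, tuple); everything else is a motor scalar.
motor_keys = [k for k, v in observation_features.items() if not isinstance(v, tuple)]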
@@ -353,6 +355,7 @@ def build_rollout_context(
"Use --dataset.repo_id=<user>/rollout_<name> for policy deployment datasets."
)
cfg.dataset.stamp_repo_id()
target_video_mb = getattr(cfg.strategy, "target_video_file_size_mb", None)
dataset = LeRobotDataset.create(
cfg.dataset.repo_id,
cfg.dataset.fps,
@@ -368,6 +371,7 @@ def build_rollout_context(
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
video_files_size_in_mb=target_video_mb,
)
if dataset is not None:
@@ -391,6 +395,15 @@ def build_rollout_context(
},
)
if isinstance(cfg.inference, SyncInferenceConfig) and any(
isinstance(step, RelativeActionsProcessorStep) and step.enabled
for step in getattr(preprocessor, "steps", ())
):
raise NotImplementedError(
"SyncInferenceEngine does not support policies with relative actions for now."
"Use --inference.type=rtc or remove relative action processor steps from the policy pipeline."
)
# --- 7. Inference strategy (needs policy + pre/post + hardware) --
logger.info(
"Creating inference engine (type=%s)...",
+2 -2
@@ -14,8 +14,8 @@
"""Inference engine package — backend-agnostic action production.
Concrete strategies (sync, RTC, ...) expose the same small interface so
rollout strategies never branch on the inference backend.
Concrete backends (``sync``, ``rtc``, ...) expose the same small interface so
rollout strategies never branch on which backend is in use.
"""
from .base import InferenceEngine
+8 -7
@@ -15,8 +15,8 @@
"""Inference engine ABC.
Rollout strategies consume actions through this small interface so they
do not need to know whether the inference engine is synchronous, runs in
a background thread (RTC), or comes from an external source.
do not need to know whether inference happens inline on the control thread
or asynchronously in a background thread (RTC).
"""
from __future__ import annotations
@@ -29,9 +29,10 @@ import torch
class InferenceEngine(abc.ABC):
"""Abstract backend for producing actions during rollout.
Subclasses decide whether inference happens inline, in a background
thread, or externally. The contract is minimal so new backends can
be added without touching rollout strategies.
Subclasses decide whether inference happens inline on the control
thread or asynchronously in a background thread. The contract is
minimal so additional backends can be plugged in without touching
rollout strategies.
Lifecycle
---------
@@ -43,8 +44,8 @@ class InferenceEngine(abc.ABC):
-----------------
``get_action(obs_frame)``: return the next action tensor, or
``None`` if none is available (e.g. async queue empty). Sync
backends always compute from ``obs_frame``; async backends may
ignore it (they get observations via ``notify_observation``).
backends always compute from ``obs_frame``; async backends ignore
it (they receive observations via ``notify_observation``).
Optional hooks
--------------
+2 -3
@@ -68,9 +68,8 @@ class SyncInferenceConfig(InferenceEngineConfig):
class RTCInferenceConfig(InferenceEngineConfig):
"""Real-Time Chunking: async policy inference in a background thread."""
# ``RTCConfig`` is a small dataclass with default-only fields, so eagerly
# constructing one here costs nothing and keeps draccus' CLI surface flat
# (``--inference.rtc.execution_horizon=...`` etc.). No need to lazy-init.
# Eagerly constructed so draccus exposes nested fields directly on the CLI
# (e.g. ``--inference.rtc.execution_horizon=...``).
rtc: RTCConfig = field(default_factory=RTCConfig)
queue_threshold: int = 30
+7 -55
@@ -32,18 +32,14 @@ from typing import Any
import torch
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rtc import ActionQueue, LatencyTracker
from lerobot.policies.rtc import ActionQueue, LatencyTracker, reanchor_relative_rtc_prefix
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.utils import prepare_observation_for_inference
from lerobot.processor import (
NormalizerProcessorStep,
PolicyProcessorPipeline,
RelativeActionsProcessorStep,
TransitionKey,
create_transition,
to_relative_actions,
)
from lerobot.utils.constants import OBS_STATE
from lerobot.utils.feature_utils import build_dataset_frame
from ..robot_wrapper import ThreadSafeRobot
@@ -66,35 +62,6 @@ _RTC_JOIN_TIMEOUT_S: float = 3.0
# ---------------------------------------------------------------------------
def _reanchor_relative_rtc_prefix(
prev_actions_absolute: torch.Tensor,
current_state: torch.Tensor,
relative_step: RelativeActionsProcessorStep,
normalizer_step: NormalizerProcessorStep | None,
policy_device: torch.device | str,
) -> torch.Tensor:
"""Convert absolute leftover actions into model-space for relative-action RTC policies.
When using relative actions, the RTC prefix (previous chunk's unexecuted tail)
is stored in absolute coordinates. Before feeding it back to the policy, this
helper re-expresses those actions relative to the robot's current joint state
and optionally normalizes them so the policy receives correctly scaled inputs.
"""
state = current_state.detach().cpu()
if state.dim() == 1:
state = state.unsqueeze(0)
action_cpu = prev_actions_absolute.detach().cpu()
mask = relative_step._build_mask(action_cpu.shape[-1])
relative_actions = to_relative_actions(action_cpu, state, mask)
transition = create_transition(action=relative_actions)
if normalizer_step is not None:
transition = normalizer_step(transition)
return transition[TransitionKey.ACTION].to(policy_device)
def _normalize_prev_actions_length(prev_actions: torch.Tensor, target_steps: int) -> torch.Tensor:
"""Pad or truncate RTC prefix actions to a fixed length for stable compiled inference."""
if prev_actions.ndim != 2:
@@ -109,21 +76,6 @@ def _normalize_prev_actions_length(prev_actions: torch.Tensor, target_steps: int
return padded
def _get_current_raw_state(
relative_step: RelativeActionsProcessorStep,
fallback_state: torch.Tensor | None,
) -> torch.Tensor | None:
"""Return the current raw state cached by the relative-action step.
``RelativeActionsProcessorStep`` caches the observation state before any
observation normalization. Re-anchoring RTC leftovers must use that raw
state rather than the normalized observation that the policy consumes.
"""
if relative_step._last_state is not None:
return relative_step._last_state
return fallback_state
# ---------------------------------------------------------------------------
# RTCInferenceEngine
# ---------------------------------------------------------------------------
@@ -333,15 +285,15 @@ class RTCInferenceEngine(InferenceEngine):
preprocessed = self._preprocessor(obs_batch)
if prev_actions is not None and self._relative_step is not None:
state_tensor = _get_current_raw_state(
self._relative_step, obs_batch.get(OBS_STATE)
)
if state_tensor is not None:
# Rebase against the raw cached state so the leftover tail stays in
# the training-time coordinate frame.
raw_state = self._relative_step.get_cached_state()
if raw_state is not None:
prev_abs = queue.get_processed_left_over()
if prev_abs is not None and prev_abs.numel() > 0:
prev_actions = _reanchor_relative_rtc_prefix(
prev_actions = reanchor_relative_rtc_prefix(
prev_actions_absolute=prev_abs,
current_state=state_tensor,
current_state=raw_state,
relative_step=self._relative_step,
normalizer_step=self._normalizer_step,
policy_device=policy_device,
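At its core, the re-anchoring the shared helper performs is a masked change of reference frame. A sketch under the assumption that relative actions are deltas from the current raw state (the real helper also renormalizes through the policy's normalizer step):
import torch
def reanchor_sketch(prev_abs: torch.Tensor, state: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
    # prev_abs: (steps, dof) leftover chunk tail in absolute coordinates
    # state:    (dof,) current raw joint state cached before observation normalization
    # mask:     (dof,) bool, True where the policy expects relative targets
    return torch.where(mask, prev_abs - state, prev_abs)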
+25 -31
@@ -17,7 +17,6 @@
from __future__ import annotations
import logging
from collections import deque
from contextlib import nullcontext
from copy import copy
@@ -32,12 +31,27 @@ from .base import InferenceEngine
logger = logging.getLogger(__name__)
# TODO(Steven): support relative-action policies. The per-tick flow refreshes
# ``RelativeActionsProcessorStep._last_state`` every call, so cached chunk
# actions popped on later ticks get reanchored to the *current* robot state and
# absolute targets drift through the chunk. Relative-action policies are
# rejected at context-build time today; RTC postprocesses the whole chunk and
# is unaffected.
#
# Candidate fix: drive the policy via ``predict_action_chunk`` and serve a
# local FIFO of postprocessed actions. Eliminates drift by construction and
# saves per-tick pre/post work, but bypasses ``select_action`` — needs
# fallbacks for SAC (raises), ACT temporal ensembling (ensembler lives in
# ``select_action``), and Diffusion-family (obs-history queues populated as a
# side effect of ``select_action``).
class SyncInferenceEngine(InferenceEngine):
"""Inline synchronous inference: compute one action per call.
``get_action`` runs the full policy pipeline when its local action
queue is empty, postprocesses the whole predicted chunk immediately,
and then returns one already-postprocessed CPU action at a time.
``get_action`` runs the full policy pipeline (pre/post-processor +
``select_action``) on the given observation frame and returns a
CPU action tensor reordered to match the dataset action keys.
"""
def __init__(
@@ -59,8 +73,6 @@ class SyncInferenceEngine(InferenceEngine):
self._task = task
self._device = torch.device(device or "cpu")
self._robot_type = robot_type
self._processed_action_queue: deque[torch.Tensor] = deque()
logger.info(
"SyncInferenceEngine initialized (device=%s, action_keys=%d)",
self._device,
@@ -81,28 +93,9 @@ class SyncInferenceEngine(InferenceEngine):
self._policy.reset()
self._preprocessor.reset()
self._postprocessor.reset()
self._processed_action_queue.clear()
def _enqueue_processed_chunk(self, action_chunk: torch.Tensor) -> None:
"""Convert a postprocessed action chunk into ordered per-step CPU tensors."""
if action_chunk.ndim == 2:
action_chunk = action_chunk.unsqueeze(0)
n_action_steps = getattr(self._policy.config, "n_action_steps", action_chunk.shape[1])
action_chunk = action_chunk[:, : min(n_action_steps, action_chunk.shape[1])]
for action in action_chunk.squeeze(0):
action_tensor = action.cpu()
action_dict = make_robot_action(action_tensor, self._dataset_features)
ordered_action = torch.tensor(
[action_dict[k] for k in self._ordered_action_keys], dtype=action_tensor.dtype
)
self._processed_action_queue.append(ordered_action)
def get_action(self, obs_frame: dict | None) -> torch.Tensor | None:
"""Run the full inference pipeline on ``obs_frame`` and return an action tensor."""
if self._processed_action_queue:
return self._processed_action_queue.popleft().clone()
if obs_frame is None:
return None
# Shallow copy is intentional: the caller (`send_next_action`) builds
@@ -119,10 +112,11 @@ class SyncInferenceEngine(InferenceEngine):
observation, self._device, self._task, self._robot_type
)
observation = self._preprocessor(observation)
action_chunk = self._policy.predict_action_chunk(observation)
processed_chunk = self._postprocessor(action_chunk)
action = self._policy.select_action(observation)
action = self._postprocessor(action)
action_tensor = action.squeeze(0).cpu()
self._enqueue_processed_chunk(processed_chunk)
if not self._processed_action_queue:
return None
return self._processed_action_queue.popleft().clone()
# Reorder to match dataset action ordering so the caller can treat
# the returned tensor uniformly across backends.
action_dict = make_robot_action(action_tensor, self._dataset_features)
return torch.tensor([action_dict[k] for k in self._ordered_action_keys])
+1 -1
@@ -47,7 +47,7 @@ class RolloutRingBuffer:
count.
"""
def __init__(self, max_seconds: float = 30.0, max_memory_mb: float = 2048.0, fps: float = 30.0) -> None:
def __init__(self, max_seconds: float = 30.0, max_memory_mb: int = 2048, fps: float = 30.0) -> None:
self._max_frames = int(max_seconds * fps)
self._max_bytes = int(max_memory_mb * 1024 * 1024)
self._buffer: deque[dict] = deque(maxlen=self._max_frames)
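With the constructor defaults shown, the capacity works out as follows:
max_seconds, max_memory_mb, fps = 30.0, 2048, 30.0
max_frames = int(max_seconds * fps)            # 900 frames
max_bytes = int(max_memory_mb * 1024 * 1024)   # 2 GiB hard memory cap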
+10 -16
@@ -23,6 +23,7 @@ from lerobot.utils.robot_utils import precise_sleep
from ..context import RolloutContext
from .core import RolloutStrategy, send_next_action
from .display import BaseDisplay
logger = logging.getLogger(__name__)
@@ -38,6 +39,8 @@ class BaseStrategy(RolloutStrategy):
"""Initialise the inference engine."""
self._init_engine(ctx)
logger.info("Base strategy ready")
self._display = BaseDisplay(duration=ctx.runtime.cfg.duration)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run the autonomous control loop until shutdown or duration expires."""
@@ -47,12 +50,8 @@ class BaseStrategy(RolloutStrategy):
interpolator = self._interpolator
control_interval = interpolator.get_control_interval(cfg.fps)
observation_interval = 1.0 / cfg.fps
start_time = time.perf_counter()
next_observation_time = 0.0
obs = None
obs_processed = None
engine.resume()
logger.info("Base strategy control loop started")
@@ -63,18 +62,12 @@ class BaseStrategy(RolloutStrategy):
logger.info("Duration limit reached (%.0fs)", cfg.duration)
break
if obs is None or loop_start >= next_observation_time:
obs = robot.get_observation()
obs_processed = ctx.processors.robot_observation_processor(obs)
engine.notify_observation(obs_processed)
next_observation_time = loop_start + observation_interval
obs = robot.get_observation()
obs_processed = self._process_observation_and_notify(ctx.processors, obs)
if self._handle_warmup(cfg.use_torch_compile, loop_start, control_interval):
continue
if obs_processed is None:
continue
action_dict = send_next_action(obs_processed, obs, ctx, interpolator)
self._log_telemetry(obs_processed, action_dict, ctx.runtime)
@@ -82,11 +75,12 @@ class BaseStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
self._warn_slow_loop(dt, control_interval, cfg.fps)
def teardown(self, ctx: RolloutContext) -> None:
"""Disconnect hardware and stop inference."""
self._teardown_hardware(ctx.hardware)
self._teardown_hardware(
ctx.hardware,
return_to_initial_position=ctx.runtime.cfg.return_to_initial_position,
)
logger.info("Base strategy teardown complete")
+55 -14
@@ -32,7 +32,8 @@ from ..inference import InferenceEngine
if TYPE_CHECKING:
from ..configs import RolloutStrategyConfig
from ..context import HardwareContext, RolloutContext, RuntimeContext
from ..context import HardwareContext, ProcessorContext, RolloutContext, RuntimeContext
from .display import RolloutStatusDisplay
logger = logging.getLogger(__name__)
@@ -50,6 +51,18 @@ class RolloutStrategy(abc.ABC):
self._engine: InferenceEngine | None = None
self._interpolator: ActionInterpolator | None = None
self._warmup_flushed: bool = False
self._cached_obs_processed: dict | None = None
self._display: RolloutStatusDisplay | None = None
def _warn_slow_loop(self, dt: float, control_interval: float, fps: float) -> None:
"""Warn when the control loop runs slower than the target FPS."""
if dt > control_interval:
logger.warning(
"Control loop running slower (%.1f Hz) than target (%.0f Hz). "
"Possible causes: camera FPS not keeping up, slow policy inference, CPU starvation.",
1 / dt,
fps,
)
def _init_engine(self, ctx: RolloutContext) -> None:
"""Attach the inference engine and action interpolator, then start the backend.
@@ -65,8 +78,32 @@ class RolloutStrategy(abc.ABC):
self._engine.reset()
self._engine.start()
self._warmup_flushed = False
self._cached_obs_processed = None
logger.info("Inference engine started")
def _process_observation_and_notify(self, processors: ProcessorContext, obs_raw: dict) -> dict:
"""Run the observation processor and notify the engine — throttled to policy ticks.
Callers are responsible for calling ``robot.get_observation()`` every loop
iteration so ``obs_raw`` stays fresh for the action post-processor. This
helper gates only the comparatively expensive work (the processor pipeline
and ``engine.notify_observation``) so it fires only when the interpolator
signals it needs a new action (once per ``interpolation_multiplier`` ticks).
On interpolated ticks the cached ``obs_processed`` is reused.
With ``interpolation_multiplier == 1`` this is equivalent to the unthrottled
path: ``needs_new_action()`` is True every tick.
The cache is implicitly invalidated whenever ``interpolator.reset()`` is
called (warmup completion, DAgger phase transitions back to AUTONOMOUS),
because reset makes ``needs_new_action()`` return True on the next call.
"""
if self._cached_obs_processed is None or self._interpolator.needs_new_action():
obs_processed = processors.robot_observation_processor(obs_raw)
self._engine.notify_observation(obs_processed)
self._cached_obs_processed = obs_processed
return self._cached_obs_processed
def _handle_warmup(self, use_torch_compile: bool, loop_start: float, control_interval: float) -> bool:
"""Handle torch.compile warmup phase.
@@ -91,16 +128,20 @@ class RolloutStrategy(abc.ABC):
engine.resume()
return False
def _teardown_hardware(self, hw: HardwareContext) -> None:
"""Stop the inference engine, return robot to initial position, and disconnect hardware."""
def _teardown_hardware(self, hw: HardwareContext, return_to_initial_position: bool = True) -> None:
"""Stop the inference engine, optionally return robot to initial position, and disconnect hardware."""
if self._engine is not None:
logger.info("Stopping inference engine...")
self._engine.stop()
robot = hw.robot_wrapper.inner
if robot.is_connected:
if hw.initial_position:
if return_to_initial_position and hw.initial_position:
logger.info("Returning robot to initial position before shutdown...")
self._return_to_initial_position(hw)
elif not return_to_initial_position:
logger.info(
"Skipping return-to-initial-position (disabled by config); leaving robot in final pose."
)
logger.info("Disconnecting robot...")
robot.disconnect()
teleop = hw.teleop
@@ -194,7 +235,7 @@ def estimate_max_episode_seconds(
The estimate ignores codec-specific settings (CRF, preset) on purpose:
we only need a rough lower bound on bitrate, not a precise prediction.
Falls back to 600 s (10 min) when no video features are present.
Falls back to 300 s (5 min) when no video features are present.
"""
# 0.1 bits-per-pixel is a *low* estimate for CRF-30 streaming video of
robot footage (real-world is typically 0.1-0.3 bpp). Under-
@@ -208,16 +249,16 @@ def estimate_max_episode_seconds(
if feat.get("dtype") == "video":
shape = feat.get("shape", ())
# Assuming shape could be (C, H, W) or (T, C, H, W)
# We want to extract the spatial dimensions.
if len(shape) >= 3:
h, w = shape[-2], shape[-1]
pixels = h * w
if pixels > 0:
camera_pixels.append(pixels)
# (H, W, C) — bits-per-pixel is a per-spatial-pixel metric,
# so we exclude the channel dimension from the count.
if len(shape) == 3:
pixels = shape[0] * shape[1]
camera_pixels.append(pixels)
else:
raise ValueError(f"Unexpected video feature shape: {shape}")
if not camera_pixels:
return 600.0
return 300.0
# Use the smallest camera: it produces the lowest bitrate and therefore
# takes the longest to reach the target — the conservative choice.
@@ -227,7 +268,7 @@ def estimate_max_episode_seconds(
# Guard against division by zero just in case
if bytes_per_second <= 0:
return 600.0
return 300.0
return (target_size_mb * 1024 * 1024) / bytes_per_second
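A worked example with illustrative numbers, one 640x480 camera at 30 fps and a 100 MB target, using the 0.1 bpp floor above:
bits_per_second = 0.1 * 640 * 480 * 30              # 921,600 bits/s
bytes_per_second = bits_per_second / 8              # 115,200 B/s (~0.11 MB/s)
max_seconds = 100 * 1024 * 1024 / bytes_per_second  # ~910 s before hitting 100 MB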
+179 -71
@@ -24,14 +24,22 @@ the ``input_device`` config field. Each device exposes three actions:
1. **pause_resume** - Toggle policy execution (AUTONOMOUS <-> PAUSED).
2. **correction** - Toggle correction recording (PAUSED <-> CORRECTING).
3. **upload** - Push dataset to hub on demand (corrections-only mode).
ESC (keyboard only) Stop session.
ESC (keyboard only) Stop session.
Recording Modes:
Recording modes:
``record_autonomous=True``: Sentry-like continuous recording with
time-based episode rotation. Both autonomous and correction
frames are recorded; corrections tagged ``intervention=True``.
``record_autonomous=False``: Only correction windows are recorded.
Each correction (start to stop) becomes one episode.
Teleoperator handover:
On AUTONOMOUS -> PAUSED, actuated teleops (those with non-empty
``feedback_features``, e.g. SO-101, OpenArmMini) are smoothly driven to
the follower's last position via ``send_feedback`` so the operator takes
over without a jerk. Non-actuated teleops cannot be driven,
so on PAUSED -> CORRECTING the follower is instead slid to the teleop's
current pose before the correction begins.
"""
from __future__ import annotations
@@ -63,6 +71,7 @@ from ..configs import DAggerKeyboardConfig, DAggerPedalConfig, DAggerStrategyCon
from ..context import RolloutContext
from ..robot_wrapper import ThreadSafeRobot
from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action
from .display import DAggerDisplay
PYNPUT_AVAILABLE = _pynput_available
keyboard = None
@@ -168,15 +177,27 @@ class DAggerEvents:
# ---------------------------------------------------------------------------
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50
) -> None:
"""Smoothly move teleop to target position via linear interpolation.
def _teleop_supports_feedback(teleop: Teleoperator) -> bool:
"""Return True when the teleop can receive position feedback (is actuated).
TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing.
"""
return (
bool(teleop.feedback_features)
and hasattr(teleop, "disable_torque")
and hasattr(teleop, "enable_torque")
)
Requires the teleoperator to support motor control methods
(``enable_torque``, ``write_goal_positions``, ``get_action``).
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30
) -> None:
"""Smoothly move an actuated teleop to ``target_pos`` via linear interpolation.
Requires the teleoperator to support feedback
(i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``).
TODO(Maxime): This blocks for up to ``duration_s`` seconds; during that time
the follower robot receives no new actions, which could be an issue on LeKiwi.
"""
teleop.enable_torque()
current = teleop.get_action()
@@ -184,13 +205,28 @@ def _teleop_smooth_move_to(
for step in range(steps + 1):
t = step / steps
interp = {}
for k in current:
if k in target_pos:
interp[k] = current[k] * (1 - t) + target_pos[k] * t
else:
interp[k] = current[k]
teleop.write_goal_positions(interp)
interp = {
k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current
}
teleop.send_feedback(interp)
time.sleep(1 / fps)
def _follower_smooth_move_to(
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30
) -> None:
"""Smoothly move the follower robot from ``current`` to ``target`` action.
Used when the teleop is non-actuated: instead of driving the leader arm
to the follower, we bring the follower to the teleop's current pose.
Both ``current`` and ``target`` must be in robot-action key space.
"""
steps = max(int(duration_s * fps), 1)
for step in range(steps + 1):
t = step / steps
interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current}
robot.send_action(interp)
time.sleep(1 / fps)
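A hypothetical call site for the non-actuated handover, sliding the follower to the operator's pose over one second (key names illustrative):
current = {"shoulder.pos": 10.0, "elbow.pos": -5.0}  # follower's last commanded action
target = {"shoulder.pos": 25.0, "elbow.pos": 0.0}    # teleop pose after the action processors
_follower_smooth_move_to(robot, current, target, duration_s=1.0, fps=30)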
@@ -251,7 +287,7 @@ def _init_dagger_keyboard(events: DAggerEvents, cfg: DAggerKeyboardConfig):
listener = keyboard.Listener(on_press=on_press)
listener.start()
logger.info(
logger.debug(
"DAgger keyboard listener started (pause_resume='%s', correction='%s', upload='%s', ESC=stop)",
cfg.pause_resume,
cfg.correction,
@@ -335,6 +371,28 @@ class DAggerStrategy(RolloutStrategy):
self._episode_duration_s,
)
if self.config.input_device == "keyboard":
kb = self.config.keyboard
pause_key, correction_key, upload_key = (
kb.pause_resume.upper(),
kb.correction.upper(),
kb.upload.upper(),
)
else:
pb = self.config.pedal
pause_key, correction_key, upload_key = pb.pause_resume, pb.correction, pb.upload
self._display = DAggerDisplay(
record_autonomous=self.config.record_autonomous,
num_episodes=self.config.num_episodes,
episode_duration_s=self._episode_duration_s,
input_device=self.config.input_device,
pause_key=pause_key,
correction_key=correction_key,
upload_key=upload_key,
)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run DAgger episodes with human-in-the-loop intervention."""
if self.config.record_autonomous:
@@ -371,7 +429,10 @@ class DAggerStrategy(RolloutStrategy):
logger.info("Dataset uploaded to hub")
log_say("Dataset uploaded to hub", play_sounds)
self._teardown_hardware(ctx.hardware)
self._teardown_hardware(
ctx.hardware,
return_to_initial_position=ctx.runtime.cfg.return_to_initial_position,
)
logger.info("DAgger strategy teardown complete")
# ------------------------------------------------------------------
@@ -403,10 +464,8 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# teleop.disable_torque()
engine.resume()
self._display.show_state(DAggerPhase.AUTONOMOUS)
last_action: dict[str, Any] | None = None
record_tick = 0
@@ -429,24 +488,36 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
self._display.show_state(new_phase)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
phase = events.phase
obs = robot.get_observation()
obs_processed = ctx.processors.robot_observation_processor(obs)
obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
# --- CORRECTING: human teleop control ---
# TODO(Steven): teleop runs at the same FPS as the policy. To
# decouple the two, sample teleop at its native rate and
# interpolate to the control loop's tick rate.
if phase == DAggerPhase.CORRECTING:
obs_processed = ctx.processors.robot_observation_processor(obs)
teleop_action = teleop.get_action()
processed_teleop = ctx.processors.teleop_action_processor((teleop_action, obs))
robot_action_to_send = ctx.processors.robot_action_processor((processed_teleop, obs))
robot.send_action(robot_action_to_send)
last_action = robot_action_to_send
self._log_telemetry(obs_processed, processed_teleop, ctx.runtime)
action_frame = build_dataset_frame(features, processed_teleop, prefix=ACTION)
if record_tick % record_stride == 0:
obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
action_frame = build_dataset_frame(features, processed_teleop, prefix=ACTION)
frame = {
**obs_frame,
**action_frame,
@@ -463,7 +534,7 @@ class DAggerStrategy(RolloutStrategy):
# --- AUTONOMOUS: policy control ---
else:
engine.notify_observation(obs_processed)
obs_processed = self._process_observation_and_notify(ctx.processors, obs)
if self._handle_warmup(cfg.use_torch_compile, loop_start, control_interval):
continue
@@ -472,8 +543,9 @@ class DAggerStrategy(RolloutStrategy):
if action_dict is not None:
self._log_telemetry(obs_processed, action_dict, ctx.runtime)
last_action = ctx.processors.robot_action_processor((action_dict, obs))
action_frame = build_dataset_frame(features, action_dict, prefix=ACTION)
if record_tick % record_stride == 0:
obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
action_frame = build_dataset_frame(features, action_dict, prefix=ACTION)
frame = {
**obs_frame,
**action_frame,
@@ -483,9 +555,9 @@ class DAggerStrategy(RolloutStrategy):
dataset.add_frame(frame)
record_tick += 1
# Episode rotation derived from video file-size target.
# Do NOT save mid-correction — wait for the correction
# to finish so the episode boundary is clean.
# Episode rotation derived from the video file-size target.
# Saving is deferred while a correction is ongoing so the
# episode boundary lands on a clean autonomous frame.
elapsed = time.perf_counter() - episode_start
if elapsed >= episode_duration_s and phase != DAggerPhase.CORRECTING:
with self._episode_lock:
@@ -509,16 +581,11 @@ class DAggerStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
self._warn_slow_loop(dt, control_interval, cfg.fps)
finally:
logger.info("DAgger continuous control loop ended — pausing engine")
engine.pause()
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -554,10 +621,8 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# teleop.disable_torque()
engine.resume()
self._display.show_state(DAggerPhase.AUTONOMOUS)
last_action: dict[str, Any] | None = None
start_time = time.perf_counter()
@@ -584,8 +649,17 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
self._display.show_state(new_phase)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
# Correction ended -> save episode (blocking if not streaming)
if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
@@ -608,10 +682,13 @@ class DAggerStrategy(RolloutStrategy):
phase = events.phase
obs = robot.get_observation()
obs_processed = ctx.processors.robot_observation_processor(obs)
# --- CORRECTING: human teleop control + recording ---
# TODO(Steven): teleop runs at the same FPS as the policy. To
# decouple the two, sample teleop at its native rate and
# interpolate to the control loop's tick rate.
if phase == DAggerPhase.CORRECTING:
obs_processed = ctx.processors.robot_observation_processor(obs)
teleop_action = teleop.get_action()
processed_teleop = ctx.processors.teleop_action_processor((teleop_action, obs))
robot_action_to_send = ctx.processors.robot_action_processor((processed_teleop, obs))
@@ -619,9 +696,9 @@ class DAggerStrategy(RolloutStrategy):
last_action = robot_action_to_send
self._log_telemetry(obs_processed, processed_teleop, ctx.runtime)
obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
action_frame = build_dataset_frame(features, processed_teleop, prefix=ACTION)
if record_tick % record_stride == 0:
obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
action_frame = build_dataset_frame(features, processed_teleop, prefix=ACTION)
dataset.add_frame(
{
**obs_frame,
@@ -639,7 +716,7 @@ class DAggerStrategy(RolloutStrategy):
# --- AUTONOMOUS: policy control (no recording) ---
else:
engine.notify_observation(obs_processed)
obs_processed = self._process_observation_and_notify(ctx.processors, obs)
if self._handle_warmup(cfg.use_torch_compile, loop_start, control_interval):
continue
@@ -653,16 +730,11 @@ class DAggerStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
self._warn_slow_loop(dt, control_interval, cfg.fps)
finally:
logger.info("DAgger corrections-only loop ended — pausing engine")
engine.pause()
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -679,35 +751,71 @@ class DAggerStrategy(RolloutStrategy):
new_phase: DAggerPhase,
engine,
interpolator,
robot: ThreadSafeRobot,
teleop: Teleoperator,
ctx: RolloutContext,
prev_action: dict | None,
) -> None:
"""Execute side-effects for a validated phase transition."""
"""Execute side-effects for a validated phase transition, including smooth handovers.
AUTONOMOUS -> PAUSED (actuated teleop):
Pause the engine, then drive the leader arm to the follower's last
commanded position so the operator takes over without a jerk.
PAUSED -> CORRECTING (non-actuated teleop):
Slide the follower to the teleop's current pose so the robot meets
the operator's hand rather than jumping to it on the first frame.
CORRECTING -> PAUSED (actuated teleop):
Re-enable torque so the leader holds position after the correction;
potentially useful if cancelling a correction recording is supported later.
PAUSED -> AUTONOMOUS:
Reset and resume the inference engine.
"""
teleop = ctx.hardware.teleop
robot = ctx.hardware.robot_wrapper
logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value)
if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED:
logger.info("Pausing engine robot holds position")
logger.info("Pausing engine - robot holds position")
engine.pause()
obs = robot.get_observation()
_robot_pos = {
k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features
}
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# Consider also a method that moves the robot to the teleop smoothly (similar to what we do at HW shutdown).
# _teleop_smooth_move_to(teleop, robot_pos, duration_s=2.0, fps=50)
elif new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode — human teleop control")
# TODO(Steven): either enforce this (meaning all teleop must implement these methods) or
# user is responsible for moving the teleop to the same position as the robot when starting the correction.
# teleop.disable_torque()
if _teleop_supports_feedback(teleop) and prev_action is not None:
# TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor).
# send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101
# leader + SO-101 follower) the keys are identical so this works. If the processor pipeline
# does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in
# _teleop_smooth_move_to silently no-ops and the arm doesn't move.
logger.info("Smooth handover: moving leader arm to follower position")
_teleop_smooth_move_to(teleop, prev_action)
elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode - human teleop control")
if not _teleop_supports_feedback(teleop) and prev_action is not None:
logger.info("Smooth handover: sliding follower to teleop position")
obs = robot.get_observation()
teleop_action = teleop.get_action()
processed = ctx.processors.teleop_action_processor((teleop_action, obs))
target = ctx.processors.robot_action_processor((processed, obs))
_follower_smooth_move_to(robot, prev_action, target)
# unlock the teleop for human control
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
if _teleop_supports_feedback(teleop):
teleop.enable_torque()
elif new_phase == DAggerPhase.AUTONOMOUS:
logger.info("Resuming autonomous mode resetting engine and interpolator")
logger.info("Resuming autonomous mode - resetting engine and interpolator")
interpolator.reset()
engine.reset()
engine.resume()
# release teleop before resuming the policy
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
# ------------------------------------------------------------------
# Background push (shared by both modes)
# ------------------------------------------------------------------
+263
@@ -0,0 +1,263 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Console status display for rollout strategies.
One subclass per strategy: static states/controls are declared as class
constants; runtime-dependent values are passed to ``__init__``.
In each strategy's ``setup()``:
self._display = DAggerDisplay(
record_autonomous=self.config.record_autonomous,
num_episodes=self.config.num_episodes,
episode_duration_s=self._episode_duration_s,
input_device=self.config.input_device,
pause_key="SPACE",
correction_key="TAB",
upload_key="ENTER",
)
self._display.show_banner()
On each state transition:
self._display.show_state("correcting")
"""
from __future__ import annotations
import enum
import sys
from dataclasses import dataclass
def _supports_color() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
class _C:
"""ANSI escape codes."""
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[1;92m"
YELLOW = "\033[1;93m"
RED = "\033[1;91m"
CYAN = "\033[1;96m"
WHITE = "\033[1;97m"
GRAY = "\033[2;37m"
@dataclass
class StateConfig:
"""One named rollout state.
``key`` must match the string passed to ``RolloutStatusDisplay.show_state()``.
"""
key: str
emoji: str
label: str
description: str
color: str = _C.WHITE
@dataclass
class ControlConfig:
"""One keyboard/pedal binding shown in the startup banner."""
key: str
description: str
# ---------------------------------------------------------------------------
# Base display class
# ---------------------------------------------------------------------------
class RolloutStatusDisplay:
"""Unified console status display. Subclass once per strategy."""
def __init__(
self,
strategy: str,
states: list[StateConfig],
controls: list[ControlConfig],
info: list[str] | None = None,
) -> None:
self.strategy = strategy
self._states = {s.key: s for s in states}
self._controls = controls
self._info = info or []
self._use_color = _supports_color()
def _c(self, code: str, text: str) -> str:
if not self._use_color:
return text
return f"{code}{text}{_C.RESET}"
def show_banner(self) -> None:
"""Print startup banner: strategy name, states, controls, config info."""
width = 62
sep = self._c(_C.BOLD, "─" * width)
print(f"\n{sep}")
print(self._c(_C.BOLD, f" lerobot-rollout │ {self.strategy}"))
if self._states:
print()
for state in self._states.values():
label = self._c(state.color, f"{state.label:<14}")
desc = self._c(_C.GRAY, state.description)
print(f" {state.emoji} {label} {desc}")
if self._controls:
print()
key_width = max(len(c.key) for c in self._controls)
for ctrl in self._controls:
key_str = self._c(_C.CYAN, f"[{ctrl.key:<{key_width}}]")
print(f" {key_str} {ctrl.description}")
if self._info:
print()
for item in self._info:
print(f" {item}")
print(f"{sep}\n")
def show_state(self, state_key: str | enum.Enum) -> None:
"""Print the current state and available controls - call this on every transition."""
key = state_key.value if isinstance(state_key, enum.Enum) else state_key
state = self._states.get(key)
if state is None:
return
label = self._c(state.color, f"{state.label:<14}")
desc = self._c(_C.GRAY, state.description)
print(f"\n {state.emoji} {label} {desc}\n")
if self._controls:
key_width = max(len(c.key) for c in self._controls)
for ctrl in self._controls:
key_str = self._c(_C.CYAN, f"[{ctrl.key:<{key_width}}]")
print(f" {key_str} {ctrl.description}")
print()
# ---------------------------------------------------------------------------
# One display subclass per strategy
# ---------------------------------------------------------------------------
class BaseDisplay(RolloutStatusDisplay):
"""Status display for the base (eval-only, no recording) strategy."""
_STATES = [StateConfig("running", "🟢", "RUNNING", "autonomous rollout — no recording", _C.GREEN)]
_CONTROLS = [ControlConfig("Ctrl+C", "stop session")]
def __init__(self, duration: float = 0) -> None:
info = ["No recording — evaluation only."]
if duration > 0:
info.append(f"Duration: {duration:.0f}s")
super().__init__("base", self._STATES, self._CONTROLS, info)
class SentryDisplay(RolloutStatusDisplay):
"""Status display for the sentry (continuous autonomous recording) strategy."""
_STATES = [StateConfig("recording", "🟢", "RECORDING", "continuous autonomous recording", _C.GREEN)]
_CONTROLS = [ControlConfig("Ctrl+C", "stop session")]
def __init__(self, episode_duration_s: float, upload_every_n_episodes: int) -> None:
info = [
f"Episode rotation: ~{episode_duration_s:.0f}s | "
f"Upload every {upload_every_n_episodes} episodes",
]
super().__init__("sentry", self._STATES, self._CONTROLS, info)
class HighlightDisplay(RolloutStatusDisplay):
"""Status display for the highlight (ring-buffer on-demand save) strategy."""
def __init__(self, ring_buffer_seconds: float, save_key: str, push_key: str) -> None:
states = [
StateConfig(
"buffering",
"",
"BUFFERING",
f"ring buffer active — last {ring_buffer_seconds:.0f}s captured",
_C.WHITE,
),
StateConfig("recording", "🔴", "RECORDING", "live recording — press [s] to save episode", _C.RED),
]
controls = [
ControlConfig(save_key, "BUFFERING ↔ RECORDING start recording / save episode"),
ControlConfig(push_key, "push dataset to Hub (background)"),
ControlConfig("ESC", "stop session"),
]
super().__init__("highlight", states, controls)
class DAggerDisplay(RolloutStatusDisplay):
"""Status display for the dagger (human-in-the-loop) strategy."""
_PAUSED_STATE = StateConfig("paused", "🟡", "PAUSED", "holding last position — awaiting input", _C.YELLOW)
_CORRECTING_STATE = StateConfig(
"correcting", "🔴", "CORRECTING", "human teleop active — recording correction", _C.RED
)
def __init__(
self,
record_autonomous: bool,
num_episodes: int,
episode_duration_s: float,
input_device: str,
pause_key: str,
correction_key: str,
upload_key: str,
) -> None:
mode = "continuous recording" if record_autonomous else "corrections only"
auto_desc = "policy running — recording" if record_autonomous else "policy running — no recording"
states = [
StateConfig("autonomous", "🟢", "AUTONOMOUS", auto_desc, _C.GREEN),
self._PAUSED_STATE,
self._CORRECTING_STATE,
]
controls = [
ControlConfig(pause_key, "AUTONOMOUS ↔ PAUSED pause / resume policy"),
ControlConfig(correction_key, "PAUSED ↔ CORRECTING start / stop correction"),
ControlConfig(upload_key, "push dataset to Hub"),
ControlConfig("ESC", "stop session"),
]
info = [f"Target: {num_episodes} episodes | Input: {input_device}"]
if record_autonomous:
info.append(f"Episode rotation: ~{episode_duration_s:.0f}s")
super().__init__(f"dagger [{mode}]", states, controls, info)
if __name__ == "__main__":
dagger_display = DAggerDisplay(
record_autonomous=False,
num_episodes=20,
episode_duration_s=30,
input_device="keyboard",
pause_key="SPACE",
correction_key="TAB",
upload_key="ENTER",
)
dagger_display.show_banner()
dagger_display.show_state("paused")
dagger_display.show_state("correcting")
dagger_display.show_state("paused")
dagger_display.show_state("autonomous")
+27 -9
@@ -17,6 +17,7 @@
from __future__ import annotations
import contextlib
import enum
import logging
import os
import sys
@@ -36,6 +37,7 @@ from ..configs import HighlightStrategyConfig
from ..context import RolloutContext
from ..ring_buffer import RolloutRingBuffer
from .core import RolloutStrategy, safe_push_to_hub, send_next_action
from .display import HighlightDisplay
PYNPUT_AVAILABLE = _pynput_available
keyboard = None
@@ -53,6 +55,13 @@ if PYNPUT_AVAILABLE:
logger = logging.getLogger(__name__)
class HighlightPhase(enum.Enum):
"""Observable phases of a Highlight session."""
BUFFERING = "buffering" # Ring buffer accumulating frames, not recording
RECORDING = "recording" # Live recording active
class HighlightStrategy(RolloutStrategy):
"""Autonomous rollout with on-demand recording via ring buffer.
@@ -64,8 +73,8 @@ class HighlightStrategy(RolloutStrategy):
3. The episode is saved and the ring buffer resumes capturing.
Requires ``streaming_encoding=True`` (enforced in config validation)
so that ``dataset.add_frame`` is a non-blocking queue put: draining
900 frames stays sub-ms per frame.
so that ``dataset.add_frame`` is a non-blocking queue put: flushing
the entire ring buffer in one tick must not stall the control loop.
"""
config: HighlightStrategyConfig
@@ -105,6 +114,13 @@ class HighlightStrategy(RolloutStrategy):
self.config.save_key,
self.config.push_key,
)
self._display = HighlightDisplay(
ring_buffer_seconds=self.config.ring_buffer_seconds,
save_key=self.config.save_key,
push_key=self.config.push_key,
)
self._display.show_banner()
self._display.show_state(HighlightPhase.BUFFERING)
def run(self, ctx: RolloutContext) -> None:
"""Run the autonomous loop, buffering frames and recording on demand."""
@@ -135,8 +151,7 @@ class HighlightStrategy(RolloutStrategy):
break
obs = robot.get_observation()
obs_processed = ctx.processors.robot_observation_processor(obs)
engine.notify_observation(obs_processed)
obs_processed = self._process_observation_and_notify(ctx.processors, obs)
if self._handle_warmup(cfg.use_torch_compile, loop_start, control_interval):
continue
@@ -163,6 +178,7 @@ class HighlightStrategy(RolloutStrategy):
for buffered_frame in ring.drain():
dataset.add_frame(buffered_frame)
self._recording_live.set()
self._display.show_state(HighlightPhase.RECORDING)
else:
dataset.add_frame(frame)
with self._episode_lock:
@@ -173,6 +189,7 @@ class HighlightStrategy(RolloutStrategy):
play_sounds,
)
self._recording_live.clear()
self._display.show_state(HighlightPhase.BUFFERING)
continue # frame already consumed — skip ring.append
if self._push_requested.is_set():
@@ -189,9 +206,7 @@ class HighlightStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
self._warn_slow_loop(dt, control_interval, cfg.fps)
finally:
logger.info("Highlight control loop ended")
@@ -228,7 +243,10 @@ class HighlightStrategy(RolloutStrategy):
logger.info("Dataset uploaded to hub")
log_say("Dataset uploaded to hub", play_sounds)
self._teardown_hardware(ctx.hardware)
self._teardown_hardware(
ctx.hardware,
return_to_initial_position=ctx.runtime.cfg.return_to_initial_position,
)
logger.info("Highlight strategy teardown complete")
def _setup_keyboard(self, shutdown_event: ThreadingEvent) -> None:
@@ -253,7 +271,7 @@ class HighlightStrategy(RolloutStrategy):
self._listener = keyboard.Listener(on_press=on_press)
self._listener.start()
logger.info("Keyboard listener started (save='%s', push='%s', ESC=stop)", save_key, push_key)
logger.debug("Keyboard listener started (save='%s', push='%s', ESC=stop)", save_key, push_key)
except ImportError:
logger.warning("pynput not available — keyboard listener disabled")
+12 -6
@@ -32,6 +32,7 @@ from lerobot.utils.utils import log_say
from ..configs import SentryStrategyConfig
from ..context import RolloutContext
from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action
from .display import SentryDisplay
logger = logging.getLogger(__name__)
@@ -79,6 +80,11 @@ class SentryStrategy(RolloutStrategy):
self._episode_duration_s,
self.config.upload_every_n_episodes,
)
self._display = SentryDisplay(
episode_duration_s=self._episode_duration_s,
upload_every_n_episodes=self.config.upload_every_n_episodes,
)
self._display.show_banner()
def run(self, ctx: RolloutContext) -> None:
"""Run the continuous recording loop with automatic episode rotation."""
@@ -111,8 +117,7 @@ class SentryStrategy(RolloutStrategy):
break
obs = robot.get_observation()
obs_processed = ctx.processors.robot_observation_processor(obs)
engine.notify_observation(obs_processed)
obs_processed = self._process_observation_and_notify(ctx.processors, obs)
if self._handle_warmup(cfg.use_torch_compile, loop_start, control_interval):
continue
@@ -161,9 +166,7 @@ class SentryStrategy(RolloutStrategy):
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
else:
logger.warning(
f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({cfg.fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
self._warn_slow_loop(dt, control_interval, cfg.fps)
finally:
logger.info("Sentry control loop ended — saving final episode")
@@ -197,7 +200,10 @@ class SentryStrategy(RolloutStrategy):
logger.info("Dataset uploaded to hub")
log_say("Dataset uploaded to hub", play_sounds)
self._teardown_hardware(ctx.hardware)
self._teardown_hardware(
ctx.hardware,
return_to_initial_position=ctx.runtime.cfg.return_to_initial_position,
)
logger.info("Sentry strategy teardown complete")
def _background_push(self, dataset, cfg) -> None:
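Uploading to the Hub can take minutes, so `_background_push` keeps it off the control thread. A hedged sketch of that pattern (names here are illustrative; the real method wraps `safe_push_to_hub`):

```python
import logging
import threading

def start_background_push(dataset, push_fn) -> threading.Thread:
    """Run a potentially slow Hub upload without stalling the control loop."""
    def _worker() -> None:
        try:
            push_fn(dataset)  # e.g. safe_push_to_hub(dataset, ...)
        except Exception:
            logging.exception("Background push failed")

    thread = threading.Thread(target=_worker, daemon=True, name="hub-push")
    thread.start()
    return thread
```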
@@ -70,6 +70,7 @@ from lerobot.datasets.io_utils import (
get_parquet_file_size_in_mb,
get_parquet_num_frames,
load_info,
load_json,
write_episodes,
write_info,
write_stats,
@@ -81,9 +82,11 @@ from lerobot.datasets.utils import (
DEFAULT_DATA_PATH,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH,
INFO_PATH,
LEGACY_EPISODES_PATH,
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
DatasetInfo,
update_chunk_file_indices,
)
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
@@ -165,7 +168,7 @@ def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
def validate_local_dataset_version(local_path: Path) -> None:
"""Validate that the local dataset has the expected v2.1 version."""
info = load_info(local_path)
dataset_version = info.get("codebase_version", "unknown")
dataset_version = info.codebase_version or "unknown"
if dataset_version != V21:
raise ValueError(
f"Local dataset has codebase version '{dataset_version}', expected '{V21}'. "
@@ -256,14 +259,14 @@ def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
def get_video_keys(root):
info = load_info(root)
features = info["features"]
features = info.features
video_keys = [key for key, ft in features.items() if ft["dtype"] == "video"]
return video_keys
def get_image_keys(root):
info = load_info(root)
features = info["features"]
features = info.features
image_keys = [key for key, ft in features.items() if ft["dtype"] == "image"]
return image_keys
@@ -434,7 +437,8 @@ def convert_episodes_metadata(root, new_root, episodes_metadata, episodes_video_
def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb):
info = load_info(root)
# Load as raw dict to remove legacy v2.1 fields before constructing DatasetInfo.
info = load_json(root / INFO_PATH)
info["codebase_version"] = V30
del info["total_chunks"]
del info["total_videos"]
@@ -449,7 +453,9 @@ def convert_info(root, new_root, data_file_size_in_mb, video_file_size_in_mb):
# already has fps in video_info
continue
info["features"][key]["fps"] = info["fps"]
write_info(info, new_root)
# Convert raw dict to typed DatasetInfo before writing
dataset_info = DatasetInfo.from_dict(info)
write_info(dataset_info, new_root)
def convert_dataset(
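The convert_info change shows the intended flow for the new typed metadata: mutate the raw JSON dict (drop legacy v2.1 keys), then construct the dataclass. A hedged sketch of the `from_dict` idiom (the real `DatasetInfo` in `lerobot.datasets.utils` has more fields):

```python
from dataclasses import dataclass, fields

@dataclass
class DatasetInfoSketch:
    """Illustrative stand-in for DatasetInfo; field set is not exhaustive."""
    codebase_version: str
    fps: int
    features: dict

    @classmethod
    def from_dict(cls, raw: dict) -> "DatasetInfoSketch":
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in raw.items() if k in known})

raw = {"codebase_version": "v3.0", "fps": 30, "features": {}, "total_chunks": 5}
info = DatasetInfoSketch.from_dict(raw)  # unknown legacy keys are ignored
print(info.codebase_version)             # attribute access replaces info["..."]
```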
+67 -7
@@ -150,11 +150,24 @@ Show dataset information without feature details:
--operation.type info \
--operation.show_features false
Recompute dataset statistics:
Recompute dataset statistics (saves to lerobot/pusht_recomputed_stats by default):
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type recompute_stats
Recompute stats and save to a specific new repo_id:
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_new_stats \
--operation.type recompute_stats
Recompute stats in-place (overwrites original dataset stats):
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht \
--operation.type recompute_stats \
--operation.overwrite true
Recompute stats for relative actions and push to hub:
lerobot-edit-dataset \
--repo_id lerobot/pusht \
@@ -256,6 +269,7 @@ class RecomputeStatsConfig(OperationConfig):
relative_exclude_joints: list[str] | None = None
chunk_size: int = 50
num_workers: int = 0
overwrite: bool = False
@OperationConfig.register_subclass("info")
@@ -280,16 +294,30 @@ class EditDatasetConfig:
push_to_hub: bool = False
def _resolve_io_paths(
repo_id: str,
new_repo_id: str | None,
root: Path | str | None,
new_root: Path | str | None,
default_new_repo_id: str | None = None,
) -> tuple[str, Path, Path]:
"""Resolve input/output paths and repo_id for dataset operations.
Returns (output_repo_id, input_path, output_path) with resolved (symlink-safe) paths.
"""
input_path = (Path(root) if root else HF_LEROBOT_HOME / repo_id).resolve()
output_repo_id = new_repo_id or default_new_repo_id or repo_id
output_path = (Path(new_root) if new_root else HF_LEROBOT_HOME / output_repo_id).resolve()
return output_repo_id, input_path, output_path
def get_output_path(
repo_id: str,
new_repo_id: str | None,
root: Path | str | None,
new_root: Path | str | None,
) -> tuple[str, Path]:
input_path = Path(root) if root else HF_LEROBOT_HOME / repo_id
output_repo_id = new_repo_id if new_repo_id else repo_id
output_path = Path(new_root) if new_root else HF_LEROBOT_HOME / output_repo_id
output_repo_id, input_path, output_path = _resolve_io_paths(repo_id, new_repo_id, root, new_root)
# In case of in-place modification, create a backup of the original dataset (if it exists)
if output_path == input_path:
@@ -557,7 +585,39 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, RecomputeStatsConfig):
raise ValueError("Operation config must be RecomputeStatsConfig")
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
# Determine whether this is an in-place operation
output_repo_id, input_root, output_root = _resolve_io_paths(
cfg.repo_id,
cfg.new_repo_id,
cfg.root,
cfg.new_root,
default_new_repo_id=f"{cfg.repo_id}_recomputed_stats",
)
in_place = output_root == input_root
if in_place and not cfg.operation.overwrite:
raise ValueError(
f"recompute_stats would overwrite the dataset in-place at {input_root}. "
"Pass --operation.overwrite true to allow in-place modification, "
"or use --new_repo_id / --new_root to write to a different location. "
f"Default output repo_id when neither is set: '{cfg.repo_id}_recomputed_stats'."
)
if in_place:
logging.warning(
f"Overwriting dataset stats in-place at {input_root}. The original stats will be lost."
)
dataset = LeRobotDataset(cfg.repo_id, root=input_root)
else:
logging.info(f"Copying dataset from {input_root} to {output_root}")
if output_root.exists():
backup_path = output_root.with_name(output_root.name + "_old")
logging.warning(f"Output directory {output_root} already exists. Moving to {backup_path}")
if backup_path.exists():
shutil.rmtree(backup_path)
shutil.move(output_root, backup_path)
shutil.copytree(input_root, output_root)
dataset = LeRobotDataset(output_repo_id, root=output_root)
logging.info(f"Recomputing stats for {cfg.repo_id}")
if cfg.operation.relative_action:
@@ -578,7 +638,7 @@ def handle_recompute_stats(cfg: EditDatasetConfig) -> None:
logging.info(f"Stats written to {dataset.root}")
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {dataset.meta.repo_id}...")
logging.info(f"Pushing to hub as {dataset.repo_id}...")
dataset.push_to_hub()
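To see how the CLI examples above map to output locations, here is `_resolve_io_paths` restated as a standalone snippet (the HF_LEROBOT_HOME value is the assumed default cache location):

```python
from pathlib import Path

HF_LEROBOT_HOME = Path.home() / ".cache" / "huggingface" / "lerobot"  # assumed default

def resolve_io_paths(repo_id, new_repo_id, root, new_root, default_new_repo_id=None):
    """Same logic as _resolve_io_paths in the hunk above."""
    input_path = (Path(root) if root else HF_LEROBOT_HOME / repo_id).resolve()
    output_repo_id = new_repo_id or default_new_repo_id or repo_id
    output_path = (Path(new_root) if new_root else HF_LEROBOT_HOME / output_repo_id).resolve()
    return output_repo_id, input_path, output_path

# Default: recompute_stats writes to a "<repo_id>_recomputed_stats" sibling.
rid, _, _ = resolve_io_paths("lerobot/pusht", None, None, None,
                             default_new_repo_id="lerobot/pusht_recomputed_stats")
print(rid)  # lerobot/pusht_recomputed_stats

# In-place: output resolves to the input, which trips the overwrite guard.
_, inp, out = resolve_io_paths("lerobot/pusht", "lerobot/pusht", None, None)
print(out == inp)  # True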
+2 -1
@@ -389,7 +389,8 @@ def record(
sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features)
else:
# Reject eval_ prefix — for policy evaluation use lerobot-rollout
if cfg.dataset.repo_id.startswith("eval_"):
repo_name = cfg.dataset.repo_id.split("/", 1)[-1]
if repo_name.startswith("eval_"):
raise ValueError(
"Dataset names starting with 'eval_' are reserved for policy evaluation. "
"lerobot-record is for data collection only. Use lerobot-rollout for policy deployment."
+103 -85
@@ -47,6 +47,7 @@ from lerobot.datasets import EpisodeAwareSampler, make_dataset
from lerobot.envs import close_envs, make_env, make_env_pre_post_processors
from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.rewards import make_reward_pre_post_processors
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
from lerobot.utils.random_utils import set_seed
@@ -70,8 +71,8 @@ def update_policy(
accelerator: "Accelerator",
lr_scheduler=None,
lock=None,
rabc_weights_provider=None,
) -> tuple[MetricsTracker, dict]:
sample_weighter=None,
) -> tuple[MetricsTracker, dict | None]:
"""
Performs a single training step to update the policy's weights.
@@ -87,7 +88,7 @@ def update_policy(
accelerator: The Accelerator instance for distributed training and mixed precision.
lr_scheduler: An optional learning rate scheduler.
lock: An optional lock for thread-safe optimizer updates.
rabc_weights_provider: Optional RABCWeights instance for sample weighting.
sample_weighter: Optional SampleWeighter instance for per-sample loss weighting.
Returns:
A tuple containing:
@@ -97,27 +98,31 @@ def update_policy(
start_time = time.perf_counter()
policy.train()
# Get RA-BC weights if enabled
rabc_batch_weights = None
rabc_batch_stats = None
if rabc_weights_provider is not None:
rabc_batch_weights, rabc_batch_stats = rabc_weights_provider.compute_batch_weights(batch)
# Compute sample weights if a weighter is provided
sample_weights = None
weight_stats = None
if sample_weighter is not None:
sample_weights, weight_stats = sample_weighter.compute_batch_weights(batch)
# Let accelerator handle mixed precision
with accelerator.autocast():
# Use per-sample loss when RA-BC is enabled for proper weighting
if rabc_batch_weights is not None:
# Get per-sample losses
if sample_weights is not None:
# Use per-sample loss for weighted training
# Note: Policies supporting sample weighting must implement forward(batch, reduction="none")
per_sample_loss, output_dict = policy.forward(batch, reduction="none")
# Apply RA-BC weights: L_RA-BC = Σ(w_i * l_i) / (Σw_i + ε)
# rabc_batch_weights is already normalized to sum to batch_size
# Weighted loss: each sample's contribution is scaled by its weight.
# We divide by weight sum (not batch size) so that if some weights are zero,
# the remaining samples contribute proportionally more, preserving gradient scale.
# Weights are pre-normalized to sum to batch_size for stable training dynamics.
epsilon = 1e-6
loss = (per_sample_loss * rabc_batch_weights).sum() / (rabc_batch_weights.sum() + epsilon)
# Log raw mean weight (before normalization) - this is the meaningful metric
output_dict["rabc_mean_weight"] = rabc_batch_stats["raw_mean_weight"]
output_dict["rabc_num_zero_weight"] = rabc_batch_stats["num_zero_weight"]
output_dict["rabc_num_full_weight"] = rabc_batch_stats["num_full_weight"]
loss = (per_sample_loss * sample_weights).sum() / (sample_weights.sum() + epsilon)
# Log weighting statistics
if output_dict is None:
output_dict = {}
for key, value in weight_stats.items():
output_dict[f"sample_weight_{key}"] = value
else:
loss, output_dict = policy.forward(batch)
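The weighted-loss denominator is worth pausing on: dividing by the weight sum (not the batch size) keeps gradient scale stable when some weights are zero, as the new comment explains. A small numeric check of that property:

```python
import torch

# Batch of 4 per-sample losses; two samples are zero-weighted.
per_sample_loss = torch.tensor([1.0, 2.0, 3.0, 4.0])
sample_weights = torch.tensor([1.0, 1.0, 0.0, 0.0])

epsilon = 1e-6
loss = (per_sample_loss * sample_weights).sum() / (sample_weights.sum() + epsilon)
print(loss)  # ~1.5: the mean over surviving samples is preserved

# Dividing by batch size instead dilutes the signal when weights are zeroed:
print((per_sample_loss * sample_weights).sum() / len(sample_weights))  # 0.75
# With weights pre-normalized to sum to the batch size the two coincide;
# the weight-sum denominator is the safety net when they do not.
```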
@@ -188,8 +193,8 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True)
# Accelerate auto-detects the device based on the available hardware and ignores the policy.device setting.
# Force the device to be CPU when policy.device is set to CPU.
force_cpu = cfg.policy.device == "cpu"
# Force the device to be CPU when the active config's device is set to CPU (works for both policy and reward model training).
force_cpu = cfg.trainable_config.device == "cpu"
accelerator = Accelerator(
step_scheduler_with_optimizer=False,
kwargs_handlers=[ddp_kwargs],
@@ -245,26 +250,49 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
logging.info("Creating env")
eval_env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
if is_main_process:
logging.info("Creating policy")
policy = make_policy(
cfg=cfg.policy,
ds_meta=dataset.meta,
rename_map=cfg.rename_map,
)
if cfg.is_reward_model_training:
if is_main_process:
logging.info("Creating reward model")
from lerobot.rewards import make_reward_model
policy = make_reward_model(
cfg=cfg.reward_model,
dataset_stats=dataset.meta.stats,
dataset_meta=dataset.meta,
)
if not policy.is_trainable:
raise ValueError(
f"Reward model '{policy.name}' is zero-shot and cannot be trained via lerobot-train. "
"Use it directly for inference via compute_reward() (e.g. offline precompute)."
)
else:
if is_main_process:
logging.info("Creating policy")
policy = make_policy(
cfg=cfg.policy,
ds_meta=dataset.meta,
rename_map=cfg.rename_map,
)
if cfg.peft is not None:
logging.info("Using PEFT! Wrapping model.")
# Convert CLI peft config to dict for overrides
peft_cli_overrides = dataclasses.asdict(cfg.peft)
policy = policy.wrap_with_peft(peft_cli_overrides=peft_cli_overrides)
if cfg.is_reward_model_training:
raise ValueError("PEFT is only supported for policy training. ")
from peft import PeftModel
# Wait for all processes to finish policy creation before continuing
if isinstance(policy, PeftModel):
logging.info("PEFT adapter already loaded from checkpoint, skipping wrap_with_peft.")
else:
logging.info("Using PEFT! Wrapping model.")
peft_cli_overrides = dataclasses.asdict(cfg.peft)
policy = policy.wrap_with_peft(peft_cli_overrides=peft_cli_overrides)
# Wait for all processes to finish model creation before continuing
accelerator.wait_for_everyone()
processor_pretrained_path = cfg.policy.pretrained_path
active_cfg = cfg.trainable_config
processor_pretrained_path = active_cfg.pretrained_path
if (
getattr(cfg.policy, "use_relative_actions", False)
getattr(active_cfg, "use_relative_actions", False)
and processor_pretrained_path is not None
and not cfg.resume
):
@@ -274,18 +302,15 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
)
processor_pretrained_path = None
# Create processors - only provide dataset_stats if not resuming from saved processors
processor_kwargs = {}
postprocessor_kwargs = {}
if (processor_pretrained_path and not cfg.resume) or not processor_pretrained_path:
# Only provide dataset_stats when not resuming from saved processor state
processor_kwargs["dataset_stats"] = dataset.meta.stats
# For SARM, always provide dataset_meta for progress normalization
if cfg.policy.type == "sarm":
if cfg.is_reward_model_training:
processor_kwargs["dataset_meta"] = dataset.meta
if processor_pretrained_path is not None:
if not cfg.is_reward_model_training and processor_pretrained_path is not None:
processor_kwargs["preprocessor_overrides"] = {
"device_processor": {"device": device.type},
"normalizer_processor": {
@@ -305,38 +330,36 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
},
}
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
pretrained_path=processor_pretrained_path,
**processor_kwargs,
**postprocessor_kwargs,
)
if cfg.is_reward_model_training:
preprocessor, postprocessor = make_reward_pre_post_processors(
cfg.reward_model,
**processor_kwargs,
)
else:
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
pretrained_path=processor_pretrained_path,
**processor_kwargs,
**postprocessor_kwargs,
)
if is_main_process:
logging.info("Creating optimizer and scheduler")
optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy)
# Load precomputed SARM progress for RA-BC if enabled
# Generate progress using: src/lerobot/policies/sarm/compute_rabc_weights.py
rabc_weights = None
if cfg.use_rabc:
from lerobot.utils.rabc import RABCWeights
# Create sample weighter if configured (e.g., for RA-BC training)
sample_weighter = None
if cfg.sample_weighting is not None:
from lerobot.utils.sample_weighting import make_sample_weighter
# Get chunk_size from policy config
chunk_size = getattr(policy.config, "chunk_size", None)
if chunk_size is None:
raise ValueError("Chunk size is not found in policy config")
head_mode = getattr(cfg, "rabc_head_mode", "sparse")
logging.info(f"Loading SARM progress for RA-BC from {cfg.rabc_progress_path}")
logging.info(f"Using chunk_size={chunk_size} from policy config, head_mode={head_mode}")
rabc_weights = RABCWeights(
progress_path=cfg.rabc_progress_path,
chunk_size=chunk_size,
head_mode=head_mode,
kappa=getattr(cfg, "rabc_kappa", 0.01),
epsilon=getattr(cfg, "rabc_epsilon", 1e-6),
device=device,
if is_main_process:
logging.info(f"Creating sample weighter: {cfg.sample_weighting.type}")
sample_weighter = make_sample_weighter(
cfg.sample_weighting,
policy,
device,
dataset_root=cfg.dataset.root,
dataset_repo_id=cfg.dataset.repo_id,
)
step = 0 # number of policy updates (forward + backward + optim)
@@ -365,13 +388,13 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
logging.info(f"{num_total_params=} ({format_big_number(num_total_params)})")
# create dataloader for offline training
if hasattr(cfg.policy, "drop_n_last_frames"):
if hasattr(active_cfg, "drop_n_last_frames"):
shuffle = False
sampler = EpisodeAwareSampler(
dataset.meta.episodes["dataset_from_index"],
dataset.meta.episodes["dataset_to_index"],
episode_indices_to_use=dataset.episodes,
drop_n_last_frames=cfg.policy.drop_n_last_frames,
drop_n_last_frames=active_cfg.drop_n_last_frames,
shuffle=True,
)
else:
@@ -448,7 +471,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
cfg.optimizer.grad_clip_norm,
accelerator=accelerator,
lr_scheduler=lr_scheduler,
rabc_weights_provider=rabc_weights,
sample_weighter=sample_weighter,
)
# Note: eval and checkpoint happens *after* the `step`th training update has completed, so we
@@ -467,16 +490,10 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
wandb_log_dict = train_tracker.to_dict()
if output_dict:
wandb_log_dict.update(output_dict)
# Log RA-BC statistics if enabled
if rabc_weights is not None:
rabc_stats = rabc_weights.get_stats()
wandb_log_dict.update(
{
"rabc_delta_mean": rabc_stats["delta_mean"],
"rabc_delta_std": rabc_stats["delta_std"],
"rabc_num_frames": rabc_stats["num_frames"],
}
)
# Log sample weighting statistics if enabled
if sample_weighter is not None:
weighter_stats = sample_weighter.get_stats()
wandb_log_dict.update({f"sample_weighting/{k}": v for k, v in weighter_stats.items()})
wandb_logger.log_dict(wandb_log_dict, step)
train_tracker.reset_averages()
@@ -558,14 +575,15 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
if is_main_process:
logging.info("End of training")
if cfg.policy.push_to_hub:
unwrapped_policy = accelerator.unwrap_model(policy)
if cfg.policy.use_peft:
unwrapped_policy.push_model_to_hub(cfg, peft_model=unwrapped_policy)
if getattr(active_cfg, "push_to_hub", False):
unwrapped_model = accelerator.unwrap_model(policy)
# PEFT only applies when training a policy — reward models use the plain path.
if not cfg.is_reward_model_training and cfg.policy.use_peft:
unwrapped_model.push_model_to_hub(cfg, peft_model=unwrapped_model)
else:
unwrapped_policy.push_model_to_hub(cfg)
preprocessor.push_to_hub(cfg.policy.repo_id)
postprocessor.push_to_hub(cfg.policy.repo_id)
unwrapped_model.push_model_to_hub(cfg)
preprocessor.push_to_hub(active_cfg.repo_id)
postprocessor.push_to_hub(active_cfg.repo_id)
# Properly clean up the distributed process group
accelerator.wait_for_everyone()
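Several hunks above switch from cfg.policy to cfg.trainable_config so that device, repo_id, and sampler settings come from whichever model is actually being trained. A hedged sketch of the dispatch this implies (the real property on TrainPipelineConfig may be implemented differently):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class TrainPipelineConfigSketch:
    """Illustrative only; the real TrainPipelineConfig has many more fields."""
    policy: Any = None
    reward_model: Any = None

    @property
    def is_reward_model_training(self) -> bool:
        return self.reward_model is not None

    @property
    def trainable_config(self) -> Any:
        # Single access point for device, repo_id, push_to_hub, etc.
        return self.reward_model if self.is_reward_model_training else self.policy
```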
@@ -49,6 +49,7 @@ class BiOpenArmLeader(Teleoperator):
can_data_bitrate=config.left_arm_config.can_data_bitrate,
motor_config=config.left_arm_config.motor_config,
manual_control=config.left_arm_config.manual_control,
use_velocity_and_torque=config.left_arm_config.use_velocity_and_torque,
position_kd=config.left_arm_config.position_kd,
position_kp=config.left_arm_config.position_kp,
)
@@ -63,6 +64,7 @@ class BiOpenArmLeader(Teleoperator):
can_data_bitrate=config.right_arm_config.can_data_bitrate,
motor_config=config.right_arm_config.motor_config,
manual_control=config.right_arm_config.manual_control,
use_velocity_and_torque=config.right_arm_config.use_velocity_and_torque,
position_kd=config.right_arm_config.position_kd,
position_kp=config.right_arm_config.position_kp,
)
@@ -60,6 +60,10 @@ class OpenArmLeaderConfigBase:
# When enabled, motors have torque disabled for manual movement
manual_control: bool = True
# When True, expose `.vel` and `.torque` per motor in action features.
# Default False for compatibility with the position-only openarm_mini teleoperator.
use_velocity_and_torque: bool = False
# TODO(Steven, Pepijn): Not used ... ?
# MIT control parameters (used when manual_control=False for torque control)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
@@ -70,8 +70,9 @@ class OpenArmLeader(Teleoperator):
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
if self.config.use_velocity_and_torque:
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
@@ -201,8 +202,9 @@ class OpenArmLeader(Teleoperator):
for motor in self.bus.motors:
state = states.get(motor, {})
action_dict[f"{motor}.pos"] = state.get("position")
action_dict[f"{motor}.vel"] = state.get("velocity")
action_dict[f"{motor}.torque"] = state.get("torque")
if self.config.use_velocity_and_torque:
action_dict[f"{motor}.vel"] = state.get("velocity")
action_dict[f"{motor}.torque"] = state.get("torque")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
@@ -112,7 +112,7 @@ class OpenArmMini(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -348,8 +348,9 @@ class OpenArmMini(Teleoperator):
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")
self.write_goal_positions(feedback)
@check_if_not_connected
def disconnect(self) -> None:
@@ -59,7 +59,7 @@ class SOLeader(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -130,6 +130,12 @@ class SOLeader(Teleoperator):
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def enable_torque(self) -> None:
self.bus.enable_torque()
def disable_torque(self) -> None:
self.bus.disable_torque()
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
@@ -145,9 +151,11 @@ class SOLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")}
if goals:
self.bus.sync_write("Goal_Position", goals)
@check_if_not_connected
def disconnect(self) -> None:
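SOLeader.send_feedback now forwards only the position components of the feedback dict to the bus. The filtering step on its own, with an illustrative feedback payload:

```python
feedback = {"shoulder.pos": 42.0, "elbow.pos": 13.5, "elbow.vel": 0.2}

# Keep "<motor>.pos" entries, strip the suffix, ignore everything else.
goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")}
print(goals)  # {'shoulder': 42.0, 'elbow': 13.5}
```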
@@ -0,0 +1,55 @@
---
# For reference on model card metadata, see the spec: https://github.com/huggingface/hub-docs/blob/main/modelcard.md?plain=1
# Doc / guide: https://huggingface.co/docs/hub/model-cards
# prettier-ignore
{{card_data}}
---
# Reward Model Card for {{ model_name | default("Reward Model ID", true) }}
<!-- Provide a quick summary of what the reward model is/does. -->
{% if model_name == "reward_classifier" %}
A reward classifier is a lightweight neural network that scores observations or trajectories for task success, providing a learned reward signal for training or for offline evaluation when explicit rewards are unavailable.
{% elif model_name == "sarm" %}
A Success-Aware Reward Model (SARM) predicts a dense reward signal from observations, typically used downstream for reinforcement learning or human-in-the-loop fine-tuning when task success is not directly observable.
{% else %}
_Reward model type not recognized — please update this template._
{% endif %}
This reward model has been trained and pushed to the Hub using [LeRobot](https://github.com/huggingface/lerobot).
See the full documentation at [LeRobot Docs](https://huggingface.co/docs/lerobot/index).
---
## How to Get Started with the Reward Model
### Train from scratch
```bash
lerobot-train \
--dataset.repo_id=${HF_USER}/<dataset> \
--reward_model.type={{ model_name | default("reward_classifier", true) }} \
--output_dir=outputs/train/<desired_reward_model_repo_id> \
--job_name=lerobot_reward_training \
--reward_model.device=cuda \
--reward_model.repo_id=${HF_USER}/<desired_reward_model_repo_id> \
--wandb.enable=true
```
_Writes checkpoints to `outputs/train/<desired_reward_model_repo_id>/checkpoints/`._
### Load the reward model in Python
```python
from lerobot.rewards import make_reward_model
reward_model = make_reward_model(pretrained_path="<hf_user>/<reward_model_repo_id>")
reward = reward_model.compute_reward(batch)
```
---
## Model Details
- **License:** {{ license | default("[More Information Needed]", true) }}
+3 -1
@@ -115,7 +115,9 @@ _feetech_sdk_available = is_package_available("feetech-servo-sdk", import_name="
_reachy2_sdk_available = is_package_available("reachy2_sdk")
_can_available = is_package_available("python-can", "can")
_unitree_sdk_available = is_package_available("unitree-sdk2py", "unitree_sdk2py")
_pyrealsense2_available = is_package_available("pyrealsense2")
_pyrealsense2_available = is_package_available("pyrealsense2") or is_package_available(
"pyrealsense2-macosx", import_name="pyrealsense2"
)
_zmq_available = is_package_available("pyzmq", import_name="zmq")
_hebi_available = is_package_available("hebi-py", import_name="hebi")
_teleop_available = is_package_available("teleop")
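The macOS RealSense wheel ships under a different distribution name (`pyrealsense2-macosx`) but the same import name, hence the two-part check. A hedged sketch of what a helper like `is_package_available` typically does (the real one in `lerobot.utils.import_utils` may differ):

```python
import importlib.metadata
import importlib.util

def is_package_available(pkg_name: str, import_name: str | None = None) -> bool:
    """True if the distribution is installed and its module is importable."""
    try:
        importlib.metadata.version(pkg_name)  # distribution-name check
    except importlib.metadata.PackageNotFoundError:
        return False
    return importlib.util.find_spec(import_name or pkg_name) is not None

# Either distribution satisfies the same import name:
_pyrealsense2_available = is_package_available("pyrealsense2") or is_package_available(
    "pyrealsense2-macosx", import_name="pyrealsense2"
)
```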
+239
@@ -0,0 +1,239 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Sample weighting abstraction for training.
This module provides an abstract base class for sample weighting strategies (e.g., RA-BC)
that can be used during training without polluting the training script with
policy-specific code.
Example usage:
# In training config
sample_weighting:
type: rabc
progress_path: hf://datasets/my-dataset/sarm_progress.parquet
head_mode: sparse
kappa: 0.01
# In training script
sample_weighter = make_sample_weighter(cfg.sample_weighting, policy, device, dataset_root=cfg.dataset.root, dataset_repo_id=cfg.dataset.repo_id)
...
weights, stats = sample_weighter.compute_batch_weights(batch)
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING
import torch
if TYPE_CHECKING:
from lerobot.policies.pretrained import PreTrainedPolicy
class SampleWeighter(ABC):
"""
Abstract base class for sample weighting strategies. Implementations
compute per-sample weights used to scale the loss during training.
This enables techniques like:
- RA-BC (Reward-Aligned Behavior Cloning)
- Importance sampling
- Curriculum learning
- Quality-based filtering
"""
@abstractmethod
def compute_batch_weights(self, batch: dict) -> tuple[torch.Tensor, dict]:
"""
Compute per-sample weights for a training batch.
Args:
batch: Training batch dictionary containing at minimum an "index" key
with global frame indices.
Returns:
A tuple ``(weights, stats)``: a ``(batch_size,)`` tensor of per-sample
weights and a dict of statistics for logging.
"""
@abstractmethod
def get_stats(self) -> dict:
"""
Get global statistics about the weighting strategy.
"""
@dataclass
class SampleWeightingConfig:
"""
Configuration for sample weighting during training.
This is a generic config that supports multiple weighting strategies.
The `type` field determines which implementation to use, and `extra_params`
contains additional type-specific parameters.
Attributes:
type: Weighting strategy type ("rabc", "uniform", etc.)
progress_path: Path to precomputed progress values (for RABC)
head_mode: Which model head to use for progress ("sparse" or "dense")
kappa: Hard threshold for high-quality samples (RABC-specific)
epsilon: Small constant for numerical stability
extra_params: Additional type-specific parameters passed to the weighter
"""
type: str = "rabc"
progress_path: str | None = None
head_mode: str = "sparse"
kappa: float = 0.01
epsilon: float = 1e-6
# Additional type-specific params can be added here or passed via extra_params
extra_params: dict = field(default_factory=dict)
def make_sample_weighter(
config: SampleWeightingConfig | None,
policy: PreTrainedPolicy,
device: torch.device,
dataset_root: str | None = None,
dataset_repo_id: str | None = None,
) -> SampleWeighter | None:
"""
Factory function to create a SampleWeighter from config.
This keeps policy-specific initialization logic out of the training script.
Args:
config: Sample weighting configuration, or None to disable weighting.
policy: The policy being trained (used to extract chunk_size, etc.)
device: Device to place weight tensors on.
dataset_root: Local path to dataset root (for auto-detecting progress_path).
dataset_repo_id: HuggingFace repo ID (for auto-detecting progress_path).
"""
if config is None:
return None
if config.type == "rabc":
return _make_rabc_weighter(config, policy, device, dataset_root, dataset_repo_id)
if config.type == "uniform":
# No-op weighter that returns uniform weights
return UniformWeighter(device=device)
raise ValueError(f"Unknown sample weighting type: '{config.type}'. Supported types: 'rabc', 'uniform'")
def _make_rabc_weighter(
config: SampleWeightingConfig,
policy: PreTrainedPolicy,
device: torch.device,
dataset_root: str | None = None,
dataset_repo_id: str | None = None,
) -> SampleWeighter:
"""Create RABC weighter with policy-specific initialization.
Args:
config: Sample weighting configuration.
policy: The policy being trained (used to extract chunk_size).
device: Device to place weight tensors on.
dataset_root: Local path to dataset root (for auto-detecting progress_path).
dataset_repo_id: HuggingFace repo ID (for auto-detecting progress_path).
"""
# Import here to avoid circular imports and keep RABC code in SARM module
from lerobot.rewards.sarm.rabc import RABCWeights
# Extract chunk_size from policy config
chunk_size = getattr(policy.config, "chunk_size", None)
if chunk_size is None:
raise ValueError(
"RABC sample weighting requires a policy with 'chunk_size' in its config. "
"This is typically set for action-chunking policies like ACT, Diffusion, PI0, etc."
)
# Determine progress_path: use explicit config or auto-detect from dataset
progress_path = config.progress_path
if progress_path is None:
if dataset_root:
progress_path = str(Path(dataset_root) / "sarm_progress.parquet")
elif dataset_repo_id:
progress_path = f"hf://datasets/{dataset_repo_id}/sarm_progress.parquet"
else:
raise ValueError(
"RABC sample weighting requires 'progress_path' to be set, "
"or dataset_root/dataset_repo_id for auto-detection. "
"Generate progress values using: "
"python -m lerobot.rewards.sarm.compute_rabc_weights --help"
)
return RABCWeights(
progress_path=progress_path,
chunk_size=chunk_size,
head_mode=config.head_mode,
kappa=config.kappa,
epsilon=config.epsilon,
device=device,
**config.extra_params,
)
class UniformWeighter(SampleWeighter):
"""
No-op sample weighter that returns uniform weights.
Useful as a baseline or when you want to disable weighting without
changing the training code structure.
Note:
Batch size is determined by looking for tensor values in the batch
dictionary. The method checks common keys like "action", "index",
and "observation.state" first, then falls back to scanning all values.
"""
def __init__(self, device: torch.device):
self.device = device
def compute_batch_weights(self, batch: dict) -> tuple[torch.Tensor, dict]:
"""Return uniform weights (all ones)."""
batch_size = self._determine_batch_size(batch)
weights = torch.ones(batch_size, device=self.device)
stats = {"mean_weight": 1.0, "type": "uniform"}
return weights, stats
def _determine_batch_size(self, batch: dict) -> int:
"""
Determine batch size from the batch dictionary.
Checks common keys first, then scans all values for tensors.
Args:
batch: Training batch dictionary.
"""
if not batch:
raise ValueError("Cannot determine batch size from empty batch")
# Check common keys first
for key in ["action", "index", "observation.state"]:
if key in batch and isinstance(batch[key], torch.Tensor):
return batch[key].shape[0]
# Scan all values for any tensor
for value in batch.values():
if isinstance(value, torch.Tensor) and value.ndim >= 1:
return value.shape[0]
# Last resort: return 1 (this handles non-tensor batches)
return 1
def get_stats(self) -> dict:
"""Return empty stats for uniform weighting."""
return {"type": "uniform"}
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:c2b8f8532c7a0b776de5e536b8b54e30b1a0c2e3d5cc25a2d86fe43e40ae5e8c
oid sha256:8a31653c11eccdd4d80fd3f6a351cd54c49b8a48db1f7e9faf38fddd7900a09f
size 515400
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:224b5fa4828aa88171b68c036e8919c1eae563e2113f03b6461eadf5bf8525a6
oid sha256:75bf051698b37dcd7517ec8025a896ab5a0551a6dde5f89d0a3d5d50966e83e6
size 31672
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:016d2fa8fe5f58017dfd46f4632fdc19dfd751e32a2c7cde2077c6f95546d6bd
oid sha256:88e10930a10041d50f2cf369e6813ac14618d13dad1c21bdde1ac7798611c6ba
size 68
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:eca0d87a699620e4fec7e68539b0be91e4cc933f6bf12032da52c182ab6f38cf
oid sha256:89833a5ccdb7d85c83f717ff8ec68b8e822005cb8803899acaae88c578e2e3ae
size 31672
+3 -3
@@ -113,7 +113,7 @@ def assert_metadata_consistency(aggr_ds, ds_0, ds_1):
"""Test that metadata is correctly aggregated."""
# Test basic info
assert aggr_ds.fps == ds_0.fps == ds_1.fps, "FPS should be the same across all datasets"
assert aggr_ds.meta.info["robot_type"] == ds_0.meta.info["robot_type"] == ds_1.meta.info["robot_type"], (
assert aggr_ds.meta.info.robot_type == ds_0.meta.info.robot_type == ds_1.meta.info.robot_type, (
"Robot type should be the same"
)
@@ -153,8 +153,8 @@ def assert_video_frames_integrity(aggr_ds, ds_0, ds_1):
video_keys = list(
filter(
lambda key: aggr_ds.meta.info["features"][key]["dtype"] == "video",
aggr_ds.meta.info["features"].keys(),
lambda key: aggr_ds.meta.info.features[key]["dtype"] == "video",
aggr_ds.meta.info.features.keys(),
)
)

Some files were not shown because too many files have changed in this diff.